~aleteoryx/muditaos

ref: 50f28e8f19b23384aac9efd5c22cc155d451cace muditaos/test/harness/interface/CDCSerial.py -rw-r--r-- 2.4 KiB
50f28e8f — Bartosz Cichocki [EGD-4270] updated test API, added sending message case in test harness (#1042) 5 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# Copyright (c) 2017-2020, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import time

import serial
import json
import typing
import logging
from enum import Enum
from harness.interface.defs import endpoint, method, status

log = logging.getLogger(__name__)


class Keytype(Enum):
    long_press = 0
    short_press = 1


class CDCSerial:
    def __init__(self, port_name):
        self.body = ""
        try:
            self.serial = serial.Serial(port_name, baudrate=115200, timeout=10)
            self.serial.flushInput()
            log.info("port opened!")
        except (FileNotFoundError, serial.serialutil.SerialException) as err:
            log.error("uart {} not found".format(port_name))
            exit(1)

    def __del__(self):
        self.serial.close()

    def __wrap_message(self, body):
        msg = {
            "endpoint": endpoint["developerMode"],
            "method": method["put"],
            "uuid": 0,
            "body": body
        }
        return msg

    def get_serial(self):
        return self.serial

    def __build_message(self, json_data):
        json_dump = json.dumps(json_data)
        return "#%09d%s" % (len(json_dump), json_dump)

    def write(self, msg, timeout=10):
        message = self.__build_message(msg)
        self.serial.write(message.encode())

        header = self.serial.read(timeout).decode()
        payload_length = int(header[1:])
        result = self.serial.read(payload_length).decode()
        return json.loads(result)

    def send_key(self, key_code, key_type=Keytype.short_press, wait=10):
        if key_type is Keytype.long_press:
            body = {"keyPressed": key_code, "state": 4}
        else:
            body = {"keyPressed": key_code, "state": 2}
        ret = self.write(self.__wrap_message(body), wait)
        time.sleep(0.3)
        return ret

    def send_at(self, at_command, wait=10):
        body = {
            "AT": at_command + "\r"
        }

        ret = self.write(self.__wrap_message(body), wait)
        return ret["body"]["ATResponse"]

    def get_window(self):
        body = {
            "focus": True
        }

        ret = self.write(self.__wrap_message(body))
        return ret["body"]["focus"]

    def is_phone_locked(self):
        body = {
            "isLocked": True
        }

        ret = self.write(self.__wrap_message(body))
        return ret["body"]["isLocked"]