# Copyright (c) 2017-2020, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import time
import serial
import json
import typing
import logging
from enum import Enum
from harness.interface.defs import endpoint, method, status
log = logging.getLogger(__name__)
class Keytype(Enum):
long_press = 0
short_press = 1
class CDCSerial:
def __init__(self, port_name):
self.body = ""
try:
self.serial = serial.Serial(port_name, baudrate=115200, timeout=10)
self.serial.flushInput()
log.info("port opened!")
except (FileNotFoundError, serial.serialutil.SerialException) as err:
log.error("uart {} not found".format(port_name))
exit(1)
def __del__(self):
self.serial.close()
def __wrap_message(self, body):
msg = {
"endpoint": endpoint["developerMode"],
"method": method["put"],
"uuid": 0,
"body": body
}
return msg
def get_serial(self):
return self.serial
def __build_message(self, json_data):
json_dump = json.dumps(json_data)
return "#%09d%s" % (len(json_dump), json_dump)
def write(self, msg, timeout=10):
message = self.__build_message(msg)
self.serial.write(message.encode())
header = self.serial.read(timeout).decode()
payload_length = int(header[1:])
result = self.serial.read(payload_length).decode()
return json.loads(result)
def send_key(self, key_code, key_type=Keytype.short_press, wait=10):
if key_type is Keytype.long_press:
body = {"keyPressed": key_code, "state": 4}
else:
body = {"keyPressed": key_code, "state": 2}
ret = self.write(self.__wrap_message(body), wait)
time.sleep(0.3)
return ret
def send_at(self, at_command, wait=10):
body = {
"AT": at_command + "\r"
}
ret = self.write(self.__wrap_message(body), wait)
return ret["body"]["ATResponse"]
def get_window(self):
body = {
"focus": True
}
ret = self.write(self.__wrap_message(body))
return ret["body"]["focus"]
def is_phone_locked(self):
body = {
"isLocked": True
}
ret = self.write(self.__wrap_message(body))
return ret["body"]["isLocked"]