~aleteoryx/muditaos

05d26bd1fc5f6fc1a83c4d9344101747ad5fdd33 — Adam Dobrowolski 4 years ago 354b47a
[EGD-6027] Moved test harness to separate repository

Common code for QAE and development moved to separate repository to
achieve more granular controll and more fluent flow
10 files changed, 5 insertions(+), 773 deletions(-)

M .gitmodules
A test/harness
D test/harness/__init__.py
D test/harness/dom_parser_utils.py
D test/harness/harness.py
D test/harness/interface/CDCSerial.py
D test/harness/interface/__init__.py
D test/harness/interface/defs.py
D test/harness/interface/error.py
D test/harness/utils.py
M .gitmodules => .gitmodules +4 -0
@@ 78,3 78,7 @@
[submodule "tools/misc"]
	path = tools/misc
	url = ../misc-tools.git
[submodule "test/harness"]
	path = test/harness
	url = ../QAMuditaHarness.git
	shallow = true

A test/harness => test/harness +1 -0
@@ 0,0 1,1 @@
Subproject commit beb1c1351638839148f576cc63bc450e69d310a4

D test/harness/__init__.py => test/harness/__init__.py +0 -8
@@ 1,8 0,0 @@
# Copyright (c) 2017-2020, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

import logging
import harness.interface
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG)
log = logging.getLogger(__name__)


D test/harness/dom_parser_utils.py => test/harness/dom_parser_utils.py +0 -95
@@ 1,95 0,0 @@
import time

children_attr_name = 'Children'

def get_window_content(harness, sleep_before_dump_time = 0 ) :
    time.sleep(sleep_before_dump_time)
    body = {"ui": True, "getWindow": True}
    return harness.endpoint_request("developerMode", "get", body)['body']['dom']

def item_contains(body: dict, attr_name, attr_value ):
    if attr_name in body:
        return body[attr_name] == attr_value

#extends item_contains to check children
def item_contains_recursively(body :dict, attr_name, attr_value) :
    if len(body) == 1:
        return item_contains_recursively(next(iter(body.values())), attr_name, attr_value)

    if item_contains(body, attr_name, attr_value) :
        return True

    if children_attr_name in body:
        for child in body[children_attr_name] :
            value_found = item_contains_recursively(child, attr_name, attr_value)
            if value_found :
                return True
    return False

#extends item_contains_recursively to support list of attribute name-value pairs
def item_contains_multiple_recursively(body :dict, attr_name_value_pairs) :
    for name,value in attr_name_value_pairs:
        if not item_contains_recursively(body, name, value) :
            return False
    return True

#restricts item_contains_multiple_recursively to check only children
def item_has_child_that_contains_recursively(body :dict, attr_name_value_pairs) :
    if len(body) == 1:
        return item_has_child_that_contains_recursively(next(iter(body.values())), attr_name_value_pairs)

    if children_attr_name in body:
        for child in body[children_attr_name] :
            if item_contains_multiple_recursively(child, attr_name_value_pairs) :
                return True
    return False

#in children, finds child that contains given name-value attribute pairs and returns the child index
#usefull in ListView navigation
def get_child_number_that_contains_recursively(body :dict, attr_name_value_pairs) :
    if len(body) == 1:
        return get_child_number_that_contains_recursively(next(iter(body.values())), attr_name_value_pairs)

    if children_attr_name in body:
        child_index = 0
        for child in body[children_attr_name] :
            if item_contains_multiple_recursively(child, attr_name_value_pairs) :
                return child_index
            child_index += 1

    return -1

def find_parent(body: dict, child_name) -> dict :
    if len(body) == 1:
        return find_parent(next(iter(body.values())), child_name)

    if children_attr_name in body:
        for child in body[children_attr_name] :
            if next(iter(child)) == child_name :
                return body
            result = find_parent(next(iter(child.values())), child_name)
            if result :
                return result
    return {}

def find_child_that_contains(body: dict, attr_name, attr_value ) -> dict :
    if len(body) == 1:
        return find_child_that_contains(next(iter(body.values())), attr_name, attr_value)
    if children_attr_name in body:
        for child in body[children_attr_name] :
            if item_contains_recursively(child, attr_name, attr_value) :
                return child

    return {}

def find_item_depth_first(body: dict, attr_name ) -> dict:
    if attr_name in body:
        return body[attr_name]
    if children_attr_name in body:
        for child in body[children_attr_name] :
            result = find_item_depth_first(child, attr_name)
            if result:
                return result
    elif len(body) == 1:
        return find_item_depth_first(next(iter(body.values())), attr_name)
    return {}

D test/harness/harness.py => test/harness/harness.py +0 -122
@@ 1,122 0,0 @@
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import time
from enum import Enum

from harness import utils, log
from harness.interface import CDCSerial as serial
from harness.interface.defs import key_codes, endpoint, method
from harness.interface.CDCSerial import Keytype
from harness.utils import send_keystoke, application_keypath, send_char, clear_last_char
from harness.interface.error import TestError, Error
import random


class Harness:
    connection = None
    is_phone_unlocked = False
    is_echo_mode = False
    port_name = ''

    def __init__(self, port):
        self.port_name = port
        self.connection = serial.CDCSerial(port)

    @classmethod
    def from_detect(cls):
        '''
        Try to instantiate from first detected device.
        Do not use this method if you need >1 unique devices.
        '''
        found = serial.CDCSerial.find_Pures()
        if found:
            port = found[0]
            return cls(port)
        else:
            raise TestError(Error.PORT_NOT_FOUND)

    def get_connection(self):
        return self.connection

    def get_application_name(self):
        return self.connection.get_application_name()

    def unlock_usb(self):
        self.connection.usb_unlock()

    def lock_usb(self):
        self.connection.usb_lock()

    def unlock_phone(self):
        if self.connection.is_phone_locked():
            self.connection.send_key_code(key_codes["enter"])
            self.connection.send_key_code(key_codes["#"])
            if self.connection.is_phone_locked():
                self.connection.send_key_code(3)
                self.connection.send_key_code(3)
                self.connection.send_key_code(3)
                self.connection.send_key_code(3)
            log.info("Phone unlocked")
        else:
            log.info("Phone already unlocked")
        self.is_phone_unlocked = True

    def with_phone_unlocked(self, func):
        if not self.is_phone_unlocked:
            self.unlock_phone()

        func(self.connection)

    def connection_echo_mode_on(self):
        if self.connection.enable_echo_mode():
            self.is_echo_mode = True

    def connection_echo_mode_off(self):
        if self.connection.disable_echo_mode():
            self.is_echo_mode = False

    def open_application(self, app):
        send_keystoke(application_keypath[app], self.connection)

    def send_text(self, text: str):
        clear_last_char()
        for letter in text:
            try:
                send_char(letter, self.connection)
            except KeyError as e:
                available = ' '.join((f"'{_}'" for _ in utils.keymap.keys()))
                raise LookupError(f"Character {e} not present in the keymap\nAvailable characters: {available}")

    def send_number(self, number: str):
        utils.send_number(number, self.connection)

    def endpoint_request(self, ep_name: str, met: str, body: dict) -> dict:
        ret = self.connection.write({
            "endpoint": endpoint[ep_name],
            "method": method[met],
            "uuid": random.randint(1, 32000),
            "body": body
        })
        return ret

    def turn_phone_off(self):
        log.info("Turning phone off...")
        app_desktop = "ApplicationDesktop"
        end_loop_counter = 10

        while not self.get_application_name() == app_desktop:
            if not end_loop_counter > 0:
                raise LookupError("Filed to switch to {}".format(app_desktop))
            log.info("Not on the Application Desktop, fnRight.")
            self.connection.send_key_code(key_codes["fnRight"], Keytype.long_press)
            end_loop_counter -=  1

        self.connection.send_key_code(key_codes["fnRight"], Keytype.long_press)
        self.connection.send_key_code(key_codes["right"])
        self.connection.send_key_code(key_codes["enter"])

    def set_tethering_state(self, enabled: bool):
        state = 'on' if enabled else 'off'
        body = {"tethering": state}
        log.info(f"Set tethering state to: {state}")
        return self.endpoint_request("developerMode", "put", body)

D test/harness/interface/CDCSerial.py => test/harness/interface/CDCSerial.py +0 -157
@@ 1,157 0,0 @@
# Copyright (c) 2017-2020, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import time
from random import randrange

import serial
import json
import logging
from enum import Enum

from harness.interface.defs import endpoint, method, status
from harness.interface.error import TestError, Error

log = logging.getLogger(__name__)


class Keytype(Enum):
    long_press = 0
    short_press = 1


class CDCSerial:
    def __init__(self, port_name, timeout=10):
        self.timeout = timeout
        self.body = ""
        self.header_length = 10
        while timeout != 0:
            try:
                self.serial = serial.Serial(port_name, baudrate=115200, timeout=10)
                self.serial.flushInput()
                log.info(f"opened port {port_name}!")
                break
            except (FileNotFoundError, serial.serialutil.SerialException) as err:
                log.error(f"can't open {port_name}, retrying...")
                time.sleep(1)
                self.timeout = self.timeout - 1
                if self.timeout == 0:
                    log.error(f"uart {port_name} not found - probably OS did not boot")
                    raise TestError(Error.PORT_NOT_FOUND)

    def __del__(self):
        try:
            self.serial.close()
            log.info(f"closed port {self.serial.name}")
        except (serial.serialutil.SerialException, AttributeError):
            pass

    def __wrap_message(self, body):
        msg = {
            "endpoint": endpoint["developerMode"],
            "method": method["put"],
            "uuid": randrange(1, 100),
            "body": body
        }
        return msg

    def get_serial(self):
        return self.serial

    def __build_message(self, json_data):
        json_dump = json.dumps(json_data)
        return "#%09d%s" % (len(json_dump), json_dump)

    def read(self, length):
        header = self.readRaw(length)
        payload_length = int(header[1:])
        result = self.readRaw(payload_length)
        return result

    def readRaw(self, length):
        return self.serial.read(length).decode()

    def write(self, msg, timeout=30):
        message = self.__build_message(msg)
        self.writeRaw(message)
        result = self.read(self.header_length)
        return json.loads(result)

    def writeRaw(self, message, timeout=30):
        self.serial.write(message.encode())
        self.serial.timeout = timeout

    def send_key_code(self, key_code, key_type=Keytype.short_press, wait=10):
        if key_type is Keytype.long_press:
            body = {"keyPressed": key_code, "state": 4}
        else:
            body = {"keyPressed": key_code, "state": 2}
        ret = self.write(self.__wrap_message(body), wait)
        time.sleep(0.3)
        return ret

    def enable_echo_mode(self):
        echoOnCmd = "UsbCdcEcho=ON"
        self.writeRaw(echoOnCmd)
        result = self.readRaw(len(echoOnCmd))
        log.info(f"received length: {len(result)}, result:{result}")
        ret = (result == echoOnCmd)
        return ret

    def disable_echo_mode(self):
        echoOffCmd = "UsbCdcEcho=OFF"
        self.writeRaw(echoOffCmd)
        result = self.readRaw(len(echoOffCmd))
        log.info(f"received length: {len(result)}, result:{result}")
        ret = (result == echoOffCmd)
        return ret

    def send_at(self, at_command, timeout, wait=10):
        body = {
            "AT": at_command + "\r",
            "timeout": timeout
        }

        ret = self.write(self.__wrap_message(body), timeout / 1000 + wait)
        log.info(f"at response {ret}")
        return ret["body"]["ATResponse"]

    def get_application_name(self):
        body = {
            "focus": True
        }

        ret = self.write(self.__wrap_message(body))
        return ret["body"]["focus"]

    def is_phone_locked(self):
        body = {
            "isLocked": True
        }

        ret = self.write(self.__wrap_message(body))
        return ret["body"]["isLocked"]

    def usb_lock(self):
        body = {
            "usbSecurity": "usbLock"
        }

        ret = self.write(self.__wrap_message(body))
        return ret["status"]

    def usb_unlock(self):
        body = {
            "usbSecurity": "usbUnlock"
        }

        ret = self.write(self.__wrap_message(body))
        return ret["status"]

    @staticmethod
    def find_Pures() -> str:
        '''
        Return a list of unique paths to all the Mudita Pure phones found connected to the system
        '''
        import serial.tools.list_ports as list_ports
        return [_.device for _ in list_ports.comports() if _.manufacturer == 'Mudita' and _.product == 'Mudita Pure']


D test/harness/interface/__init__.py => test/harness/interface/__init__.py +0 -3
@@ 1,3 0,0 @@
# Copyright (c) 2017-2020, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md


D test/harness/interface/defs.py => test/harness/interface/defs.py +0 -64
@@ 1,64 0,0 @@
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
from enum import Enum

endpoint = {
    "deviceInfo": 1,
    "update": 2,
    "filesystemUpload": 3,
    "backup": 4,
    "restore": 5,
    "factory": 6,
    "contacts": 7,
    "messages": 8,
    "calllog": 9,
    "events": 10,
    "developerMode": 11,
    "bluetooth":12
}

method = {
    "get": 1,
    "post": 2,
    "put": 3,
    "del": 4
}

status = {
    "OK": 200,
    "Accepted": 202,
    "SeeOther": 303,
    "BadRequest": 400,
    "Forbidden": 403,
    "NotFound": 404,
    "NotAcceptable": 406,
    "InternalServerError": 500,
}

key_codes = {
    "left": ord('a'),
    "right": ord('d'),
    "up": ord('w'),
    "down": ord('s'),
    "enter": ord('\n'),
    "fnLeft": 11,
    "fnRight": 12,
    "volUp": 13,
    "volDown": 14,
    "torch": 15,
    "sliderUp": 16,
    "sliderMid": 18,
    "sliderDown": 17,
    "#": ord('#'),
    "*": ord('*'),
}


class SMSType(Enum):
    DRAFT = 0x01
    FAILED = 0x02
    INBOX = 0x04
    OUTBOX = 0x08
    QUEUED = 0x10
    INPUT = 0x12
    UNKNOWN = 0xFF

D test/harness/interface/error.py => test/harness/interface/error.py +0 -27
@@ 1,27 0,0 @@
# Copyright (c) 2017-2020, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

from enum import IntEnum


class Error(IntEnum):
    PORT_NOT_FOUND = 1,
    PORT_FILE_NOT_FOUND = 2,
    TEST_FAILED = 3,
    VERSION_MISMATCH = 4,
    OTHER_ERROR = 5


class TestError(Exception):

    def __init__(self, error_code: Error):
        self.error_code = error_code
        self.message = f"Test failed with error code: {error_code.name}"
        super().__init__(self.message)

    def __str__(self):
        return self.message

    def get_error_code(self):
        return int(self.error_code)


D test/harness/utils.py => test/harness/utils.py +0 -297
@@ 1,297 0,0 @@
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

import time
from enum import Enum

from harness.interface.CDCSerial import Keytype
from harness.interface.defs import key_codes


# assuming that the harness is actually in the menu
application_keypath = {
    "calllog": [
        "enter"
    ],
    "contacts": [
        "right",
        "enter"
    ],
    "messages": [
        "right",
        "right",
        "enter"
    ],
    "music": [
        "down",
        "enter"
    ],
    "meditation": [
        "down",
        "right",
        "enter"
    ],
    "settings": [
        "down",
        "right",
        "right",
        "enter"
    ],
    "tools": [
        "up",
        "enter"
    ],
    "alarm": [
        "up",
        "right",
        "enter"
    ],
    "calendar": [
        "up",
        "left",
        "enter"
    ]
}

keymap = {
    "A": "2",
    "B": "22",
    "C": "222",
    "D": "3",
    "E": "33",
    "F": "333",
    "G": "4",
    "H": "44",
    "I": "444",
    "J": "5",
    "K": "55",
    "L": "555",
    "M": "6",
    "N": "66",
    "O": "666",
    "P": "7",
    "Q": "77",
    "R": "777",
    "S": "7777",
    "T": "8",
    "U": "88",
    "V": "888",
    "W": "9",
    "X": "99",
    "Y": "999",
    "Z": "9999",
    " ": "0",
    ".": "1",
    ",": "11",
    "_": "111",
    ":": "1111",
    ";": "11111",
    ")": "111111",
    "(": "1111111",
    "?": "11111111",
    "!": "111111111",
    "#": "1111111111",
    "/": "11111111111",
    "*": "111111111111",
    "+": "1111111111111",
    "del": key_codes["#"],
    "caps": key_codes["*"],
}

special_chars_keymap = {
    U'.': "",
    U',': "d",
    U'\'': "dd",
    U'?': "ddd",
    U'!': "dddd",
    U'"': "ddddd",
    U'-': "dddddd",
    U'(': "s",
    U')': "sd",
    U'@': "sdd",
    U'/': "sddd",
    U':': "sdddd",
    U'_': "sddddd",
    U';': "sdddddd",
    U'␤': "ss",
    U'+': "ssd",
    U'&': "ssdd",
    U'%': "ssddd",
    U'*': "ssdddd",
    U'<': "ssddddd",
    U'>': "ssdddddd",
    U'=': "sss",
    U'£': "sssd",
    U'€': "sssdd",
    U'$': "sssddd",
    U'[': "sssdddd",
    U']': "sssddddd",
    U'{': "sssdddddd",
    U'}': "ssss",
    U'\'': "ssssd",
    U'^': "ssssdd",
    U'~': "ssssddd",
    U'`': "ssssdddd",
    U'į': "ssssddddd",
    U'§': "ssssdddddd",
    U'…': "sssss",
    U'#': "sssssd",
    U'|': "sssssdd",
    U'÷': "sssssddd",
    U'·': "sssssdddd",
    U'°': "sssssddddd",
    U'¿': "sssssdddddd",
    U'¡': "ssssss",
    U'ą': "ssssssd",
    U'à': "ssssssdd",
    U'á': "ssssssddd",
    U'ä': "ssssssdddd",
    U'â': "ssssssddddd",
    U'ć': "ssssssdddddd",
    U'ç': "sssssss",
    U'ę': "sssssssd",
    U'é': "sssssssdd",
    U'è': "sssssssddd",
    U'ê': "sssssssdddd",
    U'ë': "sssssssddddd",
    U'î': "sssssssdddddd",
    U'ï': "ssssssss",
    U'í': "ssssssssd",
    U'ł': "ssssssssdd",
    U'ń': "ssssssssddd",
    U'ñ': "ssssssssdddd",
    U'ó': "ssssssssddddd",
    U'ô': "ssssssssdddddd",
    U'ö': "sssssssss",
    U'ś': "sssssssssd",
    U'û': "sssssssssdd",
    U'ú': "sssssssssddd",
    U'ù': "sssssssssdddd",
    U'ü': "sssssssssddddd",
    U'ÿ': "sssssssssdddddd",
    U'ż': "ssssssssss",
    U'ź': "ssssssssssd",
    U'ß': "ssssssssssdd"
}

emojis_keymap = {
    U'😁': "",
    U'😂': "d",
    U'😃': "dd",
    U'😄': "ddd",
    U'😅': "dddd",
    U'😆': "ddddd",
    U'😉': "dddddd",
    U'😊': "s",
    U'😋': "sd",
    U'😌': "sdd",
    U'😍': "sddd",
    U'😏': "sdddd",
    U'😒': "sddddd",
    U'😓': "sdddddd",
    U'😔': "ss",
    U'😖': "ssd",
    U'😘': "ssdd",
    U'😚': "ssddd",
    U'😜': "ssdddd",
    U'😝': "ssddddd",
    U'😼': "ssdddddd"
}

def send_keystoke(keypath, connection):
    for key in keypath:
        connection.send_key_code(key_codes[key])
        time.sleep(0.3)


last_char = '\0'


def clear_last_char():
    global last_char
    last_char = '\0'


def send_char(char: str, connection):
    global last_char
    key_type = Keytype.short_press
    if char.isascii():
        if char.isdigit():
            key_type = Keytype.long_press
            if last_char is char:
                print("repeated key!")
                connection.send_key_code(key_codes["right"])
            connection.send_key_code(int(char), key_type)
            connection.send_key_code(key_codes["right"])
            last_char = char
        elif char.islower():
            tmp = char.upper()
            # toggle to lowercase mode
            connection.send_key_code(key_codes["*"], key_type)
            if last_char is keymap[tmp][0]:
                print("repeated key!")
                connection.send_key_code(key_codes["right"], key_type)
            for key in keymap[tmp]:
                connection.send_key_code(int(key), key_type)
            last_char = keymap[tmp][0]
            # toggle to uppercase mode
            connection.send_key_code(key_codes["*"], key_type)
            connection.send_key_code(key_codes["*"], key_type)
        else:
            if last_char is keymap[char][0]:
                print("repeated key!")
                connection.send_key_code(key_codes["right"], key_type)
            for key in keymap[char]:
                connection.send_key_code(int(key), key_type)
            last_char = keymap[char][0]
    else:
        connection.send_key_code(key_codes["*"], Keytype.long_press)
        if char in special_chars_keymap:
            for key in special_chars_keymap[char]:
                connection.send_key_code(ord(key), key_type)
        elif char in emojis_keymap:
            connection.send_key_code(key_codes["fnLeft"], Keytype.short_press)
            for key in emojis_keymap[char]:
                connection.send_key_code(ord(key), key_type)
        else:
            print("Not supported char {} !!!".format(char))
        connection.send_key_code(key_codes["enter"], key_type)


def send_number(number: str, connection):
    if number.isnumeric():
        for digit in number:
            connection.send_key_code(int(digit))
            time.sleep(0.3)


### timeout from https://stackoverflow.com/a/601168/5752094

from contextlib import contextmanager

import signal


class Timeout(Exception):
    '''
    usage:
        try:
            with Timeout.limit(10):
                long_function_call()
        except Timeout as e:
            print("Timed out!")
    '''

    @classmethod
    @contextmanager
    def limit(cls, seconds: int):
        assert seconds >= 1, "Timeout must be at least 1 second !"
        def signal_handler(signum, frame):
            raise Timeout("Timed out!")

        signal.signal(signal.SIGALRM, signal_handler)
        signal.alarm(seconds)
        try:
            yield
        finally:
            signal.alarm(0)