~aleteoryx/muditaos

103af76459540f4a5fbf93e04496ca25f07313b4 — SP2FET 5 years ago 7ed1380
[EGD-4836] Add firmware update test

Add test utility to update firmware via USB (service-desktop API)
Add timeout to harness port discovery

[EGD-5240] Add common timeout to python test harness
General purpose timeout based on signals library

[EGD-5239] Preserve 1Phone testing
In the long run test harness requires two phones,
but allow for one phone to still pass some test cases
M .idea/modules.xml => .idea/modules.xml +1 -1
@@ 5,4 5,4 @@
      <module fileurl="file://$PROJECT_DIR$/.idea/PurePhone.iml" filepath="$PROJECT_DIR$/.idea/PurePhone.iml" />
    </modules>
  </component>
</project>
</project>
\ No newline at end of file

M changelog.md => changelog.md +2 -0
@@ 9,6 9,8 @@
* Support for asynchronous callbacks on application side.
* APN settings window - empty.
* Bluetooth settings middleware layer
* Add firmware update test


### Changed


M module-apps/application-desktop/windows/PowerOffWindow.cpp => module-apps/application-desktop/windows/PowerOffWindow.cpp +1 -0
@@ 173,6 173,7 @@ namespace gui
            sys::SystemManager::CloseSystem(application);
            return true;
        };
        powerOffTimer->start();
        application->connect(std::move(powerOffTimer), this);
    }


M module-services/service-gui/SynchronizationMechanism.cpp => module-services/service-gui/SynchronizationMechanism.cpp +0 -1
@@ 2,7 2,6 @@
// For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

#include "SynchronizationMechanism.hpp"

#include <stdexcept>

namespace service::gui

A test/firmware_update_test/.gdbinit-1051-autocontinue => test/firmware_update_test/.gdbinit-1051-autocontinue +10 -0
@@ 0,0 1,10 @@
set pagination off
target remote localhost:2331
monitor reset 0
monitor halt
monitor memU32 0x401BC000 = 128;
load 
eval "monitor exec SetRTTAddr %p", &_SEGGER_RTT
info threads
thread 2
continue

A test/firmware_update_test/README.md => test/firmware_update_test/README.md +25 -0
@@ 0,0 1,25 @@
# Firmware update test

The aim of this test is to check the correctness of the `service-desktop` API and upgrade the phone software.

This test is based on [Harness](../README.md) class and uses `pyTest` to verify the update process. It makes use of `service-desktop` 
API to upload a tarball update package to the phone's memory and execute the update. After a successful update the phone should reset
and an update check will be performed. In the end, an update verification status will be shown in the console (along with the exit code).

## Usage
To be able to conduct the test (or just use the script as an update utility) a few prerequisites have to be met:

* Pure needs connected via USB
* Your Python version has to be at least 3.8
* Your Python virtual environment needs to be properly set up (as mentioned in PyTest's [readme](../README.md))

After all of the requirements are fulfilled, the script can be safely executed:

```python
python3 ./update.py tarball_path
```
The serial port used by the phone will be detected automatically.


To obtain the tarball package use either proper CMake target (work in progress) or the `genupdatepkg.sh` script located 
in the `config` subfolder in the root repository.
\ No newline at end of file

A test/firmware_update_test/update.py => test/firmware_update_test/update.py +147 -0
@@ 0,0 1,147 @@
#!/usr/bin/env python
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

import sys
import time
import sys
import os.path
import json

sys.path.append(
    os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))

from harness import log
from harness.harness import Harness
from harness.interface.defs import key_codes, endpoint, method
from harness.utils import Timeout
from harness.interface.error import TestError, Error
from functools import partial

# uploaded file chunk size - according to
# https://appnroll.atlassian.net/wiki/spaces/MFP/pages/656637953/Protocol+description
CHUNK_SIZE = 1024 * 16

# update performing timeout
UPDATE_TIMEOUT = 30

update_status_code = {
    0: "Initial",
    1: "UpdateFileSet",
    2: "CreatingDirectories",
    3: "ExtractingFiles",
    4: "UpdatingBootloader",
    5: "ChecksumVerification",
    6: "VersionVerification",
    7: "ReadyForReset"
}


def update(harness, update_filepath: str):
    connection = harness.get_connection()
    serial = harness.get_connection().get_serial()
    file_size = os.path.getsize(update_filepath)
    filename = update_filepath.split('/')[-1]
    body = {"command": "download", "fileName": filename, "fileSize": file_size}

    ret = harness.endpoint_request("filesystemUpload", "post", body)["body"]
    if ret["status"] is not None:
        log.info(f"Update status: {update_status_code[int(ret['status'])]}")

    log.info("Downloading update file to the target")
    with open(update_filepath, 'rb') as file:
        for chunk in iter(partial(file.read, CHUNK_SIZE), b''):
            print(".", end='', flush=True)
            serial.write(chunk)
    print(" ")

    body = {"fileName": filename}
    ret = harness.endpoint_request("update", "post", body)["body"]
    if ret["fileName"] != filename and int(ret[fileSize]) != file_size:
        log.error("Upload error!")
        exit(1)
    timer = time.time()

    while True:
        if time.time() - timer > UPDATE_TIMEOUT:
            log.error("Update timeout!")
            return False
        if serial.in_waiting > 0:
            result = connection.read(10)
            ret = json.loads(result)
            body = ret['body']
            if "status" in body:
                status = body["status"]
                log.info(f"Update status: {status}")
                timer = time.time()
                if "reset" in status:
                    log.info("Update finished, wait for device reset...")
                    return True


def get_update_list(harness):
    ret = harness.endpoint_request("deviceInfo", "get", {})
    device_info = ret["body"]
    update_history = device_info["updateHistory"]
    failed_updates = 0

    if update_history is None:
        log.info("Update history clean!")
        return [None, 0]

    for update in update_history:
        if update["finishedError"] != 0 and update["finishedState"] != 6:
            failed_updates = failed_updates + 1

    log.info(f"Found {len(update_history)} update entries with {failed_updates} failed updates")
    return [update_history, failed_updates]


def main():
    if len(sys.argv) == 1:
        print(f'Please pass update file path as the parameter: python {sys.argv[0]} file_path ')
        raise TestError(Error.PORT_NOT_FOUND)

    harness = None

    with Timeout.limit(seconds=20):
        while not harness:
            try:
                harness = Harness.from_detect()
            except TestError:
                pass

    update_filename = str(sys.argv[1])
    history, fails = get_update_list(harness)

    if update(harness, update_filename):
        # wait for reboot
        harness = None
        log.info("Waiting for device to reset")
        time.sleep(5)
        # connect to the phone once again
        with Timeout.limit(seconds=20):
            while not harness:
                try:
                    harness = Harness.from_detect()
                except TestError:
                    pass

        [new_history, new_fails] = get_update_list(harness)
        if new_fails != fails or (history is None) or (history is not None and (len(new_history) != len(history) + 1)):
            log.error("Update failed!")
            exit(1)
        else:
            log.info("Update successful!")
            exit(0)
    else:
        log.error("Update error!")
        exit(1)


if __name__ == "__main__":
    try:
        main()
    except TestError as err:
        log.error(err)
        exit(err.get_error_code())

M test/harness/interface/CDCSerial.py => test/harness/interface/CDCSerial.py +2 -3
@@ 49,7 49,7 @@ class CDCSerial:
        msg = {
            "endpoint": endpoint["developerMode"],
            "method": method["put"],
            "uuid": randrange(1,100),
            "uuid": randrange(1, 100),
            "body": body
        }
        return msg


@@ 65,7 65,6 @@ class CDCSerial:
        header = self.readRaw(length)
        payload_length = int(header[1:])
        result = self.readRaw(payload_length)
        log.info(f"received length: {len(result)}, payload length:{payload_length}")
        return result

    def readRaw(self, length):


@@ 78,7 77,6 @@ class CDCSerial:
        return json.loads(result)

    def writeRaw(self, message, timeout=30):
        log.info(message)
        self.serial.write(message.encode())
        self.serial.timeout = timeout



@@ 139,3 137,4 @@ class CDCSerial:
        '''
        import serial.tools.list_ports as list_ports
        return [_.device for _ in list_ports.comports() if _.manufacturer == 'Mudita' and _.product == 'Mudita Pure']


M test/harness/utils.py => test/harness/utils.py +30 -0
@@ 136,3 136,33 @@ def send_number(number: str, connection):
            connection.send_key_code(int(digit))
            time.sleep(0.3)


### timeout from https://stackoverflow.com/a/601168/5752094

from contextlib import contextmanager

import signal


class Timeout(Exception):
    '''
    usage:
        try:
            with Timeout.limit(10):
                long_function_call()
        except Timeout as e:
            print("Timed out!")
    '''

    @classmethod
    @contextmanager
    def limit(cls, seconds):
        def signal_handler(signum, frame):
            raise Timeout("Timed out!")

        signal.signal(signal.SIGALRM, signal_handler)
        signal.alarm(seconds)
        try:
            yield
        finally:
            signal.alarm(0)

M test/pytest/conftest.py => test/pytest/conftest.py +43 -36
@@ 12,6 12,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.

from harness import log
from harness.harness import Harness
from harness import utils
from harness.interface.error import TestError, Error
from harness.interface.CDCSerial import CDCSerial as serial



@@ 53,42 54,47 @@ def harness(request):
    Try to init one Pure phone with serial port path or automatically
    '''
    port_name = request.config.option.port

    RETRY_EVERY_SECONDS = 0.7
    retries = request.config.option.timeout / RETRY_EVERY_SECONDS

    if port_name is None:
        log.warning("no port provided! trying automatic detection")
        try:
            harness = Harness.from_detect()
        except TestError as e:
            if e.get_error_code() == Error.PORT_NOT_FOUND:
                pytest.exit("couldn't find any viable port. exiting")
            else:
                raise(e)
    TIMEOUT = int(request.config.option.timeout)
    timeout_started = time.time()

    RETRY_EVERY_SECONDS = 1.0
    try:
        if port_name is None:
            log.warning("no port provided! trying automatic detection")
            harness = None

            with utils.Timeout.limit(seconds=TIMEOUT):
                while not harness:
                    try:
                        harness = Harness.from_detect()
                    except TestError as e:
                        if e.get_error_code() == Error.PORT_NOT_FOUND:
                            log.info(f"waiting for a serial port… ({TIMEOUT- int(time.time() - timeout_started)})")
                            time.sleep(RETRY_EVERY_SECONDS)
        else:
            assert '/dev' in port_name or simulator_port in port_name

            if simulator_port in port_name:
                file = None
                with utils.Timeout.limit(seconds=TIMEOUT):
                    while not file:
                        try:
                            file = open("/tmp/purephone_pts_name", "r")
                        except FileNotFoundError as err:
                            log.info(
                                f"waiting for a simulator port… ({TIMEOUT- int(time.time() - timeout_started)})")
                            time.sleep(RETRY_EVERY_SECONDS)
                port_name = file.readline()
                if port_name.isascii():
                    log.debug("found {} entry!".format(port_name))
                else:
                    pytest.exit("not a valid sim pts entry!")

            harness = Harness(port_name)
    except utils.Timeout:
        pytest.exit("couldn't find any viable port. exiting")
    else:
        assert '/dev' in port_name or simulator_port in port_name

        if simulator_port in port_name:
            while retries > 0:
                try:
                    file = open("/tmp/purephone_pts_name", "r")
                    break
                except FileNotFoundError as err:
                    time.sleep(RETRY_EVERY_SECONDS)
                    retries -= 1
                    log.info("waiting for simulator port...")
            else:
                raise TestError(Error.PORT_FILE_NOT_FOUND)

            port_name = file.readline()
            if port_name.isascii():
                log.debug("found {} entry!".format(port_name))
            else:
                pytest.exit("not a valid sim pts entry!")

        harness = Harness(port_name)
    return harness
        return harness


@pytest.fixture(scope='session')


@@ 108,8 114,9 @@ def phone_unlocked(harness):
    harness.unlock_phone()
    assert harness.is_phone_unlocked


@pytest.fixture(scope='session')
def phone_unlocked(harnesses):
def phones_unlocked(harnesses):
    for harness in harnesses:
        harness.unlock_phone()
        assert harness.is_phone_unlocked

M test/pytest/test_two_phones.py => test/pytest/test_two_phones.py +2 -2
@@ 13,7 13,7 @@ def two_phones_available(harnesses):
    assert len(harnesses) == 2

@pytest.mark.rt1051
@pytest.mark.usefixtures("phone_unlocked")
@pytest.mark.usefixtures("phones_unlocked")
@pytest.mark.usefixtures("two_phones_available")
def test_call_number(harnesses, phone_number, call_duration):
    role_calling = harnesses[1]


@@ 28,7 28,7 @@ def test_call_number(harnesses, phone_number, call_duration):


@pytest.mark.rt1051
@pytest.mark.usefixtures("phone_unlocked")
@pytest.mark.usefixtures("phones_unlocked")
@pytest.mark.usefixtures("two_phones_available")
def test_call_back(harnesses, phone_number, call_duration):
    role_calling = harnesses[0]