~aleteoryx/muditaos

ref: 6168e25a2e09d785727095c40fd5b0a35a61b1e0 muditaos/test/send_update_package.py -rwxr-xr-x 3.6 KiB
6168e25a — Marek Niepieklo [CP-248] Harness script to transfer Update package to Pure 4 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

import sys
import time
import sys
import os.path
import json
import atexit

sys.path.append(
    os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))

from harness.harness import Harness
from harness.interface.defs import status
from harness.utils import Timeout
from harness.interface.error import TestError, Error
from functools import partial

# Number of bytes read from the package file and written to the serial port
# per chunk. Value taken from the protocol description:
# https://appnroll.atlassian.net/wiki/spaces/MFP/pages/656637953/Protocol+description
CHUNK_SIZE = 16 * 1024

def send_update_package(harness, package_filepath: str):
    """Upload an update package file to the device through the harness.

    Announces the transfer with a "download" request on the "filesystem"
    endpoint, then streams the file to the serial port in CHUNK_SIZE-byte
    pieces. If the device has sent anything back afterwards, the reply is
    parsed as JSON and its body "status" (when present) is printed.

    Args:
        harness: harness object providing endpoint_request() and
            get_connection().
        package_filepath: path of the update package on the host.

    Returns:
        False when the device does not accept the transfer request, True
        once the whole file has been written to the serial port. The
        transfer status reported by the device is only printed; it does not
        influence the return value.
    """
    file_size = os.path.getsize(package_filepath)
    # basename() honours the host's path separator, unlike split('/').
    file_name = os.path.basename(package_filepath)

    body = {"command": "download", "fileName": file_name, "fileSize": file_size}
    ret = harness.endpoint_request("filesystem", "post", body)

    if ret["status"] != status["Accepted"]:
        print("Failed to initiate package transfer")
        print(json.dumps(ret))
        return False

    # Short pause between the request and the raw data stream.
    time.sleep(.1)

    print("Sending update file to the target")
    serial = harness.get_connection().get_serial()
    with open(package_filepath, 'rb') as file:
        # iter(callable, sentinel): keep reading until read() returns b'' (EOF).
        for chunk in iter(partial(file.read, CHUNK_SIZE), b''):
            # One dot per chunk as a progress indicator.
            print(".", end='', flush=True)
            serial.write(chunk)

    print("")

    time.sleep(.1)

    # Only try to read a reply if the device has actually sent something.
    if serial.in_waiting > 0:
        # NOTE(review): the meaning of the argument 10 (presumably a timeout)
        # is defined by the connection class - confirm there.
        result = harness.get_connection().read(10)
        ret = json.loads(result)
        body = ret['body']

        if "status" in body:
            stat = body["status"]
            print(f"Transfer status: {stat}")

    print("Sending complete")

    return True

def check_update_package(harness, package_filepath: str):
    """Verify that the uploaded package is listed by the device with the right size.

    Queries the "update" endpoint for the list of update files and looks for
    an entry whose file name equals the base name of ``package_filepath``.
    The entry's size must equal the size of the local file.

    Args:
        harness: harness object providing endpoint_request().
        package_filepath: path of the update package on the host.

    Returns:
        True when a matching entry with the expected size is found; False
        when the list cannot be fetched, is empty, does not contain the
        file, or the size differs.
    """
    ret = harness.endpoint_request("update", "get", {})

    if ret["status"] != status["OK"]:
        print("Failed to get package list")
        print(json.dumps(ret))
        return False

    expected_file_name = os.path.basename(package_filepath)
    expected_file_size = os.path.getsize(package_filepath)

    file_list = ret["body"]["updateFileList"]

    if not file_list:
        print("No update package found")
        return False

    for file in file_list:
        # Names reported by the device are split on '/' to drop any
        # directory part before comparing.
        file_name = file["name"].split('/')[-1]

        # Other update files may be present on the device; skip them rather
        # than failing on the first entry that is not ours.
        if expected_file_name != file_name:
            continue

        file_size = int(file["size"])

        if expected_file_size != file_size:
            print(f'File {expected_file_name} size {file["size"]} does not match expected {expected_file_size}')
            return False

        print("Transfered package OK")
        return True

    # The whole list was scanned without finding the expected file name.
    print(f'File {expected_file_name} not found')
    return False

def setPasscode(harness, flag):
    """Set the device's "phoneLockCodeEnabled" developer-mode option.

    Sends a "put" request to the "developerMode" endpoint carrying ``flag``
    and asserts that the reply status is status["NoContent"].
    """
    response = harness.endpoint_request(
        "developerMode", "put", {"phoneLockCodeEnabled": flag})
    assert response["status"] == status["NoContent"]

def main():
    """Send the update package named on the command line and verify it.

    Steps: validate the command-line argument, detect the harness, set
    phoneLockCodeEnabled to False for the duration of the run (an atexit
    hook sets it back to True), upload the package and check that the
    device lists it with the expected size.

    Exits the process with status 0 on success and 1 on any failure.

    Raises:
        TestError: when no file path argument was given.
    """
    if len(sys.argv) == 1:
        print(f'Please pass update file path as the parameter: python {sys.argv[0]} file_path ')
        # NOTE(review): PORT_NOT_FOUND does not describe a missing argument;
        # kept unchanged so the script's exit code stays the same.
        raise TestError(Error.PORT_NOT_FOUND)

    package_filepath = sys.argv[1]

    # Fail early, before touching the device, if there is nothing to send.
    if not os.path.isfile(package_filepath):
        print(f'Update package file not found: {package_filepath}')
        sys.exit(1)

    harness = Harness.from_detect()

    # Registered before the flag is cleared so it is restored on every exit path.
    atexit.register(setPasscode, harness, True)

    setPasscode(harness, False)

    time.sleep(.1)

    if not send_update_package(harness, package_filepath):
        print("Update package transfer failed")
        sys.exit(1)

    time.sleep(.1)

    if not check_update_package(harness, package_filepath):
        print("Update package corrupted in transfer")
        sys.exit(1)

    sys.exit(0)

if __name__ == "__main__":
    # Script entry point: run main() and turn a TestError into the process
    # exit status reported by the error itself.
    try:
        main()
    except TestError as err:
        print(err)
        # sys.exit() instead of the site-provided exit() helper, which is
        # meant for interactive use and is absent when site is not loaded.
        sys.exit(err.get_error_code())