~aleteoryx/muditaos

ref: e4f506b56d9d9ae2d4f03000cd7c17ba3f6dd817 muditaos/test/send_update_package.py -rwxr-xr-x 3.9 KiB
e4f506b5 — Bartosz Cichocki [EGD-7149] Fix connecting BT devices 4 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python3
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

import sys
import time
import sys
import os.path
import json
import atexit
import binascii
from tqdm import tqdm

sys.path.append(
    os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))

from harness.harness import Harness
from harness.interface.defs import status
from harness.interface.error import TestError, Error
from functools import partial

def send_update_package(harness, package_filepath: str):
    file_size = os.path.getsize(package_filepath)
    file_name = package_filepath.split('/')[-1]

    with open(package_filepath, 'rb') as file:
        file_data = open(package_filepath,'rb').read()
        file_crc32 = format((binascii.crc32(file_data) & 0xFFFFFFFF), '08x')

    print(f"Sending {file_name}, size {file_size}, CRC32 {file_crc32}")

    body = {"fileName" : "/sys/user/updates/" + file_name, "fileSize" : file_size, "fileCrc32" : file_crc32}
    ret = harness.endpoint_request("filesystem", "put", body)

    assert ret["status"] == status["OK"]
    assert ret["body"]["txID"] != 0

    txID      = ret["body"]["txID"]
    chunkSize = ret["body"]["chunkSize"]
    chunkNo = 1

    with open(package_filepath, 'rb') as file:
        with tqdm(total=file_size, unit='B', unit_scale=True) as pbar:
            for chunk in iter(partial(file.read, chunkSize), b''):
                data = binascii.b2a_base64(chunk).decode()
                pbar.update(chunkSize)
                body = {"txID" : txID, "chunkNo": chunkNo, "data" : data}
                ret = harness.endpoint_request("filesystem", "put", body)
                assert ret["status"] == status["OK"]
                time.sleep(.1)
                chunkNo += 1

    print("Sending complete")

    return True

def check_update_package(harness, package_filepath: str):
    body = {}
    ret = harness.endpoint_request("update", "get", body)

    if ret["status"] != status["OK"]:
       print("Failed to get package list")
       print(json.dumps(ret))
       return False

    expected_file_name = package_filepath.split('/')[-1]
    expected_file_size = os.path.getsize(package_filepath)

    file_list = ret["body"]["updateFileList"]

    if len(file_list) == 0:
        print("No update package found")
        return False

    for file in file_list:
        file_name = file["name"].split('/')[-1]

        if expected_file_name != file_name:
            print(f'File {expected_file_name} not found')
            return False

        file_size = int(file["size"])

        if expected_file_size != file_size:
            print(f'File {expected_file_name} size {file["size"]} does not match expected {expected_file_size}')
            return False

    print("Transfered package OK")
    return True

def setPasscode(harness, flag):
    body = {"phoneLockCodeEnabled": flag}
    ret = harness.endpoint_request("developerMode", "put", body)
    assert ret["status"] == status["NoContent"]

def main():
    if len(sys.argv) == 1:
        print(f'Please pass update file path as the parameter: python {sys.argv[0]} file_path ')
        raise TestError(Error.OTHER_ERROR)

    package_filepath = str(sys.argv[1])

    if (not os.path.exists(package_filepath)):
        print(f'Update file {package_filepath} not found')
        raise TestError(Error.OTHER_ERROR)

    harness = Harness.from_detect()

    atexit.register(setPasscode, harness, True)

    setPasscode(harness, False)

    time.sleep(.1)

    if not send_update_package(harness, package_filepath):
        print("Update package transfer failed")
        exit(1)

    time.sleep(.1)

    if not check_update_package(harness, package_filepath):
        print("Update package corrupted in transfer")
        exit(1)

    exit(0)

if __name__ == "__main__":
    try:
        main()
    except TestError as err:
        print(err)
        exit(err.get_error_code())