#!/usr/bin/env python3 # Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved. # For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md import sys import time import sys import os.path import json import atexit import binascii from tqdm import tqdm sys.path.append( os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir))) from harness.harness import Harness from harness.interface.defs import status from harness.interface.error import TestError, Error from functools import partial def send_update_package(harness, package_filepath: str): file_size = os.path.getsize(package_filepath) file_name = package_filepath.split('/')[-1] with open(package_filepath, 'rb') as file: file_data = open(package_filepath,'rb').read() file_crc32 = format((binascii.crc32(file_data) & 0xFFFFFFFF), '08x') print(f"Sending {file_name}, size {file_size}, CRC32 {file_crc32}") body = {"fileName" : "/sys/user/updates/" + file_name, "fileSize" : file_size, "fileCrc32" : file_crc32} ret = harness.endpoint_request("filesystem", "put", body) assert ret["status"] == status["OK"] assert ret["body"]["txID"] != 0 txID = ret["body"]["txID"] chunkSize = ret["body"]["chunkSize"] chunkNo = 1 with open(package_filepath, 'rb') as file: with tqdm(total=file_size, unit='B', unit_scale=True) as pbar: for chunk in iter(partial(file.read, chunkSize), b''): data = binascii.b2a_base64(chunk).decode() pbar.update(chunkSize) body = {"txID" : txID, "chunkNo": chunkNo, "data" : data} ret = harness.endpoint_request("filesystem", "put", body) assert ret["status"] == status["OK"] time.sleep(.1) chunkNo += 1 print("Sending complete") return True def check_update_package(harness, package_filepath: str): body = {} ret = harness.endpoint_request("update", "get", body) if ret["status"] != status["OK"]: print("Failed to get package list") print(json.dumps(ret)) return False expected_file_name = package_filepath.split('/')[-1] expected_file_size = os.path.getsize(package_filepath) file_list = ret["body"]["updateFileList"] if len(file_list) == 0: print("No update package found") return False for file in file_list: file_name = file["name"].split('/')[-1] if expected_file_name != file_name: print(f'File {expected_file_name} not found') return False file_size = int(file["size"]) if expected_file_size != file_size: print(f'File {expected_file_name} size {file["size"]} does not match expected {expected_file_size}') return False print("Transfered package OK") return True def setPasscode(harness, flag): body = {"phoneLockCodeEnabled": flag} ret = harness.endpoint_request("developerMode", "put", body) assert ret["status"] == status["NoContent"] def main(): if len(sys.argv) == 1: print(f'Please pass update file path as the parameter: python {sys.argv[0]} file_path ') raise TestError(Error.OTHER_ERROR) package_filepath = str(sys.argv[1]) if (not os.path.exists(package_filepath)): print(f'Update file {package_filepath} not found') raise TestError(Error.OTHER_ERROR) harness = Harness.from_detect() atexit.register(setPasscode, harness, True) setPasscode(harness, False) time.sleep(.1) if not send_update_package(harness, package_filepath): print("Update package transfer failed") exit(1) time.sleep(.1) if not check_update_package(harness, package_filepath): print("Update package corrupted in transfer") exit(1) exit(0) if __name__ == "__main__": try: main() except TestError as err: print(err) exit(err.get_error_code())