#!/usr/bin/env python3
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import sys
import time
import sys
import os.path
import json
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
from harness import log
from harness.harness import Harness
from harness.interface.defs import key_codes, endpoint, method
from harness.utils import Timeout
from harness.interface.error import TestError, Error
from functools import partial
# uploaded file chunk size - according to
# https://appnroll.atlassian.net/wiki/spaces/MFP/pages/656637953/Protocol+description
CHUNK_SIZE = 1024 * 16
# update performing timeout
UPDATE_TIMEOUT = 90
update_status_code = {
0: "Initial",
1: "UpdateFileSet",
2: "CreatingDirectories",
3: "ExtractingFiles",
4: "UpdatingBootloader",
5: "ChecksumVerification",
6: "VersionVerification",
7: "ReadyForReset"
}
def update(harness, update_filepath: str):
connection = harness.get_connection()
serial = harness.get_connection().get_serial()
file_size = os.path.getsize(update_filepath)
filename = update_filepath.split('/')[-1]
body = {"command": "download", "fileName": filename, "fileSize": file_size}
ret = harness.endpoint_request("filesystemUpload", "post", body)["body"]
if ret["status"] is not None:
log.info(f"Update status: {update_status_code[int(ret['status'])]}")
log.info("Downloading update file to the target")
with open(update_filepath, 'rb') as file:
for chunk in iter(partial(file.read, CHUNK_SIZE), b''):
print(".", end='', flush=True)
serial.write(chunk)
print(" ")
body = {"fileName": filename}
ret = harness.endpoint_request("update", "post", body)["body"]
if ret["fileName"] != filename and int(ret[fileSize]) != file_size:
log.error("Upload error!")
exit(1)
timer = time.time()
while True:
if time.time() - timer > UPDATE_TIMEOUT:
log.error("Update timeout!")
return False
if serial.in_waiting > 0:
result = connection.read(10)
ret = json.loads(result)
body = ret['body']
if "status" in body:
status = body["status"]
log.info(f"Update status: {status}")
timer = time.time()
if "reset" in status:
log.info("Update finished, wait for device reset...")
return True
def get_update_list(harness):
harness.unlock_phone()
ret = harness.endpoint_request("deviceInfo", "get", {})
device_info = ret["body"]
update_history = device_info["updateHistory"]
failed_updates = 0
if update_history is None:
log.info("Update history clean!")
return [None, 0]
for update in update_history:
if update["finishedError"] != 0 and update["finishedState"] != 6:
failed_updates = failed_updates + 1
log.info(f"Found {len(update_history)} update entries with {failed_updates} failed updates")
return [update_history, failed_updates]
def main():
if len(sys.argv) == 1:
print(f'Please pass update file path as the parameter: python {sys.argv[0]} file_path ')
raise TestError(Error.PORT_NOT_FOUND)
harness = None
with Timeout.limit(seconds=20):
while not harness:
try:
harness = Harness.from_detect()
except TestError:
pass
update_filename = str(sys.argv[1])
history, fails = get_update_list(harness)
if update(harness, update_filename):
# wait for reboot
harness = None
log.info("Waiting for device to reset")
time.sleep(5)
# connect to the phone once again
with Timeout.limit(seconds=90):
while not harness:
try:
harness = Harness.from_detect()
except TestError:
pass
[new_history, new_fails] = get_update_list(harness)
if new_fails != fails or (history is None) or (history is not None and (len(new_history) != len(history) + 1)):
log.error("Update failed!")
exit(1)
else:
log.info("Update successful!")
exit(0)
else:
log.error("Update error!")
exit(1)
if __name__ == "__main__":
try:
main()
except TestError as err:
log.error(err)
exit(err.get_error_code())