M test/get_os_log.py => test/get_os_log.py +10 -49
@@ 3,63 3,26 @@
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import sys
-import time
-import sys
import os.path
import atexit
-import base64
-from tqdm import tqdm
-
from harness.harness import Harness
-from harness.interface.defs import status
from harness.interface.error import TestError, Error
-from functools import partial
-
-def transfer_data(harness: Harness, logDir: str, fileName: str, rxID, fileSize, chunkSize):
-
- totalChunks = int(((fileSize + chunkSize - 1) / chunkSize))
- print(f'Transfering {fileName} to {logDir}:')
-
- with open(os.path.join(logDir, fileName), 'wb') as logFile:
- with tqdm(total=fileSize, unit='B', unit_scale=True) as pbar:
- for n in range(1, totalChunks + 1):
- body = {"rxID" : rxID, "chunkNo": n}
- ret = harness.endpoint_request("filesystem", "get", body)
-
- assert ret["status"] == status["OK"]
- pbar.update(chunkSize)
- data = ret["body"]["data"][0:-1]
- decodedData = base64.standard_b64decode(data)
- logFile.write(decodedData)
-
-def get_log_file(harness, logDir: str):
- logDir = os.path.abspath(logDir)
-
- fileName = "MuditaOS.log"
- body = {"fileName" : "/sys/user/" + fileName}
- ret = harness.endpoint_request("filesystem", "get", body)
-
- assert ret["status"] == status["OK"]
- assert ret["body"]["fileSize"] != 0
-
- rxID = ret["body"]["rxID"]
- fileSize = ret["body"]["fileSize"]
- chunkSize = ret["body"]["chunkSize"]
-
- transfer_data(harness, logDir, fileName, rxID, fileSize, chunkSize)
-
- print("Log transfer complete")
+from harness.api.developermode import PhoneModeLock
+from harness.api.filesystem import get_log_file
def set_passcode(harness: Harness, flag: bool):
- body = {"phoneLockCodeEnabled": flag}
- ret = harness.endpoint_request("developerMode", "put", body)
- assert ret["status"] == status["NoContent"]
+ '''
+ on exit -> restore PhoneModeLock
+ '''
+ PhoneModeLock(flag).run(harness)
+
def main():
if len(sys.argv) == 1:
- print(f'Please pass log storage directory as the parameter: \'python {sys.argv[0]} <log dir>\' ')
+ print(
+ f'Please pass log storage directory as the parameter: \'python {sys.argv[0]} <log dir>\' ')
raise TestError(Error.OTHER_ERROR)
log_dir = str(sys.argv[1])
@@ 71,13 34,11 @@ def main():
harness = Harness.from_detect()
atexit.register(set_passcode, harness, True)
-
set_passcode(harness, False)
-
get_log_file(harness, log_dir)
-
exit(0)
+
if __name__ == "__main__":
try:
main()
M test/harness => test/harness +1 -1
@@ 1,1 1,1 @@
-Subproject commit 6f0613a6b7d71c9eb2d35ed7613c468f28e6621f
+Subproject commit b1ae209e9c257f8f62b3023ca30a361bd4d01e21
M test/pytest/service-desktop/test_get_file.py => test/pytest/service-desktop/test_get_file.py +30 -93
@@ 1,13 1,17 @@
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import pytest
-import base64
from harness.interface.defs import status
+from harness.api.developermode import PhoneModeLock
+from harness.api.filesystem import get_file, FsInitGet, FsGetChunk
+from harness import log
+from harness.request import TransactionError
+from harness.interface.defs import Status
+
def setPasscode(harness, flag):
- body = {"phoneLockCodeEnabled": flag}
- ret = harness.endpoint_request("developerMode", "put", body)
- assert ret["status"] == status["NoContent"]
+ PhoneModeLock(flag).run(harness)
+
@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
@@ 24,10 28,8 @@ def test_get_not_existing_file(harness):
setPasscode(harness, False)
fileName = "Unknown.file"
- body = {"fileName" : "/sys/user/" + fileName}
- ret = harness.endpoint_request("filesystem", "get", body)
-
- assert ret["status"] == status["NotFound"]
+ with pytest.raises(TransactionError, match=r".*" + str(Status.NotFound.value) + ".*"):
+ get_file(harness, fileName, "./", "/sys/user/")
@pytest.mark.service_desktop_test
@@ 35,31 37,32 @@ def test_get_not_existing_file(harness):
@pytest.mark.rt1051
def test_get_invalid_chunks(harness):
"""
- Attempt requesting data with invalid chunk numbers
+ Attempts:
+ requesting data with invalid chunk numbers, which are:
+ - 0 as numbering start with 1
+ - chunk count + 1 (or any more) as it exceeds maximum no of chunks
+ requesting data with invalid rxID:
+ - rxID != current rxID transfer
"""
fileName = "MuditaOS.log"
- body = {"fileName" : "/sys/user/" + fileName}
- ret = harness.endpoint_request("filesystem", "get", body)
+ ret = FsInitGet("/sys/user/", fileName).run(harness)
- assert ret["status"] == status["OK"]
- assert ret["body"]["fileSize"] != 0
+ assert ret.fileSize != 0
- rxID = ret["body"]["rxID"]
- fileSize = ret["body"]["fileSize"]
- chunkSize = ret["body"]["chunkSize"]
+ totalChunks = int(((ret.fileSize + ret.chunkSize - 1) / ret.chunkSize))
+ log.info(f"totalChunks #: {totalChunks}")
- totalChunks = int(((fileSize + chunkSize - 1) / chunkSize))
- print("totalChunks #: " + str(totalChunks))
+ with pytest.raises(TransactionError, match=r".*" + str(Status.BadRequest.value) + ".*"):
+ FsGetChunk(ret.rxID, 0).run(harness)
- body = {"rxID" : rxID, "chunkNo": 0}
- ret = harness.endpoint_request("filesystem", "get", body)
+ with pytest.raises(TransactionError, match=r".*" + str(Status.BadRequest.value) + ".*"):
+ FsGetChunk(ret.rxID, totalChunks + 1).run(harness)
- assert ret["status"] == status["BadRequest"]
+ with pytest.raises(TransactionError, match=r".*" + str(Status.BadRequest.value) + ".*"):
+ FsGetChunk(ret.rxID - 1, totalChunks + 1).run(harness)
- body = {"rxID" : rxID, "chunkNo": int(totalChunks + 1)}
- ret = harness.endpoint_request("filesystem", "get", body)
-
- assert ret["status"] == status["BadRequest"]
+ with pytest.raises(TransactionError, match=r".*" + str(Status.BadRequest.value) + ".*"):
+ FsGetChunk(ret.rxID + 1, totalChunks + 1).run(harness)
@pytest.mark.service_desktop_test
@@ 67,73 70,7 @@ def test_get_invalid_chunks(harness):
@pytest.mark.rt1051
def test_get_file(harness):
"""
- Attempt requesting and transfering file data
+ Get file MuditaOS.log file - whole transfer
"""
- fileName = "MuditaOS.log"
- body = {"fileName" : "/sys/user/" + fileName}
- ret = harness.endpoint_request("filesystem", "get", body)
-
- assert ret["status"] == status["OK"]
- assert ret["body"]["fileSize"] != 0
-
- rxID = ret["body"]["rxID"]
- fileSize = ret["body"]["fileSize"]
- chunkSize = ret["body"]["chunkSize"]
-
- totalChunks = int(((fileSize + chunkSize - 1) / chunkSize))
- print("totalChunks #: " + str(totalChunks))
-
- data = ""
- for n in range(1, totalChunks + 1):
- body = {"rxID" : rxID, "chunkNo": n}
- ret = harness.endpoint_request("filesystem", "get", body)
-
- assert ret["status"] == status["OK"]
-
- data += ret["body"]["data"][0:-1] # Skiping null char at end of chunk
-
- file_64 = open(fileName + ".base64" , 'w')
- file_64.write(data)
-
- file_64_decode = base64.standard_b64decode(data)
- file_result = open(fileName, 'wb')
- file_result.write(file_64_decode)
-
-
-@pytest.mark.service_desktop_test
-@pytest.mark.usefixtures("phone_unlocked")
-@pytest.mark.rt1051
-def test_get_invalid_rxID(harness):
- """
- Attempt requesting data with invalid rxIDs
- """
- fileName = "MuditaOS.log"
- body = {"fileName" : "/sys/user/" + fileName}
- ret = harness.endpoint_request("filesystem", "get", body)
-
- assert ret["status"] == status["OK"]
- assert ret["body"]["fileSize"] != 0
-
- rxID = ret["body"]["rxID"]
- fileSize = ret["body"]["fileSize"]
- chunkSize = ret["body"]["chunkSize"]
-
- totalChunks = int(((fileSize + chunkSize - 1) / chunkSize))
- print("totalChunks #: " + str(totalChunks))
-
- body = {"rxID" : int(rxID - 1), "chunkNo": 1}
- ret = harness.endpoint_request("filesystem", "get", body)
-
- assert ret["status"] == status["BadRequest"]
-
- body = {"rxID" : int(rxID + 1), "chunkNo": 1}
- ret = harness.endpoint_request("filesystem", "get", body)
-
- assert ret["status"] == status["BadRequest"]
-
- """
- Getting a large file may hit screen auto lock.
- We need to disable pass code for duration of test
- """
- setPasscode(harness, True)
+ get_file(harness, "MuditaOS.log", "./")
M test/pytest/service-desktop/test_send_file.py => test/pytest/service-desktop/test_send_file.py +10 -64
@@ 1,11 1,13 @@
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import pytest
-import binascii
-import os.path
+import filecmp
+
+
+from harness.api.filesystem import get_file, put_file
+
+fileName = "lorem-ipsum.txt"
-from harness.interface.defs import status
-from functools import partial
@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
@@ 14,38 16,7 @@ def test_send_file(harness):
"""
Attempt requesting and sending file to Pure
"""
- fileName = "lorem-ipsum.txt"
- filePath = os.getcwd() + "/" + fileName
-
- with open(filePath, 'rb') as file:
- file_data = file.read()
- fileCrc32 = format((binascii.crc32(file_data) & 0xFFFFFFFF), '08x')
- print("fileCrc32: " + fileCrc32)
-
- fileSize = os.path.getsize(fileName)
-
- body = {"fileName" : "/sys/user/" + fileName, "fileSize" : fileSize, "fileCrc32" : fileCrc32}
- ret = harness.endpoint_request("filesystem", "put", body)
-
- assert ret["status"] == status["OK"]
- assert ret["body"]["txID"] != 0
-
- txID = ret["body"]["txID"]
- chunkSize = ret["body"]["chunkSize"]
- print("chunkSize: " + str(chunkSize))
-
- totalChunks = int(((fileSize + chunkSize - 1) / chunkSize))
- print("totalChunks #: " + str(totalChunks))
- chunkNo = 1
-
- with open(filePath, 'rb') as file:
- for chunk in iter(partial(file.read, chunkSize), b''):
- data = binascii.b2a_base64(chunk).decode()
-
- body = {"txID" : txID, "chunkNo": chunkNo, "data" : data}
- ret = harness.endpoint_request("filesystem", "put", body)
- assert ret["status"] == status["OK"]
- chunkNo += 1
+ put_file(harness, fileName, "/sys/user/")
@pytest.mark.service_desktop_test
@@ 55,33 26,8 @@ def test_get_file_back(harness):
"""
Attempt requesting and transfering file data
"""
- fileName = "lorem-ipsum.txt"
- body = {"fileName" : "/sys/user/" + fileName}
- ret = harness.endpoint_request("filesystem", "get", body)
-
- assert ret["status"] == status["OK"]
- assert ret["body"]["fileSize"] != 0
-
- rxID = ret["body"]["rxID"]
- fileSize = ret["body"]["fileSize"]
- chunkSize = ret["body"]["chunkSize"]
- expectedCrc32 = ret["body"]["fileCrc32"]
-
- totalChunks = int(((fileSize + chunkSize - 1) / chunkSize))
- print("totalChunks #: " + str(totalChunks))
- print("Expected file CRC32: " + expectedCrc32)
-
- data = ""
-
- for n in range(1, totalChunks + 1):
- body = {"rxID" : rxID, "chunkNo": n}
- ret = harness.endpoint_request("filesystem", "get", body)
-
- assert ret["status"] == status["OK"]
- data += ret["body"]["data"][0:-1] # Skiping null char at end of chunk
+ loadedFile = "./lorem-ipsum-2.txt"
- file_64_decode = binascii.a2b_base64(data)
- actualCrc32 = format((binascii.crc32(file_64_decode) & 0xFFFFFFFF), '08x')
- print("Actual file CRC32: " + actualCrc32)
- assert expectedCrc32 == actualCrc32
+ get_file(harness, fileName, "./", "/sys/user/", loadedFile)
+ assert filecmp.cmp(fileName, loadedFile, shallow=True)
M test/pytest/test_updater.py => test/pytest/test_updater.py +35 -50
@@ 3,68 3,53 @@
import os
import pytest
+import logging
from harness import log
from harness.interface.defs import Method, Endpoint
from harness.request import Transaction, Request,TransactionError
-
+from harness.rt_harness_discovery import get_rt1051_harness
from harness.harness import Harness
-import binascii
-
-
-def send_file(harness: Harness, file: str, where: str) -> Transaction:
-
- fileSize = os.path.getsize(file)
-
-
- with open(file, 'rb') as l_file:
- file_data = l_file.read()
- fileCrc32 = format((binascii.crc32(file_data) & 0xFFFFFFFF), '08x')
-
- body = {"fileName": where + file,
- "fileSize": fileSize, "fileCrc32": fileCrc32}
-
- log.debug(f"sending {body}")
+from harness.api.filesystem import put_file, get_file
+from harness.api.developermode import PhoneModeLock, PhoneReboot, Reboot
- ret = harness.request(Endpoint.FILESYSTEM, Method.PUT, body)
- log.debug(f"response {ret.response}")
- assert ret.response.body["txID"] != 0
+def get_version(harness: Harness):
+ r = harness.request(Endpoint.DEVICEINFO, Method.GET, {}).response
+ version = r.body["version"]
+ sha = r.body["gitRevision"]
+ return f"version: {version} sha: {sha}"
- txID = ret.response.body["txID"]
- chunkSize = ret.response.body["chunkSize"]
- chunkNo = 1
- sent = 0
-
- from functools import partial
- import base64
- with open(file, 'rb') as l_file:
- for chunk in iter(partial(l_file.read, chunkSize), b''):
- data = base64.standard_b64encode(chunk).decode()
-
- body = {"txID": txID, "chunkNo": chunkNo, "data": data}
- ret = harness.request(Endpoint.FILESYSTEM, Method.PUT, body)
- chunkNo += 1
-
- sent += len(chunk)
- percent = (sent / fileSize)*100
- time = ret.send_time + ret.read_time
- Mbps = (len(data)*8)/(time*1024*1024)
-
- log.info(f"-> {percent:0.2f}% in {time:0.3f}s {Mbps:0.3f}Mbps")
-
- return ret
+def disable_some_logs(harness: Harness):
+ from harness.interface.defs import PureLogLevel
+ for val in ["SysMgrService", "ServiceDesktop_w2", "CellularMux"]:
+ ret = harness.request(Endpoint.DEVELOPERMODE, Method.POST, {
+ "log": True, "service": val, "level": PureLogLevel.LOGERROR.value})
+ log.info(f"{ret.response}")
@pytest.mark.usefixtures("phone_unlocked")
@pytest.mark.rt1051
def test_update(harness: Harness):
filename = "update.tar"
- try:
- harness.request(Endpoint.DEVELOPERMODE, Method.PUT, {"phoneLockCodeEnabled": False})
- send_file(harness, filename, "/sys/user/")
- harness.request(Endpoint.DEVELOPERMODE, Method.PUT, {"phoneLockCodeEnabled": True})
- harness.request(Endpoint.DEVELOPERMODE, Method.POST, {"update": True, "reboot": True})
- except TransactionError as err:
- log.error(f"{err.message} Status: {err.status}")
+
+ log.info(get_version(harness))
+ PhoneModeLock(False).run(harness)
+ put_file(harness, filename, "/sys/user/")
+ PhoneReboot(Reboot.UPDATE).run(harness)
+ assert harness.connection.watch_port_reboot(60)
+
+ harness = get_rt1051_harness(60)
+ import time
+ time.sleep(10)
+ harness.unlock_phone()
+ PhoneModeLock(False).run(harness)
+
+ log.info(get_version(harness))
+ get_file(harness, "updater.log", "./")
+ with open("updater.log") as f:
+ line = f.readline()
+ assert "OK" in line
+ PhoneModeLock(True).run(harness)
+
log.info("update done!")
M test/send_update_package.py => test/send_update_package.py +5 -45
@@ 4,7 4,6 @@
import sys
import time
-import sys
import os.path
import json
import atexit
@@ 18,41 17,12 @@ from harness.harness import Harness
from harness.interface.defs import status
from harness.interface.error import TestError, Error
from functools import partial
+from harness.api.filesystem import put_file
+from harness.api.developermode import PhoneModeLock
def send_update_package(harness, package_filepath: str):
- file_size = os.path.getsize(package_filepath)
file_name = package_filepath.split('/')[-1]
-
- with open(package_filepath, 'rb') as file:
- file_data = open(package_filepath,'rb').read()
- file_crc32 = format((binascii.crc32(file_data) & 0xFFFFFFFF), '08x')
-
- print(f"Sending {file_name}, size {file_size}, CRC32 {file_crc32}")
-
- body = {"fileName" : "/sys/user/updates/" + file_name, "fileSize" : file_size, "fileCrc32" : file_crc32}
- ret = harness.endpoint_request("filesystem", "put", body)
-
- assert ret["status"] == status["OK"]
- assert ret["body"]["txID"] != 0
-
- txID = ret["body"]["txID"]
- chunkSize = ret["body"]["chunkSize"]
- chunkNo = 1
-
- with open(package_filepath, 'rb') as file:
- with tqdm(total=file_size, unit='B', unit_scale=True) as pbar:
- for chunk in iter(partial(file.read, chunkSize), b''):
- data = binascii.b2a_base64(chunk).decode()
- pbar.update(chunkSize)
- body = {"txID" : txID, "chunkNo": chunkNo, "data" : data}
- ret = harness.endpoint_request("filesystem", "put", body)
- assert ret["status"] == status["OK"]
- time.sleep(.1)
- chunkNo += 1
-
- print("Sending complete")
-
- return True
+ put_file(harness, file_name , "/sys/user/updates/")
def check_update_package(harness, package_filepath: str):
body = {}
@@ 89,9 59,7 @@ def check_update_package(harness, package_filepath: str):
return True
def setPasscode(harness, flag):
- body = {"phoneLockCodeEnabled": flag}
- ret = harness.endpoint_request("developerMode", "put", body)
- assert ret["status"] == status["NoContent"]
+ PhoneModeLock(flag).run(harness)
def main():
if len(sys.argv) == 1:
@@ 107,16 75,8 @@ def main():
harness = Harness.from_detect()
atexit.register(setPasscode, harness, True)
-
setPasscode(harness, False)
-
- time.sleep(.1)
-
- if not send_update_package(harness, package_filepath):
- print("Update package transfer failed")
- exit(1)
-
- time.sleep(.1)
+ send_update_package(harness, package_filepath)
if not check_update_package(harness, package_filepath):
print("Update package corrupted in transfer")