~aleteoryx/muditaos

ref: 149514e8364c0753c9eec29d48b227fb85fc4bda muditaos/test/pytest/service-desktop/test_send_file.py -rw-r--r-- 2.8 KiB
149514e8 — Jakub Pyszczak [EGD-7169] Changed genlfs path 4 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import pytest
import binascii
import os.path

from harness.interface.defs import status
from functools import partial

@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
@pytest.mark.rt1051
def test_send_file(harness):
    """
    Upload a local file to Pure through the "filesystem" endpoint.

    The transfer is opened with a "put" request carrying the target path,
    the file size and the file's CRC32. The response supplies the transfer
    ID (txID) and the chunk size to use. The file content is then sent in
    consecutive base64-encoded chunks numbered from 1, and every response is
    required to report status OK.

    Expects "lorem-ipsum.txt" to exist in the current working directory.
    """
    fileName = "lorem-ipsum.txt"
    filePath = os.path.join(os.getcwd(), fileName)

    # Read the file once; size, checksum and the transmitted chunks are all
    # derived from these bytes so they cannot disagree with each other.
    with open(filePath, 'rb') as file:
        file_data = file.read()

    fileSize = len(file_data)
    fileCrc32 = format((binascii.crc32(file_data) & 0xFFFFFFFF), '08x')
    print("fileCrc32: " + fileCrc32)

    body = {"fileName": "/sys/user/" + fileName, "fileSize": fileSize, "fileCrc32": fileCrc32}
    ret = harness.endpoint_request("filesystem", "put", body)

    assert ret["status"] == status["OK"]
    assert ret["body"]["txID"] != 0

    txID = ret["body"]["txID"]
    chunkSize = ret["body"]["chunkSize"]
    print("chunkSize: " + str(chunkSize))
    # Guard the chunk arithmetic below against a zero or negative chunk size.
    assert chunkSize > 0

    # Ceiling division done in integers, so no float rounding is involved.
    totalChunks = (fileSize + chunkSize - 1) // chunkSize
    print("totalChunks #: " + str(totalChunks))

    # Chunk numbers sent to the endpoint start at 1.
    for chunkNo, offset in enumerate(range(0, fileSize, chunkSize), start=1):
        chunk = file_data[offset:offset + chunkSize]
        # b2a_base64 returns bytes (with a trailing newline); decode to str
        # so the payload can be placed in the request body.
        data = binascii.b2a_base64(chunk).decode()

        body = {"txID": txID, "chunkNo": chunkNo, "data": data}
        ret = harness.endpoint_request("filesystem", "put", body)
        assert ret["status"] == status["OK"]


@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
@pytest.mark.rt1051
def test_get_file_back(harness):
    """
    Download a file from Pure through the "filesystem" endpoint and verify it.

    The transfer is opened with a "get" request for the file path. The
    response supplies the receive ID (rxID), the file size, the chunk size
    and the expected CRC32. Chunks numbered from 1 are then requested one by
    one, base64-decoded and joined, and the CRC32 of the result is compared
    with the value reported in the first response.
    """
    fileName = "lorem-ipsum.txt"
    body = {"fileName": "/sys/user/" + fileName}
    ret = harness.endpoint_request("filesystem", "get", body)

    assert ret["status"] == status["OK"]
    assert ret["body"]["fileSize"] != 0

    rxID = ret["body"]["rxID"]
    fileSize = ret["body"]["fileSize"]
    chunkSize = ret["body"]["chunkSize"]
    expectedCrc32 = ret["body"]["fileCrc32"]
    # Guard the chunk arithmetic below against a zero or negative chunk size.
    assert chunkSize > 0

    # Ceiling division done in integers, so no float rounding is involved.
    totalChunks = (fileSize + chunkSize - 1) // chunkSize
    print("totalChunks #: " + str(totalChunks))
    print("Expected file CRC32: " + expectedCrc32)

    decodedChunks = []

    for n in range(1, totalChunks + 1):
        body = {"rxID": rxID, "chunkNo": n}
        ret = harness.endpoint_request("filesystem", "get", body)

        assert ret["status"] == status["OK"]
        encoded = ret["body"]["data"][0:-1]  # Skipping null char at end of chunk
        # Decode each chunk separately: a2b_base64 stops at the first
        # complete '=' padding, so decoding one concatenated string would
        # drop everything after a padded chunk.
        # NOTE(review): assumes the device base64-encodes every chunk
        # independently (as test_send_file does when uploading) - confirm.
        decodedChunks.append(binascii.a2b_base64(encoded))

    fileData = b"".join(decodedChunks)
    actualCrc32 = format((binascii.crc32(fileData) & 0xFFFFFFFF), '08x')
    print("Actual file CRC32: " + actualCrc32)
    assert expectedCrc32 == actualCrc32