~aleteoryx/muditaos

ref: 0e8b4c848e3f87f3bbb1f64ef7460cf56017b87f muditaos/test/pytest/service-bluetooth/bt_utils.py -rw-r--r-- 3.6 KiB
0e8b4c84 — Lefucjusz [BH-2108] Fix misaligned charging symbol 3 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

from harness.interface.defs import status
from harness import log
import time

def bt_get_state(harness):
    """Return the "state" entry of the "bluetooth" endpoint's response body.

    Sleeps for one second, then issues a "get" request carrying
    {"state": True} through ``harness.endpoint_request``. The response
    status is asserted to equal ``status["OK"]`` before the value is returned.
    """
    # Fixed one-second pause before querying.
    # NOTE(review): presumably gives the device time to settle - confirm.
    time.sleep(1)
    response = harness.endpoint_request("bluetooth", "get", {"state": True})
    assert response["status"] == status["OK"]
    return response["body"]["state"]

def bt_set_status(harness, power, visibility="off"):
    """Send a "put" request asking for the given power and visibility.

    The request body is {"state": {"power": power, "visibility": visibility}}
    and goes to the "bluetooth" endpoint. The response status is asserted to
    equal ``status["OK"]``; nothing is returned.
    """
    log.info("Turning Bluetooth {} with visibility {}...".format(power, visibility))
    requested_state = {"power": power, "visibility": visibility}
    response = harness.endpoint_request("bluetooth", "put", {"state": requested_state})
    assert response["status"] == status["OK"]

def bt_command(harness, command, http_method="put"):
    """Send {"command": command} to the "bluetooth" endpoint.

    ``http_method`` selects the request method passed to
    ``harness.endpoint_request`` ("put" unless overridden). The response
    status is asserted to equal ``status["OK"]``; nothing is returned.
    """
    log.info("Sending command: {} with http_method={}".format(command, http_method))
    response = harness.endpoint_request("bluetooth", http_method, {"command": command})
    assert response["status"] == status["OK"]

def bt_pair_command(harness, pair_command, address, http_method):
    """Send {pair_command: address} to the "bluetooth" endpoint.

    ``pair_command`` is used verbatim as the key of the request body and
    ``address`` as its value. The request is issued with ``http_method`` and
    the response status is asserted to equal ``status["OK"]``.
    """
    log.info("Requesting {}ing with address={}...".format(pair_command, address))
    payload = {pair_command: address}
    response = harness.endpoint_request("bluetooth", http_method, payload)
    assert response["status"] == status["OK"]

def bt_get_device_by_name(devices, name) -> dict:
    """Return the first entry of ``devices`` whose "name" equals ``name``.

    Args:
        devices: iterable of device mappings; ``None`` is treated as empty.
        name: value compared against each entry's "name" key.

    Returns:
        The matching entry itself (not a copy), or an empty dict when no
        entry matches.
    """
    for device in devices or ():
        # An entry without a "name" key can never match; skip it instead of
        # aborting the whole search with a KeyError.
        if "name" in device and device["name"] == name:
            return device
    return {}

def bt_find_device(harness, device_origin, device_name, max_attempts=7):
    """Poll the "bluetooth" endpoint until a device named ``device_name`` appears.

    Each attempt sends a "get" request with {"devices": device_origin} and
    searches the "devices" list of the response body by name. Between
    unsuccessful attempts the function sleeps for two seconds; there is no
    sleep after the final attempt.

    Returns the matching device entry, or an empty dict if none was found
    within ``max_attempts`` attempts.
    """
    log.info("Getting {} devices".format(device_origin))
    query = {"devices": device_origin}

    for attempt in range(1, max_attempts + 1):
        response = harness.endpoint_request("bluetooth", "get", query)
        found = bt_get_device_by_name(response["body"]["devices"], device_name)
        if found:
            log.info("Found {}, address={}".format(device_name, found.get('address')))
            return found
        if attempt == max_attempts:
            # Last attempt exhausted: no point in logging a retry or sleeping.
            break
        log.info("Device {} not found, retrying...".format(device_name))
        time.sleep(2)
    return {}

def bt_is_device_forgotten(harness, device_name, max_attempts=7):
    """Poll until ``device_name`` no longer appears in the "bonded" device list.

    Each attempt sends a "get" request with {"devices": "bonded"} to the
    "bluetooth" endpoint and looks the name up in the returned "devices"
    list. Between attempts that still find the device the function sleeps
    for two seconds; there is no sleep after the final attempt.

    Returns True as soon as the device is absent, False if it is still
    listed after ``max_attempts`` attempts.
    """
    log.info("Checking if pair forgetting succeeded...")
    query = {"devices": "bonded"}

    for attempt in range(1, max_attempts + 1):
        response = harness.endpoint_request("bluetooth", "get", query)
        still_listed = bt_get_device_by_name(response["body"]["devices"], device_name)
        if not still_listed:
            return True
        if attempt < max_attempts:
            log.info("Device {} still paired, retrying...".format(device_name))
            time.sleep(2)
    return False

def bt_get_connected_address(harness, max_attempts=10):
    """Poll the "bluetooth" endpoint until its response carries a non-empty "address".

    Each attempt sends a "get" request with {"devices": "bonded"} and reads
    the "address" entry of the response body. Between attempts that yield
    an empty address the function sleeps for two seconds; there is no sleep
    after the final attempt.

    Returns the first non-empty address, or "" if every attempt returned an
    empty one.
    """
    log.info("Getting address of connected device")
    query = {"devices": "bonded"}

    for attempt in range(1, max_attempts + 1):
        response = harness.endpoint_request("bluetooth", "get", query)
        connected = response["body"]["address"]
        if len(connected) != 0:
            log.info("Device connected={}".format(connected))
            return connected
        if attempt < max_attempts:
            log.info("No device connected, retrying...")
            time.sleep(2)
    return ""

def bt_is_device_disconnected(harness, max_attempts=7):
    """Poll the "bluetooth" endpoint until its response carries an empty "address".

    Each attempt sends a "get" request with {"devices": "bonded"} and reads
    the "address" entry of the response body. Between attempts that still
    report an address the function sleeps for two seconds; there is no sleep
    after the final attempt.

    Returns True as soon as the address is empty, False if an address is
    still reported after ``max_attempts`` attempts.
    """
    log.info("Checking if disconnecting succeeded...")
    query = {"devices": "bonded"}

    for attempt in range(1, max_attempts + 1):
        response = harness.endpoint_request("bluetooth", "get", query)
        connected = response["body"]["address"]
        if not len(connected):
            return True
        if attempt < max_attempts:
            log.info("Device {} connected, retrying...".format(connected))
            time.sleep(2)
    return False