import base64
import os.path
import serial
import random
import json
import zlib
import argparse
import time
from tqdm import tqdm
http_methods = {
'GET': 1,
'POST': 2,
'PUT': 3,
'DELETE': 4
}
endpoint_types = {
'Invalid': 0,
'DeviceInfo': 1,
'Update': 2,
'FilesystemUpload': 3,
'Backup': 4,
'Restore': 5,
'Factory': 6,
'Contacts': 7,
'Messages': 8,
'Calllog': 9,
'CalendarEventsPlaceholder': 10,
'DeveloperMode': 11,
'Bluetooth': 12,
'UsbSecurity': 13,
'Outbox': 14,
'Reboot': 15,
'TimeSync': 16,
'Quotes': 17
}
payload_marker = '#'
payload_size_len = 9
port_timeout_short_s = 1
port_timeout_medium_s = 2
port_timeout_long_s = 5
port_baudrate = 115200
default_rx_data_size = 1024
chunk_rx_data_size = 1024 * 200
sync_filename = 'sync.tar'
sync_path = '/user/temp/' + sync_filename
def get_new_uuid() -> int:
return random.randint(1, 10000)
def send_data(port_path: str, payload_str: str, expected_response: int) -> bool:
payload_len_str = str(len(payload_str)).rjust(payload_size_len, '0')
request_str = payload_marker + payload_len_str + payload_str
with serial.Serial(port_path, port_baudrate, timeout=port_timeout_short_s) as port:
port.write(request_str.encode('ascii'))
response = port.read(2048)
resp_json = json.loads(response[10:])
status = resp_json['status']
if status == expected_response:
print(f'Request success')
return True
print(f'Request failed, status {status}')
return False
def get_request_response(port_path: str, request: str, expected_size: int) -> bytes:
payload_len_str = str(len(request)).rjust(payload_size_len, '0')
request_str = payload_marker + payload_len_str + request
with serial.Serial(port_path, port_baudrate, timeout=port_timeout_medium_s) as port:
port.write(request_str.encode('ascii'))
response = port.read(expected_size)
return response
def get_str_data(port_path: str, payload_str: str, expected_responses: list, body_str: str) -> str:
payload_len_str = str(len(payload_str)).rjust(payload_size_len, '0')
request_str = payload_marker + payload_len_str + payload_str
with serial.Serial(port_path, port_baudrate, timeout=port_timeout_short_s) as port:
port.write(request_str.encode('ascii'))
response = port.read(2048)
resp_json = json.loads(response[10:])
status = resp_json['status']
for expected_status in expected_responses:
if status == expected_status:
print(f'Request success')
body = resp_json['body']
return body[body_str]
print(f'Request failed, status {status}')
return ""
def add_quote(port_path: str, quote_str: str, author_str: str) -> bool:
uuid = get_new_uuid()
payload = {
'endpoint': endpoint_types['Quotes'],
'method': http_methods['POST'],
'uuid': uuid,
'body': {
'quote': quote_str,
'author': author_str
}
}
payload_str = json.dumps(payload)
payload_len_str = str(len(payload_str)).rjust(payload_size_len, '0')
request_str = payload_marker + payload_len_str + payload_str
with serial.Serial(port_path, port_baudrate, timeout=port_timeout_short_s) as port:
port.write(request_str.encode('ascii'))
response = port.read(2048)
resp_json = json.loads(response[10:])
status = resp_json['status']
if status == 200:
print(f'Request success {resp_json["body"]}')
return True
print(f'Request failed, status {status}')
return False
def edit_quote(port_path: str, quote_id: int, quote_str: str, author_str: str) -> bool:
uuid = get_new_uuid()
payload = {
'endpoint': endpoint_types['Quotes'],
'method': http_methods['PUT'],
'uuid': uuid,
'body': {
'quoteID': quote_id,
'quote': quote_str,
'author': author_str
}
}
payload_str = json.dumps(payload)
return send_data(port_path, payload_str, 200)
def delete_quote(port_path: str, quote_id: int) -> bool:
uuid = get_new_uuid()
payload = {
'endpoint': endpoint_types['Quotes'],
'method': http_methods['DELETE'],
'uuid': uuid,
'body': {
'quoteID': quote_id
}
}
payload_str = json.dumps(payload)
return send_data(port_path, payload_str, 204)
def change_group(port_path: str, quote_group: str) -> bool:
uuid = get_new_uuid()
payload = {
'endpoint': endpoint_types['Quotes'],
'method': http_methods['PUT'],
'uuid': uuid,
'body': {
'group': quote_group
}
}
payload_str = json.dumps(payload)
return send_data(port_path, payload_str, 200)
def change_interval(port_path: str, quote_interval: str) -> bool:
uuid = get_new_uuid()
payload = {
'endpoint': endpoint_types['Quotes'],
'method': http_methods['PUT'],
'uuid': uuid,
'body': {
'interval': quote_interval
}
}
payload_str = json.dumps(payload)
return send_data(port_path, payload_str, 200)
def get_settings(port_path: str, settings: str) -> str:
uuid = get_new_uuid()
payload = {
'endpoint': endpoint_types['Quotes'],
'method': http_methods['GET'],
'uuid': uuid,
'body': {
'settings': settings
}
}
payload_str = json.dumps(payload)
return get_str_data(port_path, payload_str, [200], settings)
def start_sync(port_path: str) -> bool:
uuid = get_new_uuid()
payload = {
'endpoint': endpoint_types['Backup'],
'method': http_methods['POST'],
'uuid': uuid,
'body': {
"category" : "sync"
}
}
payload_str = json.dumps(payload)
return send_data(port_path, payload_str, 202)
def wait_for_sync_end(port_path: str, timeout: int) -> bool:
uuid = get_new_uuid()
payload = {
'endpoint': endpoint_types['Backup'],
'method': http_methods['GET'],
'uuid': uuid,
'body': {
"category" : "sync"
}
}
payload_str = json.dumps(payload)
start_timestamp = time.time()
while time.time() < start_timestamp + timeout:
status = get_str_data(port_path, payload_str, [204, 303], 'state')
print("sync status: " + status)
if status == "finished":
return True
time.sleep(1)
return False
def download_file(port_path: str, path: str) -> bool:
uuid = get_new_uuid()
chunk_size = 0
file_size = 0
payload = {
'endpoint': endpoint_types['FilesystemUpload'],
'method': http_methods['GET'],
'uuid': uuid,
'body': {
'fileName': path
}
}
payload_str = json.dumps(payload)
request_id = {
'endpoint': endpoint_types['FilesystemUpload'],
'method': http_methods['GET'],
'uuid': uuid,
'body': {
'rxID': 0,
'chunkNo': 1
}
}
response = get_request_response(port_path, payload_str, default_rx_data_size)
if not response:
print("Request GET file " + path + " info failed!")
return False
resp_json = json.loads(response[10:])
status = resp_json['status']
if status == 200:
request_id['body']['rxID'] = resp_json['body']['rxID']
chunk_size = resp_json['body']['chunkSize']
file_size = resp_json['body']['fileSize']
else:
print(f'Request failed, status: {status}')
return False
chunks = int((file_size / chunk_size) + 1)
print(f'File size: {file_size} chunk size: {chunk_size} chunks: {chunks}')
filename = "temp/" + sync_filename
os.makedirs(os.path.dirname(filename), exist_ok=True)
progress_bar = tqdm(desc=f'Downloading \'{path}\'', total=file_size, unit='bytes', unit_scale=True, unit_divisor=1024)
with open(filename, 'wb') as file:
for i in range(1, chunks + 1):
request_id_str = json.dumps(request_id)
response = get_request_response(port_path, request_id_str, chunk_rx_data_size)
if not response:
print("Request GET chunkNo: " + request_id['body']['chunkNo'] + " failed!")
return False
resp_json = json.loads(response[10:])
status = resp_json['status']
if status == 200:
data = base64.b64decode(resp_json['body']['data'])
file.write(data)
request_id['body']['chunkNo'] = i + 1
progress_bar.update(len(data))
else:
print(f'Request failed, status {status}')
progress_bar.close()
return False
progress_bar.close()
return True
def delete_sync(port_path: str, path: str) -> bool:
uuid = get_new_uuid()
payload = {
'endpoint': endpoint_types['FilesystemUpload'],
'method': http_methods['DELETE'],
'uuid': uuid,
'body': {
"removeFile" : path
}
}
payload_str = json.dumps(payload)
return send_data(port_path, payload_str, 204)
def get_quotes(port_path: str) -> bool:
if not start_sync(port_path):
print("Error! Synchronization has not been started.")
return False
print("Synchronization has started...")
if not wait_for_sync_end(port_path, 20):
print("Error! Synchronization fail.")
return False
print("Synchronization completed successfully.")
if not download_file(port_path, sync_path):
print("Error! Sync file download fail.")
return False
print("Sync file download completed.")
if not delete_sync(port_path, sync_path):
print("Error! Failed to delete sync file.")
return False
print("Sync file deletion completed successfully.")
return True
def main():
parser = argparse.ArgumentParser(
prog='custom_quotes',
description='Script used to test functionality of custom quotes endpoint in Harmony project'
)
parser.add_argument('-p', '--port',
metavar='path_to_com_port',
help='path to COM port of the device (e.g. /dev/ttyACM0)')
parser.add_argument('-a', '--add',
action='store_true',
help='add new custom quote')
parser.add_argument('-d', '--delete',
metavar='quote_id_to_delete',
type=int,
help='delete custom quote')
parser.add_argument('-e', '--edit',
metavar='quote_id_to_edit',
type=int,
help='edit custom quote')
parser.add_argument('-q', '--quote',
metavar='string_with_quote',
help='string with new quote')
parser.add_argument('-t', '--author',
metavar='string_with_author',
help='string with author')
parser.add_argument('-g', '--group',
metavar='string_with_group_type',
help='string with type of group of displayed quotes [Predefined or Custom]')
parser.add_argument('-i', '--interval',
metavar='quotes_display_interval',
help='quotes display interval [\'x\' minutes or \'AtMidnight\']')
parser.add_argument('-s', '--synchro',
metavar='settings_to_get',
help='get quotes file or settings [\'quotes\' or \'group\' or \'interval\']')
args = parser.parse_args()
if not args.port:
print('Invalid usage: please specify device port')
print('Run with -h to see help')
return
if not args.add and not args.delete and not args.edit and not args.group and not args.interval and not args.synchro:
print('Invalid usage: please specify add, delete, edit, synchro, group or interval argument')
print('Run with -h to see help')
return
if args.add:
if not args.quote:
print('Invalid usage: please add quote')
print('Run with -h to see help')
return
else:
print("adding new quotes: " + args.quote)
add_quote(args.port, args.quote, args.author)
elif args.delete:
print("deleting quotes nr: " + str(args.delete))
delete_quote(args.port, args.delete)
elif args.edit:
if not args.quote:
print('Invalid usage: please add quote')
print('Run with -h to see help')
return
else:
print("editing quotes nr: " + str(args.edit))
edit_quote(args.port, args.edit, args.quote, args.author)
elif args.group:
print("quotes group: " + args.group)
change_group(args.port, args.group)
elif args.interval:
print("quotes interval: " + args.interval)
change_interval(args.port, args.interval)
elif args.synchro:
if args.synchro == "quotes":
print("downloading quotes file")
get_quotes(args.port)
elif args.synchro == "group":
group = get_settings(args.port, args.synchro)
print("quotes group: " + group)
elif args.synchro == "interval":
interval = get_settings(args.port, args.synchro)
print("quotes interval: " + interval)
else:
print('Invalid usage: please choose one option: \'quotes\' or \'group\' or \'interval\'')
print('Run with -h to see help')
if __name__ == '__main__':
main()