# Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import pytest
from harness.request import TransactionError
from harness.api.messages import GetThreadsWithOffsetAndLimit, GetThreadById, MarkThreadAsUnread, DeleteThreadById, \
GetMessageById, AddMessage, DeleteMessageById
class ThreadsTester:
def __init__(self, harness):
self.harness = harness
def get_threads_with_offset_and_limit(self, offset, limit):
try:
result = GetThreadsWithOffsetAndLimit(offset, limit).run(self.harness)
except TransactionError:
return False
else:
return True, result
def get_thread_by_id(self, thread_record_id):
try:
result = GetThreadById(thread_record_id).run(self.harness)
except TransactionError:
return False
else:
return True, result.thread
def mark_thread_as_unread(self, thread_id, set_thread_unread):
try:
result = MarkThreadAsUnread(thread_id, set_thread_unread).run(self.harness)
except TransactionError:
return False
else:
return True, result
def delete_thread_by_id(self, thread_record_id):
try:
DeleteThreadById(thread_record_id).run(self.harness)
except TransactionError:
return False
else:
return True
def get_message_by_id(self, message_record_id):
try:
result = GetMessageById(message_record_id).run(self.harness)
except TransactionError:
return False
else:
return True, result.message
def add_message(self, message_number, message_body):
try:
message = AddMessage(message_number, message_body).run(self.harness).message
except TransactionError:
return False
else:
return True, message
def delete_message_by_id(self, message_record_id):
try:
DeleteMessageById(message_record_id).run(self.harness)
except TransactionError:
return False
else:
return True
@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
def test_create_and_delete_thread(harness):
message_number = "123456789"
message_body = "Test message"
threads_tester = ThreadsTester(harness)
result, message_record = threads_tester.add_message(message_number, message_body)
assert result, "Failed to add message!"
result, received_message_record = threads_tester.get_message_by_id(message_record["messageID"])
assert result, "Failed to get message by id!"
assert threads_tester.delete_thread_by_id(message_record["threadID"]), "Failed to delete a thread!"
@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
def test_marking_thread_as_read(harness):
message_number = "123456789"
message_body = "Test message nr 1"
threads_tester = ThreadsTester(harness)
result, message_record = threads_tester.add_message(message_number, message_body)
assert result, "Failed to add message!"
result, received_message_record = threads_tester.get_message_by_id(message_record["messageID"])
assert result, "Failed to get message by id!"
assert threads_tester.mark_thread_as_unread(received_message_record["threadID"], False)
assert threads_tester.mark_thread_as_unread(received_message_record["threadID"], True)
assert threads_tester.delete_message_by_id(message_record["messageID"]), "Failed to delete a message!"
@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
def test_getting_threads(harness):
THREADS_PAGE_SIZE = 4
message_numbers = ["123456789", "223456789", "323456789", "423456789", "523456789", "623456789"]
message_body = "Test message"
threads_tester = ThreadsTester(harness)
for number in message_numbers:
result = threads_tester.add_message(number, message_body)
assert result, "Failed to add message!"
result, response = threads_tester.get_threads_with_offset_and_limit(
0, 0)
assert result, "Failed to get all threads!"
total_count_of_threads = response.totalCount
result, response = threads_tester.get_threads_with_offset_and_limit(
total_count_of_threads - 6, 6)
assert result, "Failed to get threads with offset and limit!"
assert len(response.threads) == THREADS_PAGE_SIZE
assert response.nextPage["limit"] == 2
assert response.nextPage["offset"] == total_count_of_threads - 2
received_threads = response.threads
result, response = threads_tester.get_threads_with_offset_and_limit(
response.nextPage["offset"], response.nextPage["limit"])
assert result, "Failed to get threads with offset and limit!"
received_threads += response.threads
assert len(received_threads) == 6
for thread in received_threads:
assert type(thread["isUnread"]) == bool
assert type(thread["lastUpdatedAt"]) == int
assert type(thread["messageCount"]) == int
assert type(thread["messageSnippet"]) == str
assert type(thread["messageType"]) == int
assert type(thread["number"]) == str
assert type(thread["numberID"]) == str
assert type(thread["threadID"]) == int
message_types = [1, 2, 4, 8, 16, 18, 255]
assert thread["messageType"] in message_types
@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
def test_get_message_by_id(harness):
threads_tester = ThreadsTester(harness)
message_number = "123456789"
message_body = "Hello, how are you?"
result, message_record = threads_tester.add_message(message_number, message_body)
assert result, "Failed to add message!"
result, received_message_record = threads_tester.get_message_by_id(message_record["messageID"])
assert result, "Failed to get message by id!"
result, received_thread_record = threads_tester.get_thread_by_id(received_message_record["threadID"])
assert result, "Failed to get thread by id!"
assert received_thread_record["threadID"] == received_message_record["threadID"], "Wrong thread id!"
assert threads_tester.delete_message_by_id(message_record["messageID"]), "Failed to delete a message!"