~aleteoryx/muditaos

ref: 7897f863809a3fe98844c2a520249decffe069d9 muditaos/test/pytest/service-desktop/test_message_threads.py -rw-r--r-- 4.2 KiB
7897f863 — Adam Dobrowolski [EGD-8164] Multiple same popups can be stored 4 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import pytest
from harness.request import TransactionError
from harness.api.messages import GetThreadsWithOffsetAndLimit, MarkThreadAsUnread, GetMessageById, AddMessage, DeleteMessageById


class ThreadsTester:
    def __init__(self, harness):
        self.harness = harness

    def get_threads_with_offset_and_limit(self, offset, limit):
        try:
            result = GetThreadsWithOffsetAndLimit(offset, limit).run(self.harness)
        except TransactionError:
            return False
        else:
            return True, result

    def mark_thread_as_unread(self, thread_id, set_thread_unread):
        try:
            result = MarkThreadAsUnread(thread_id, set_thread_unread).run(self.harness)
        except TransactionError:
            return False
        else:
            return True, result

    def get_message_by_id(self, message_record_id):
        try:
            result = GetMessageById(message_record_id).run(self.harness)
        except TransactionError:
            return False
        else:
            return True, result.message

    def add_message(self, message_number, message_body):
        try:
            message = AddMessage(message_number, message_body).run(self.harness).message
        except TransactionError:
            return False
        else:
            return True, message

    def delete_message_by_id(self, message_record_id):
        try:
            DeleteMessageById(message_record_id).run(self.harness)
        except TransactionError:
            return False
        else:
            return True


@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
def test_marking_thread_as_read(harness):
    message_number = "123456789"
    message_body = "Test message nr 1"

    threads_tester = ThreadsTester(harness)

    result, message_record = threads_tester.add_message(message_number, message_body)
    assert result, "Failed to add message!"
    result, received_message_record = threads_tester.get_message_by_id(message_record["messageID"])
    assert result, "Failed to get message by id!"

    assert threads_tester.mark_thread_as_unread(received_message_record["threadID"], False)
    assert threads_tester.mark_thread_as_unread(received_message_record["threadID"], True)

    assert threads_tester.delete_message_by_id(message_record["messageID"]), "Failed to delete a message!"


@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
def test_getting_threads(harness):
    THREADS_PAGE_SIZE = 4
    message_numbers = ["123456789", "223456789", "323456789", "423456789", "523456789", "623456789"]
    message_body = "Test message"

    threads_tester = ThreadsTester(harness)

    for number in message_numbers:
        result = threads_tester.add_message(number, message_body)
        assert result, "Failed to add message!"

    result, response = threads_tester.get_threads_with_offset_and_limit(
        0, 0)
    assert result, "Failed to get all threads!"
    total_count_of_threads = response.totalCount
    result, response = threads_tester.get_threads_with_offset_and_limit(
        total_count_of_threads - 6, 6)
    assert result, "Failed to get threads with offset and limit!"
    assert len(response.threads) == THREADS_PAGE_SIZE
    assert response.nextPage["limit"] == 2
    assert response.nextPage["offset"] == total_count_of_threads - 2
    received_threads = response.threads
    result, response = threads_tester.get_threads_with_offset_and_limit(
        response.nextPage["offset"], response.nextPage["limit"])
    assert result, "Failed to get threads with offset and limit!"
    received_threads += response.threads
    assert len(received_threads) == 6

    for thread in received_threads:
        assert type(thread["isUnread"]) == bool
        assert type(thread["lastUpdatedAt"]) == int
        assert type(thread["messageCount"]) == int
        assert type(thread["messageSnippet"]) == str
        assert type(thread["messageType"]) == int
        assert type(thread["number"]) == str
        assert type(thread["threadID"]) == int
        message_types = [1, 2, 4, 8, 16, 18, 255]
        assert thread["messageType"] in message_types