~aleteoryx/muditaos

ref: cecfb8a00ebe0c947275370b4050ca6f41a9f9b4 muditaos/test/pytest/service-desktop/test_message_threads.py -rw-r--r-- 6.2 KiB
cecfb8a0 — Lukasz Mastalerz [CP-1624] Cover new fields in contact and message endpoints with tests 2 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import pytest
from harness.request import TransactionError
from harness.api.messages import GetThreadsWithOffsetAndLimit, GetThreadById, MarkThreadAsUnread, DeleteThreadById, \
    GetMessageById, AddMessage, DeleteMessageById


class ThreadsTester:
    def __init__(self, harness):
        self.harness = harness

    def get_threads_with_offset_and_limit(self, offset, limit):
        try:
            result = GetThreadsWithOffsetAndLimit(offset, limit).run(self.harness)
        except TransactionError:
            return False
        else:
            return True, result

    def get_thread_by_id(self, thread_record_id):
        try:
            result = GetThreadById(thread_record_id).run(self.harness)
        except TransactionError:
            return False
        else:
            return True, result.thread

    def mark_thread_as_unread(self, thread_id, set_thread_unread):
        try:
            result = MarkThreadAsUnread(thread_id, set_thread_unread).run(self.harness)
        except TransactionError:
            return False
        else:
            return True, result

    def delete_thread_by_id(self, thread_record_id):
        try:
            DeleteThreadById(thread_record_id).run(self.harness)
        except TransactionError:
            return False
        else:
            return True

    def get_message_by_id(self, message_record_id):
        try:
            result = GetMessageById(message_record_id).run(self.harness)
        except TransactionError:
            return False
        else:
            return True, result.message

    def add_message(self, message_number, message_body):
        try:
            message = AddMessage(message_number, message_body).run(self.harness).message
        except TransactionError:
            return False
        else:
            return True, message

    def delete_message_by_id(self, message_record_id):
        try:
            DeleteMessageById(message_record_id).run(self.harness)
        except TransactionError:
            return False
        else:
            return True


@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
def test_create_and_delete_thread(harness):
    message_number = "123456789"
    message_body = "Test message"

    threads_tester = ThreadsTester(harness)

    result, message_record = threads_tester.add_message(message_number, message_body)
    assert result, "Failed to add message!"
    result, received_message_record = threads_tester.get_message_by_id(message_record["messageID"])
    assert result, "Failed to get message by id!"

    assert threads_tester.delete_thread_by_id(message_record["threadID"]), "Failed to delete a thread!"


@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
def test_marking_thread_as_read(harness):
    message_number = "123456789"
    message_body = "Test message nr 1"

    threads_tester = ThreadsTester(harness)

    result, message_record = threads_tester.add_message(message_number, message_body)
    assert result, "Failed to add message!"
    result, received_message_record = threads_tester.get_message_by_id(message_record["messageID"])
    assert result, "Failed to get message by id!"

    assert threads_tester.mark_thread_as_unread(received_message_record["threadID"], False)
    assert threads_tester.mark_thread_as_unread(received_message_record["threadID"], True)

    assert threads_tester.delete_message_by_id(message_record["messageID"]), "Failed to delete a message!"


@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
def test_getting_threads(harness):
    THREADS_PAGE_SIZE = 4
    message_numbers = ["123456789", "223456789", "323456789", "423456789", "523456789", "623456789"]
    message_body = "Test message"

    threads_tester = ThreadsTester(harness)

    for number in message_numbers:
        result = threads_tester.add_message(number, message_body)
        assert result, "Failed to add message!"

    result, response = threads_tester.get_threads_with_offset_and_limit(
        0, 0)
    assert result, "Failed to get all threads!"
    total_count_of_threads = response.totalCount
    result, response = threads_tester.get_threads_with_offset_and_limit(
        total_count_of_threads - 6, 6)
    assert result, "Failed to get threads with offset and limit!"
    assert len(response.threads) == THREADS_PAGE_SIZE
    assert response.nextPage["limit"] == 2
    assert response.nextPage["offset"] == total_count_of_threads - 2
    received_threads = response.threads
    result, response = threads_tester.get_threads_with_offset_and_limit(
        response.nextPage["offset"], response.nextPage["limit"])
    assert result, "Failed to get threads with offset and limit!"
    received_threads += response.threads
    assert len(received_threads) == 6

    for thread in received_threads:
        assert type(thread["isUnread"]) == bool
        assert type(thread["lastUpdatedAt"]) == int
        assert type(thread["messageCount"]) == int
        assert type(thread["messageSnippet"]) == str
        assert type(thread["messageType"]) == int
        assert type(thread["number"]) == str
        assert type(thread["numberID"]) == str
        assert type(thread["threadID"]) == int
        message_types = [1, 2, 4, 8, 16, 18, 255]
        assert thread["messageType"] in message_types


@pytest.mark.service_desktop_test
@pytest.mark.usefixtures("phone_unlocked")
def test_get_message_by_id(harness):
    threads_tester = ThreadsTester(harness)

    message_number = "123456789"
    message_body = "Hello, how are you?"

    result, message_record = threads_tester.add_message(message_number, message_body)
    assert result, "Failed to add message!"

    result, received_message_record = threads_tester.get_message_by_id(message_record["messageID"])
    assert result, "Failed to get message by id!"

    result, received_thread_record = threads_tester.get_thread_by_id(received_message_record["threadID"])
    assert result, "Failed to get thread by id!"
    assert received_thread_record["threadID"] == received_message_record["threadID"], "Wrong thread id!"

    assert threads_tester.delete_message_by_id(message_record["messageID"]), "Failed to delete a message!"