~aleteoryx/muditaos

50f28e8f19b23384aac9efd5c22cc155d451cace — Bartosz Cichocki 5 years ago 5a8ae75
[EGD-4270] updated test API, added sending message case in test harness (#1042)

[EGD-4270] build fix for GCC 10

Co-authored-by: SP2FET <bartosz.cichocki@mudita.com>
M changelog.md => changelog.md +1 -0
@@ 9,6 9,7 @@
* `[cellular]` Added SIM PIN/PUK handling
* `[cellular]` Added change PIN functionality
* `[cellular]` Added possibility of unlock SIM card (no PIN on start) functionality
* `[testing]` Added test harness with message sending case

### Changed


M module-audio/Audio/encoder/Encoder.hpp => module-audio/Audio/encoder/Encoder.hpp +2 -0
@@ 4,6 4,8 @@
#pragma once

#include <memory>
#include <cstdio>
#include <string>

namespace audio
{

M module-services/service-desktop/endpoints/developerMode/DeveloperModeHelper.cpp => module-services/service-desktop/endpoints/developerMode/DeveloperModeHelper.cpp +14 -7
@@ 7,11 7,14 @@

#include <service-desktop/parser/MessageHandler.hpp>
#include <service-evtmgr/Constants.hpp>
#include <service-evtmgr/KbdMessage.hpp>
#include <module-sys/Service/Bus.hpp>
#include <Service/Bus.hpp>
#include <service-cellular/CellularMessage.hpp>
#include <service-cellular/ServiceCellular.hpp>

#include <gui/Common.hpp>
#include <service-appmgr/Actions.hpp>
#include <messages/AppMessage.hpp>

namespace parserFSM
{
    class Context;


@@ 23,7 26,8 @@ auto DeveloperModeHelper::processPutRequest(Context &context) -> sys::ReturnCode
    auto body = context.getBody();
    if (body[json::developerMode::keyPressed].is_number()) {
        auto keyValue = body[json::developerMode::keyPressed].int_value();
        sendKeypress(getKeyCode(keyValue));
        auto state    = body[json::developerMode::state].int_value();
        sendKeypress(getKeyCode(keyValue), static_cast<gui::InputEvent::State>(state));
        MessageHandler::putToSendQueue(context.createSimpleResponse());
    }
    else if (body[json::developerMode::AT].is_string()) {


@@ 107,11 111,14 @@ auto DeveloperModeHelper::getKeyCode(int val) noexcept -> bsp::KeyCodes
        return bsp::KeyCodes::Undefined;
    };
}
void DeveloperModeHelper::sendKeypress(bsp::KeyCodes keyCode)

void DeveloperModeHelper::sendKeypress(bsp::KeyCodes keyCode, gui::InputEvent::State state)
{
    auto message          = std::make_shared<sevm::KbdMessage>();
    message->key.key_code = keyCode;
    message->key.state    = RawKey::State::Released;
    RawKey key{.state = RawKey::State::Released, .key_code = keyCode};

    gui::InputEvent event(key, state, static_cast<gui::KeyCode>(keyCode));
    LOG_INFO("Sending %s", event.str().c_str());
    auto message = std::make_shared<app::AppInputEventMessage>(std::move(event));

    sys::Bus::SendUnicast(std::move(message), service::name::evt_manager, ownerServicePtr);
}

M module-services/service-desktop/endpoints/developerMode/DeveloperModeHelper.hpp => module-services/service-desktop/endpoints/developerMode/DeveloperModeHelper.hpp +4 -2
@@ 8,7 8,8 @@
#include <Common/Query.hpp>
#include <Service/Common.hpp>
#include <Service/Service.hpp>
#include <module-bsp/bsp/keyboard/key_codes.hpp>
#include <bsp/keyboard/key_codes.hpp>
#include <input/InputEvent.hpp>

namespace sys
{


@@ 22,7 23,7 @@ namespace parserFSM
    {
        sys::Service *ownerServicePtr = nullptr;
        static auto getKeyCode(int val) noexcept -> bsp::KeyCodes;
        void sendKeypress(bsp::KeyCodes keyCode);
        void sendKeypress(bsp::KeyCodes keyCode, gui::InputEvent::State state);

      public:
        DeveloperModeHelper(sys::Service *_ownerServicePtr) : ownerServicePtr(_ownerServicePtr){};


@@ 32,6 33,7 @@ namespace parserFSM
    namespace json::developerMode
    {
        inline constexpr auto keyPressed    = "keyPressed";
        inline constexpr auto state         = "state";
        inline constexpr auto systemStarted = "systemStarted";
        inline constexpr auto ATResponse    = "ATResponse";
        inline constexpr auto AT            = "AT";

M module-services/service-evtmgr/EventManager.cpp => module-services/service-evtmgr/EventManager.cpp +13 -0
@@ 34,6 34,7 @@
#include <list>
#include <tuple>
#include <vector>
#include <module-apps/messages/AppMessage.hpp>

EventManager::EventManager(const std::string &name) : sys::Service(name)
{


@@ 259,6 260,18 @@ sys::ReturnCodes EventManager::InitHandler()
        return std::make_shared<sys::ResponseMessage>();
    });

    connect(app::AppInputEventMessage(gui::InputEvent(RawKey())), [&](sys::Message *msgl) {
        auto msg = static_cast<app::AppInputEventMessage *>(msgl);
        assert(msg);

        auto message = std::make_shared<app::AppInputEventMessage>(msg->getEvent());
        if (!targetApplication.empty()) {
            sys::Bus::SendUnicast(std::move(message), targetApplication, this);
        }

        return std::make_shared<sys::ResponseMessage>();
    });

    // initialize keyboard worker
    EventWorker = std::make_unique<WorkerEvent>(this);


M source/main.cpp => source/main.cpp +1 -0
@@ 102,6 102,7 @@ int main()
        ret &= sys::SystemManager::CreateService(std::make_shared<ServiceAudio>(), sysmgr.get());
        ret &= sys::SystemManager::CreateService(std::make_shared<ServiceBluetooth>(), sysmgr.get());
        ret &= sys::SystemManager::CreateService(std::make_shared<ServiceLwIP>(), sysmgr.get());
        ret &= sys::SystemManager::CreateService(std::make_shared<ServiceDesktop>(), sysmgr.get());

        ret &= sys::SystemManager::CreateService(std::make_shared<stm::ServiceTime>(), sysmgr.get());


M test/harness/harness.py => test/harness/harness.py +22 -3
@@ 1,9 1,12 @@
# Copyright (c) 2017-2020, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
import time

from harness import utils
from harness.interface import CDCSerial as serial
from harness.interface.defs import key_codes
from harness.utils import send_keystoke, application_keypath
from harness.interface.defs import key_codes, endpoint, method
from harness.utils import send_keystoke, application_keypath, send_char
import random


class Harness:


@@ 32,8 35,24 @@ class Harness:
        if not self.is_phone_unlocked:
            self.unlock_phone()
            self.is_phone_unlocked = True
            

        func(self.connection)

    def open_application(self, app):
        send_keystoke(application_keypath[app], self.connection)

    def send_text(self, text: str):
        for letter in text:
            send_char(letter, self.connection)

    def send_number(self, number: str):
        utils.send_number(number, self.connection)

    def endpoint_request(self, ep_name: str, met: str, body: dict) -> dict:
        ret = self.connection.write({
            "endpoint": endpoint[ep_name],
            "method": method[met],
            "uuid": random.randint(1, 32000),
            "body": body
        })
        return ret["body"]

M test/harness/interface/CDCSerial.py => test/harness/interface/CDCSerial.py +13 -7
@@ 4,14 4,19 @@ import time

import serial
import json
import random
import typing
import logging

from enum import Enum
from harness.interface.defs import endpoint, method, status

log = logging.getLogger(__name__)


class Keytype(Enum):
    long_press = 0
    short_press = 1


class CDCSerial:
    def __init__(self, port_name):
        self.body = ""


@@ 51,12 56,13 @@ class CDCSerial:
        result = self.serial.read(payload_length).decode()
        return json.loads(result)

    def send_key(self, key_code, wait=10):
        body = {
            "keyPressed": key_code
        }

    def send_key(self, key_code, key_type=Keytype.short_press, wait=10):
        if key_type is Keytype.long_press:
            body = {"keyPressed": key_code, "state": 4}
        else:
            body = {"keyPressed": key_code, "state": 2}
        ret = self.write(self.__wrap_message(body), wait)
        time.sleep(0.3)
        return ret

    def send_at(self, at_command, wait=10):

M test/harness/interface/defs.py => test/harness/interface/defs.py +2 -1
@@ 40,6 40,7 @@ key_codes = {
    "sliderUp": 16,
    "sliderMid": 18,
    "sliderDown": 17,
    "#": ord('#')
    "#": ord('#'),
    "*": ord('*')

}

M test/harness/utils.py => test/harness/utils.py +78 -1
@@ 3,6 3,7 @@

import time

from harness.interface.CDCSerial import Keytype
from harness.interface.defs import key_codes

# assuming that the harness is actually in the menu


@@ 51,9 52,85 @@ application_keypath = {
    ]
}

keymap = {
    "a": "2",
    "b": "22",
    "c": "222",
    "d": "3",
    "e": "33",
    "f": "333",
    "g": "4",
    "h": "44",
    "i": "444",
    "j": "5",
    "k": "55",
    "l": "555",
    "m": "6",
    "n": "66",
    "o": "666",
    "p": "7",
    "q": "77",
    "r": "777",
    "s": "7777",
    "t": "8",
    "u": "88",
    "v": "888",
    "w": "9",
    "x": "99",
    "y": "999",
    "z": "9999",
    " ": "0",
    ".": "1",
    ",": "11",
    "_": "111",
    ":": "1111",
    ";": "11111",
    ")": "111111",
    "(": "1111111",
    "?": "11111111",
    "!": "111111111",
    "#": "1111111111",
    "/": "11111111111",
    "*": "111111111111",
    "+": "1111111111111",
    "del": key_codes["#"],
    "caps": key_codes["*"],
}


def send_keystoke(keypath, connection):
    for i in keypath:
        print(i)
        connection.send_key(key_codes[i])
        time.sleep(0.3)


last_char = '\0'


def send_char(char: str, connection):
    global last_char
    key_type = Keytype.short_press
    if char.isdigit():
        key_type = Keytype.long_press
        if last_char is char:
            print("repeated key!")
            connection.send_key(key_codes["right"])
        connection.send_key(int(char), key_type)
        connection.send_key(key_codes["right"])
        last_char = char

    else:
        if last_char is keymap[char][0]:
            print("repeated key!")
            connection.send_key(key_codes["right"], key_type)
        for key in keymap[char]:
            connection.send_key(int(key), key_type)
        last_char = keymap[char][0]


def send_number(number: str, connection):
    if number.isnumeric():
        for digit in number:
            connection.send_key(int(digit))
            time.sleep(0.3)


D test/phone_send_sms.py => test/phone_send_sms.py +0 -25
@@ 1,25 0,0 @@
#!/usr/bin/env python
# Copyright (c) 2017-2020, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

# get to sms from desktop and send sms ABC


from harness.uart import connection

# get in sms
connection.send_key(ord('d'), 0.3)
connection.send_key(ord('d'), 0.3)
connection.send_key(ord('\n'), 0.3)

# select newest thread
connection.send_key(ord('\n'), 3)

# add sms ABC
# writeKey(ser, 1, 0.3)
connection.send_key(2, 3) # wait 3 sec -> so that needed assets for text processing would load bufor 100%
connection.send_key(3, 0.5)
connection.send_key(4, 0.5)

#send
connection.send_key(ord('\n'), 0.3)

A test/send_message.py => test/send_message.py +63 -0
@@ 0,0 1,63 @@
#!/usr/bin/env python
# Copyright (c) 2017-2020, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

import sys
import time

from harness.harness import Harness
from harness.interface.defs import key_codes, endpoint, method


def send_message(harness, phone_number: str, message: str):
    @harness.with_phone_unlocked
    def send(connection):
        # enter menu
        connection.send_key(key_codes["enter"])
        harness.open_application("messages")
        if harness.connection.get_window() != "ApplicationMessages":
            time.sleep(2)
            if harness.connection.get_window() != "ApplicationMessages":
                print("Application didn't switch, exiting...")
                exit(1)

        # create new message
        connection.send_key(key_codes["left"])
        # enter phone number
        harness.send_number(phone_number)
        # move down to message body
        connection.send_key(key_codes["down"])
        # write a message
        harness.send_text(message)
        # send
        connection.send_key(key_codes["enter"])


def get_message_by_text(harness, message: str):
    body = {"messageBody": message}
    return harness.endpoint_request("messages", "get", body)


def main():
    if len(sys.argv) == 1 or sys.argv[1] is None:
        print(f'Please pass port name as the parameter: python {sys.argv[0]} /dev/ttyACM0 number text ')
        exit(1)

    harness = Harness(sys.argv[1])
    message = str(sys.argv[3])
    messages = get_message_by_text(harness, message.upper())

    send_message(harness, str(sys.argv[2]), message)
    time.sleep(2)
    new_messages = get_message_by_text(harness, message.upper())

    diff = [i for i in messages + new_messages if i not in messages or i not in new_messages]
    if len(diff) != 1 or diff[0]["type"] != 0x08:  # 0x08 - SMSType::OUTBOX
        print("sending error!")
        exit(1)
    else:
        print("sending success!")


if __name__ == "__main__":
    main()

D test/test.py => test/test.py +0 -31
@@ 1,31 0,0 @@
#!/usr/bin/env python
# Copyright (c) 2017-2020, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

import sys
from harness.harness import Harness
from harness.interface.defs import key_codes


def main():
    if len(sys.argv) == 1 or sys.argv[1] is None:
        print('Please pass port name as the parameter: python test.py /dev/ttyACM0 ')
        exit(1)

    test = Harness(sys.argv[1])

    print(test.connection.get_window())

    @test.with_phone_unlocked
    def decorator_test(connection):
        # enter menu
        connection.send_key(key_codes["enter"])

    print(test.connection.get_window())

    test.open_application("settings")
    print(test.connection.get_window())


if __name__ == "__main__":
    main()