~aleteoryx/muditaos

2cf6bd69621ceef27f13354315e430dec6a62636 — Adam Dobrowolski 4 years ago 7f72e4c
[EGD-8208] Removed unused ping, added simple Async

Removed unused code: ping and stale messages
Detached UnicastSync send and sync part so that we would be able
to await for message in the code.
Added sys::Async to be able to handle situations where we want to
sync to the response, but after some time.
Fixed messages logging: for some reason demangling abi use caused
bad system behaviour
M module-sys/Service/BusProxy.cpp => module-sys/Service/BusProxy.cpp +10 -1
@@ 1,4 1,4 @@
// Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
// Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
// For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

#include <Service/BusProxy.hpp>


@@ 24,6 24,15 @@ namespace sys
        return ret;
    }

    SendResult BusProxy::unicastSync(std::shared_ptr<Message> message, sys::Service*whose,  std::uint32_t timeout)
    {
        auto ret = busImpl->UnicastSync(message, whose, timeout);
        if (ret.first != ReturnCodes::Failure) {
            watchdog.refresh();
        }
        return ret;
    }

    SendResult BusProxy::sendUnicastSync(std::shared_ptr<Message> message,
                                         const std::string &targetName,
                                         uint32_t timeout)

M module-sys/Service/Service.cpp => module-sys/Service/Service.cpp +20 -35
@@ 21,35 21,29 @@
#include <typeinfo>                // for type_info
#include <system/Constants.hpp>

#if (DEBUG_SERVICE_MESSAGES > 0)
#include <cxxabi.h>
#endif

// this could use Scoped() class from utils to print execution time too
void debug_msg(sys::Service *srvc, const sys::Message *ptr)
{
#if (DEBUG_SERVICE_MESSAGES > 0)

    int status           = -4; /// assign error number which is not  defined for __cxa_demangle
    const char *realname = nullptr;
    realname             = typeid(*ptr).name();
    char *demangled      = abi::__cxa_demangle(realname, 0, 0, &status);
    const char *realname = typeid(*ptr).name();

    assert(srvc);
    assert(ptr);

    LOG_DEBUG("Handle message ([%s] -> [%s] (%s) data: %s %s",
              ptr ? ptr->sender.c_str() : "",
              srvc ? srvc->GetName().c_str() : "",
              status == 0 ? demangled ? demangled : realname : realname,
              std::string(*ptr).c_str(),
              status != 0 ? status == -1   ? "!mem fail!"
                            : status == -2 ? "name ABI fail"
                            : status == -3 ? "arg invalid"
                                           : "other failure!"
                          : "");

    free(demangled);
    if (xPortIsInsideInterrupt()){
        throw std::runtime_error("message sent from irq");
    }
    if (xTaskGetCurrentTaskHandle() == nullptr) {
        throw std::runtime_error("message sent from crit section");
    }

    LOG_DEBUG("([%s] -> [%s] (%s) data: %s | %s",
             ptr ? ptr->sender.c_str() : "",
             srvc ? srvc->GetName().c_str() : "",
             realname,
             std::string(*ptr).c_str(),
             ptr->to_string().c_str());
#else
#endif
}


@@ 68,7 62,7 @@ namespace sys
    Service::Service(
        std::string name, std::string parent, uint32_t stackDepth, ServicePriority priority, Watchdog &watchdog)
        : cpp_freertos::Thread(name, stackDepth / 4 /* Stack depth in bytes */, static_cast<UBaseType_t>(priority)),
          parent(parent), bus(this, watchdog), mailbox(this), watchdog(watchdog), pingTimestamp(UINT32_MAX),
          parent(parent), bus(this, watchdog), mailbox(this), watchdog(watchdog),
          isReady(false), enableRunLoop(false)
    {}



@@ 101,16 95,6 @@ namespace sys
                continue;
            }

            // Remove all staled messages
            uint32_t timestamp = cpp_freertos::Ticks::GetTicks();
            staleUniqueMsg.erase(std::remove_if(staleUniqueMsg.begin(),
                                                staleUniqueMsg.end(),
                                                [&](const auto &id) {
                                                    return ((id.first == msg->uniID) ||
                                                            ((timestamp - id.second) >= 15000));
                                                }),
                                 staleUniqueMsg.end());

            const bool respond = msg->type != Message::Type::Response && GetName() != msg->sender;
            currentlyProcessing = msg;
            auto response      = msg->Execute(this);


@@ 192,7 176,7 @@ namespace sys

    bool Service::disconnect(const std::type_info &type)
    {
        auto iter = message_handlers.find(type);
        auto iter = message_handlers.find(type_index(type));
        if (iter == std::end(message_handlers)) {
            return false;
        }


@@ 286,9 270,6 @@ namespace sys
    {
        auto ret = ReturnCodes::Success;
        switch (message->systemMessageType) {
        case SystemMessageType::Ping:
            service->pingTimestamp = cpp_freertos::Ticks::GetTicks();
            break;
        case SystemMessageType::SwitchPowerMode:
            service->SwitchPowerModeHandler(message->powerMode);
            break;


@@ 312,4 293,8 @@ namespace sys
        return std::make_shared<ResponseMessage>(ret);
    }

    bool Service::isConnected(std::type_index idx)
    {
        return message_handlers.find(idx) != message_handlers.end();
    }
} // namespace sys

M module-sys/Service/details/bus/Bus.cpp => module-sys/Service/details/bus/Bus.cpp +25 -46
@@ 1,4 1,4 @@
// Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
// Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
// For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

#include "Bus.hpp"


@@ 101,16 101,22 @@ namespace sys
        return false;
    }

    namespace {
        template <class W, class X>
        void restoreMessagess(W &mailbox, X &tempMsg)
        {
                // Push messages collected during waiting for response to processing queue
                for (const auto &w : tempMsg) {
                    mailbox.push(w);
                }
        }
    }

    SendResult Bus::SendUnicastSync(std::shared_ptr<Message> message,
                                    const std::string &targetName,
                                    Service *sender,
                                    std::uint32_t timeout)
    {
        std::vector<std::shared_ptr<Message>> tempMsg;
        tempMsg.reserve(4); // reserve space for 4 elements to avoid costly memory allocations

        MessageUIDType unicastID = unicastMsgId.get();

        {
            cpp_freertos::CriticalSectionGuard guard;
            message->id    = uniqueMsgId.getNext();


@@ 132,6 138,14 @@ namespace sys
            return std::make_pair(ReturnCodes::ServiceDoesntExist, nullptr);
        }

        return UnicastSync(message, sender, timeout);
    }

    SendResult Bus::UnicastSync(const std::shared_ptr<Message> &message, Service *sender, std::uint32_t timeout)
    {
        std::vector<std::shared_ptr<Message>> tempMsg;
        tempMsg.reserve(4); // reserve space for 4 elements to avoid costly memory allocations

        uint32_t currentTime   = cpp_freertos::Ticks::GetTicks();
        uint32_t timeoutNeeded = currentTime + timeout;
        uint32_t timeElapsed   = currentTime;


@@ 139,61 153,26 @@ namespace sys
        while (1) {
            // timeout
            if (timeElapsed >= timeoutNeeded) {

                // Push messages collected during waiting for response to processing queue
                for (const auto &w : tempMsg) {
                    sender->mailbox.push(w);
                }

                // Register that we didn't receive response. Even if it arrives it will be dropped
                sender->staleUniqueMsg.push_back(std::make_pair(unicastID, cpp_freertos::Ticks::GetTicks()));
                restoreMessagess(sender->mailbox, tempMsg);
                return std::make_pair(ReturnCodes::Timeout, nullptr);
            }

            // Immediately block on rx queue
            auto rxmsg = sender->mailbox.pop(timeoutNeeded - timeElapsed);

            timeElapsed = cpp_freertos::Ticks::GetTicks();

            // check for timeout
            if (rxmsg == nullptr) {

                // Push messages collected during waiting for response to processing queue
                for (const auto &w : tempMsg) {
                    sender->mailbox.push(w);
                }

                // Register that we didn't receive response. Even if it arrives it will be dropped
                sender->staleUniqueMsg.push_back(std::make_pair(unicastID, cpp_freertos::Ticks::GetTicks()));
                restoreMessagess(sender->mailbox, tempMsg);
                return CreateSendResult(ReturnCodes::Timeout, nullptr);
            }

            // Received response
            if ((rxmsg->uniID == unicastID) && (message->sender == sender->GetName())) {

                // Push messages collected during waiting for response to processing queue
                for (const auto &w : tempMsg) {
                    sender->mailbox.push(w);
                }

            if ((rxmsg->uniID == message->uniID) && (message->sender == sender->GetName())) {
                restoreMessagess(sender->mailbox, tempMsg);
                return CreateSendResult(ReturnCodes::Success, rxmsg);
            }
            // Check for system messages
            else if (rxmsg->type == Message::Type::System) {

                SystemMessage *sysmsg = static_cast<SystemMessage *>(rxmsg.get());

                if (sysmsg->systemMessageType == SystemMessageType::Ping) {
                    rxmsg->Execute(sender);
                }
                else {
                    tempMsg.push_back(rxmsg);
                }
            }
            // Plain data message, save it for later
            else {
                tempMsg.push_back(rxmsg);
            }
            tempMsg.push_back(rxmsg);
        }
    }


M module-sys/Service/details/bus/Bus.hpp => module-sys/Service/details/bus/Bus.hpp +4 -1
@@ 1,4 1,4 @@
// Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
// Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
// For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

#pragma once


@@ 43,6 43,9 @@ namespace sys
                                   Service *sender,
                                   std::uint32_t timeout);

        /// await for response on source message with timeout
        SendResult UnicastSync(const std::shared_ptr<Message> &message, Service *sender, std::uint32_t timeout);

        /**
         * Sends a message to the specified channel.
         * @param message   Message to be sent

M module-sys/Service/include/Service/BusProxy.hpp => module-sys/Service/include/Service/BusProxy.hpp +2 -1
@@ 1,4 1,4 @@
// Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
// Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
// For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

#pragma once


@@ 24,6 24,7 @@ namespace sys
        ~BusProxy() noexcept;

        bool sendUnicast(std::shared_ptr<Message> message, const std::string &targetName);
        SendResult unicastSync(std::shared_ptr<Message> message, sys::Service *whose, std::uint32_t timeout);
        SendResult sendUnicastSync(std::shared_ptr<Message> message,
                                   const std::string &targetName,
                                   std::uint32_t timeout);

M module-sys/Service/include/Service/Message.hpp => module-sys/Service/include/Service/Message.hpp +1 -2
@@ 1,4 1,4 @@
// Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
// Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
// For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

#pragma once


@@ 133,7 133,6 @@ namespace sys

    enum class SystemMessageType
    {
        Ping,
        SwitchPowerMode,
        Start,
        Timer,

M module-sys/Service/include/Service/Service.hpp => module-sys/Service/include/Service/Service.hpp +101 -4
@@ 22,9 22,60 @@
#include <utility>                           // for pair
#include <vector>                            // for vector<>::iterator, vector
#include <typeinfo>                          // for connect by type
#include <stdexcept>

namespace sys
{

    class async_fail : public std::runtime_error
    {
        public: 
            async_fail() : runtime_error("async failure") {}
    };

    template <typename Request, typename Response> struct Async // : public with_timer
    {
        enum class State
        {
            Pending,
            TimedOut,
            Error,
            Done
        };

        State getState()
        {
            return m->state;
        }

        Response& getResult()
        {
            return *m->result;
        }

        Request& getRequest()
        {
            return *m->request;
        }

        static_assert(std::is_base_of<sys::Message, Request>::value, "Request has to be based on system message");
        static_assert(std::is_base_of<sys::ResponseMessage, Response>::value, "Response has to be based on system response message");
        explicit operator bool()
        {
            return m->state == State::Done;
        }

      private:
        friend sys::Service;
        void setState(State state) { this->m->state=state;}
        struct M {
            State state = State::Pending;
            std::shared_ptr<Request> request;
            std::shared_ptr<Response> result;
        };
        std::shared_ptr<M> m = std::make_shared<M>();
    };

    class Service : public cpp_freertos::Thread, public std::enable_shared_from_this<Service>
    {
      public:


@@ 87,18 138,63 @@ namespace sys

        Watchdog &watchdog;

        uint32_t pingTimestamp;

        bool isReady;

        std::vector<std::pair<uint64_t, uint32_t>> staleUniqueMsg;

        /// connect: register message handler
        bool connect(const std::type_info &type, MessageHandler handler);
        bool connect(Message *msg, MessageHandler handler);
        bool connect(Message &&msg, MessageHandler handler);
        bool disconnect(const std::type_info &type);

        template <typename Request, typename Response, typename... Arg>
        Async<Request, Response> async_call(std::string whom, Arg... arg)
        {
            static_assert(std::is_base_of<sys::ResponseMessage, Response>::value, "Response has to be based on system message");
            Async<Request,Response> p;
            auto request = std::make_shared<Request>(arg...);
            if (isConnected(std::type_index(typeid(Response)))) {
                p.setState(Async<Request,Response>::State::Error);
                throw async_fail();
            }
            auto meta = p.m;
            meta->request = request;
            std::function<MessagePointer(Message *)> foo = [this, meta](Message *m) {
                auto &response = dynamic_cast<Response &>(*m);
                meta->state = Async<Request,Response>::State::Done;
                // this is counter productive - but this is what needs to be done here
                meta->result = std::make_shared<Response>(response);
                disconnect(typeid(Response));
                return sys::msgHandled();
            };

            if (!connect(typeid(Response), foo)) {
                throw async_fail();
            }
            if (!bus.sendUnicast(request, whom)) {
                throw async_fail();
            }
            return p;
        }

        template <typename Request,typename Response>
        void sync(Async<Request,Response> &p, std::uint32_t timeout=std::numeric_limits<std::uint32_t>::max())
        {
            static_assert(std::is_base_of<sys::ResponseMessage, Response>::value, "Response has to be based on system message");
            if (p.m->request == nullptr) {
                throw async_fail();
            }
            if (p.getState() !=  Async<Request,Response>::State::Pending) {
                return;
            }
            auto val = bus.unicastSync(p.m->request, this, timeout);
            if (val.first != sys::ReturnCodes::Success) {
                val.first == sys::ReturnCodes::Timeout ? p.setState(Async<Request,Response>::State::TimedOut):
                                                         p.setState(Async<Request,Response>::State::Error);
                return;
            }
            this->HandleResponse(dynamic_cast<sys::ResponseMessage*>(val.second.get()));
        }

        void sendCloseReadyMessage(Service *service);
        std::string getCurrentProcessing();



@@ 110,6 206,7 @@ namespace sys
        std::map<std::type_index, MessageHandler> message_handlers;

      private:
        bool isConnected(std::type_index idx);
        /// first point of enttry on messages - actually used method in run
        /// First calls message_handlers
        /// If not - fallback to DataReceivedHandler

M module-sys/Service/tests/test-system_messages.cpp => module-sys/Service/tests/test-system_messages.cpp +3 -3
@@ 1,4 1,4 @@
// Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
// Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
// For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

#include <catch2/catch.hpp>


@@ 16,12 16,12 @@ class MockedMessageUID : public sys::MessageUID
TEST_CASE("Test basic messages constructors")
{
    auto dataMsg     = sys::DataMessage();
    auto systemMsg   = sys::SystemMessage(sys::SystemMessageType::Ping);
    auto systemMsg   = sys::SystemMessage(sys::SystemMessageType::Start);
    auto responseMsg = sys::ResponseMessage();

    REQUIRE(dataMsg.type == sys::Message::Type::Data);
    REQUIRE(
        (systemMsg.type == sys::Message::Type::System && systemMsg.systemMessageType == sys::SystemMessageType::Ping));
        (systemMsg.type == sys::Message::Type::System && systemMsg.systemMessageType == sys::SystemMessageType::Start));
    REQUIRE(responseMsg.type == sys::Message::Type::Response);
}