~aleteoryx/muditaos

8a3eb6466232befb5c3f9a77f110a345c991a5e9 — Hubert Chrzaniuk 5 years ago 5c57dff
[EGD-4899] Worker class design improvements and fixes (#1190)

* allow workers to run independently from service
* change worker to use FreeRTOS wrapper Queue class instead of
  freeRTOS native queues
* fix bug disallowing to destruct worker that have never ran
* fixed worker bad state transition to Running state
M changelog.md => changelog.md +1 -0
@@ 19,6 19,7 @@

* `[PowerManagement]` Change hardware timers clock source
* `[bluetooth]` Underlying communication with the Bluetooth module over DMA (direct access)
* `[system]` Workers refactor and state transition fixes

### Fixed


M module-bluetooth/Bluetooth/BluetoothWorker.cpp => module-bluetooth/Bluetooth/BluetoothWorker.cpp +4 -3
@@ 42,7 42,8 @@ const char *c_str(Bt::Error::Code code)
    return "";
}

BluetoothWorker::BluetoothWorker(sys::Service *service) : Worker(service), currentProfile(std::make_shared<Bt::HSP>())
BluetoothWorker::BluetoothWorker(sys::Service *service)
    : Worker(service), service(service), currentProfile(std::make_shared<Bt::HSP>())
{
    init({
        {"qBtIO", sizeof(Bt::Message), 10},


@@ 67,7 68,7 @@ bool BluetoothWorker::run()
    if (Worker::run()) {
        is_running                          = true;
        auto el                             = queues[queueIO_handle];
        BlueKitchen::getInstance()->qHandle = el;
        BlueKitchen::getInstance()->qHandle = el->GetQueueHandle();
        Bt::initialize_stack();
        Bt::register_hw_error_callback();
        Bt::GAP::register_scan();


@@ 127,7 128,7 @@ bool BluetoothWorker::start_pan()
bool BluetoothWorker::handleMessage(uint32_t queueID)
{

    QueueHandle_t queue = queues[queueID];
    QueueHandle_t queue = queues[queueID]->GetQueueHandle();
    if (queueID == queueService) {
        LOG_DEBUG("not interested");
        return true;

M module-bluetooth/Bluetooth/BluetoothWorker.hpp => module-bluetooth/Bluetooth/BluetoothWorker.hpp +1 -0
@@ 76,6 76,7 @@ class BluetoothWorker : private sys::Worker

    TaskHandle_t bt_worker_task = nullptr;
    int is_running              = false;
    sys::Service *service       = nullptr;

  public:
    enum Error

M module-services/service-desktop/WorkerDesktop.cpp => module-services/service-desktop/WorkerDesktop.cpp +6 -6
@@ 29,8 29,8 @@ bool WorkerDesktop::init(std::list<sys::WorkerQueueInfo> queues)
{
    Worker::init(queues);

    receiveQueue                         = Worker::getQueueByName(sdesktop::RECEIVE_QUEUE_BUFFER_NAME);
    parserFSM::MessageHandler::sendQueue = Worker::getQueueByName(sdesktop::SEND_QUEUE_BUFFER_NAME);
    receiveQueue                         = Worker::getQueueHandleByName(sdesktop::RECEIVE_QUEUE_BUFFER_NAME);
    parserFSM::MessageHandler::sendQueue = Worker::getQueueHandleByName(sdesktop::SEND_QUEUE_BUFFER_NAME);

    return (bsp::usbInit(receiveQueue, this) < 0) ? false : true;
}


@@ 53,15 53,15 @@ bool WorkerDesktop::deinit(void)

bool WorkerDesktop::handleMessage(uint32_t queueID)
{
    QueueHandle_t queue = queues[queueID];
    auto &queue       = queues[queueID];
    const auto &qname = queue->GetQueueName();

    std::string qname = queueNameMap[queue];
    LOG_INFO("handleMessage received data from queue: %s", qname.c_str());
    static std::string *sendMsg = nullptr;
    static std::string receivedMsg;

    if (qname == sdesktop::RECEIVE_QUEUE_BUFFER_NAME) {
        if (xQueueReceive(queue, &receivedMsg, 0) != pdTRUE) {
        if (!queue->Dequeue(&receivedMsg, 0)) {
            LOG_ERROR("handleMessage failed to receive from \"%s\"", sdesktop::RECEIVE_QUEUE_BUFFER_NAME);
            return false;
        }


@@ 70,7 70,7 @@ bool WorkerDesktop::handleMessage(uint32_t queueID)
        }
    }
    else if (qname == sdesktop::SEND_QUEUE_BUFFER_NAME) {
        if (xQueueReceive(queue, &sendMsg, 0) != pdTRUE) {
        if (!queue->Dequeue(&sendMsg, 0)) {
            LOG_ERROR("handleMessage xQueueReceive failed for %s size %d bytes",
                      sdesktop::SEND_QUEUE_BUFFER_NAME,
                      static_cast<unsigned int>(sendMsg->length()));

M module-services/service-evtmgr/WorkerEvent.cpp => module-services/service-evtmgr/WorkerEvent.cpp +19 -20
@@ 46,12 46,12 @@ extern "C"
bool WorkerEvent::handleMessage(uint32_t queueID)
{

    xQueueHandle queue = queues[queueID];
    auto &queue = queues[queueID];

    // service queue
    if (queueID == static_cast<uint32_t>(WorkerEventQueues::queueService)) {
        sys::WorkerCommand wcmd;
        if (xQueueReceive(queue, &wcmd, 0) != pdTRUE) {
        if (!queue->Dequeue(&wcmd, 0)) {
            return false;
        }
        wcmd.command = 1;


@@ 60,7 60,7 @@ bool WorkerEvent::handleMessage(uint32_t queueID)

    if (queueID == static_cast<uint32_t>(WorkerEventQueues::queueKeyboardIRQ)) {
        uint8_t notification;
        if (xQueueReceive(queue, &notification, 0) != pdTRUE) {
        if (!queue->Dequeue(&notification, 0)) {
            return false;
        }
        uint8_t state, code;


@@ 71,7 71,7 @@ bool WorkerEvent::handleMessage(uint32_t queueID)

    if (queueID == static_cast<uint32_t>(WorkerEventQueues::queueHeadsetIRQ)) {
        uint8_t notification;
        if (xQueueReceive(queue, &notification, 0) != pdTRUE) {
        if (!queue->Dequeue(&notification, 0)) {
            return false;
        }



@@ 86,7 86,7 @@ bool WorkerEvent::handleMessage(uint32_t queueID)

    if (queueID == static_cast<uint32_t>(WorkerEventQueues::queueBattery)) {
        uint8_t notification;
        if (xQueueReceive(queue, &notification, 0) != pdTRUE) {
        if (!queue->Dequeue(&notification, 0)) {
            return false;
        }
        if (notification & static_cast<uint8_t>(bsp::batteryIRQSource::INTB)) {


@@ 110,7 110,7 @@ bool WorkerEvent::handleMessage(uint32_t queueID)

    if (queueID == static_cast<uint32_t>(WorkerEventQueues::queueRTC)) {
        uint8_t notification;
        if (xQueueReceive(queue, &notification, 0) != pdTRUE) {
        if (!queue->Dequeue(&notification, 0)) {
            return false;
        }



@@ 129,7 129,7 @@ bool WorkerEvent::handleMessage(uint32_t queueID)

    if (queueID == static_cast<uint32_t>(WorkerEventQueues::queueCellular)) {
        uint8_t notification;
        if (xQueueReceive(queue, &notification, 0) != pdTRUE) {
        if (!queue->Dequeue(&notification, 0)) {
            return false;
        }



@@ 157,7 157,7 @@ bool WorkerEvent::handleMessage(uint32_t queueID)

    if (queueID == static_cast<uint32_t>(WorkerEventQueues::queueMagnetometerIRQ)) {
        uint8_t notification;
        if (xQueueReceive(queue, &notification, 0) != pdTRUE) {
        if (!queue->Dequeue(&notification, 0)) {
            return false;
        }



@@ 169,7 169,7 @@ bool WorkerEvent::handleMessage(uint32_t queueID)

    if (queueID == static_cast<uint32_t>(WorkerEventQueues::queueLightSensor)) {
        uint8_t notification;
        if (xQueueReceive(queue, &notification, 0) != pdTRUE) {
        if (!queue->Dequeue(&notification, 0)) {
            return false;
        }
        LOG_DEBUG("Light sensor IRQ");


@@ 178,21 178,20 @@ bool WorkerEvent::handleMessage(uint32_t queueID)
    return true;
}

bool WorkerEvent::init(std::list<sys::WorkerQueueInfo> queues)
bool WorkerEvent::init(std::list<sys::WorkerQueueInfo> queuesList)
{
    Worker::init(queues);
    std::vector<xQueueHandle> qhandles = this->queues;
    Worker::init(queuesList);
    bsp::vibrator::init();
    bsp::keyboard_Init(qhandles[static_cast<int32_t>(WorkerEventQueues::queueKeyboardIRQ)]);
    bsp::headset::Init(qhandles[static_cast<int32_t>(WorkerEventQueues::queueHeadsetIRQ)]);
    bsp::battery_Init(qhandles[static_cast<int32_t>(WorkerEventQueues::queueBattery)]);
    bsp::rtc_Init(qhandles[static_cast<int32_t>(WorkerEventQueues::queueRTC)]);
    bsp::cellular::init(qhandles[static_cast<int32_t>(WorkerEventQueues::queueCellular)]);
    bsp::magnetometer::init(qhandles[static_cast<int32_t>(WorkerEventQueues::queueMagnetometerIRQ)]);
    bsp::torch::init(qhandles[static_cast<int32_t>(WorkerEventQueues::queueTorch)]);
    bsp::keyboard_Init(queues[static_cast<int32_t>(WorkerEventQueues::queueKeyboardIRQ)]->GetQueueHandle());
    bsp::headset::Init(queues[static_cast<int32_t>(WorkerEventQueues::queueHeadsetIRQ)]->GetQueueHandle());
    bsp::battery_Init(queues[static_cast<int32_t>(WorkerEventQueues::queueBattery)]->GetQueueHandle());
    bsp::rtc_Init(queues[static_cast<int32_t>(WorkerEventQueues::queueRTC)]->GetQueueHandle());
    bsp::cellular::init(queues[static_cast<int32_t>(WorkerEventQueues::queueCellular)]->GetQueueHandle());
    bsp::magnetometer::init(queues[static_cast<int32_t>(WorkerEventQueues::queueMagnetometerIRQ)]->GetQueueHandle());
    bsp::torch::init(queues[static_cast<int32_t>(WorkerEventQueues::queueTorch)]->GetQueueHandle());
    bsp::keypad_backlight::init();
    bsp::eink_frontlight::init();
    bsp::light_sensor::init(qhandles[static_cast<int32_t>(WorkerEventQueues::queueLightSensor)]);
    bsp::light_sensor::init(queues[static_cast<int32_t>(WorkerEventQueues::queueLightSensor)]->GetQueueHandle());

    time_t timestamp;
    bsp::rtc_GetCurrentTimestamp(&timestamp);

M module-services/service-evtmgr/service-evtmgr/WorkerEvent.hpp => module-services/service-evtmgr/service-evtmgr/WorkerEvent.hpp +4 -3
@@ 48,7 48,7 @@ class WorkerEvent : public sys::Worker
     * @brief This method is responsible for catch and process keyboard event.
     * @param event key event
     * @param code key code
     * @note It sends message to service if event is processed succesfully.
     * @note It sends message to service if event is processed successfully.
     */
    void processKeyEvent(bsp::KeyEvents event, bsp::KeyCodes code);
    /**


@@ 58,14 58,15 @@ class WorkerEvent : public sys::Worker
    bool longPressTaskEnabled = false;
    bsp::KeyEvents lastState  = bsp::KeyEvents::Released;
    bsp::KeyCodes lastPressed = static_cast<bsp::KeyCodes>(0);
    sys::Service *service     = nullptr;

  public:
    WorkerEvent(sys::Service *service) : sys::Worker(service){};
    WorkerEvent(sys::Service *service) : sys::Worker(service), service(service){};
    /**
     * This function is responsible for creating all queues provided in the constructor.
     * When all queues are created this method creates set of queues.
     */
    virtual bool init(std::list<sys::WorkerQueueInfo> queues) override;
    virtual bool init(std::list<sys::WorkerQueueInfo> queuesList) override;
    virtual bool deinit() override;

    /**

M module-services/service-gui/WorkerGUI.cpp => module-services/service-gui/WorkerGUI.cpp +3 -3
@@ 24,18 24,18 @@
namespace sgui
{

    WorkerGUI::WorkerGUI(ServiceGUI *service) : Worker(service)
    WorkerGUI::WorkerGUI(ServiceGUI *service) : Worker(service), service(service)
    {}

    bool WorkerGUI::handleMessage(uint32_t queueID)
    {
        QueueHandle_t queue = queues[queueID];
        auto &queue = queues[queueID];

        auto serviceGUI = static_cast<sgui::ServiceGUI *>(service);

        if (queueID == 0) {
            sys::WorkerCommand received;
            xQueueReceive(queue, &received, 0);
            queue->Dequeue(&received, 0);

            std::list<std::unique_ptr<gui::DrawCommand>> uniqueCommands;
            if (xSemaphoreTake(serviceGUI->semCommands, pdMS_TO_TICKS(1000)) == pdTRUE) {

M module-services/service-gui/WorkerGUI.hpp => module-services/service-gui/WorkerGUI.hpp +1 -0
@@ 31,6 31,7 @@ namespace sgui
    {
        // object responsible for rendering images to context
        gui::Renderer renderer;
        sys::Service *service;

      public:
        WorkerGUI(ServiceGUI *service);

M module-sys/Service/Worker.cpp => module-sys/Service/Worker.cpp +85 -73
@@ 22,14 22,14 @@ namespace sys
    void Worker::taskAdapter(void *taskParam)
    {
        Worker *worker = static_cast<Worker *>(taskParam);
        worker->setState(State::Running);
        worker->task();
    }

    bool Worker::handleControlMessage()
    {
        std::uint8_t receivedMessage;

        xQueueReceive(controlQueue, &receivedMessage, 0);
        auto receivedMessage = static_cast<std::uint8_t>(Worker::ControlMessage::Stop);
        getControlQueue().Dequeue(&receivedMessage, 0);
        LOG_INFO("Handle control message: %u", receivedMessage);
        assert(receivedMessage < controlMessagesCount);



@@ 51,19 51,20 @@ namespace sys
    void Worker::task()
    {
        QueueSetMemberHandle_t activeMember;
        assert(getState() == State::Running);

        while (getState() == State::Running) {
            activeMember = xQueueSelectFromSet(queueSet, portMAX_DELAY);

            // handle control messages from parent service
            if (activeMember == controlQueue) {
            if (activeMember == getControlQueue().GetQueueHandle()) {
                handleControlMessage();
                continue;
            }

            // find id of the queue that was activated
            for (uint32_t i = 0; i < queues.size(); i++) {
                if (queues[i] == activeMember) {
                if (queues[i]->GetQueueHandle() == activeMember) {
                    handleMessage(i);
                }
            }


@@ 75,8 76,16 @@ namespace sys
        vTaskDelete(nullptr);
    }

    Worker::Worker(sys::Service *service) : service{service}
    {}
    Worker::Worker(sys::Service *service) : name(service->GetName()), priority(service->GetPriority())
    {
        constructName();
    }

    Worker::Worker(std::string workerNamePrefix, UBaseType_t priority)
        : name(std::move(workerNamePrefix)), priority(priority)
    {
        constructName();
    }

    Worker::~Worker()
    {


@@ 85,11 94,34 @@ namespace sys
        }
    }

    void Worker::addQueueInfo(xQueueHandle q, std::string qName)
    void Worker::constructName()
    {
        // assign worker id
        taskENTER_CRITICAL();
        id = count++;
        taskEXIT_CRITICAL();

        name.append("_w" + std::to_string(id));
    }

    size_t Worker::addQueue(const std::string &qName, UBaseType_t maxItems, UBaseType_t itemSize)
    {
        queueNameMap.emplace(std::make_pair(q, qName));
        vQueueAddToRegistry(q, qName.c_str());
        queues.push_back(q);
        auto idx = queues.size();
        queues.push_back(std::make_shared<WorkerQueue>(qName, maxItems, itemSize));
        vQueueAddToRegistry(queues.back()->GetQueueHandle(), qName.c_str());
        return idx;
    }

    WorkerQueue &Worker::getControlQueue() const
    {
        assert(controlQueueIndex);
        return *queues.at(controlQueueIndex.value());
    }

    WorkerQueue &Worker::getServiceQueue() const
    {
        assert(serviceQueueIndex);
        return *queues.at(serviceQueueIndex.value());
    }

    inline std::string Worker::getControlQueueName() const


@@ 101,13 133,6 @@ namespace sys
    {
        assert(state == State::New);

        // assign worker id
        taskENTER_CRITICAL();
        id = count++;
        taskEXIT_CRITICAL();

        name = service->GetName() + "_w" + std::to_string(id);

        // initial value is because there is always a service and control queue
        // to communicate with the parent service
        auto setSize = SERVICE_QUEUE_LENGTH + CONTROL_QUEUE_LENGTH;


@@ 125,41 150,19 @@ namespace sys
        }

        // create and add all queues to the set. First service queue is created.
        serviceQueue = xQueueCreate(SERVICE_QUEUE_LENGTH, SERVICE_QUEUE_SIZE);
        if (serviceQueue == nullptr) {
            state = State::Invalid;
            deinit();
            return false;
        }

        addQueueInfo(serviceQueue, SERVICE_QUEUE_NAME);
        serviceQueueIndex = addQueue(SERVICE_QUEUE_NAME, SERVICE_QUEUE_LENGTH, SERVICE_QUEUE_SIZE);

        // create control queue
        controlQueue = xQueueCreate(CONTROL_QUEUE_LENGTH, sizeof(std::uint8_t));
        if (controlQueue == nullptr) {
            state = State::Invalid;
            deinit();
            return false;
        }

        addQueueInfo(controlQueue, getControlQueueName());
        controlQueueIndex = addQueue(getControlQueueName(), CONTROL_QUEUE_LENGTH, sizeof(std::uint8_t));

        // create and add all queues provided from service
        for (auto wqi : queuesList) {
            auto q = xQueueCreate(wqi.length, wqi.elementSize);
            if (q == nullptr) {
                LOG_FATAL("xQueueCreate %s failed", wqi.name.c_str());
                state = State::Invalid;
                deinit();
                return false;
            }
            addQueueInfo(q, wqi.name);
            addQueue(wqi.name, wqi.length, wqi.elementSize);
        };

        // iterate over all queues and add them to set
        // iterate over all user queues and add them to set
        for (uint32_t i = 0; i < queues.size(); ++i) {

            if (xQueueAddToSet(queues[i], queueSet) != pdPASS) {
            if (xQueueAddToSet(queues[i]->GetQueueHandle(), queueSet) != pdPASS) {
                state = State::Invalid;
                deinit();
                return false;


@@ 181,11 184,9 @@ namespace sys
    bool Worker::deinit()
    {
        // for all queues - remove from set and delete queue
        for (auto q : queues) {
        for (auto &q : queues) {
            // remove queues from set
            xQueueRemoveFromSet(q, queueSet);
            // delete queue
            vQueueDelete(q);
            xQueueRemoveFromSet(q->GetQueueHandle(), queueSet);
        }
        queues.clear();



@@ 209,29 210,31 @@ namespace sys

        runnerTask = xTaskGetCurrentTaskHandle();

        BaseType_t task_error =
            xTaskCreate(Worker::taskAdapter, name.c_str(), defaultStackSize, this, service->GetPriority(), &taskHandle);
        BaseType_t task_error = 0;
        task_error = xTaskCreate(Worker::taskAdapter, name.c_str(), defaultStackSize, this, priority, &taskHandle);

        if (task_error != pdPASS) {
            LOG_ERROR("Failed to start the task");
            return false;
        }

        setState(State::Running);
        return true;
    }

    bool Worker::stop()
    {
        assert(xTaskGetCurrentTaskHandle() == runnerTask);
        assert(getState() == State::Running);

        return sendControlMessage(ControlMessage::Stop);
        if (runnerTask) {
            assert(xTaskGetCurrentTaskHandle() == runnerTask);
            assert(getState() == State::Running);
            return sendControlMessage(ControlMessage::Stop);
        }
        return true;
    }

    bool Worker::sendControlMessage(ControlMessage message)
    {
        auto messageToSend = static_cast<std::uint8_t>(message);
        return xQueueSend(controlQueue, &messageToSend, portMAX_DELAY) == pdTRUE;
        return getControlQueue().Enqueue(&messageToSend, portMAX_DELAY);
    }

    bool Worker::send(uint32_t cmd, uint32_t *data)


@@ 239,34 242,44 @@ namespace sys
        assert(xTaskGetCurrentTaskHandle() == runnerTask);
        assert(getState() == State::Running);

        if (serviceQueue != nullptr) {
            WorkerCommand workerCommand{cmd, data};
            if (xQueueSend(serviceQueue, &workerCommand, portMAX_DELAY) == pdTRUE) {
                return true;
            }
        WorkerCommand workerCommand{cmd, data};
        if (getServiceQueue().Enqueue(&workerCommand, portMAX_DELAY)) {
            return true;
        }
        return false;
    }

    xQueueHandle Worker::getQueueByName(std::string qname)
    xQueueHandle Worker::getQueueHandleByName(const std::string &qname) const
    {
        for (auto &q : queues) {
            if (q->GetQueueName() == qname) {
                return q->GetQueueHandle();
            }
        }
        return nullptr;
    }

    std::shared_ptr<WorkerQueue> Worker::getQueueByName(const std::string &qname) const
    {
        for (auto q_handle : this->queues) {
            if (this->queueNameMap[q_handle] == qname)
                return q_handle;
        for (auto &q : queues) {
            if (q->GetQueueName() == qname) {
                return q;
            }
        }
        return nullptr;
    }

    bool Worker::join(TickType_t timeout)
    {
        assert(xTaskGetCurrentTaskHandle() == runnerTask);
        assert(getState() == State::Running);
        if (runnerTask) {
            assert(xTaskGetCurrentTaskHandle() == runnerTask);
            assert(getState() == State::Running || getState() == State::Stopped);

        if (xSemaphoreTake(joinSemaphore, timeout) != pdTRUE) {
            return false;
            if (xSemaphoreTake(joinSemaphore, timeout) != pdTRUE) {
                return false;
            }
            while (eTaskGetState(taskHandle) != eDeleted) {}
        }
        while (eTaskGetState(taskHandle) != eDeleted) {}

        return true;
    }



@@ 302,5 315,4 @@ namespace sys
        // a worker in case of unexpected failure without knowing its state.
        vTaskDelete(taskHandle);
    }

} /* namespace sys */

M module-sys/Service/Worker.hpp => module-sys/Service/Worker.hpp +38 -9
@@ 9,6 9,7 @@
#include <map>
#include <string>
#include <vector>
#include <queue.hpp>

namespace sys
{


@@ 21,6 22,27 @@ namespace sys
        int length;
    };

    class WorkerQueue : public cpp_freertos::Queue
    {
      public:
        WorkerQueue(const std::string &name, UBaseType_t maxItems, UBaseType_t itemSize)
            : Queue(maxItems, itemSize), name(name)
        {}

        QueueHandle_t GetQueueHandle() const
        {
            return handle;
        }

        const std::string &GetQueueName() const
        {
            return name;
        }

      private:
        const std::string name;
    };

    struct WorkerCommand
    {
        uint32_t command = 0;


@@ 68,16 90,21 @@ namespace sys
        static void taskAdapter(void *taskParam);
        bool handleControlMessage();
        void task();
        void addQueueInfo(xQueueHandle queue, std::string queueName);
        void setState(State newState);
        void constructName();
        std::string getControlQueueName() const;
        size_t addQueue(const std::string &queueName, UBaseType_t maxItems, UBaseType_t itemSize);

        std::optional<size_t> controlQueueIndex;
        std::optional<size_t> serviceQueueIndex;
        WorkerQueue &getControlQueue() const;
        WorkerQueue &getServiceQueue() const;

        static constexpr std::size_t controlMessagesCount = static_cast<std::size_t>(ControlMessage::MessageCount);
        static constexpr std::size_t defaultStackSize     = 2048;
        static constexpr TickType_t defaultJoinTimeout    = portMAX_DELAY;
        static constexpr auto controlQueueNamePrefix      = "wctrl";

        xQueueHandle controlQueue      = nullptr;
        xSemaphoreHandle joinSemaphore = nullptr;
        xTaskHandle runnerTask         = nullptr;
        xSemaphoreHandle stateMutex    = nullptr;


@@ 89,7 116,10 @@ namespace sys

      protected:
        virtual bool handleMessage(uint32_t queueID) = 0;
        xQueueHandle getQueueByName(std::string queueName);

        xQueueHandle getQueueHandleByName(const std::string &qname) const;
        std::shared_ptr<WorkerQueue> getQueueByName(const std::string &qname) const;

        bool sendControlMessage(ControlMessage message);
        State getState() const;



@@ 99,22 129,22 @@ namespace sys
        const std::string SERVICE_QUEUE_NAME       = "ServiceQueue";

        static unsigned int count;
        const UBaseType_t priority;

        sys::Service *service     = nullptr;
        xQueueHandle serviceQueue = nullptr;
        QueueSetHandle_t queueSet = nullptr;
        std::vector<xQueueHandle> queues;
        std::map<xQueueHandle, std::string> queueNameMap;
        std::vector<std::shared_ptr<WorkerQueue>> queues;

      public:
        Worker(sys::Service *service);
        Worker(std::string workerNamePrefix, const UBaseType_t priority);

        virtual ~Worker();

        /**
         * @brief This function is responsible for creating all queues provided in the constructor.
         * When all queues are created this method creates set of queues.
         */
        virtual bool init(std::list<WorkerQueueInfo> queues = std::list<WorkerQueueInfo>());
        virtual bool init(std::list<WorkerQueueInfo> queuesList = std::list<WorkerQueueInfo>());
        /**
         * @brief This function is responsible for destroying all resources created in the
         * init mehtod.


@@ 148,5 178,4 @@ namespace sys
         */
        void kill();
    };

} /* namespace sys */