~aleteoryx/muditaos

ref: 5f03b9148ed78d558e139b8a3d563c6f08f33447 muditaos/module-sys/Service/Worker.hpp -rw-r--r-- 5.8 KiB
5f03b914 — Marcin Smoczyński [EGD-7641] Disable service desktop tests 4 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
// For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

#pragma once

#include "Service.hpp"

#include <cstddef>
#include <cstdint>
#include <list>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

#include <queue.hpp>

namespace sys
{

    /**
     * @brief Plain description of a queue: its name, element size and length.
     *
     * A list of these objects is what Worker::init() accepts as the set of queues to create.
     */
    class WorkerQueueInfo
    {
      public:
        std::string name; ///< Queue name (copied from the view passed to the constructor).
        int elementSize;  ///< Size of a single queue element (presumably in bytes - confirm with the queue API).
        int length;       ///< Queue length (presumably the maximum number of elements - confirm).

        /**
         * @param _name         queue name; its characters are copied into @ref name
         * @param _elementSize  value stored in @ref elementSize
         * @param _length       value stored in @ref length
         */
        WorkerQueueInfo(std::string_view _name, int _elementSize, int _length)
            : name{_name}, elementSize{_elementSize}, length{_length}
        {}
    };

    class WorkerQueue : public cpp_freertos::Queue
    {
      public:
        WorkerQueue(const std::string &name, UBaseType_t maxItems, UBaseType_t itemSize)
            : Queue(maxItems, itemSize), name(name)
        {}

        QueueHandle_t GetQueueHandle() const
        {
            return handle;
        }

        const std::string &GetQueueName() const
        {
            return name;
        }

      private:
        const std::string name;
    };

    /**
     * @brief Command passed to a worker: a numeric command code plus a pointer to data.
     *
     * A default-constructed command has code 0 and no data attached.
     * Ownership and lifetime of the pointed-to data are not defined by this type.
     */
    struct WorkerCommand
    {
        uint32_t command = 0;       ///< Command code.
        uint32_t *data   = nullptr; ///< Data accompanying the command; nullptr when there is none.
    };

    /**
     * @brief Worker is a wrapper for a FreeRTOS task used to separate the sysmanager environment from
     * the rest of the system. Its purpose is to handle asynchronous events such as IRQs or timers.
     *
     * The flow for creating a worker is as follows:
     * - create a new Worker object and provide a pointer to the service that owns the worker,
     * - call the init method and provide a list of parameters used to create queues. Those queues
     *   can later be used to wake up the worker,
     * - call the run method to start the worker.
     *
     * The flow for closing the worker is as follows:
     * - call the stop method - the task will end itself,
     * - call the join method to wait for the task to end,
     * - call deinit to destroy all resources owned by the worker,
     * - delete the object.
     */
    class Worker
    {
      private:
        /** Messages accepted by sendControlMessage(). */
        enum class ControlMessage
        {
            Stop,
            MessageCount // not a message itself: its value feeds controlMessagesCount below
        };

        /** Worker state, stored in `state` and accessed through setState()/getState(). */
        enum class State
        {
            New,
            Initiated,
            Running,
            Stopping,
            Stopped,
            Destroyed,
            Invalid
        };

        /** Type of the `id` member. */
        using Id = unsigned int;

        // NOTE(review): presumably the FreeRTOS task entry point that receives the Worker
        // through taskParam and forwards to task() - confirm in Worker.cpp.
        static void taskAdapter(void *taskParam);
        bool handleControlMessage();
        void task();
        void setState(State newState);
        void constructName();
        std::string getControlQueueName() const;
        // NOTE(review): presumably returns the index of the new queue within `queues` - confirm.
        size_t addQueue(const std::string &queueName, UBaseType_t maxItems, UBaseType_t itemSize);

        // Empty until assigned; presumably indices into `queues` of the two built-in queues - confirm.
        std::optional<size_t> controlQueueIndex;
        std::optional<size_t> serviceQueueIndex;
        WorkerQueue &getControlQueue() const;

        // Numeric value of ControlMessage::MessageCount.
        static constexpr std::size_t controlMessagesCount = static_cast<std::size_t>(ControlMessage::MessageCount);
        // Default for the stackDepth constructor arguments and the stackDepth member.
        static constexpr std::size_t defaultStackSize     = 8192;
        // Default timeout of join(): FreeRTOS's portMAX_DELAY.
        static constexpr TickType_t defaultJoinTimeout    = portMAX_DELAY;
        static constexpr auto controlQueueNamePrefix      = "wctrl";

        // NOTE(review): the roles of these handles (in particular runnerTask vs. taskHandle)
        // are not visible in this header - see Worker.cpp.
        xSemaphoreHandle joinSemaphore = nullptr;
        xTaskHandle runnerTask         = nullptr;
        xSemaphoreHandle stateMutex    = nullptr;
        xTaskHandle taskHandle         = nullptr;

        Id id;
        std::string name;
        State state = State::New;

      protected:
        /**
         * @brief Message handler that every concrete worker must implement.
         *
         * @param queueID - presumably identifies the queue that has data pending - confirm in Worker.cpp
         */
        virtual bool handleMessage(uint32_t queueID) = 0;

        WorkerQueue &getServiceQueue() const;

        /** Queue lookup by name; behaviour when no queue matches is not visible in this header. */
        xQueueHandle getQueueHandleByName(const std::string &qname) const;
        std::shared_ptr<WorkerQueue> getQueueByName(const std::string &qname) const;

        bool sendControlMessage(ControlMessage message);
        bool sendCommand(WorkerCommand command);
        State getState() const;

        const static uint32_t SERVICE_QUEUE_LENGTH = 10;
        const static uint32_t CONTROL_QUEUE_LENGTH = 4;
        // One service queue item is exactly one WorkerCommand.
        const static uint32_t SERVICE_QUEUE_SIZE   = sizeof(WorkerCommand);
        // NOTE(review): non-static, so every Worker instance holds its own copy of this string.
        const std::string SERVICE_QUEUE_NAME       = "ServiceQueue";

        // Shared by all workers; defined out of line.
        static unsigned int count;
        const UBaseType_t priority;
        // 16-bit, while defaultStackSize is std::size_t; the default 8192 fits.
        std::uint16_t stackDepth = defaultStackSize;

        QueueSetHandle_t queueSet = nullptr;
        std::vector<std::shared_ptr<WorkerQueue>> queues;

      public:
        /**
         * @brief Creates a worker for the given service.
         *
         * @param service - the service that owns the worker
         * @param stackDepth - stack depth for the worker, defaultStackSize if omitted
         */
        Worker(sys::Service *service, std::uint16_t stackDepth = defaultStackSize);
        /**
         * @brief Creates a worker from an explicit name prefix and priority instead of a service.
         *
         * @param workerNamePrefix - prefix for the worker name
         * @param priority - value for the `priority` member
         * @param stackDepth - stack depth for the worker, defaultStackSize if omitted
         */
        Worker(std::string workerNamePrefix, const UBaseType_t priority, std::uint16_t stackDepth = defaultStackSize);

        virtual ~Worker();

        /**
         * @brief This function is responsible for creating all queues described in queuesList.
         * When all queues are created this method creates set of queues.
         *
         * @param queuesList - descriptions of the queues to create; empty by default
         *
         * NOTE(review): the default argument of a virtual function is chosen by the static type
         * of the object expression, so overrides should not declare a different default.
         */
        virtual bool init(std::list<WorkerQueueInfo> queuesList = std::list<WorkerQueueInfo>());
        /**
         * @brief This function is responsible for destroying all resources created in the
         * init method.
         */
        virtual bool deinit();
        /**
         * @brief Starts RTOS thread that waits for incoming queue events.
         */
        virtual bool run();
        /**
         * @brief Sends stop command to worker.
         */
        virtual bool stop();
        /**
         * @brief Joins the thread
         *
         * @param timeout - ticks to wait for the thread to end
         */
        bool join(TickType_t timeout = defaultJoinTimeout);
        /**
         * @brief Sends command and pointer to data to worker
         *
         * @param cmd - command code
         * @param data - pointer to the data accompanying the command
         */
        virtual bool send(uint32_t cmd, uint32_t *data);
        /**
         * @brief Closes worker by combining stop, join and deinit operations in a single call.
         * If it is not possible to close the worker gently it would kill it forcibly.
         */
        void close();
        /**
         * @brief Kills the worker. Does not deinit it.
         */
        void kill();
    };
} /* namespace sys */