~aleteoryx/muditaos

ref: sign_test muditaos/module-bsp/WorkerQueue.hpp -rw-r--r-- 3.5 KiB
a217eeb3 — Dawid Wojtas [BH-2024] Fix lack of alarm directory after updating software 1 year, 5 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Copyright (c) 2017-2023, Mudita Sp. z.o.o. All rights reserved.
// For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

#pragma once

#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
#include <log/log.hpp>

#include <functional>
#include <utility>
#include <cstdint>

/*
 * Worker queue implementation is used to defer work from interrupt routines.
 * This way, it is possible to keep interrupt routines short but at the same time process more intensive tasks.
 * The design goal was to use as few resources as possible, hence it is a lightweight mechanism.
 * The stack assigned to the queue worker handler is relatively small.
 * It is not suitable for performing stack-consuming tasks, for instance handling I/O.
 * If necessary, stack size can be manually reconfigured upon creation.
 */
template <typename Message>
class WorkerQueue
{
    /// Default worker stack size in bytes: configMINIMAL_STACK_SIZE entries of sizeof(std::uint32_t) bytes each.
    static constexpr auto minimalStackSizeInBytes = configMINIMAL_STACK_SIZE * sizeof(std::uint32_t);

  public:
    /// Callback invoked from the worker task for every message taken from the queue.
    using WorkerHandle = std::function<void(const Message &)>;

    /// Creates the message queue and starts the worker task.
    /// @param name         name passed to xTaskCreate for the worker task
    /// @param workerHandle callback run in the worker task for each posted message
    /// @param stackSize    worker task stack size in bytes (converted to std::uint32_t-sized units internally)
    WorkerQueue(const char *name, WorkerHandle workerHandle, std::uint32_t stackSize = minimalStackSizeInBytes);

    /// Asks the worker task to exit, force-deletes it if it does not confirm in time, then deletes the queue.
    ~WorkerQueue();

    // The worker task is given a pointer to this object, so the object must stay at a fixed address:
    // neither copying nor moving is allowed.
    WorkerQueue(const WorkerQueue &) = delete;
    WorkerQueue(WorkerQueue &&) = delete;
    WorkerQueue &operator=(const WorkerQueue &) = delete;
    WorkerQueue &operator=(WorkerQueue &&) = delete;

    /// Queues a message for the worker using the interrupt-safe FreeRTOS send call (xQueueSendFromISR).
    /// @param msg message to copy into the queue
    /// @return the "higher priority task woken" flag filled in by xQueueSendFromISR
    BaseType_t post(const Message &msg);

  private:
    /// Item actually stored in the FreeRTOS queue: the user message plus a shutdown marker.
    struct InternalMessage
    {
        Message msg;
        bool kill{false}; ///< When true the worker loop confirms to the destructor's caller and exits.
    };

    /// Number of InternalMessage items the queue can hold.
    static constexpr auto queueLength = 4;

    xQueueHandle queueHandle{};  ///< Queue carrying InternalMessage items to the worker task.
    xTaskHandle taskHandle{};    ///< Handle of the worker task created in the constructor.
    WorkerHandle workerHandle{}; ///< User callback executed by the worker task.
    xTaskHandle callerHandle{};  ///< Task running the destructor; notified by the worker on shutdown.

    /// Worker task body: receives messages until a kill message arrives.
    void worker();
};

/// Creates the message queue and, only if that succeeded, spawns the worker task.
///
/// @param name         name passed to xTaskCreate for the worker task
/// @param workerHandle callback executed by the worker task for each received message
/// @param stackSize    stack size in bytes; divided by sizeof(std::uint32_t) before being passed to xTaskCreate
///
/// On failure the corresponding handle is left null and the error is logged; the destructor checks the
/// handles before using them.
template <typename Message>
WorkerQueue<Message>::WorkerQueue(const char *name, WorkerHandle workerHandle, const std::uint32_t stackSize)
    : workerHandle{std::move(workerHandle)}
{
    queueHandle = xQueueCreate(queueLength, sizeof(InternalMessage));
    if (queueHandle == nullptr) {
        // Starting the worker without a queue would make it block on a null queue handle.
        LOG_FATAL("Failed to create worker queue");
        return;
    }

    // Captureless lambda so it converts to the plain function pointer expected by xTaskCreate.
    const auto taskEntry = [](void *instance) {
        static_cast<WorkerQueue *>(instance)->worker();
        // worker() returns only after a kill request; the task then removes itself.
        vTaskDelete(nullptr);
    };

    const auto created = xTaskCreate(taskEntry, name, stackSize / sizeof(std::uint32_t), this, 0, &taskHandle);
    if (created != pdPASS) {
        taskHandle = nullptr;
        LOG_FATAL("Failed to create worker task");
    }
}

/// Shuts the worker down and releases the FreeRTOS resources owned by this object.
///
/// Pending messages are discarded (xQueueReset), a kill message is queued and the destructor waits up to
/// 500 ms for the worker to confirm via a task notification. If the kill message cannot be queued or no
/// confirmation arrives, the worker task is deleted forcibly. The queue is deleted whenever it exists,
/// including the case where the worker task was never created.
template <typename Message>
WorkerQueue<Message>::~WorkerQueue()
{
    if (taskHandle != nullptr) {
        bool workerExited = false;

        if (queueHandle != nullptr) {
            const InternalMessage killMsg{{}, true};
            // The worker notifies this task once it has seen the kill message.
            callerHandle = xTaskGetCurrentTaskHandle();
            xQueueReset(queueHandle);

            if (xQueueSend(queueHandle, &killMsg, pdMS_TO_TICKS(500)) == pdPASS) {
                /// Wait 500ms for a response from the worker. If it does not arrive, kill it.
                workerExited = (ulTaskNotifyTake(pdTRUE, pdMS_TO_TICKS(500)) != 0);
            }
            else {
                // The worker can never confirm a kill request it did not receive, so do not wait for it.
                LOG_FATAL("xQueueSend error will result in aborting worker thread");
            }
        }

        if (!workerExited) {
            // The task holds a pointer to this object; it must not outlive it.
            vTaskDelete(taskHandle);
        }
    }

    if (queueHandle != nullptr) {
        vQueueDelete(queueHandle);
    }
}

/// Worker task loop: blocks on the queue and passes every received message to the user callback.
///
/// Returns after receiving a message with the kill flag set, having first notified the task stored in
/// callerHandle (set by the destructor) that shutdown was acknowledged.
template <typename Message>
void WorkerQueue<Message>::worker()
{
    while (true) {
        InternalMessage received{};
        if (xQueueReceive(queueHandle, &received, portMAX_DELAY) != pdTRUE) {
            // Nothing was copied out of the queue; do not run the callback on a value-initialized message.
            continue;
        }

        if (received.kill) {
            xTaskNotifyGive(callerHandle);
            return;
        }

        workerHandle(received.msg);
    }
}

/// Copies a message into the worker queue using the interrupt-safe FreeRTOS send call.
///
/// @param msg message to hand over to the worker task
/// @return the "higher priority task woken" flag filled in by xQueueSendFromISR; pdFALSE when the queue
///         does not exist (its creation failed).
///
/// The result of the send itself is not reported: if the queue is full the message is dropped.
template <typename Message>
BaseType_t WorkerQueue<Message>::post(const Message &msg)
{
    BaseType_t notifyRequired{pdFALSE};
    if (queueHandle == nullptr) {
        // Queue creation failed in the constructor; there is nowhere to put the message.
        return notifyRequired;
    }

    const InternalMessage postMsg{msg, false};
    xQueueSendFromISR(queueHandle, &postMsg, &notifyRequired);
    return notifyRequired;
}