M changelog.md => changelog.md +2 -1
@@ 3,6 3,7 @@
## [Current release]
### Added
+
* `[desktop][gui]` Added SIM and PUK lock-related windows
* `[GUI]` Added rich text parsing for full text styling needs
* `[calendar]` Add notifications from database handling.
@@ 20,7 21,7 @@
### Other
* `[audio]` Audio service and api cleanup/refactor.
-
+* `[system]` Improved destroying of worker threads.
## [0.39.1 2020-09-25]
M module-services/service-desktop/ServiceDesktop.cpp => module-services/service-desktop/ServiceDesktop.cpp +1 -3
@@ 15,9 15,6 @@ ServiceDesktop::ServiceDesktop() : sys::Service(service::name::service_desktop,
ServiceDesktop::~ServiceDesktop()
{
LOG_INFO("[ServiceDesktop] Cleaning resources");
- if (desktopWorker != nullptr) {
- desktopWorker->deinit();
- }
}
sys::ReturnCodes ServiceDesktop::InitHandler()
@@ 75,6 72,7 @@ sys::ReturnCodes ServiceDesktop::InitHandler()
sys::ReturnCodes ServiceDesktop::DeinitHandler()
{
+ desktopWorker->close();
return sys::ReturnCodes::Success;
}
M module-services/service-evtmgr/EventManager.cpp => module-services/service-evtmgr/EventManager.cpp +3 -6
@@ 33,6 33,8 @@
#include "bsp/cellular/bsp_cellular.hpp"
#include "bsp/common.hpp"
+#include <cassert>
+
EventManager::EventManager(const std::string &name) : sys::Service(name)
{
LOG_INFO("[%s] Initializing", name.c_str());
@@ 44,9 46,6 @@ EventManager::EventManager(const std::string &name) : sys::Service(name)
EventManager::~EventManager()
{
LOG_INFO("[%s] Cleaning resources", GetName().c_str());
- if (EventWorker != nullptr) {
- EventWorker->deinit();
- }
}
// Invoked upon receiving data message
@@ 253,9 252,7 @@ sys::ReturnCodes EventManager::InitHandler()
sys::ReturnCodes EventManager::DeinitHandler()
{
-
- EventWorker->deinit();
- EventWorker.reset();
+ EventWorker->close();
return sys::ReturnCodes::Success;
}
M module-services/service-evtmgr/WorkerEvent.hpp => module-services/service-evtmgr/WorkerEvent.hpp +1 -0
@@ 27,6 27,7 @@ struct KeyState
enum class WorkerEventQueues
{
queueService = 0,
+ queueControl = 1,
queueKeyboardIRQ,
queueHeadsetIRQ,
queueBattery,
M module-services/service-gui/ServiceGUI.cpp => module-services/service-gui/ServiceGUI.cpp +5 -3
@@ 63,8 63,6 @@ namespace sgui
delete renderContext;
if (transferContext)
delete transferContext;
-
- worker->deinit();
}
void ServiceGUI::sendBuffer()
@@ 96,7 94,7 @@ namespace sgui
}
// Invoked upon receiving data message
- sys::Message_t ServiceGUI::DataReceivedHandler(sys::DataMessage * msgl, sys::ResponseMessage * resp)
+ sys::Message_t ServiceGUI::DataReceivedHandler(sys::DataMessage *msgl, sys::ResponseMessage *resp)
{
sgui::GUIMessage *msg = static_cast<sgui::GUIMessage *>(msgl);
@@ 263,6 261,10 @@ namespace sgui
vSemaphoreDelete(semCommands);
semCommands = NULL;
+ worker->stop();
+ worker->join();
+ worker->deinit();
+
return sys::ReturnCodes::Success;
}
M module-sys/Service/Worker.cpp => module-sys/Service/Worker.cpp +180 -44
@@ 6,43 6,108 @@ extern "C"
#include "task.h"
}
-#include <string.h>
+#include <map>
+#include <string>
+#include <utility>
+
+#include <cassert>
namespace sys
{
+ unsigned int Worker::count = 0;
+
+ void Worker::taskAdapter(void *taskParam)
+ {
+ Worker *worker = static_cast<Worker *>(taskParam);
+ worker->task();
+ }
- void workerTaskFunction(void *ptr)
+ bool Worker::handleControlMessage()
+ {
+ std::uint8_t receivedMessage;
+
+ xQueueReceive(controlQueue, &receivedMessage, 0);
+ LOG_INFO("Handle control message: %u", receivedMessage);
+ assert(receivedMessage < controlMessagesCount);
+
+ switch (static_cast<Worker::ControlMessage>(receivedMessage)) {
+ // stop the thread
+ case ControlMessage::Stop: {
+ setState(State::Stopping);
+ } break;
+
+ default: {
+ LOG_FATAL("Unexpected control message %d received", receivedMessage);
+ return false;
+ } break;
+ }
+
+ return true;
+ }
+
+ void Worker::task()
{
- Worker *worker = reinterpret_cast<Worker *>(ptr);
QueueSetMemberHandle_t activeMember;
- std::vector<xQueueHandle> queues = worker->queues;
- while (1) {
- activeMember = xQueueSelectFromSet(worker->queueSet, portMAX_DELAY);
- // find id of the queue that was activated
+ while (getState() == State::Running) {
+ activeMember = xQueueSelectFromSet(queueSet, portMAX_DELAY);
+
+ // handle control messages from parent service
+ if (activeMember == controlQueue) {
+ handleControlMessage();
+ continue;
+ }
+ // find id of the queue that was activated
for (uint32_t i = 0; i < queues.size(); i++) {
if (queues[i] == activeMember) {
- worker->handleMessage(i);
+ handleMessage(i);
}
}
}
+
+ // inform about thread end and wait for the deletion
+ xSemaphoreGive(joinSemaphore);
+ setState(State::Stopped);
+ vTaskDelete(nullptr);
}
- Worker::Worker(sys::Service *service) : service{service}, serviceQueue{NULL}, queueSet{NULL}, taskHandle{NULL}
+ Worker::Worker(sys::Service *service) : service{service}
{}
+ Worker::~Worker()
+ {
+ if (state != State::Destroyed) {
+ LOG_FATAL("Calling destructor of an undestroyed worker.");
+ }
+ }
+
+ void Worker::addQueueInfo(xQueueHandle q, std::string qName)
+ {
+ queueNameMap.emplace(std::make_pair(q, qName));
+ vQueueAddToRegistry(q, qName.c_str());
+ queues.push_back(q);
+ }
+
+ inline std::string Worker::getControlQueueName() const
+ {
+ return controlQueueNamePrefix + std::to_string(id);
+ }
+
bool Worker::init(std::list<WorkerQueueInfo> queuesList)
{
+ assert(state == State::New);
- // initial value is because there is always a queue to communicate with service
- uint32_t setSize = SERVICE_QUEUE_LENGTH;
+ // assign worker id
+ taskENTER_CRITICAL();
+ id = count++;
+ taskEXIT_CRITICAL();
- auto addQueueInfo = [&](xQueueHandle q, std::string qName) {
- queueNameMap.insert(std::pair<xQueueHandle, std::string>(q, qName));
- vQueueAddToRegistry(q, qName.c_str());
- queues.push_back(q);
- };
+ name = service->GetName() + "_w" + std::to_string(id);
+
+ // initial value is because there is always a service and control queue
+ // to communicate with the parent service
+ auto setSize = SERVICE_QUEUE_LENGTH + CONTROL_QUEUE_LENGTH;
// iterate over all entries in the list of queues and summarize queue sizes
for (auto wqi : queuesList) {
@@ 51,23 116,37 @@ namespace sys
// create set of queues
queueSet = xQueueCreateSet(setSize);
- if (queueSet == NULL)
+ if (queueSet == nullptr) {
+ state = State::Invalid;
return false;
+ }
// create and add all queues to the set. First service queue is created.
serviceQueue = xQueueCreate(SERVICE_QUEUE_LENGTH, SERVICE_QUEUE_SIZE);
if (serviceQueue == nullptr) {
+ state = State::Invalid;
deinit();
return false;
}
addQueueInfo(serviceQueue, SERVICE_QUEUE_NAME);
+ // create control queue
+ controlQueue = xQueueCreate(CONTROL_QUEUE_LENGTH, sizeof(std::uint8_t));
+ if (controlQueue == nullptr) {
+ state = State::Invalid;
+ deinit();
+ return false;
+ }
+
+ addQueueInfo(controlQueue, getControlQueueName());
+
// create and add all queues provided from service
for (auto wqi : queuesList) {
auto q = xQueueCreate(wqi.length, wqi.elementSize);
- if (q == NULL) {
+ if (q == nullptr) {
LOG_FATAL("xQueueCreate %s failed", wqi.name.c_str());
+ state = State::Invalid;
deinit();
return false;
}
@@ 78,19 157,26 @@ namespace sys
for (uint32_t i = 0; i < queues.size(); ++i) {
if (xQueueAddToSet(queues[i], queueSet) != pdPASS) {
- // LOG_FATAL("xQueueAddToSet %d failed", i);
+ state = State::Invalid;
deinit();
return false;
}
}
+ // create join semaphore
+ joinSemaphore = xSemaphoreCreateBinary();
+
+ // state protector
+ stateMutex = xSemaphoreCreateMutex();
+
+ // it is safe to use getState/setState methods now
+ setState(State::Initiated);
+
return true;
}
+
bool Worker::deinit()
{
-
- vTaskDelete(taskHandle);
-
// for all queues - remove from set and delete queue
for (auto q : queues) {
// remove queues from set
@@ 102,55 188,59 @@ namespace sys
// delete queues set
vQueueDelete((QueueHandle_t)queueSet);
- queueSet = NULL;
- taskHandle = NULL;
+
+ vSemaphoreDelete(joinSemaphore);
+ vSemaphoreDelete(stateMutex);
+
+ setState(State::Destroyed);
return true;
};
+
/**
* This method starts RTOS thread that waits for incoming queue events.
*/
bool Worker::run()
{
- static int workerCount = 0;
- std::string workerName = service->GetName() + "_w" + std::to_string(workerCount);
+ assert(getState() == State::Initiated);
+
+ runnerTask = xTaskGetCurrentTaskHandle();
+
BaseType_t task_error =
- xTaskCreate(workerTaskFunction, workerName.c_str(), 2048, this, service->GetPriority(), &taskHandle);
+ xTaskCreate(Worker::taskAdapter, name.c_str(), defaultStackSize, this, service->GetPriority(), &taskHandle);
if (task_error != pdPASS) {
LOG_ERROR("Failed to start the task");
return false;
}
+
+ setState(State::Running);
return true;
}
bool Worker::stop()
{
- return send(0, NULL);
+ assert(xTaskGetCurrentTaskHandle() == runnerTask);
+ assert(getState() == State::Running);
+
+ return sendControlMessage(ControlMessage::Stop);
}
- bool Worker::handleMessage(uint32_t queueID)
+ bool Worker::sendControlMessage(ControlMessage message)
{
-
- QueueHandle_t queue = queues[queueID];
-
- // service queue
- if (queueID == 0) {
- WorkerCommand workerCommand;
- if (xQueueReceive(queue, &workerCommand, 0) != pdTRUE) {
- return false;
- }
- workerCommand.command = 1;
- // place some code here to handle messages from service
- }
- return true;
+ auto messageToSend = static_cast<std::uint8_t>(message);
+ return xQueueSend(controlQueue, &messageToSend, portMAX_DELAY) == pdTRUE;
}
bool Worker::send(uint32_t cmd, uint32_t *data)
{
- if (serviceQueue != NULL) {
+ assert(xTaskGetCurrentTaskHandle() == runnerTask);
+ assert(getState() == State::Running);
+
+ if (serviceQueue != nullptr) {
WorkerCommand workerCommand{cmd, data};
- if (xQueueSend(serviceQueue, &workerCommand, portMAX_DELAY) == pdTRUE)
+ if (xQueueSend(serviceQueue, &workerCommand, portMAX_DELAY) == pdTRUE) {
return true;
+ }
}
return false;
}
@@ 164,4 254,50 @@ namespace sys
return nullptr;
}
+ bool Worker::join(TickType_t timeout)
+ {
+ assert(xTaskGetCurrentTaskHandle() == runnerTask);
+ assert(getState() == State::Running);
+
+ if (xSemaphoreTake(joinSemaphore, timeout) != pdTRUE) {
+ return false;
+ }
+ while (eTaskGetState(taskHandle) != eDeleted) {}
+
+ return true;
+ }
+
+ void Worker::setState(State newState)
+ {
+ xSemaphoreTake(stateMutex, portMAX_DELAY);
+ state = newState;
+ xSemaphoreGive(stateMutex);
+ }
+
+ Worker::State Worker::getState() const
+ {
+ State currentState;
+
+ xSemaphoreTake(stateMutex, portMAX_DELAY);
+ currentState = state;
+ xSemaphoreGive(stateMutex);
+
+ return currentState;
+ }
+
+ void Worker::close()
+ {
+ if (!stop() || !join()) {
+ kill();
+ }
+ deinit();
+ }
+
+ void Worker::kill()
+ {
+ // do not check state - this is intentional, we want to be able to kill
+ // a worker in case of unexpected failure without knowing its state.
+ vTaskDelete(taskHandle);
+ }
+
} /* namespace sys */
M module-sys/Service/Worker.hpp => module-sys/Service/Worker.hpp +78 -18
@@ 3,7 3,9 @@
#include "Service.hpp"
#include <memory>
+#include <map>
#include <string>
+#include <vector>
namespace sys
{
@@ 30,60 32,118 @@ namespace sys
* - call init method and provide list of parameters to create queues. Those queues can be later
* used to wake up the worker.
* - call run method to start the worker.
+ *
* Flow for closing the worker is as follows:
- * - call stop method - task will send a close confirmation to the service after exiting its main loop
- * - call deinit to destroy all queues ued by the worker
+ * - call stop method - task will end itself
+ * - call join method to wait for the task to end
+ * - call deinit to destroy all resources owned by the worker
* - delete the object.
*
*/
class Worker
{
+ private:
+ enum class ControlMessage
+ {
+ Stop,
+ MessageCount
+ };
+
+ enum class State
+ {
+ New,
+ Initiated,
+ Running,
+ Stopping,
+ Stopped,
+ Destroyed,
+ Invalid
+ };
+
+ using Id = unsigned int;
+
+ static void taskAdapter(void *taskParam);
+ bool handleControlMessage();
+ void task();
+ void addQueueInfo(xQueueHandle queue, std::string queueName);
+ void setState(State newState);
+ std::string getControlQueueName() const;
+
+ static constexpr std::size_t controlMessagesCount = static_cast<std::size_t>(ControlMessage::MessageCount);
+ static constexpr std::size_t defaultStackSize = 2048;
+ static constexpr TickType_t defaultJoinTimeout = portMAX_DELAY;
+ static inline std::string controlQueueNamePrefix = "wctrl";
+
+ xQueueHandle controlQueue = nullptr;
+ xSemaphoreHandle joinSemaphore = nullptr;
+ xTaskHandle runnerTask = nullptr;
+ xSemaphoreHandle stateMutex = nullptr;
+ xTaskHandle taskHandle = nullptr;
+
+ Id id;
+ std::string name;
+ State state = State::New;
+
protected:
+ virtual bool handleMessage(uint32_t queueID) = 0;
+ xQueueHandle getQueueByName(std::string queueName);
+ bool sendControlMessage(ControlMessage message);
+ State getState() const;
+
const static uint32_t SERVICE_QUEUE_LENGTH = 10;
+ const static uint32_t CONTROL_QUEUE_LENGTH = 4;
const static uint32_t SERVICE_QUEUE_SIZE = sizeof(WorkerCommand);
const std::string SERVICE_QUEUE_NAME = "ServiceQueue";
- sys::Service *service;
- // queue used by service to send commands to service.
- xQueueHandle serviceQueue;
+ static unsigned int count;
+
+ sys::Service *service = nullptr;
+ xQueueHandle serviceQueue = nullptr;
+ QueueSetHandle_t queueSet = nullptr;
std::vector<xQueueHandle> queues;
- QueueSetHandle_t queueSet;
std::map<xQueueHandle, std::string> queueNameMap;
- xTaskHandle taskHandle;
-
- friend void workerTaskFunction(void *ptr);
public:
Worker(sys::Service *service);
- virtual ~Worker() = default;
+ virtual ~Worker();
/**
- * This function is responsible for creating all queues provided in the constructor.
+ * @brief This function is responsible for creating all queues provided in the constructor.
* When all queues are created this method creates set of queues.
*/
virtual bool init(std::list<WorkerQueueInfo> queues = std::list<WorkerQueueInfo>());
+ /**
+ * @brief This function is responsible for destroying all resources created in the
+ * init mehtod.
+ */
virtual bool deinit();
/**
- * This method starts RTOS thread that waits for incoming queue events.
+ * @brief Starts RTOS thread that waits for incoming queue events.
*/
virtual bool run();
/**
- * Sends stop command to worker.
+ * @brief Sends stop command to worker.
*/
virtual bool stop();
/**
- * This method is called from thread when new message arrives in queue.
- * @param queueID Index of the queue in the queues vector.
+ * @brief Joins the thread
+ *
+ * @param timeout - ticks to wait for the thread to end
*/
- virtual bool handleMessage(uint32_t queueID);
+ bool join(TickType_t timeout = defaultJoinTimeout);
/**
* @brief Sends command and pointer to data to worker
*/
virtual bool send(uint32_t cmd, uint32_t *data);
/**
- * @brief Returns handle to queue by its name
+ * @brief Closes worker by combining stop, join and deinit operations in a single call.
+ * If it is not possible to close the worker gently it would kill it forcibly.
+ */
+ void close();
+ /**
+ * @brief Kills the worker. Does not deinit it.
*/
- virtual xQueueHandle getQueueByName(std::string);
+ void kill();
};
} /* namespace sys */