diff --git a/esp32/include/communication/comm_base.hpp b/esp32/include/communication/comm_base.hpp index 001f513..12a23bb 100644 --- a/esp32/include/communication/comm_base.hpp +++ b/esp32/include/communication/comm_base.hpp @@ -64,6 +64,8 @@ class CommAdapterBase { if (clientId < 0 && !hasSubscribers(tag)) return; + xSemaphoreTake(mutex_, portMAX_DELAY); + msg_.which_message = tag; MessageTraits::assign(msg_, data); @@ -77,18 +79,22 @@ class CommAdapterBase { pb_ostream_t stream = pb_ostream_from_buffer(buffer, out_size); if (!pb_encode(&stream, socket_message_Message_fields, &msg_)) { ESP_LOGE("ProtoComm", "Failed to encode message (tag %d), buffer too small?", (int)tag); + xSemaphoreGive(mutex_); + if (pb_heap_enc_buf != buffer) free(buffer); return; } if (clientId >= 0) { send(buffer, stream.bytes_written, clientId); } else { - sendToSubscribers(tag, buffer, stream.bytes_written); + sendToSubscribersLocked(tag, buffer, stream.bytes_written); } if (pb_heap_enc_buf != buffer) { free(buffer); } + + xSemaphoreGive(mutex_); } protected: @@ -151,11 +157,9 @@ class CommAdapterBase { std::vector> eventBusHandles_; private: - void sendToSubscribers(int32_t tag, const uint8_t* data, size_t len) { - xSemaphoreTake(mutex_, portMAX_DELAY); + void sendToSubscribersLocked(int32_t tag, const uint8_t* data, size_t len) { for (int cid : client_subscriptions_[tag]) { send(data, len, cid); } - xSemaphoreGive(mutex_); } }; diff --git a/esp32/include/event_bus/event_registry.h b/esp32/include/event_bus/event_registry.h index f339988..40840fa 100644 --- a/esp32/include/event_bus/event_registry.h +++ b/esp32/include/event_bus/event_registry.h @@ -36,10 +36,28 @@ REGISTER_SETTINGS_TYPE(api_PeripheralSettings, EventType::PERIPHERAL_SETTINGS) REGISTER_SETTINGS_TYPE(api_ServoSettings, EventType::SERVO_SETTINGS) REGISTER_SETTINGS_TYPE(api_CameraSettings, EventType::CAMERA_SETTINGS) -REGISTER_EVENT_TYPE(socket_message_IMUData, EventType::IMU_DATA) -REGISTER_EVENT_TYPE(socket_message_ControllerData, EventType::MOTION_COMMAND) -REGISTER_EVENT_TYPE(socket_message_ModeData, EventType::MOTION_MODE) -REGISTER_EVENT_TYPE(socket_message_AnglesData, EventType::MOTION_ANGLES) -REGISTER_EVENT_TYPE(socket_message_WalkGaitData, EventType::MOTION_WALK_GAIT) -REGISTER_EVENT_TYPE(socket_message_ServoStateData, EventType::SERVO_STATE) -REGISTER_EVENT_TYPE(socket_message_ServoPWMData, EventType::SERVO_PWM) +#define REGISTER_COMMAND_TYPE(MsgType, EventTypeValue) \ + REGISTER_EVENT_TYPE(MsgType, EventTypeValue) \ + template <> \ + struct EventBusConfig { \ + static constexpr size_t QueueDepth = 1; \ + static constexpr size_t MaxSubs = 3; \ + static constexpr size_t BatchSize = 1; \ + }; + +#define REGISTER_STREAM_TYPE(MsgType, EventTypeValue) \ + REGISTER_EVENT_TYPE(MsgType, EventTypeValue) \ + template <> \ + struct EventBusConfig { \ + static constexpr size_t QueueDepth = 4; \ + static constexpr size_t MaxSubs = 3; \ + static constexpr size_t BatchSize = 4; \ + }; + +REGISTER_STREAM_TYPE(socket_message_IMUData, EventType::IMU_DATA) +REGISTER_COMMAND_TYPE(socket_message_ControllerData, EventType::MOTION_COMMAND) +REGISTER_COMMAND_TYPE(socket_message_ModeData, EventType::MOTION_MODE) +REGISTER_COMMAND_TYPE(socket_message_AnglesData, EventType::MOTION_ANGLES) +REGISTER_COMMAND_TYPE(socket_message_WalkGaitData, EventType::MOTION_WALK_GAIT) +REGISTER_COMMAND_TYPE(socket_message_ServoStateData, EventType::SERVO_STATE) +REGISTER_COMMAND_TYPE(socket_message_ServoPWMData, EventType::SERVO_PWM) diff --git a/esp32/include/event_bus/typed_event_bus.h b/esp32/include/event_bus/typed_event_bus.h index 3f02495..923fbee 100644 --- a/esp32/include/event_bus/typed_event_bus.h +++ b/esp32/include/event_bus/typed_event_bus.h @@ -10,6 +10,7 @@ #include #include #include +#include template class FixedFn; @@ -65,82 +66,121 @@ class TypedEventBus { TickType_t interval; TickType_t last; EmitMode mode; - std::array buf; + Msg* buf; size_t cnt; std::atomic enabled; std::atomic running; + + Sub() : buf(nullptr), cnt(0), enabled(false), running(0) { + buf = static_cast(heap_caps_malloc(BatchSize * sizeof(Msg), MALLOC_CAP_SPIRAM | MALLOC_CAP_8BIT)); + if (!buf) buf = static_cast(malloc(BatchSize * sizeof(Msg))); + } + ~Sub() { + if (buf) free(buf); + } + Sub(const Sub&) = delete; + Sub& operator=(const Sub&) = delete; }; - inline static StaticQueue_t qbuf; - inline static Item qStorage[QueueDepth]; - inline static QueueHandle_t queue = - xQueueCreateStatic(QueueDepth, sizeof(Item), reinterpret_cast(qStorage), &qbuf); - inline static std::array, MaxSubs> subs {}; - inline static portMUX_TYPE mux = portMUX_INITIALIZER_UNLOCKED; - inline static Msg latest {}; - inline static std::atomic hasLatest {false}; - inline static std::atomic subCount {0}; - inline static std::atomic taskStarted {false}; + + struct BusState { + QueueHandle_t queue; + Sub* subs[MaxSubs]; + portMUX_TYPE mux; + Msg latest; + std::atomic hasLatest; + std::atomic subCount; + std::atomic taskStarted; + + BusState() + : queue(nullptr), + mux(portMUX_INITIALIZER_UNLOCKED), + latest {}, + hasLatest(false), + subCount(0), + taskStarted(false) { + for (size_t i = 0; i < MaxSubs; ++i) subs[i] = nullptr; + } + }; + + static BusState& state() { + static BusState s; + return s; + } + + static void ensureQueue() { + auto& s = state(); + if (s.queue) return; + portENTER_CRITICAL(&s.mux); + if (!s.queue) { + s.queue = xQueueCreate(QueueDepth, sizeof(Item)); + } + portEXIT_CRITICAL(&s.mux); + } static void storeISR(const Msg& m) { - UBaseType_t s = portSET_INTERRUPT_MASK_FROM_ISR(); - latest = m; - hasLatest.store(true, std::memory_order_release); - portCLEAR_INTERRUPT_MASK_FROM_ISR(s); + auto& s = state(); + UBaseType_t saved = portSET_INTERRUPT_MASK_FROM_ISR(); + s.latest = m; + s.hasLatest.store(true, std::memory_order_release); + portCLEAR_INTERRUPT_MASK_FROM_ISR(saved); } static void dispatch(const Msg& m, size_t ex) { + auto& s = state(); TickType_t now = xTaskGetTickCount(); Sub* ready[MaxSubs]; size_t readyCnt = 0; - portENTER_CRITICAL(&mux); + portENTER_CRITICAL(&s.mux); for (size_t i = 0; i < MaxSubs; ++i) { - auto& opt = subs[i]; - if (!opt || i == ex) continue; - Sub& s = *opt; - if (!s.enabled.load(std::memory_order_acquire)) continue; + Sub* sub = s.subs[i]; + if (!sub || i == ex) continue; + if (!sub->enabled.load(std::memory_order_acquire)) continue; - TickType_t dt = now - s.last; + TickType_t dt = now - sub->last; - if (s.interval && dt < s.interval) { - if (s.mode == EmitMode::Batch) { - if (s.cnt < BatchSize) - s.buf[s.cnt++] = m; + if (sub->interval && dt < sub->interval) { + if (sub->mode == EmitMode::Batch) { + if (sub->cnt < BatchSize) + sub->buf[sub->cnt++] = m; else - s.buf[BatchSize - 1] = m; + sub->buf[BatchSize - 1] = m; } else { - s.buf[0] = m; - s.cnt = 1; + sub->buf[0] = m; + sub->cnt = 1; } } else { - if (s.cnt < BatchSize) - s.buf[s.cnt++] = m; + if (sub->cnt < BatchSize) + sub->buf[sub->cnt++] = m; else - s.buf[BatchSize - 1] = m; - s.last = now; - s.running.fetch_add(1, std::memory_order_acq_rel); - ready[readyCnt++] = &s; + sub->buf[BatchSize - 1] = m; + sub->last = now; + sub->running.fetch_add(1, std::memory_order_acq_rel); + ready[readyCnt++] = sub; } } - portEXIT_CRITICAL(&mux); + portEXIT_CRITICAL(&s.mux); for (size_t i = 0; i < readyCnt; ++i) { - Sub* s = ready[i]; - s->cb(s->buf.data(), s->cnt); - s->cnt = 0; - s->running.fetch_sub(1, std::memory_order_acq_rel); + Sub* sub = ready[i]; + sub->cb(sub->buf, sub->cnt); + sub->cnt = 0; + sub->running.fetch_sub(1, std::memory_order_acq_rel); } } static void worker(void*) { + auto& s = state(); Item it; - while (xQueueReceive(queue, &it, portMAX_DELAY) == pdTRUE) dispatch(it.payload, it.exclude); + while (xQueueReceive(s.queue, &it, portMAX_DELAY) == pdTRUE) dispatch(it.payload, it.exclude); } static void ensureTask() { - if (!taskStarted.load(std::memory_order_acquire)) { + auto& s = state(); + if (!s.taskStarted.load(std::memory_order_acquire)) { bool expected = false; - if (taskStarted.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { + if (s.taskStarted.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { + ensureQueue(); xTaskCreatePinnedToCore(worker, "evtbus", 4096, nullptr, 6, nullptr, 1); } } @@ -148,8 +188,9 @@ class TypedEventBus { static bool push(const Msg& m, size_t ex = NO_EX, TickType_t to = 0) { ensureTask(); + auto& s = state(); Item it {m, ex}; - return xQueueSend(queue, &it, to) == pdTRUE; + return xQueueSend(s.queue, &it, to) == pdTRUE; } public: @@ -174,19 +215,21 @@ class TypedEventBus { ~Handle() { unsubscribe(); } void unsubscribe() { if (idx < MaxSubs) { - Sub* s = nullptr; - portENTER_CRITICAL(&mux); - if (subs[idx]) { - s = &*subs[idx]; - s->enabled.store(false, std::memory_order_release); + auto& s = state(); + Sub* sub = nullptr; + portENTER_CRITICAL(&s.mux); + sub = s.subs[idx]; + if (sub) { + sub->enabled.store(false, std::memory_order_release); } - portEXIT_CRITICAL(&mux); - if (s) { - while (s->running.load(std::memory_order_acquire) != 0) taskYIELD(); - portENTER_CRITICAL(&mux); - subs[idx].reset(); - portEXIT_CRITICAL(&mux); - subCount.fetch_sub(1, std::memory_order_acq_rel); + portEXIT_CRITICAL(&s.mux); + if (sub) { + while (sub->running.load(std::memory_order_acquire) != 0) taskYIELD(); + portENTER_CRITICAL(&s.mux); + s.subs[idx] = nullptr; + portEXIT_CRITICAL(&s.mux); + delete sub; + s.subCount.fetch_sub(1, std::memory_order_acq_rel); } idx = NO_EX; } @@ -195,10 +238,11 @@ class TypedEventBus { }; static void store(const Msg& m) { - portENTER_CRITICAL(&mux); - latest = m; - hasLatest.store(true, std::memory_order_release); - portEXIT_CRITICAL(&mux); + auto& s = state(); + portENTER_CRITICAL(&s.mux); + s.latest = m; + s.hasLatest.store(true, std::memory_order_release); + portEXIT_CRITICAL(&s.mux); } template @@ -210,23 +254,25 @@ class TypedEventBus { template static Handle subscribe(uint32_t ms, EmitMode mode, C fn) { ensureTask(); - portENTER_CRITICAL(&mux); - for (size_t i = 0; i < MaxSubs; ++i) - if (!subs[i]) { - subs[i].emplace(); - Sub& s = *subs[i]; - s.cb.set(std::move(fn)); - s.interval = pdMS_TO_TICKS(ms); - s.last = xTaskGetTickCount() - s.interval; - s.mode = mode; - s.cnt = 0; - s.enabled.store(true, std::memory_order_release); - s.running.store(0, std::memory_order_release); - subCount.fetch_add(1, std::memory_order_acq_rel); - portEXIT_CRITICAL(&mux); + auto& s = state(); + portENTER_CRITICAL(&s.mux); + for (size_t i = 0; i < MaxSubs; ++i) { + if (!s.subs[i]) { + Sub* sub = new Sub(); + sub->cb.set(std::move(fn)); + sub->interval = pdMS_TO_TICKS(ms); + sub->last = xTaskGetTickCount() - sub->interval; + sub->mode = mode; + sub->cnt = 0; + sub->enabled.store(true, std::memory_order_release); + sub->running.store(0, std::memory_order_release); + s.subs[i] = sub; + s.subCount.fetch_add(1, std::memory_order_acq_rel); + portEXIT_CRITICAL(&s.mux); return Handle(i); } - portEXIT_CRITICAL(&mux); + } + portEXIT_CRITICAL(&s.mux); return Handle(NO_EX); } @@ -261,27 +307,31 @@ class TypedEventBus { } static void publishISR(const Msg& m, BaseType_t* hpw = nullptr) { + auto& s = state(); storeISR(m); + ensureQueue(); Item it {m, NO_EX}; - xQueueSendFromISR(queue, &it, hpw); + xQueueSendFromISR(s.queue, &it, hpw); } static bool peek(Msg& out) { - if (!hasLatest.load(std::memory_order_acquire)) return false; - portENTER_CRITICAL(&mux); - out = latest; - portEXIT_CRITICAL(&mux); + auto& s = state(); + if (!s.hasLatest.load(std::memory_order_acquire)) return false; + portENTER_CRITICAL(&s.mux); + out = s.latest; + portEXIT_CRITICAL(&s.mux); return true; } static bool take(Msg& out) { - if (!hasLatest.load(std::memory_order_acquire)) return false; - portENTER_CRITICAL(&mux); - out = latest; - hasLatest.store(false, std::memory_order_release); - portEXIT_CRITICAL(&mux); + auto& s = state(); + if (!s.hasLatest.load(std::memory_order_acquire)) return false; + portENTER_CRITICAL(&s.mux); + out = s.latest; + s.hasLatest.store(false, std::memory_order_release); + portEXIT_CRITICAL(&s.mux); return true; } - static bool hasSubscribers() { return subCount.load(std::memory_order_acquire) > 0; } + static bool hasSubscribers() { return state().subCount.load(std::memory_order_acquire) > 0; } };