diff --git a/esp32/include/adapters/comm_base.hpp b/esp32/include/adapters/comm_base.hpp index ca0b17b..bb1b250 100644 --- a/esp32/include/adapters/comm_base.hpp +++ b/esp32/include/adapters/comm_base.hpp @@ -7,7 +7,7 @@ #include "topic.hpp" #ifndef MAX_CID -#define MAX_CID 4 +#define MAX_CID 64 #endif enum class MsgKind : uint8_t { Connect = 0, Disconnect = 1, Event = 2, Ping = 3, Pong = 4 }; diff --git a/esp32/include/event_bus.hpp b/esp32/include/event_bus.hpp index d790644..28ff65a 100644 --- a/esp32/include/event_bus.hpp +++ b/esp32/include/event_bus.hpp @@ -1,213 +1,259 @@ #pragma once #include -#include -#include #include #include - +#include +#include +#include +#include #include #include #include +template +class FixedFn; + +template +class FixedFn { + alignas(void*) std::byte buf[MaxSize]; + void (*call)(void*, A&&...) {}; + void (*moveFn)(void*, void*) {}; + void (*destroy)(void*) {}; + + public: + template + void set(Fun&& f) { + static_assert(sizeof(Fun) <= MaxSize); + new (buf) Fun(std::forward(f)); + call = [](void* p, A&&... as) { (*reinterpret_cast(p))(std::forward(as)...); }; + moveFn = [](void* d, void* s) { new (d) Fun(std::move(*reinterpret_cast(s))); }; + destroy = [](void* p) { reinterpret_cast(p)->~Fun(); }; + } + R operator()(A... as) const { + return call(const_cast(static_cast(buf)), std::forward(as)...); + } + FixedFn() = default; + FixedFn(FixedFn&& o) { + if (o.moveFn) o.moveFn(buf, o.buf); + call = o.call; + moveFn = o.moveFn; + destroy = o.destroy; + o.destroy = nullptr; + } + FixedFn(const FixedFn& o) { + std::memcpy(buf, o.buf, MaxSize); + call = o.call; + moveFn = o.moveFn; + destroy = o.destroy; + } + ~FixedFn() { + if (destroy) destroy(buf); + } + FixedFn& operator=(const FixedFn&) = delete; + FixedFn& operator=(FixedFn&&) = delete; +}; + enum class EmitMode { Latest, Batch }; template class EventBus { - static_assert(BatchSize > 0); - struct Item { Msg payload; - uint8_t exclude; // 0-MaxSubs-1 or 0xFF for “none” + size_t exclude; }; - - using ExIdx = uint8_t; - - static constexpr ExIdx NO_EX = 0xFF; - + static constexpr size_t NO_EX = MaxSubs; struct Sub { - std::function cb; + FixedFn cb; TickType_t interval; TickType_t last; EmitMode mode; std::array buf; size_t cnt; }; - - inline static StaticQueue_t q_buf {}; - inline static Item q_storage[QueueDepth]; - inline static QueueHandle_t q_handle = - xQueueCreateStatic(QueueDepth, sizeof(Item), reinterpret_cast(q_storage), &q_buf); - + inline static StaticQueue_t qbuf; + inline static Item qStorage[QueueDepth]; + inline static QueueHandle_t queue = + xQueueCreateStatic(QueueDepth, sizeof(Item), reinterpret_cast(qStorage), &qbuf); inline static std::array, MaxSubs> subs {}; inline static portMUX_TYPE mux = portMUX_INITIALIZER_UNLOCKED; inline static Msg latest {}; - inline static volatile bool has_latest = false; - inline static std::atomic sub_cnt {0}; + inline static std::atomic hasLatest {false}; + inline static std::atomic subCount {0}; static void store(const Msg& m) { portENTER_CRITICAL(&mux); latest = m; - has_latest = true; + hasLatest.store(true, std::memory_order_release); portEXIT_CRITICAL(&mux); } - static void storeISR(const Msg& m) { UBaseType_t s = portSET_INTERRUPT_MASK_FROM_ISR(); latest = m; - has_latest = true; + hasLatest.store(true, std::memory_order_release); portCLEAR_INTERRUPT_MASK_FROM_ISR(s); } - static void dispatch(const Msg& m, Sub* ex) { - store(m); + static void dispatch(const Msg& m, size_t ex) { TickType_t now = xTaskGetTickCount(); - for (auto& sref : subs) { - if (!sref) continue; - auto& sub = *sref; - if (&sub == ex) continue; - TickType_t dt = now - sub.last; - if (sub.interval && dt < sub.interval) { - if (sub.mode == EmitMode::Batch && sub.cnt < BatchSize) - sub.buf[sub.cnt++] = m; - else if (sub.mode == EmitMode::Latest) - sub.buf[0] = m, sub.cnt = 1; - } else { - if (sub.cnt == 0) - sub.cb(&m, 1); - else { - sub.cb(sub.buf.data(), sub.cnt); - sub.cnt = 0; + Sub* ready[MaxSubs]; + size_t readyCnt = 0; + + portENTER_CRITICAL(&mux); + for (size_t i = 0; i < MaxSubs; ++i) { + auto& opt = subs[i]; + if (!opt || i == ex) continue; + Sub& s = *opt; + TickType_t dt = now - s.last; + + if (s.interval && dt < s.interval) { + if (s.mode == EmitMode::Batch && s.cnt < BatchSize) + s.buf[s.cnt++] = m; + else if (s.mode == EmitMode::Latest) { + s.buf[0] = m; + s.cnt = 1; } - sub.last = now; + } else { + s.buf[s.cnt++] = m; + s.last = now; + ready[readyCnt++] = &s; } } + portEXIT_CRITICAL(&mux); + + for (size_t i = 0; i < readyCnt; ++i) { + Sub* s = ready[i]; + s->cb(s->buf.data(), s->cnt); + s->cnt = 0; + } } static void worker(void*) { Item it; - for (;;) - if (xQueueReceive(q_handle, &it, portMAX_DELAY) == pdTRUE) - dispatch(it.payload, it.exclude == NO_EX ? nullptr : &*subs[it.exclude]); + while (xQueueReceive(queue, &it, portMAX_DELAY) == pdTRUE) dispatch(it.payload, it.exclude); } - static void ensureTask() { - static bool once = - (xTaskCreatePinnedToCore(worker, "eventbus", 4096, nullptr, 6, nullptr, tskNO_AFFINITY), true); + static bool once = (xTaskCreatePinnedToCore(worker, "evtbus", 4096, nullptr, 6, nullptr, 1), true); (void)once; } - - static void push(const Msg& m, uint8_t ex = NO_EX) { + static bool push(const Msg& m, size_t ex = NO_EX, TickType_t to = 0) { + ensureTask(); Item it {m, ex}; - xQueueSend(q_handle, &it, portMAX_DELAY); + return xQueueSend(queue, &it, to) == pdTRUE; } public: class Handle { - size_t index {MaxSubs}; + size_t idx {NO_EX}; friend class EventBus; - explicit Handle(size_t i) : index(i) {} + explicit Handle(size_t i) : idx(i) {} public: Handle() = default; Handle(const Handle&) = delete; Handle& operator=(const Handle&) = delete; - Handle(Handle&& h) noexcept : index(h.index) { h.index = MaxSubs; } - Handle& operator=(Handle&& h) noexcept { - if (this != &h) { + Handle(Handle&& o) noexcept : idx(o.idx) { o.idx = NO_EX; } + Handle& operator=(Handle&& o) noexcept { + if (this != &o) { unsubscribe(); - index = h.index; - h.index = MaxSubs; + idx = o.idx; + o.idx = NO_EX; } return *this; } - ~Handle() = default; + ~Handle() { unsubscribe(); } void unsubscribe() { - if (index < MaxSubs) { - subs[index].reset(); - sub_cnt.fetch_sub(1, std::memory_order_acq_rel); - index = MaxSubs; + if (idx < MaxSubs) { + portENTER_CRITICAL(&mux); + subs[idx].reset(); + portEXIT_CRITICAL(&mux); + subCount.fetch_sub(1, std::memory_order_acq_rel); + idx = NO_EX; } } - bool valid() const { return index < MaxSubs; } + bool valid() const { return idx < MaxSubs; } }; template - static Handle subscribe(uint32_t ms, EmitMode mode, C&& fn) { + static Handle subscribe(uint32_t ms, EmitMode mode, C fn) { ensureTask(); + portENTER_CRITICAL(&mux); for (size_t i = 0; i < MaxSubs; ++i) if (!subs[i]) { - subs[i].emplace(Sub {.cb = std::forward(fn), - .interval = pdMS_TO_TICKS(ms), - .last = xTaskGetTickCount(), - .mode = mode, - .cnt = 0}); - sub_cnt.fetch_add(1, std::memory_order_acq_rel); + subs[i].emplace(); + Sub& s = *subs[i]; + s.cb.set(std::move(fn)); + s.interval = pdMS_TO_TICKS(ms); + s.last = xTaskGetTickCount(); + s.mode = mode; + s.cnt = 0; + subCount.fetch_add(1, std::memory_order_acq_rel); + portEXIT_CRITICAL(&mux); return Handle(i); } - return Handle(MaxSubs); + portEXIT_CRITICAL(&mux); + return Handle(NO_EX); } template - static Handle subscribe(C&& fn) { - using F = std::decay_t; - if constexpr (std::is_invocable_v) - return subscribe(0, EmitMode::Latest, [f = std::forward(fn)](const Msg* p, size_t n) { - for (size_t i = 0; i < n; ++i) f(p[i]); - }); + static Handle subscribe(C fn) { + if constexpr (std::is_invocable_v) + return subscribe(0, EmitMode::Latest, std::move(fn)); else - return subscribe(0, EmitMode::Latest, std::forward(fn)); + return subscribe(0, EmitMode::Latest, [fn = std::move(fn)](const Msg* p, size_t n) { + for (size_t i = 0; i < n; ++i) fn(p[i]); + }); } template - static Handle subscribe(uint32_t ms, C&& fn) { - using F = std::decay_t; - if constexpr (std::is_invocable_v) - return subscribe(ms, EmitMode::Batch, [f = std::forward(fn)](const Msg* p, size_t n) { - for (size_t i = 0; i < n; ++i) f(p[i]); - }); + static Handle subscribe(uint32_t ms, C fn) { + if constexpr (std::is_invocable_v) + return subscribe(ms, EmitMode::Batch, std::move(fn)); else - return subscribe(ms, EmitMode::Batch, std::forward(fn)); + return subscribe(ms, EmitMode::Batch, [fn = std::move(fn)](const Msg* p, size_t n) { + for (size_t i = 0; i < n; ++i) fn(p[i]); + }); } static void publish(const Msg& m) { store(m); - push(m); + push(m, NO_EX, portMAX_DELAY); } - - static void publishAsync(const Msg& m, const Handle& ex) { - store(m); - push(m, ex.valid() ? ex.index : NO_EX); - } - - static void publish(const Msg& m, const Handle& ex) { - if (ex.valid()) - dispatch(m, &*subs[ex.index]); + static void publish(const Msg& m, const Handle& h) { + if (h.valid()) + dispatch(m, h.idx); else publish(m); } + static bool publishAsync(const Msg& m) { + store(m); + return push(m); + } + static bool publishAsync(const Msg& m, const Handle& h) { + if (h.valid()) dispatch(m, h.idx); + return publishAsync(m); + } + static void publishISR(const Msg& m, BaseType_t* hpw = nullptr) { storeISR(m); Item it {m, NO_EX}; - xQueueSendFromISR(q_handle, &it, hpw); + xQueueSendFromISR(queue, &it, hpw); } static bool peek(Msg& out) { - if (!has_latest) return false; + if (!hasLatest.load(std::memory_order_acquire)) return false; portENTER_CRITICAL(&mux); out = latest; portEXIT_CRITICAL(&mux); return true; } - static bool take(Msg& out) { - if (!has_latest) return false; + if (!hasLatest.load(std::memory_order_acquire)) return false; portENTER_CRITICAL(&mux); out = latest; - has_latest = false; + hasLatest.store(false, std::memory_order_release); portEXIT_CRITICAL(&mux); return true; } - - static bool hasSubscribers() { return sub_cnt.load(std::memory_order_acquire) != 0; } + static bool hasSubscribers() { return subCount.load(std::memory_order_acquire) > 0; } }; \ No newline at end of file