⚡️Improve eventbus static alloc
This commit is contained in:
@@ -7,7 +7,7 @@
|
|||||||
#include "topic.hpp"
|
#include "topic.hpp"
|
||||||
|
|
||||||
#ifndef MAX_CID
|
#ifndef MAX_CID
|
||||||
#define MAX_CID 4
|
#define MAX_CID 64
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
enum class MsgKind : uint8_t { Connect = 0, Disconnect = 1, Event = 2, Ping = 3, Pong = 4 };
|
enum class MsgKind : uint8_t { Connect = 0, Disconnect = 1, Event = 2, Ping = 3, Pong = 4 };
|
||||||
|
|||||||
+148
-102
@@ -1,213 +1,259 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
#include <array>
|
#include <array>
|
||||||
#include <bitset>
|
|
||||||
#include <functional>
|
|
||||||
#include <optional>
|
#include <optional>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstring>
|
||||||
|
#include <type_traits>
|
||||||
|
#include <utility>
|
||||||
#include <freertos/FreeRTOS.h>
|
#include <freertos/FreeRTOS.h>
|
||||||
#include <freertos/task.h>
|
#include <freertos/task.h>
|
||||||
#include <freertos/queue.h>
|
#include <freertos/queue.h>
|
||||||
|
|
||||||
|
template <typename Sig, size_t MaxSize>
|
||||||
|
class FixedFn;
|
||||||
|
|
||||||
|
template <typename R, typename... A, size_t MaxSize>
|
||||||
|
class FixedFn<R(A...), MaxSize> {
|
||||||
|
alignas(void*) std::byte buf[MaxSize];
|
||||||
|
void (*call)(void*, A&&...) {};
|
||||||
|
void (*moveFn)(void*, void*) {};
|
||||||
|
void (*destroy)(void*) {};
|
||||||
|
|
||||||
|
public:
|
||||||
|
template <typename Fun>
|
||||||
|
void set(Fun&& f) {
|
||||||
|
static_assert(sizeof(Fun) <= MaxSize);
|
||||||
|
new (buf) Fun(std::forward<Fun>(f));
|
||||||
|
call = [](void* p, A&&... as) { (*reinterpret_cast<Fun*>(p))(std::forward<A>(as)...); };
|
||||||
|
moveFn = [](void* d, void* s) { new (d) Fun(std::move(*reinterpret_cast<Fun*>(s))); };
|
||||||
|
destroy = [](void* p) { reinterpret_cast<Fun*>(p)->~Fun(); };
|
||||||
|
}
|
||||||
|
R operator()(A... as) const {
|
||||||
|
return call(const_cast<void*>(static_cast<const void*>(buf)), std::forward<A>(as)...);
|
||||||
|
}
|
||||||
|
FixedFn() = default;
|
||||||
|
FixedFn(FixedFn&& o) {
|
||||||
|
if (o.moveFn) o.moveFn(buf, o.buf);
|
||||||
|
call = o.call;
|
||||||
|
moveFn = o.moveFn;
|
||||||
|
destroy = o.destroy;
|
||||||
|
o.destroy = nullptr;
|
||||||
|
}
|
||||||
|
FixedFn(const FixedFn& o) {
|
||||||
|
std::memcpy(buf, o.buf, MaxSize);
|
||||||
|
call = o.call;
|
||||||
|
moveFn = o.moveFn;
|
||||||
|
destroy = o.destroy;
|
||||||
|
}
|
||||||
|
~FixedFn() {
|
||||||
|
if (destroy) destroy(buf);
|
||||||
|
}
|
||||||
|
FixedFn& operator=(const FixedFn&) = delete;
|
||||||
|
FixedFn& operator=(FixedFn&&) = delete;
|
||||||
|
};
|
||||||
|
|
||||||
enum class EmitMode { Latest, Batch };
|
enum class EmitMode { Latest, Batch };
|
||||||
|
|
||||||
template <typename Msg, size_t QueueDepth = 64, size_t MaxSubs = 8, size_t BatchSize = 16>
|
template <typename Msg, size_t QueueDepth = 64, size_t MaxSubs = 8, size_t BatchSize = 16>
|
||||||
class EventBus {
|
class EventBus {
|
||||||
static_assert(BatchSize > 0);
|
|
||||||
|
|
||||||
struct Item {
|
struct Item {
|
||||||
Msg payload;
|
Msg payload;
|
||||||
uint8_t exclude; // 0-MaxSubs-1 or 0xFF for “none”
|
size_t exclude;
|
||||||
};
|
};
|
||||||
|
static constexpr size_t NO_EX = MaxSubs;
|
||||||
using ExIdx = uint8_t;
|
|
||||||
|
|
||||||
static constexpr ExIdx NO_EX = 0xFF;
|
|
||||||
|
|
||||||
struct Sub {
|
struct Sub {
|
||||||
std::function<void(const Msg*, size_t)> cb;
|
FixedFn<void(const Msg*, size_t), 48> cb;
|
||||||
TickType_t interval;
|
TickType_t interval;
|
||||||
TickType_t last;
|
TickType_t last;
|
||||||
EmitMode mode;
|
EmitMode mode;
|
||||||
std::array<Msg, BatchSize> buf;
|
std::array<Msg, BatchSize> buf;
|
||||||
size_t cnt;
|
size_t cnt;
|
||||||
};
|
};
|
||||||
|
inline static StaticQueue_t qbuf;
|
||||||
inline static StaticQueue_t q_buf {};
|
inline static Item qStorage[QueueDepth];
|
||||||
inline static Item q_storage[QueueDepth];
|
inline static QueueHandle_t queue =
|
||||||
inline static QueueHandle_t q_handle =
|
xQueueCreateStatic(QueueDepth, sizeof(Item), reinterpret_cast<uint8_t*>(qStorage), &qbuf);
|
||||||
xQueueCreateStatic(QueueDepth, sizeof(Item), reinterpret_cast<uint8_t*>(q_storage), &q_buf);
|
|
||||||
|
|
||||||
inline static std::array<std::optional<Sub>, MaxSubs> subs {};
|
inline static std::array<std::optional<Sub>, MaxSubs> subs {};
|
||||||
inline static portMUX_TYPE mux = portMUX_INITIALIZER_UNLOCKED;
|
inline static portMUX_TYPE mux = portMUX_INITIALIZER_UNLOCKED;
|
||||||
inline static Msg latest {};
|
inline static Msg latest {};
|
||||||
inline static volatile bool has_latest = false;
|
inline static std::atomic<bool> hasLatest {false};
|
||||||
inline static std::atomic<size_t> sub_cnt {0};
|
inline static std::atomic<size_t> subCount {0};
|
||||||
|
|
||||||
static void store(const Msg& m) {
|
static void store(const Msg& m) {
|
||||||
portENTER_CRITICAL(&mux);
|
portENTER_CRITICAL(&mux);
|
||||||
latest = m;
|
latest = m;
|
||||||
has_latest = true;
|
hasLatest.store(true, std::memory_order_release);
|
||||||
portEXIT_CRITICAL(&mux);
|
portEXIT_CRITICAL(&mux);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void storeISR(const Msg& m) {
|
static void storeISR(const Msg& m) {
|
||||||
UBaseType_t s = portSET_INTERRUPT_MASK_FROM_ISR();
|
UBaseType_t s = portSET_INTERRUPT_MASK_FROM_ISR();
|
||||||
latest = m;
|
latest = m;
|
||||||
has_latest = true;
|
hasLatest.store(true, std::memory_order_release);
|
||||||
portCLEAR_INTERRUPT_MASK_FROM_ISR(s);
|
portCLEAR_INTERRUPT_MASK_FROM_ISR(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dispatch(const Msg& m, Sub* ex) {
|
static void dispatch(const Msg& m, size_t ex) {
|
||||||
store(m);
|
|
||||||
TickType_t now = xTaskGetTickCount();
|
TickType_t now = xTaskGetTickCount();
|
||||||
for (auto& sref : subs) {
|
Sub* ready[MaxSubs];
|
||||||
if (!sref) continue;
|
size_t readyCnt = 0;
|
||||||
auto& sub = *sref;
|
|
||||||
if (&sub == ex) continue;
|
portENTER_CRITICAL(&mux);
|
||||||
TickType_t dt = now - sub.last;
|
for (size_t i = 0; i < MaxSubs; ++i) {
|
||||||
if (sub.interval && dt < sub.interval) {
|
auto& opt = subs[i];
|
||||||
if (sub.mode == EmitMode::Batch && sub.cnt < BatchSize)
|
if (!opt || i == ex) continue;
|
||||||
sub.buf[sub.cnt++] = m;
|
Sub& s = *opt;
|
||||||
else if (sub.mode == EmitMode::Latest)
|
TickType_t dt = now - s.last;
|
||||||
sub.buf[0] = m, sub.cnt = 1;
|
|
||||||
|
if (s.interval && dt < s.interval) {
|
||||||
|
if (s.mode == EmitMode::Batch && s.cnt < BatchSize)
|
||||||
|
s.buf[s.cnt++] = m;
|
||||||
|
else if (s.mode == EmitMode::Latest) {
|
||||||
|
s.buf[0] = m;
|
||||||
|
s.cnt = 1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (sub.cnt == 0)
|
s.buf[s.cnt++] = m;
|
||||||
sub.cb(&m, 1);
|
s.last = now;
|
||||||
else {
|
ready[readyCnt++] = &s;
|
||||||
sub.cb(sub.buf.data(), sub.cnt);
|
|
||||||
sub.cnt = 0;
|
|
||||||
}
|
}
|
||||||
sub.last = now;
|
|
||||||
}
|
}
|
||||||
|
portEXIT_CRITICAL(&mux);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < readyCnt; ++i) {
|
||||||
|
Sub* s = ready[i];
|
||||||
|
s->cb(s->buf.data(), s->cnt);
|
||||||
|
s->cnt = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void worker(void*) {
|
static void worker(void*) {
|
||||||
Item it;
|
Item it;
|
||||||
for (;;)
|
while (xQueueReceive(queue, &it, portMAX_DELAY) == pdTRUE) dispatch(it.payload, it.exclude);
|
||||||
if (xQueueReceive(q_handle, &it, portMAX_DELAY) == pdTRUE)
|
|
||||||
dispatch(it.payload, it.exclude == NO_EX ? nullptr : &*subs[it.exclude]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ensureTask() {
|
static void ensureTask() {
|
||||||
static bool once =
|
static bool once = (xTaskCreatePinnedToCore(worker, "evtbus", 4096, nullptr, 6, nullptr, 1), true);
|
||||||
(xTaskCreatePinnedToCore(worker, "eventbus", 4096, nullptr, 6, nullptr, tskNO_AFFINITY), true);
|
|
||||||
(void)once;
|
(void)once;
|
||||||
}
|
}
|
||||||
|
static bool push(const Msg& m, size_t ex = NO_EX, TickType_t to = 0) {
|
||||||
static void push(const Msg& m, uint8_t ex = NO_EX) {
|
ensureTask();
|
||||||
Item it {m, ex};
|
Item it {m, ex};
|
||||||
xQueueSend(q_handle, &it, portMAX_DELAY);
|
return xQueueSend(queue, &it, to) == pdTRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
class Handle {
|
class Handle {
|
||||||
size_t index {MaxSubs};
|
size_t idx {NO_EX};
|
||||||
friend class EventBus;
|
friend class EventBus;
|
||||||
explicit Handle(size_t i) : index(i) {}
|
explicit Handle(size_t i) : idx(i) {}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
Handle() = default;
|
Handle() = default;
|
||||||
Handle(const Handle&) = delete;
|
Handle(const Handle&) = delete;
|
||||||
Handle& operator=(const Handle&) = delete;
|
Handle& operator=(const Handle&) = delete;
|
||||||
Handle(Handle&& h) noexcept : index(h.index) { h.index = MaxSubs; }
|
Handle(Handle&& o) noexcept : idx(o.idx) { o.idx = NO_EX; }
|
||||||
Handle& operator=(Handle&& h) noexcept {
|
Handle& operator=(Handle&& o) noexcept {
|
||||||
if (this != &h) {
|
if (this != &o) {
|
||||||
unsubscribe();
|
unsubscribe();
|
||||||
index = h.index;
|
idx = o.idx;
|
||||||
h.index = MaxSubs;
|
o.idx = NO_EX;
|
||||||
}
|
}
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
~Handle() = default;
|
~Handle() { unsubscribe(); }
|
||||||
void unsubscribe() {
|
void unsubscribe() {
|
||||||
if (index < MaxSubs) {
|
if (idx < MaxSubs) {
|
||||||
subs[index].reset();
|
portENTER_CRITICAL(&mux);
|
||||||
sub_cnt.fetch_sub(1, std::memory_order_acq_rel);
|
subs[idx].reset();
|
||||||
index = MaxSubs;
|
portEXIT_CRITICAL(&mux);
|
||||||
|
subCount.fetch_sub(1, std::memory_order_acq_rel);
|
||||||
|
idx = NO_EX;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bool valid() const { return index < MaxSubs; }
|
bool valid() const { return idx < MaxSubs; }
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename C>
|
template <typename C>
|
||||||
static Handle subscribe(uint32_t ms, EmitMode mode, C&& fn) {
|
static Handle subscribe(uint32_t ms, EmitMode mode, C fn) {
|
||||||
ensureTask();
|
ensureTask();
|
||||||
|
portENTER_CRITICAL(&mux);
|
||||||
for (size_t i = 0; i < MaxSubs; ++i)
|
for (size_t i = 0; i < MaxSubs; ++i)
|
||||||
if (!subs[i]) {
|
if (!subs[i]) {
|
||||||
subs[i].emplace(Sub {.cb = std::forward<C>(fn),
|
subs[i].emplace();
|
||||||
.interval = pdMS_TO_TICKS(ms),
|
Sub& s = *subs[i];
|
||||||
.last = xTaskGetTickCount(),
|
s.cb.set(std::move(fn));
|
||||||
.mode = mode,
|
s.interval = pdMS_TO_TICKS(ms);
|
||||||
.cnt = 0});
|
s.last = xTaskGetTickCount();
|
||||||
sub_cnt.fetch_add(1, std::memory_order_acq_rel);
|
s.mode = mode;
|
||||||
|
s.cnt = 0;
|
||||||
|
subCount.fetch_add(1, std::memory_order_acq_rel);
|
||||||
|
portEXIT_CRITICAL(&mux);
|
||||||
return Handle(i);
|
return Handle(i);
|
||||||
}
|
}
|
||||||
return Handle(MaxSubs);
|
portEXIT_CRITICAL(&mux);
|
||||||
|
return Handle(NO_EX);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename C>
|
template <typename C>
|
||||||
static Handle subscribe(C&& fn) {
|
static Handle subscribe(C fn) {
|
||||||
using F = std::decay_t<C>;
|
if constexpr (std::is_invocable_v<C, const Msg*, size_t>)
|
||||||
if constexpr (std::is_invocable_v<F, const Msg&>)
|
return subscribe(0, EmitMode::Latest, std::move(fn));
|
||||||
return subscribe(0, EmitMode::Latest, [f = std::forward<C>(fn)](const Msg* p, size_t n) {
|
|
||||||
for (size_t i = 0; i < n; ++i) f(p[i]);
|
|
||||||
});
|
|
||||||
else
|
else
|
||||||
return subscribe(0, EmitMode::Latest, std::forward<C>(fn));
|
return subscribe(0, EmitMode::Latest, [fn = std::move(fn)](const Msg* p, size_t n) {
|
||||||
|
for (size_t i = 0; i < n; ++i) fn(p[i]);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename C>
|
template <typename C>
|
||||||
static Handle subscribe(uint32_t ms, C&& fn) {
|
static Handle subscribe(uint32_t ms, C fn) {
|
||||||
using F = std::decay_t<C>;
|
if constexpr (std::is_invocable_v<C, const Msg*, size_t>)
|
||||||
if constexpr (std::is_invocable_v<F, const Msg&>)
|
return subscribe(ms, EmitMode::Batch, std::move(fn));
|
||||||
return subscribe(ms, EmitMode::Batch, [f = std::forward<C>(fn)](const Msg* p, size_t n) {
|
|
||||||
for (size_t i = 0; i < n; ++i) f(p[i]);
|
|
||||||
});
|
|
||||||
else
|
else
|
||||||
return subscribe(ms, EmitMode::Batch, std::forward<C>(fn));
|
return subscribe(ms, EmitMode::Batch, [fn = std::move(fn)](const Msg* p, size_t n) {
|
||||||
|
for (size_t i = 0; i < n; ++i) fn(p[i]);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
static void publish(const Msg& m) {
|
static void publish(const Msg& m) {
|
||||||
store(m);
|
store(m);
|
||||||
push(m);
|
push(m, NO_EX, portMAX_DELAY);
|
||||||
}
|
}
|
||||||
|
static void publish(const Msg& m, const Handle& h) {
|
||||||
static void publishAsync(const Msg& m, const Handle& ex) {
|
if (h.valid())
|
||||||
store(m);
|
dispatch(m, h.idx);
|
||||||
push(m, ex.valid() ? ex.index : NO_EX);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void publish(const Msg& m, const Handle& ex) {
|
|
||||||
if (ex.valid())
|
|
||||||
dispatch(m, &*subs[ex.index]);
|
|
||||||
else
|
else
|
||||||
publish(m);
|
publish(m);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool publishAsync(const Msg& m) {
|
||||||
|
store(m);
|
||||||
|
return push(m);
|
||||||
|
}
|
||||||
|
static bool publishAsync(const Msg& m, const Handle& h) {
|
||||||
|
if (h.valid()) dispatch(m, h.idx);
|
||||||
|
return publishAsync(m);
|
||||||
|
}
|
||||||
|
|
||||||
static void publishISR(const Msg& m, BaseType_t* hpw = nullptr) {
|
static void publishISR(const Msg& m, BaseType_t* hpw = nullptr) {
|
||||||
storeISR(m);
|
storeISR(m);
|
||||||
Item it {m, NO_EX};
|
Item it {m, NO_EX};
|
||||||
xQueueSendFromISR(q_handle, &it, hpw);
|
xQueueSendFromISR(queue, &it, hpw);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool peek(Msg& out) {
|
static bool peek(Msg& out) {
|
||||||
if (!has_latest) return false;
|
if (!hasLatest.load(std::memory_order_acquire)) return false;
|
||||||
portENTER_CRITICAL(&mux);
|
portENTER_CRITICAL(&mux);
|
||||||
out = latest;
|
out = latest;
|
||||||
portEXIT_CRITICAL(&mux);
|
portEXIT_CRITICAL(&mux);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool take(Msg& out) {
|
static bool take(Msg& out) {
|
||||||
if (!has_latest) return false;
|
if (!hasLatest.load(std::memory_order_acquire)) return false;
|
||||||
portENTER_CRITICAL(&mux);
|
portENTER_CRITICAL(&mux);
|
||||||
out = latest;
|
out = latest;
|
||||||
has_latest = false;
|
hasLatest.store(false, std::memory_order_release);
|
||||||
portEXIT_CRITICAL(&mux);
|
portEXIT_CRITICAL(&mux);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
static bool hasSubscribers() { return subCount.load(std::memory_order_acquire) > 0; }
|
||||||
static bool hasSubscribers() { return sub_cnt.load(std::memory_order_acquire) != 0; }
|
|
||||||
};
|
};
|
||||||
Reference in New Issue
Block a user