Heap event bus

This commit is contained in:
Rune Harlyk
2026-02-01 01:09:38 +01:00
parent 0c0ef0ac40
commit 18ec2e618e
3 changed files with 168 additions and 96 deletions
+8 -4
View File
@@ -64,6 +64,8 @@ class CommAdapterBase {
if (clientId < 0 && !hasSubscribers(tag)) return;
xSemaphoreTake(mutex_, portMAX_DELAY);
msg_.which_message = tag;
MessageTraits<T>::assign(msg_, data);
@@ -77,18 +79,22 @@ class CommAdapterBase {
pb_ostream_t stream = pb_ostream_from_buffer(buffer, out_size);
if (!pb_encode(&stream, socket_message_Message_fields, &msg_)) {
ESP_LOGE("ProtoComm", "Failed to encode message (tag %d), buffer too small?", (int)tag);
xSemaphoreGive(mutex_);
if (pb_heap_enc_buf != buffer) free(buffer);
return;
}
if (clientId >= 0) {
send(buffer, stream.bytes_written, clientId);
} else {
sendToSubscribers(tag, buffer, stream.bytes_written);
sendToSubscribersLocked(tag, buffer, stream.bytes_written);
}
if (pb_heap_enc_buf != buffer) {
free(buffer);
}
xSemaphoreGive(mutex_);
}
protected:
@@ -151,11 +157,9 @@ class CommAdapterBase {
std::vector<std::unique_ptr<EventBusHandleBase>> eventBusHandles_;
private:
void sendToSubscribers(int32_t tag, const uint8_t* data, size_t len) {
xSemaphoreTake(mutex_, portMAX_DELAY);
void sendToSubscribersLocked(int32_t tag, const uint8_t* data, size_t len) {
for (int cid : client_subscriptions_[tag]) {
send(data, len, cid);
}
xSemaphoreGive(mutex_);
}
};
+25 -7
View File
@@ -36,10 +36,28 @@ REGISTER_SETTINGS_TYPE(api_PeripheralSettings, EventType::PERIPHERAL_SETTINGS)
REGISTER_SETTINGS_TYPE(api_ServoSettings, EventType::SERVO_SETTINGS)
REGISTER_SETTINGS_TYPE(api_CameraSettings, EventType::CAMERA_SETTINGS)
REGISTER_EVENT_TYPE(socket_message_IMUData, EventType::IMU_DATA)
REGISTER_EVENT_TYPE(socket_message_ControllerData, EventType::MOTION_COMMAND)
REGISTER_EVENT_TYPE(socket_message_ModeData, EventType::MOTION_MODE)
REGISTER_EVENT_TYPE(socket_message_AnglesData, EventType::MOTION_ANGLES)
REGISTER_EVENT_TYPE(socket_message_WalkGaitData, EventType::MOTION_WALK_GAIT)
REGISTER_EVENT_TYPE(socket_message_ServoStateData, EventType::SERVO_STATE)
REGISTER_EVENT_TYPE(socket_message_ServoPWMData, EventType::SERVO_PWM)
#define REGISTER_COMMAND_TYPE(MsgType, EventTypeValue) \
REGISTER_EVENT_TYPE(MsgType, EventTypeValue) \
template <> \
struct EventBusConfig<MsgType> { \
static constexpr size_t QueueDepth = 1; \
static constexpr size_t MaxSubs = 3; \
static constexpr size_t BatchSize = 1; \
};
#define REGISTER_STREAM_TYPE(MsgType, EventTypeValue) \
REGISTER_EVENT_TYPE(MsgType, EventTypeValue) \
template <> \
struct EventBusConfig<MsgType> { \
static constexpr size_t QueueDepth = 4; \
static constexpr size_t MaxSubs = 3; \
static constexpr size_t BatchSize = 4; \
};
REGISTER_STREAM_TYPE(socket_message_IMUData, EventType::IMU_DATA)
REGISTER_COMMAND_TYPE(socket_message_ControllerData, EventType::MOTION_COMMAND)
REGISTER_COMMAND_TYPE(socket_message_ModeData, EventType::MOTION_MODE)
REGISTER_COMMAND_TYPE(socket_message_AnglesData, EventType::MOTION_ANGLES)
REGISTER_COMMAND_TYPE(socket_message_WalkGaitData, EventType::MOTION_WALK_GAIT)
REGISTER_COMMAND_TYPE(socket_message_ServoStateData, EventType::SERVO_STATE)
REGISTER_COMMAND_TYPE(socket_message_ServoPWMData, EventType::SERVO_PWM)
+135 -85
View File
@@ -10,6 +10,7 @@
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#include <freertos/queue.h>
#include <esp_heap_caps.h>
template <typename Sig, size_t MaxSize>
class FixedFn;
@@ -65,82 +66,121 @@ class TypedEventBus {
TickType_t interval;
TickType_t last;
EmitMode mode;
std::array<Msg, BatchSize> buf;
Msg* buf;
size_t cnt;
std::atomic<bool> enabled;
std::atomic<uint32_t> running;
Sub() : buf(nullptr), cnt(0), enabled(false), running(0) {
buf = static_cast<Msg*>(heap_caps_malloc(BatchSize * sizeof(Msg), MALLOC_CAP_SPIRAM | MALLOC_CAP_8BIT));
if (!buf) buf = static_cast<Msg*>(malloc(BatchSize * sizeof(Msg)));
}
~Sub() {
if (buf) free(buf);
}
Sub(const Sub&) = delete;
Sub& operator=(const Sub&) = delete;
};
inline static StaticQueue_t qbuf;
inline static Item qStorage[QueueDepth];
inline static QueueHandle_t queue =
xQueueCreateStatic(QueueDepth, sizeof(Item), reinterpret_cast<uint8_t*>(qStorage), &qbuf);
inline static std::array<std::optional<Sub>, MaxSubs> subs {};
inline static portMUX_TYPE mux = portMUX_INITIALIZER_UNLOCKED;
inline static Msg latest {};
inline static std::atomic<bool> hasLatest {false};
inline static std::atomic<size_t> subCount {0};
inline static std::atomic<bool> taskStarted {false};
struct BusState {
QueueHandle_t queue;
Sub* subs[MaxSubs];
portMUX_TYPE mux;
Msg latest;
std::atomic<bool> hasLatest;
std::atomic<size_t> subCount;
std::atomic<bool> taskStarted;
BusState()
: queue(nullptr),
mux(portMUX_INITIALIZER_UNLOCKED),
latest {},
hasLatest(false),
subCount(0),
taskStarted(false) {
for (size_t i = 0; i < MaxSubs; ++i) subs[i] = nullptr;
}
};
static BusState& state() {
static BusState s;
return s;
}
static void ensureQueue() {
auto& s = state();
if (s.queue) return;
portENTER_CRITICAL(&s.mux);
if (!s.queue) {
s.queue = xQueueCreate(QueueDepth, sizeof(Item));
}
portEXIT_CRITICAL(&s.mux);
}
static void storeISR(const Msg& m) {
UBaseType_t s = portSET_INTERRUPT_MASK_FROM_ISR();
latest = m;
hasLatest.store(true, std::memory_order_release);
portCLEAR_INTERRUPT_MASK_FROM_ISR(s);
auto& s = state();
UBaseType_t saved = portSET_INTERRUPT_MASK_FROM_ISR();
s.latest = m;
s.hasLatest.store(true, std::memory_order_release);
portCLEAR_INTERRUPT_MASK_FROM_ISR(saved);
}
static void dispatch(const Msg& m, size_t ex) {
auto& s = state();
TickType_t now = xTaskGetTickCount();
Sub* ready[MaxSubs];
size_t readyCnt = 0;
portENTER_CRITICAL(&mux);
portENTER_CRITICAL(&s.mux);
for (size_t i = 0; i < MaxSubs; ++i) {
auto& opt = subs[i];
if (!opt || i == ex) continue;
Sub& s = *opt;
if (!s.enabled.load(std::memory_order_acquire)) continue;
Sub* sub = s.subs[i];
if (!sub || i == ex) continue;
if (!sub->enabled.load(std::memory_order_acquire)) continue;
TickType_t dt = now - s.last;
TickType_t dt = now - sub->last;
if (s.interval && dt < s.interval) {
if (s.mode == EmitMode::Batch) {
if (s.cnt < BatchSize)
s.buf[s.cnt++] = m;
if (sub->interval && dt < sub->interval) {
if (sub->mode == EmitMode::Batch) {
if (sub->cnt < BatchSize)
sub->buf[sub->cnt++] = m;
else
s.buf[BatchSize - 1] = m;
sub->buf[BatchSize - 1] = m;
} else {
s.buf[0] = m;
s.cnt = 1;
sub->buf[0] = m;
sub->cnt = 1;
}
} else {
if (s.cnt < BatchSize)
s.buf[s.cnt++] = m;
if (sub->cnt < BatchSize)
sub->buf[sub->cnt++] = m;
else
s.buf[BatchSize - 1] = m;
s.last = now;
s.running.fetch_add(1, std::memory_order_acq_rel);
ready[readyCnt++] = &s;
sub->buf[BatchSize - 1] = m;
sub->last = now;
sub->running.fetch_add(1, std::memory_order_acq_rel);
ready[readyCnt++] = sub;
}
}
portEXIT_CRITICAL(&mux);
portEXIT_CRITICAL(&s.mux);
for (size_t i = 0; i < readyCnt; ++i) {
Sub* s = ready[i];
s->cb(s->buf.data(), s->cnt);
s->cnt = 0;
s->running.fetch_sub(1, std::memory_order_acq_rel);
Sub* sub = ready[i];
sub->cb(sub->buf, sub->cnt);
sub->cnt = 0;
sub->running.fetch_sub(1, std::memory_order_acq_rel);
}
}
static void worker(void*) {
auto& s = state();
Item it;
while (xQueueReceive(queue, &it, portMAX_DELAY) == pdTRUE) dispatch(it.payload, it.exclude);
while (xQueueReceive(s.queue, &it, portMAX_DELAY) == pdTRUE) dispatch(it.payload, it.exclude);
}
static void ensureTask() {
if (!taskStarted.load(std::memory_order_acquire)) {
auto& s = state();
if (!s.taskStarted.load(std::memory_order_acquire)) {
bool expected = false;
if (taskStarted.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
if (s.taskStarted.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
ensureQueue();
xTaskCreatePinnedToCore(worker, "evtbus", 4096, nullptr, 6, nullptr, 1);
}
}
@@ -148,8 +188,9 @@ class TypedEventBus {
static bool push(const Msg& m, size_t ex = NO_EX, TickType_t to = 0) {
ensureTask();
auto& s = state();
Item it {m, ex};
return xQueueSend(queue, &it, to) == pdTRUE;
return xQueueSend(s.queue, &it, to) == pdTRUE;
}
public:
@@ -174,19 +215,21 @@ class TypedEventBus {
~Handle() { unsubscribe(); }
void unsubscribe() {
if (idx < MaxSubs) {
Sub* s = nullptr;
portENTER_CRITICAL(&mux);
if (subs[idx]) {
s = &*subs[idx];
s->enabled.store(false, std::memory_order_release);
auto& s = state();
Sub* sub = nullptr;
portENTER_CRITICAL(&s.mux);
sub = s.subs[idx];
if (sub) {
sub->enabled.store(false, std::memory_order_release);
}
portEXIT_CRITICAL(&mux);
if (s) {
while (s->running.load(std::memory_order_acquire) != 0) taskYIELD();
portENTER_CRITICAL(&mux);
subs[idx].reset();
portEXIT_CRITICAL(&mux);
subCount.fetch_sub(1, std::memory_order_acq_rel);
portEXIT_CRITICAL(&s.mux);
if (sub) {
while (sub->running.load(std::memory_order_acquire) != 0) taskYIELD();
portENTER_CRITICAL(&s.mux);
s.subs[idx] = nullptr;
portEXIT_CRITICAL(&s.mux);
delete sub;
s.subCount.fetch_sub(1, std::memory_order_acq_rel);
}
idx = NO_EX;
}
@@ -195,10 +238,11 @@ class TypedEventBus {
};
static void store(const Msg& m) {
portENTER_CRITICAL(&mux);
latest = m;
hasLatest.store(true, std::memory_order_release);
portEXIT_CRITICAL(&mux);
auto& s = state();
portENTER_CRITICAL(&s.mux);
s.latest = m;
s.hasLatest.store(true, std::memory_order_release);
portEXIT_CRITICAL(&s.mux);
}
template <typename C>
@@ -210,23 +254,25 @@ class TypedEventBus {
template <typename C>
static Handle subscribe(uint32_t ms, EmitMode mode, C fn) {
ensureTask();
portENTER_CRITICAL(&mux);
for (size_t i = 0; i < MaxSubs; ++i)
if (!subs[i]) {
subs[i].emplace();
Sub& s = *subs[i];
s.cb.set(std::move(fn));
s.interval = pdMS_TO_TICKS(ms);
s.last = xTaskGetTickCount() - s.interval;
s.mode = mode;
s.cnt = 0;
s.enabled.store(true, std::memory_order_release);
s.running.store(0, std::memory_order_release);
subCount.fetch_add(1, std::memory_order_acq_rel);
portEXIT_CRITICAL(&mux);
auto& s = state();
portENTER_CRITICAL(&s.mux);
for (size_t i = 0; i < MaxSubs; ++i) {
if (!s.subs[i]) {
Sub* sub = new Sub();
sub->cb.set(std::move(fn));
sub->interval = pdMS_TO_TICKS(ms);
sub->last = xTaskGetTickCount() - sub->interval;
sub->mode = mode;
sub->cnt = 0;
sub->enabled.store(true, std::memory_order_release);
sub->running.store(0, std::memory_order_release);
s.subs[i] = sub;
s.subCount.fetch_add(1, std::memory_order_acq_rel);
portEXIT_CRITICAL(&s.mux);
return Handle(i);
}
portEXIT_CRITICAL(&mux);
}
portEXIT_CRITICAL(&s.mux);
return Handle(NO_EX);
}
@@ -261,27 +307,31 @@ class TypedEventBus {
}
static void publishISR(const Msg& m, BaseType_t* hpw = nullptr) {
auto& s = state();
storeISR(m);
ensureQueue();
Item it {m, NO_EX};
xQueueSendFromISR(queue, &it, hpw);
xQueueSendFromISR(s.queue, &it, hpw);
}
static bool peek(Msg& out) {
if (!hasLatest.load(std::memory_order_acquire)) return false;
portENTER_CRITICAL(&mux);
out = latest;
portEXIT_CRITICAL(&mux);
auto& s = state();
if (!s.hasLatest.load(std::memory_order_acquire)) return false;
portENTER_CRITICAL(&s.mux);
out = s.latest;
portEXIT_CRITICAL(&s.mux);
return true;
}
static bool take(Msg& out) {
if (!hasLatest.load(std::memory_order_acquire)) return false;
portENTER_CRITICAL(&mux);
out = latest;
hasLatest.store(false, std::memory_order_release);
portEXIT_CRITICAL(&mux);
auto& s = state();
if (!s.hasLatest.load(std::memory_order_acquire)) return false;
portENTER_CRITICAL(&s.mux);
out = s.latest;
s.hasLatest.store(false, std::memory_order_release);
portEXIT_CRITICAL(&s.mux);
return true;
}
static bool hasSubscribers() { return subCount.load(std::memory_order_acquire) > 0; }
static bool hasSubscribers() { return state().subCount.load(std::memory_order_acquire) > 0; }
};