Moves rest of events to event bus

This commit is contained in:
Rune Harlyk
2026-02-01 00:44:24 +01:00
parent 57e0aac2aa
commit 0c0ef0ac40
9 changed files with 110 additions and 30 deletions
+36 -2
View File
@@ -6,8 +6,11 @@
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <vector>
#include <type_traits>
#include <communication/proto_helpers.h>
#include <event_bus/event_bus.h>
class CommAdapterBase {
public:
@@ -35,6 +38,26 @@ class CommAdapterBase {
decoder_.on<T>(handler);
}
template <typename T>
void onPublish(std::function<void(const T&, int)> handler = nullptr) {
decoder_.on<T>([this, handler](const T& data, int clientId) {
EventBus::publish(data);
if (handler) handler(data, clientId);
});
}
template <typename T>
void bridgeFromEventBus() {
eventBusHandles_.push_back(
std::make_unique<EventBusHandleStorage<T>>(EventBus::subscribe<T>([this](const T& data) { emit(data); })));
}
template <typename T>
void bridgeFromEventBus(uint32_t intervalMs) {
eventBusHandles_.push_back(std::make_unique<EventBusHandleStorage<T>>(
EventBus::subscribe<T>(intervalMs, [this](const T& data) { emit(data); })));
}
template <typename T>
void emit(const T& data, int clientId = -1) {
constexpr pb_size_t tag = MessageTraits<T>::tag;
@@ -47,8 +70,7 @@ class CommAdapterBase {
size_t out_size;
pb_get_encoded_size(&out_size, socket_message_Message_fields, &msg_);
uint8_t* buffer = pb_heap_enc_buf;
if (out_size > sizeof(pb_heap_enc_buf)) { // If the encoded size exceeds our buffer size, we needs to malloc a
// buffer of a proper size
if (out_size > sizeof(pb_heap_enc_buf)) {
buffer = (uint8_t*)malloc(out_size);
}
@@ -116,6 +138,18 @@ class CommAdapterBase {
socket_message_Message msg_ = socket_message_Message_init_zero;
uint8_t pb_heap_enc_buf[PROTO_BUFFER_SIZE];
struct EventBusHandleBase {
virtual ~EventBusHandleBase() = default;
};
template <typename T>
struct EventBusHandleStorage : EventBusHandleBase {
EventBus::Handle<T> handle;
EventBusHandleStorage(EventBus::Handle<T>&& h) : handle(std::move(h)) {}
};
std::vector<std::unique_ptr<EventBusHandleBase>> eventBusHandles_;
private:
void sendToSubscribers(int32_t tag, const uint8_t* data, size_t len) {
xSemaphoreTake(mutex_, portMAX_DELAY);