🤹 Update adapters with better handling

This commit is contained in:
Rune Harlyk
2025-07-08 22:28:46 +02:00
parent 6769ffeb20
commit 481dfaf8e5
3 changed files with 33 additions and 21 deletions
+26 -9
View File
@@ -18,9 +18,14 @@ class CommBase {
std::array<Bits, NTopics> subs_; std::array<Bits, NTopics> subs_;
portMUX_TYPE mux_ portMUX_INITIALIZER_UNLOCKED; portMUX_TYPE mux_ portMUX_INITIALIZER_UNLOCKED;
std::array<void*, static_cast<size_t>(Topic::COUNT)> subscriptionHandle {}; std::array<void*, NTopics> subscriptionHandle {};
static constexpr size_t idx(Topic t) { return static_cast<size_t>(t); } static constexpr size_t invalid = SIZE_MAX;
static constexpr size_t idx(Topic t) {
size_t i = static_cast<size_t>(t);
return i < NTopics ? i : invalid;
}
template <Topic T> template <Topic T>
void encode(JsonDocument& d, const typename TopicTraits<T>::Msg& m) { void encode(JsonDocument& d, const typename TopicTraits<T>::Msg& m) {
@@ -36,33 +41,45 @@ class CommBase {
template <class Msg> template <class Msg>
auto& getHandle(Topic topic) { auto& getHandle(Topic topic) {
using H = typename EventBus<Msg>::Handle; using H = typename EventBus<Msg>::Handle;
return *static_cast<H*>(subscriptionHandle[static_cast<size_t>(topic)]); static H dummy;
auto* p = static_cast<H*>(subscriptionHandle[size_t(topic)]);
return p ? *p : dummy;
} }
template <class Msg> template <class Msg>
void setHandle(Topic topic, typename EventBus<Msg>::Handle&& h) { void setHandle(Topic topic, typename EventBus<Msg>::Handle&& h) {
using H = typename EventBus<Msg>::Handle; using H = typename EventBus<Msg>::Handle;
subscriptionHandle[static_cast<size_t>(topic)] = new H(std::move(h)); subscriptionHandle[size_t(topic)] = new H(std::move(h));
} }
public: public:
void subscribe(Topic t, size_t cid) { void subscribe(Topic t, size_t cid) {
size_t i = idx(t);
if (i == invalid) return;
portENTER_CRITICAL(&mux_); portENTER_CRITICAL(&mux_);
subs_[idx(t)].set(cid); subs_[i].set(cid);
portEXIT_CRITICAL(&mux_); portEXIT_CRITICAL(&mux_);
} }
void unsubscribe(Topic t, size_t cid) { void unsubscribe(Topic t, size_t cid) {
size_t i = idx(t);
if (i == invalid) return;
portENTER_CRITICAL(&mux_); portENTER_CRITICAL(&mux_);
subs_[idx(t)].reset(cid); subs_[i].reset(cid);
portEXIT_CRITICAL(&mux_); portEXIT_CRITICAL(&mux_);
} }
bool has(Topic t) const { return subs_[idx(t)].any(); } bool has(Topic t) const {
size_t i = idx(t);
return i == invalid ? false : subs_[i].any();
}
template <Topic T> template <Topic T>
void emit(const typename TopicTraits<T>::Msg& m) { void emit(const typename TopicTraits<T>::Msg& m) {
if (!has(T)) return; constexpr size_t i = idx(T);
if (i == invalid) return;
if (!subs_[i].any()) return;
JsonDocument doc; JsonDocument doc;
encode<T>(doc, m); encode<T>(doc, m);
String out; String out;
@@ -71,7 +88,7 @@ class CommBase {
#else #else
serializeJson(doc, out); serializeJson(doc, out);
#endif #endif
auto& b = subs_[idx(T)]; auto& b = subs_[i];
for (size_t cid = 0; cid < MaxCid; ++cid) for (size_t cid = 0; cid < MaxCid; ++cid)
if (b.test(cid)) send(cid, out.c_str(), out.length()); if (b.test(cid)) send(cid, out.c_str(), out.length());
} }
-9
View File
@@ -10,12 +10,6 @@
#include "event_bus.hpp" #include "event_bus.hpp"
#include "adapters/comm_base.hpp" #include "adapters/comm_base.hpp"
#include "topic.hpp" #include "topic.hpp"
// #include "msgs/motion_input_msg.hpp"
// #include "msgs/motion_angles_msg.hpp"
// #include "msgs/motion_position_msg.hpp"
// #include "msgs/motion_mode_msg.hpp"
// typedef std::function<void(JsonObject &root, int originId)> EventCallback;
class EventSocket : public CommBase<> { class EventSocket : public CommBase<> {
PsychicWebSocketHandler _socket; PsychicWebSocketHandler _socket;
@@ -28,9 +22,6 @@ class EventSocket : public CommBase<> {
void send(size_t clientId, const char *data, size_t len) override; void send(size_t clientId, const char *data, size_t len) override;
void handleReceive(const std::string &data); void handleReceive(const std::string &data);
// void handleTypedMessage(const std::string &data);
// void handleLegacyMessage(const std::string &data, int originId);
// void handleEventCallbacks(String event, JsonObject &jsonObject, int originId);
void onWSOpen(PsychicWebSocketClient *client); void onWSOpen(PsychicWebSocketClient *client);
void onWSClose(PsychicWebSocketClient *client); void onWSClose(PsychicWebSocketClient *client);
+7 -3
View File
@@ -4,6 +4,13 @@ EventSocket::EventSocket() {
_socket.onOpen((std::bind(&EventSocket::onWSOpen, this, std::placeholders::_1))); _socket.onOpen((std::bind(&EventSocket::onWSOpen, this, std::placeholders::_1)));
_socket.onClose(std::bind(&EventSocket::onWSClose, this, std::placeholders::_1)); _socket.onClose(std::bind(&EventSocket::onWSClose, this, std::placeholders::_1));
_socket.onFrame(std::bind(&EventSocket::onFrame, this, std::placeholders::_1, std::placeholders::_2)); _socket.onFrame(std::bind(&EventSocket::onFrame, this, std::placeholders::_1, std::placeholders::_2));
#define X(e, t) \
setHandle<t>(Topic::e, EventBus<t>::subscribe([this](const t* d, size_t n) { \
if (n) this->emit<Topic::e>(d[0]); \
}));
TOPIC_LIST
#undef X
} }
void EventSocket::onWSOpen(PsychicWebSocketClient* client) { void EventSocket::onWSOpen(PsychicWebSocketClient* client) {
@@ -93,9 +100,6 @@ void EventSocket::send(size_t clientId, const char* data, size_t len) {
#else #else
client->sendMessage(HTTPD_WS_TYPE_TEXT, data, len); client->sendMessage(HTTPD_WS_TYPE_TEXT, data, len);
#endif #endif
// ESP_LOGV("EventSocket", "Sending to client %zu: %.*s", clientId, len, data);
// client->sendMessage(data);
} }
EventSocket socket; EventSocket socket;