♻️ Handle incomming messages
This commit is contained in:
@@ -6,84 +6,14 @@
|
||||
#include <type_traits>
|
||||
#include <communication/proto_helpers.h>
|
||||
|
||||
template <typename T>
|
||||
struct MessageTraits;
|
||||
|
||||
template <>
|
||||
struct MessageTraits<socket_message_IMUData> {
|
||||
static constexpr pb_size_t tag = socket_message_WebsocketMessage_imu_tag;
|
||||
static void assign(socket_message_WebsocketMessage& msg, const socket_message_IMUData& data) {
|
||||
msg.message.imu = data;
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct MessageTraits<socket_message_ModeData> {
|
||||
static constexpr pb_size_t tag = socket_message_WebsocketMessage_mode_tag;
|
||||
static void assign(socket_message_WebsocketMessage& msg, const socket_message_ModeData& data) {
|
||||
msg.message.mode = data;
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct MessageTraits<socket_message_AnalyticsData> {
|
||||
static constexpr pb_size_t tag = socket_message_WebsocketMessage_analytics_tag;
|
||||
static void assign(socket_message_WebsocketMessage& msg, const socket_message_AnalyticsData& data) {
|
||||
msg.message.analytics = data;
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct MessageTraits<socket_message_AnglesData> {
|
||||
static constexpr pb_size_t tag = socket_message_WebsocketMessage_angles_tag;
|
||||
static void assign(socket_message_WebsocketMessage& msg, const socket_message_AnglesData& data) {
|
||||
msg.message.angles = data;
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct MessageTraits<socket_message_RSSIData> {
|
||||
static constexpr pb_size_t tag = socket_message_WebsocketMessage_rssi_tag;
|
||||
static void assign(socket_message_WebsocketMessage& msg, const socket_message_RSSIData& data) {
|
||||
msg.message.rssi = data;
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct MessageTraits<socket_message_KinematicData> {
|
||||
static constexpr pb_size_t tag = socket_message_WebsocketMessage_kinematic_data_tag;
|
||||
static void assign(socket_message_WebsocketMessage& msg, const socket_message_KinematicData& data) {
|
||||
msg.message.kinematic_data = data;
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct MessageTraits<socket_message_IMUCalibrateData> {
|
||||
static constexpr pb_size_t tag = socket_message_WebsocketMessage_imu_calibrate_tag;
|
||||
static void assign(socket_message_WebsocketMessage& msg, const socket_message_IMUCalibrateData& data) {
|
||||
msg.message.imu_calibrate = data;
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct MessageTraits<socket_message_I2CScanData> {
|
||||
static constexpr pb_size_t tag = socket_message_WebsocketMessage_i2c_scan_tag;
|
||||
static void assign(socket_message_WebsocketMessage& msg, const socket_message_I2CScanData& data) {
|
||||
msg.message.i2c_scan = data;
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct MessageTraits<socket_message_PeripheralSettingsData> {
|
||||
static constexpr pb_size_t tag = socket_message_WebsocketMessage_peripheral_settings_tag;
|
||||
static void assign(socket_message_WebsocketMessage& msg, const socket_message_PeripheralSettingsData& data) {
|
||||
msg.message.peripheral_settings = data;
|
||||
}
|
||||
};
|
||||
|
||||
class CommAdapterBase {
|
||||
public:
|
||||
CommAdapterBase() { mutex_ = xSemaphoreCreateMutex(); }
|
||||
CommAdapterBase() {
|
||||
mutex_ = xSemaphoreCreateMutex();
|
||||
decoder_.onSubscribe([this](int32_t tag, int cid) { subscribe(tag, cid); });
|
||||
decoder_.onUnsubscribe([this](int32_t tag, int cid) { unsubscribe(tag, cid); });
|
||||
decoder_.onPing([this](int cid) { sendPong(cid); });
|
||||
}
|
||||
~CommAdapterBase() { vSemaphoreDelete(mutex_); }
|
||||
|
||||
virtual void begin() {}
|
||||
@@ -97,6 +27,11 @@ class CommAdapterBase {
|
||||
|
||||
ProtoDecoder& decoder() { return decoder_; }
|
||||
|
||||
template <typename T>
|
||||
void on(std::function<void(const T&, int)> handler) {
|
||||
decoder_.on<T>(handler);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void emit(const T& data, int clientId = -1) {
|
||||
constexpr pb_size_t tag = MessageTraits<T>::tag;
|
||||
@@ -159,14 +94,6 @@ class CommAdapterBase {
|
||||
}
|
||||
}
|
||||
|
||||
void setupDecoderHandlers() {
|
||||
decoder_.onSubscribe([this](int32_t tag, int cid) { subscribe(tag, cid); });
|
||||
|
||||
decoder_.onUnsubscribe([this](int32_t tag, int cid) { unsubscribe(tag, cid); });
|
||||
|
||||
decoder_.onPing([this](int cid) { sendPong(cid); });
|
||||
}
|
||||
|
||||
SemaphoreHandle_t mutex_;
|
||||
std::map<int32_t, std::list<int>> client_subscriptions_;
|
||||
ProtoDecoder decoder_;
|
||||
|
||||
@@ -4,35 +4,59 @@
|
||||
#include <pb_decode.h>
|
||||
#include <platform_shared/websocket_message.pb.h>
|
||||
#include <functional>
|
||||
#include <map>
|
||||
|
||||
#define PROTO_BUFFER_SIZE 512
|
||||
|
||||
template <typename T>
|
||||
struct MessageTraits;
|
||||
|
||||
#define DEFINE_MESSAGE_TRAITS(DataType, field) \
|
||||
template <> \
|
||||
struct MessageTraits<socket_message_##DataType> { \
|
||||
static constexpr pb_size_t tag = socket_message_WebsocketMessage_##field##_tag; \
|
||||
static void assign(socket_message_WebsocketMessage& msg, const socket_message_##DataType& data) { \
|
||||
msg.message.field = data; \
|
||||
} \
|
||||
static const socket_message_##DataType& access(const socket_message_WebsocketMessage& msg) { \
|
||||
return msg.message.field; \
|
||||
} \
|
||||
};
|
||||
|
||||
DEFINE_MESSAGE_TRAITS(IMUData, imu)
|
||||
DEFINE_MESSAGE_TRAITS(ModeData, mode)
|
||||
DEFINE_MESSAGE_TRAITS(AnalyticsData, analytics)
|
||||
DEFINE_MESSAGE_TRAITS(AnglesData, angles)
|
||||
DEFINE_MESSAGE_TRAITS(RSSIData, rssi)
|
||||
DEFINE_MESSAGE_TRAITS(KinematicData, kinematic_data)
|
||||
DEFINE_MESSAGE_TRAITS(IMUCalibrateData, imu_calibrate)
|
||||
DEFINE_MESSAGE_TRAITS(I2CScanData, i2c_scan)
|
||||
DEFINE_MESSAGE_TRAITS(PeripheralSettingsData, peripheral_settings)
|
||||
DEFINE_MESSAGE_TRAITS(HumanInputData, human_input_data)
|
||||
DEFINE_MESSAGE_TRAITS(WalkGaitData, walk_gait)
|
||||
DEFINE_MESSAGE_TRAITS(IMUCalibrateExecute, imu_calibrate_execute)
|
||||
DEFINE_MESSAGE_TRAITS(I2CScanDataRequest, i2c_scan_data_request)
|
||||
DEFINE_MESSAGE_TRAITS(PeripheralSettingsDataRequest, peripheral_settings_data_request)
|
||||
DEFINE_MESSAGE_TRAITS(ServoPWMData, servo_pwm)
|
||||
DEFINE_MESSAGE_TRAITS(ServoStateData, servo_state)
|
||||
|
||||
#undef DEFINE_MESSAGE_TRAITS
|
||||
|
||||
class ProtoDecoder {
|
||||
public:
|
||||
using SubscribeHandler = std::function<void(int32_t tag, int clientId)>;
|
||||
using UnsubscribeHandler = std::function<void(int32_t tag, int clientId)>;
|
||||
using PingHandler = std::function<void(int clientId)>;
|
||||
using ModeHandler = std::function<void(const socket_message_ModeData& data, int clientId)>;
|
||||
using InputHandler = std::function<void(const socket_message_HumanInputData& data, int clientId)>;
|
||||
using AnglesHandler = std::function<void(const socket_message_AnglesData& data, int clientId)>;
|
||||
using KinematicHandler = std::function<void(const socket_message_KinematicData& data, int clientId)>;
|
||||
using WalkGaitHandler = std::function<void(const socket_message_WalkGaitData& data, int clientId)>;
|
||||
using IMUCalibrateExecHandler = std::function<void(int clientId)>;
|
||||
using I2CScanRequestHandler = std::function<void(int clientId)>;
|
||||
using PeripheralSettingsRequestHandler = std::function<void(int clientId)>;
|
||||
|
||||
void onSubscribe(SubscribeHandler handler) { subscribeHandler = handler; }
|
||||
void onUnsubscribe(UnsubscribeHandler handler) { unsubscribeHandler = handler; }
|
||||
void onPing(PingHandler handler) { pingHandler = handler; }
|
||||
void onMode(ModeHandler handler) { modeHandler = handler; }
|
||||
void onInput(InputHandler handler) { inputHandler = handler; }
|
||||
void onAngles(AnglesHandler handler) { anglesHandler = handler; }
|
||||
void onKinematic(KinematicHandler handler) { kinematicHandler = handler; }
|
||||
void onWalkGait(WalkGaitHandler handler) { walkGaitHandler = handler; }
|
||||
void onIMUCalibrateExec(IMUCalibrateExecHandler handler) { imuCalibrateExecHandler = handler; }
|
||||
void onI2CScanRequest(I2CScanRequestHandler handler) { i2cScanRequestHandler = handler; }
|
||||
void onPeripheralSettingsRequest(PeripheralSettingsRequestHandler handler) {
|
||||
peripheralSettingsRequestHandler = handler;
|
||||
void onSubscribe(SubscribeHandler handler) { subscribeHandler_ = handler; }
|
||||
void onUnsubscribe(UnsubscribeHandler handler) { unsubscribeHandler_ = handler; }
|
||||
void onPing(PingHandler handler) { pingHandler_ = handler; }
|
||||
|
||||
template <typename T>
|
||||
void on(std::function<void(const T&, int)> handler) {
|
||||
handlers_[MessageTraits<T>::tag] = [handler, this](int clientId) {
|
||||
handler(MessageTraits<T>::access(msg_), clientId);
|
||||
};
|
||||
}
|
||||
|
||||
bool decode(const uint8_t* data, size_t len, int clientId) {
|
||||
@@ -44,66 +68,32 @@ class ProtoDecoder {
|
||||
|
||||
switch (msg_.which_message) {
|
||||
case socket_message_WebsocketMessage_sub_notif_tag:
|
||||
if (subscribeHandler) subscribeHandler(msg_.message.sub_notif.tag, clientId);
|
||||
break;
|
||||
if (subscribeHandler_) subscribeHandler_(msg_.message.sub_notif.tag, clientId);
|
||||
return true;
|
||||
|
||||
case socket_message_WebsocketMessage_unsub_notif_tag:
|
||||
if (unsubscribeHandler) unsubscribeHandler(msg_.message.unsub_notif.tag, clientId);
|
||||
break;
|
||||
if (unsubscribeHandler_) unsubscribeHandler_(msg_.message.unsub_notif.tag, clientId);
|
||||
return true;
|
||||
|
||||
case socket_message_WebsocketMessage_pingmsg_tag:
|
||||
if (pingHandler) pingHandler(clientId);
|
||||
break;
|
||||
if (pingHandler_) pingHandler_(clientId);
|
||||
return true;
|
||||
|
||||
case socket_message_WebsocketMessage_mode_tag:
|
||||
if (modeHandler) modeHandler(msg_.message.mode, clientId);
|
||||
break;
|
||||
|
||||
case socket_message_WebsocketMessage_human_input_data_tag:
|
||||
if (inputHandler) inputHandler(msg_.message.human_input_data, clientId);
|
||||
break;
|
||||
|
||||
case socket_message_WebsocketMessage_angles_tag:
|
||||
if (anglesHandler) anglesHandler(msg_.message.angles, clientId);
|
||||
break;
|
||||
|
||||
case socket_message_WebsocketMessage_kinematic_data_tag:
|
||||
if (kinematicHandler) kinematicHandler(msg_.message.kinematic_data, clientId);
|
||||
break;
|
||||
|
||||
case socket_message_WebsocketMessage_walk_gait_tag:
|
||||
if (walkGaitHandler) walkGaitHandler(msg_.message.walk_gait, clientId);
|
||||
break;
|
||||
|
||||
case socket_message_WebsocketMessage_imu_calibrate_execute_tag:
|
||||
if (imuCalibrateExecHandler) imuCalibrateExecHandler(clientId);
|
||||
break;
|
||||
|
||||
case socket_message_WebsocketMessage_i2c_scan_data_request_tag:
|
||||
if (i2cScanRequestHandler) i2cScanRequestHandler(clientId);
|
||||
break;
|
||||
|
||||
case socket_message_WebsocketMessage_peripheral_settings_data_request_tag:
|
||||
if (peripheralSettingsRequestHandler) peripheralSettingsRequestHandler(clientId);
|
||||
break;
|
||||
|
||||
default: return false;
|
||||
default: {
|
||||
auto it = handlers_.find(msg_.which_message);
|
||||
if (it != handlers_.end()) {
|
||||
it->second(clientId);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
socket_message_WebsocketMessage msg_ = socket_message_WebsocketMessage_init_zero;
|
||||
SubscribeHandler subscribeHandler;
|
||||
UnsubscribeHandler unsubscribeHandler;
|
||||
PingHandler pingHandler;
|
||||
ModeHandler modeHandler;
|
||||
InputHandler inputHandler;
|
||||
AnglesHandler anglesHandler;
|
||||
KinematicHandler kinematicHandler;
|
||||
WalkGaitHandler walkGaitHandler;
|
||||
IMUCalibrateExecHandler imuCalibrateExecHandler;
|
||||
I2CScanRequestHandler i2cScanRequestHandler;
|
||||
PeripheralSettingsRequestHandler peripheralSettingsRequestHandler;
|
||||
SubscribeHandler subscribeHandler_;
|
||||
UnsubscribeHandler unsubscribeHandler_;
|
||||
PingHandler pingHandler_;
|
||||
std::map<pb_size_t, std::function<void(int)>> handlers_;
|
||||
};
|
||||
|
||||
@@ -16,9 +16,9 @@ class Websocket : public CommAdapterBase {
|
||||
void begin() override;
|
||||
|
||||
private:
|
||||
PsychicWebSocketHandler _socket;
|
||||
PsychicHttpServer &_server;
|
||||
const char *_route;
|
||||
PsychicWebSocketHandler socket_;
|
||||
PsychicHttpServer &server_;
|
||||
const char *route_;
|
||||
|
||||
void onWSOpen(PsychicWebSocketClient *client);
|
||||
void onWSClose(PsychicWebSocketClient *client);
|
||||
|
||||
Reference in New Issue
Block a user