From 14c0f438a6d836cfdcfd4eb3bc54a010fbfabe7e Mon Sep 17 00:00:00 2001 From: Rune Harlyk Date: Mon, 4 Mar 2024 15:57:30 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=A7=AA=20Adds=20initial=20new=20python=20?= =?UTF-8?q?mocking?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mock/MotionController.py | 53 +++++++++++++++++++++++++++ mock/model.py | 34 ++++++++++++++++++ mock/server.py | 78 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 165 insertions(+) create mode 100644 mock/MotionController.py create mode 100644 mock/model.py create mode 100644 mock/server.py diff --git a/mock/MotionController.py b/mock/MotionController.py new file mode 100644 index 0000000..17b98d9 --- /dev/null +++ b/mock/MotionController.py @@ -0,0 +1,53 @@ +import copy + + +class GaitState: + def __init__(self) -> None: + self.step_length = 0.1 + self.yaw_rate = 0 + self.lateral_fraction = 0 + self.step_velocity = 0.001 + self.swing_period = 0.2 + self.clearance_height = 0.045 + self.penetration_depth = 0.003 + self.contacts = [False] * 4 + + self.target_step_length = 0 + self.target_yaw_rate = 0 + self.target_lateral_fraction = 0 + + def update_gait_state(self, dt): + self.step_length = self.step_length * (1 - dt) + self.target_step_length * dt + self.lateral_fraction = ( + self.lateral_fraction * (1 - dt) + self.target_lateral_fraction * dt + ) + self.yaw_rate = self.yaw_rate * (1 - dt) + self.target_yaw_rate * dt + + +class MotionController: + def __init__( + self, + # env: spotBezierEnv, + # gui: GUI, + # bodyState: BodyState, + # gaitState: GaitState, + spot_model, + gait, + ) -> None: + self.gait = gait + self.gait_state = GaitState() + self.spot_model = spot_model + + self.dt = 0.01 + + def update_gait_state(self, command): + self.gait_state.step_length = abs(command["lx"]) / 255 + + def run(self, model, command): + self.update_gait_state(command) + self.gait_state.contacts = [False] * 4 + self.body_state.worldFeetPositions = copy.deepcopy(self.spot.WorldToFoot) + + model["transformation"]["world_feet_position"] = self.gait.generate_trajectory( + model, self.gait_state, self.dt + ) diff --git a/mock/model.py b/mock/model.py new file mode 100644 index 0000000..66fb90f --- /dev/null +++ b/mock/model.py @@ -0,0 +1,34 @@ +import random + + +model = lambda: { + "gait": { + "step_length": 0, + "yaw_rate": 0, + "lateral_fraction": 0, + "step_velocity": 0, + "swing_period": 0, + "clearance_height": 0, + "penetration_depth": 0, + "contacts": 0, + }, + "transformation": { + "world_position": [0, 0, 0], + "position": [0, 0, 0], + "rotation": [0, 0, 0], + "world_feet_positions": {}, + }, + "sensors": { + "mpu": { + "x": 0, + "y": 0, + "z": 0, + }, + "battery": { + "voltage": round(random.uniform(7.6, 8.2), 2), + "ampere": round(random.uniform(0.2, 3), 2), + }, + }, + "logs": ["[2023-02-05 10:00:00] Booting up"], + "settings": {"useMetric": True}, +} diff --git a/mock/server.py b/mock/server.py new file mode 100644 index 0000000..a022e3e --- /dev/null +++ b/mock/server.py @@ -0,0 +1,78 @@ +import asyncio +from enum import Enum +import json +import websockets +from model import model +import struct + +clients = {} + + +class Command(Enum): + ESTOP = 0 + CONTROLLER = 1 + + +def get_controller(buffer): + buffer = struct.unpack("<8b", buffer) + return { + "command": buffer[0], + "estop": buffer[1], + "lx": buffer[2], + "ly": buffer[3], + "rx": buffer[4], + "ry": buffer[5], + "height": buffer[6], + "speed": buffer[7], + } + + +async def handle_binary_message(client, data): + message = get_controller(data) + command = Command(message["command"]) + if command == Command.ESTOP: + client["model"]["running"] = False + await client["websocket"].send( + json.dumps({"type": "stop", "data": "Servos stopped"}) + ) + + if command == Command.CONTROLLER: + await client["websocket"].send(json.dumps({"type": "echo", "data": message})) + + +async def handle_json_message(client, message): + data = json.loads(message) + client = client["clientState"] + if data["type"] in ("stop", "mode_change"): + client["model"][data["type"]] = data.get("data", False) + await client["websocket"].send( + json.dumps( + {"type": data["type"], "data": data.get("data", "Servos stopped")} + ) + ) + + +async def handle_message(websocket, path): + client_id = id(websocket) + clients[client_id] = { + "clientState": model(), + "websocket": websocket, + } + try: + async for message in websocket: + if isinstance(message, bytes): + await handle_binary_message(clients[client_id], message) + else: + await handle_json_message(clients[client_id], message) + finally: + del clients[client_id] + + +async def main(): + async with websockets.serve(handle_message, "localhost", 2096): + print("Server starting") + await asyncio.Future() + + +if __name__ == "__main__": + asyncio.run(main())