🧪 Adds initial new python mocking

This commit is contained in:
Rune Harlyk
2024-03-04 15:57:30 +01:00
parent 9c5096a3c5
commit ebca54f2a0
10 changed files with 206 additions and 21 deletions
+100
View File
@@ -0,0 +1,100 @@
import asyncio
from enum import Enum
import json
import sys
import websockets
from model import model
from MotionController import GaitState
from simulator.GaitGenerator.Bezier import BezierGait
from simulator.GymEnvs.spot_bezier_env import spotBezierEnv
from simulator.Kinematics.SpotKinematics import SpotModel
from simulator.util.gui import GUI
from simulator.simulator import BodyState, Simulator
import struct
sys.path.append("./simulator/GymEnvs")
clients = {}
env = spotBezierEnv(
render=True,
on_rack=False,
height_field=False,
draw_foot_path=False,
env_randomizer=None,
)
gui = GUI(env.spot.quadruped)
bodyState = BodyState()
gaitState = GaitState()
spot = SpotModel()
bezierGait = BezierGait()
simulator = Simulator()
class Command(Enum):
ESTOP = 0
CONTROLLER = 1
def get_controller(buffer):
buffer = struct.unpack("<8b", buffer)
return {
"command": buffer[0],
"estop": buffer[1],
"lx": buffer[2],
"ly": buffer[3],
"rx": buffer[4],
"ry": buffer[5],
"height": buffer[6],
"speed": buffer[7],
}
async def handle_binary_message(client, data):
message = get_controller(data)
command = Command(message["command"])
if command == Command.ESTOP:
client["model"]["running"] = False
await client["websocket"].send(
json.dumps({"type": "stop", "data": "Servos stopped"})
)
if command == Command.CONTROLLER:
await client["websocket"].send(json.dumps({"type": "echo", "data": message}))
async def handle_json_message(client, message):
data = json.loads(message)
client = client["clientState"]
if data["type"] in ("stop", "mode_change"):
client["model"][data["type"]] = data.get("data", False)
await client["websocket"].send(
json.dumps(
{"type": data["type"], "data": data.get("data", "Servos stopped")}
)
)
async def handle_message(websocket, path):
client_id = id(websocket)
clients[client_id] = {
"clientState": model(),
"websocket": websocket,
}
try:
async for message in websocket:
if isinstance(message, bytes):
await handle_binary_message(clients[client_id], message)
else:
await handle_json_message(clients[client_id], message)
finally:
del clients[client_id]
async def main():
async with websockets.serve(handle_message, "localhost", 2096):
print("Server starting")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())