Simplifies spot class and add threaded websocket controller interface

This commit is contained in:
Rune Harlyk
2024-03-05 15:02:31 +01:00
parent 7208cc7b1c
commit fae6171f93
11 changed files with 213 additions and 29 deletions
-10
View File
@@ -1,10 +0,0 @@
# SCRIPT FOR RUNNING THE RPI ROBOT
from .src.spot import Spot
# Server
# Bluetooth controller
# Motion planing
# Hardware connections
+33
View File
@@ -0,0 +1,33 @@
# SCRIPT FOR RUNNING THE RPI ROBOT
import time
from src.spot import Spot
from src.kinematics.SpotKinematics import SpotModel
from src.camera.WebCamera import WebCamera
from src.imu.IMU import IMU
from src.hardware_interface.HardwareInterface import HardwareInterface
from src.controller_interface.WebsocketController import WebsocketController
from src.controller_interface.SharedControllerState import SharedState, shared_state
def main():
kinematics = SpotModel()
camera = WebCamera()
imu = IMU()
hardware_interface = HardwareInterface()
# shared_controller_state = SharedState()
controller_interface = WebsocketController() # shared_controller_state)
spot = Spot(kinematics, camera, imu, hardware_interface, controller_interface)
spot.start()
try:
while True:
spot.run(shared_state.get_latest_state().__dict__)
time.sleep(1)
except KeyboardInterrupt:
spot.stop()
if __name__ == '__main__':
main()
+1 -1
View File
@@ -1,5 +1,5 @@
import numpy as np
from .IMUBase import IMUBase
from src.imu.IMUBase import IMUBase
class IMU(IMUBase):
def __init__(self) -> None:
@@ -1,10 +1,11 @@
#!/usr/bin/env python
import numpy as np
from .LegKinematics import LegIK
from .LieAlgebra import RpToTrans, TransToRp, TransInv, RPY, TransformVector
from collections import OrderedDict
from src.kinematics.LegKinematics import LegIK
from src.kinematics.LieAlgebra import RpToTrans, TransToRp, TransInv, RPY
class SpotModel:
def __init__(
@@ -0,0 +1,10 @@
class ControllerState:
def __init__(self) -> None:
self.command = 0
self.estop = 0
self.lx = 0
self.ly = 0
self.rx = 0
self.ry = 0
self.height = 0
self.speed = 0
@@ -0,0 +1,18 @@
import threading
from src.controller_interface.ControllerState import ControllerState
class SharedState:
def __init__(self):
self.lock = threading.Lock()
self.state = ControllerState()
def update_state(self, new_state):
with self.lock:
self.state = new_state
def get_latest_state(self):
with self.lock:
return self.state
shared_state = SharedState()
@@ -0,0 +1,108 @@
import asyncio
from enum import Enum
import struct
import threading
import websockets
import json
from src.controller_interface.ControllerState import ControllerState
from src.controller_interface.SharedControllerState import shared_state
clients = {}
class Command(Enum):
ESTOP = 0
CONTROLLER = 1
def get_controller(buffer):
buffer = struct.unpack("<8b", buffer)
state = ControllerState()
state.command = buffer[0]
state.estop = buffer[1]
state.lx = buffer[2]
state.ly = buffer[3]
state.rx = buffer[4]
state.ry = buffer[5]
state.height = buffer[6]
state.speed = buffer[7]
return state
class WebsocketController:
def __init__(self, shared_controller_state=shared_state, host="localhost", port=2096):
self.shared_controller_state = shared_controller_state
self.host = host
self.port = port
self.loop = None
self.thread = None
async def handle_message(self, websocket, path):
client_id = id(websocket)
clients[client_id] = {
"websocket": websocket,
}
print("Got a new connection")
try:
async for message in websocket:
if isinstance(message, bytes):
await self.handle_binary_message(clients[client_id], message)
else:
await self.handle_json_message(clients[client_id], message)
finally:
del clients[client_id]
async def handle_binary_message(self, client, data):
message = get_controller(data)
shared_state.update_state(message)
command = Command(message.command)
if command == Command.ESTOP:
await client["websocket"].send(
json.dumps({"type": "stop", "data": "Servos stopped"})
)
if command == Command.CONTROLLER:
await client["websocket"].send(json.dumps({"type": "echo", "data": message.__dict__}))
async def handle_json_message(self, client, message):
data = json.loads(message)
if data["type"] in ("stop", "mode_change"):
client["model"][data["type"]] = data.get("data", False)
await client["websocket"].send(
json.dumps(
{"type": data["type"], "data": data.get("data", "Servos stopped")}
)
)
def start_server(self):
asyncio.set_event_loop(self.loop)
start_server = websockets.serve(self.handle_message, self.host, self.port)
self.loop.run_until_complete(start_server)
self.loop.run_forever()
def start(self):
# Create a new event loop in a new thread
if not self.thread or not self.thread.is_alive():
self.loop = asyncio.new_event_loop()
self.thread = threading.Thread(target=self.start_server)
self.thread.start()
def stop(self):
if self.loop:
self.loop.call_soon_threadsafe(self.loop.stop)
if self.thread and self.thread.is_alive():
self.thread.join()
if __name__ == "__main__":
server = WebsocketController("localhost", 2096)
server.run()
try:
# Your main thread logic here
print("WebSocket server is running...")
# Example: Keep the main thread doing something or waiting
while True:
pass
except KeyboardInterrupt:
print("Stopping WebSocket server...")
server.stop()
@@ -0,0 +1,10 @@
from src.hardware_interface.HardwareInterfaceBase import HardwareInterfaceBase
class HardwareInterface(HardwareInterfaceBase):
def __init__(self):
super().__init__()
def set_actuator_positions(self, joint_angles):
pass
@@ -0,0 +1,11 @@
from abc import abstractmethod
class HardwareInterfaceBase:
def __init__(self):
pass
@abstractmethod
def set_actuator_positions(self, joint_angles):
raise NotImplementedError
+19 -16
View File
@@ -9,15 +9,25 @@ from .State import BodyState, GaitState
sys.path.append("../../..")
class Spot:
def __init__(self, kinematic, IMU=None):
def __init__(self, kinematic, camera, imu, hardware_interface, controller_interface):
self.kinematic = kinematic
self.camera = camera
self.imu = imu
self.hardware_interface = hardware_interface
self.controller_interface = controller_interface
self.body_state = BodyState()
self.gait_state = GaitState()
self.kinematic = kinematic
self.gait_controller = BezierGait()
self.IMU = IMU
self.body_state.worldFeetPositions = copy.deepcopy(self.kinematic.WorldToFoot)
def start(self):
self.controller_interface.start()
def stop(self):
self.controller_interface.stop()
def joint_angles(self):
return self.kinematic.IK(
self.body_state.rotation,
@@ -25,27 +35,20 @@ class Spot:
self.body_state.worldFeetPositions,
)
def run(self, state, command):
self.gait_state.update_gait_state(self.dt)
self.gui.UserInput(self.body_state, self.gait_state)
self.gait_state.contacts = self.state[-4:]
self.body_state.worldFeetPositions = copy.deepcopy(self.spot.WorldToFoot)
def run(self, dt=.01):
self.gait_state.update_gait_state(dt)
# self.gui.UserInput(self.body_state, self.gait_state)
self.bezier_gait.generate_trajectory(self.body_state, self.gait_state, self.dt)
self.gait_controller.generate_trajectory(self.body_state, self.gait_state, dt)
self.update_environment()
self.state, _, done, _ = self.env.step(self.action)
if done:
print("DONE")
return True
def update_environment(self):
joint_angles = self.spot.IK(
joint_angles = self.kinematic.IK(
self.body_state.rotation,
self.body_state.position,
self.body_state.worldFeetPositions,
)
self.env.pass_joint_angles(joint_angles.reshape(-1))
self.hardware_interface.set_actuator_positions(joint_angles.reshape(-1))