Adds option to control sim using web app

This commit is contained in:
Rune Harlyk
2025-10-10 21:14:59 +02:00
committed by Rune Harlyk
parent d47ce02cc6
commit 1fbddd483c
2 changed files with 331 additions and 60 deletions
+132 -60
View File
@@ -1,81 +1,153 @@
#!/usr/bin/env python3
import time
import numpy as np
import pybullet as p
import asyncio
import argparse
import sys
from typing import Optional
from src.robot.kinematics import Kinematics, BodyStateT, KinConfig
from src.robot.gait import GaitController, GaitStateT, GaitType, default_offset, default_stand_frac
from src.envs.quadruped_env import QuadrupedEnv, TerrainType
from src.controllers import Controller, GUIController, WebSocketController
print("Initializing Spot Micro simulation...")
try:
env = QuadrupedEnv(terrain_type=TerrainType.FLAT)
print("Environment created successfully")
print(f"Robot ID: {env.robot.robot_id}")
print(f"Number of joints: {env.robot.get_observation().shape[0]}")
# Print joint names
print("\nJoint names:")
num_joints = p.getNumJoints(env.robot.robot_id)
for i in range(num_joints):
joint_info = p.getJointInfo(env.robot.robot_id, i)
joint_name = joint_info[1].decode("utf-8")
joint_type = joint_info[2]
print(f"Joint {i}: {joint_name} (type: {joint_type})")
class SpotMicroSimulation:
def __init__(
self, controller: Controller, env: Optional[QuadrupedEnv] = None, terrain_type: TerrainType = TerrainType.FLAT
):
print("Initializing Spot Micro simulation...")
try:
if env is not None:
self.env = env
print("Using existing environment")
else:
self.env = QuadrupedEnv(terrain_type=terrain_type)
print("Environment created successfully")
print("Simulation ready! Use the GUI sliders to control the robot.")
except Exception as e:
print(f"Error creating environment: {e}")
import traceback
print(f"Robot ID: {self.env.robot.robot_id}")
print(f"Number of joints: {self.env.robot.get_observation().shape[0]}")
traceback.print_exc()
exit(1)
num_joints = p.getNumJoints(self.env.robot.robot_id)
print("\nJoint names:")
for i in range(num_joints):
joint_info = p.getJointInfo(self.env.robot.robot_id, i)
joint_name = joint_info[1].decode("utf-8")
joint_type = joint_info[2]
print(f"Joint {i}: {joint_name} (type: {joint_type})")
joint_directions = np.array([-1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 1, 1])
print("Simulation ready!")
except Exception as e:
print(f"Error creating environment: {e}")
import traceback
kinematics = Kinematics()
traceback.print_exc()
sys.exit(1)
standby = KinConfig.default_feet_positions[:4, :3]
self.controller = controller
self.joint_directions = np.array([-1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 1, 1])
self.kinematics = Kinematics()
body_state = BodyStateT(
omega=0,
phi=0,
psi=0,
xm=0,
ym=KinConfig.default_body_height,
zm=0,
px=0,
py=0,
pz=0,
feet=standby,
default_feet=standby,
)
standby = KinConfig.default_feet_positions[:4, :3]
gait_state = GaitStateT(
step_height=KinConfig.default_step_height,
step_x=0,
step_z=0,
step_angle=0,
step_velocity=1,
step_depth=KinConfig.default_step_depth,
stand_frac=default_stand_frac[GaitType.TROT_GATE],
offset=default_offset[GaitType.TROT_GATE],
gait_type=GaitType.TROT_GATE,
)
self.body_state = BodyStateT(
omega=0,
phi=0,
psi=0,
xm=0,
ym=KinConfig.default_body_height,
zm=0,
px=0,
py=0,
pz=0,
feet=standby,
default_feet=standby,
)
gait = GaitController(standby)
self.gait_state = GaitStateT(
step_height=KinConfig.default_step_height,
step_x=0,
step_z=0,
step_angle=0,
step_velocity=1,
step_depth=KinConfig.default_step_depth,
stand_frac=default_stand_frac[GaitType.TROT_GATE],
offset=default_offset[GaitType.TROT_GATE],
gait_type=GaitType.TROT_GATE,
)
dt = 1.0 / 240
while True:
env.gui.update_gait_state(gait_state)
env.gui.update_body_state(body_state)
env.gui.update()
self.gait = GaitController(standby)
self.dt = 1.0 / 240
gait.step(gait_state, body_state, dt)
joints = kinematics.inverse_kinematics(body_state)
joints = joints * joint_directions
def step(self):
self.controller.update(self.body_state, self.gait_state, self.dt)
_, _, done, truncated, _ = env.step(joints)
# if done or truncated:
# env.reset()
self.gait.step(self.gait_state, self.body_state, self.dt)
joints = self.kinematics.inverse_kinematics(self.body_state)
joints = joints * self.joint_directions
time.sleep(dt)
_, _, done, truncated, _ = self.env.step(joints)
return joints, done, truncated
def run_sync(self):
try:
while self.controller.is_running():
joints, done, truncated = self.step()
time.sleep(self.dt)
except KeyboardInterrupt:
print("\n[*] Shutting down...")
async def run_async(self):
try:
while self.controller.is_running():
joints, done, truncated = self.step()
if isinstance(self.controller, WebSocketController):
await self.controller.broadcast_angles(joints)
await asyncio.sleep(self.dt)
except KeyboardInterrupt:
print("\n[*] Shutting down...")
def main():
parser = argparse.ArgumentParser(description="Spot Micro Interactive Control Server")
parser.add_argument("--port", type=int, default=8765, help="WebSocket server port (default: 8765)")
parser.add_argument("--mode", choices=["gui", "websocket"], default="gui", help="Control mode (default: gui)")
parser.add_argument("--terrain", choices=["flat", "maze", "terrain"], default="flat", help="Terrain type")
args = parser.parse_args()
terrain_map = {"flat": TerrainType.FLAT, "maze": TerrainType.MAZE, "terrain": TerrainType.TERRAIN}
terrain_type = terrain_map.get(args.terrain, TerrainType.FLAT)
if args.mode == "websocket":
controller = WebSocketController(port=args.port)
sim = SpotMicroSimulation(controller, terrain_type=terrain_type)
async def run():
server = await controller.start_server()
try:
await sim.run_async()
except KeyboardInterrupt:
print("\n[!] Shutting down server...")
controller.running = False
server.close()
await server.wait_closed()
print("[+] Server stopped")
asyncio.run(run())
else:
from src.envs.quadruped_env import QuadrupedEnv
env = QuadrupedEnv(terrain_type=terrain_type)
controller = GUIController(env)
sim = SpotMicroSimulation(controller, env=env)
print("Use the GUI sliders to control the robot.")
sim.run_sync()
if __name__ == "__main__":
main()