Updates camera and mocks

This commit is contained in:
Rune Harlyk
2024-03-07 14:01:09 +01:00
parent fae6171f93
commit 971418fce8
13 changed files with 170 additions and 64 deletions
+11 -4
View File
@@ -16,16 +16,23 @@ def main():
imu = IMU()
hardware_interface = HardwareInterface()
# shared_controller_state = SharedState()
controller_interface = WebsocketController() # shared_controller_state)
shared_controller_state = SharedState()
controller_interface = WebsocketController(shared_controller_state)
spot = Spot(kinematics, camera, imu, hardware_interface, controller_interface)
spot = Spot(
kinematics,
camera,
imu,
hardware_interface,
controller_interface,
shared_controller_state
)
spot.start()
try:
while True:
spot.run(shared_state.get_latest_state().__dict__)
time.sleep(1)
spot.run()
except KeyboardInterrupt:
spot.stop()
+52 -24
View File
@@ -1,38 +1,66 @@
from os import sys
import time
from src.spot import Spot
from src.IMU.IMU import IMU
from src.Kinematics.SpotKinematics import SpotModel
from src.camera_server import StreamingServerThread
from src.Camera.WebCamera import WebCamera
from src.kinematics.SpotKinematics import SpotModel
from src.camera.WebCamera import WebCamera
from src.imu.IMU import IMU
from src.controller_interface.WebsocketController import WebsocketController
from src.controller_interface.SharedControllerState import SharedState, shared_state
# from src.camera_server import StreamingServerThread
sys.path.append('../')
from simulation.simulator import Simulator
from simulation.GymEnvs.spot_bezier_env import spotBezierEnv
from simulation.sensors.camera import PyBulletCamera
from simulation.sensors.hardware_interface import PyBulletHardwareInterface
camera = PyBulletCamera()
def main():
env = spotBezierEnv(
render=True,
on_rack=False,
height_field=False,
draw_foot_path=False,
urdf_root="..\simulation\pybullet_data"
)
env = spotBezierEnv(
render=True,
on_rack=False,
height_field=False,
draw_foot_path=False,
urdf_root="..\simulation\pybullet_data"
)
kinematics = SpotModel()
imu = IMU()
camera = PyBulletCamera() # WebCamera()
spot = Spot(kinematics, imu)
kinematics = SpotModel()
camera = WebCamera()
camera = PyBulletCamera(env.spot.quadruped)
imu = IMU()
hardware_interface = PyBulletHardwareInterface(env)
# shared_controller_state = SharedState()
controller_interface = WebsocketController(shared_state)
server = StreamingServerThread(camera)
sim = Simulator(env, spot)
spot = Spot(
kinematics,
camera,
imu,
hardware_interface,
controller_interface,
shared_state
)
server.start()
simulator = Simulator(env, spot)
spot.start()
last_time = time.time()
try:
while True:
contacts, done = simulator.step()
spot.gait_state.contacts = contacts
spot.run() # time.time() - last_time
last_time = time.time()
if done:
break
except KeyboardInterrupt:
print("Program was interrupted")
spot.stop()
if __name__ == '__main__':
main()
while True:
camera.update_view_matrix(sim.env.spot.quadruped)
camera.get_image()
sim.step(0.01)
+1 -2
View File
@@ -1,5 +1,4 @@
from abc import abstractmethod
from typing import Any
class CameraBase():
@@ -7,5 +6,5 @@ class CameraBase():
pass
@abstractmethod
def get_image(self) -> Any:
def update(self) -> None:
raise NotImplementedError
+7 -5
View File
@@ -5,14 +5,16 @@ from .CameraBase import CameraBase
class WebCamera(CameraBase):
def __init__(self, camera_id=0):
self.cap = cv2.VideoCapture(camera_id)
# super().__init__()
self._last_frame = None
super().__init__()
def get_image(self):
def get_frame(self):
self._last_frame
def update(self) -> None:
ret, frame = self.cap.read()
if ret:
return frame
else:
return None # Return None if the frame could not be captured
self._last_frame = frame.copy()
def __del__(self):
self.cap.release()
+3 -1
View File
@@ -12,7 +12,7 @@ class StreamingHandler(BaseHTTPRequestHandler):
self.end_headers()
try:
while True:
frame = self.server.camera.get_image()
frame = self.server.camera.get_frame()
_, jpeg = cv2.imencode('.jpg', frame)
self.wfile.write(b'--frame\r\n')
self.send_header('Content-Type', 'image/jpeg')
@@ -49,3 +49,5 @@ class StreamingServerThread:
self.server_thread.daemon = True
self.server_thread.start()
def stop(self):
self.server_thread.join()
@@ -41,7 +41,6 @@ class WebsocketController:
clients[client_id] = {
"websocket": websocket,
}
print("Got a new connection")
try:
async for message in websocket:
if isinstance(message, bytes):
+22 -5
View File
@@ -3,18 +3,26 @@ import numpy as np
import copy
import sys
from .camera.CameraBase import CameraBase
from .camera_server import StreamingServerThread
from .GaitGenerator.Bezier import BezierGait
from .State import BodyState, GaitState
sys.path.append("../../..")
def map_value(x, from_low, from_high, to_low, to_high):
return ((x - from_low) / (from_high - from_low)) * (to_high - to_low) + to_low
class Spot:
def __init__(self, kinematic, camera, imu, hardware_interface, controller_interface):
def __init__(self, kinematic, camera:CameraBase, imu, hardware_interface, controller_interface, shared_controller_state):
self.kinematic = kinematic
self.camera = camera
self.imu = imu
self.hardware_interface = hardware_interface
self.controller_interface = controller_interface
self.shared_controller_state = shared_controller_state
self.camera_stream = StreamingServerThread(self.camera)
self.body_state = BodyState()
self.gait_state = GaitState()
@@ -23,10 +31,13 @@ class Spot:
self.body_state.worldFeetPositions = copy.deepcopy(self.kinematic.WorldToFoot)
def start(self):
self.controller_interface.start()
self.camera_stream.start()
def stop(self):
self.controller_interface.stop()
self.camera_stream.stop()
def joint_angles(self):
return self.kinematic.IK(
@@ -34,16 +45,22 @@ class Spot:
self.body_state.position,
self.body_state.worldFeetPositions,
)
def handle_input(self, controller):
self.gait_state.target_step_length = map_value(max(controller.lx, controller.ly, key=abs), -127, 128, -0.1, 0.1)
self.gait_state.target_lateral_fraction = map_value(controller.lx, -127, 128, -np.pi / 2.0, np.pi / 2.0)
# self.gait_state.target_yaw_rate = map_value(controller.rx, -127, 128, -2.0, 2.0)
def run(self, dt=.01):
# self.camera.update()
controller = self.shared_controller_state.get_latest_state()
self.handle_input(controller)
self.gait_state.update_gait_state(dt)
# self.gui.UserInput(self.body_state, self.gait_state)
self.body_state.worldFeetPositions = copy.deepcopy(self.kinematic.WorldToFoot)
self.gait_controller.generate_trajectory(self.body_state, self.gait_state, dt)
self.update_environment()
def update_environment(self):
joint_angles = self.kinematic.IK(
self.body_state.rotation,
self.body_state.position,