🎍 Adds virtual sensors and camera mjpeg stream
This commit is contained in:
@@ -0,0 +1,38 @@
|
|||||||
|
from os import sys
|
||||||
|
|
||||||
|
from src.spot import Spot
|
||||||
|
from src.IMU.IMU import IMU
|
||||||
|
from src.Kinematics.SpotKinematics import SpotModel
|
||||||
|
from src.camera_server import StreamingServerThread
|
||||||
|
from src.Camera.WebCamera import WebCamera
|
||||||
|
|
||||||
|
sys.path.append('../')
|
||||||
|
|
||||||
|
from simulation.simulator import Simulator
|
||||||
|
from simulation.GymEnvs.spot_bezier_env import spotBezierEnv
|
||||||
|
|
||||||
|
from simulation.sensors.camera import PyBulletCamera
|
||||||
|
|
||||||
|
camera = PyBulletCamera()
|
||||||
|
|
||||||
|
env = spotBezierEnv(
|
||||||
|
render=True,
|
||||||
|
on_rack=False,
|
||||||
|
height_field=False,
|
||||||
|
draw_foot_path=False,
|
||||||
|
urdf_root="..\simulation\pybullet_data"
|
||||||
|
)
|
||||||
|
kinematics = SpotModel()
|
||||||
|
imu = IMU()
|
||||||
|
camera = PyBulletCamera() # WebCamera()
|
||||||
|
spot = Spot(kinematics, imu)
|
||||||
|
|
||||||
|
server = StreamingServerThread(camera)
|
||||||
|
sim = Simulator(env, spot)
|
||||||
|
|
||||||
|
server.start()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
camera.update_view_matrix(sim.env.spot.quadruped)
|
||||||
|
camera.get_image()
|
||||||
|
sim.step(0.01)
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
from abc import abstractmethod
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
class CameraBase():
|
||||||
|
def __init__(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def get_image(self) -> Any:
|
||||||
|
raise NotImplementedError
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
import cv2
|
||||||
|
from .CameraBase import CameraBase
|
||||||
|
|
||||||
|
|
||||||
|
class WebCamera(CameraBase):
|
||||||
|
def __init__(self, camera_id=0):
|
||||||
|
self.cap = cv2.VideoCapture(camera_id)
|
||||||
|
# super().__init__()
|
||||||
|
|
||||||
|
def get_image(self):
|
||||||
|
ret, frame = self.cap.read()
|
||||||
|
if ret:
|
||||||
|
return frame
|
||||||
|
else:
|
||||||
|
return None # Return None if the frame could not be captured
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.cap.release()
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
|
from .IMUBase import IMUBase
|
||||||
|
|
||||||
|
class IMU(IMUBase):
|
||||||
class IMU:
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -0,0 +1,6 @@
|
|||||||
|
class IMUBase():
|
||||||
|
def __init__(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def read_orientation(self):
|
||||||
|
raise NotImplementedError()
|
||||||
@@ -0,0 +1,51 @@
|
|||||||
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||||
|
from threading import Thread
|
||||||
|
import cv2
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class StreamingHandler(BaseHTTPRequestHandler):
|
||||||
|
def do_GET(self):
|
||||||
|
if self.path == '/stream.mjpg':
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header('Content-type', 'multipart/x-mixed-replace; boundary=frame')
|
||||||
|
self.end_headers()
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
frame = self.server.camera.get_image()
|
||||||
|
_, jpeg = cv2.imencode('.jpg', frame)
|
||||||
|
self.wfile.write(b'--frame\r\n')
|
||||||
|
self.send_header('Content-Type', 'image/jpeg')
|
||||||
|
self.send_header('Content-Length', len(jpeg))
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(jpeg.tobytes())
|
||||||
|
self.wfile.write(b'\r\n')
|
||||||
|
time.sleep(0.5)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Stream stopped: {e}")
|
||||||
|
else:
|
||||||
|
self.send_error(404)
|
||||||
|
self.end_headers()
|
||||||
|
|
||||||
|
class StreamingServer(HTTPServer):
|
||||||
|
def __init__(self, server_address, camera):
|
||||||
|
super().__init__(server_address, StreamingHandler)
|
||||||
|
self.camera = camera
|
||||||
|
|
||||||
|
class StreamingServerThread:
|
||||||
|
def __init__(self, camera, port=8080):
|
||||||
|
self.camera = camera
|
||||||
|
self.port = port
|
||||||
|
self.server_thread = None
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
def run_server():
|
||||||
|
address = ('', self.port)
|
||||||
|
server = StreamingServer(address, self.camera)
|
||||||
|
print(f"Starting server at http://localhost:{self.port}/stream.mjpg")
|
||||||
|
server.serve_forever()
|
||||||
|
|
||||||
|
self.server_thread = Thread(target=run_server)
|
||||||
|
self.server_thread.daemon = True
|
||||||
|
self.server_thread.start()
|
||||||
|
|
||||||
@@ -6,9 +6,9 @@ import time
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
import pybullet_data
|
import pybullet_data
|
||||||
from gym import spaces
|
from gym import spaces
|
||||||
import entities.spot as spot
|
|
||||||
from gym.envs.registration import register
|
from gym.envs.registration import register
|
||||||
|
|
||||||
|
from simulation.entities import spot
|
||||||
from simulation.GymEnvs.spot_gym_env import spotGymEnv
|
from simulation.GymEnvs.spot_gym_env import spotGymEnv
|
||||||
|
|
||||||
SENSOR_NOISE_STDDEV = spot.SENSOR_NOISE_STDDEV
|
SENSOR_NOISE_STDDEV = spot.SENSOR_NOISE_STDDEV
|
||||||
|
|||||||
@@ -19,13 +19,14 @@ import pybullet
|
|||||||
import pybullet_data
|
import pybullet_data
|
||||||
from gym import spaces
|
from gym import spaces
|
||||||
from gym.utils import seeding
|
from gym.utils import seeding
|
||||||
from pkg_resources import parse_version
|
|
||||||
import simulation.entities.spot as spot
|
|
||||||
import pybullet_utils.bullet_client as bullet_client
|
|
||||||
from gym.envs.registration import register
|
from gym.envs.registration import register
|
||||||
|
from pkg_resources import parse_version
|
||||||
|
import pybullet_utils.bullet_client as bullet_client
|
||||||
|
|
||||||
|
import simulation.entities.spot as spot
|
||||||
from simulation.GymEnvs.heightfield import HeightField
|
from simulation.GymEnvs.heightfield import HeightField
|
||||||
from OpenLoopSM.SpotOL import BezierStepper
|
|
||||||
from raspberry_pi.src.Kinematics import LieAlgebra as LA
|
from raspberry_pi.src.Kinematics import LieAlgebra as LA
|
||||||
|
from simulation.OpenLoopSM.SpotOL import BezierStepper
|
||||||
|
|
||||||
NUM_SUBSTEPS = 5
|
NUM_SUBSTEPS = 5
|
||||||
NUM_MOTORS = 12
|
NUM_MOTORS = 12
|
||||||
|
|||||||
@@ -17,9 +17,9 @@ import math
|
|||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import entities.motor as motor
|
|
||||||
import pybullet_data
|
import pybullet_data
|
||||||
|
|
||||||
|
from simulation.entities import motor
|
||||||
sys.path.append("../")
|
sys.path.append("../")
|
||||||
|
|
||||||
from raspberry_pi.src.Kinematics.SpotKinematics import SpotModel
|
from raspberry_pi.src.Kinematics.SpotKinematics import SpotModel
|
||||||
@@ -397,14 +397,14 @@ class Spot(object):
|
|||||||
print(pybullet_data.getDataPath())
|
print(pybullet_data.getDataPath())
|
||||||
if self._self_collision_enabled:
|
if self._self_collision_enabled:
|
||||||
self.quadruped = self._pybullet_client.loadURDF(
|
self.quadruped = self._pybullet_client.loadURDF(
|
||||||
pybullet_data.getDataPath() + "/spot.urdf",
|
self._urdf_root + "/spot.urdf",
|
||||||
init_position,
|
init_position,
|
||||||
useFixedBase=self._on_rack,
|
useFixedBase=self._on_rack,
|
||||||
flags=self._pybullet_client.URDF_USE_SELF_COLLISION_EXCLUDE_PARENT,
|
flags=self._pybullet_client.URDF_USE_SELF_COLLISION_EXCLUDE_PARENT,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self.quadruped = self._pybullet_client.loadURDF(
|
self.quadruped = self._pybullet_client.loadURDF(
|
||||||
pybullet_data.getDataPath() + "/spot.urdf",
|
self._urdf_root + "/spot.urdf",
|
||||||
init_position,
|
init_position,
|
||||||
INIT_ORIENTATION,
|
INIT_ORIENTATION,
|
||||||
useFixedBase=self._on_rack,
|
useFixedBase=self._on_rack,
|
||||||
|
|||||||
@@ -1,55 +1,27 @@
|
|||||||
import copy
|
|
||||||
from os import sys
|
from os import sys
|
||||||
|
|
||||||
sys.path.append('../')
|
sys.path.append('../')
|
||||||
|
|
||||||
from raspberry_pi.src.spot import Spot
|
from raspberry_pi.src.spot import Spot
|
||||||
from raspberry_pi.src.Kinematics.SpotKinematics import SpotModel
|
from raspberry_pi.src.Kinematics.SpotKinematics import SpotModel
|
||||||
from raspberry_pi.src.IMU import IMU
|
from raspberry_pi.src.IMU.IMU import IMU
|
||||||
|
|
||||||
from GymEnvs.spot_bezier_env import spotBezierEnv
|
# from GymEnvs.spot_bezier_env import spotBezierEnv
|
||||||
from util.gui import GUI
|
from simulator import Simulator
|
||||||
|
|
||||||
kinematics = SpotModel()
|
|
||||||
imu = IMU()
|
|
||||||
env = spotBezierEnv(
|
|
||||||
render=True,
|
|
||||||
on_rack=False,
|
|
||||||
height_field=False,
|
|
||||||
draw_foot_path=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
state = env.reset()
|
|
||||||
dt = 0.01
|
|
||||||
gui = GUI(env.spot.quadruped)
|
|
||||||
action = env.action_space.sample()
|
|
||||||
|
|
||||||
spot = Spot(kinematics, imu)
|
|
||||||
|
|
||||||
def step(state):
|
|
||||||
# handle_inputs()
|
|
||||||
gui.UserInput(spot.body_state, spot.gait_state)
|
|
||||||
spot.gait_state.contacts = state[-4:]
|
|
||||||
spot.body_state.worldFeetPositions = copy.deepcopy(spot.kinematic.WorldToFoot)
|
|
||||||
|
|
||||||
spot.gait_state.update_gait_state(dt)
|
|
||||||
|
|
||||||
# Generate next feet positions
|
|
||||||
spot.gait_controller.generate_trajectory(spot.body_state, spot.gait_state, dt)
|
|
||||||
|
|
||||||
# Calculate next joint angles
|
|
||||||
joint_angles = spot.joint_angles()
|
|
||||||
|
|
||||||
# Update environment
|
|
||||||
env.pass_joint_angles(joint_angles.reshape(-1))
|
|
||||||
state, _, done, _ = env.step(action)
|
|
||||||
if done:
|
|
||||||
print("DONE")
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
while True:
|
kinematics = SpotModel()
|
||||||
done = step(state)
|
imu = IMU()
|
||||||
if done:
|
# env = spotBezierEnv(
|
||||||
env.close()
|
# render=True,
|
||||||
|
# on_rack=False,
|
||||||
|
# height_field=False,
|
||||||
|
# draw_foot_path=False,
|
||||||
|
# )
|
||||||
|
|
||||||
|
spot = Spot(kinematics, imu)
|
||||||
|
|
||||||
|
simulation = Simulator(spot=spot)
|
||||||
|
simulation.run(0.01)
|
||||||
@@ -0,0 +1,57 @@
|
|||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import pybullet as pb
|
||||||
|
import sys
|
||||||
|
|
||||||
|
sys.path.append("../../../")
|
||||||
|
|
||||||
|
from raspberry_pi.src.Camera.CameraBase import CameraBase
|
||||||
|
|
||||||
|
viewMatrix = pb.computeViewMatrix(
|
||||||
|
cameraEyePosition=[0, 0, 1],
|
||||||
|
cameraTargetPosition=[0, 0, 0],
|
||||||
|
cameraUpVector=[0, 1, 0])
|
||||||
|
projectionMatrix = pb.computeProjectionMatrixFOV(
|
||||||
|
fov=160, aspect=1.0, nearVal=0.1, farVal=150.0)
|
||||||
|
|
||||||
|
class PyBulletCamera(CameraBase):
|
||||||
|
def __init__(self, view_matrix=viewMatrix, projection_matrix=projectionMatrix, width=640, height=480):
|
||||||
|
"""
|
||||||
|
Initializes the camera with PyBullet view and projection matrices.
|
||||||
|
:param view_matrix: The view matrix for the camera in PyBullet.
|
||||||
|
:param projection_matrix: The projection matrix for the camera.
|
||||||
|
:param width: Width of the camera image.
|
||||||
|
:param height: Height of the camera image.
|
||||||
|
"""
|
||||||
|
self.view_matrix = view_matrix
|
||||||
|
self.projection_matrix = projection_matrix
|
||||||
|
self.width = width
|
||||||
|
self.height = height
|
||||||
|
|
||||||
|
def get_image(self):
|
||||||
|
"""
|
||||||
|
Captures an image from the PyBullet simulation.
|
||||||
|
:return: An image frame captured from PyBullet.
|
||||||
|
"""
|
||||||
|
# Capture an image from the simulation
|
||||||
|
_, _, px, _, _ = pb.getCameraImage(
|
||||||
|
width=self.width, height=self.height,
|
||||||
|
viewMatrix=self.view_matrix,
|
||||||
|
projectionMatrix=self.projection_matrix)
|
||||||
|
|
||||||
|
# Convert PyBullet's RGBA image to an OpenCV BGR image
|
||||||
|
rgb_image = np.array(px, dtype=np.uint8)
|
||||||
|
rgb_image = np.reshape(rgb_image, (self.height, self.width, -1))
|
||||||
|
bgr_image = cv2.cvtColor(rgb_image, cv2.COLOR_RGBA2BGR)
|
||||||
|
|
||||||
|
return bgr_image
|
||||||
|
|
||||||
|
def update_view_matrix(self, model_id):
|
||||||
|
position, orientation = pb.getBasePositionAndOrientation(model_id)
|
||||||
|
position = [position[0], position[1]-0.1, position[2]]
|
||||||
|
orientation = [orientation[0], orientation[1], orientation[2]]
|
||||||
|
|
||||||
|
# Calculate the camera's view matrix based on the new position and orientation
|
||||||
|
camera_eye_pos = position # This should be adjusted based on your specific relative position requirements
|
||||||
|
camera_target_pos = [position[0] + orientation[0], position[1] + orientation[1], position[2] + orientation[2]]
|
||||||
|
self.view_matrix = pb.computeViewMatrix(camera_eye_pos, camera_target_pos, [0, 1, 0])
|
||||||
@@ -0,0 +1,44 @@
|
|||||||
|
import copy
|
||||||
|
from os import sys
|
||||||
|
|
||||||
|
sys.path.append('../../')
|
||||||
|
|
||||||
|
from .util.gui import GUI
|
||||||
|
|
||||||
|
|
||||||
|
class Simulator():
|
||||||
|
def __init__(self, env, spot) -> None:
|
||||||
|
self.env = env
|
||||||
|
self.state = env.reset()
|
||||||
|
self.gui = GUI(env.spot.quadruped)
|
||||||
|
self.action = env.action_space.sample()
|
||||||
|
|
||||||
|
self.spot = spot
|
||||||
|
|
||||||
|
def step(self, dt):
|
||||||
|
# handle_inputs()
|
||||||
|
self.gui.UserInput(self.spot.body_state, self.spot.gait_state)
|
||||||
|
self.spot.gait_state.contacts = self.state[-4:]
|
||||||
|
self.spot.body_state.worldFeetPositions = copy.deepcopy(self.spot.kinematic.WorldToFoot)
|
||||||
|
|
||||||
|
self.spot.gait_state.update_gait_state(dt)
|
||||||
|
|
||||||
|
# Generate next feet positions
|
||||||
|
self.spot.gait_controller.generate_trajectory(self.spot.body_state, self.spot.gait_state, dt)
|
||||||
|
|
||||||
|
# Calculate next joint angles
|
||||||
|
joint_angles = self.spot.joint_angles()
|
||||||
|
|
||||||
|
# Update environment
|
||||||
|
self.env.pass_joint_angles(joint_angles.reshape(-1))
|
||||||
|
self.state, _, done, _ = self.env.step(self.action)
|
||||||
|
if done:
|
||||||
|
print("DONE")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def run(self, dt):
|
||||||
|
while True:
|
||||||
|
done = self.step(dt)
|
||||||
|
if done:
|
||||||
|
self.env.close()
|
||||||
|
|
||||||
Reference in New Issue
Block a user