Refactors simulation an raspberry pi project
This commit is contained in:
@@ -0,0 +1,27 @@
|
||||
"""Abstract base class for environment randomizer.
|
||||
Source: https://github.com/bulletphysics/bullet3/blob/master/examples/pybullet/gym/pybullet_envs/minitaur/envs/env_randomizer_base.py
|
||||
"""
|
||||
|
||||
|
||||
import abc
|
||||
|
||||
|
||||
class EnvRandomizerBase(object):
|
||||
"""Abstract base class for environment randomizer.
|
||||
|
||||
An EnvRandomizer is called in environment.reset(). It will
|
||||
randomize physical parameters of the objects in the simulation.
|
||||
The physical parameters will be fixed for that episode and be
|
||||
randomized again in the next environment.reset().
|
||||
"""
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def randomize_env(self, env):
|
||||
"""Randomize the simulated_objects in the environment.
|
||||
|
||||
Args:
|
||||
env: The environment to be randomized.
|
||||
"""
|
||||
pass
|
||||
@@ -0,0 +1,138 @@
|
||||
"""
|
||||
CODE BASED ON EXAMPLE FROM:
|
||||
@misc{coumans2017pybullet,
|
||||
title={Pybullet, a python module for physics simulation in robotics, games and machine learning},
|
||||
author={Coumans, Erwin and Bai, Yunfei},
|
||||
url={www.pybullet.org},
|
||||
year={2017},
|
||||
}
|
||||
|
||||
Example: heightfield.py
|
||||
https://github.com/bulletphysics/bullet3/blob/master/examples/pybullet/examples/heightfield.py
|
||||
"""
|
||||
|
||||
import pybullet as p
|
||||
import pybullet_data as pd
|
||||
|
||||
textureId = -1
|
||||
|
||||
useProgrammatic = 0
|
||||
useTerrainFromPNG = 1
|
||||
useDeepLocoCSV = 2
|
||||
updateHeightfield = False
|
||||
|
||||
heightfieldSource = useProgrammatic
|
||||
numHeightfieldRows = 256
|
||||
numHeightfieldColumns = 256
|
||||
import random
|
||||
random.seed(10)
|
||||
|
||||
|
||||
class HeightField():
|
||||
def __init__(self):
|
||||
self.hf_id = 0
|
||||
self.terrainShape = 0
|
||||
self.heightfieldData = [0] * numHeightfieldRows * numHeightfieldColumns
|
||||
|
||||
def _generate_field(self, env, heightPerturbationRange=0.08):
|
||||
|
||||
env.pybullet_client.setAdditionalSearchPath(pd.getDataPath())
|
||||
|
||||
env.pybullet_client.configureDebugVisualizer(
|
||||
env.pybullet_client.COV_ENABLE_RENDERING, 0)
|
||||
heightPerturbationRange = heightPerturbationRange
|
||||
if heightfieldSource == useProgrammatic:
|
||||
for j in range(int(numHeightfieldColumns / 2)):
|
||||
for i in range(int(numHeightfieldRows / 2)):
|
||||
height = random.uniform(0, heightPerturbationRange)
|
||||
self.heightfieldData[2 * i +
|
||||
2 * j * numHeightfieldRows] = height
|
||||
self.heightfieldData[2 * i + 1 +
|
||||
2 * j * numHeightfieldRows] = height
|
||||
self.heightfieldData[2 * i + (2 * j + 1) *
|
||||
numHeightfieldRows] = height
|
||||
self.heightfieldData[2 * i + 1 + (2 * j + 1) *
|
||||
numHeightfieldRows] = height
|
||||
|
||||
terrainShape = env.pybullet_client.createCollisionShape(
|
||||
shapeType=env.pybullet_client.GEOM_HEIGHTFIELD,
|
||||
meshScale=[.07, .07, 1.6],
|
||||
heightfieldTextureScaling=(numHeightfieldRows - 1) / 2,
|
||||
heightfieldData=self.heightfieldData,
|
||||
numHeightfieldRows=numHeightfieldRows,
|
||||
numHeightfieldColumns=numHeightfieldColumns)
|
||||
terrain = env.pybullet_client.createMultiBody(0, terrainShape)
|
||||
env.pybullet_client.resetBasePositionAndOrientation(
|
||||
terrain, [0, 0, 0.0], [0, 0, 0, 1])
|
||||
env.pybullet_client.changeDynamics(terrain,
|
||||
-1,
|
||||
lateralFriction=1.0)
|
||||
|
||||
if heightfieldSource == useDeepLocoCSV:
|
||||
terrainShape = env.pybullet_client.createCollisionShape(
|
||||
shapeType=env.pybullet_client.GEOM_HEIGHTFIELD,
|
||||
meshScale=[.5, .5, 2.5],
|
||||
fileName="heightmaps/ground0.txt",
|
||||
heightfieldTextureScaling=128)
|
||||
terrain = env.pybullet_client.createMultiBody(0, terrainShape)
|
||||
env.pybullet_client.resetBasePositionAndOrientation(
|
||||
terrain, [0, 0, 0], [0, 0, 0, 1])
|
||||
env.pybullet_client.changeDynamics(terrain,
|
||||
-1,
|
||||
lateralFriction=1.0)
|
||||
|
||||
if heightfieldSource == useTerrainFromPNG:
|
||||
terrainShape = env.pybullet_client.createCollisionShape(
|
||||
shapeType=env.pybullet_client.GEOM_HEIGHTFIELD,
|
||||
meshScale=[.05, .05, 1.8],
|
||||
fileName="heightmaps/wm_height_out.png")
|
||||
textureId = env.pybullet_client.loadTexture(
|
||||
"heightmaps/gimp_overlay_out.png")
|
||||
terrain = env.pybullet_client.createMultiBody(0, terrainShape)
|
||||
env.pybullet_client.changeVisualShape(terrain,
|
||||
-1,
|
||||
textureUniqueId=textureId)
|
||||
env.pybullet_client.resetBasePositionAndOrientation(
|
||||
terrain, [0, 0, 0.1], [0, 0, 0, 1])
|
||||
env.pybullet_client.changeDynamics(terrain,
|
||||
-1,
|
||||
lateralFriction=1.0)
|
||||
|
||||
self.hf_id = terrainShape
|
||||
self.terrainShape = terrainShape
|
||||
print("TERRAIN SHAPE: {}".format(terrainShape))
|
||||
|
||||
env.pybullet_client.changeVisualShape(terrain,
|
||||
-1,
|
||||
rgbaColor=[1, 1, 1, 1])
|
||||
|
||||
env.pybullet_client.configureDebugVisualizer(
|
||||
env.pybullet_client.COV_ENABLE_RENDERING, 1)
|
||||
|
||||
def UpdateHeightField(self, heightPerturbationRange=0.08):
|
||||
if heightfieldSource == useProgrammatic:
|
||||
for j in range(int(numHeightfieldColumns / 2)):
|
||||
for i in range(int(numHeightfieldRows / 2)):
|
||||
height = random.uniform(
|
||||
0, heightPerturbationRange) # +math.sin(time.time())
|
||||
self.heightfieldData[2 * i +
|
||||
2 * j * numHeightfieldRows] = height
|
||||
self.heightfieldData[2 * i + 1 +
|
||||
2 * j * numHeightfieldRows] = height
|
||||
self.heightfieldData[2 * i + (2 * j + 1) *
|
||||
numHeightfieldRows] = height
|
||||
self.heightfieldData[2 * i + 1 + (2 * j + 1) *
|
||||
numHeightfieldRows] = height
|
||||
#GEOM_CONCAVE_INTERNAL_EDGE may help avoid getting stuck at an internal (shared) edge of the triangle/heightfield.
|
||||
#GEOM_CONCAVE_INTERNAL_EDGE is a bit slower to build though.
|
||||
#flags = p.GEOM_CONCAVE_INTERNAL_EDGE
|
||||
flags = 0
|
||||
self.terrainShape = p.createCollisionShape(
|
||||
shapeType=p.GEOM_HEIGHTFIELD,
|
||||
flags=flags,
|
||||
meshScale=[.05, .05, 1],
|
||||
heightfieldTextureScaling=(numHeightfieldRows - 1) / 2,
|
||||
heightfieldData=self.heightfieldData,
|
||||
numHeightfieldRows=numHeightfieldRows,
|
||||
numHeightfieldColumns=numHeightfieldColumns,
|
||||
replaceHeightfieldIndex=self.terrainShape)
|
||||
@@ -0,0 +1,266 @@
|
||||
""" This file implements the gym environment of SpotMicro with Bezier Curve.
|
||||
"""
|
||||
|
||||
import math
|
||||
import time
|
||||
import numpy as np
|
||||
import pybullet_data
|
||||
from gym import spaces
|
||||
import spot
|
||||
from gym.envs.registration import register
|
||||
|
||||
from simulation.GymEnvs.spot_gym_env import spotGymEnv
|
||||
|
||||
SENSOR_NOISE_STDDEV = spot.SENSOR_NOISE_STDDEV
|
||||
|
||||
# Register as OpenAI Gym Environment
|
||||
register(
|
||||
id="SpotMicroEnv-v1",
|
||||
entry_point="spotmicro.GymEnvs.spot_bezier_env:spotBezierEnv",
|
||||
max_episode_steps=1000,
|
||||
)
|
||||
|
||||
|
||||
class spotBezierEnv(spotGymEnv):
|
||||
"""The gym environment for spot.
|
||||
|
||||
It simulates the locomotion of spot, a quadruped robot. The state space
|
||||
include the angles, velocities and torques for all the motors and the action
|
||||
space is the desired motor angle for each motor. The reward function is based
|
||||
on how far spot walks in 1000 steps and penalizes the energy
|
||||
expenditure.
|
||||
|
||||
"""
|
||||
|
||||
metadata = {"render.modes": ["human", "rgb_array"], "video.frames_per_second": 50}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
distance_weight=1.0,
|
||||
rotation_weight=0.0,
|
||||
energy_weight=0.000,
|
||||
shake_weight=0.00,
|
||||
drift_weight=0.0,
|
||||
rp_weight=10.0,
|
||||
rate_weight=0.03,
|
||||
urdf_root=pybullet_data.getDataPath(),
|
||||
urdf_version=None,
|
||||
distance_limit=float("inf"),
|
||||
observation_noise_stdev=SENSOR_NOISE_STDDEV,
|
||||
self_collision_enabled=True,
|
||||
motor_velocity_limit=np.inf,
|
||||
pd_control_enabled=False,
|
||||
leg_model_enabled=False,
|
||||
accurate_motor_model_enabled=False,
|
||||
remove_default_joint_damping=False,
|
||||
motor_kp=2.0,
|
||||
motor_kd=0.03,
|
||||
control_latency=0.0,
|
||||
pd_latency=0.0,
|
||||
torque_control_enabled=False,
|
||||
motor_overheat_protection=False,
|
||||
hard_reset=False,
|
||||
on_rack=False,
|
||||
render=True,
|
||||
num_steps_to_log=1000,
|
||||
action_repeat=1,
|
||||
control_time_step=None,
|
||||
forward_reward_cap=float("inf"),
|
||||
reflection=True,
|
||||
log_path=None,
|
||||
desired_velocity=0.5,
|
||||
desired_rate=0.0,
|
||||
lateral=False,
|
||||
draw_foot_path=False,
|
||||
height_field=False,
|
||||
AutoStepper=True,
|
||||
action_dim=14,
|
||||
contacts=True,
|
||||
):
|
||||
|
||||
super(spotBezierEnv, self).__init__(
|
||||
distance_weight=distance_weight,
|
||||
rotation_weight=rotation_weight,
|
||||
energy_weight=energy_weight,
|
||||
shake_weight=shake_weight,
|
||||
drift_weight=drift_weight,
|
||||
rp_weight=rp_weight,
|
||||
rate_weight=rate_weight,
|
||||
urdf_root=urdf_root,
|
||||
urdf_version=urdf_version,
|
||||
distance_limit=distance_limit,
|
||||
observation_noise_stdev=observation_noise_stdev,
|
||||
self_collision_enabled=self_collision_enabled,
|
||||
motor_velocity_limit=motor_velocity_limit,
|
||||
pd_control_enabled=pd_control_enabled,
|
||||
leg_model_enabled=leg_model_enabled,
|
||||
accurate_motor_model_enabled=accurate_motor_model_enabled,
|
||||
remove_default_joint_damping=remove_default_joint_damping,
|
||||
motor_kp=motor_kp,
|
||||
motor_kd=motor_kd,
|
||||
control_latency=control_latency,
|
||||
pd_latency=pd_latency,
|
||||
torque_control_enabled=torque_control_enabled,
|
||||
motor_overheat_protection=motor_overheat_protection,
|
||||
hard_reset=hard_reset,
|
||||
on_rack=on_rack,
|
||||
render=render,
|
||||
num_steps_to_log=num_steps_to_log,
|
||||
action_repeat=action_repeat,
|
||||
control_time_step=control_time_step,
|
||||
forward_reward_cap=forward_reward_cap,
|
||||
reflection=reflection,
|
||||
log_path=log_path,
|
||||
desired_velocity=desired_velocity,
|
||||
desired_rate=desired_rate,
|
||||
lateral=lateral,
|
||||
draw_foot_path=draw_foot_path,
|
||||
height_field=height_field,
|
||||
AutoStepper=AutoStepper,
|
||||
contacts=contacts,
|
||||
)
|
||||
|
||||
# Residuals + Clearance Height + Penetration Depth
|
||||
action_high = np.array([self._action_bound] * action_dim)
|
||||
self.action_space = spaces.Box(-action_high, action_high)
|
||||
print("Action SPACE: {}".format(self.action_space))
|
||||
|
||||
self.prev_pos = np.array([0.0, 0.0, 0.0])
|
||||
|
||||
self.yaw = 0.0
|
||||
|
||||
def pass_joint_angles(self, ja):
|
||||
"""For executing joint angles"""
|
||||
self.ja = ja
|
||||
|
||||
def step(self, action):
|
||||
"""Step forward the simulation, given the action.
|
||||
|
||||
Args:
|
||||
action: A list of desired motor angles for eight motors.
|
||||
smach: the bezier state machine containing simulated
|
||||
random controll inputs
|
||||
|
||||
Returns:
|
||||
observations: The angles, velocities and torques of all motors.
|
||||
reward: The reward for the current state-action pair.
|
||||
done: Whether the episode has ended.
|
||||
info: A dictionary that stores diagnostic information.
|
||||
|
||||
Raises:
|
||||
ValueError: The action dimension is not the same as the number of motors.
|
||||
ValueError: The magnitude of actions is out of bounds.
|
||||
"""
|
||||
# Discard all but joint angles
|
||||
action = self.ja
|
||||
|
||||
self._last_base_position = self.spot.GetBasePosition()
|
||||
self._last_base_orientation = self.spot.GetBaseOrientation()
|
||||
# print("ACTION:")
|
||||
# print(action)
|
||||
if self._is_render:
|
||||
# Sleep, otherwise the computation takes less time than real time,
|
||||
# which will make the visualization like a fast-forward video.
|
||||
time_spent = time.time() - self._last_frame_time
|
||||
self._last_frame_time = time.time()
|
||||
time_to_sleep = self.control_time_step - time_spent
|
||||
if time_to_sleep > 0:
|
||||
time.sleep(time_to_sleep)
|
||||
base_pos = self.spot.GetBasePosition()
|
||||
# Keep the previous orientation of the camera set by the user.
|
||||
[yaw, pitch, dist] = self._pybullet_client.getDebugVisualizerCamera()[8:11]
|
||||
self._pybullet_client.resetDebugVisualizerCamera(dist, yaw, pitch, base_pos)
|
||||
|
||||
action = self._transform_action_to_motor_command(action)
|
||||
self.spot.Step(action)
|
||||
# NOTE: SMACH is passed to the reward method
|
||||
reward = self._reward()
|
||||
done = self._termination()
|
||||
self._env_step_counter += 1
|
||||
|
||||
# DRAW FOOT PATH
|
||||
if self.draw_foot_path:
|
||||
self.DrawFootPath()
|
||||
|
||||
return np.array(self._get_observation()), reward, done, {}
|
||||
|
||||
def return_state(self):
|
||||
return np.array(self._get_observation())
|
||||
|
||||
def return_yaw(self):
|
||||
return self.yaw
|
||||
|
||||
def _reward(self):
|
||||
# get observation
|
||||
obs = self._get_observation()
|
||||
|
||||
orn = self.spot.GetBaseOrientation()
|
||||
|
||||
# Return StepVelocity with the sign of StepLength
|
||||
DesiredVelicty = math.copysign(
|
||||
self.spot.StepVelocity / 4.0, self.spot.StepLength
|
||||
)
|
||||
|
||||
fwd_speed = self.spot.prev_lin_twist[0] # vx
|
||||
lat_speed = self.spot.prev_lin_twist[1] # vy
|
||||
|
||||
# DEBUG
|
||||
lt, at = self.spot.GetBaseTwist()
|
||||
|
||||
# ONLY WORKS FOR MOVING PURELY FORWARD
|
||||
pos = self.spot.GetBasePosition()
|
||||
|
||||
forward_reward = pos[0] - self.prev_pos[0]
|
||||
|
||||
# yaw_rate = obs[4]
|
||||
|
||||
rot_reward = 0.0
|
||||
|
||||
roll, pitch, yaw = self._pybullet_client.getEulerFromQuaternion(
|
||||
[orn[0], orn[1], orn[2], orn[3]]
|
||||
)
|
||||
|
||||
# if yaw < 0.0:
|
||||
# yaw += np.pi
|
||||
# else:
|
||||
# yaw -= np.pi
|
||||
|
||||
# For auto correct
|
||||
self.yaw = yaw
|
||||
|
||||
# penalty for nonzero PITCH and YAW(hidden) ONLY
|
||||
# NOTE: Added Yaw mult
|
||||
rp_reward = -(abs(obs[0]) + abs(obs[1]))
|
||||
|
||||
# print("YAW: {}".format(yaw))
|
||||
# print("RP RWD: {:.2f}".format(rp_reward))
|
||||
# print("ROLL: {} \t PITCH: {}".format(obs[0], obs[1]))
|
||||
|
||||
# penalty for nonzero acc(z) - UNRELIABLE ON IMU
|
||||
shake_reward = 0
|
||||
|
||||
# penalty for nonzero rate (x,y,z)
|
||||
rate_reward = -(abs(obs[2]) + abs(obs[3]))
|
||||
|
||||
# print("RATES: {}".format(obs[2:5]))
|
||||
|
||||
drift_reward = -abs(pos[1])
|
||||
energy_reward = (
|
||||
-np.abs(np.dot(self.spot.GetMotorTorques(), self.spot.GetMotorVelocities()))
|
||||
* self._time_step
|
||||
)
|
||||
reward = (
|
||||
self._distance_weight * forward_reward
|
||||
+ self._rotation_weight * rot_reward
|
||||
+ self._energy_weight * energy_reward
|
||||
+ self._drift_weight * drift_reward
|
||||
+ self._shake_weight * shake_reward
|
||||
+ self._rp_weight * rp_reward
|
||||
+ self._rate_weight * rate_reward
|
||||
)
|
||||
self._objectives.append(
|
||||
[forward_reward, energy_reward, drift_reward, shake_reward]
|
||||
)
|
||||
# print("REWARD: ", reward)
|
||||
# NOTE: return yaw for automatic correction (not part of RL)
|
||||
return reward
|
||||
@@ -0,0 +1,90 @@
|
||||
"""
|
||||
CODE BASED ON EXAMPLE FROM:
|
||||
@misc{coumans2017pybullet,
|
||||
title={Pybullet, a python module for physics simulation in robotics, games and machine learning},
|
||||
author={Coumans, Erwin and Bai, Yunfei},
|
||||
url={www.pybullet.org},
|
||||
year={2017},
|
||||
}
|
||||
|
||||
Example: minitaur_env_randomizer.py
|
||||
https://github.com/bulletphysics/bullet3/blob/master/examples/pybullet/gym/pybullet_envs/minitaur/envs/env_randomizers/minitaur_env_randomizer.py
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import simulation.GymEnvs.env_randomizer_base as env_randomizer_base
|
||||
|
||||
# Relative range.
|
||||
spot_BASE_MASS_ERROR_RANGE = (-0.2, 0.2) # 0.2 means 20%
|
||||
spot_LEG_MASS_ERROR_RANGE = (-0.2, 0.2) # 0.2 means 20%
|
||||
# Absolute range.
|
||||
BATTERY_VOLTAGE_RANGE = (7.0, 8.4) # Unit: Volt
|
||||
MOTOR_VISCOUS_DAMPING_RANGE = (0, 0.01) # Unit: N*m*s/rad (torque/angular vel)
|
||||
spot_LEG_FRICTION = (0.8, 1.5) # Unit: dimensionless
|
||||
|
||||
|
||||
class SpotEnvRandomizer(env_randomizer_base.EnvRandomizerBase):
|
||||
"""A randomizer that change the spot_gym_env during every reset."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
spot_base_mass_err_range=spot_BASE_MASS_ERROR_RANGE,
|
||||
spot_leg_mass_err_range=spot_LEG_MASS_ERROR_RANGE,
|
||||
battery_voltage_range=BATTERY_VOLTAGE_RANGE,
|
||||
motor_viscous_damping_range=MOTOR_VISCOUS_DAMPING_RANGE,
|
||||
):
|
||||
self._spot_base_mass_err_range = spot_base_mass_err_range
|
||||
self._spot_leg_mass_err_range = spot_leg_mass_err_range
|
||||
self._battery_voltage_range = battery_voltage_range
|
||||
self._motor_viscous_damping_range = motor_viscous_damping_range
|
||||
|
||||
np.random.seed(0)
|
||||
|
||||
def randomize_env(self, env):
|
||||
self._randomize_spot(env.spot)
|
||||
|
||||
def _randomize_spot(self, spot):
|
||||
"""Randomize various physical properties of spot.
|
||||
|
||||
It randomizes the mass/inertia of the base, mass/inertia of the legs,
|
||||
friction coefficient of the feet, the battery voltage and the motor damping
|
||||
at each reset() of the environment.
|
||||
|
||||
Args:
|
||||
spot: the spot instance in spot_gym_env environment.
|
||||
"""
|
||||
base_mass = spot.GetBaseMassFromURDF()
|
||||
# print("BM: ", base_mass)
|
||||
randomized_base_mass = np.random.uniform(
|
||||
np.array([base_mass]) * (1.0 + self._spot_base_mass_err_range[0]),
|
||||
np.array([base_mass]) * (1.0 + self._spot_base_mass_err_range[1]),
|
||||
)
|
||||
spot.SetBaseMass(randomized_base_mass[0])
|
||||
|
||||
leg_masses = spot.GetLegMassesFromURDF()
|
||||
leg_masses_lower_bound = np.array(leg_masses) * (
|
||||
1.0 + self._spot_leg_mass_err_range[0]
|
||||
)
|
||||
leg_masses_upper_bound = np.array(leg_masses) * (
|
||||
1.0 + self._spot_leg_mass_err_range[1]
|
||||
)
|
||||
randomized_leg_masses = [
|
||||
np.random.uniform(leg_masses_lower_bound[i], leg_masses_upper_bound[i])
|
||||
for i in range(len(leg_masses))
|
||||
]
|
||||
spot.SetLegMasses(randomized_leg_masses)
|
||||
|
||||
randomized_battery_voltage = np.random.uniform(
|
||||
BATTERY_VOLTAGE_RANGE[0], BATTERY_VOLTAGE_RANGE[1]
|
||||
)
|
||||
spot.SetBatteryVoltage(randomized_battery_voltage)
|
||||
|
||||
randomized_motor_damping = np.random.uniform(
|
||||
MOTOR_VISCOUS_DAMPING_RANGE[0], MOTOR_VISCOUS_DAMPING_RANGE[1]
|
||||
)
|
||||
spot.SetMotorViscousDamping(randomized_motor_damping)
|
||||
|
||||
randomized_foot_friction = np.random.uniform(
|
||||
spot_LEG_FRICTION[0], spot_LEG_FRICTION[1]
|
||||
)
|
||||
spot.SetFootFriction(randomized_foot_friction)
|
||||
@@ -0,0 +1,785 @@
|
||||
"""
|
||||
CODE BASED ON EXAMPLE FROM:
|
||||
@misc{coumans2017pybullet,
|
||||
title={Pybullet, a python module for physics simulation in robotics, games and machine learning},
|
||||
author={Coumans, Erwin and Bai, Yunfei},
|
||||
url={www.pybullet.org},
|
||||
year={2017},
|
||||
}
|
||||
|
||||
Example: minitaur_gym_env.py
|
||||
https://github.com/bulletphysics/bullet3/blob/master/examples/pybullet/gym/pybullet_envs/minitaur/envs/minitaur_gym_env.py
|
||||
"""
|
||||
|
||||
import math
|
||||
import time
|
||||
import gym
|
||||
import numpy as np
|
||||
import pybullet
|
||||
import pybullet_data
|
||||
from gym import spaces
|
||||
from gym.utils import seeding
|
||||
from pkg_resources import parse_version
|
||||
import spot
|
||||
import pybullet_utils.bullet_client as bullet_client
|
||||
from gym.envs.registration import register
|
||||
from simulation.GymEnvs.heightfield import HeightField
|
||||
from OpenLoopSM.SpotOL import BezierStepper
|
||||
from raspberry_pi.src.Kinematics import LieAlgebra as LA
|
||||
from simulation.GymEnvs.spot_env_randomizer import SpotEnvRandomizer
|
||||
|
||||
NUM_SUBSTEPS = 5
|
||||
NUM_MOTORS = 12
|
||||
MOTOR_ANGLE_OBSERVATION_INDEX = 0
|
||||
MOTOR_VELOCITY_OBSERVATION_INDEX = MOTOR_ANGLE_OBSERVATION_INDEX + NUM_MOTORS
|
||||
MOTOR_TORQUE_OBSERVATION_INDEX = MOTOR_VELOCITY_OBSERVATION_INDEX + NUM_MOTORS
|
||||
BASE_ORIENTATION_OBSERVATION_INDEX = MOTOR_TORQUE_OBSERVATION_INDEX + NUM_MOTORS
|
||||
ACTION_EPS = 0.01
|
||||
OBSERVATION_EPS = 0.01
|
||||
RENDER_HEIGHT = 720
|
||||
RENDER_WIDTH = 960
|
||||
SENSOR_NOISE_STDDEV = spot.SENSOR_NOISE_STDDEV
|
||||
DEFAULT_URDF_VERSION = "default"
|
||||
NUM_SIMULATION_ITERATION_STEPS = 1000
|
||||
|
||||
spot_URDF_VERSION_MAP = {DEFAULT_URDF_VERSION: spot.Spot}
|
||||
|
||||
# Register as OpenAI Gym Environment
|
||||
register(
|
||||
id="SpotMicroEnv-v0",
|
||||
entry_point="spotmicro.spot_gym_env:spotGymEnv",
|
||||
max_episode_steps=1000,
|
||||
)
|
||||
|
||||
|
||||
def convert_to_list(obj):
|
||||
try:
|
||||
iter(obj)
|
||||
return obj
|
||||
except TypeError:
|
||||
return [obj]
|
||||
|
||||
|
||||
class spotGymEnv(gym.Env):
|
||||
"""The gym environment for spot.
|
||||
|
||||
It simulates the locomotion of spot, a quadruped robot. The state space
|
||||
include the angles, velocities and torques for all the motors and the action
|
||||
space is the desired motor angle for each motor. The reward function is based
|
||||
on how far spot walks in 1000 steps and penalizes the energy
|
||||
expenditure.
|
||||
|
||||
"""
|
||||
|
||||
metadata = {"render.modes": ["human", "rgb_array"], "video.frames_per_second": 50}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
distance_weight=1.0,
|
||||
rotation_weight=1.0,
|
||||
energy_weight=0.0005,
|
||||
shake_weight=0.005,
|
||||
drift_weight=2.0,
|
||||
rp_weight=0.1,
|
||||
rate_weight=0.1,
|
||||
urdf_root=pybullet_data.getDataPath(),
|
||||
urdf_version=None,
|
||||
distance_limit=float("inf"),
|
||||
observation_noise_stdev=SENSOR_NOISE_STDDEV,
|
||||
self_collision_enabled=True,
|
||||
motor_velocity_limit=np.inf,
|
||||
pd_control_enabled=False,
|
||||
leg_model_enabled=False,
|
||||
accurate_motor_model_enabled=False,
|
||||
remove_default_joint_damping=False,
|
||||
motor_kp=2.0,
|
||||
motor_kd=0.03,
|
||||
control_latency=0.0,
|
||||
pd_latency=0.0,
|
||||
torque_control_enabled=False,
|
||||
motor_overheat_protection=False,
|
||||
hard_reset=False,
|
||||
on_rack=False,
|
||||
render=True,
|
||||
num_steps_to_log=1000,
|
||||
action_repeat=1,
|
||||
control_time_step=None,
|
||||
forward_reward_cap=float("inf"),
|
||||
reflection=True,
|
||||
log_path=None,
|
||||
desired_velocity=0.5,
|
||||
desired_rate=0.0,
|
||||
lateral=False,
|
||||
draw_foot_path=False,
|
||||
height_field=False,
|
||||
height_field_iters=2,
|
||||
AutoStepper=False,
|
||||
contacts=True,
|
||||
):
|
||||
"""Initialize the spot gym environment.
|
||||
|
||||
Args:
|
||||
urdf_root: The path to the urdf data folder.
|
||||
urdf_version: [DEFAULT_URDF_VERSION] are allowable
|
||||
versions. If None, DEFAULT_URDF_VERSION is used.
|
||||
distance_weight: The weight of the distance term in the reward.
|
||||
energy_weight: The weight of the energy term in the reward.
|
||||
shake_weight: The weight of the vertical shakiness term in the reward.
|
||||
drift_weight: The weight of the sideways drift term in the reward.
|
||||
distance_limit: The maximum distance to terminate the episode.
|
||||
observation_noise_stdev: The standard deviation of observation noise.
|
||||
self_collision_enabled: Whether to enable self collision in the sim.
|
||||
motor_velocity_limit: The velocity limit of each motor.
|
||||
pd_control_enabled: Whether to use PD controller for each motor.
|
||||
leg_model_enabled: Whether to use a leg motor to reparameterize the action
|
||||
space.
|
||||
accurate_motor_model_enabled: Whether to use the accurate DC motor model.
|
||||
remove_default_joint_damping: Whether to remove the default joint damping.
|
||||
motor_kp: proportional gain for the accurate motor model.
|
||||
motor_kd: derivative gain for the accurate motor model.
|
||||
control_latency: It is the delay in the controller between when an
|
||||
observation is made at some point, and when that reading is reported
|
||||
back to the Neural Network.
|
||||
pd_latency: latency of the PD controller loop. PD calculates PWM based on
|
||||
the motor angle and velocity. The latency measures the time between when
|
||||
the motor angle and velocity are observed on the microcontroller and
|
||||
when the true state happens on the motor. It is typically (0.001-
|
||||
0.002s).
|
||||
torque_control_enabled: Whether to use the torque control, if set to
|
||||
False, pose control will be used.
|
||||
motor_overheat_protection: Whether to shutdown the motor that has exerted
|
||||
large torque (OVERHEAT_SHUTDOWN_TORQUE) for an extended amount of time
|
||||
(OVERHEAT_SHUTDOWN_TIME). See ApplyAction() in spot.py for more
|
||||
details.
|
||||
hard_reset: Whether to wipe the simulation and load everything when reset
|
||||
is called. If set to false, reset just place spot back to start
|
||||
position and set its pose to initial configuration.
|
||||
on_rack: Whether to place spot on rack. This is only used to debug
|
||||
the walking gait. In this mode, spot's base is hanged midair so
|
||||
that its walking gait is clearer to visualize.
|
||||
render: Whether to render the simulation.
|
||||
num_steps_to_log: The max number of control steps in one episode that will
|
||||
be logged. If the number of steps is more than num_steps_to_log, the
|
||||
environment will still be running, but only first num_steps_to_log will
|
||||
be recorded in logging.
|
||||
action_repeat: The number of simulation steps before actions are applied.
|
||||
control_time_step: The time step between two successive control signals.
|
||||
env_randomizer: An instance (or a list) of EnvRandomizer(s). An
|
||||
EnvRandomizer may randomize the physical property of spot, change
|
||||
the terrrain during reset(), or add perturbation forces during step().
|
||||
forward_reward_cap: The maximum value that forward reward is capped at.
|
||||
Disabled (Inf) by default.
|
||||
log_path: The path to write out logs. For the details of logging, refer to
|
||||
spot_logging.proto.
|
||||
Raises:
|
||||
ValueError: If the urdf_version is not supported.
|
||||
"""
|
||||
# Sense Contacts
|
||||
self.contacts = contacts
|
||||
# Enable Auto Stepper State Machine
|
||||
self.AutoStepper = AutoStepper
|
||||
# Enable Rough Terrain or Not
|
||||
self.height_field = height_field
|
||||
self.draw_foot_path = draw_foot_path
|
||||
# DRAWING FEET PATH
|
||||
self.prev_feet_path = np.array(
|
||||
[[0.0, 0.0, 0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]]
|
||||
)
|
||||
|
||||
# CONTROL METRICS
|
||||
self.desired_velocity = desired_velocity
|
||||
self.desired_rate = desired_rate
|
||||
self.lateral = lateral
|
||||
|
||||
# Set up logging.
|
||||
self._log_path = log_path
|
||||
# @TODO fix logging
|
||||
|
||||
# NUM ITERS
|
||||
self._time_step = 0.01
|
||||
self._action_repeat = action_repeat
|
||||
self._num_bullet_solver_iterations = 300
|
||||
self.logging = None
|
||||
if pd_control_enabled or accurate_motor_model_enabled:
|
||||
self._time_step /= NUM_SUBSTEPS
|
||||
self._num_bullet_solver_iterations /= NUM_SUBSTEPS
|
||||
self._action_repeat *= NUM_SUBSTEPS
|
||||
# PD control needs smaller time step for stability.
|
||||
if control_time_step is not None:
|
||||
self.control_time_step = control_time_step
|
||||
else:
|
||||
# Get Control Timestep
|
||||
self.control_time_step = self._time_step * self._action_repeat
|
||||
# TODO: Fix the value of self._num_bullet_solver_iterations.
|
||||
self._num_bullet_solver_iterations = int(
|
||||
NUM_SIMULATION_ITERATION_STEPS / self._action_repeat
|
||||
)
|
||||
|
||||
# URDF
|
||||
self._urdf_root = urdf_root
|
||||
self._self_collision_enabled = self_collision_enabled
|
||||
self._motor_velocity_limit = motor_velocity_limit
|
||||
self._observation = []
|
||||
self._true_observation = []
|
||||
self._objectives = []
|
||||
self._objective_weights = [
|
||||
distance_weight,
|
||||
energy_weight,
|
||||
drift_weight,
|
||||
shake_weight,
|
||||
]
|
||||
self._env_step_counter = 0
|
||||
self._num_steps_to_log = num_steps_to_log
|
||||
self._is_render = render
|
||||
self._last_base_position = [0, 0, 0]
|
||||
self._last_base_orientation = [0, 0, 0, 1]
|
||||
self._distance_weight = distance_weight
|
||||
self._rotation_weight = rotation_weight
|
||||
self._energy_weight = energy_weight
|
||||
self._drift_weight = drift_weight
|
||||
self._shake_weight = shake_weight
|
||||
self._rp_weight = rp_weight
|
||||
self._rate_weight = rate_weight
|
||||
self._distance_limit = distance_limit
|
||||
self._observation_noise_stdev = observation_noise_stdev
|
||||
self._action_bound = 1
|
||||
self._pd_control_enabled = pd_control_enabled
|
||||
self._leg_model_enabled = leg_model_enabled
|
||||
self._accurate_motor_model_enabled = accurate_motor_model_enabled
|
||||
self._remove_default_joint_damping = remove_default_joint_damping
|
||||
self._motor_kp = motor_kp
|
||||
self._motor_kd = motor_kd
|
||||
self._torque_control_enabled = torque_control_enabled
|
||||
self._motor_overheat_protection = motor_overheat_protection
|
||||
self._on_rack = on_rack
|
||||
self._cam_dist = 1.0
|
||||
self._cam_yaw = 0
|
||||
self._cam_pitch = -30
|
||||
self._forward_reward_cap = forward_reward_cap
|
||||
self._hard_reset = True
|
||||
self._last_frame_time = 0.0
|
||||
self._control_latency = control_latency
|
||||
self._pd_latency = pd_latency
|
||||
self._urdf_version = urdf_version
|
||||
self._ground_id = None
|
||||
self._reflection = reflection
|
||||
# @TODO fix logging
|
||||
self._episode_proto = None
|
||||
if self._is_render:
|
||||
self._pybullet_client = bullet_client.BulletClient(
|
||||
connection_mode=pybullet.GUI
|
||||
)
|
||||
else:
|
||||
self._pybullet_client = bullet_client.BulletClient()
|
||||
if self._urdf_version is None:
|
||||
self._urdf_version = DEFAULT_URDF_VERSION
|
||||
self._pybullet_client.setPhysicsEngineParameter(enableConeFriction=0)
|
||||
self.seed()
|
||||
# Only update after HF has been generated
|
||||
self.height_field = False
|
||||
self.reset()
|
||||
observation_high = self.spot.GetObservationUpperBound() + OBSERVATION_EPS
|
||||
observation_low = self.spot.GetObservationLowerBound() - OBSERVATION_EPS
|
||||
action_dim = NUM_MOTORS
|
||||
action_high = np.array([self._action_bound] * action_dim)
|
||||
self.action_space = spaces.Box(-action_high, action_high)
|
||||
self.observation_space = spaces.Box(observation_low, observation_high)
|
||||
self.viewer = None
|
||||
self._hard_reset = hard_reset # This assignment need to be after reset()
|
||||
self.goal_reached = False
|
||||
# Generate HeightField or not
|
||||
self.height_field = height_field
|
||||
self.hf = HeightField()
|
||||
if self.height_field:
|
||||
# Do 3x for extra roughness
|
||||
for i in range(height_field_iters):
|
||||
self.hf._generate_field(self)
|
||||
|
||||
def set_env_randomizer(self, env_randomizer):
|
||||
self._env_randomizer = env_randomizer
|
||||
|
||||
def configure(self, args):
|
||||
self._args = args
|
||||
|
||||
def reset(
|
||||
self,
|
||||
initial_motor_angles=None,
|
||||
reset_duration=1.0,
|
||||
desired_velocity=None,
|
||||
desired_rate=None,
|
||||
):
|
||||
# Use Autostepper
|
||||
if self.AutoStepper:
|
||||
self.StateMachine = BezierStepper(dt=self._time_step)
|
||||
# Shuffle order of states
|
||||
self.StateMachine.reshuffle()
|
||||
|
||||
self._pybullet_client.configureDebugVisualizer(
|
||||
self._pybullet_client.COV_ENABLE_RENDERING, 0
|
||||
)
|
||||
if self._hard_reset:
|
||||
self._pybullet_client.resetSimulation()
|
||||
self._pybullet_client.setPhysicsEngineParameter(
|
||||
numSolverIterations=int(self._num_bullet_solver_iterations)
|
||||
)
|
||||
self._pybullet_client.setTimeStep(self._time_step)
|
||||
self._ground_id = self._pybullet_client.loadURDF(
|
||||
"%s/plane.urdf" % self._urdf_root
|
||||
)
|
||||
if self._reflection:
|
||||
self._pybullet_client.changeVisualShape(
|
||||
self._ground_id, -1, rgbaColor=[1, 1, 1, 0.8]
|
||||
)
|
||||
self._pybullet_client.configureDebugVisualizer(
|
||||
self._pybullet_client.COV_ENABLE_PLANAR_REFLECTION, self._ground_id
|
||||
)
|
||||
self._pybullet_client.setGravity(0, 0, -9.81)
|
||||
acc_motor = self._accurate_motor_model_enabled
|
||||
motor_protect = self._motor_overheat_protection
|
||||
if self._urdf_version not in spot_URDF_VERSION_MAP:
|
||||
raise ValueError(
|
||||
"%s is not a supported urdf_version." % self._urdf_version
|
||||
)
|
||||
else:
|
||||
self.spot = spot_URDF_VERSION_MAP[self._urdf_version](
|
||||
pybullet_client=self._pybullet_client,
|
||||
action_repeat=self._action_repeat,
|
||||
urdf_root=self._urdf_root,
|
||||
time_step=self._time_step,
|
||||
self_collision_enabled=self._self_collision_enabled,
|
||||
motor_velocity_limit=self._motor_velocity_limit,
|
||||
pd_control_enabled=self._pd_control_enabled,
|
||||
accurate_motor_model_enabled=acc_motor,
|
||||
remove_default_joint_damping=self._remove_default_joint_damping,
|
||||
motor_kp=self._motor_kp,
|
||||
motor_kd=self._motor_kd,
|
||||
control_latency=self._control_latency,
|
||||
pd_latency=self._pd_latency,
|
||||
observation_noise_stdev=self._observation_noise_stdev,
|
||||
torque_control_enabled=self._torque_control_enabled,
|
||||
motor_overheat_protection=motor_protect,
|
||||
on_rack=self._on_rack,
|
||||
np_random=self.np_random,
|
||||
contacts=self.contacts,
|
||||
)
|
||||
self.spot.Reset(
|
||||
reload_urdf=False,
|
||||
default_motor_angles=initial_motor_angles,
|
||||
reset_time=reset_duration,
|
||||
)
|
||||
|
||||
if desired_velocity is not None:
|
||||
self.desired_velocity = desired_velocity
|
||||
if desired_rate is not None:
|
||||
self.desired_rate = desired_rate
|
||||
|
||||
self._pybullet_client.setPhysicsEngineParameter(enableConeFriction=0)
|
||||
self._env_step_counter = 0
|
||||
self._last_base_position = [0, 0, 0]
|
||||
self._last_base_orientation = [0, 0, 0, 1]
|
||||
self._objectives = []
|
||||
self._pybullet_client.resetDebugVisualizerCamera(
|
||||
self._cam_dist, self._cam_yaw, self._cam_pitch, [0, 0, 0]
|
||||
)
|
||||
self._pybullet_client.configureDebugVisualizer(
|
||||
self._pybullet_client.COV_ENABLE_RENDERING, 1
|
||||
)
|
||||
return self._get_observation()
|
||||
|
||||
def seed(self, seed=None):
|
||||
self.np_random, seed = seeding.np_random(seed)
|
||||
return [seed]
|
||||
|
||||
def _transform_action_to_motor_command(self, action):
|
||||
if self._leg_model_enabled:
|
||||
for i, action_component in enumerate(action):
|
||||
if not (
|
||||
-self._action_bound - ACTION_EPS
|
||||
<= action_component
|
||||
<= self._action_bound + ACTION_EPS
|
||||
):
|
||||
raise ValueError(
|
||||
"{}th action {} out of bounds.".format(i, action_component)
|
||||
)
|
||||
action = self.spot.ConvertFromLegModel(action)
|
||||
return action
|
||||
|
||||
def step(self, action):
|
||||
"""Step forward the simulation, given the action.
|
||||
|
||||
Args:
|
||||
action: A list of desired motor angles for eight motors.
|
||||
|
||||
Returns:
|
||||
observations: The angles, velocities and torques of all motors.
|
||||
reward: The reward for the current state-action pair.
|
||||
done: Whether the episode has ended.
|
||||
info: A dictionary that stores diagnostic information.
|
||||
|
||||
Raises:
|
||||
ValueError: The action dimension is not the same as the number of motors.
|
||||
ValueError: The magnitude of actions is out of bounds.
|
||||
"""
|
||||
self._last_base_position = self.spot.GetBasePosition()
|
||||
self._last_base_orientation = self.spot.GetBaseOrientation()
|
||||
# print("ACTION:")
|
||||
# print(action)
|
||||
if self._is_render:
|
||||
# Sleep, otherwise the computation takes less time than real time,
|
||||
# which will make the visualization like a fast-forward video.
|
||||
time_spent = time.time() - self._last_frame_time
|
||||
self._last_frame_time = time.time()
|
||||
time_to_sleep = self.control_time_step - time_spent
|
||||
if time_to_sleep > 0:
|
||||
time.sleep(time_to_sleep)
|
||||
base_pos = self.spot.GetBasePosition()
|
||||
# Keep the previous orientation of the camera set by the user.
|
||||
[yaw, pitch, dist] = self._pybullet_client.getDebugVisualizerCamera()[8:11]
|
||||
self._pybullet_client.resetDebugVisualizerCamera(dist, yaw, pitch, base_pos)
|
||||
|
||||
action = self._transform_action_to_motor_command(action)
|
||||
self.spot.Step(action)
|
||||
reward = self._reward()
|
||||
done = self._termination()
|
||||
self._env_step_counter += 1
|
||||
|
||||
# DRAW FOOT PATH
|
||||
if self.draw_foot_path:
|
||||
self.DrawFootPath()
|
||||
return np.array(self._get_observation()), reward, done, {}
|
||||
|
||||
def render(self, mode="rgb_array", close=False):
|
||||
if mode != "rgb_array":
|
||||
return np.array([])
|
||||
base_pos = self.spot.GetBasePosition()
|
||||
view_matrix = self._pybullet_client.computeViewMatrixFromYawPitchRoll(
|
||||
cameraTargetPosition=base_pos,
|
||||
distance=self._cam_dist,
|
||||
yaw=self._cam_yaw,
|
||||
pitch=self._cam_pitch,
|
||||
roll=0,
|
||||
upAxisIndex=2,
|
||||
)
|
||||
proj_matrix = self._pybullet_client.computeProjectionMatrixFOV(
|
||||
fov=60,
|
||||
aspect=float(RENDER_WIDTH) / RENDER_HEIGHT,
|
||||
nearVal=0.1,
|
||||
farVal=100.0,
|
||||
)
|
||||
(_, _, px, _, _) = self._pybullet_client.getCameraImage(
|
||||
width=RENDER_WIDTH,
|
||||
height=RENDER_HEIGHT,
|
||||
renderer=self._pybullet_client.ER_BULLET_HARDWARE_OPENGL,
|
||||
viewMatrix=view_matrix,
|
||||
projectionMatrix=proj_matrix,
|
||||
)
|
||||
rgb_array = np.array(px)
|
||||
rgb_array = rgb_array[:, :, :3]
|
||||
return rgb_array
|
||||
|
||||
def DrawFootPath(self):
|
||||
# Get Foot Positions
|
||||
FL = self._pybullet_client.getLinkState(
|
||||
self.spot.quadruped, self.spot._foot_id_list[0]
|
||||
)[0]
|
||||
FR = self._pybullet_client.getLinkState(
|
||||
self.spot.quadruped, self.spot._foot_id_list[1]
|
||||
)[0]
|
||||
BL = self._pybullet_client.getLinkState(
|
||||
self.spot.quadruped, self.spot._foot_id_list[2]
|
||||
)[0]
|
||||
BR = self._pybullet_client.getLinkState(
|
||||
self.spot.quadruped, self.spot._foot_id_list[3]
|
||||
)[0]
|
||||
|
||||
lifetime = 3.0 # sec
|
||||
self._pybullet_client.addUserDebugLine(
|
||||
self.prev_feet_path[0], FL, [1, 0, 0], lifeTime=lifetime
|
||||
)
|
||||
self._pybullet_client.addUserDebugLine(
|
||||
self.prev_feet_path[1], FR, [0, 1, 0], lifeTime=lifetime
|
||||
)
|
||||
self._pybullet_client.addUserDebugLine(
|
||||
self.prev_feet_path[2], BL, [0, 0, 1], lifeTime=lifetime
|
||||
)
|
||||
self._pybullet_client.addUserDebugLine(
|
||||
self.prev_feet_path[3], BR, [1, 1, 0], lifeTime=lifetime
|
||||
)
|
||||
|
||||
self.prev_feet_path[0] = FL
|
||||
self.prev_feet_path[1] = FR
|
||||
self.prev_feet_path[2] = BL
|
||||
self.prev_feet_path[3] = BR
|
||||
|
||||
def get_spot_motor_angles(self):
|
||||
"""Get the spot's motor angles.
|
||||
|
||||
Returns:
|
||||
A numpy array of motor angles.
|
||||
"""
|
||||
return np.array(
|
||||
self._observation[
|
||||
MOTOR_ANGLE_OBSERVATION_INDEX : MOTOR_ANGLE_OBSERVATION_INDEX
|
||||
+ NUM_MOTORS
|
||||
]
|
||||
)
|
||||
|
||||
def get_spot_motor_velocities(self):
|
||||
"""Get the spot's motor velocities.
|
||||
|
||||
Returns:
|
||||
A numpy array of motor velocities.
|
||||
"""
|
||||
return np.array(
|
||||
self._observation[
|
||||
MOTOR_VELOCITY_OBSERVATION_INDEX : MOTOR_VELOCITY_OBSERVATION_INDEX
|
||||
+ NUM_MOTORS
|
||||
]
|
||||
)
|
||||
|
||||
def get_spot_motor_torques(self):
|
||||
"""Get the spot's motor torques.
|
||||
|
||||
Returns:
|
||||
A numpy array of motor torques.
|
||||
"""
|
||||
return np.array(
|
||||
self._observation[
|
||||
MOTOR_TORQUE_OBSERVATION_INDEX : MOTOR_TORQUE_OBSERVATION_INDEX
|
||||
+ NUM_MOTORS
|
||||
]
|
||||
)
|
||||
|
||||
def get_spot_base_orientation(self):
|
||||
"""Get the spot's base orientation, represented by a quaternion.
|
||||
|
||||
Returns:
|
||||
A numpy array of spot's orientation.
|
||||
"""
|
||||
return np.array(self._observation[BASE_ORIENTATION_OBSERVATION_INDEX:])
|
||||
|
||||
def is_fallen(self):
|
||||
"""Decide whether spot has fallen.
|
||||
|
||||
If the up directions between the base and the world is larger (the dot
|
||||
product is smaller than 0.85) or the base is very low on the ground
|
||||
(the height is smaller than 0.13 meter), spot is considered fallen.
|
||||
|
||||
Returns:
|
||||
Boolean value that indicates whether spot has fallen.
|
||||
"""
|
||||
orientation = self.spot.GetBaseOrientation()
|
||||
rot_mat = self._pybullet_client.getMatrixFromQuaternion(orientation)
|
||||
local_up = rot_mat[6:]
|
||||
pos = self.spot.GetBasePosition()
|
||||
# or pos[2] < 0.13
|
||||
return np.dot(np.asarray([0, 0, 1]), np.asarray(local_up)) < 0.55
|
||||
|
||||
def _termination(self):
|
||||
position = self.spot.GetBasePosition()
|
||||
distance = math.sqrt(position[0] ** 2 + position[1] ** 2)
|
||||
return self.is_fallen() or distance > self._distance_limit
|
||||
|
||||
def _reward(self):
|
||||
"""NOTE: reward now consists of:
|
||||
roll, pitch at desired 0
|
||||
acc (y,z) = 0
|
||||
FORWARD-BACKWARD: rate(x,y,z) = 0
|
||||
--> HIDDEN REWARD: x(+-) velocity reference, not incl. in obs
|
||||
SPIN: acc(x) = 0, rate(x,y) = 0, rate (z) = rate reference
|
||||
Also include drift, energy vanilla rewards
|
||||
"""
|
||||
current_base_position = self.spot.GetBasePosition()
|
||||
|
||||
# get observation
|
||||
obs = self._get_observation()
|
||||
# forward_reward = current_base_position[0] - self._last_base_position[0]
|
||||
|
||||
# # POSITIVE FOR FORWARD, NEGATIVE FOR BACKWARD | NOTE: HIDDEN
|
||||
# GETTING TWIST IN BODY FRAME
|
||||
pos = self.spot.GetBasePosition()
|
||||
orn = self.spot.GetBaseOrientation()
|
||||
roll, pitch, yaw = self._pybullet_client.getEulerFromQuaternion(
|
||||
[orn[0], orn[1], orn[2], orn[3]]
|
||||
)
|
||||
rpy = LA.RPY(roll, pitch, yaw)
|
||||
R, _ = LA.TransToRp(rpy)
|
||||
T_wb = LA.RpToTrans(R, np.array([pos[0], pos[1], pos[2]]))
|
||||
T_bw = LA.TransInv(T_wb)
|
||||
Adj_Tbw = LA.Adjoint(T_bw)
|
||||
|
||||
Vw = np.concatenate((self.spot.prev_ang_twist, self.spot.prev_lin_twist))
|
||||
Vb = np.dot(Adj_Tbw, Vw)
|
||||
|
||||
# New Twist in Body Frame
|
||||
# POSITIVE FOR FORWARD, NEGATIVE FOR BACKWARD | NOTE: HIDDEN
|
||||
fwd_speed = -Vb[3] # vx
|
||||
lat_speed = -Vb[4] # vy
|
||||
# fwd_speed = self.spot.prev_lin_twist[0]
|
||||
# lat_speed = self.spot.prev_lin_twist[1]
|
||||
# print("FORWARD SPEED: {} \t STATE SPEED: {}".format(
|
||||
# fwd_speed, self.desired_velocity))
|
||||
# self.desired_velocity = 0.4
|
||||
|
||||
# Modification for lateral/fwd rewards
|
||||
reward_max = 1.0
|
||||
# FORWARD
|
||||
if not self.lateral:
|
||||
# f(x)=-(x-desired))^(2)*((1/desired)^2)+1
|
||||
# to make sure that at 0vel there is 0 reawrd.
|
||||
# also squishes allowable tolerance
|
||||
forward_reward = reward_max * np.exp(
|
||||
-((fwd_speed - self.desired_velocity) ** 2) / (0.1)
|
||||
)
|
||||
# LATERAL
|
||||
else:
|
||||
forward_reward = reward_max * np.exp(
|
||||
-((lat_speed - self.desired_velocity) ** 2) / (0.1)
|
||||
)
|
||||
|
||||
yaw_rate = obs[4]
|
||||
|
||||
rot_reward = reward_max * np.exp(-((yaw_rate - self.desired_rate) ** 2) / (0.1))
|
||||
|
||||
# Make sure that for forward-policy there is the appropriate rotation penalty
|
||||
if self.desired_velocity != 0:
|
||||
self._rotation_weight = self._rate_weight
|
||||
rot_reward = -abs(obs[4])
|
||||
elif self.desired_rate != 0:
|
||||
forward_reward = 0.0
|
||||
|
||||
# penalty for nonzero roll, pitch
|
||||
rp_reward = -(abs(obs[0]) + abs(obs[1]))
|
||||
# print("ROLL: {} \t PITCH: {}".format(obs[0], obs[1]))
|
||||
|
||||
# penalty for nonzero acc(z)
|
||||
shake_reward = -abs(obs[4])
|
||||
|
||||
# penalty for nonzero rate (x,y,z)
|
||||
rate_reward = -(abs(obs[2]) + abs(obs[3]))
|
||||
|
||||
# drift_reward = -abs(current_base_position[1] -
|
||||
# self._last_base_position[1])
|
||||
|
||||
# this penalizes absolute error, and does not penalize correction
|
||||
# NOTE: for side-side, drift reward becomes in x instead
|
||||
drift_reward = -abs(current_base_position[1])
|
||||
|
||||
# If Lateral, change drift reward
|
||||
if self.lateral:
|
||||
drift_reward = -abs(current_base_position[0])
|
||||
|
||||
# shake_reward = -abs(current_base_position[2] -
|
||||
# self._last_base_position[2])
|
||||
self._last_base_position = current_base_position
|
||||
energy_reward = (
|
||||
-np.abs(np.dot(self.spot.GetMotorTorques(), self.spot.GetMotorVelocities()))
|
||||
* self._time_step
|
||||
)
|
||||
reward = (
|
||||
self._distance_weight * forward_reward
|
||||
+ self._rotation_weight * rot_reward
|
||||
+ self._energy_weight * energy_reward
|
||||
+ self._drift_weight * drift_reward
|
||||
+ self._shake_weight * shake_reward
|
||||
+ self._rp_weight * rp_reward
|
||||
+ self._rate_weight * rate_reward
|
||||
)
|
||||
self._objectives.append(
|
||||
[forward_reward, energy_reward, drift_reward, shake_reward]
|
||||
)
|
||||
# print("REWARD: ", reward)
|
||||
return reward
|
||||
|
||||
def get_objectives(self):
|
||||
return self._objectives
|
||||
|
||||
@property
|
||||
def objective_weights(self):
|
||||
"""Accessor for the weights for all the objectives.
|
||||
|
||||
Returns:
|
||||
List of floating points that corresponds to weights for the objectives in
|
||||
the order that objectives are stored.
|
||||
"""
|
||||
return self._objective_weights
|
||||
|
||||
def _get_observation(self):
|
||||
"""Get observation of this environment, including noise and latency.
|
||||
|
||||
spot class maintains a history of true observations. Based on the
|
||||
latency, this function will find the observation at the right time,
|
||||
interpolate if necessary. Then Gaussian noise is added to this observation
|
||||
based on self.observation_noise_stdev.
|
||||
|
||||
Returns:
|
||||
The noisy observation with latency.
|
||||
"""
|
||||
|
||||
self._observation = self.spot.GetObservation()
|
||||
return self._observation
|
||||
|
||||
def _get_realistic_observation(self):
|
||||
"""Get the observations of this environment.
|
||||
|
||||
It includes the angles, velocities, torques and the orientation of the base.
|
||||
|
||||
Returns:
|
||||
The observation list. observation[0:8] are motor angles. observation[8:16]
|
||||
are motor velocities, observation[16:24] are motor torques.
|
||||
observation[24:28] is the orientation of the base, in quaternion form.
|
||||
"""
|
||||
self._observation = self.spot.RealisticObservation()
|
||||
return self._observation
|
||||
|
||||
if parse_version(gym.__version__) < parse_version("0.9.6"):
|
||||
_render = render
|
||||
_reset = reset
|
||||
_seed = seed
|
||||
_step = step
|
||||
|
||||
def set_time_step(self, control_step, simulation_step=0.001):
|
||||
"""Sets the time step of the environment.
|
||||
|
||||
Args:
|
||||
control_step: The time period (in seconds) between two adjacent control
|
||||
actions are applied.
|
||||
simulation_step: The simulation time step in PyBullet. By default, the
|
||||
simulation step is 0.001s, which is a good trade-off between simulation
|
||||
speed and accuracy.
|
||||
Raises:
|
||||
ValueError: If the control step is smaller than the simulation step.
|
||||
"""
|
||||
if control_step < simulation_step:
|
||||
raise ValueError(
|
||||
"Control step should be larger than or equal to simulation step."
|
||||
)
|
||||
self.control_time_step = control_step
|
||||
self._time_step = simulation_step
|
||||
self._action_repeat = int(round(control_step / simulation_step))
|
||||
self._num_bullet_solver_iterations = (
|
||||
NUM_SIMULATION_ITERATION_STEPS / self._action_repeat
|
||||
)
|
||||
self._pybullet_client.setPhysicsEngineParameter(
|
||||
numSolverIterations=self._num_bullet_solver_iterations
|
||||
)
|
||||
self._pybullet_client.setTimeStep(self._time_step)
|
||||
self.spot.SetTimeSteps(
|
||||
action_repeat=self._action_repeat, simulation_step=self._time_step
|
||||
)
|
||||
|
||||
@property
|
||||
def pybullet_client(self):
|
||||
return self._pybullet_client
|
||||
|
||||
@property
|
||||
def ground_id(self):
|
||||
return self._ground_id
|
||||
|
||||
@ground_id.setter
|
||||
def ground_id(self, new_ground_id):
|
||||
self._ground_id = new_ground_id
|
||||
|
||||
@property
|
||||
def env_step_counter(self):
|
||||
return self._env_step_counter
|
||||
Reference in New Issue
Block a user