Python gym 模块,spaces() 实例源码
我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用gym.spaces()。
def example(env):
    """Play one episode with randomly sampled actions, rendering every step.

    The environment is seeded with 0 and ``gym.spaces.prng`` with 10 before
    the rollout starts.

    Parameters
    ----------
    env: gym.core.Environment
        Environment to play on. Must have nS, nA, and P as
        attributes.

    Raises
    ------
    AssertionError
        If the episode has not finished after 100 steps.
    """
    env.seed(0)
    from gym.spaces import prng
    prng.seed(10)  # fixed seed, so the printed locations repeat between runs
    # Generate the episode
    env.reset()
    for _ in range(100):
        env.render()
        action = env.action_space.sample()
        _, _, done, _ = env.step(action)
        if done:
            break
    assert done
    # Show the final state as well.
    env.render()
def example(env):
    """Show an example of gym by playing one randomly-acting episode.

    The environment is seeded with 0 and ``gym.spaces.prng`` with 10, then
    up to 100 sampled actions are applied, rendering before every step and
    once more after the episode ends.

    Parameters
    ----------
    env: gym.core.Environment
        Environment to play on. Must have nS, nA, and P as
        attributes.

    Raises
    ------
    AssertionError
        If the episode has not finished after 100 steps.
    """
    env.seed(0)
    from gym.spaces import prng
    prng.seed(10)  # for printing the location
    # Generate the episode
    env.reset()
    for _ in range(100):
        env.render()
        a = env.action_space.sample()
        # step() returns (observation, reward, done, info); only the
        # termination flag is needed here.
        _, _, done, _ = env.step(a)
        if done:
            break
    assert done
    env.render()
def __init__(self, model_xml, robot_name, timestep, frame_skip, action_dim, obs_dim, repeats):
    """Create the action/observation spaces and load the MJCF scene.

    Parameters
    ----------
    model_xml: MJCF model handed to ``p.loadMJCF``.
    robot_name: stored unchanged as ``self.robot_name``.
    timestep: stored as ``self.timestep``; multiplied by ``frame_skip``
        to obtain ``self.dt``.
    frame_skip: stored as ``self.frame_skip`` (see ``timestep``).
    action_dim: length of the action vector; actions are bounded to [-1, 1].
    obs_dim: second dimension of the observation array.
    repeats: first dimension of the observation array.
    """
    self.action_space = gym.spaces.Box(-1.0, 1.0, shape=(action_dim,))
    float_max = np.finfo(np.float32).max
    # obs space for problem is (R, obs_dim), R = number of repeats
    self.state_shape = (repeats, obs_dim)
    self.observation_space = gym.spaces.Box(-float_max, float_max, shape=self.state_shape)
    # no state until reset.
    self.state = np.empty(self.state_shape, dtype=np.float32)
    self.frame_skip = frame_skip
    self.timestep = timestep
    self.model_xml = model_xml
    self.parts, self.joints = self.getScene(p.loadMJCF(model_xml))
    self.robot_name = robot_name
    self.dt = timestep * frame_skip
    # gym looks up the lower-case ``metadata`` attribute.
    self.metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': int(np.round(1.0 / timestep / frame_skip)),
    }
    # Alias kept so code that reads the old capitalised name keeps working.
    self.Metadata = self.metadata
    self._seed()
def __init__(self, obs_dim):
    """Reset scene bookkeeping and build the action/observation spaces.

    The action space is a Box bounded by -1 and 1 in every dimension; the
    observation space is an unbounded Box with ``obs_dim`` entries.

    NOTE(review): ``action_dim``, ``model_xml`` and ``robot_name`` are read
    below but are not parameters of this method. Unless they resolve from
    an enclosing scope this raises NameError -- confirm the intended
    signature.
    """
    # Nothing is loaded yet; these are placeholders.
    self.scene = None
    self.parts = None
    self.jdict = None
    self.ordered_joints = None
    self.robot_body = None

    action_bound = np.ones([action_dim])
    self.action_space = gym.spaces.Box(-action_bound, action_bound)

    obs_bound = np.inf * np.ones([obs_dim])
    self.observation_space = gym.spaces.Box(-obs_bound, obs_bound)

    self._seed()

    self.model_xml = model_xml
    self.robot_name = robot_name
    self.camera = Camera()
def test_discretize_errors():
    """Check that discretize() raises the expected error for bad arguments."""
    cont = Box(np.array([0.0, 1.0]), np.array([1.0, 2.0]))
    # Calls are wrapped in lambdas so that argument construction and the
    # call itself both happen inside the pytest.raises block.
    failing_calls = (
        (TypeError, lambda: discretize(5, 5)),
        (ValueError, lambda: discretize(cont, 1)),
        (NotImplementedError, lambda: discretize(Tuple(spaces=[cont]), 10)),
        (ValueError, lambda: discretize(cont, [1, 1])),
        (ValueError, lambda: discretize(cont, [5, 5, 5])),
    )
    for expected_error, call in failing_calls:
        with pytest.raises(expected_error):
            call()
# flatten
def convert_gym_space(space):
    """Convert a gym space into the matching local space object.

    ``gym.spaces.Box`` becomes ``Box`` (same bounds), ``gym.spaces.Discrete``
    becomes ``Discrete`` (same ``n``) and ``gym.spaces.Tuple`` becomes a
    ``Product`` of its recursively converted members.

    Raises
    ------
    NotImplementedError
        If ``space`` is none of the supported gym space types.
    """
    if isinstance(space, gym.spaces.Box):
        return Box(low=space.low, high=space.high)
    # ``gym.spaces.discrete`` is the submodule; the class is ``Discrete``.
    if isinstance(space, gym.spaces.Discrete):
        return Discrete(n=space.n)
    if isinstance(space, gym.spaces.Tuple):
        return Product([convert_gym_space(x) for x in space.spaces])
    raise NotImplementedError
def convert_gym_space(space):
    """Convert a gym ``Tuple`` space into a ``Product`` of converted members.

    Each entry of ``space.spaces`` is converted recursively. Anything that
    is not a ``gym.spaces.Tuple`` raises ``NotImplementedError``.
    """
    if not isinstance(space, gym.spaces.Tuple):
        raise NotImplementedError
    converted = [convert_gym_space(member) for member in space.spaces]
    return Product(converted)
def spaces_grid(*spaces, deFinition=50):
    """
    Return a meshgrid covering the cartesian product of the given spaces

    The ``low`` and ``high`` arrays of all spaces are concatenated, and each
    resulting dimension is sampled with ``deFinition`` evenly spaced points.

    :param spaces: Minimum one; each must provide ``low`` and ``high`` arrays
    :param deFinition: number of points per dimension
    :return: the coordinate arrays produced by ``np.meshgrid``
    :raises ValueError: if no space is given
    """
    if not spaces:
        raise ValueError("spaces_grid() requires at least one space")
    low = np.concatenate([space.low for space in spaces], axis=0)
    high = np.concatenate([space.high for space in spaces], axis=0)
    axes = [np.linspace(lo, hi, deFinition) for lo, hi in zip(low, high)]
    return np.meshgrid(*axes)
def merge_spaces(*spaces):
    """Merge the given Box spaces into a single Box.

    The lower and upper bounds of all spaces are concatenated along the
    first axis, in the order the spaces were passed.

    :param spaces: one or more ``gym.spaces.Box`` instances
    :return: a ``gym.spaces.Box`` built from the concatenated bounds
    :raises ValueError: if no space is given
    :raises TypeError: if any space is not a ``gym.spaces.Box``
    """
    if not spaces:
        raise ValueError("merge_spaces() requires at least one space")
    for space in spaces:
        if not isinstance(space, gym.spaces.Box):
            # A bare string cannot be raised; TypeError is what callers
            # already received from the invalid raise.
            raise TypeError("Your given space is not of type Box")
    low = np.concatenate([space.low for space in spaces], axis=0)
    high = np.concatenate([space.high for space in spaces], axis=0)
    return gym.spaces.Box(low, high)
def __init__(self, initialWealth=25.0, edge=0.6, maxWealth=250.0, maxRounds=300):
    """Set up spaces and game parameters, then seed and reset.

    Parameters
    ----------
    initialWealth: starting wealth; also the initial value of ``self.wealth``.
    edge: stored unchanged as ``self.edge``.
    maxWealth: upper bound of the wealth observation and of the reward range.
    maxRounds: stored as ``self.maxRounds``; the rounds observation is
        ``Discrete(maxRounds + 1)``.
    """
    # ``spaces.discrete`` is the submodule; the space class is ``Discrete``.
    self.action_space = spaces.Discrete(int(maxWealth * 100))  # betting in penny increments
    self.observation_space = spaces.Tuple((
        spaces.Box(0, maxWealth, [1]),  # (w,b)
        spaces.Discrete(maxRounds + 1)))
    self.reward_range = (0, maxWealth)
    self.edge = edge
    self.wealth = initialWealth
    self.initialWealth = initialWealth
    self.maxRounds = maxRounds
    self.maxWealth = maxWealth
    self._seed()
    self._reset()
def __init__(self, edgePriorAlpha=7, edgePriorBeta=3, maxWealthAlpha=5.0, maxWealthM=200.0, maxRoundsMean=300.0, maxRoundsSD=25.0, reseed=True, initialWealth=25.0):
    """Draw a fresh set of game parameters and build the spaces.

    Parameters
    ----------
    edgePriorAlpha, edgePriorBeta: parameters of the Beta distribution the
        edge is sampled from.
    maxWealthAlpha, maxWealthM: arguments passed to ``genpareto.rvs`` when
        sampling the wealth cap.
    maxRoundsMean, maxRoundsSD: mean and standard deviation of the normal
        distribution the number of rounds is sampled from.
    reseed: when true, or when the instance has no ``np_random`` yet,
        ``self._seed()`` is called.
    initialWealth: starting wealth, converted to float.
    """
    # store the hyperparameters for passing back into __init__() during
    # resets so the same hyperparameters govern the next game's parameters,
    # as the user expects.
    # TODO: this is boilerplate, is there any more elegant way to do this?
    self.initialWealth = float(initialWealth)
    self.edgePriorAlpha = edgePriorAlpha
    self.edgePriorBeta = edgePriorBeta
    self.maxWealthAlpha = maxWealthAlpha
    self.maxWealthM = maxWealthM
    self.maxRoundsMean = maxRoundsMean
    self.maxRoundsSD = maxRoundsSD
    # draw this game's set of parameters:
    edge = prng.np_random.beta(edgePriorAlpha, edgePriorBeta)
    maxWealth = round(genpareto.rvs(maxWealthAlpha, maxWealthM, random_state=prng.np_random))
    maxRounds = int(round(prng.np_random.normal(maxRoundsMean, maxRoundsSD)))
    # add an additional global variable which is the sufficient statistic
    # for the Pareto distribution on wealth cap; alpha doesn't update, but
    # x_m does, and simply is the highest wealth count we've seen to date:
    self.maxEverWealth = float(self.initialWealth)
    # for the coinflip edge, it is total wins/losses:
    self.wins = 0
    self.losses = 0
    # for the number of rounds, we need to remember how many rounds we've played:
    self.roundsElapsed = 0
    # the rest proceeds as before:
    self.action_space = spaces.Discrete(int(maxWealth * 100))
    self.observation_space = spaces.Tuple((
        spaces.Box(0, maxWealth, shape=[1]),  # current wealth
        spaces.Discrete(maxRounds + 1),  # rounds elapsed
        spaces.Discrete(maxRounds + 1),  # wins
        spaces.Discrete(maxRounds + 1),  # losses
        spaces.Box(0, maxWealth, [1])))  # maximum observed wealth
    self.reward_range = (0, maxWealth)
    self.edge = edge
    self.wealth = self.initialWealth
    self.maxRounds = maxRounds
    self.rounds = self.maxRounds
    self.maxWealth = maxWealth
    if reseed or not hasattr(self, 'np_random'):
        self._seed()
def __init__(self, space):
    """Build one convertor per member of a Tuple space and the merged Box.

    ``self.out_space`` is a Box whose bounds are the concatenated ``low``
    and ``high`` arrays of every member convertor's ``out_space``.
    """
    assert isinstance(space, Tuple)
    self.in_space = space
    self.convertors = [convertor(member) for member in space.spaces]
    lows = [conv.out_space.low for conv in self.convertors]
    highs = [conv.out_space.high for conv in self.convertors]
    self.out_space = Box(np.concatenate(lows), np.concatenate(highs))
def convert_gym_space(space):
    """Convert a gym ``Discrete`` space into the local ``Discrete`` space.

    The converted space has the same ``n`` as the gym space.

    Raises
    ------
    NotImplementedError
        If ``space`` is not a ``gym.spaces.Discrete``.
    """
    # ``gym.spaces.discrete`` is the submodule; the class is ``Discrete``.
    if isinstance(space, gym.spaces.Discrete):
        return Discrete(n=space.n)
    raise NotImplementedError
def _make_observation_space(orig_space, target_shape):
    """Create a Box of shape ``target_shape + (k,)`` with uniform bounds.

    ``k`` is the product of the first and last dimension of ``orig_space``.
    The lower bound is the smallest entry of ``orig_space.low`` and the
    upper bound the largest entry of ``orig_space.high``.
    """
    assert isinstance(orig_space, gym.spaces.Box)
    flat_dim = orig_space.shape[0] * orig_space.shape[-1]
    shape = target_shape + (flat_dim, )
    lower = np.ones(shape) * orig_space.low.min()
    upper = np.ones(shape) * orig_space.high.max()
    return gym.spaces.Box(lower, upper)
def HistoryWrapper(steps):
    """Return a ``gym.Wrapper`` subclass that keeps the last ``steps`` observations.

    The class is created per call so that ``steps`` is fixed for every
    environment wrapped with it.
    """
    class _HistoryWrapper(gym.Wrapper):
        """
        Track history of observations for given amount of steps
        Initial steps are zero-filled
        """
        def __init__(self, env):
            super(_HistoryWrapper, self).__init__(env)
            self.steps = steps
            self.history = self._make_history()
            self.observation_space = self._make_observation_space(steps, env.observation_space)

        @staticmethod
        def _make_observation_space(steps, orig_obs):
            """Stack the original Box bounds ``steps`` times along a new first axis."""
            assert isinstance(orig_obs, gym.spaces.Box)
            low = np.repeat(np.expand_dims(orig_obs.low, 0), steps, axis=0)
            high = np.repeat(np.expand_dims(orig_obs.high, 0), steps, axis=0)
            return gym.spaces.Box(low, high)

        def _make_history(self, last_item=None):
            """Return a zero-filled deque of ``steps`` entries, optionally ending with ``last_item``."""
            size = self.steps if last_item is None else self.steps - 1
            res = collections.deque([np.zeros(shape=self.env.observation_space.shape)] * size)
            if last_item is not None:
                res.append(last_item)
            return res

        def _step(self, action):
            """Step the wrapped env and return the updated history as the observation."""
            obs, reward, done, info = self.env.step(action)
            # Drop the oldest observation and append the newest.
            self.history.popleft()
            self.history.append(obs)
            return self.history, reward, done, info

        def _reset(self):
            """Reset the wrapped env; the history is zeros followed by the first observation."""
            self.history = self._make_history(last_item=self.env.reset())
            return self.history

    return _HistoryWrapper
def __init__(self, maxWealth, initialWealth=25.0, edge=0.6, maxRounds=300):
    """Store the game parameters, then seed and reset the environment.

    Parameters
    ----------
    maxWealth: stored as ``self.maxWealth``.
    initialWealth: starting wealth; also the initial value of ``self.wealth``.
    edge: stored unchanged as ``self.edge``.
    maxRounds: stored as ``self.maxRounds``.

    ``initialWealth``, ``edge`` and ``maxRounds`` were read without being
    bound; they are accepted as optional trailing parameters so existing
    ``__init__(maxWealth)`` calls keep their meaning.
    """
    self.edge = edge
    self.wealth = initialWealth
    self.initialWealth = initialWealth
    self.maxRounds = maxRounds
    self.maxWealth = maxWealth
    self._seed()
    self._reset()
def convert_gym_space(space):
    """Convert a gym ``Tuple`` space into a ``Product`` of converted members.

    Each entry of ``space.spaces`` is converted recursively. Anything that
    is not a ``gym.spaces.Tuple`` raises ``NotImplementedError``.
    """
    if not isinstance(space, gym.spaces.Tuple):
        raise NotImplementedError
    converted = [convert_gym_space(member) for member in space.spaces]
    return Product(converted)
def convert_gym_space(space):
    """Map a gym ``Tuple`` space onto a ``Product`` space.

    The members in ``space.spaces`` are converted one by one through a
    recursive call. Any other kind of space is rejected with
    ``NotImplementedError``.
    """
    if not isinstance(space, gym.spaces.Tuple):
        raise NotImplementedError
    parts = [convert_gym_space(sub_space) for sub_space in space.spaces]
    return Product(parts)
def convert_gym_space(space):
    """Turn a gym ``Tuple`` space into a ``Product`` of its converted parts.

    Conversion recurses into every element of ``space.spaces``. Spaces of
    any other type raise ``NotImplementedError``.
    """
    if not isinstance(space, gym.spaces.Tuple):
        raise NotImplementedError
    components = [convert_gym_space(component) for component in space.spaces]
    return Product(components)