Python gym 模块,Wrapper() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用gym.Wrapper()。
def get_wrapper_by_name(env, classname):
"""Given an a gym environment possibly wrapped multiple times,returns a wrapper
of class named classname or raises ValueError if no such wrapper was applied
Parameters
----------
env: gym.Env of gym.Wrapper
gym environment
classname: str
name of the wrapper
Returns
-------
wrapper: gym.Wrapper
wrapper named classname
"""
currentenv = env
while True:
if classname == currentenv.class_name():
return currentenv
elif isinstance(currentenv, gym.Wrapper):
currentenv = currentenv.env
else:
raise ValueError("Couldn't find wrapper named %s" % classname)
def SetResolution(target_resolution):
class SetResolutionWrapper(gym.Wrapper):
"""
Doom wrapper to change screen resolution
"""
def __init__(self, env):
super(SetResolutionWrapper, self).__init__(env)
if target_resolution not in resolutions:
raise gym.error.Error('Error - The specified resolution "{}" is not supported by Vizdoom.'.format(target_resolution))
parts = target_resolution.lower().split('x')
width = int(parts[0])
height = int(parts[1])
screen_res = __import__('doom_py')
screen_res = getattr(screen_res, 'ScreenResolution')
screen_res = getattr(screen_res, 'RES_{}X{}'.format(width, height))
self.screen_width, self.screen_height, self.unwrapped.screen_resolution = width, height, screen_res
self.unwrapped.observation_space = gym.spaces.Box(low=0, high=255, shape=(self.screen_height, self.screen_width, 3))
self.observation_space = self.unwrapped.observation_space
return SetResolutionWrapper
def __init__(self, env, k):
"""Stack k last frames.
Returns lazy array,which is much more memory efficient.
See Also
--------
baselines.common.atari_wrappers.LazyFrames
"""
gym.Wrapper.__init__(self, env)
self.k = k
self.frames = deque([], maxlen=k)
shp = env.observation_space.shape
self.observation_space = spaces.Box(low=0, shape=(shp[0], shp[1], shp[2] * k))
def get_wrapper_by_name(env, classname):
currentenv = env
while True:
if classname in currentenv.__class__.__name__:
return currentenv
elif isinstance(env, gym.Wrapper):
currentenv = currentenv.env
else:
raise ValueError("Couldn't find wrapper named %s"%classname)
def __init__(self, noop_max=30):
"""Sample initial states by taking random number of no-ops on reset.
No-op is assumed to be action 0.
"""
gym.Wrapper.__init__(self, env)
self.noop_max = noop_max
self.override_num_noops = None
self.noop_action = 0
assert env.unwrapped.get_action_meanings()[0] == 'NOOP'
def __init__(self, env)
self.noop_max = noop_max
self.override_num_noops = None
self.noop_action = 0
assert env.unwrapped.get_action_meanings()[0] == 'NOOP'
def __init__(self, env)
assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
assert len(env.unwrapped.get_action_meanings()) >= 3
def __init__(self, env)
self.lives = 0
self.was_real_done = True
def __init__(self,)+env.observation_space.shape, dtype='uint8')
self._skip = skip
def list_wrappers(env: Union[Env, gym.Wrapper]):
while isinstance(env, gym.Wrapper):
yield env
env = env.env
def __init__(self, env):
super(Wrapper, self).__init__(env)
if not env.Metadata.get('runtime.vectorized'):
if self.autovectorize:
# Circular dependency :(
from universe import wrappers
env = wrappers.Vectorize(env)
else:
raise error.Error('This wrapper can only wrap vectorized envs (i.e. where env.Metadata["runtime.vectorized"] = True),not {}. Set "self.autovectorize = True" to automatically add a Vectorize wrapper.'.format(env))
self.env = env
def Skipwrapper(repeat_count):
class Skipwrapper(gym.Wrapper):
"""
Generic common frame skipping wrapper
Will perform action for `x` additional steps
"""
def __init__(self, env):
super(Skipwrapper, self).__init__(env)
self.repeat_count = repeat_count
self.stepcount = 0
def _step(self, action):
done = False
total_reward = 0
current_step = 0
while current_step < (self.repeat_count + 1) and not done:
self.stepcount += 1
obs, reward, done, info = self.env.step(action)
total_reward += reward
current_step += 1
if 'skip.stepcount' in info:
raise gym.error.Error('Key "skip.stepcount" already in info. Make sure you are not stacking ' \
'the Skipwrapper wrappers.')
info['skip.stepcount'] = self.stepcount
return obs, total_reward, info
def _reset(self):
self.stepcount = 0
return self.env.reset()
return Skipwrapper
def HistoryWrapper(steps):
class HistoryWrapper(gym.Wrapper):
"""
Track history of observations for given amount of steps
Initial steps are zero-filled
"""
def __init__(self, env):
super(HistoryWrapper, self).__init__(env)
self.steps = steps
self.history = self._make_history()
def _make_history(self):
return [np.zeros(shape=self.env.observation_space.shape) for _ in range(steps)]
def _step(self, action):
obs, info = self.env.step(action)
self.history.pop(0)
self.history.append(obs)
return np.array(self.history), info
def _reset(self):
self.history = self._make_history()
self.history.pop(0)
self.history.append(self.env.reset())
return np.array(self.history)
return HistoryWrapper
def HistoryWrapper(steps):
class _HistoryWrapper(gym.Wrapper):
"""
Track history of observations for given amount of steps
Initial steps are zero-filled
"""
def __init__(self, env):
super(_HistoryWrapper, self).__init__(env)
self.steps = steps
self.history = self._make_history()
self.observation_space = self._make_observation_space(steps, env.observation_space)
@staticmethod
def _make_observation_space(steps, orig_obs):
assert isinstance(orig_obs, gym.spaces.Box)
low = np.repeat(np.expand_dims(orig_obs.low, 0), steps, axis=0)
high = np.repeat(np.expand_dims(orig_obs.high, axis=0)
return gym.spaces.Box(low, high)
def _make_history(self, last_item = None):
size = self.steps if last_item is None else self.steps-1
res = collections.deque([np.zeros(shape=self.env.observation_space.shape)] * size)
if last_item is not None:
res.append(last_item)
return res
def _step(self, info = self.env.step(action)
self.history.popleft()
self.history.append(obs)
return self.history, info
def _reset(self):
self.history = self._make_history(last_item=self.env.reset())
return self.history
return _HistoryWrapper
def make_env(env_name, monitor_dir=None, wrappers=()):
"""
Make gym environment with optional monitor
:param env_name: name of the environment to create
:param monitor_dir: optional directory to save monitor results
:param wrappers: list of optional Wrapper object instances
:return: environment object
"""
env = gym.make(env_name)
for wrapper in wrappers:
env = wrapper(env)
if monitor_dir:
env = gym.wrappers.Monitor(env, monitor_dir)
return env
def RepeatActionWrapper(env, repeat):
"""
This is just a thin wrapper around `gym.wrappes.Skipwrapper`
to get a consistent interface.
:param gym.env env: Environment to wrap
:param int repeat: Number of times that an action will be repeated.
:return gym.Wrapper: A wrapper that repeats an action for `repeat`
steps.
"""
from gym.wrappers import Skipwrapper
return Skipwrapper(repeat)(env)
def __init__(self, env)
assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
assert len(env.unwrapped.get_action_meanings()) >= 3
def __init__(self, env)
self.lives = 0
self.was_real_done = True
def Todiscrete():
class TodiscreteWrapper(gym.Wrapper):
"""
Wrapper to convert Multidiscrete action space to discrete
Only supports one config,which maps to the most logical discrete space possible
"""
def __init__(self, env):
super(TodiscreteWrapper, self).__init__(env)
mapping = {
0: [0, 0, 0], # NOOP
1: [1, # Up
2: [0, 1, # Down
3: [0, # Left
4: [0, # Left + A
5: [0, 1], # Left + B
6: [0, # Left + A + B
7: [0, # Right
8: [0, # Right + A
9: [0, # Right + B
10: [0, # Right + A + B
11: [0, # A
12: [0, # B
13: [0, # A + B
}
self.action_space = gym.spaces.multi_discrete.discretetoMultidiscrete(self.action_space, mapping)
def _step(self, action):
return self.env._step(self.action_space(action))
return TodiscreteWrapper
def ToBox():
class ToBoxWrapper(gym.Wrapper):
"""
Wrapper to convert Multidiscrete action space to Box
Only supports one config,which allows all keys to be pressed
"""
def __init__(self, env):
super(ToBoxWrapper, self).__init__(env)
self.action_space = gym.spaces.multi_discrete.BoxToMultidiscrete(self.action_space)
def _step(self, action):
return self.env._step(self.action_space(action))
return ToBoxWrapper
def SetPlayingMode(target_mode):
""" target mode can be 'algo' or 'human' """
class SetPlayingModeWrapper(gym.Wrapper):
"""
Doom wrapper to change playing mode 'human' or 'algo'
"""
def __init__(self, env):
super(SetPlayingModeWrapper, self).__init__(env)
if target_mode not in ['algo', 'human']:
raise gym.error.Error('Error - The mode "{}" is not supported. Supported options are "algo" or "human"'.format(target_mode))
self.unwrapped.mode = target_mode
return SetPlayingModeWrapper
def SetPlayingMode(target_mode):
""" target mode can be 'algo' or 'human' """
class SetPlayingModeWrapper(gym.Wrapper):
"""
Doom wrapper to change playing mode 'human' or 'algo'
"""
def __init__(self, 'human']:
raise gym.error.Error('Error - The mode "{}" is not supported. Supported options are "algo" or "human"'.format(target_mode))
self.unwrapped._mode = target_mode
return SetPlayingModeWrapper
def __init__(self, env)
self.noop_max = noop_max
self.override_num_noops = None
if isinstance(env.action_space, gym.spaces.MultiBinary):
self.noop_action = np.zeros(self.env.action_space.n, dtype=np.int64)
else:
# used for atari environments
self.noop_action = 0
assert env.unwrapped.get_action_meanings()[0] == 'NOOP'
def __init__(self, env)
assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
assert len(env.unwrapped.get_action_meanings()) >= 3
def __init__(self, env)
self.lives = 0
self.was_real_done = True