Python `gym` module — `spec()` example source code.
The following 30 code examples, extracted from open-source Python projects, illustrate how to use `gym.spec()`.
def create_env(env_id, client_id, remotes, task=0, subject=None, summary_writer=None, **kwargs):
    """Create an environment for env_id, dispatching on config.project.

    For project 'g', build a universe/gym environment based on the spec's
    tags; for project 'f', build the project-local env_f wrapper.
    """
    import config
    # BUG FIX: comparing strings with `is` tests object identity, not
    # equality; whether two equal literals share one object is an
    # implementation detail, so this could silently be False. Use `==`.
    if config.project == 'g':
        spec = gym.spec(env_id)
        if spec.tags.get('flashgames', False):
            return create_flash_env(env_id, **kwargs)
        elif spec.tags.get('atari', False) and spec.tags.get('vnc', False):
            return create_vncatari_env(env_id, **kwargs)
        else:
            # Assume atari.
            assert "." not in env_id  # universe environments have dots in names.
            return create_atari_env(env_id)
    elif config.project == 'f':
        return env_f(env_id=env_id,
                     task=task,
                     subject=subject,
                     summary_writer=summary_writer)
def configure(self, n=1, pool_size=None, episode_limit=None):
    """Instantiate n environments from self.spec and shard them across Workers."""
    self.n = n
    self.envs = [self.spec.make() for _ in range(self.n)]
    if pool_size is None:
        # Leave one core free, but always use at least one worker.
        pool_size = max(1, min(len(self.envs), multiprocessing.cpu_count() - 1))
    self.worker_n = []
    # Ceiling division: each worker owns at most `per_worker` envs.
    per_worker = -(-self.n // pool_size)
    for start in range(0, self.n, per_worker):
        group = self.envs[start:start + per_worker]
        self.worker_n.append(Worker(group, start))
    if episode_limit is not None:
        self._episode_id.episode_limit = episode_limit
def gym_core_action_space(gym_core_id):
    """Return the hardcoded VNC action space for a supported core gym env."""
    spec = gym.spec(gym_core_id)
    if spec.id == 'CartPole-v0':
        # CartPole is driven by pressing/releasing a single key.
        return spaces.Hardcoded([[spaces.KeyEvent.by_name('left', down=True)],
                                 [spaces.KeyEvent.by_name('left', down=False)]])
    elif spec._entry_point.startswith('gym.envs.atari:'):
        env = spec.make()
        # Translate each Atari action meaning into its VNC key combination.
        actions = [
            atari_vnc(up='UP' in meaning,
                      down='DOWN' in meaning,
                      left='LEFT' in meaning,
                      right='RIGHT' in meaning,
                      z='FIRE' in meaning)
            for meaning in env.unwrapped.get_action_meanings()
        ]
        return spaces.Hardcoded(actions)
    else:
        raise error.Error('Unsupported env type: {}'.format(spec.id))
def __init__(self, env, gym_core_id=None):
    """Wrap env, exposing a discrete action space translated to VNC events.

    Args:
        env: the environment to wrap.
        gym_core_id: id of the underlying core gym env; required when
            instantiating by hand (self.spec is unavailable inside make).
    """
    super(GymCoreAction, self).__init__(env)
    if gym_core_id is None:
        # self.spec is None while inside of the make, so we need
        # to pass gym_core_id in explicitly there. This case will
        # be hit when instantiating by hand.
        gym_core_id = self.spec._kwargs['gym_core_id']
    spec = gym.spec(gym_core_id)
    raw_action_space = gym_core_action_space(gym_core_id)
    self._actions = raw_action_space.actions
    # BUG FIX: gym's Discrete space class is capitalized; `gym_spaces.discrete`
    # would raise AttributeError at runtime.
    self.action_space = gym_spaces.Discrete(len(self._actions))
    if spec._entry_point.startswith('gym.envs.atari:'):
        self.key_state = translator.AtariKeyState(gym.make(gym_core_id))
    else:
        self.key_state = None
def score_from_file(json_file):
    """Calculate score from an episode_batch.json file"""
    with open(json_file) as f:
        results = json.load(f)
    # No scores yet saved
    if results is None:
        return None
    lengths = results['episode_lengths']
    rewards = results['episode_rewards']
    types = results['episode_types']
    stamps = results['timestamps']
    initial_reset = results['initial_reset_timestamp']
    spec = gym.spec(results['env_id'])
    return score_from_merged(lengths, rewards, types, stamps, initial_reset,
                             spec.trials, spec.reward_threshold)
def benchmark_score_from_local(benchmark_id, training_dir):
    """Aggregate a benchmark score from monitor results under training_dir.

    Walks training_dir for directories containing training manifests,
    scores each evaluation, and aggregates per-env results.
    """
    spec = gym.benchmark_spec(benchmark_id)
    directories = []
    for name, _, files in os.walk(training_dir):
        manifests = gym.monitoring.detect_training_manifests(name, files=files)
        if manifests:
            directories.append(name)
    benchmark_results = defaultdict(list)
    # BUG FIX: the loop variable previously shadowed the `training_dir`
    # parameter, which is confusing and error-prone; use a distinct name.
    for directory in directories:
        results = gym.monitoring.load_results(directory)
        env_id = results['env_info']['env_id']
        benchmark_result = spec.score_evaluation(
            env_id,
            results['data_sources'],
            results['initial_reset_timestamps'],
            results['episode_lengths'],
            results['episode_rewards'],
            results['episode_types'],
            results['timestamps'])
        benchmark_results[env_id].append(benchmark_result)
    return gym.benchmarks.scoring.benchmark_aggregate_score(spec, benchmark_results)
def score_from_file(json_file):
    """Calculate score from an episode_batch.json file"""
    with open(json_file) as f:
        results = json.load(f)
    # No scores yet saved
    if results is None:
        return None
    episode_lengths = results['episode_lengths']
    episode_rewards = results['episode_rewards']
    episode_types = results['episode_types']
    timestamps = results['timestamps']
    initial_reset_timestamp = results['initial_reset_timestamp']
    spec = gym.spec(results['env_id'])
    # BUG FIX: the call was truncated to score_from_merged(episode_lengths,
    # spec.reward_threshold); restored the full argument list to match the
    # identical score_from_file defined earlier in this file.
    return score_from_merged(episode_lengths, episode_rewards, episode_types,
                             timestamps, initial_reset_timestamp,
                             spec.trials, spec.reward_threshold)
def benchmark_score_from_local(benchmark_id, benchmark_results)
def __init__(self, env_id, task, summary_writer=None, subject=None):
    """Wrap an envs_li environment behind a minimal gym-like interface.

    Args:
        env_id: id of the underlying environment.
        task: task index passed through to envs_li.
        summary_writer: optional writer passed through to envs_li.
        subject: optional subject passed through to envs_li.
            BUG FIX: `subject` was used below but was never a parameter,
            which raised NameError; accept it (default None) at the end of
            the signature for backward compatibility.
    """
    self._episode_reward = 0
    self._episode_length = 0

    class nnn():
        # Minimal stand-in for a gym action space: only exposes .n
        def __init__(self, n):
            self.n = n
    import config
    self.action_space = nnn(config.direction_num)
    self.env_id = env_id
    import envs_li
    self.env_li = envs_li.env_li(env_id=env_id,
                                 task=task,
                                 subject=subject,
                                 summary_writer=summary_writer)
    # observation_space comes straight from project config.
    from config import observation_space
    self.observation_space = observation_space

    # Wrapper to match the original gym env interface, so callers can do:
    # env.spec.tags.get('wrapper_config.TimeLimit.max_episode_steps')
    class spec():
        def __init__(self, env_li):
            class tags():
                def __init__(self, env_li):
                    self.env_li = env_li
                def get(self, get_str):
                    if get_str == 'wrapper_config.TimeLimit.max_episode_steps':
                        return self.env_li.step_total
                    else:
                        # BUG FIX: original printed an undefined name `s`
                        # (NameError); report the unsupported tag instead.
                        print('unsupported tag: {}'.format(get_str))
            self.tags = tags(env_li)
    self.spec = spec(self.env_li)
def __init__(self, env_id):
    """Pull spaces and metadata from a transient instance of env_id."""
    self.worker_n = None
    # Pull the relevant info from a transient env instance
    self.spec = gym.spec(env_id)
    env = self.spec.make()
    # BUG FIX: gym environments expose `metadata` (lowercase); `Metadata`
    # raises AttributeError. Merge this class's metadata over the env's own.
    current_metadata = self.metadata
    self.metadata = env.metadata.copy()
    self.metadata.update(current_metadata)
    self.action_space = env.action_space
    self.observation_space = env.observation_space
    self.reward_range = env.reward_range
def __init__(self, env=None, gym_core_id=None):
    """Observation wrapper backed by a local instance of the core gym env.

    BUG FIX: the original text was garbled by transcription -- the super()
    call was fused with a comment, producing a SyntaxError. Reconstructed
    the conventional wrapper initializer; `env` defaults to None to stay
    call-compatible with the truncated one-argument form.
    """
    super(GymCoreObservation, self).__init__(env)
    if gym_core_id is None:
        # self.spec is None while inside of the make, so we need
        # to pass gym_core_id in explicitly there. This case will
        # be hit when instantiating by hand.
        gym_core_id = self.spec._kwargs['gym_core_id']
    self._reward_n = None
    self._done_n = None
    self._info_n = None
    self._gym_core_env = gym.spec(gym_core_id).make()
def WrappedGymCoreSyncEnv(gym_core_id, fps=60, rewarder_observation=False):
    """Build a synchronized, VNC-backed env for the given core gym id."""
    spec = gym.spec(gym_core_id)
    base = gym_core_sync.GymCoreSync(BlockingReset(wrap(envs.VNCEnv(fps=fps))))
    if rewarder_observation:
        return GymCoreObservation(base, gym_core_id=gym_core_id)
    if spec._entry_point.startswith('gym.envs.atari:'):
        return CropAtari(base)
    return base
def __init__(self, gym_core_id, fps=60, vnc_pixels=True):
    """Synchronous core-gym env; optionally keeps a local core env instance.

    BUG FIX: `fps` was passed to super().__init__ but was not a parameter,
    raising NameError; accept it with a default of 60 (matching the
    WrappedGymCoreSyncEnv helper in this file). Also, gym environments
    expose `metadata` (lowercase), not `Metadata`.
    """
    super(GymCoreSyncEnv, self).__init__(gym_core_id, fps=fps)
    # metadata has already been cloned
    self.metadata['semantics.async'] = False
    self.gym_core_id = gym_core_id
    self.vnc_pixels = vnc_pixels
    if not vnc_pixels:
        # Keep a local core env for non-pixel observations.
        self._core_env = gym.spec(gym_core_id).make()
    else:
        self._core_env = None
def test_nice_vnc_semantics_match(spec, matcher, wrapper):
    """Verify a VNC-wrapped env and the raw gym env behave identically."""
    # Check that when running over VNC or using the raw environment,
    # semantics match exactly.
    gym.undo_logger_setup()
    logging.getLogger().setLevel(logging.INFO)
    spaces.seed(0)
    vnc_env = spec.make()
    # NOTE(review): gym envs conventionally expose `metadata` (lowercase);
    # `Metadata` here looks like a transcription artifact -- confirm against
    # the actual env class before relying on this branch.
    if vnc_env.Metadata.get('configure.required', False):
        vnc_env.configure(remotes=1)
    vnc_env = wrapper(vnc_env)
    vnc_env = wrappers.Unvectorize(vnc_env)
    env = gym.make(spec._kwargs['gym_core_id'])
    env.seed(0)
    vnc_env.seed(0)
    # Check that reset observations work
    reset(matcher, vnc_env, stage='initial reset')
    # Check a full rollout
    # NOTE(review): the calls below pass fewer arguments than the first
    # reset() call above (no env / vnc_env); this is likely dropped text
    # from transcription -- verify against the reset/rollout helper
    # signatures before running.
    rollout(matcher, timestep_limit=50, stage='50 steps')
    # Reset to start a new episode
    reset(matcher, stage='reset to new episode')
    # Check that a step into the next episode works
    rollout(matcher, timestep_limit=1, stage='1 step in new episode')
    # Make sure env can be reseeded
    env.seed(1)
    vnc_env.seed(1)
    reset(matcher, 'reseeded reset')
    rollout(matcher, stage='reseeded step')
def score_from_remote(url):
    """Calculate score from episode results fetched from a remote monitor URL."""
    result = requests.get(url)
    parsed = result.json()
    episode_lengths = parsed['episode_lengths']
    episode_rewards = parsed['episode_rewards']
    episode_types = parsed.get('episode_types')
    timestamps = parsed['timestamps']
    # Handle legacy entries where initial_reset_timestamp wasn't set
    initial_reset_timestamp = parsed.get('initial_reset_timestamp', timestamps[0])
    env_id = parsed['env_id']
    spec = gym.spec(env_id)
    # BUG FIX: the call was truncated to (episode_lengths, spec.reward_threshold);
    # restored the full argument list used by score_from_file above.
    return score_from_merged(episode_lengths, episode_rewards, episode_types,
                             timestamps, initial_reset_timestamp,
                             spec.trials, spec.reward_threshold)
def score_from_local(directory):
    """Calculate score from a local results directory"""
    results = gym.monitoring.load_results(directory)
    # No scores yet saved
    if results is None:
        return None
    episode_lengths = results['episode_lengths']
    episode_rewards = results['episode_rewards']
    episode_types = results['episode_types']
    timestamps = results['timestamps']
    initial_reset_timestamp = results['initial_reset_timestamp']
    spec = gym.spec(results['env_info']['env_id'])
    # BUG FIX: the call was truncated to (episode_lengths, spec.reward_threshold);
    # restored the full argument list used by score_from_file above.
    return score_from_merged(episode_lengths, episode_rewards, episode_types,
                             timestamps, initial_reset_timestamp,
                             spec.trials, spec.reward_threshold)
def create_env(env_id, **kwargs):
    """Dispatch env construction based on the spec's tags."""
    tags = gym.spec(env_id).tags
    if tags.get('flashgames', False):
        return create_flash_env(env_id, **kwargs)
    if tags.get('atari', False):
        return create_vncatari_env(env_id, **kwargs)
    # Assume atari.
    assert "." not in env_id  # universe environments have dots in names.
    return create_atari_env(env_id)
def create_env(env_id, **kwargs):
    """Dispatch env construction: doom/labyrinth/mario by name, else by tags.

    BUG FIX: the flashgames branch was garbled into
    `if spec.tags.get('flashgames', **kwargs)` (a SyntaxError); reconstructed
    the conventional tag check and create_flash_env call used by the other
    create_env variants in this file.
    """
    lowered = env_id.lower()
    if 'doom' in lowered or 'labyrinth' in lowered:
        return create_doom(env_id, **kwargs)
    if 'mario' in lowered:
        return create_mario(env_id, **kwargs)
    spec = gym.spec(env_id)
    if spec.tags.get('flashgames', False):
        return create_flash_env(env_id, **kwargs)
    else:
        # Assume atari.
        assert "." not in env_id  # universe environments have dots in names.
        return create_atari_env(env_id, **kwargs)
def create_env(env_id, **kwargs):
    """Create an atari env; only plain gym atari ids are supported here.

    BUG FIX: the original was truncated -- the def line lacked a colon and
    an orphaned `else:` remained from a lost conditional. Restored the
    minimal well-formed body around the surviving lines.
    """
    # Assume atari.
    assert "." not in env_id  # universe environments have dots in names.
    return create_atari_env(env_id)
def score_from_remote(url):
    """Calculate score from episode results fetched from a remote monitor URL.

    BUG FIX: the original was garbled -- the initial_reset_timestamp line had
    its default fused with `spec.reward_threshold` and the function body ended
    without computing spec or returning. Reconstructed from the identical
    score_from_remote defined earlier in this file.
    """
    result = requests.get(url)
    parsed = result.json()
    episode_lengths = parsed['episode_lengths']
    episode_rewards = parsed['episode_rewards']
    episode_types = parsed.get('episode_types')
    timestamps = parsed['timestamps']
    # Handle legacy entries where initial_reset_timestamp wasn't set
    initial_reset_timestamp = parsed.get('initial_reset_timestamp', timestamps[0])
    spec = gym.spec(parsed['env_id'])
    return score_from_merged(episode_lengths, episode_rewards, episode_types,
                             timestamps, initial_reset_timestamp,
                             spec.trials, spec.reward_threshold)
def score_from_local(directory):
    """Calculate score from a local results directory"""
    results = gym.monitoring.load_results(directory)
    # No scores yet saved
    if results is None:
        return None
    episode_lengths = results['episode_lengths']
    episode_rewards = results['episode_rewards']
    episode_types = results['episode_types']
    timestamps = results['timestamps']
    initial_reset_timestamp = results['initial_reset_timestamp']
    spec = gym.spec(results['env_info']['env_id'])
    # BUG FIX: the call was truncated to (episode_lengths, spec.reward_threshold);
    # restored the full argument list used by the score_from_* functions above.
    return score_from_merged(episode_lengths, episode_rewards, episode_types,
                             timestamps, initial_reset_timestamp,
                             spec.trials, spec.reward_threshold)
def create_env(env_id, **kwargs):
    """Create an atari env after validating env_id against the gym registry."""
    # gym.spec raises for unknown ids; the returned spec itself is unused.
    spec = gym.spec(env_id)
    # Assume atari.
    assert "." not in env_id  # universe environments have dots in names.
    return create_atari_env(env_id)
def score_from_remote(url):
    """Calculate score from episode results fetched from a remote monitor URL.

    BUG FIX: the original was garbled -- the initial_reset_timestamp default
    was fused with `spec.reward_threshold` and the body ended without
    computing spec or returning. Reconstructed from the identical
    score_from_remote defined earlier in this file.
    """
    result = requests.get(url)
    parsed = result.json()
    episode_lengths = parsed['episode_lengths']
    episode_rewards = parsed['episode_rewards']
    episode_types = parsed.get('episode_types')
    timestamps = parsed['timestamps']
    # Handle legacy entries where initial_reset_timestamp wasn't set
    initial_reset_timestamp = parsed.get('initial_reset_timestamp', timestamps[0])
    spec = gym.spec(parsed['env_id'])
    return score_from_merged(episode_lengths, episode_rewards, episode_types,
                             timestamps, initial_reset_timestamp,
                             spec.trials, spec.reward_threshold)
def score_from_local(directory):
    """Calculate score from a local results directory"""
    results = gym.monitoring.monitor.load_results(directory)
    # No scores yet saved
    if results is None:
        return None
    episode_lengths = results['episode_lengths']
    episode_rewards = results['episode_rewards']
    episode_types = results['episode_types']
    timestamps = results['timestamps']
    initial_reset_timestamp = results['initial_reset_timestamp']
    spec = gym.spec(results['env_info']['env_id'])
    # BUG FIX: the call was truncated to (episode_lengths, spec.reward_threshold);
    # restored the full argument list used by the score_from_* functions above.
    return score_from_merged(episode_lengths, episode_rewards, episode_types,
                             timestamps, initial_reset_timestamp,
                             spec.trials, spec.reward_threshold)
def create_env(env_id, **kwargs):
    """Dispatch env construction: feudal, flashgames, else assume atari.

    BUG FIX: the flashgames branch was garbled into
    `elif spec.tags.get('flashgames', **kwargs)` (a SyntaxError);
    reconstructed the conventional tag check and create_flash_env call
    used by the other create_env variants in this file.
    """
    spec = gym.spec(env_id)
    if spec.tags.get('feudal', False):
        return create_feudal_env(env_id, **kwargs)
    elif spec.tags.get('flashgames', False):
        return create_flash_env(env_id, **kwargs)
    else:
        # Assume atari.
        assert "." not in env_id  # universe environments have dots in names.
        return create_atari_env(env_id)