Module TeachMyAgent.students.openai_baselines.common.vec_env
Expand source code
from .vec_env import AlreadySteppingError, NotSteppingError, VecEnv, VecEnvWrapper, VecEnvObservationWrapper, CloudpickleWrapper
from .dummy_vec_env import DummyVecEnv
from .shmem_vec_env import ShmemVecEnv
from .subproc_vec_env import SubprocVecEnv
from .vec_frame_stack import VecFrameStack
from .vec_monitor import VecMonitor
from .vec_normalize import VecNormalize
from .vec_remove_dict_obs import VecExtractDictObs
__all__ = ['AlreadySteppingError', 'NotSteppingError', 'VecEnv', 'VecEnvWrapper', 'VecEnvObservationWrapper', 'CloudpickleWrapper', 'DummyVecEnv', 'ShmemVecEnv', 'SubprocVecEnv', 'VecFrameStack', 'VecMonitor', 'VecNormalize', 'VecExtractDictObs']
Sub-modules
TeachMyAgent.students.openai_baselines.common.vec_env.dummy_vec_env
TeachMyAgent.students.openai_baselines.common.vec_env.shmem_vec_env
-
An interface for asynchronous vectorized environments.
TeachMyAgent.students.openai_baselines.common.vec_env.subproc_vec_env
TeachMyAgent.students.openai_baselines.common.vec_env.test_video_recorder
-
Tests for asynchronous vectorized environments.
TeachMyAgent.students.openai_baselines.common.vec_env.util
-
Helpers for dealing with vectorized environments.
TeachMyAgent.students.openai_baselines.common.vec_env.vec_env
TeachMyAgent.students.openai_baselines.common.vec_env.vec_frame_stack
TeachMyAgent.students.openai_baselines.common.vec_env.vec_monitor
TeachMyAgent.students.openai_baselines.common.vec_env.vec_normalize
TeachMyAgent.students.openai_baselines.common.vec_env.vec_remove_dict_obs
TeachMyAgent.students.openai_baselines.common.vec_env.vec_video_recorder
Classes
class AlreadySteppingError
-
Raised when an asynchronous step is running while step_async() is called again.
Expand source code
class AlreadySteppingError(Exception):
    """
    Error signalling that step_async() was invoked again while a
    previous asynchronous step was still in flight.
    """

    def __init__(self):
        # The message is fixed; callers construct the error without arguments.
        super().__init__('already running an async step')
Ancestors
- builtins.Exception
- builtins.BaseException
class CloudpickleWrapper (x)
-
Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)
Expand source code
class CloudpickleWrapper(object):
    """
    Holder for a payload that must be serialized with cloudpickle
    (otherwise multiprocessing would fall back to plain pickle).
    """

    def __init__(self, x):
        # The wrapped payload, exposed as `.x`.
        self.x = x

    def __getstate__(self):
        # Imported lazily, only when the wrapper is actually being pickled.
        from cloudpickle import dumps
        return dumps(self.x)

    def __setstate__(self, ob):
        # The state produced by __getstate__ is restored with the stdlib pickle loader.
        from pickle import loads
        self.x = loads(ob)
class DummyVecEnv (env_fns)
-
VecEnv that runs multiple environments sequentially, that is, the step and reset commands are sent to one environment at a time. Useful when debugging and when num_env == 1 (in the latter case, avoids communication overhead)
Arguments:
env_fns: iterable of callables - functions that build environments
Expand source code
class DummyVecEnv(VecEnv):
    """
    VecEnv that runs multiple environments sequentially, that is,
    the step and reset commands are sent to one environment at a time.
    Useful when debugging and when num_env == 1 (in the latter case,
    avoids communication overhead)
    """

    def __init__(self, env_fns):
        """
        Arguments:

        env_fns: iterable of callables - functions that build environments
        """
        self.envs = [fn() for fn in env_fns]
        env = self.envs[0]
        # Count the environments that were actually built, so env_fns may be
        # any iterable (e.g. a generator) and not only a sized sequence.
        VecEnv.__init__(self, len(self.envs), env.observation_space, env.action_space)
        obs_space = env.observation_space
        self.keys, shapes, dtypes = obs_space_info(obs_space)

        # One pre-allocated buffer per observation key, with a leading env axis.
        self.buf_obs = {
            k: np.zeros((self.num_envs,) + tuple(shapes[k]), dtype=dtypes[k])
            for k in self.keys
        }
        # `np.bool` was removed from NumPy; `np.bool_` is the boolean scalar type.
        self.buf_dones = np.zeros((self.num_envs,), dtype=np.bool_)
        self.buf_rews = np.zeros((self.num_envs,), dtype=np.float32)
        self.buf_infos = [{} for _ in range(self.num_envs)]
        self.actions = None
        self.spec = self.envs[0].spec

    def step_async(self, actions):
        """
        Store the actions for the next step_wait().

        If `actions` has no len() or its length differs from num_envs, it is
        treated as a single action and wrapped in a list; that is only
        allowed when there is exactly one environment.
        """
        listify = True
        try:
            if len(actions) == self.num_envs:
                listify = False
        except TypeError:
            # Objects without a length are handled as a single action below.
            pass

        if not listify:
            self.actions = actions
        else:
            assert self.num_envs == 1, "actions {} is either not a list or has a wrong size - cannot match to {} environments".format(actions, self.num_envs)
            self.actions = [actions]

    def step_wait(self):
        """
        Step every environment in turn with the stored actions.

        An environment that reports done is reset immediately, and the
        observation saved for it is the one returned by reset().

        Returns (obs, rews, dones, infos); rews and dones are copies of the
        internal buffers and infos is a shallow copy of the info list.
        """
        for e in range(self.num_envs):
            action = self.actions[e]
            obs, self.buf_rews[e], self.buf_dones[e], self.buf_infos[e] = self.envs[e].step(action)
            if self.buf_dones[e]:
                obs = self.envs[e].reset()
            self._save_obs(e, obs)
        return (self._obs_from_buf(), np.copy(self.buf_rews), np.copy(self.buf_dones),
                self.buf_infos.copy())

    def reset(self):
        """Reset every environment and return the batched observations."""
        for e in range(self.num_envs):
            obs = self.envs[e].reset()
            self._save_obs(e, obs)
        return self._obs_from_buf()

    def _save_obs(self, e, obs):
        """Write the observation of environment index `e` into the buffers."""
        for k in self.keys:
            if k is None:
                # A None key means the observation is stored whole, not indexed by key.
                self.buf_obs[k][e] = obs
            else:
                self.buf_obs[k][e] = obs[k]

    def _obs_from_buf(self):
        """Return the buffered observations, passed through copy_obs_dict and dict_to_obs."""
        return dict_to_obs(copy_obs_dict(self.buf_obs))

    def get_images(self):
        """Return the 'rgb_array' rendering of each environment."""
        return [env.render(mode='rgb_array') for env in self.envs]

    def render(self, mode='human'):
        """Render the single environment directly, or defer to VecEnv.render for several."""
        if self.num_envs == 1:
            return self.envs[0].render(mode=mode)
        else:
            return super().render(mode=mode)
Ancestors
- VecEnv
- abc.ABC
Methods
def render(self, mode='human')
-
Expand source code
def render(self, mode='human'):
    """
    Render the environments.

    With several environments the parent class's render() is used; with
    exactly one, the call goes straight to that environment's own render().
    """
    if self.num_envs != 1:
        return super().render(mode=mode)
    only_env = self.envs[0]
    return only_env.render(mode=mode)
Inherited members
class NotSteppingError
-
Raised when an asynchronous step is not running but step_wait() is called.
Expand source code
class NotSteppingError(Exception):
    """
    Error signalling that step_wait() was invoked although no
    asynchronous step is currently running.
    """

    def __init__(self):
        # The message is fixed; callers construct the error without arguments.
        super().__init__('not running an async step')
Ancestors
- builtins.Exception
- builtins.BaseException
class ShmemVecEnv (env_fns, spaces=None, context='spawn')
-
Optimized version of SubprocVecEnv that uses shared variables to communicate observations.
If you don't specify observation_space, we'll have to create a dummy environment to get it.
Expand source code
class ShmemVecEnv(VecEnv):
    """
    Optimized version of SubprocVecEnv that uses shared variables to communicate observations.
    """

    def __init__(self, env_fns, spaces=None, context='spawn'):
        """
        If you don't specify observation_space, we'll have to create a dummy
        environment to get it.
        """
        ctx = mp.get_context(context)
        if not spaces:
            logger.log('Creating dummy env object to get spaces')
            with logger.scoped_configure(format_strs=[]):
                # Build a throwaway environment only to read its spaces.
                dummy = env_fns[0]()
                observation_space = dummy.observation_space
                action_space = dummy.action_space
                dummy.close()
                del dummy
        else:
            observation_space, action_space = spaces
        VecEnv.__init__(self, len(env_fns), observation_space, action_space)
        self.obs_keys, self.obs_shapes, self.obs_dtypes = obs_space_info(observation_space)

        # One dict of shared arrays per environment, keyed by observation key.
        self.obs_bufs = []
        for _ in env_fns:
            shared = {}
            for key in self.obs_keys:
                ctype = _NP_TO_CT[self.obs_dtypes[key].type]
                n_items = int(np.prod(self.obs_shapes[key]))
                shared[key] = ctx.Array(ctype, n_items)
            self.obs_bufs.append(shared)

        self.parent_pipes = []
        self.procs = []
        with clear_mpi_env_vars():
            for env_fn, obs_buf in zip(env_fns, self.obs_bufs):
                wrapped_fn = CloudpickleWrapper(env_fn)
                parent_pipe, child_pipe = ctx.Pipe()
                worker_args = (child_pipe, parent_pipe, wrapped_fn, obs_buf,
                               self.obs_shapes, self.obs_dtypes, self.obs_keys)
                proc = ctx.Process(target=_subproc_worker, args=worker_args)
                proc.daemon = True
                self.procs.append(proc)
                self.parent_pipes.append(parent_pipe)
                proc.start()
                # The parent closes its handle on the child end once the worker is started.
                child_pipe.close()
        self.waiting_step = False
        self.viewer = None

    def reset(self):
        """Send 'reset' to every worker and return the decoded observations."""
        if self.waiting_step:
            logger.warn('Called reset() while waiting for the step to complete')
            self.step_wait()
        for pipe in self.parent_pipes:
            pipe.send(('reset', None))
        replies = [pipe.recv() for pipe in self.parent_pipes]
        return self._decode_obses(replies)

    def step_async(self, actions):
        """Send one 'step' command, with its action, to each worker."""
        assert len(actions) == len(self.parent_pipes)
        for pipe, act in zip(self.parent_pipes, actions):
            pipe.send(('step', act))
        self.waiting_step = True

    def step_wait(self):
        """Collect the step results from every worker and return (obs, rews, dones, infos)."""
        outs = [pipe.recv() for pipe in self.parent_pipes]
        self.waiting_step = False
        obs, rews, dones, infos = zip(*outs)
        decoded = self._decode_obses(obs)
        return decoded, np.array(rews), np.array(dones), infos

    def close_extras(self):
        """Drain a pending step, tell every worker to close, then join the processes."""
        if self.waiting_step:
            self.step_wait()
        for pipe in self.parent_pipes:
            pipe.send(('close', None))
        for pipe in self.parent_pipes:
            pipe.recv()
            pipe.close()
        for proc in self.procs:
            proc.join()

    def get_images(self, mode='human'):
        """Send 'render' to every worker and return the list of replies."""
        for pipe in self.parent_pipes:
            pipe.send(('render', None))
        return [pipe.recv() for pipe in self.parent_pipes]

    def _decode_obses(self, obs):
        """
        Build the batched observations from the shared buffers.

        The `obs` argument is not read; the data comes from self.obs_bufs.
        """
        result = {}
        for key in self.obs_keys:
            dtype = self.obs_dtypes[key]
            shape = self.obs_shapes[key]
            per_env = [
                np.frombuffer(env_bufs[key].get_obj(), dtype=dtype).reshape(shape)
                for env_bufs in self.obs_bufs
            ]
            result[key] = np.array(per_env)
        return dict_to_obs(result)
Ancestors
- VecEnv
- abc.ABC
Inherited members
class SubprocVecEnv (env_fns, spaces=None, context='spawn', in_series=1)
-
VecEnv that runs multiple environments in parallel in subprocesses and communicates with them via pipes. Recommended to use when num_envs > 1 and step() can be a bottleneck.
Arguments:
env_fns: iterable of callables - functions that create environments to run in subprocesses. Need to be cloud-pickleable. in_series: number of environments to run in series in a single process (e.g. when len(env_fns) == 12 and in_series == 3, it will run 4 processes, each running 3 envs in series)
Expand source code
class SubprocVecEnv(VecEnv):
    """
    VecEnv that runs multiple environments in parallel in subprocesses and communicates with them via pipes.
    Recommended to use when num_envs > 1 and step() can be a bottleneck.
    """

    def __init__(self, env_fns, spaces=None, context='spawn', in_series=1):
        """
        Arguments:

        env_fns: iterable of callables - functions that create environments to run in subprocesses. Need to be cloud-pickleable
        in_series: number of environments to run in series in a single process
        (e.g. when len(env_fns) == 12 and in_series == 3, it will run 4 processes, each running 3 envs in series)
        """
        self.waiting = False
        self.closed = False
        self.in_series = in_series
        nenvs = len(env_fns)
        assert nenvs % in_series == 0, "Number of envs must be divisible by number of envs to run in series"
        self.nremotes = nenvs // in_series
        # Group the factories so that each worker process receives its own chunk.
        env_fns = np.array_split(env_fns, self.nremotes)
        ctx = mp.get_context(context)
        pipe_pairs = [ctx.Pipe() for _ in range(self.nremotes)]
        self.remotes, self.work_remotes = zip(*pipe_pairs)
        self.ps = []
        for work_remote, remote, env_fn in zip(self.work_remotes, self.remotes, env_fns):
            proc_args = (work_remote, remote, CloudpickleWrapper(env_fn))
            self.ps.append(ctx.Process(target=worker, args=proc_args))
        for proc in self.ps:
            proc.daemon = True  # if the main process crashes, we should not cause things to hang
            with clear_mpi_env_vars():
                proc.start()
        for work_remote in self.work_remotes:
            work_remote.close()

        # The first worker is asked for the spaces and spec used for the whole batch.
        first_remote = self.remotes[0]
        first_remote.send(('get_spaces_spec', None))
        observation_space, action_space, self.spec = first_remote.recv().x
        self.viewer = None
        VecEnv.__init__(self, nenvs, observation_space, action_space)

    def step_async(self, actions):
        """Split the actions into one chunk per worker and send a 'step' command to each."""
        self._assert_not_closed()
        chunks = np.array_split(actions, self.nremotes)
        for remote, chunk in zip(self.remotes, chunks):
            remote.send(('step', chunk))
        self.waiting = True

    def step_wait(self):
        """Receive the step results from all workers and return (obs, rews, dones, infos)."""
        self._assert_not_closed()
        per_remote = [remote.recv() for remote in self.remotes]
        results = _flatten_list(per_remote)
        self.waiting = False
        obs, rews, dones, infos = zip(*results)
        return _flatten_obs(obs), np.stack(rews), np.stack(dones), infos

    def reset(self):
        """Send 'reset' to all workers and return the flattened observations."""
        self._assert_not_closed()
        for remote in self.remotes:
            remote.send(('reset', None))
        per_remote = [remote.recv() for remote in self.remotes]
        return _flatten_obs(_flatten_list(per_remote))

    def close_extras(self):
        """Drain any pending step replies, send 'close' to all workers and join them."""
        self.closed = True
        if self.waiting:
            for remote in self.remotes:
                remote.recv()
        for remote in self.remotes:
            remote.send(('close', None))
        for proc in self.ps:
            proc.join()

    def get_images(self):
        """Send 'render' to all workers and return the flattened list of replies."""
        self._assert_not_closed()
        for pipe in self.remotes:
            pipe.send(('render', None))
        per_remote = [pipe.recv() for pipe in self.remotes]
        return _flatten_list(per_remote)

    def _assert_not_closed(self):
        """Fail if the environment has already been closed."""
        assert not self.closed, "Trying to operate on a SubprocVecEnv after calling close()"

    def __del__(self):
        # Make sure the workers are shut down when the object is collected.
        if self.closed:
            return
        self.close()
Ancestors
- VecEnv
- abc.ABC
Inherited members
class VecEnv (num_envs, observation_space, action_space)
-
An abstract asynchronous, vectorized environment. Used to batch data from multiple copies of an environment, so that each observation becomes a batch of observations, and the expected action is a batch of actions to be applied per-environment.
Expand source code
class VecEnv(ABC):
    """
    Abstract base for asynchronous, vectorized environments.

    Batches data from several copies of an environment: every observation
    is a batch of observations, and the expected action is a batch of
    actions applied per-environment.
    """
    closed = False
    viewer = None

    metadata = {
        'render.modes': ['human', 'rgb_array']
    }

    def __init__(self, num_envs, observation_space, action_space):
        self.num_envs = num_envs
        self.observation_space = observation_space
        self.action_space = action_space

    @abstractmethod
    def reset(self):
        """
        Reset all the environments and return an array of
        observations, or a dict of observation arrays.

        If step_async is still doing work, that work will
        be cancelled and step_wait() should not be called
        until step_async() is invoked again.
        """

    @abstractmethod
    def step_async(self, actions):
        """
        Tell all the environments to start taking a step
        with the given actions.
        Call step_wait() to get the results of the step.

        You should not call this if a step_async run is
        already pending.
        """

    @abstractmethod
    def step_wait(self):
        """
        Wait for the step taken with step_async().

        Returns (obs, rews, dones, infos):
         - obs: an array of observations, or a dict of
                arrays of observations.
         - rews: an array of rewards
         - dones: an array of "episode done" booleans
         - infos: a sequence of info objects
        """

    def close_extras(self):
        """
        Clean up the extra resources, beyond what's in this base class.
        Only runs when not self.closed.
        """

    def close(self):
        """Close the viewer (if one exists), run close_extras() and mark the env closed; a no-op once closed."""
        if not self.closed:
            if self.viewer is not None:
                self.viewer.close()
            self.close_extras()
            self.closed = True

    def step(self, actions):
        """
        Step the environments synchronously.

        This is available for backwards compatibility.
        """
        self.step_async(actions)
        outcome = self.step_wait()
        return outcome

    def render(self, mode='human'):
        """
        Tile the images of all environments into one image.

        In 'human' mode the image is shown in the viewer and the viewer's
        `isopen` value is returned; in 'rgb_array' mode the tiled image is
        returned. Any other mode raises NotImplementedError.
        """
        bigimg = tile_images(self.get_images())
        if mode == 'rgb_array':
            return bigimg
        if mode != 'human':
            raise NotImplementedError
        self.get_viewer().imshow(bigimg)
        return self.get_viewer().isopen

    def get_images(self):
        """
        Return RGB images from each environment
        """
        raise NotImplementedError

    @property
    def unwrapped(self):
        """The innermost environment: wrappers delegate to the env they wrap, anything else returns itself."""
        if not isinstance(self, VecEnvWrapper):
            return self
        return self.venv.unwrapped

    def get_viewer(self):
        """Return the image viewer, creating it on first use."""
        if self.viewer is None:
            # Imported lazily so that gym's rendering module is only loaded when a viewer is needed.
            from gym.envs.classic_control import rendering
            self.viewer = rendering.SimpleImageViewer()
        return self.viewer
Ancestors
- abc.ABC
Subclasses
Class variables
var closed
var metadata
var viewer
Instance variables
var unwrapped
-
Expand source code
@property
def unwrapped(self):
    """Return the innermost environment: a VecEnvWrapper delegates to the env it wraps, anything else returns itself."""
    if not isinstance(self, VecEnvWrapper):
        return self
    return self.venv.unwrapped
Methods
def close(self)
-
Expand source code
def close(self):
    """
    Close the environment once.

    Closes the viewer when one exists, runs close_extras() and then marks
    the environment as closed. Does nothing when it is already closed.
    """
    if not self.closed:
        viewer = self.viewer
        if viewer is not None:
            viewer.close()
        self.close_extras()
        self.closed = True
def close_extras(self)
-
Clean up the extra resources, beyond what's in this base class. Only runs when not self.closed.
Expand source code
def close_extras(self):
    """
    Hook for releasing resources beyond what the base class manages.
    Only runs when not self.closed. The default does nothing.
    """
    return None
def get_images(self)
-
Return RGB images from each environment
Expand source code
def get_images(self):
    """
    Return RGB images from each environment.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_viewer(self)
-
Expand source code
def get_viewer(self):
    """Return the cached image viewer, creating a SimpleImageViewer on first use."""
    if self.viewer is not None:
        return self.viewer
    # Imported lazily so that gym's rendering module is only loaded when a viewer is needed.
    from gym.envs.classic_control import rendering
    self.viewer = rendering.SimpleImageViewer()
    return self.viewer
def render(self, mode='human')
-
Expand source code
def render(self, mode='human'):
    """
    Tile the images of all environments into one image.

    In 'human' mode the image is shown in the viewer and the viewer's
    `isopen` value is returned; in 'rgb_array' mode the tiled image is
    returned. Any other mode raises NotImplementedError.
    """
    bigimg = tile_images(self.get_images())
    if mode == 'rgb_array':
        return bigimg
    if mode != 'human':
        raise NotImplementedError
    self.get_viewer().imshow(bigimg)
    return self.get_viewer().isopen
def reset(self)
-
Reset all the environments and return an array of observations, or a dict of observation arrays.
If step_async is still doing work, that work will be cancelled and step_wait() should not be called until step_async() is invoked again.
Expand source code
@abstractmethod
def reset(self):
    """
    Reset all the environments and return an array of
    observations, or a dict of observation arrays.

    If step_async is still doing work, that work will
    be cancelled and step_wait() should not be called
    until step_async() is invoked again.
    """
    return None
def step(self, actions)
-
Step the environments synchronously.
This is available for backwards compatibility.
Expand source code
def step(self, actions):
    """
    Step the environments synchronously: start the step with
    step_async() and return what step_wait() produces.

    This is available for backwards compatibility.
    """
    self.step_async(actions)
    outcome = self.step_wait()
    return outcome
def step_async(self, actions)
-
Tell all the environments to start taking a step with the given actions. Call step_wait() to get the results of the step.
You should not call this if a step_async run is already pending.
Expand source code
@abstractmethod
def step_async(self, actions):
    """
    Tell all the environments to start taking a step
    with the given actions.
    Call step_wait() to get the results of the step.

    You should not call this if a step_async run is
    already pending.
    """
    return None
def step_wait(self)
-
Wait for the step taken with step_async().
Returns (obs, rews, dones, infos): - obs: an array of observations, or a dict of arrays of observations. - rews: an array of rewards - dones: an array of "episode done" booleans - infos: a sequence of info objects
Expand source code
@abstractmethod
def step_wait(self):
    """
    Wait for the step taken with step_async().

    Returns (obs, rews, dones, infos):
     - obs: an array of observations, or a dict of
            arrays of observations.
     - rews: an array of rewards
     - dones: an array of "episode done" booleans
     - infos: a sequence of info objects
    """
    return None
class VecEnvObservationWrapper (venv, observation_space=None, action_space=None)
-
An environment wrapper that applies to an entire batch of environments at once.
Expand source code
class VecEnvObservationWrapper(VecEnvWrapper):
    """
    Wrapper that passes every observation coming from the wrapped
    environment through process() before returning it.
    """

    @abstractmethod
    def process(self, obs):
        """Transform a batch of observations; subclasses must implement this."""

    def reset(self):
        """Reset the wrapped environment and return the processed observations."""
        return self.process(self.venv.reset())

    def step_wait(self):
        """Return the wrapped step result with the observations processed."""
        raw_obs, rews, dones, infos = self.venv.step_wait()
        return self.process(raw_obs), rews, dones, infos
Ancestors
- VecEnvWrapper
- VecEnv
- abc.ABC
Subclasses
Methods
def process(self, obs)
-
Expand source code
@abstractmethod
def process(self, obs):
    """Transform a batch of observations; this abstract version does nothing and returns None."""
    return None
Inherited members
class VecEnvWrapper (venv, observation_space=None, action_space=None)
-
An environment wrapper that applies to an entire batch of environments at once.
Expand source code
class VecEnvWrapper(VecEnv):
    """
    An environment wrapper that applies to an entire batch
    of environments at once.
    """

    def __init__(self, venv, observation_space=None, action_space=None):
        """
        Arguments:

        venv: the vectorized environment to wrap
        observation_space: replacement observation space; falls back to venv's when falsy
        action_space: replacement action space; falls back to venv's when falsy
        """
        self.venv = venv
        super().__init__(num_envs=venv.num_envs,
                         observation_space=observation_space or venv.observation_space,
                         action_space=action_space or venv.action_space)

    def step_async(self, actions):
        """Forward the actions to the wrapped environment."""
        self.venv.step_async(actions)

    @abstractmethod
    def reset(self):
        pass

    @abstractmethod
    def step_wait(self):
        pass

    def close(self):
        """Close the wrapped environment and return its result."""
        return self.venv.close()

    def render(self, mode='human'):
        """Render through the wrapped environment."""
        return self.venv.render(mode=mode)

    def get_images(self):
        """Return the images of the wrapped environment."""
        return self.venv.get_images()

    def __getattr__(self, name):
        """
        Fall back to the wrapped environment for attributes missing on the wrapper.

        Raises AttributeError for names starting with an underscore, and for
        `venv` itself when it has not been assigned yet.
        """
        if name.startswith('_'):
            raise AttributeError("attempted to get missing private attribute '{}'".format(name))
        if name == 'venv':
            # Reaching here means self.venv is not set yet (e.g. an attribute was
            # read before __init__ assigned it). Delegating to self.venv would
            # re-enter __getattr__ and recurse until the stack overflows.
            raise AttributeError("attempted to get attribute 'venv' before it was set")
        return getattr(self.venv, name)
Ancestors
- VecEnv
- abc.ABC
Subclasses
Methods
def close(self)
-
Expand source code
def close(self):
    """Close the wrapped environment and pass its return value through."""
    wrapped = self.venv
    return wrapped.close()
def render(self, mode='human')
-
Expand source code
def render(self, mode='human'):
    """Render through the wrapped environment, passing `mode` along as a keyword argument."""
    wrapped = self.venv
    return wrapped.render(mode=mode)
Inherited members
class VecExtractDictObs (venv, key)
-
An environment wrapper that applies to an entire batch of environments at once.
Expand source code
class VecExtractDictObs(VecEnvObservationWrapper):
    """
    Observation wrapper that keeps a single entry, selected by `key`,
    of the wrapped environment's keyed observations.
    """

    def __init__(self, venv, key):
        self.key = key
        # The wrapper's observation space is the sub-space stored under `key`.
        selected_space = venv.observation_space.spaces[self.key]
        super().__init__(venv=venv, observation_space=selected_space)

    def process(self, obs):
        """Return the entry of `obs` stored under the configured key."""
        selected = obs[self.key]
        return selected
Ancestors
Methods
def process(self, obs)
-
Expand source code
def process(self, obs):
    """Return the entry of `obs` stored under the configured key."""
    selected = obs[self.key]
    return selected
Inherited members
class VecFrameStack (venv, nstack)
-
An environment wrapper that applies to an entire batch of environments at once.
Expand source code
class VecFrameStack(VecEnvWrapper):
    """
    Wrapper that stacks the `nstack` most recent observations of each
    environment along the last axis.
    """

    def __init__(self, venv, nstack):
        """
        Arguments:

        venv: the vectorized environment to wrap; its observation space must provide `low`, `high` and `dtype`
        nstack: number of consecutive observations kept in the stack
        """
        self.venv = venv
        self.nstack = nstack
        wos = venv.observation_space  # wrapped ob space
        low = np.repeat(wos.low, self.nstack, axis=-1)
        high = np.repeat(wos.high, self.nstack, axis=-1)
        # Buffer holding the stacked observations, with a leading env axis.
        self.stackedobs = np.zeros((venv.num_envs,) + low.shape, low.dtype)
        observation_space = spaces.Box(low=low, high=high, dtype=venv.observation_space.dtype)
        VecEnvWrapper.__init__(self, venv, observation_space=observation_space)

    def step_wait(self):
        """
        Push the new observations onto the stack and return
        (stackedobs, rews, news, infos).

        The stack of an environment that reports done is cleared before its
        new observation is written.
        """
        obs, rews, news, infos = self.venv.step_wait()
        # Shift by one whole frame (the width of obs's last axis), not by a
        # single element; a shift of 1 scrambles the history whenever an
        # observation is wider than 1 along the last axis.
        frame_width = obs.shape[-1]
        self.stackedobs = np.roll(self.stackedobs, shift=-frame_width, axis=-1)
        for (i, new) in enumerate(news):
            if new:
                self.stackedobs[i] = 0
        self.stackedobs[..., -frame_width:] = obs
        return self.stackedobs, rews, news, infos

    def reset(self):
        """Reset the wrapped environment, clear the stack and store the first observation in the newest slot."""
        obs = self.venv.reset()
        self.stackedobs[...] = 0
        self.stackedobs[..., -obs.shape[-1]:] = obs
        return self.stackedobs
Ancestors
- VecEnvWrapper
- VecEnv
- abc.ABC
Inherited members
class VecMonitor (venv, filename=None, keep_buf=0, info_keywords=())
-
An environment wrapper that applies to an entire batch of environments at once.
Expand source code
class VecMonitor(VecEnvWrapper):
    """
    Wrapper that tracks the return and length of the running episode of
    every environment and, when an episode ends, adds an 'episode' entry
    to that environment's info dict.
    """

    def __init__(self, venv, filename=None, keep_buf=0, info_keywords=()):
        VecEnvWrapper.__init__(self, venv)
        # The per-env accumulators are allocated by reset().
        self.eprets = None
        self.eplens = None
        self.epcount = 0
        self.tstart = time.time()
        self.results_writer = None
        if filename:
            self.results_writer = ResultsWriter(filename, header={'t_start': self.tstart},
                                                extra_keys=info_keywords)
        self.info_keywords = info_keywords
        self.keep_buf = keep_buf
        if self.keep_buf:
            # Bounded histories of the most recent episode returns and lengths.
            self.epret_buf = deque([], maxlen=keep_buf)
            self.eplen_buf = deque([], maxlen=keep_buf)

    def reset(self):
        """Reset the wrapped environment and zero the per-env return and length accumulators."""
        obs = self.venv.reset()
        self.eprets = np.zeros(self.num_envs, 'f')
        self.eplens = np.zeros(self.num_envs, 'i')
        return obs

    def step_wait(self):
        """
        Accumulate rewards and lengths and return (obs, rews, dones, newinfos).

        For every finished episode, a copy of the info dict gets an
        'episode' entry with return 'r', length 'l', elapsed time 't' and the
        configured info keywords; the row is also written by the results
        writer when one is configured.
        """
        obs, rews, dones, infos = self.venv.step_wait()
        self.eprets += rews
        self.eplens += 1

        newinfos = list(infos[:])
        for i, done in enumerate(dones):
            if not done:
                continue
            info = infos[i].copy()
            ret = self.eprets[i]
            eplen = self.eplens[i]
            epinfo = {'r': ret, 'l': eplen, 't': round(time.time() - self.tstart, 6)}
            for k in self.info_keywords:
                epinfo[k] = info[k]
            info['episode'] = epinfo
            if self.keep_buf:
                self.epret_buf.append(ret)
                self.eplen_buf.append(eplen)
            self.epcount += 1
            # Start accumulating afresh for the next episode of this env.
            self.eprets[i] = 0
            self.eplens[i] = 0
            if self.results_writer:
                self.results_writer.write_row(epinfo)
            newinfos[i] = info
        return obs, rews, dones, newinfos
Ancestors
- VecEnvWrapper
- VecEnv
- abc.ABC
Inherited members
class VecNormalize (venv, ob=True, ret=True, clipob=10.0, cliprew=10.0, gamma=0.99, epsilon=1e-08, use_tf=False)
-
A vectorized wrapper that normalizes the observations and returns from an environment.
Expand source code
class VecNormalize(VecEnvWrapper):
    """
    A vectorized wrapper that normalizes the observations
    and returns from an environment.
    """

    def __init__(self, venv, ob=True, ret=True, clipob=10., cliprew=10., gamma=0.99, epsilon=1e-8, use_tf=False):
        VecEnvWrapper.__init__(self, venv)
        # Running statistics: a TF-backed or a plain implementation, chosen by
        # use_tf; each one is disabled (None) when its flag is false.
        if use_tf:
            from TeachMyAgent.students.openai_baselines.common.running_mean_std import TfRunningMeanStd
            if ob:
                self.ob_rms = TfRunningMeanStd(shape=self.observation_space.shape, scope='ob_rms')
            else:
                self.ob_rms = None
            if ret:
                self.ret_rms = TfRunningMeanStd(shape=(), scope='ret_rms')
            else:
                self.ret_rms = None
        else:
            from TeachMyAgent.students.openai_baselines.common.running_mean_std import RunningMeanStd
            if ob:
                self.ob_rms = RunningMeanStd(shape=self.observation_space.shape)
            else:
                self.ob_rms = None
            if ret:
                self.ret_rms = RunningMeanStd(shape=())
            else:
                self.ret_rms = None
        self.clipob = clipob
        self.cliprew = cliprew
        # Discounted running return of every environment.
        self.ret = np.zeros(self.num_envs)
        self.gamma = gamma
        self.epsilon = epsilon

    def step_wait(self):
        """
        Return the wrapped step result with filtered observations and, when
        return statistics are enabled, rewards scaled by the running return's
        standard deviation and clipped to [-cliprew, cliprew].
        """
        obs, rews, news, infos = self.venv.step_wait()
        self.ret = self.ret * self.gamma + rews
        obs = self._obfilt(obs)
        if self.ret_rms:
            self.ret_rms.update(self.ret)
            scale = np.sqrt(self.ret_rms.var + self.epsilon)
            rews = np.clip(rews / scale, -self.cliprew, self.cliprew)
        # The running return restarts for environments flagged in `news`.
        self.ret[news] = 0.
        return obs, rews, news, infos

    def _obfilt(self, obs):
        """Update the observation statistics and return the standardized, clipped observations; pass-through when disabled."""
        if not self.ob_rms:
            return obs
        self.ob_rms.update(obs)
        centered = obs - self.ob_rms.mean
        scale = np.sqrt(self.ob_rms.var + self.epsilon)
        return np.clip(centered / scale, -self.clipob, self.clipob)

    def reset(self):
        """Zero the running returns, reset the wrapped environment and return the filtered observations."""
        self.ret = np.zeros(self.num_envs)
        first_obs = self.venv.reset()
        return self._obfilt(first_obs)
Ancestors
- VecEnvWrapper
- VecEnv
- abc.ABC
Subclasses
Inherited members