Module TeachMyAgent.students.openai_baselines.common.vec_env

from .vec_env import AlreadySteppingError, NotSteppingError, VecEnv, VecEnvWrapper, VecEnvObservationWrapper, CloudpickleWrapper
from .dummy_vec_env import DummyVecEnv
from .shmem_vec_env import ShmemVecEnv
from .subproc_vec_env import SubprocVecEnv
from .vec_frame_stack import VecFrameStack
from .vec_monitor import VecMonitor
from .vec_normalize import VecNormalize
from .vec_remove_dict_obs import VecExtractDictObs

__all__ = ['AlreadySteppingError', 'NotSteppingError', 'VecEnv', 'VecEnvWrapper', 'VecEnvObservationWrapper', 'CloudpickleWrapper', 'DummyVecEnv', 'ShmemVecEnv', 'SubprocVecEnv', 'VecFrameStack', 'VecMonitor', 'VecNormalize', 'VecExtractDictObs']



An interface for asynchronous vectorized environments.


Tests for asynchronous vectorized environments.


Helpers for dealing with vectorized environments.



class AlreadySteppingError

Raised when an asynchronous step is running while step_async() is called again.

class AlreadySteppingError(Exception):
    """
    Raised when an asynchronous step is running while
    step_async() is called again.
    """

    def __init__(self):
        # The message is fixed; callers raise this exception without arguments.
        msg = 'already running an async step'
        Exception.__init__(self, msg)


class CloudpickleWrapper (x)

Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)

class CloudpickleWrapper(object):
    """
    Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)

    The wrapped object is stored in ``x``. Serialisation goes through
    ``cloudpickle.dumps`` and deserialisation through ``pickle.loads``.
    """

    def __init__(self, x):
        """Store ``x`` as the payload to be serialized."""
        self.x = x

    def __getstate__(self):
        """Return the payload serialized with cloudpickle (imported lazily)."""
        import cloudpickle
        return cloudpickle.dumps(self.x)

    def __setstate__(self, ob):
        """Restore the payload from the bytes produced by ``__getstate__``."""
        # NOTE(review): pickle.loads executes arbitrary code embedded in the
        # payload; only unpickle data that comes from a trusted process.
        import pickle
        self.x = pickle.loads(ob)
class DummyVecEnv (env_fns)

VecEnv that runs multiple environments sequentially; that is, the step and reset commands are sent to one environment at a time. Useful when debugging and when num_envs == 1 (in the latter case, it avoids communication overhead).


env_fns: iterable of callables – functions that build environments

class DummyVecEnv(VecEnv):
    """
    VecEnv that runs multiple environments sequentially, that is,
    the step and reset commands are sent to one environment at a time.
    Useful when debugging and when num_envs == 1 (in the latter case,
    it avoids communication overhead)
    """
    def __init__(self, env_fns):
        """
        Arguments:

        env_fns: iterable of callables      functions that build environments
        """
        self.envs = [fn() for fn in env_fns]
        env = self.envs[0]
        # Count the built environments rather than env_fns so that any
        # iterable (not only sized sequences) is accepted.
        VecEnv.__init__(self, len(self.envs), env.observation_space, env.action_space)
        obs_space = env.observation_space
        self.keys, shapes, dtypes = obs_space_info(obs_space)

        # One preallocated buffer per observation key, with a leading env axis.
        self.buf_obs = { k: np.zeros((self.num_envs,) + tuple(shapes[k]), dtype=dtypes[k]) for k in self.keys }
        # `np.bool` was a deprecated alias of the builtin and has been removed
        # from NumPy; the builtin `bool` yields the same dtype.
        self.buf_dones = np.zeros((self.num_envs,), dtype=bool)
        self.buf_rews  = np.zeros((self.num_envs,), dtype=np.float32)
        self.buf_infos = [{} for _ in range(self.num_envs)]
        self.actions = None
        self.spec = self.envs[0].spec

    def step_async(self, actions):
        """Store the actions to be applied by the next step_wait() call.

        A value whose length equals num_envs is stored as-is; anything else is
        only accepted for a single environment and is wrapped in a list.
        """
        listify = True
        try:
            if len(actions) == self.num_envs:
                listify = False
        except TypeError:
            # Scalars have no len(); treat them as one action to be wrapped.
            pass

        if not listify:
            self.actions = actions
        else:
            assert self.num_envs == 1, "actions {} is either not a list or has a wrong size - cannot match to {} environments".format(actions, self.num_envs)
            self.actions = [actions]

    def step_wait(self):
        """Step every environment in turn with the stored actions.

        An environment that reports done is reset immediately, so the stored
        observation for it is the first observation of the next episode.

        Returns (obs, rews, dones, infos) built from copies of the buffers.
        """
        for e in range(self.num_envs):
            action = self.actions[e]
            # if isinstance(self.envs[e].action_space, spaces.Discrete):
            #    action = int(action)

            obs, self.buf_rews[e], self.buf_dones[e], self.buf_infos[e] = self.envs[e].step(action)
            if self.buf_dones[e]:
                obs = self.envs[e].reset()
            self._save_obs(e, obs)
        return (self._obs_from_buf(), np.copy(self.buf_rews), np.copy(self.buf_dones),
                self.buf_infos.copy())

    def reset(self):
        """Reset every environment and return the batched observations."""
        for e in range(self.num_envs):
            obs = self.envs[e].reset()
            self._save_obs(e, obs)
        return self._obs_from_buf()

    def _save_obs(self, e, obs):
        """Write the observation of environment ``e`` into the buffers."""
        for k in self.keys:
            if k is None:
                # A key of None means the observation is stored whole.
                self.buf_obs[k][e] = obs
            else:
                self.buf_obs[k][e] = obs[k]

    def _obs_from_buf(self):
        """Return a copy of the observation buffers via dict_to_obs."""
        return dict_to_obs(copy_obs_dict(self.buf_obs))

    def get_images(self):
        """Return one 'rgb_array' rendering per environment."""
        return [env.render(mode='rgb_array') for env in self.envs]

    def render(self, mode='human'):
        """Render the single env directly, or defer to VecEnv.render."""
        if self.num_envs == 1:
            return self.envs[0].render(mode=mode)
        else:
            return super().render(mode=mode)



def render(self, mode='human')
def render(self, mode='human'):
    """Render the single env directly, or defer to the parent class render.

    With exactly one environment its own render() is called with ``mode``;
    otherwise rendering is delegated to the parent class.
    """
    if self.num_envs == 1:
        return self.envs[0].render(mode=mode)
    else:
        return super().render(mode=mode)

class NotSteppingError

Raised when an asynchronous step is not running but step_wait() is called.

class NotSteppingError(Exception):
    """
    Raised when an asynchronous step is not running but
    step_wait() is called.
    """

    def __init__(self):
        # The message is fixed; callers raise this exception without arguments.
        msg = 'not running an async step'
        Exception.__init__(self, msg)


class ShmemVecEnv (env_fns, spaces=None, context='spawn')

Optimized version of SubprocVecEnv that uses shared variables to communicate observations.

If you don't specify observation_space, we'll have to create a dummy environment to get it.

class ShmemVecEnv(VecEnv):
    """
    Optimized version of SubprocVecEnv that uses shared variables to communicate observations.
    """

    def __init__(self, env_fns, spaces=None, context='spawn'):
        """
        If you don't specify observation_space, we'll have to create a dummy
        environment to get it.

        env_fns: callables that build the environments, one per worker process
        spaces: optional (observation_space, action_space) pair
        context: multiprocessing start method passed to mp.get_context
        """
        ctx = mp.get_context(context)
        if spaces:
            observation_space, action_space = spaces
        else:
            logger.log('Creating dummy env object to get spaces')
            with logger.scoped_configure(format_strs=[]):
                dummy = env_fns[0]()
                observation_space, action_space = dummy.observation_space, dummy.action_space
                # Release the throwaway environment's resources before dropping it.
                dummy.close()
                del dummy
        VecEnv.__init__(self, len(env_fns), observation_space, action_space)
        self.obs_keys, self.obs_shapes, self.obs_dtypes = obs_space_info(observation_space)
        # One flat shared array per (env, key), sized to hold a whole observation.
        self.obs_bufs = [
            {k: ctx.Array(_NP_TO_CT[self.obs_dtypes[k].type], int(np.prod(self.obs_shapes[k]))) for k in self.obs_keys}
            for _ in env_fns]
        self.parent_pipes = []
        self.procs = []
        with clear_mpi_env_vars():
            for env_fn, obs_buf in zip(env_fns, self.obs_bufs):
                wrapped_fn = CloudpickleWrapper(env_fn)
                parent_pipe, child_pipe = ctx.Pipe()
                proc = ctx.Process(target=_subproc_worker,
                            args=(child_pipe, parent_pipe, wrapped_fn, obs_buf, self.obs_shapes, self.obs_dtypes, self.obs_keys))
                proc.daemon = True
                self.procs.append(proc)
                self.parent_pipes.append(parent_pipe)
                proc.start()
                # The child owns its end of the pipe; close the parent's copy.
                child_pipe.close()
        self.waiting_step = False
        self.viewer = None

    def reset(self):
        """Reset all workers and return the decoded observations."""
        if self.waiting_step:
            logger.warn('Called reset() while waiting for the step to complete')
            # Drain the pending step so its replies are not read as reset replies.
            self.step_wait()
        for pipe in self.parent_pipes:
            pipe.send(('reset', None))
        return self._decode_obses([pipe.recv() for pipe in self.parent_pipes])

    def step_async(self, actions):
        """Send one 'step' command with its action to each worker."""
        assert len(actions) == len(self.parent_pipes)
        for pipe, act in zip(self.parent_pipes, actions):
            pipe.send(('step', act))
        self.waiting_step = True

    def step_wait(self):
        """Collect the step replies and return (obs, rews, dones, infos)."""
        outs = [pipe.recv() for pipe in self.parent_pipes]
        self.waiting_step = False
        obs, rews, dones, infos = zip(*outs)
        return self._decode_obses(obs), np.array(rews), np.array(dones), infos

    def close_extras(self):
        """Finish any pending step, then shut down pipes and worker processes."""
        if self.waiting_step:
            self.step_wait()
        for pipe in self.parent_pipes:
            pipe.send(('close', None))
        for pipe in self.parent_pipes:
            # Wait for the worker's reply to 'close' before closing the pipe.
            pipe.recv()
            pipe.close()
        for proc in self.procs:
            proc.join()

    def get_images(self, mode='human'):
        """Ask every worker to render and return the replies as a list."""
        for pipe in self.parent_pipes:
            pipe.send(('render', None))
        return [pipe.recv() for pipe in self.parent_pipes]

    def _decode_obses(self, obs):
        """Build the batched observation from the shared buffers.

        The ``obs`` argument is not read; the observation data comes from
        ``self.obs_bufs``.
        """
        result = {}
        for k in self.obs_keys:

            bufs = [b[k] for b in self.obs_bufs]
            o = [np.frombuffer(b.get_obj(), dtype=self.obs_dtypes[k]).reshape(self.obs_shapes[k]) for b in bufs]
            result[k] = np.array(o)
        return dict_to_obs(result)


class SubprocVecEnv (env_fns, spaces=None, context='spawn', in_series=1)

VecEnv that runs multiple environments in parallel in subprocesses and communicates with them via pipes. Recommended to use when num_envs > 1 and step() can be a bottleneck.


env_fns: iterable of callables – functions that create environments to run in subprocesses. They need to be cloud-pickleable.

in_series: number of environments to run in series in a single process (e.g. when len(env_fns) == 12 and in_series == 3, it will run 4 processes, each running 3 envs in series).

class SubprocVecEnv(VecEnv):
    """
    VecEnv that runs multiple environments in parallel in subprocesses and communicates with them via pipes.
    Recommended to use when num_envs > 1 and step() can be a bottleneck.
    """
    def __init__(self, env_fns, spaces=None, context='spawn', in_series=1):
        """
        Arguments:

        env_fns: iterable of callables -  functions that create environments to run in subprocesses. Need to be cloud-pickleable
        in_series: number of environments to run in series in a single process
        (e.g. when len(env_fns) == 12 and in_series == 3, it will run 4 processes, each running 3 envs in series)
        """
        self.waiting = False
        self.closed = False
        self.in_series = in_series
        nenvs = len(env_fns)
        assert nenvs % in_series == 0, "Number of envs must be divisible by number of envs to run in series"
        self.nremotes = nenvs // in_series
        env_fns = np.array_split(env_fns, self.nremotes)
        ctx = mp.get_context(context)
        self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(self.nremotes)])
        self.ps = [ctx.Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
                   for (work_remote, remote, env_fn) in zip(self.work_remotes, self.remotes, env_fns)]
        for p in self.ps:
            p.daemon = True  # if the main process crashes, we should not cause things to hang
            with clear_mpi_env_vars():
                p.start()
        for remote in self.work_remotes:
            # The workers own these pipe ends; close the parent's copies.
            remote.close()

        self.remotes[0].send(('get_spaces_spec', None))
        observation_space, action_space, self.spec = self.remotes[0].recv().x
        self.viewer = None
        VecEnv.__init__(self, nenvs, observation_space, action_space)

    def step_async(self, actions):
        """Split the actions across the workers and send each its share."""
        self._assert_not_closed()
        actions = np.array_split(actions, self.nremotes)
        for remote, action in zip(self.remotes, actions):
            remote.send(('step', action))
        self.waiting = True

    def step_wait(self):
        """Collect the step replies and return (obs, rews, dones, infos)."""
        self._assert_not_closed()
        results = [remote.recv() for remote in self.remotes]
        results = _flatten_list(results)
        self.waiting = False
        obs, rews, dones, infos = zip(*results)
        return _flatten_obs(obs), np.stack(rews), np.stack(dones), infos

    def reset(self):
        """Reset all workers and return the flattened observations."""
        self._assert_not_closed()
        for remote in self.remotes:
            remote.send(('reset', None))
        obs = [remote.recv() for remote in self.remotes]
        obs = _flatten_list(obs)
        return _flatten_obs(obs)

    def close_extras(self):
        """Drain any pending step replies, then stop and join the workers."""
        self.closed = True
        if self.waiting:
            for remote in self.remotes:
                remote.recv()
        for remote in self.remotes:
            remote.send(('close', None))
        for p in self.ps:
            p.join()

    def get_images(self):
        """Ask every worker to render and return the flattened image list."""
        self._assert_not_closed()
        for pipe in self.remotes:
            pipe.send(('render', None))
        imgs = [pipe.recv() for pipe in self.remotes]
        imgs = _flatten_list(imgs)
        return imgs

    def _assert_not_closed(self):
        """Fail if this environment has already been closed."""
        assert not self.closed, "Trying to operate on a SubprocVecEnv after calling close()"

    def __del__(self):
        # Make sure the worker processes are shut down on garbage collection.
        if not self.closed:
            self.close()


class VecEnv (num_envs, observation_space, action_space)

An abstract asynchronous, vectorized environment. Used to batch data from multiple copies of an environment, so that each observation becomes a batch of observations, and the expected action is a batch of actions to be applied per-environment.

class VecEnv(ABC):
    """
    An abstract asynchronous, vectorized environment.
    Used to batch data from multiple copies of an environment, so that
    each observation becomes a batch of observations, and expected action is a batch of actions to
    be applied per-environment.
    """
    closed = False
    viewer = None

    metadata = {
        'render.modes': ['human', 'rgb_array']
    }

    def __init__(self, num_envs, observation_space, action_space):
        """Record the number of environments and their shared spaces."""
        self.num_envs = num_envs
        self.observation_space = observation_space
        self.action_space = action_space

    # NOTE(review): reset/step_async/step_wait are interface stubs; upstream
    # presumably marks them with @abstractmethod - confirm against the module.
    def reset(self):
        """
        Reset all the environments and return an array of
        observations, or a dict of observation arrays.

        If step_async is still doing work, that work will
        be cancelled and step_wait() should not be called
        until step_async() is invoked again.
        """
        pass

    def step_async(self, actions):
        """
        Tell all the environments to start taking a step
        with the given actions.
        Call step_wait() to get the results of the step.

        You should not call this if a step_async run is
        already pending.
        """
        pass

    def step_wait(self):
        """
        Wait for the step taken with step_async().

        Returns (obs, rews, dones, infos):
         - obs: an array of observations, or a dict of
                arrays of observations.
         - rews: an array of rewards
         - dones: an array of "episode done" booleans
         - infos: a sequence of info objects
        """
        pass

    def close_extras(self):
        """
        Clean up the  extra resources, beyond what's in this base class.
        Only runs when not self.closed.
        """
        pass

    def close(self):
        """Close the viewer (if any) and extra resources; runs at most once."""
        if self.closed:
            return
        if self.viewer is not None:
            self.viewer.close()
        self.close_extras()
        self.closed = True

    def step(self, actions):
        """
        Step the environments synchronously.

        This is available for backwards compatibility.
        """
        self.step_async(actions)
        return self.step_wait()

    def render(self, mode='human'):
        """Tile the per-environment images and show or return the result.

        mode='human' displays the tiled image in the viewer and returns the
        viewer's ``isopen`` flag; mode='rgb_array' returns the tiled image.
        Any other mode raises NotImplementedError.
        """
        imgs = self.get_images()
        bigimg = tile_images(imgs)
        if mode == 'human':
            self.get_viewer().imshow(bigimg)
            return self.get_viewer().isopen
        elif mode == 'rgb_array':
            return bigimg
        else:
            raise NotImplementedError

    def get_images(self):
        """
        Return RGB images from each environment
        """
        raise NotImplementedError

    @property
    def unwrapped(self):
        """The innermost environment, found by following ``venv`` links."""
        if isinstance(self, VecEnvWrapper):
            return self.venv.unwrapped
        else:
            return self

    def get_viewer(self):
        """Return the image viewer, creating it on first use."""
        if self.viewer is None:
            # Imported lazily so gym's rendering module is only needed when used.
            from gym.envs.classic_control import rendering
            self.viewer = rendering.SimpleImageViewer()
        return self.viewer


var unwrapped
@property
def unwrapped(self):
    """The innermost environment, found by following ``venv`` links.

    A VecEnvWrapper defers to the environment it wraps; any other
    environment returns itself.
    """
    if isinstance(self, VecEnvWrapper):
        return self.venv.unwrapped
    else:
        return self


def close(self)
def close(self):
    """Close the viewer (if any) and extra resources; runs at most once."""
    if self.closed:
        return
    if self.viewer is not None:
        self.viewer.close()
    self.close_extras()
    self.closed = True
def close_extras(self)

Clean up the extra resources, beyond what's in this base class. Only runs when not self.closed.

def close_extras(self):
    """
    Clean up the  extra resources, beyond what's in this base class.
    Only runs when not self.closed.
    """
    # The base class holds no extra resources; subclasses override this hook.
    pass
def get_images(self)

Return RGB images from each environment

def get_images(self):
    """
    Return RGB images from each environment

    The base implementation always raises NotImplementedError.
    """
    raise NotImplementedError
def get_viewer(self)
def get_viewer(self):
    """Return the image viewer, building a SimpleImageViewer on first use."""
    if self.viewer is not None:
        return self.viewer
    # gym's rendering module is only imported once a viewer is really needed.
    from gym.envs.classic_control import rendering
    self.viewer = rendering.SimpleImageViewer()
    return self.viewer
def render(self, mode='human')
def render(self, mode='human'):
    """Tile the per-environment images and show or return the result.

    mode='human' displays the tiled image in the viewer and returns the
    viewer's ``isopen`` flag; mode='rgb_array' returns the tiled image.
    Any other mode raises NotImplementedError.
    """
    imgs = self.get_images()
    bigimg = tile_images(imgs)
    if mode == 'human':
        self.get_viewer().imshow(bigimg)
        return self.get_viewer().isopen
    elif mode == 'rgb_array':
        return bigimg
    else:
        raise NotImplementedError
def reset(self)

Reset all the environments and return an array of observations, or a dict of observation arrays.

If step_async is still doing work, that work will be cancelled and step_wait() should not be called until step_async() is invoked again.

def reset(self):
    """
    Reset all the environments and return an array of
    observations, or a dict of observation arrays.

    If step_async is still doing work, that work will
    be cancelled and step_wait() should not be called
    until step_async() is invoked again.
    """
    # Interface stub: concrete vectorized environments override this.
    pass
def step(self, actions)

Step the environments synchronously.

This is available for backwards compatibility.

def step(self, actions):
    """
    Step the environments synchronously.

    This is available for backwards compatibility.
    """
    # Start the asynchronous step, then block until its results arrive.
    self.step_async(actions)
    return self.step_wait()
def step_async(self, actions)

Tell all the environments to start taking a step with the given actions. Call step_wait() to get the results of the step.

You should not call this if a step_async run is already pending.

def step_async(self, actions):
    """
    Tell all the environments to start taking a step
    with the given actions.
    Call step_wait() to get the results of the step.

    You should not call this if a step_async run is
    already pending.
    """
    # Interface stub: concrete vectorized environments override this.
    pass
def step_wait(self)

Wait for the step taken with step_async().

Returns (obs, rews, dones, infos):

- obs: an array of observations, or a dict of arrays of observations.
- rews: an array of rewards
- dones: an array of "episode done" booleans
- infos: a sequence of info objects

def step_wait(self):
    """
    Wait for the step taken with step_async().

    Returns (obs, rews, dones, infos):
     - obs: an array of observations, or a dict of
            arrays of observations.
     - rews: an array of rewards
     - dones: an array of "episode done" booleans
     - infos: a sequence of info objects
    """
    # Interface stub: concrete vectorized environments override this.
    pass
class VecEnvObservationWrapper (venv, observation_space=None, action_space=None)

An environment wrapper that applies to an entire batch of environments at once.

class VecEnvObservationWrapper(VecEnvWrapper):
    """
    Wrapper that passes every batch of observations coming out of the
    wrapped environment through process().
    """

    # NOTE(review): upstream presumably marks this hook with @abstractmethod.
    def process(self, obs):
        """Transform a batch of observations; subclasses override this hook."""
        pass

    def reset(self):
        """Reset the wrapped environment and return the processed observations."""
        obs = self.venv.reset()
        return self.process(obs)

    def step_wait(self):
        """Return the wrapped step results with the observations processed."""
        obs, rews, dones, infos = self.venv.step_wait()
        return self.process(obs), rews, dones, infos




def process(self, obs)
def process(self, obs):
    """Transform a batch of observations; subclasses override this hook."""
    # Interface stub with no behaviour of its own.
    pass

class VecEnvWrapper (venv, observation_space=None, action_space=None)

An environment wrapper that applies to an entire batch of environments at once.

class VecEnvWrapper(VecEnv):
    """
    An environment wrapper that applies to an entire batch
    of environments at once.
    """

    def __init__(self, venv, observation_space=None, action_space=None):
        """Wrap ``venv``, optionally overriding the advertised spaces.

        A space left as None (or otherwise falsy) falls back to the wrapped
        environment's space.
        """
        self.venv = venv
        super().__init__(num_envs=venv.num_envs,
                        observation_space=observation_space or venv.observation_space,
                        action_space=action_space or venv.action_space)

    def step_async(self, actions):
        """Forward the actions to the wrapped environment."""
        self.venv.step_async(actions)

    # NOTE(review): reset/step_wait are left for subclasses; upstream
    # presumably marks them with @abstractmethod - confirm against the module.
    def reset(self):
        """Reset hook; concrete wrappers override this."""
        pass

    def step_wait(self):
        """Step-result hook; concrete wrappers override this."""
        pass

    def close(self):
        """Close the wrapped environment."""
        return self.venv.close()

    def render(self, mode='human'):
        """Render through the wrapped environment."""
        return self.venv.render(mode=mode)

    def get_images(self):
        """Return the images of the wrapped environment."""
        return self.venv.get_images()

    def __getattr__(self, name):
        # Only reached when normal lookup fails. Private names are never
        # forwarded, which also stops infinite recursion if `venv` is unset.
        if name.startswith('_'):
            raise AttributeError("attempted to get missing private attribute '{}'".format(name))
        return getattr(self.venv, name)




def close(self)
def close(self):
    """Close the wrapped environment and pass its return value through."""
    wrapped = self.venv
    return wrapped.close()
def render(self, mode='human')
def render(self, mode='human'):
    """Render via the wrapped environment, forwarding ``mode`` by keyword."""
    wrapped = self.venv
    rendered = wrapped.render(mode=mode)
    return rendered

class VecExtractDictObs (venv, key)

An environment wrapper that applies to an entire batch of environments at once.

class VecExtractDictObs(VecEnvObservationWrapper):
    """Observation wrapper that keeps a single entry of dict observations."""

    def __init__(self, venv, key):
        """Wrap ``venv`` and expose only the observation stored under ``key``."""
        self.key = key
        # Without this call the wrapper never stores `venv`, and any access to
        # self.venv would recurse through __getattr__.
        # NOTE(review): assumes venv.observation_space exposes a `.spaces`
        # mapping (gym Dict space) - confirm against callers.
        super().__init__(venv=venv,
            observation_space=venv.observation_space.spaces[self.key])

    def process(self, obs):
        """Return the entry of ``obs`` stored under the configured key."""
        return obs[self.key]



def process(self, obs)
def process(self, obs):
    """Look up and return the entry of ``obs`` selected by ``self.key``."""
    selected_key = self.key
    return obs[selected_key]

class VecFrameStack (venv, nstack)

An environment wrapper that applies to an entire batch of environments at once.

class VecFrameStack(VecEnvWrapper):
    """Wrapper that stacks the last ``nstack`` observations along the last axis."""

    def __init__(self, venv, nstack):
        """Wrap ``venv`` and allocate the stacked-observation buffer.

        venv: vectorized environment whose observation space has low/high bounds
        nstack: number of consecutive observations kept in the stack
        """
        self.venv = venv
        self.nstack = nstack
        wos = venv.observation_space  # wrapped ob space
        low = np.repeat(wos.low, self.nstack, axis=-1)
        high = np.repeat(wos.high, self.nstack, axis=-1)
        self.stackedobs = np.zeros((venv.num_envs,) + low.shape, low.dtype)
        observation_space = spaces.Box(low=low, high=high, dtype=venv.observation_space.dtype)
        VecEnvWrapper.__init__(self, venv, observation_space=observation_space)

    def step_wait(self):
        """Shift the stack by one observation and append the newest one.

        Environments that just finished an episode have their stack zeroed
        before the new observation is written.
        """
        obs, rews, news, infos = self.venv.step_wait()
        # Shift by the width of one observation, not by a single element, so
        # frames stay aligned when the last axis of an observation is > 1.
        self.stackedobs = np.roll(self.stackedobs, shift=-obs.shape[-1], axis=-1)
        for (i, new) in enumerate(news):
            if new:
                self.stackedobs[i] = 0
        self.stackedobs[..., -obs.shape[-1]:] = obs
        return self.stackedobs, rews, news, infos

    def reset(self):
        """Reset the wrapped env and start a stack holding only the first obs."""
        obs = self.venv.reset()
        self.stackedobs[...] = 0
        self.stackedobs[..., -obs.shape[-1]:] = obs
        return self.stackedobs


class VecMonitor (venv, filename=None, keep_buf=0, info_keywords=())

An environment wrapper that applies to an entire batch of environments at once.

class VecMonitor(VecEnvWrapper):
    """Wrapper that tracks per-environment episode returns and lengths.

    When an episode ends, its statistics are attached to that environment's
    info dict under the 'episode' key.
    """

    def __init__(self, venv, filename=None, keep_buf=0, info_keywords=()):
        """
        venv: vectorized environment to monitor
        filename: if truthy, episode rows are also written through a ResultsWriter
        keep_buf: if non-zero, keep the last keep_buf returns/lengths in deques
        info_keywords: keys copied from the step info into the episode info
        """
        VecEnvWrapper.__init__(self, venv)
        self.eprets = None
        self.eplens = None
        self.epcount = 0
        self.tstart = time.time()
        if filename:
            self.results_writer = ResultsWriter(filename, header={'t_start': self.tstart},
                extra_keys=info_keywords)
        else:
            self.results_writer = None
        self.info_keywords = info_keywords
        self.keep_buf = keep_buf
        if self.keep_buf:
            self.epret_buf = deque([], maxlen=keep_buf)
            self.eplen_buf = deque([], maxlen=keep_buf)

    def reset(self):
        """Reset the wrapped env and zero the running returns and lengths."""
        obs = self.venv.reset()
        self.eprets = np.zeros(self.num_envs, 'f')
        self.eplens = np.zeros(self.num_envs, 'i')
        return obs

    def step_wait(self):
        """Accumulate rewards and emit episode statistics for finished envs."""
        obs, rews, dones, infos = self.venv.step_wait()
        self.eprets += rews
        self.eplens += 1

        # Copy the container so the wrapped environment's infos are not modified.
        newinfos = list(infos[:])
        for i, done in enumerate(dones):
            if done:
                info = infos[i].copy()
                ret = self.eprets[i]
                eplen = self.eplens[i]
                epinfo = {'r': ret, 'l': eplen, 't': round(time.time() - self.tstart, 6)}
                for k in self.info_keywords:
                    epinfo[k] = info[k]
                info['episode'] = epinfo
                if self.keep_buf:
                    self.epret_buf.append(ret)
                    self.eplen_buf.append(eplen)
                self.epcount += 1
                self.eprets[i] = 0
                self.eplens[i] = 0
                if self.results_writer:
                    self.results_writer.write_row(epinfo)
                newinfos[i] = info
        return obs, rews, dones, newinfos


class VecNormalize (venv, ob=True, ret=True, clipob=10.0, cliprew=10.0, gamma=0.99, epsilon=1e-08, use_tf=False)

A vectorized wrapper that normalizes the observations and returns from an environment.

class VecNormalize(VecEnvWrapper):
    """
    A vectorized wrapper that normalizes the observations
    and returns from an environment.
    """

    def __init__(self, venv, ob=True, ret=True, clipob=10., cliprew=10., gamma=0.99, epsilon=1e-8, use_tf=False):
        """
        venv: vectorized environment to wrap
        ob: keep running statistics for observations and normalize them
        ret: keep running statistics for discounted returns and scale rewards
        clipob: normalized observations are clipped to [-clipob, clipob]
        cliprew: scaled rewards are clipped to [-cliprew, cliprew]
        gamma: discount used for the running return
        epsilon: added to the variance before taking the square root
        use_tf: use the TfRunningMeanStd implementation for the statistics
        """
        VecEnvWrapper.__init__(self, venv)
        if use_tf:
            from TeachMyAgent.students.openai_baselines.common.running_mean_std import TfRunningMeanStd
            self.ob_rms = TfRunningMeanStd(shape=self.observation_space.shape, scope='ob_rms') if ob else None
            self.ret_rms = TfRunningMeanStd(shape=(), scope='ret_rms') if ret else None
        else:
            from TeachMyAgent.students.openai_baselines.common.running_mean_std import RunningMeanStd
            self.ob_rms = RunningMeanStd(shape=self.observation_space.shape) if ob else None
            self.ret_rms = RunningMeanStd(shape=()) if ret else None
        self.clipob = clipob
        self.cliprew = cliprew
        self.ret = np.zeros(self.num_envs)
        self.gamma = gamma
        self.epsilon = epsilon

    def step_wait(self):
        """Return the wrapped step with normalized obs and scaled rewards."""
        obs, rews, news, infos = self.venv.step_wait()
        self.ret = self.ret * self.gamma + rews
        obs = self._obfilt(obs)
        if self.ret_rms:
            # Fold the current running returns into the statistics before
            # scaling, otherwise the variance never leaves its initial value.
            self.ret_rms.update(self.ret)
            rews = np.clip(rews / np.sqrt(self.ret_rms.var + self.epsilon), -self.cliprew, self.cliprew)
        # Finished episodes restart their running return from zero.
        self.ret[news] = 0.
        return obs, rews, news, infos

    def _obfilt(self, obs):
        """Update the observation statistics and return the normalized obs."""
        if self.ob_rms:
            self.ob_rms.update(obs)
            obs = np.clip((obs - self.ob_rms.mean) / np.sqrt(self.ob_rms.var + self.epsilon), -self.clipob, self.clipob)
            return obs
        else:
            return obs

    def reset(self):
        """Zero the running returns, reset the env and normalize the first obs."""
        self.ret = np.zeros(self.num_envs)
        obs = self.venv.reset()
        return self._obfilt(obs)



