Module TeachMyAgent.students.openai_baselines.bench.monitor

Expand source code
__all__ = ['Monitor', 'get_monitor_files', 'load_results']

from gym.core import Wrapper
import time
from glob import glob
import csv
import os.path as osp
import json

class Monitor(Wrapper):
    """Gym wrapper that records per-episode reward, length and elapsed time.

    When ``filename`` is truthy, every finished episode is also written as a
    row to a CSV file through a ``ResultsWriter``. The episode summary is
    additionally stored under ``info['episode']`` on the step that ends the
    episode.

    Args:
        env: The environment to wrap.
        filename: Path (or directory) of the monitor file; a falsy value
            disables file logging.
        allow_early_resets: If False, calling ``reset`` before the current
            episode reported ``done`` raises ``RuntimeError``.
        reset_keywords: Names of keyword arguments that must be passed to
            ``reset``; their values are recorded with each episode.
        info_keywords: Keys copied from the final step's ``info`` dict into
            the episode record.
    """
    EXT = "monitor.csv"
    f = None  # legacy file-handle slot; never assigned here but still honoured by close()

    def __init__(self, env, filename, allow_early_resets=False, reset_keywords=(), info_keywords=()):
        Wrapper.__init__(self, env=env)
        self.tstart = time.time()
        if filename:
            # Use the same reference time in the file header as for the
            # per-episode "t" values, which are measured from self.tstart.
            self.results_writer = ResultsWriter(filename,
                header={"t_start": self.tstart, 'env_id' : env.spec and env.spec.id},
                extra_keys=reset_keywords + info_keywords
            )
        else:
            self.results_writer = None
        self.reset_keywords = reset_keywords
        self.info_keywords = info_keywords
        self.allow_early_resets = allow_early_resets
        self.rewards = None
        self.needs_reset = True
        self.episode_rewards = []
        self.episode_lengths = []
        self.episode_times = []
        self.total_steps = 0
        self.current_reset_info = {} # extra info about the current episode, that was passed in during reset()

    def reset(self, **kwargs):
        """Reset the wrapped environment, recording the required reset kwargs.

        Raises:
            RuntimeError: On an early reset when early resets are not allowed.
            ValueError: If a keyword listed in ``reset_keywords`` is missing
                (or passed as None).
        """
        self.reset_state()
        for k in self.reset_keywords:
            v = kwargs.get(k)
            if v is None:
                raise ValueError('Expected you to pass kwarg %s into reset'%k)
            self.current_reset_info[k] = v
        return self.env.reset(**kwargs)

    def reset_state(self):
        """Start a fresh reward buffer, refusing disallowed early resets."""
        if not self.allow_early_resets and not self.needs_reset:
            raise RuntimeError("Tried to reset an environment before done. If you want to allow early resets, wrap your env with Monitor(env, path, allow_early_resets=True)")
        self.rewards = []
        self.needs_reset = False


    def step(self, action):
        """Step the wrapped environment and update the episode statistics.

        Raises:
            RuntimeError: If the environment has not been reset since
                construction or since the last finished episode.
        """
        if self.needs_reset:
            raise RuntimeError("Tried to step environment that needs reset")
        ob, rew, done, info = self.env.step(action)
        self.update(ob, rew, done, info)
        return (ob, rew, done, info)

    def update(self, ob, rew, done, info):
        """Accumulate ``rew``; on ``done`` finalize and log the episode record."""
        self.rewards.append(rew)
        if done:
            self.needs_reset = True
            eprew = sum(self.rewards)
            eplen = len(self.rewards)
            # Read the clock once so the logged "t" and episode_times agree.
            elapsed = time.time() - self.tstart
            epinfo = {"r": round(eprew, 6), "l": eplen, "t": round(elapsed, 6)}
            for k in self.info_keywords:
                epinfo[k] = info[k]
            self.episode_rewards.append(eprew)
            self.episode_lengths.append(eplen)
            self.episode_times.append(elapsed)
            epinfo.update(self.current_reset_info)
            if self.results_writer:
                self.results_writer.write_row(epinfo)
            assert isinstance(info, dict)
            # The check is repeated because asserts are stripped under -O.
            if isinstance(info, dict):
                info['episode'] = epinfo

        self.total_steps += 1

    def close(self):
        """Close the wrapped environment and any open monitor log file."""
        super(Monitor, self).close()
        if self.f is not None:
            self.f.close()
        # The log file is owned by the results writer (its ``f`` attribute);
        # close it here so the handle is not leaked.
        writer = getattr(self, 'results_writer', None)
        log_file = getattr(writer, 'f', None)
        if log_file is not None:
            log_file.close()

    def get_total_steps(self):
        """Return the number of steps taken across all episodes."""
        return self.total_steps

    def get_episode_rewards(self):
        """Return the list of total rewards of finished episodes."""
        return self.episode_rewards

    def get_episode_lengths(self):
        """Return the list of lengths of finished episodes."""
        return self.episode_lengths

    def get_episode_times(self):
        """Return the elapsed time since construction at each episode end."""
        return self.episode_times

class LoadMonitorResultsError(Exception):
    """Raised by ``load_results`` when a directory contains no monitor files."""


class ResultsWriter(object):
    """Append episode records to a monitor CSV file.

    The file starts with an optional header line followed by the CSV column
    header (``r``, ``l``, ``t`` plus ``extra_keys``).

    Args:
        filename: Target path. If it does not already end with
            ``Monitor.EXT``, the extension is appended; if it is an existing
            directory, the file ``Monitor.EXT`` inside it is used.
        header: Either a string written verbatim, or a dict that is written
            as a ``# <json>`` comment line.
        extra_keys: Additional CSV column names after ``r``, ``l``, ``t``.
    """
    def __init__(self, filename, header='', extra_keys=()):
        self.extra_keys = extra_keys
        assert filename is not None
        if not filename.endswith(Monitor.EXT):
            if osp.isdir(filename):
                filename = osp.join(filename, Monitor.EXT)
            else:
                filename = filename + "." + Monitor.EXT
        # newline='' is what the csv module requires of files it writes to;
        # without it, rows end in "\r\r\n" on platforms that translate "\n".
        self.f = open(filename, "wt", newline='')
        try:
            if isinstance(header, dict):
                header = '# {} \n'.format(json.dumps(header))
            self.f.write(header)
            self.logger = csv.DictWriter(self.f, fieldnames=('r', 'l', 't')+tuple(extra_keys))
            self.logger.writeheader()
            self.f.flush()
        except Exception:
            # Do not leak the handle if writing the preamble fails.
            self.f.close()
            raise

    def write_row(self, epinfo):
        """Write one episode record and flush it to disk immediately."""
        if self.logger:
            self.logger.writerow(epinfo)
            self.f.flush()

    def close(self):
        """Close the underlying file; safe to call more than once."""
        if not self.f.closed:
            self.f.close()


def get_monitor_files(dir):
    """Return the paths in ``dir`` whose names end with ``Monitor.EXT``."""
    pattern = osp.join(dir, "*" + Monitor.EXT)
    return glob(pattern)

def load_results(dir):
    """Load every monitor file found in ``dir`` into one pandas DataFrame.

    Both ``*monitor.csv`` files and the older ``*monitor.json`` files are
    read. Episode times are shifted so that they are relative to the earliest
    ``t_start`` among the loaded headers, rows are sorted by time, and the
    parsed headers are attached to the frame as ``df.headers``.

    Raises:
        LoadMonitorResultsError: If no monitor file is found in ``dir``.
    """
    import pandas
    found = []
    for pattern in ("*monitor.json", "*monitor.csv"):  # old json files first, then csv
        found.extend(glob(osp.join(dir, pattern)))
    if not found:
        raise LoadMonitorResultsError("no monitor files of the form *%s found in %s" % (Monitor.EXT, dir))
    frames, headers = [], []
    for path in found:
        with open(path, 'rt') as handle:
            if path.endswith('csv'):
                first = handle.readline()
                if not first:
                    # Completely empty file: nothing to load.
                    continue
                assert first[0] == '#'
                meta = json.loads(first[1:])
                frame = pandas.read_csv(handle, index_col=None)
            elif path.endswith('json'): # Deprecated json format
                raw_lines = handle.readlines()
                meta = json.loads(raw_lines[0])
                frame = pandas.DataFrame([json.loads(raw) for raw in raw_lines[1:]])
            else:
                assert 0, 'unreachable'
            headers.append(meta)
            # Convert per-file relative times to absolute timestamps.
            frame['t'] += meta['t_start']
        frames.append(frame)
    merged = pandas.concat(frames)
    merged.sort_values('t', inplace=True)
    merged.reset_index(inplace=True)
    # Make times relative to the earliest run start again.
    merged['t'] -= min(meta['t_start'] for meta in headers)
    merged.headers = headers # HACK to preserve backwards compatibility
    return merged

Functions

def get_monitor_files(dir)
Expand source code
def get_monitor_files(dir):
    """Return the paths in ``dir`` whose names end with ``Monitor.EXT``."""
    pattern = osp.join(dir, "*" + Monitor.EXT)
    return glob(pattern)
def load_results(dir)
Expand source code
def load_results(dir):
    """Load every monitor file found in ``dir`` into one pandas DataFrame.

    Both ``*monitor.csv`` files and the older ``*monitor.json`` files are
    read. Episode times are shifted so that they are relative to the earliest
    ``t_start`` among the loaded headers, rows are sorted by time, and the
    parsed headers are attached to the frame as ``df.headers``.

    Raises:
        LoadMonitorResultsError: If no monitor file is found in ``dir``.
    """
    import pandas
    found = []
    for pattern in ("*monitor.json", "*monitor.csv"):  # old json files first, then csv
        found.extend(glob(osp.join(dir, pattern)))
    if not found:
        raise LoadMonitorResultsError("no monitor files of the form *%s found in %s" % (Monitor.EXT, dir))
    frames, headers = [], []
    for path in found:
        with open(path, 'rt') as handle:
            if path.endswith('csv'):
                first = handle.readline()
                if not first:
                    # Completely empty file: nothing to load.
                    continue
                assert first[0] == '#'
                meta = json.loads(first[1:])
                frame = pandas.read_csv(handle, index_col=None)
            elif path.endswith('json'): # Deprecated json format
                raw_lines = handle.readlines()
                meta = json.loads(raw_lines[0])
                frame = pandas.DataFrame([json.loads(raw) for raw in raw_lines[1:]])
            else:
                assert 0, 'unreachable'
            headers.append(meta)
            # Convert per-file relative times to absolute timestamps.
            frame['t'] += meta['t_start']
        frames.append(frame)
    merged = pandas.concat(frames)
    merged.sort_values('t', inplace=True)
    merged.reset_index(inplace=True)
    # Make times relative to the earliest run start again.
    merged['t'] -= min(meta['t_start'] for meta in headers)
    merged.headers = headers # HACK to preserve backwards compatibility
    return merged

Classes

class Monitor (env, filename, allow_early_resets=False, reset_keywords=(), info_keywords=())

Wraps an environment to allow a modular transformation of the `step()` and `reset()` methods.

This class is the base class for all wrappers. The subclass could override some methods to change the behavior of the original environment without touching the original code.

Note

Don't forget to call `super().__init__(env)` if the subclass overrides `__init__()`.

Wraps an environment to allow a modular transformation of the `step()` and `reset()` methods.

Args

env
The environment to wrap
Expand source code
class Monitor(Wrapper):
    """Gym wrapper that records per-episode reward, length and elapsed time.

    When ``filename`` is truthy, every finished episode is also written as a
    row to a CSV file through a ``ResultsWriter``. The episode summary is
    additionally stored under ``info['episode']`` on the step that ends the
    episode.

    Args:
        env: The environment to wrap.
        filename: Path (or directory) of the monitor file; a falsy value
            disables file logging.
        allow_early_resets: If False, calling ``reset`` before the current
            episode reported ``done`` raises ``RuntimeError``.
        reset_keywords: Names of keyword arguments that must be passed to
            ``reset``; their values are recorded with each episode.
        info_keywords: Keys copied from the final step's ``info`` dict into
            the episode record.
    """
    EXT = "monitor.csv"
    f = None  # legacy file-handle slot; never assigned here but still honoured by close()

    def __init__(self, env, filename, allow_early_resets=False, reset_keywords=(), info_keywords=()):
        Wrapper.__init__(self, env=env)
        self.tstart = time.time()
        if filename:
            # Use the same reference time in the file header as for the
            # per-episode "t" values, which are measured from self.tstart.
            self.results_writer = ResultsWriter(filename,
                header={"t_start": self.tstart, 'env_id' : env.spec and env.spec.id},
                extra_keys=reset_keywords + info_keywords
            )
        else:
            self.results_writer = None
        self.reset_keywords = reset_keywords
        self.info_keywords = info_keywords
        self.allow_early_resets = allow_early_resets
        self.rewards = None
        self.needs_reset = True
        self.episode_rewards = []
        self.episode_lengths = []
        self.episode_times = []
        self.total_steps = 0
        self.current_reset_info = {} # extra info about the current episode, that was passed in during reset()

    def reset(self, **kwargs):
        """Reset the wrapped environment, recording the required reset kwargs.

        Raises:
            RuntimeError: On an early reset when early resets are not allowed.
            ValueError: If a keyword listed in ``reset_keywords`` is missing
                (or passed as None).
        """
        self.reset_state()
        for k in self.reset_keywords:
            v = kwargs.get(k)
            if v is None:
                raise ValueError('Expected you to pass kwarg %s into reset'%k)
            self.current_reset_info[k] = v
        return self.env.reset(**kwargs)

    def reset_state(self):
        """Start a fresh reward buffer, refusing disallowed early resets."""
        if not self.allow_early_resets and not self.needs_reset:
            raise RuntimeError("Tried to reset an environment before done. If you want to allow early resets, wrap your env with Monitor(env, path, allow_early_resets=True)")
        self.rewards = []
        self.needs_reset = False


    def step(self, action):
        """Step the wrapped environment and update the episode statistics.

        Raises:
            RuntimeError: If the environment has not been reset since
                construction or since the last finished episode.
        """
        if self.needs_reset:
            raise RuntimeError("Tried to step environment that needs reset")
        ob, rew, done, info = self.env.step(action)
        self.update(ob, rew, done, info)
        return (ob, rew, done, info)

    def update(self, ob, rew, done, info):
        """Accumulate ``rew``; on ``done`` finalize and log the episode record."""
        self.rewards.append(rew)
        if done:
            self.needs_reset = True
            eprew = sum(self.rewards)
            eplen = len(self.rewards)
            # Read the clock once so the logged "t" and episode_times agree.
            elapsed = time.time() - self.tstart
            epinfo = {"r": round(eprew, 6), "l": eplen, "t": round(elapsed, 6)}
            for k in self.info_keywords:
                epinfo[k] = info[k]
            self.episode_rewards.append(eprew)
            self.episode_lengths.append(eplen)
            self.episode_times.append(elapsed)
            epinfo.update(self.current_reset_info)
            if self.results_writer:
                self.results_writer.write_row(epinfo)
            assert isinstance(info, dict)
            # The check is repeated because asserts are stripped under -O.
            if isinstance(info, dict):
                info['episode'] = epinfo

        self.total_steps += 1

    def close(self):
        """Close the wrapped environment and any open monitor log file."""
        super(Monitor, self).close()
        if self.f is not None:
            self.f.close()
        # The log file is owned by the results writer (its ``f`` attribute);
        # close it here so the handle is not leaked.
        writer = getattr(self, 'results_writer', None)
        log_file = getattr(writer, 'f', None)
        if log_file is not None:
            log_file.close()

    def get_total_steps(self):
        """Return the number of steps taken across all episodes."""
        return self.total_steps

    def get_episode_rewards(self):
        """Return the list of total rewards of finished episodes."""
        return self.episode_rewards

    def get_episode_lengths(self):
        """Return the list of lengths of finished episodes."""
        return self.episode_lengths

    def get_episode_times(self):
        """Return the elapsed time since construction at each episode end."""
        return self.episode_times

Ancestors

  • gym.core.Wrapper
  • gym.core.Env
  • typing.Generic

Class variables

var EXT
var f

Instance variables

var action_space : gym.spaces.space.Space[~ActType]

Returns the action space of the environment.

Expand source code
@property
def action_space(self) -> spaces.Space[ActType]:
    """Return the wrapper's own action space, or the wrapped env's if unset."""
    own_space = self._action_space
    if own_space is not None:
        return own_space
    return self.env.action_space
var metadata : dict

Returns the environment metadata.

Expand source code
@property
def metadata(self) -> dict:
    """Return the wrapper's own metadata, or the wrapped env's if unset."""
    own_metadata = self._metadata
    if own_metadata is not None:
        return own_metadata
    return self.env.metadata
var observation_space : gym.spaces.space.Space

Returns the observation space of the environment.

Expand source code
@property
def observation_space(self) -> spaces.Space:
    """Return the wrapper's own observation space, or the wrapped env's if unset."""
    own_space = self._observation_space
    if own_space is not None:
        return own_space
    return self.env.observation_space
var render_mode : Union[str, NoneType]

Returns the environment render_mode.

Expand source code
@property
def render_mode(self) -> Optional[str]:
    """Return the render mode reported by the wrapped environment."""
    wrapped = self.env
    return wrapped.render_mode
var spec

Returns the environment specification.

Expand source code
@property
def spec(self):
    """Return the specification object of the wrapped environment."""
    wrapped = self.env
    return wrapped.spec

Methods

def close(self)

Closes the environment.

Expand source code
def close(self):
    """Close the wrapped environment and any open monitor log file."""
    super(Monitor, self).close()
    if self.f is not None:
        self.f.close()
    # The class-level ``f`` is never assigned; the log file actually lives on
    # the results writer (its ``f`` attribute), so close that handle as well.
    writer = getattr(self, 'results_writer', None)
    log_file = getattr(writer, 'f', None)
    if log_file is not None:
        log_file.close()
def get_episode_lengths(self)
Expand source code
def get_episode_lengths(self):
    """Return the list of lengths of finished episodes."""
    lengths = self.episode_lengths
    return lengths
def get_episode_rewards(self)
Expand source code
def get_episode_rewards(self):
    """Return the list of total rewards of finished episodes."""
    rewards = self.episode_rewards
    return rewards
def get_episode_times(self)
Expand source code
def get_episode_times(self):
    """Return the list of elapsed times recorded at each episode end."""
    times = self.episode_times
    return times
def get_total_steps(self)
Expand source code
def get_total_steps(self):
    """Return the number of steps counted so far."""
    step_count = self.total_steps
    return step_count
def reset(self, **kwargs)

Resets the environment with kwargs.

Expand source code
def reset(self, **kwargs):
    """Reset the wrapped environment, recording the required reset kwargs.

    Raises ``ValueError`` when a keyword listed in ``reset_keywords`` is
    missing from ``kwargs`` or was passed as None.
    """
    self.reset_state()
    recorded = self.current_reset_info
    for key in self.reset_keywords:
        value = kwargs.get(key)
        if value is None:
            raise ValueError('Expected you to pass kwarg %s into reset'%key)
        recorded[key] = value
    wrapped = self.env
    return wrapped.reset(**kwargs)
def reset_state(self)
Expand source code
def reset_state(self):
    """Start a fresh reward buffer, refusing disallowed early resets."""
    early_reset_ok = self.allow_early_resets or self.needs_reset
    if not early_reset_ok:
        raise RuntimeError("Tried to reset an environment before done. If you want to allow early resets, wrap your env with Monitor(env, path, allow_early_resets=True)")
    self.needs_reset = False
    self.rewards = []
def step(self, action)

Steps through the environment with action.

Expand source code
def step(self, action):
    """Step the wrapped environment and pass the result to ``update``.

    Raises ``RuntimeError`` if the environment is flagged as needing a reset.
    """
    if self.needs_reset:
        raise RuntimeError("Tried to step environment that needs reset")
    transition = self.env.step(action)
    ob, rew, done, info = transition
    self.update(ob, rew, done, info)
    return (ob, rew, done, info)
def update(self, ob, rew, done, info)
Expand source code
def update(self, ob, rew, done, info):
    """Accumulate ``rew``; on ``done`` finalize and log the episode record.

    The finished episode's record holds the rounded total reward (``r``),
    the length (``l``), the rounded elapsed time since ``tstart`` (``t``),
    the ``info_keywords`` values and the stored reset info. It is written via
    the results writer when one is set and stored in ``info['episode']``.
    """
    self.rewards.append(rew)
    if done:
        self.needs_reset = True
        total_reward = sum(self.rewards)
        n_steps = len(self.rewards)
        epinfo = {
            "r": round(total_reward, 6),
            "l": n_steps,
            "t": round(time.time() - self.tstart, 6),
        }
        epinfo.update({key: info[key] for key in self.info_keywords})
        self.episode_rewards.append(total_reward)
        self.episode_lengths.append(n_steps)
        self.episode_times.append(time.time() - self.tstart)
        epinfo.update(self.current_reset_info)
        writer = self.results_writer
        if writer:
            writer.write_row(epinfo)
        info_is_dict = isinstance(info, dict)
        assert info_is_dict
        if info_is_dict:
            info['episode'] = epinfo

    self.total_steps += 1