Module TeachMyAgent.students.openai_baselines.bench.monitor
Expand source code
__all__ = ['Monitor', 'get_monitor_files', 'load_results']
from gym.core import Wrapper
import time
from glob import glob
import csv
import os.path as osp
import json
class Monitor(Wrapper):
EXT = "monitor.csv"
f = None
def __init__(self, env, filename, allow_early_resets=False, reset_keywords=(), info_keywords=()):
Wrapper.__init__(self, env=env)
self.tstart = time.time()
if filename:
self.results_writer = ResultsWriter(filename,
header={"t_start": time.time(), 'env_id' : env.spec and env.spec.id},
extra_keys=reset_keywords + info_keywords
)
else:
self.results_writer = None
self.reset_keywords = reset_keywords
self.info_keywords = info_keywords
self.allow_early_resets = allow_early_resets
self.rewards = None
self.needs_reset = True
self.episode_rewards = []
self.episode_lengths = []
self.episode_times = []
self.total_steps = 0
self.current_reset_info = {} # extra info about the current episode, that was passed in during reset()
def reset(self, **kwargs):
self.reset_state()
for k in self.reset_keywords:
v = kwargs.get(k)
if v is None:
raise ValueError('Expected you to pass kwarg %s into reset'%k)
self.current_reset_info[k] = v
return self.env.reset(**kwargs)
def reset_state(self):
if not self.allow_early_resets and not self.needs_reset:
raise RuntimeError("Tried to reset an environment before done. If you want to allow early resets, wrap your env with Monitor(env, path, allow_early_resets=True)")
self.rewards = []
self.needs_reset = False
def step(self, action):
if self.needs_reset:
raise RuntimeError("Tried to step environment that needs reset")
ob, rew, done, info = self.env.step(action)
self.update(ob, rew, done, info)
return (ob, rew, done, info)
def update(self, ob, rew, done, info):
self.rewards.append(rew)
if done:
self.needs_reset = True
eprew = sum(self.rewards)
eplen = len(self.rewards)
epinfo = {"r": round(eprew, 6), "l": eplen, "t": round(time.time() - self.tstart, 6)}
for k in self.info_keywords:
epinfo[k] = info[k]
self.episode_rewards.append(eprew)
self.episode_lengths.append(eplen)
self.episode_times.append(time.time() - self.tstart)
epinfo.update(self.current_reset_info)
if self.results_writer:
self.results_writer.write_row(epinfo)
assert isinstance(info, dict)
if isinstance(info, dict):
info['episode'] = epinfo
self.total_steps += 1
def close(self):
super(Monitor, self).close()
if self.f is not None:
self.f.close()
def get_total_steps(self):
return self.total_steps
def get_episode_rewards(self):
return self.episode_rewards
def get_episode_lengths(self):
return self.episode_lengths
def get_episode_times(self):
return self.episode_times
class LoadMonitorResultsError(Exception):
pass
class ResultsWriter(object):
def __init__(self, filename, header='', extra_keys=()):
self.extra_keys = extra_keys
assert filename is not None
if not filename.endswith(Monitor.EXT):
if osp.isdir(filename):
filename = osp.join(filename, Monitor.EXT)
else:
filename = filename + "." + Monitor.EXT
self.f = open(filename, "wt")
if isinstance(header, dict):
header = '# {} \n'.format(json.dumps(header))
self.f.write(header)
self.logger = csv.DictWriter(self.f, fieldnames=('r', 'l', 't')+tuple(extra_keys))
self.logger.writeheader()
self.f.flush()
def write_row(self, epinfo):
if self.logger:
self.logger.writerow(epinfo)
self.f.flush()
def get_monitor_files(dir):
return glob(osp.join(dir, "*" + Monitor.EXT))
def load_results(dir):
import pandas
monitor_files = (
glob(osp.join(dir, "*monitor.json")) +
glob(osp.join(dir, "*monitor.csv"))) # get both csv and (old) json files
if not monitor_files:
raise LoadMonitorResultsError("no monitor files of the form *%s found in %s" % (Monitor.EXT, dir))
dfs = []
headers = []
for fname in monitor_files:
with open(fname, 'rt') as fh:
if fname.endswith('csv'):
firstline = fh.readline()
if not firstline:
continue
assert firstline[0] == '#'
header = json.loads(firstline[1:])
df = pandas.read_csv(fh, index_col=None)
headers.append(header)
elif fname.endswith('json'): # Deprecated json format
episodes = []
lines = fh.readlines()
header = json.loads(lines[0])
headers.append(header)
for line in lines[1:]:
episode = json.loads(line)
episodes.append(episode)
df = pandas.DataFrame(episodes)
else:
assert 0, 'unreachable'
df['t'] += header['t_start']
dfs.append(df)
df = pandas.concat(dfs)
df.sort_values('t', inplace=True)
df.reset_index(inplace=True)
df['t'] -= min(header['t_start'] for header in headers)
df.headers = headers # HACK to preserve backwards compatibility
return df
Functions
def get_monitor_files(dir)
-
Expand source code
def get_monitor_files(dir): return glob(osp.join(dir, "*" + Monitor.EXT))
def load_results(dir)
-
Expand source code
def load_results(dir): import pandas monitor_files = ( glob(osp.join(dir, "*monitor.json")) + glob(osp.join(dir, "*monitor.csv"))) # get both csv and (old) json files if not monitor_files: raise LoadMonitorResultsError("no monitor files of the form *%s found in %s" % (Monitor.EXT, dir)) dfs = [] headers = [] for fname in monitor_files: with open(fname, 'rt') as fh: if fname.endswith('csv'): firstline = fh.readline() if not firstline: continue assert firstline[0] == '#' header = json.loads(firstline[1:]) df = pandas.read_csv(fh, index_col=None) headers.append(header) elif fname.endswith('json'): # Deprecated json format episodes = [] lines = fh.readlines() header = json.loads(lines[0]) headers.append(header) for line in lines[1:]: episode = json.loads(line) episodes.append(episode) df = pandas.DataFrame(episodes) else: assert 0, 'unreachable' df['t'] += header['t_start'] dfs.append(df) df = pandas.concat(dfs) df.sort_values('t', inplace=True) df.reset_index(inplace=True) df['t'] -= min(header['t_start'] for header in headers) df.headers = headers # HACK to preserve backwards compatibility return df
Classes
class Monitor (env, filename, allow_early_resets=False, reset_keywords=(), info_keywords=())
-
Wraps an environment to allow a modular transformation of the :meth:
step
and :meth:reset
methods.This class is the base class for all wrappers. The subclass could override some methods to change the behavior of the original environment without touching the original code.
Note
Don't forget to call
super().__init__(env)
if the subclass overrides :meth:__init__
.Wraps an environment to allow a modular transformation of the :meth:
step
and :meth:reset
methods.Args
env
- The environment to wrap
Expand source code
class Monitor(Wrapper): EXT = "monitor.csv" f = None def __init__(self, env, filename, allow_early_resets=False, reset_keywords=(), info_keywords=()): Wrapper.__init__(self, env=env) self.tstart = time.time() if filename: self.results_writer = ResultsWriter(filename, header={"t_start": time.time(), 'env_id' : env.spec and env.spec.id}, extra_keys=reset_keywords + info_keywords ) else: self.results_writer = None self.reset_keywords = reset_keywords self.info_keywords = info_keywords self.allow_early_resets = allow_early_resets self.rewards = None self.needs_reset = True self.episode_rewards = [] self.episode_lengths = [] self.episode_times = [] self.total_steps = 0 self.current_reset_info = {} # extra info about the current episode, that was passed in during reset() def reset(self, **kwargs): self.reset_state() for k in self.reset_keywords: v = kwargs.get(k) if v is None: raise ValueError('Expected you to pass kwarg %s into reset'%k) self.current_reset_info[k] = v return self.env.reset(**kwargs) def reset_state(self): if not self.allow_early_resets and not self.needs_reset: raise RuntimeError("Tried to reset an environment before done. If you want to allow early resets, wrap your env with Monitor(env, path, allow_early_resets=True)") self.rewards = [] self.needs_reset = False def step(self, action): if self.needs_reset: raise RuntimeError("Tried to step environment that needs reset") ob, rew, done, info = self.env.step(action) self.update(ob, rew, done, info) return (ob, rew, done, info) def update(self, ob, rew, done, info): self.rewards.append(rew) if done: self.needs_reset = True eprew = sum(self.rewards) eplen = len(self.rewards) epinfo = {"r": round(eprew, 6), "l": eplen, "t": round(time.time() - self.tstart, 6)} for k in self.info_keywords: epinfo[k] = info[k] self.episode_rewards.append(eprew) self.episode_lengths.append(eplen) self.episode_times.append(time.time() - self.tstart) epinfo.update(self.current_reset_info) if self.results_writer: self.results_writer.write_row(epinfo) assert isinstance(info, dict) if isinstance(info, dict): info['episode'] = epinfo self.total_steps += 1 def close(self): super(Monitor, self).close() if self.f is not None: self.f.close() def get_total_steps(self): return self.total_steps def get_episode_rewards(self): return self.episode_rewards def get_episode_lengths(self): return self.episode_lengths def get_episode_times(self): return self.episode_times
Ancestors
- gym.core.Wrapper
- gym.core.Env
- typing.Generic
Class variables
var EXT
var f
Instance variables
var action_space : gym.spaces.space.Space[~ActType]
-
Returns the action space of the environment.
Expand source code
@property def action_space(self) -> spaces.Space[ActType]: """Returns the action space of the environment.""" if self._action_space is None: return self.env.action_space return self._action_space
var metadata : dict
-
Returns the environment metadata.
Expand source code
@property def metadata(self) -> dict: """Returns the environment metadata.""" if self._metadata is None: return self.env.metadata return self._metadata
var observation_space : gym.spaces.space.Space
-
Returns the observation space of the environment.
Expand source code
@property def observation_space(self) -> spaces.Space: """Returns the observation space of the environment.""" if self._observation_space is None: return self.env.observation_space return self._observation_space
var render_mode : Union[str, NoneType]
-
Returns the environment render_mode.
Expand source code
@property def render_mode(self) -> Optional[str]: """Returns the environment render_mode.""" return self.env.render_mode
var spec
-
Returns the environment specification.
Expand source code
@property def spec(self): """Returns the environment specification.""" return self.env.spec
Methods
def close(self)
-
Closes the environment.
Expand source code
def close(self): super(Monitor, self).close() if self.f is not None: self.f.close()
def get_episode_lengths(self)
-
Expand source code
def get_episode_lengths(self): return self.episode_lengths
def get_episode_rewards(self)
-
Expand source code
def get_episode_rewards(self): return self.episode_rewards
def get_episode_times(self)
-
Expand source code
def get_episode_times(self): return self.episode_times
def get_total_steps(self)
-
Expand source code
def get_total_steps(self): return self.total_steps
def reset(self, **kwargs)
-
Resets the environment with kwargs.
Expand source code
def reset(self, **kwargs): self.reset_state() for k in self.reset_keywords: v = kwargs.get(k) if v is None: raise ValueError('Expected you to pass kwarg %s into reset'%k) self.current_reset_info[k] = v return self.env.reset(**kwargs)
def reset_state(self)
-
Expand source code
def reset_state(self): if not self.allow_early_resets and not self.needs_reset: raise RuntimeError("Tried to reset an environment before done. If you want to allow early resets, wrap your env with Monitor(env, path, allow_early_resets=True)") self.rewards = [] self.needs_reset = False
def step(self, action)
-
Steps through the environment with action.
Expand source code
def step(self, action): if self.needs_reset: raise RuntimeError("Tried to step environment that needs reset") ob, rew, done, info = self.env.step(action) self.update(ob, rew, done, info) return (ob, rew, done, info)
def update(self, ob, rew, done, info)
-
Expand source code
def update(self, ob, rew, done, info): self.rewards.append(rew) if done: self.needs_reset = True eprew = sum(self.rewards) eplen = len(self.rewards) epinfo = {"r": round(eprew, 6), "l": eplen, "t": round(time.time() - self.tstart, 6)} for k in self.info_keywords: epinfo[k] = info[k] self.episode_rewards.append(eprew) self.episode_lengths.append(eplen) self.episode_times.append(time.time() - self.tstart) epinfo.update(self.current_reset_info) if self.results_writer: self.results_writer.write_row(epinfo) assert isinstance(info, dict) if isinstance(info, dict): info['episode'] = epinfo self.total_steps += 1