Module TeachMyAgent.teachers.algos.self_paced_teacher
Expand source code
# Taken from https://github.com/psclklnk/spdl and wrapped to our architecture
# Modified by Clément Romac, copy of the license at TeachMyAgent/teachers/LICENSES/SPDL
import torch
import numpy as np
from copy import deepcopy
from functools import partial
from TeachMyAgent.teachers.algos.AbstractTeacher import AbstractTeacher
from TeachMyAgent.teachers.utils.conjugate_gradient import cg_step
from TeachMyAgent.teachers.utils.torch import to_float_tensor
from TeachMyAgent.teachers.utils.gaussian_torch_distribution import GaussianTorchDistribution
class Buffer:
def __init__(self, n_elements, max_buffer_size, reset_on_query):
self.reset_on_query = reset_on_query
self.max_buffer_size = max_buffer_size
self.buffers = [list() for i in range(0, n_elements)]
def update_buffer(self, datas):
if isinstance(datas[0], list):
for buffer, data in zip(self.buffers, datas):
buffer.extend(data)
else:
for buffer, data in zip(self.buffers, datas):
buffer.append(data)
while len(self.buffers[0]) > self.max_buffer_size:
for buffer in self.buffers:
del buffer[0]
def read_buffer(self, reset=None):
if reset is None:
reset = self.reset_on_query
res = tuple([buffer for buffer in self.buffers])
if reset:
for i in range(0, len(self.buffers)):
self.buffers[i] = []
return res
def __len__(self):
return len(self.buffers[0])
class AbstractSelfPacedTeacher():
'''
Base SPDL Teacher
'''
def __init__(self, init_mean, flat_init_chol, target_mean, flat_target_chol, alpha_function, max_kl, cg_parameters):
self.context_dist = GaussianTorchDistribution(init_mean, flat_init_chol, use_cuda=False)
self.target_dist = GaussianTorchDistribution(target_mean, flat_target_chol, use_cuda=False)
self.alpha_function = alpha_function
self.max_kl = max_kl
self.cg_parameters = {"n_epochs_line_search": 10, "n_epochs_cg": 10, "cg_damping": 1e-2,
"cg_residual_tol": 1e-10}
if cg_parameters is not None:
self.cg_parameters.update(cg_parameters)
self.task = None
self.iteration = 0
def target_context_kl(self, numpy=True):
kl_div = torch.distributions.kl.kl_divergence(self.context_dist.distribution_t,
self.target_dist.distribution_t).detach()
if numpy:
kl_div = kl_div.numpy()
return kl_div
def save(self, path):
weights = self.context_dist.get_weights()
np.save(path, weights)
def load(self, path):
self.context_dist.set_weights(np.load(path))
def _compute_context_kl(self, old_context_dist):
return torch.distributions.kl.kl_divergence(old_context_dist.distribution_t, self.context_dist.distribution_t)
def _compute_context_loss(self, cons_t, old_c_log_prob_t, c_val_t, alpha_cur_t):
con_ratio_t = torch.exp(self.context_dist.log_pdf_t(cons_t) - old_c_log_prob_t)
kl_div = torch.distributions.kl.kl_divergence(self.context_dist.distribution_t, self.target_dist.distribution_t)
return torch.mean(con_ratio_t * c_val_t) - alpha_cur_t * kl_div
class SelfPacedTeacher(AbstractTeacher, AbstractSelfPacedTeacher):
def __init__(self, mins, maxs, seed, env_reward_lb, env_reward_ub, update_frequency, update_offset, alpha_function, initial_dist=None,
target_dist=None, max_kl=0.1, std_lower_bound=None, kl_threshold=None, cg_parameters=None,
use_avg_performance=False, max_context_buffer_size=1000, reset_contexts=True, discount_factor=0.99):
'''
Self-paced Deep Reinforcement Learning (https://papers.nips.cc/paper/2020/hash/68a9750337a418a86fe06c1991a1d64c-Abstract.html).
Taken from https://github.com/psclklnk/spdl and wrapped to our architecture.
        Works in a non-episodic setup; updates are thus made in the `step_update` method.
Args:
update_frequency: Update frequency of the sampling distribution (in steps)
            update_offset: How many steps must be done before starting to update the distribution
alpha_function: Function calculating the alpha parameter
initial_dist: Initial distribution to start from
target_dist: Target distribution to reach
max_kl: Maximum KL-divergence authorized between the old and new distributions when updating
std_lower_bound: Minimum std authorized on the sampling distribution if the KL-divergence between
the latter and the target distribution is greater than `kl_threshold`. Set this to
`None` if no constraint on the std must be applied
kl_threshold: Threshold enforcing the std constraint
cg_parameters: Additional parameters for the Conjugate Gradient method
            use_avg_performance: Whether the alpha function must use the averaged performance
max_context_buffer_size: Maximum size of the buffer storing sampled tasks
reset_contexts: Whether the buffer should be reset when queried
discount_factor: Discount factor used in the Universal Value Function
'''
AbstractTeacher.__init__(self, mins, maxs, env_reward_lb, env_reward_ub, seed)
torch.manual_seed(self.seed)
        initial_mean, initial_variance = self.get_or_create_dist(initial_dist, mins, maxs, subspace=True) # Random subspace of the task space if no initial dist
        target_mean, target_variance = self.get_or_create_dist(target_dist, mins, maxs, subspace=False) # Full task space if no initial dist
context_bounds = (np.array(mins), np.array(maxs))
self.update_frequency = update_frequency
self.update_offset = update_offset
self.step_counter = 0
self.discounted_sum_reward = 0
self.discount_factor = discount_factor
self.discounted_sum_rewards = []
self.current_disc = 1
self.pending_initial_state = None
self.algorithm_iterations = 0
# The bounds that we show to the outside are limited to the interval [-1, 1], as this is typically better for
# neural nets to deal with
self.context_buffer = Buffer(2, max_context_buffer_size, reset_contexts)
self.context_dim = target_mean.shape[0]
self.context_bounds = context_bounds
self.use_avg_performance = use_avg_performance
if std_lower_bound is not None and kl_threshold is None:
raise RuntimeError("Error! Both Lower Bound on standard deviation and kl threshold need to be set")
else:
if std_lower_bound is not None:
if isinstance(std_lower_bound, np.ndarray):
if std_lower_bound.shape[0] != self.context_dim:
raise RuntimeError("Error! Wrong dimension of the standard deviation lower bound")
elif std_lower_bound is not None:
std_lower_bound = np.ones(self.context_dim) * std_lower_bound
self.std_lower_bound = std_lower_bound
self.kl_threshold = kl_threshold
# Create the initial context distribution
if isinstance(initial_variance, np.ndarray):
flat_init_chol = GaussianTorchDistribution.flatten_matrix(initial_variance, tril=False)
else:
flat_init_chol = GaussianTorchDistribution.flatten_matrix(initial_variance * np.eye(self.context_dim),
tril=False)
# Create the target distribution
if isinstance(target_variance, np.ndarray):
flat_target_chol = GaussianTorchDistribution.flatten_matrix(target_variance, tril=False)
else:
flat_target_chol = GaussianTorchDistribution.flatten_matrix(target_variance * np.eye(self.context_dim),
tril=False)
AbstractSelfPacedTeacher.__init__(self, initial_mean, flat_init_chol, target_mean, flat_target_chol,
alpha_function, max_kl, cg_parameters)
self.bk = {'mean': [],
'covariance': [],
'steps': [],
'algo_iterations': [],
'kl': []}
def record_initial_state(self, task, initial_state):
self.pending_initial_state = initial_state
def episodic_update(self, task, reward, is_success):
assert self.pending_initial_state is not None
self.discounted_sum_rewards.append(self.discounted_sum_reward)
self.context_buffer.update_buffer((self.pending_initial_state, task))
self.discounted_sum_reward = 0
self.current_disc = 1
self.pending_initial_state = None
def step_update(self, state, action, reward, next_state, done):
self.step_counter += 1
self.discounted_sum_reward += self.current_disc * reward
self.current_disc *= self.discount_factor
if self.step_counter >= self.update_offset and self.step_counter % self.update_frequency == 0:
if len(self.discounted_sum_rewards) > 0 and len(self.context_buffer) > 0:
self.algorithm_iterations += 1
avg_performance = np.mean(self.discounted_sum_rewards)
self.discounted_sum_rewards = []
ins, cons = self.context_buffer.read_buffer()
initial_states, contexts = np.array(ins), np.array(cons)
values = self.value_estimator(initial_states)
if values is None:
raise Exception("Please define a valid value estimator, this one returns None...")
old_context_dist = deepcopy(self.context_dist)
contexts_t = to_float_tensor(contexts, use_cuda=False)
old_c_log_prob_t = old_context_dist.log_pdf_t(contexts_t).detach()
# Estimate the value of the state after the policy update
c_val_t = to_float_tensor(values, use_cuda=False)
# Add the penalty term
cur_kl_t = self.target_context_kl(numpy=False)
if self.use_avg_performance:
alpha_cur_t = self.alpha_function(self.algorithm_iterations, avg_performance, cur_kl_t)
else:
alpha_cur_t = self.alpha_function(self.algorithm_iterations, torch.mean(c_val_t).detach(), cur_kl_t)
cg_step(partial(self._compute_context_loss, contexts_t, old_c_log_prob_t, c_val_t, alpha_cur_t),
partial(self._compute_context_kl, old_context_dist), self.max_kl,
self.context_dist.parameters, self.context_dist.set_weights,
self.context_dist.get_weights, **self.cg_parameters, use_cuda=False)
cov = self.context_dist._chol_flat.detach().numpy()
if self.std_lower_bound is not None and self.target_context_kl() > self.kl_threshold:
cov[0:self.context_dim] = np.log(np.maximum(np.exp(cov[0:self.context_dim]), self.std_lower_bound))
self.context_dist.set_weights(np.concatenate((self.context_dist.mean(), cov)))
self.bk["mean"].append(self.context_dist.mean())
self.bk["covariance"].append(self.context_dist.covariance_matrix())
self.bk["steps"].append(self.step_counter)
self.bk["algo_iterations"].append(self.algorithm_iterations)
self.bk["kl"].append(self.target_context_kl())
else:
print("Skipping iteration at step {} because buffers are empty.".format(self.step_counter))
def sample_task(self):
sample = self.context_dist.sample().detach().numpy()
return np.clip(sample, self.context_bounds[0], self.context_bounds[1], dtype=np.float32)
def non_exploratory_task_sampling(self):
return {"task": self.sample_task(),
"infos": {
"bk_index": len(self.bk[list(self.bk.keys())[0]]) - 1,
"task_infos": None}
}
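Example usage
A minimal, hypothetical usage sketch (not part of the module). The task-space bounds, reward range, update schedule, dummy value_estimator and constant alpha function are illustrative placeholders; in TeachMyAgent they are normally provided by the environment wrapper and the student's training loop.
import numpy as np
from TeachMyAgent.teachers.algos.self_paced_teacher import SelfPacedTeacher

context_dim = 2
mins, maxs = [-1.0] * context_dim, [1.0] * context_dim

# alpha_function(algorithm_iterations, performance, kl_to_target) -> alpha.
# A constant schedule is used here purely for illustration.
alpha_fn = lambda iteration, performance, kl: 0.05

teacher = SelfPacedTeacher(mins, maxs, seed=42,
                           env_reward_lb=-100, env_reward_ub=100,
                           update_frequency=10000, update_offset=20000,
                           alpha_function=alpha_fn)

# The distribution update queries self.value_estimator(initial_states);
# a dummy estimator stands in for the student's critic here.
teacher.value_estimator = lambda initial_states: np.zeros(len(initial_states))

task = teacher.sample_task()                     # sample a task from the current distribution
teacher.record_initial_state(task, np.zeros(4))  # placeholder initial observation
for _ in range(100):                             # dummy episode: only the reward is used internally
    teacher.step_update(state=None, action=None, reward=0.0, next_state=None, done=False)
teacher.episodic_update(task, reward=0.0, is_success=False)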
Classes
class AbstractSelfPacedTeacher (init_mean, flat_init_chol, target_mean, flat_target_chol, alpha_function, max_kl, cg_parameters)
Base SPDL Teacher
Subclasses
SelfPacedTeacher
Methods
def load(self, path)
Load the context distribution weights from the NumPy file at path.
def save(self, path)
Save the current context distribution weights to path using np.save.
def target_context_kl(self, numpy=True)
Return the KL-divergence between the current context distribution and the target distribution, as a NumPy value if numpy=True.
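For instance, assuming an already-constructed teacher inheriting from this class, the distribution weights can be checkpointed and the progress towards the target distribution monitored (a hypothetical snippet):
teacher.save("context_dist_checkpoint")        # np.save appends the .npy extension
print(teacher.target_context_kl())             # KL(current distribution || target distribution)
teacher.load("context_dist_checkpoint.npy")    # np.load expects the full filename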
class Buffer (n_elements, max_buffer_size, reset_on_query)
A bounded FIFO buffer holding n_elements parallel lists (here initial states and tasks); the oldest entries are evicted beyond max_buffer_size, and the buffers can be reset when read.
Methods
def read_buffer(self, reset=None)
Return the buffered data as a tuple of lists and, if reset (defaulting to reset_on_query) is true, clear the buffers.
def update_buffer(self, datas)
Append (or extend, when lists are passed) each internal buffer with the corresponding element of datas, evicting the oldest entries once max_buffer_size is exceeded.
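As an illustration of the buffer semantics (a standalone, hypothetical snippet): two parallel buffers with a maximum size of 3, cleared when read.
from TeachMyAgent.teachers.algos.self_paced_teacher import Buffer

buf = Buffer(n_elements=2, max_buffer_size=3, reset_on_query=True)
buf.update_buffer((0, "task_a"))                                # single entries are appended
buf.update_buffer(([1, 2, 3], ["task_b", "task_c", "task_d"]))  # lists extend each buffer
print(len(buf))                    # 3: the oldest pair (0, "task_a") was evicted
states, tasks = buf.read_buffer()  # [1, 2, 3] and ["task_b", "task_c", "task_d"]; buffers are then reset
print(len(buf))                    # 0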
class SelfPacedTeacher (mins, maxs, seed, env_reward_lb, env_reward_ub, update_frequency, update_offset, alpha_function, initial_dist=None, target_dist=None, max_kl=0.1, std_lower_bound=None, kl_threshold=None, cg_parameters=None, use_avg_performance=False, max_context_buffer_size=1000, reset_contexts=True, discount_factor=0.99)
Base class for ACL methods.
This will be used to sample tasks for the DeepRL student given a task space provided at the beginning of training.
Self-paced Deep Reinforcement Learning (https://papers.nips.cc/paper/2020/hash/68a9750337a418a86fe06c1991a1d64c-Abstract.html). Taken from https://github.com/psclklnk/spdl and wrapped to our architecture.
Works in a non-episodic setup; updates are thus made in the step_update method.
Args
update_frequency
- Update frequency of the sampling distribution (in steps)
update_offset
- How many steps must be done before starting to update the distribution
alpha_function
- Function calculating the alpha parameter (a compatible sketch follows this argument list)
initial_dist
- Initial distribution to start from
target_dist
- Target distribution to reach
max_kl
- Maximum KL-divergence authorized between the old and new distributions when updating
std_lower_bound
- Minimum std authorized on the sampling distribution if the KL-divergence between the latter and the target distribution is greater than kl_threshold. Set this to None if no constraint on the std must be applied
kl_threshold
- Threshold enforcing the std constraint
cg_parameters
- Additional parameters for the Conjugate Gradient method
use_avg_performance
- Whether the alpha function must use the averaged performance
max_context_buffer_size
- Maximum size of the buffer storing sampled tasks
reset_contexts
- Whether the buffer should be reset when queried
discount_factor
- Discount factor used in the Universal Value Function
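A sketch of an alpha function compatible with the call made in step_update (iteration count, performance measure, current KL to the target distribution). The threshold and scaling below are made-up illustrative values, not the schedule from the SPDL paper; such a function would be passed as alpha_function=example_alpha_function, e.g. together with use_avg_performance=True.
def example_alpha_function(iteration, performance, kl_to_target,
                           perf_threshold=50.0, scale=0.05):
    # Keep alpha at 0 (pure performance maximisation) until the student's
    # performance estimate exceeds the threshold, then progressively increase
    # the weight of the KL term pulling the sampling distribution towards the target.
    if float(performance) < perf_threshold:
        return 0.0
    return scale * (iteration + 1)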
Ancestors
AbstractTeacher
AbstractSelfPacedTeacher