Module TeachMyAgent.teachers.algos.covar_gmm
Expand source code
# Taken from https://arxiv.org/abs/1910.07224
# Modified by Clément Romac, copy of the license at TeachMyAgent/teachers/LICENSES/ALP-GMM
from sklearn.mixture import GaussianMixture as GMM
import numpy as np
from gym.spaces import Box
from TeachMyAgent.teachers.algos.AbstractTeacher import AbstractTeacher
def proportional_choice(v, random_state, eps=0.):
'''
Return an index of `v` chosen proportionally to values contained in `v`.
Args:
v: List of values
random_state: Random generator
eps: Epsilon used for an Epsilon-greedy strategy
'''
if np.sum(v) == 0 or random_state.rand() < eps:
return random_state.randint(np.size(v))
else:
probas = np.array(v) / np.sum(v)
return np.where(random_state.multinomial(1, probas) == 1)[0][0]
# Implementation of IGMM (https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3893575/) + minor improvements
class CovarGMM(AbstractTeacher):
def __init__(self, mins, maxs, seed, env_reward_lb, env_reward_ub, absolute_lp=False, fit_rate=250,
potential_ks=np.arange(2, 11, 1), random_task_ratio=0.2, nb_bootstrap=None, initial_dist=None):
'''
Covar - Gaussian Mixture Model.
Implementation of IGMM (https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3893575/) + minor improvements.
Args:
absolute_lp: Original version does not use Absolute LP, only LP.
fit_rate: Number of episodes between two fit of the GMM
potential_ks: Range of number of Gaussians to try when fitting the GMM
random_task_ratio: Ratio of randomly sampled tasks VS tasks sampling using GMM
nb_bootstrap: Number of bootstrapping episodes, must be >= to fit_rate
initial_dist: Initial Gaussian distribution. If None, bootstrap with random tasks
'''
AbstractTeacher.__init__(self, mins, maxs, env_reward_lb, env_reward_ub, seed)
# Range of number of Gaussians to try when fitting the GMM
self.potential_ks = potential_ks
# Ratio of randomly sampled tasks VS tasks sampling using GMM
self.random_task_ratio = random_task_ratio
self.random_task_generator = Box(self.mins, self.maxs, dtype=np.float32)
self.random_task_generator.seed(self.seed)
# Number of episodes between two fit of the GMM
self.fit_rate = fit_rate
self.nb_bootstrap = nb_bootstrap if nb_bootstrap is not None else fit_rate # Number of bootstrapping episodes, must be >= to fit_rate
self.initial_dist = initial_dist # Initial Gaussian distribution. If None, bootstrap with random tasks
# Original version does not use Absolute LP, only LP.
self.absolute_lp = absolute_lp
self.tasks = []
self.tasks_times_rewards = []
self.all_times = np.arange(0, 1, 1/self.fit_rate)
self.gmm = None
# boring book-keeping
self.bk = {'weights': [], 'covariances': [], 'means': [], 'tasks_lps': [], 'episodes': [], 'tasks_origin': []}
def episodic_update(self, task, reward, is_success):
# Compute time of task, relative to position in current batch of tasks
current_time = self.all_times[len(self.tasks) % self.fit_rate]
self.tasks.append(task)
# Concatenate task with its corresponding time and reward
self.tasks_times_rewards.append(np.array(task.tolist() + [current_time] + [reward]))
if len(self.tasks) >= self.nb_bootstrap: # If initial bootstrapping is done
if (len(self.tasks) % self.fit_rate) == 0: # Time to fit
# 1 - Retrieve last <fit_rate> (task, time, reward) triplets
cur_tasks_times_rewards = np.array(self.tasks_times_rewards[-self.fit_rate:])
# 2 - Fit batch of GMMs with varying number of Gaussians
potential_gmms = [GMM(n_components=k, covariance_type='full', random_state=self.seed) for k in self.potential_ks]
potential_gmms = [g.fit(cur_tasks_times_rewards) for g in potential_gmms]
# 3 - Compute fitness and keep best GMM
aics = [m.aic(cur_tasks_times_rewards) for m in potential_gmms]
self.gmm = potential_gmms[np.argmin(aics)]
# book-keeping
self.bk['weights'].append(self.gmm.weights_.copy())
self.bk['covariances'].append(self.gmm.covariances_.copy())
self.bk['means'].append(self.gmm.means_.copy())
self.bk['tasks_lps'] = self.tasks_times_rewards
self.bk['episodes'].append(len(self.tasks))
def sample_task(self):
task_origin = None
if len(self.tasks) < self.nb_bootstrap or self.random_state.random() < self.random_task_ratio or self.gmm is None:
if self.initial_dist and len(self.tasks) < self.nb_bootstrap: # bootstrap in initial dist
# Expert bootstrap Gaussian task sampling
new_task = self.random_state.multivariate_normal(self.initial_dist['mean'],
self.initial_dist['variance'])
new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32)
task_origin = -2 # -2 = task originates from initial bootstrap gaussian sampling
else:
# Random task sampling
new_task = self.random_task_generator.sample()
task_origin = -1 # -1 = task originates from random sampling
else:
# Task sampling based on positive time-reward covariance
# 1 - Retrieve positive time-reward covariance for each Gaussian
self.times_rewards_covars = []
for pos, covar, w in zip(self.gmm.means_, self.gmm.covariances_, self.gmm.weights_):
if self.absolute_lp:
self.times_rewards_covars.append(np.abs(covar[-2, -1]))
else:
self.times_rewards_covars.append(max(0, covar[-2, -1]))
# 2 - Sample Gaussian according to its Learning Progress, defined as positive time-reward covariance
idx = proportional_choice(self.times_rewards_covars, self.random_state, eps=0.0)
task_origin = idx
# 3 - Sample task in Gaussian, without forgetting to remove time and reward dimension
new_task = self.random_state.multivariate_normal(self.gmm.means_[idx], self.gmm.covariances_[idx])[:-2]
new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32)
# boring book-keeping
self.bk['tasks_origin'].append(task_origin)
return new_task
def is_non_exploratory_task_sampling_available(self):
return self.gmm is not None
def non_exploratory_task_sampling(self):
# 1 - Retrieve positive time-reward covariance for each Gaussian
self.times_rewards_covars = []
for pos, covar, w in zip(self.gmm.means_, self.gmm.covariances_, self.gmm.weights_):
if self.absolute_lp:
self.times_rewards_covars.append(np.abs(covar[-2, -1]))
else:
self.times_rewards_covars.append(max(0, covar[-2, -1]))
# 2 - Sample Gaussian according to its Learning Progress, defined as positive time-reward covariance
idx = proportional_choice(self.times_rewards_covars, self.random_state, eps=0.0)
# 3 - Sample task in Gaussian, without forgetting to remove time and reward dimension
new_task = self.random_state.multivariate_normal(self.gmm.means_[idx], self.gmm.covariances_[idx])[:-2]
new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32)
return {"task": new_task,
"infos": {
"bk_index": len(self.bk[list(self.bk.keys())[0]]) - 1,
"task_infos": idx}
}
Functions
def proportional_choice(v, random_state, eps=0.0)
-
Return an index of
v
chosen proportionally to values contained inv
.Args
v
- List of values
random_state
- Random generator
eps
- Epsilon used for an Epsilon-greedy strategy
Expand source code
def proportional_choice(v, random_state, eps=0.): ''' Return an index of `v` chosen proportionally to values contained in `v`. Args: v: List of values random_state: Random generator eps: Epsilon used for an Epsilon-greedy strategy ''' if np.sum(v) == 0 or random_state.rand() < eps: return random_state.randint(np.size(v)) else: probas = np.array(v) / np.sum(v) return np.where(random_state.multinomial(1, probas) == 1)[0][0]
Classes
class CovarGMM (mins, maxs, seed, env_reward_lb, env_reward_ub, absolute_lp=False, fit_rate=250, potential_ks=array([ 2, 3, 4, 5, 6, 7, 8, 9, 10]), random_task_ratio=0.2, nb_bootstrap=None, initial_dist=None)
-
Base class for ACL methods.
This will be used to sample tasks for the DeepRL student given a task space provided at the beginning of training.
Covar - Gaussian Mixture Model. Implementation of IGMM (https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3893575/) + minor improvements.
Args
absolute_lp
- Original version does not use Absolute LP, only LP.
fit_rate
- Number of episodes between two fit of the GMM
potential_ks
- Range of number of Gaussians to try when fitting the GMM
random_task_ratio
- Ratio of randomly sampled tasks VS tasks sampling using GMM
nb_bootstrap
- Number of bootstrapping episodes, must be >= to fit_rate
initial_dist
- Initial Gaussian distribution. If None, bootstrap with random tasks
Expand source code
class CovarGMM(AbstractTeacher): def __init__(self, mins, maxs, seed, env_reward_lb, env_reward_ub, absolute_lp=False, fit_rate=250, potential_ks=np.arange(2, 11, 1), random_task_ratio=0.2, nb_bootstrap=None, initial_dist=None): ''' Covar - Gaussian Mixture Model. Implementation of IGMM (https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3893575/) + minor improvements. Args: absolute_lp: Original version does not use Absolute LP, only LP. fit_rate: Number of episodes between two fit of the GMM potential_ks: Range of number of Gaussians to try when fitting the GMM random_task_ratio: Ratio of randomly sampled tasks VS tasks sampling using GMM nb_bootstrap: Number of bootstrapping episodes, must be >= to fit_rate initial_dist: Initial Gaussian distribution. If None, bootstrap with random tasks ''' AbstractTeacher.__init__(self, mins, maxs, env_reward_lb, env_reward_ub, seed) # Range of number of Gaussians to try when fitting the GMM self.potential_ks = potential_ks # Ratio of randomly sampled tasks VS tasks sampling using GMM self.random_task_ratio = random_task_ratio self.random_task_generator = Box(self.mins, self.maxs, dtype=np.float32) self.random_task_generator.seed(self.seed) # Number of episodes between two fit of the GMM self.fit_rate = fit_rate self.nb_bootstrap = nb_bootstrap if nb_bootstrap is not None else fit_rate # Number of bootstrapping episodes, must be >= to fit_rate self.initial_dist = initial_dist # Initial Gaussian distribution. If None, bootstrap with random tasks # Original version does not use Absolute LP, only LP. self.absolute_lp = absolute_lp self.tasks = [] self.tasks_times_rewards = [] self.all_times = np.arange(0, 1, 1/self.fit_rate) self.gmm = None # boring book-keeping self.bk = {'weights': [], 'covariances': [], 'means': [], 'tasks_lps': [], 'episodes': [], 'tasks_origin': []} def episodic_update(self, task, reward, is_success): # Compute time of task, relative to position in current batch of tasks current_time = self.all_times[len(self.tasks) % self.fit_rate] self.tasks.append(task) # Concatenate task with its corresponding time and reward self.tasks_times_rewards.append(np.array(task.tolist() + [current_time] + [reward])) if len(self.tasks) >= self.nb_bootstrap: # If initial bootstrapping is done if (len(self.tasks) % self.fit_rate) == 0: # Time to fit # 1 - Retrieve last <fit_rate> (task, time, reward) triplets cur_tasks_times_rewards = np.array(self.tasks_times_rewards[-self.fit_rate:]) # 2 - Fit batch of GMMs with varying number of Gaussians potential_gmms = [GMM(n_components=k, covariance_type='full', random_state=self.seed) for k in self.potential_ks] potential_gmms = [g.fit(cur_tasks_times_rewards) for g in potential_gmms] # 3 - Compute fitness and keep best GMM aics = [m.aic(cur_tasks_times_rewards) for m in potential_gmms] self.gmm = potential_gmms[np.argmin(aics)] # book-keeping self.bk['weights'].append(self.gmm.weights_.copy()) self.bk['covariances'].append(self.gmm.covariances_.copy()) self.bk['means'].append(self.gmm.means_.copy()) self.bk['tasks_lps'] = self.tasks_times_rewards self.bk['episodes'].append(len(self.tasks)) def sample_task(self): task_origin = None if len(self.tasks) < self.nb_bootstrap or self.random_state.random() < self.random_task_ratio or self.gmm is None: if self.initial_dist and len(self.tasks) < self.nb_bootstrap: # bootstrap in initial dist # Expert bootstrap Gaussian task sampling new_task = self.random_state.multivariate_normal(self.initial_dist['mean'], self.initial_dist['variance']) new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32) task_origin = -2 # -2 = task originates from initial bootstrap gaussian sampling else: # Random task sampling new_task = self.random_task_generator.sample() task_origin = -1 # -1 = task originates from random sampling else: # Task sampling based on positive time-reward covariance # 1 - Retrieve positive time-reward covariance for each Gaussian self.times_rewards_covars = [] for pos, covar, w in zip(self.gmm.means_, self.gmm.covariances_, self.gmm.weights_): if self.absolute_lp: self.times_rewards_covars.append(np.abs(covar[-2, -1])) else: self.times_rewards_covars.append(max(0, covar[-2, -1])) # 2 - Sample Gaussian according to its Learning Progress, defined as positive time-reward covariance idx = proportional_choice(self.times_rewards_covars, self.random_state, eps=0.0) task_origin = idx # 3 - Sample task in Gaussian, without forgetting to remove time and reward dimension new_task = self.random_state.multivariate_normal(self.gmm.means_[idx], self.gmm.covariances_[idx])[:-2] new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32) # boring book-keeping self.bk['tasks_origin'].append(task_origin) return new_task def is_non_exploratory_task_sampling_available(self): return self.gmm is not None def non_exploratory_task_sampling(self): # 1 - Retrieve positive time-reward covariance for each Gaussian self.times_rewards_covars = [] for pos, covar, w in zip(self.gmm.means_, self.gmm.covariances_, self.gmm.weights_): if self.absolute_lp: self.times_rewards_covars.append(np.abs(covar[-2, -1])) else: self.times_rewards_covars.append(max(0, covar[-2, -1])) # 2 - Sample Gaussian according to its Learning Progress, defined as positive time-reward covariance idx = proportional_choice(self.times_rewards_covars, self.random_state, eps=0.0) # 3 - Sample task in Gaussian, without forgetting to remove time and reward dimension new_task = self.random_state.multivariate_normal(self.gmm.means_[idx], self.gmm.covariances_[idx])[:-2] new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32) return {"task": new_task, "infos": { "bk_index": len(self.bk[list(self.bk.keys())[0]]) - 1, "task_infos": idx} }
Ancestors
Inherited members