Module TeachMyAgent.teachers.algos.alp_gmm
Expand source code
# Taken from https://arxiv.org/abs/1910.07224
# Modified by Clément Romac, copy of the license at TeachMyAgent/teachers/LICENSES/ALP-GMM
from sklearn.mixture import GaussianMixture as GMM
import numpy as np
from gym.spaces import Box
from TeachMyAgent.teachers.utils.dataset import BufferedDataset
from TeachMyAgent.teachers.algos.AbstractTeacher import AbstractTeacher
def proportional_choice(v, random_state, eps=0.):
    '''
    Return an index of `v` chosen proportionally to values contained in `v`.

    Args:
        v: List of values
        random_state: Random generator
        eps: Epsilon used for an Epsilon-greedy strategy
    '''
    total = np.sum(v)
    # Epsilon-greedy: fall back to a uniform draw when all values are zero
    # or with probability `eps`.
    if total == 0 or random_state.rand() < eps:
        return random_state.randint(np.size(v))
    weights = np.array(v) / total
    # One multinomial draw yields a one-hot vector; return its index.
    draw = random_state.multinomial(1, weights)
    return np.flatnonzero(draw)[0]
# Absolute Learning Progress (ALP) computer object
# It uses a buffered kd-tree to efficiently implement a k-nearest-neighbor algorithm
class EmpiricalALPComputer():
    '''
    Absolute Learning Progress (ALP) computer object.

    It uses a buffered kd-tree to efficiently implement a k-nearest-neighbor algorithm.
    '''
    def __init__(self, task_size, max_size=None, buffer_size=500):
        # Stores (reward, task) pairs; queried by task to find the closest
        # previously-seen task and its associated reward.
        self.alp_knn = BufferedDataset(1, task_size, buffer_size=buffer_size,
                                       lateness=0, max_size=max_size)

    def compute_alp(self, task, reward):
        '''
        Compute the learning progress (lp) and absolute learning progress (alp)
        of `task` given its `reward`, then store the pair in the database.

        Returns (alp, lp); both are 0 until at least 6 episodes are stored.
        '''
        alp, lp = 0, 0
        # Require a minimal history before nearest-neighbor queries are meaningful.
        if len(self.alp_knn) > 5:
            # Closest previously-experienced task...
            _, idx = self.alp_knn.nn_y(task)
            # ...and the reward obtained on it.
            previous_reward = self.alp_knn.get_x(idx[0])
            # ALP is the absolute reward difference w.r.t. that neighbor.
            lp = reward - previous_reward
            alp = np.abs(lp)
        # Record the new (reward, task) pair for future queries.
        self.alp_knn.add_xy(reward, task)
        return alp, lp
# Absolute Learning Progress - Gaussian Mixture Model
# Absolute Learning Progress - Gaussian Mixture Model
class ALPGMM(AbstractTeacher):
    '''
    Absolute Learning Progress - Gaussian Mixture Model (https://arxiv.org/abs/1910.07224).

    Periodically fits a batch of GMMs on (task, ALP) vectors, keeps the best one
    according to an information criterion, and samples new tasks from Gaussians
    proportionally to their mean ALP, mixed with random exploration.
    '''
    def __init__(self, mins, maxs, seed, env_reward_lb, env_reward_ub, gmm_fitness_func="aic", warm_start=False, nb_em_init=1, fit_rate=250,
                 alp_max_size=None, alp_buffer_size=500, potential_ks=np.arange(2, 11, 1), random_task_ratio=0.2, nb_bootstrap=None, initial_dist=None):
        '''
        Args:
            mins: Lower bounds of the task space
            maxs: Upper bounds of the task space
            seed: Random seed
            env_reward_lb: Lower bound of the environment's reward
            env_reward_ub: Upper bound of the environment's reward
            gmm_fitness_func: Fitness criterion when selecting best GMM among range of GMMs varying in number of Gaussians.
            warm_start: Restart new fit by initializing with last fit
            nb_em_init: Number of Expectation-Maximization trials when fitting
            fit_rate: Number of episodes between two fit of the GMM
            alp_max_size: Maximum number of episodes stored
            alp_buffer_size: Maximal number of episodes to account for when computing ALP
            potential_ks: Range of number of Gaussians to try when fitting the GMM
            random_task_ratio: Ratio of randomly sampled tasks VS tasks sampling using GMM
            nb_bootstrap: Number of bootstrapping episodes, must be >= to fit_rate
            initial_dist: Initial Gaussian distribution. If None, bootstrap with random tasks
        '''
        AbstractTeacher.__init__(self, mins, maxs, env_reward_lb, env_reward_ub, seed)

        # Range of number of Gaussians to try when fitting the GMM
        self.potential_ks = potential_ks
        # Restart new fit by initializing with last fit
        self.warm_start = warm_start
        # Fitness criterion when selecting best GMM among range of GMMs varying in number of Gaussians.
        self.gmm_fitness_func = gmm_fitness_func
        # Number of Expectation-Maximization trials when fitting
        self.nb_em_init = nb_em_init
        # Number of episodes between two fit of the GMM
        self.fit_rate = fit_rate
        # Number of bootstrapping episodes, must be >= to fit_rate
        self.nb_bootstrap = nb_bootstrap if nb_bootstrap is not None else fit_rate
        # Initial Gaussian distribution. If None, bootstrap with random tasks
        self.initial_dist = initial_dist
        # Ratio of randomly sampled tasks VS tasks sampling using GMM
        self.random_task_ratio = random_task_ratio
        self.random_task_generator = Box(self.mins, self.maxs, dtype=np.float32)
        self.random_task_generator.seed(self.seed)

        # Init ALP computer (alp_buffer_size bounds the episodes accounted for
        # when computing ALP; alp_max_size bounds total episodes stored)
        self.alp_computer = EmpiricalALPComputer(len(mins), max_size=alp_max_size, buffer_size=alp_buffer_size)

        self.tasks = []
        self.alps = []
        self.tasks_alps = []

        # Init GMMs
        self.potential_gmms = [self.init_gmm(k) for k in self.potential_ks]
        self.gmm = None

        # Boring book-keeping
        self.bk = {'weights': [], 'covariances': [], 'means': [], 'tasks_alps': [],
                   'tasks_lps': [], 'episodes': [], 'tasks_origin': []}

    def init_gmm(self, nb_gaussians):
        '''
        Init the GMM given the number of gaussians.
        '''
        return GMM(n_components=nb_gaussians, covariance_type='full', random_state=self.seed,
                   warm_start=self.warm_start, n_init=self.nb_em_init)

    def get_nb_gmm_params(self, gmm):
        '''
        Assumes full covariance.
        See https://stats.stackexchange.com/questions/229293/the-number-of-parameters-in-gaussian-mixture-model
        '''
        nb_gmms = gmm.get_params()['n_components']
        d = len(self.mins)
        params_per_gmm = (d * d - d) / 2 + 2 * d + 1
        return nb_gmms * params_per_gmm - 1

    def episodic_update(self, task, reward, is_success):
        '''
        Record an episode's (task, reward) pair and, every `fit_rate` episodes
        after bootstrapping, refit the batch of GMMs and keep the best one.

        Returns True if the GMM was refit on this call.
        '''
        self.tasks.append(task)

        is_update_time = False

        # Compute corresponding ALP
        alp, lp = self.alp_computer.compute_alp(task, reward)
        self.alps.append(alp)

        # Concatenate task vector with ALP dimension
        self.tasks_alps.append(np.array(task.tolist() + [self.alps[-1]]))

        if len(self.tasks) >= self.nb_bootstrap:  # If initial bootstrapping is done
            if (len(self.tasks) % self.fit_rate) == 0:  # Time to fit
                is_update_time = True
                # 1 - Retrieve last <fit_rate> (task, alp) pairs
                cur_tasks_alps = np.array(self.tasks_alps[-self.fit_rate:])

                # 2 - Fit batch of GMMs with varying number of Gaussians
                self.potential_gmms = [g.fit(cur_tasks_alps) for g in self.potential_gmms]

                # 3 - Compute fitness and keep best GMM
                if self.gmm_fitness_func == 'bic':  # Bayesian Information Criterion
                    fitnesses = [m.bic(cur_tasks_alps) for m in self.potential_gmms]
                elif self.gmm_fitness_func == 'aic':  # Akaike Information Criterion
                    fitnesses = [m.aic(cur_tasks_alps) for m in self.potential_gmms]
                elif self.gmm_fitness_func == 'aicc':  # Modified AIC (small-sample correction)
                    n = self.fit_rate
                    fitnesses = []
                    for m in self.potential_gmms:
                        k = self.get_nb_gmm_params(m)
                        penalty = (2 * k * (k + 1)) / (n - k - 1)
                        fitnesses.append(m.aic(cur_tasks_alps) + penalty)
                else:
                    # Unknown criterion: fail loudly (was followed by an
                    # unreachable exit(1), now removed).
                    raise NotImplementedError
                self.gmm = self.potential_gmms[np.argmin(fitnesses)]

                # book-keeping
                self.bk['weights'].append(self.gmm.weights_.copy())
                self.bk['covariances'].append(self.gmm.covariances_.copy())
                self.bk['means'].append(self.gmm.means_.copy())
                self.bk['tasks_alps'] = self.tasks_alps
                self.bk['tasks_lps'].append(lp)
                self.bk['episodes'].append(len(self.tasks))
        return is_update_time

    def sample_task(self):
        '''
        Sample a new task: random / initial-distribution sampling during
        bootstrap (or with probability `random_task_ratio`), otherwise
        ALP-proportional sampling from the fitted GMM.
        '''
        task_origin = None
        if len(self.tasks) < self.nb_bootstrap or self.random_state.random() < self.random_task_ratio or self.gmm is None:
            if self.initial_dist and len(self.tasks) < self.nb_bootstrap:  # bootstrap in initial dist
                # Expert bootstrap Gaussian task sampling
                new_task = self.random_state.multivariate_normal(self.initial_dist['mean'],
                                                                 self.initial_dist['variance'])
                new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32)
                task_origin = -2  # -2 = task originates from initial bootstrap gaussian sampling
            else:
                # Random task sampling
                new_task = self.random_task_generator.sample()
                task_origin = -1  # -1 = task originates from random sampling
        else:
            # ALP-based task sampling

            # 1 - Retrieve the mean ALP value of each Gaussian in the GMM
            # (last dimension of each mean is the ALP dimension)
            self.alp_means = [pos[-1] for pos in self.gmm.means_]

            # 2 - Sample Gaussian proportionally to its mean ALP
            idx = proportional_choice(self.alp_means, self.random_state, eps=0.0)
            task_origin = idx

            # 3 - Sample task in Gaussian, without forgetting to remove ALP dimension
            new_task = self.random_state.multivariate_normal(self.gmm.means_[idx], self.gmm.covariances_[idx])[:-1]
            new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32)

        # boring book-keeping
        self.bk['tasks_origin'].append(task_origin)
        return new_task

    def is_non_exploratory_task_sampling_available(self):
        '''Whether a GMM has been fit, enabling pure ALP-based sampling.'''
        return self.gmm is not None

    def non_exploratory_task_sampling(self):
        '''
        Sample a task purely from the fitted GMM (no random exploration).
        Requires `is_non_exploratory_task_sampling_available()` to be True.
        '''
        # 1 - Retrieve the mean ALP value of each Gaussian in the GMM
        alp_means = [pos[-1] for pos in self.gmm.means_]

        # 2 - Sample Gaussian proportionally to its mean ALP
        idx = proportional_choice(alp_means, self.random_state, eps=0.0)

        # 3 - Sample task in Gaussian, without forgetting to remove ALP dimension
        new_task = self.random_state.multivariate_normal(self.gmm.means_[idx], self.gmm.covariances_[idx])[:-1]
        new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32)
        return {"task": new_task,
                "infos": {
                    "bk_index": len(self.bk[list(self.bk.keys())[0]]) - 1,
                    "task_infos": idx}
                }
Functions
def proportional_choice(v, random_state, eps=0.0)
-
Return an index of `v` chosen proportionally to values contained in `v`.
Args
v
- List of values
random_state
- Random generator
eps
- Epsilon used for an Epsilon-greedy strategy
Expand source code
def proportional_choice(v, random_state, eps=0.): ''' Return an index of `v` chosen proportionally to values contained in `v`. Args: v: List of values random_state: Random generator eps: Epsilon used for an Epsilon-greedy strategy ''' if np.sum(v) == 0 or random_state.rand() < eps: return random_state.randint(np.size(v)) else: probas = np.array(v) / np.sum(v) return np.where(random_state.multinomial(1, probas) == 1)[0][0]
Classes
class ALPGMM (mins, maxs, seed, env_reward_lb, env_reward_ub, gmm_fitness_func='aic', warm_start=False, nb_em_init=1, fit_rate=250, alp_max_size=None, alp_buffer_size=500, potential_ks=array([ 2, 3, 4, 5, 6, 7, 8, 9, 10]), random_task_ratio=0.2, nb_bootstrap=None, initial_dist=None)
-
Base class for ACL methods.
This will be used to sample tasks for the DeepRL student given a task space provided at the beginning of training.
Absolute Learning Progress - Gaussian Mixture Model (https://arxiv.org/abs/1910.07224).
Args
gmm_fitness_func
- Fitness criterion when selecting best GMM among range of GMMs varying in number of Gaussians.
warm_start
- Restart new fit by initializing with last fit
nb_em_init
- Number of Expectation-Maximization trials when fitting
fit_rate
- Number of episodes between two fit of the GMM
alp_max_size
- Maximum number of episodes stored
alp_buffer_size
- Maximal number of episodes to account for when computing ALP
potential_ks
- Range of number of Gaussians to try when fitting the GMM
random_task_ratio
- Ratio of randomly sampled tasks VS tasks sampling using GMM
nb_bootstrap
- Number of bootstrapping episodes, must be >= to fit_rate
initial_dist
- Initial Gaussian distribution. If None, bootstrap with random tasks
Expand source code
class ALPGMM(AbstractTeacher): def __init__(self, mins, maxs, seed, env_reward_lb, env_reward_ub, gmm_fitness_func="aic", warm_start=False, nb_em_init=1, fit_rate=250, alp_max_size=None, alp_buffer_size=500, potential_ks=np.arange(2, 11, 1), random_task_ratio=0.2, nb_bootstrap=None, initial_dist=None): ''' Absolute Learning Progress - Gaussian Mixture Model (https://arxiv.org/abs/1910.07224). Args: gmm_fitness_func: Fitness criterion when selecting best GMM among range of GMMs varying in number of Gaussians. warm_start: Restart new fit by initializing with last fit nb_em_init: Number of Expectation-Maximization trials when fitting fit_rate: Number of episodes between two fit of the GMM alp_max_size: Maximum number of episodes stored alp_buffer_size: Maximal number of episodes to account for when computing ALP potential_ks: Range of number of Gaussians to try when fitting the GMM random_task_ratio: Ratio of randomly sampled tasks VS tasks sampling using GMM nb_bootstrap: Number of bootstrapping episodes, must be >= to fit_rate initial_dist: Initial Gaussian distribution. If None, bootstrap with random tasks ''' AbstractTeacher.__init__(self, mins, maxs, env_reward_lb, env_reward_ub, seed) # Range of number of Gaussians to try when fitting the GMM self.potential_ks = potential_ks # Restart new fit by initializing with last fit self.warm_start = warm_start # Fitness criterion when selecting best GMM among range of GMMs varying in number of Gaussians. self.gmm_fitness_func = gmm_fitness_func # Number of Expectation-Maximization trials when fitting self.nb_em_init = nb_em_init # Number of episodes between two fit of the GMM self.fit_rate = fit_rate self.nb_bootstrap = nb_bootstrap if nb_bootstrap is not None else fit_rate # Number of bootstrapping episodes, must be >= to fit_rate self.initial_dist = initial_dist # Initial Gaussian distribution. 
If None, bootstrap with random tasks # Ratio of randomly sampled tasks VS tasks sampling using GMM self.random_task_ratio = random_task_ratio self.random_task_generator = Box(self.mins, self.maxs, dtype=np.float32) self.random_task_generator.seed(self.seed) # Maximal number of episodes to account for when computing ALP alp_max_size = alp_max_size alp_buffer_size = alp_buffer_size # Init ALP computer self.alp_computer = EmpiricalALPComputer(len(mins), max_size=alp_max_size, buffer_size=alp_buffer_size) self.tasks = [] self.alps = [] self.tasks_alps = [] # Init GMMs self.potential_gmms = [self.init_gmm(k) for k in self.potential_ks] self.gmm = None # Boring book-keeping self.bk = {'weights': [], 'covariances': [], 'means': [], 'tasks_alps': [], 'tasks_lps':[], 'episodes': [], 'tasks_origin': []} def init_gmm(self, nb_gaussians): ''' Init the GMM given the number of gaussians. ''' return GMM(n_components=nb_gaussians, covariance_type='full', random_state=self.seed, warm_start=self.warm_start, n_init=self.nb_em_init) def get_nb_gmm_params(self, gmm): ''' Assumes full covariance. 
See https://stats.stackexchange.com/questions/229293/the-number-of-parameters-in-gaussian-mixture-model ''' nb_gmms = gmm.get_params()['n_components'] d = len(self.mins) params_per_gmm = (d*d - d)/2 + 2*d + 1 return nb_gmms * params_per_gmm - 1 def episodic_update(self, task, reward, is_success): self.tasks.append(task) is_update_time = False # Compute corresponding ALP alp, lp = self.alp_computer.compute_alp(task, reward) self.alps.append(alp) # Concatenate task vector with ALP dimension self.tasks_alps.append(np.array(task.tolist() + [self.alps[-1]])) if len(self.tasks) >= self.nb_bootstrap: # If initial bootstrapping is done if (len(self.tasks) % self.fit_rate) == 0: # Time to fit is_update_time = True # 1 - Retrieve last <fit_rate> (task, reward) pairs cur_tasks_alps = np.array(self.tasks_alps[-self.fit_rate:]) # 2 - Fit batch of GMMs with varying number of Gaussians self.potential_gmms = [g.fit(cur_tasks_alps) for g in self.potential_gmms] # 3 - Compute fitness and keep best GMM fitnesses = [] if self.gmm_fitness_func == 'bic': # Bayesian Information Criterion fitnesses = [m.bic(cur_tasks_alps) for m in self.potential_gmms] elif self.gmm_fitness_func == 'aic': # Akaike Information Criterion fitnesses = [m.aic(cur_tasks_alps) for m in self.potential_gmms] elif self.gmm_fitness_func == 'aicc': # Modified AIC n = self.fit_rate fitnesses = [] for l, m in enumerate(self.potential_gmms): k = self.get_nb_gmm_params(m) penalty = (2*k*(k+1)) / (n-k-1) fitnesses.append(m.aic(cur_tasks_alps) + penalty) else: raise NotImplementedError exit(1) self.gmm = self.potential_gmms[np.argmin(fitnesses)] # book-keeping self.bk['weights'].append(self.gmm.weights_.copy()) self.bk['covariances'].append(self.gmm.covariances_.copy()) self.bk['means'].append(self.gmm.means_.copy()) self.bk['tasks_alps'] = self.tasks_alps self.bk['tasks_lps'].append(lp) self.bk['episodes'].append(len(self.tasks)) return is_update_time def sample_task(self): task_origin = None if len(self.tasks) < 
self.nb_bootstrap or self.random_state.random() < self.random_task_ratio or self.gmm is None: if self.initial_dist and len(self.tasks) < self.nb_bootstrap: # bootstrap in initial dist # Expert bootstrap Gaussian task sampling new_task = self.random_state.multivariate_normal(self.initial_dist['mean'], self.initial_dist['variance']) new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32) task_origin = -2 # -2 = task originates from initial bootstrap gaussian sampling else: # Random task sampling new_task = self.random_task_generator.sample() task_origin = -1 # -1 = task originates from random sampling else: # ALP-based task sampling # 1 - Retrieve the mean ALP value of each Gaussian in the GMM self.alp_means = [] for pos, _, w in zip(self.gmm.means_, self.gmm.covariances_, self.gmm.weights_): self.alp_means.append(pos[-1]) # 2 - Sample Gaussian proportionally to its mean ALP idx = proportional_choice(self.alp_means, self.random_state, eps=0.0) task_origin = idx # 3 - Sample task in Gaussian, without forgetting to remove ALP dimension new_task = self.random_state.multivariate_normal(self.gmm.means_[idx], self.gmm.covariances_[idx])[:-1] new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32) # boring book-keeping self.bk['tasks_origin'].append(task_origin) return new_task def is_non_exploratory_task_sampling_available(self): return self.gmm is not None def non_exploratory_task_sampling(self): # 1 - Retrieve the mean ALP value of each Gaussian in the GMM alp_means = [] for pos, _, w in zip(self.gmm.means_, self.gmm.covariances_, self.gmm.weights_): alp_means.append(pos[-1]) # 2 - Sample Gaussian proportionally to its mean ALP idx = proportional_choice(alp_means, self.random_state, eps=0.0) # 3 - Sample task in Gaussian, without forgetting to remove ALP dimension new_task = self.random_state.multivariate_normal(self.gmm.means_[idx], self.gmm.covariances_[idx])[:-1] new_task = np.clip(new_task, self.mins, self.maxs).astype(np.float32) return 
{"task": new_task, "infos": { "bk_index": len(self.bk[list(self.bk.keys())[0]]) - 1, "task_infos": idx} }
Ancestors
Methods
def get_nb_gmm_params(self, gmm)
-
Assumes full covariance. See https://stats.stackexchange.com/questions/229293/the-number-of-parameters-in-gaussian-mixture-model
Expand source code
def get_nb_gmm_params(self, gmm): ''' Assumes full covariance. See https://stats.stackexchange.com/questions/229293/the-number-of-parameters-in-gaussian-mixture-model ''' nb_gmms = gmm.get_params()['n_components'] d = len(self.mins) params_per_gmm = (d*d - d)/2 + 2*d + 1 return nb_gmms * params_per_gmm - 1
def init_gmm(self, nb_gaussians)
-
Init the GMM given the number of gaussians.
Expand source code
def init_gmm(self, nb_gaussians): ''' Init the GMM given the number of gaussians. ''' return GMM(n_components=nb_gaussians, covariance_type='full', random_state=self.seed, warm_start=self.warm_start, n_init=self.nb_em_init)
Inherited members
class EmpiricalALPComputer (task_size, max_size=None, buffer_size=500)
-
Absolute Learning Progress (ALP) computer object. It uses a buffered kd-tree to efficiently implement a k-nearest-neighbor algorithm.
Expand source code
class EmpiricalALPComputer(): ''' Absolute Learning Progress (ALP) computer object. It uses a buffered kd-tree to efficiently implement a k-nearest-neighbor algorithm. ''' def __init__(self, task_size, max_size=None, buffer_size=500): self.alp_knn = BufferedDataset(1, task_size, buffer_size=buffer_size, lateness=0, max_size=max_size) def compute_alp(self, task, reward): alp = 0 lp = 0 if len(self.alp_knn) > 5: # Compute absolute learning progress for new task # 1 - Retrieve closest previous task dist, idx = self.alp_knn.nn_y(task) # 2 - Retrieve corresponding reward closest_previous_task_reward = self.alp_knn.get_x(idx[0]) # 3 - Compute alp as absolute difference in reward lp = reward - closest_previous_task_reward alp = np.abs(lp) # Add to database self.alp_knn.add_xy(reward, task) return alp, lp
Methods
def compute_alp(self, task, reward)
-
Expand source code
def compute_alp(self, task, reward): alp = 0 lp = 0 if len(self.alp_knn) > 5: # Compute absolute learning progress for new task # 1 - Retrieve closest previous task dist, idx = self.alp_knn.nn_y(task) # 2 - Retrieve corresponding reward closest_previous_task_reward = self.alp_knn.get_x(idx[0]) # 3 - Compute alp as absolute difference in reward lp = reward - closest_previous_task_reward alp = np.abs(lp) # Add to database self.alp_knn.add_xy(reward, task) return alp, lp