Module TeachMyAgent.teachers.algos.adr

Expand source code
# Automatic Domain Randomization, see https://arxiv.org/abs/1910.07113 for details
# Implemented by Rémy Portelas and Clément Romac

import numpy as np
from gym.spaces import Box
from collections import deque
from TeachMyAgent.teachers.algos.AbstractTeacher import AbstractTeacher

class ADR(AbstractTeacher):
    def __init__(self, mins, maxs, seed, env_reward_lb, env_reward_ub, step_size, max_reward_thr, min_reward_thr,
                 initial_dist=None, boundary_sampling_p=0.5, queue_len=10, scale_reward=False):
        '''
            Automatic Domain Randomization (https://arxiv.org/abs/1910.07113).

            Args:
                step_size: Size of the growth (or decrease) of a bound at update
                max_reward_thr: Upper reward threshold used to inflate distribution
                min_reward_thr: Lowers reward threshold used to deflate distribution
                initial_dist: The mean of this initial distribution is used as the initial task used by ADR
                boundary_sampling_p: Probability to sample a dimension at a bound
                queue_len: Size of the queue associated to each bound. Once reached, ADR increases or decreases the bound.
        '''
        AbstractTeacher.__init__(self, mins, maxs, env_reward_lb, env_reward_ub, seed)
        self.nb_dims = len(self.mins)

        # Boundary sampling probability p_r
        self.bound_sampling_p = boundary_sampling_p

        # ADR step size
        self.step_size = step_size

        # Max reward threshold, sampling distribution inflates if mean reward above this
        self.max_reward_threshold = max_reward_thr
        if scale_reward:
            self.max_reward_threshold = np.interp(self.max_reward_threshold,
                                                  (self.env_reward_lb, self.env_reward_ub),
                                                  (0, 1))

        # Min reward threshold, sampling distribution deflates if mean reward below this
        self.min_reward_threshold = min_reward_thr
        if scale_reward:
            self.min_reward_threshold = np.interp(self.min_reward_threshold,
                                                  (self.env_reward_lb, self.env_reward_ub),
                                                  (0, 1))

        # max queue length
        self.window_len = queue_len

        # Set initial task space to predefined calibrated task
        initial_mean, initial_variance = self.get_or_create_dist(initial_dist, mins, maxs, subspace=True)

        # Single task version (as the original paper)
        self.cur_mins = initial_mean
        self.cur_maxs = initial_mean

        self.cur_mins = np.array(self.cur_mins, dtype=np.float32)  # current min bounds
        self.cur_maxs = np.array(self.cur_maxs, dtype=np.float32)  # current max bounds
        self.task_space = Box(self.cur_mins, self.cur_maxs, dtype=np.float32)
        self.task_space.seed(self.seed)

        # Init queues, one per task space dimension
        self.min_queues = [deque(maxlen=self.window_len) for _ in range(self.nb_dims)]
        self.max_queues = [deque(maxlen=self.window_len) for _ in range(self.nb_dims)]

        # Boring book-keeping
        self.episode_nb = 0
        self.bk = {'task_space': [(self.cur_mins.copy(),self.cur_maxs.copy())],
                   'episodes': []}

    def episodic_update(self, task, reward, is_success):
        self.episode_nb += 1

        # check for updates
        for i, (min_q, max_q, cur_min, cur_max) in enumerate(zip(self.min_queues, self.max_queues, self.cur_mins, self.cur_maxs)):
            if task[i] == cur_min:  # if the proposed task has the i^th dimension set to min boundary
                min_q.append(reward)
                if len(min_q) == self.window_len:
                    if np.mean(min_q) >= self.max_reward_threshold:  # decrease min boundary (inflate sampling space)
                        self.cur_mins[i] = max(self.cur_mins[i] - self.step_size, self.mins[i])
                    elif np.mean(min_q) <= self.min_reward_threshold:  # increase min boundary (deflate sampling space)
                        self.cur_mins[i] = min(self.cur_mins[i] + self.step_size, self.cur_maxs[i])
                    self.min_queues[i] = deque(maxlen=self.window_len)  # reset queue
            if task[i] == cur_max:  # if the proposed task has the i^th dimension set to max boundary
                max_q.append(reward)
                if len(max_q) == self.window_len:  # queue is full, time to update
                    if np.mean(max_q) >= self.max_reward_threshold:  # increase max boundary
                        self.cur_maxs[i] = min(self.cur_maxs[i] + self.step_size, self.maxs[i])
                    elif np.mean(max_q) <= self.min_reward_threshold:  # decrease max boundary
                        self.cur_maxs[i] = max(self.cur_maxs[i] - self.step_size, self.cur_mins[i])
                    self.max_queues[i] = deque(maxlen=self.window_len)  # reset queue

        prev_cur_mins, prev_cur_maxs = self.bk['task_space'][-1]
        if (prev_cur_mins != self.cur_mins).any() or (prev_cur_maxs != self.cur_maxs).any():  # were boundaries changed ?
            self.task_space = Box(self.cur_mins, self.cur_maxs, dtype=np.float32)
            self.task_space.seed(self.seed)
            # book-keeping only if boundaries were updates
            self.bk['task_space'].append((self.cur_mins.copy(), self.cur_maxs.copy()))
            self.bk['episodes'].append(self.episode_nb)

    def sample_task(self):
        new_task = self.non_exploratory_task_sampling()["task"]
        if self.random_state.random() < self.bound_sampling_p:  # set random dimension to min or max bound
            idx = self.random_state.randint(0, self.nb_dims)
            is_min_max_capped = np.array([self.cur_mins[idx] == self.mins[idx], self.cur_maxs[idx] == self.maxs[idx]])
            if not is_min_max_capped.all():  # both min and max bounds can increase, choose extremum randomly
                if self.random_state.random() < 0.5:  # skip min bound if already
                    new_task[idx] = self.cur_mins[idx]
                else:
                    new_task[idx] = self.cur_maxs[idx]
            elif not is_min_max_capped[0]:
                new_task[idx] = self.cur_mins[idx]
            elif not is_min_max_capped[1]:
                new_task[idx] = self.cur_maxs[idx]
        return new_task

    def non_exploratory_task_sampling(self):
        return {"task": self.task_space.sample(),
                "infos": {
                    "bk_index": len(self.bk[list(self.bk.keys())[0]]) - 1,
                    "task_infos": None}
                }

Classes

class ADR (mins, maxs, seed, env_reward_lb, env_reward_ub, step_size, max_reward_thr, min_reward_thr, initial_dist=None, boundary_sampling_p=0.5, queue_len=10, scale_reward=False)

Base class for ACL methods.

This will be used to sample tasks for the DeepRL student given a task space provided at the beginning of training.

Automatic Domain Randomization (https://arxiv.org/abs/1910.07113).

Args

step_size
Size of the growth (or decrease) of a bound at update
max_reward_thr
Upper reward threshold used to inflate distribution
min_reward_thr
Lowers reward threshold used to deflate distribution
initial_dist
The mean of this initial distribution is used as the initial task used by ADR
boundary_sampling_p
Probability to sample a dimension at a bound
queue_len
Size of the queue associated to each bound. Once reached, ADR increases or decreases the bound.
Expand source code
class ADR(AbstractTeacher):
    def __init__(self, mins, maxs, seed, env_reward_lb, env_reward_ub, step_size, max_reward_thr, min_reward_thr,
                 initial_dist=None, boundary_sampling_p=0.5, queue_len=10, scale_reward=False):
        '''
            Automatic Domain Randomization (https://arxiv.org/abs/1910.07113).

            Args:
                step_size: Size of the growth (or decrease) of a bound at update
                max_reward_thr: Upper reward threshold used to inflate distribution
                min_reward_thr: Lowers reward threshold used to deflate distribution
                initial_dist: The mean of this initial distribution is used as the initial task used by ADR
                boundary_sampling_p: Probability to sample a dimension at a bound
                queue_len: Size of the queue associated to each bound. Once reached, ADR increases or decreases the bound.
        '''
        AbstractTeacher.__init__(self, mins, maxs, env_reward_lb, env_reward_ub, seed)
        self.nb_dims = len(self.mins)

        # Boundary sampling probability p_r
        self.bound_sampling_p = boundary_sampling_p

        # ADR step size
        self.step_size = step_size

        # Max reward threshold, sampling distribution inflates if mean reward above this
        self.max_reward_threshold = max_reward_thr
        if scale_reward:
            self.max_reward_threshold = np.interp(self.max_reward_threshold,
                                                  (self.env_reward_lb, self.env_reward_ub),
                                                  (0, 1))

        # Min reward threshold, sampling distribution deflates if mean reward below this
        self.min_reward_threshold = min_reward_thr
        if scale_reward:
            self.min_reward_threshold = np.interp(self.min_reward_threshold,
                                                  (self.env_reward_lb, self.env_reward_ub),
                                                  (0, 1))

        # max queue length
        self.window_len = queue_len

        # Set initial task space to predefined calibrated task
        initial_mean, initial_variance = self.get_or_create_dist(initial_dist, mins, maxs, subspace=True)

        # Single task version (as the original paper)
        self.cur_mins = initial_mean
        self.cur_maxs = initial_mean

        self.cur_mins = np.array(self.cur_mins, dtype=np.float32)  # current min bounds
        self.cur_maxs = np.array(self.cur_maxs, dtype=np.float32)  # current max bounds
        self.task_space = Box(self.cur_mins, self.cur_maxs, dtype=np.float32)
        self.task_space.seed(self.seed)

        # Init queues, one per task space dimension
        self.min_queues = [deque(maxlen=self.window_len) for _ in range(self.nb_dims)]
        self.max_queues = [deque(maxlen=self.window_len) for _ in range(self.nb_dims)]

        # Boring book-keeping
        self.episode_nb = 0
        self.bk = {'task_space': [(self.cur_mins.copy(),self.cur_maxs.copy())],
                   'episodes': []}

    def episodic_update(self, task, reward, is_success):
        self.episode_nb += 1

        # check for updates
        for i, (min_q, max_q, cur_min, cur_max) in enumerate(zip(self.min_queues, self.max_queues, self.cur_mins, self.cur_maxs)):
            if task[i] == cur_min:  # if the proposed task has the i^th dimension set to min boundary
                min_q.append(reward)
                if len(min_q) == self.window_len:
                    if np.mean(min_q) >= self.max_reward_threshold:  # decrease min boundary (inflate sampling space)
                        self.cur_mins[i] = max(self.cur_mins[i] - self.step_size, self.mins[i])
                    elif np.mean(min_q) <= self.min_reward_threshold:  # increase min boundary (deflate sampling space)
                        self.cur_mins[i] = min(self.cur_mins[i] + self.step_size, self.cur_maxs[i])
                    self.min_queues[i] = deque(maxlen=self.window_len)  # reset queue
            if task[i] == cur_max:  # if the proposed task has the i^th dimension set to max boundary
                max_q.append(reward)
                if len(max_q) == self.window_len:  # queue is full, time to update
                    if np.mean(max_q) >= self.max_reward_threshold:  # increase max boundary
                        self.cur_maxs[i] = min(self.cur_maxs[i] + self.step_size, self.maxs[i])
                    elif np.mean(max_q) <= self.min_reward_threshold:  # decrease max boundary
                        self.cur_maxs[i] = max(self.cur_maxs[i] - self.step_size, self.cur_mins[i])
                    self.max_queues[i] = deque(maxlen=self.window_len)  # reset queue

        prev_cur_mins, prev_cur_maxs = self.bk['task_space'][-1]
        if (prev_cur_mins != self.cur_mins).any() or (prev_cur_maxs != self.cur_maxs).any():  # were boundaries changed ?
            self.task_space = Box(self.cur_mins, self.cur_maxs, dtype=np.float32)
            self.task_space.seed(self.seed)
            # book-keeping only if boundaries were updates
            self.bk['task_space'].append((self.cur_mins.copy(), self.cur_maxs.copy()))
            self.bk['episodes'].append(self.episode_nb)

    def sample_task(self):
        new_task = self.non_exploratory_task_sampling()["task"]
        if self.random_state.random() < self.bound_sampling_p:  # set random dimension to min or max bound
            idx = self.random_state.randint(0, self.nb_dims)
            is_min_max_capped = np.array([self.cur_mins[idx] == self.mins[idx], self.cur_maxs[idx] == self.maxs[idx]])
            if not is_min_max_capped.all():  # both min and max bounds can increase, choose extremum randomly
                if self.random_state.random() < 0.5:  # skip min bound if already
                    new_task[idx] = self.cur_mins[idx]
                else:
                    new_task[idx] = self.cur_maxs[idx]
            elif not is_min_max_capped[0]:
                new_task[idx] = self.cur_mins[idx]
            elif not is_min_max_capped[1]:
                new_task[idx] = self.cur_maxs[idx]
        return new_task

    def non_exploratory_task_sampling(self):
        return {"task": self.task_space.sample(),
                "infos": {
                    "bk_index": len(self.bk[list(self.bk.keys())[0]]) - 1,
                    "task_infos": None}
                }

Ancestors

Inherited members