Module TeachMyAgent.teachers.utils.dimensions_shuffler

Expand source code
import numpy as np
import random

class DimensionsShuffler():
    def __init__(self, mins, maxs, cuttings=4, seed=21):
        '''
            Object evenly cutting a task space into hypercubes and shuffling them.

            Args:
                mins: Lower bounds of task space
                max: Upper bounds of task space
                cuttings: How many cuttings should be done on each dimension
                seed: Seed of the random shuffler
        '''
        self.rnd_state = np.random.RandomState(seed)
        self.nb_dims = len(mins)
        self.dims_dicts = []
        self.last_raw_task = None
        self.last_interpolated_task = None
        for i in range(len(mins)):
            current_min = mins[i]
            current_max = maxs[i]
            region_size = abs(current_max-current_min) / cuttings
            current_dim_dict = {
                "original": [(current_min + j*region_size, current_min + (j+1)*region_size) for j in range(cuttings)],
                "shuffled": [(current_min + j*region_size, current_min + (j+1)*region_size) for j in range(cuttings)]
            }

            self.rnd_state.shuffle(current_dim_dict["shuffled"])
            self.dims_dicts.append(current_dim_dict)

    def interpolate_task(self, task):
        '''
            Maps a task from the original task space towards the shuffled one.
        '''
        new_task = []
        for i in range(self.nb_dims):
            try:
                region_index = next(idx for idx,v in enumerate(self.dims_dicts[i]["original"]) if v[0] <= task[i] <= v[1])
            except StopIteration as err:
                raise Exception("Unable to find the index of the {} dimension of task {}."
                                .format(i, task)) from err

            new_value = np.interp(task[i],
                                  self.dims_dicts[i]["original"][region_index],
                                  self.dims_dicts[i]["shuffled"][region_index])
            new_task.append(new_value)
        self.last_raw_task = task
        self.last_interpolated_task = np.array(new_task, dtype=np.float32)
        return self.last_interpolated_task

    def inverse_interpolate_task(self, task):
        '''
            Maps a task from the shuffled task space towards the original one.
        '''
        new_task = []
        for i in range(self.nb_dims):
            try:
                region_index = next(idx for idx,v in enumerate(self.dims_dicts[i]["shuffled"]) if v[0] <= task[i] <= v[1])
            except StopIteration as err:
                raise Exception("Unable to find the index of the {} dimension of task {}."
                                .format(i, task)) from err

            new_value = np.interp(task[i],
                                  self.dims_dicts[i]["shuffled"][region_index],
                                  self.dims_dicts[i]["original"][region_index])
            new_task.append(new_value)
        return np.array(new_task, dtype=np.float32)

Classes

class DimensionsShuffler (mins, maxs, cuttings=4, seed=21)

Object evenly cutting a task space into hypercubes and shuffling them.

Args

mins
Lower bounds of task space
max
Upper bounds of task space
cuttings
How many cuttings should be done on each dimension
seed
Seed of the random shuffler
Expand source code
class DimensionsShuffler():
    def __init__(self, mins, maxs, cuttings=4, seed=21):
        '''
            Object evenly cutting a task space into hypercubes and shuffling them.

            Args:
                mins: Lower bounds of task space
                max: Upper bounds of task space
                cuttings: How many cuttings should be done on each dimension
                seed: Seed of the random shuffler
        '''
        self.rnd_state = np.random.RandomState(seed)
        self.nb_dims = len(mins)
        self.dims_dicts = []
        self.last_raw_task = None
        self.last_interpolated_task = None
        for i in range(len(mins)):
            current_min = mins[i]
            current_max = maxs[i]
            region_size = abs(current_max-current_min) / cuttings
            current_dim_dict = {
                "original": [(current_min + j*region_size, current_min + (j+1)*region_size) for j in range(cuttings)],
                "shuffled": [(current_min + j*region_size, current_min + (j+1)*region_size) for j in range(cuttings)]
            }

            self.rnd_state.shuffle(current_dim_dict["shuffled"])
            self.dims_dicts.append(current_dim_dict)

    def interpolate_task(self, task):
        '''
            Maps a task from the original task space towards the shuffled one.
        '''
        new_task = []
        for i in range(self.nb_dims):
            try:
                region_index = next(idx for idx,v in enumerate(self.dims_dicts[i]["original"]) if v[0] <= task[i] <= v[1])
            except StopIteration as err:
                raise Exception("Unable to find the index of the {} dimension of task {}."
                                .format(i, task)) from err

            new_value = np.interp(task[i],
                                  self.dims_dicts[i]["original"][region_index],
                                  self.dims_dicts[i]["shuffled"][region_index])
            new_task.append(new_value)
        self.last_raw_task = task
        self.last_interpolated_task = np.array(new_task, dtype=np.float32)
        return self.last_interpolated_task

    def inverse_interpolate_task(self, task):
        '''
            Maps a task from the shuffled task space towards the original one.
        '''
        new_task = []
        for i in range(self.nb_dims):
            try:
                region_index = next(idx for idx,v in enumerate(self.dims_dicts[i]["shuffled"]) if v[0] <= task[i] <= v[1])
            except StopIteration as err:
                raise Exception("Unable to find the index of the {} dimension of task {}."
                                .format(i, task)) from err

            new_value = np.interp(task[i],
                                  self.dims_dicts[i]["shuffled"][region_index],
                                  self.dims_dicts[i]["original"][region_index])
            new_task.append(new_value)
        return np.array(new_task, dtype=np.float32)

Methods

def interpolate_task(self, task)

Maps a task from the original task space towards the shuffled one.

Expand source code
def interpolate_task(self, task):
    '''
        Maps a task from the original task space towards the shuffled one.
    '''
    new_task = []
    for i in range(self.nb_dims):
        try:
            region_index = next(idx for idx,v in enumerate(self.dims_dicts[i]["original"]) if v[0] <= task[i] <= v[1])
        except StopIteration as err:
            raise Exception("Unable to find the index of the {} dimension of task {}."
                            .format(i, task)) from err

        new_value = np.interp(task[i],
                              self.dims_dicts[i]["original"][region_index],
                              self.dims_dicts[i]["shuffled"][region_index])
        new_task.append(new_value)
    self.last_raw_task = task
    self.last_interpolated_task = np.array(new_task, dtype=np.float32)
    return self.last_interpolated_task
def inverse_interpolate_task(self, task)

Maps a task from the shuffled task space towards the original one.

Expand source code
def inverse_interpolate_task(self, task):
    '''
        Maps a task from the shuffled task space towards the original one.
    '''
    new_task = []
    for i in range(self.nb_dims):
        try:
            region_index = next(idx for idx,v in enumerate(self.dims_dicts[i]["shuffled"]) if v[0] <= task[i] <= v[1])
        except StopIteration as err:
            raise Exception("Unable to find the index of the {} dimension of task {}."
                            .format(i, task)) from err

        new_value = np.interp(task[i],
                              self.dims_dicts[i]["shuffled"][region_index],
                              self.dims_dicts[i]["original"][region_index])
        new_task.append(new_value)
    return np.array(new_task, dtype=np.float32)