Module TeachMyAgent.environments.envs.parametric_continuous_stump_tracks

Expand source code
# Parametric Walker continuous environment
#
# Reward is given for moving forward, total 300+ points up to the far end. If the robot falls,
# it gets -100. Applying motor torque costs a small amount of points, more optimal agent
# will get better score.
#
# State consists of hull angle speed, angular velocity, horizontal speed, vertical speed,
# position of joints and joints angular speed, legs contact with ground, and 10 lidar
# rangefinder measurements. There's no coordinates
# in the state vector.
#
# Initially Created by Oleg Klimov. Licensed on the same terms as the rest of OpenAI Gym.
# Modified by Rémy Portelas and licensed under TeachMyAgent/teachers/LICENSES/ALP-GMM
# Modified Clément Romac

#region Imports

import numpy as np
import Box2D
from Box2D.b2 import (edgeShape, circleShape, fixtureDef, polygonShape, revoluteJointDef, contactListener)
import gym
from gym import spaces
from gym.utils import colorize, seeding, EzPickle
import math

from TeachMyAgent.environments.envs.bodies.BodiesEnum import BodiesEnum
from TeachMyAgent.environments.envs.bodies.BodyTypesEnum import BodyTypesEnum
from TeachMyAgent.environments.envs.utils.custom_user_data import CustomUserDataObjectTypes, CustomUserData

#endregion

#region Utils


class ContactDetector(contactListener):
    '''
        Custom contact detector.
    '''
    def __init__(self, env):
        contactListener.__init__(self)
        self.env = env
    def BeginContact(self, contact):
        '''
            Triggered when contact is detected.

            Checks userData of each of the two fixtures colliding.
            Sets `userData.has_contact` to True on the body if `body.userData.check_contact == True`.
            If `userData.is_contact_critical == True`, `env.critical_contact` is set to True, stopping the episode.
        '''
        for body in [contact.fixtureA.body, contact.fixtureB.body]:
            if body.userData.object_type == CustomUserDataObjectTypes.BODY_OBJECT and body.userData.check_contact:
                body.userData.has_contact = True
                if body.userData.is_contact_critical:
                    self.env.head_contact = True

    def EndContact(self, contact):
        '''
            Triggered when contact ends.

            Sets `userData.has_contact` to False on the body if `body.userData.check_contact == True`.
        '''
        for body in [contact.fixtureA.body, contact.fixtureB.body]:
            if body.userData.object_type == CustomUserDataObjectTypes.BODY_OBJECT and body.userData.check_contact:
                body.userData.has_contact = False

def Rotate2D(pts,cnt,ang=np.pi/4):
    '''pts = {} Rotates points(nx2) about center cnt(2) by angle ang(1) in radian'''
    m1 = pts-cnt
    m2 = np.array([[np.cos(ang),np.sin(ang)],[-np.sin(ang),np.cos(ang)]])
    return np.dot(m1,m2)+cnt


class LidarCallback(Box2D.b2.rayCastCallback):
    '''
        Callback function triggered when lidar detects an object.
    '''
    def __init__(self, agent_mask_filter):
        '''
            Args:
                agent_mask_filter: Mask filter used to avoid detecting collisions with the agent's body
        '''
        Box2D.b2.rayCastCallback.__init__(self)
        self.agent_mask_filter = agent_mask_filter
        self.fixture = None
    def ReportFixture(self, fixture, point, normal, fraction):
        '''
            Triggered when a body is detected by the lidar.

            Returns:
                Distance to object detected.
        '''
        if (fixture.filterData.categoryBits & self.agent_mask_filter) == 0:
            return -1
        self.p2 = point
        self.fraction = fraction
        return fraction

#endregion

#region Constants

FPS    = 50
SCALE  = 30.0   # affects how fast-paced the game is, forces should be adjusted as well
VIEWPORT_W = 600 # Careful, this affects training
VIEWPORT_H = 400 # Careful, this affects training

RENDERING_VIEWER_W = VIEWPORT_W # Only affects rendering, not the policy
RENDERING_VIEWER_H = VIEWPORT_H # Only affects rendering, not the policy

NB_LIDAR = 10 # Number of lidars used by the agent
LIDAR_RANGE   = 160/SCALE

INITIAL_RANDOM = 5

TERRAIN_STEP   = 14/SCALE
TERRAIN_LENGTH = 200     # in steps
TERRAIN_HEIGHT = VIEWPORT_H/SCALE/4
TERRAIN_END    = 10    # in steps
INITIAL_TERRAIN_STARTPAD = 20 # in steps
FRICTION = 2.5

#endregion

class ParametricContinuousStumpTracks(gym.Env, EzPickle):
    '''
        The Stump Tracks: a procedurally generated Gym environment.
    '''
    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second' : FPS
    }

    def __init__(self, walker_type, **walker_args):
        '''
            Creates a Stump Tracks environment with an embodiment.

                walker_type: Embodiment
            :type walker_type: BodiesEnum
                walker_args: kwargs controlling the agent (e.g. number of body for a millipede)
        '''

        super(ParametricContinuousStumpTracks, self).__init__()

        # Seed env and init Box2D
        self.seed()
        self.viewer = None

        self.world = Box2D.b2World()
        self.terrain = []

        self.prev_shaping = None

        # Create agent
        body_type = BodiesEnum.get_body_type(walker_type)
        if body_type == BodyTypesEnum.SWIMMER or body_type == BodyTypesEnum.AMPHIBIAN:
            self.walker_body = BodiesEnum[walker_type].value(SCALE, density=1.0, **walker_args)
        elif body_type == BodyTypesEnum.WALKER:
            self.walker_body = BodiesEnum[walker_type].value(SCALE, **walker_args,
                                                                reset_on_hull_critical_contact=True)
        else:
            self.walker_body = BodiesEnum[walker_type].value(SCALE, **walker_args)

        # Adapt startpad to walker's width
        self.TERRAIN_STARTPAD = INITIAL_TERRAIN_STARTPAD if \
            self.walker_body.AGENT_WIDTH / TERRAIN_STEP + 5 <= INITIAL_TERRAIN_STARTPAD else \
            self.walker_body.AGENT_WIDTH / TERRAIN_STEP + 5  # in steps
        self.create_terrain_fixtures()

        # Set observation / action spaces
        self._generate_walker()  # To get state / action sizes
        agent_action_size = self.walker_body.get_action_size()
        self.action_space = spaces.Box(np.array([-1] * agent_action_size),
                                       np.array([1] * agent_action_size), dtype=np.float32)

        agent_state_size = self.walker_body.get_state_size()
        high = np.array([np.inf] * (agent_state_size +
                                    4 +  # head infos
                                    NB_LIDAR))  # lidars infos
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def set_environment(self, roughness=None, stump_height=None, stump_width=None, stump_rot=None,
                        obstacle_spacing=None, poly_shape=None, stump_seq=None):
        '''
            Set the parameters controlling the PCG algorithm to generate a task.
            Call this method before `reset()`.

            Args:
                roughness: Input vector controlling the CPPN
                stump_height: Tuple specifying mean and std of a normal distribution from which the height of each stump is sampled
                stump_width: Tuple specifying mean and std of a normal distribution from which the width of each stump is sampled
                stump_rot: Tuple specifying mean and std of a normal distribution from which the rotation degree of each stump is sampled
                obstacle_spacing: Spacing between stumps
                poly_shape: Shape of polygon stumps
        '''
        self.roughness = roughness if roughness else 0
        self.obstacle_spacing = max(0.01, obstacle_spacing) if obstacle_spacing is not None else 8.0
        self.stump_height = [stump_height, 0.1] if stump_height is not None else None
        self.stump_width = stump_width
        self.stump_rot = stump_rot
        self.hexa_shape = poly_shape
        self.stump_seq = stump_seq
        if poly_shape is not None:
            self.hexa_shape = np.interp(poly_shape,[0,4],[0,4]).tolist()
            assert(len(poly_shape) == 12)
            self.hexa_shape = self.hexa_shape[0:12]

    def _destroy(self):
        # if not self.terrain: return
        self.world.contactListener = None
        for t in self.terrain:
            self.world.DestroyBody(t)
        self.terrain = []

        self.walker_body.destroy(self.world)

    def reset(self):
        self._destroy()
        self.world.contactListener_bug_workaround = ContactDetector(self)
        self.world.contactListener = self.world.contactListener_bug_workaround
        self.head_contact = False
        self.prev_shaping = None
        self.scroll = 0.0
        self.lidar_render = 0

        self.generate_game()

        self.drawlist = self.terrain + self.walker_body.get_elements_to_render()

        self.lidar = [LidarCallback(self.walker_body.reference_head_object.fixtures[0].filterData.maskBits)
                      for _ in range(NB_LIDAR)]
        self.episodic_reward = 0

        return self.step(np.array([0] * self.action_space.shape[0]))[0]

    def step(self, action):
        self.walker_body.activate_motors(action)

        self.world.Step(1.0/FPS, 6*30, 2*30)

        head = self.walker_body.reference_head_object
        pos = head.position
        vel = head.linearVelocity

        for i in range(NB_LIDAR):
            self.lidar[i].fraction = 1.0
            self.lidar[i].p1 = pos
            self.lidar[i].p2 = (
                pos[0] + math.sin(1.5*i/NB_LIDAR)*LIDAR_RANGE,
                pos[1] - math.cos(1.5*i/NB_LIDAR)*LIDAR_RANGE)
            self.world.RayCast(self.lidar[i], self.lidar[i].p1, self.lidar[i].p2)
        state = [
            head.angle,        # Normal angles up to 0.5 here, but sure more is possible.
            2.0*head.angularVelocity/FPS,
            0.3*vel.x*(VIEWPORT_W/SCALE)/FPS,  # Normalized to get -1..1 range
            0.3*vel.y*(VIEWPORT_H/SCALE)/FPS]

        # add leg-related state
        state.extend(self.walker_body.get_motors_state())

        if self.walker_body.body_type == BodyTypesEnum.CLIMBER:
            state.extend(self.walker_body.get_sensors_state())

        state += [l.fraction for l in self.lidar]

        self.scroll = pos.x - RENDERING_VIEWER_W/SCALE/5

        shaping  = 130*pos[0]/SCALE  # moving forward is a way to receive reward (normalized to get 300 on completion)
        if not (hasattr(self.walker_body, "remove_reward_on_head_angle") and self.walker_body.remove_reward_on_head_angle):
            shaping -= 5.0*abs(state[0])  # keep head straight, other than that and falling, any behavior is unpunished

        reward = 0
        if self.prev_shaping is not None:
            reward = shaping - self.prev_shaping
        self.prev_shaping = shaping

        for a in action:
            reward -= self.walker_body.TORQUE_PENALTY * 80 * np.clip(np.abs(a), 0, 1) # 80 => Original torque
            # normalized to about -50.0 using heuristic, more optimal agent should spend less

        done = False
        if self.head_contact or pos[0] < 0:
            reward = -100
            done   = True
        if pos[0] > (TERRAIN_LENGTH-TERRAIN_END)*TERRAIN_STEP:
            done   = True
        self.episodic_reward += reward
        return np.array(state), reward, done, {"success": self.episodic_reward > 230}

    def render(self, mode='human', draw_lidars=True):
        #self.scroll = 1
        from gym.envs.classic_control import rendering
        if self.viewer is None:
            self.viewer = rendering.Viewer(RENDERING_VIEWER_W, RENDERING_VIEWER_H)
        self.viewer.set_bounds(self.scroll, RENDERING_VIEWER_W/SCALE + self.scroll, 0, RENDERING_VIEWER_H/SCALE)

        self.viewer.draw_polygon( [
            (self.scroll,                  0),
            (self.scroll+RENDERING_VIEWER_W/SCALE, 0),
            (self.scroll+RENDERING_VIEWER_W/SCALE, RENDERING_VIEWER_H/SCALE),
            (self.scroll,                  RENDERING_VIEWER_H/SCALE),
            ], color=(0.9, 0.9, 1.0) )
        for poly,x1,x2 in self.cloud_poly:
            if x2 < self.scroll/2: continue
            if x1 > self.scroll/2 + RENDERING_VIEWER_W/SCALE: continue
            self.viewer.draw_polygon( [(p[0]+self.scroll/2, p[1]) for p in poly], color=(1,1,1))
        for poly, color in self.terrain_poly:
            if poly[1][0] < self.scroll: continue
            if poly[0][0] > self.scroll + RENDERING_VIEWER_W/SCALE: continue
            self.viewer.draw_polygon(poly, color=color)

        for obj in self.drawlist:
            for f in obj.fixtures:
                trans = f.body.transform
                if type(f.shape) is circleShape:
                    t = rendering.Transform(translation=trans*f.shape.pos)
                    self.viewer.draw_circle(f.shape.radius, 30, color=obj.color1).add_attr(t)
                    self.viewer.draw_circle(f.shape.radius, 30, color=obj.color2, filled=False, linewidth=2).add_attr(t)
                else:
                    path = [trans*v for v in f.shape.vertices]
                    self.viewer.draw_polygon(path, color=obj.color1)
                    path.append(path[0])
                    self.viewer.draw_polyline(path, color=obj.color2, linewidth=2)

        # Draw lidars
        if draw_lidars:
            for i in range(len(self.lidar)):
                l = self.lidar[i]
                self.viewer.draw_polyline([l.p1, l.p2], color=(1, 0, 0), linewidth=1)

        flagy1 = TERRAIN_HEIGHT
        flagy2 = flagy1 + 50/SCALE
        x = TERRAIN_STEP*3
        self.viewer.draw_polyline( [(x, flagy1), (x, flagy2)], color=(0,0,0), linewidth=2 )
        f = [(x, flagy2), (x, flagy2-10/SCALE), (x+25/SCALE, flagy2-5/SCALE)]
        self.viewer.draw_polygon(f, color=(0.9,0.2,0) )
        self.viewer.draw_polyline(f + [f[0]], color=(0,0,0), linewidth=2 )

        return self.viewer.render(return_rgb_array = mode=='rgb_array')

    def _SET_RENDERING_VIEWPORT_SIZE(self, width, height=None, keep_ratio=True):
        '''
            Set rendering viewport's size (i.e. image size).

            Args:
                width: viewport's width
                height: viewport's height
                keep_ratio: Whether height must be automatically calculated to keep the same ratio as the environment's viewport size.
        '''
        global RENDERING_VIEWER_W, RENDERING_VIEWER_H
        RENDERING_VIEWER_W = width
        if keep_ratio or height is None:
            RENDERING_VIEWER_H = int(RENDERING_VIEWER_W / (VIEWPORT_W / VIEWPORT_H))
        else:
            RENDERING_VIEWER_H = height

    def close(self):
        self._destroy()
        if self.viewer is not None:
            self.viewer.close()
            self.viewer = None

    #region Fixtures Initialization
    # ------------------------------------------ FIXTURES INITIALIZATION ------------------------------------------

    def create_terrain_fixtures(self):
        '''
            Create fixtures used to generate terrain.
        '''
        self.fd_polygon = fixtureDef(
            shape=polygonShape(vertices=
                               [(0, 0),
                                (1, 0),
                                (1, -1),
                                (0, -1)]),
            friction=FRICTION,
            categoryBits=0x1,
            maskBits=0xFFFF
        )

        self.fd_edge = fixtureDef(
            shape=edgeShape(vertices=
                            [(0, 0),
                             (1, 1)]),
            friction=FRICTION,
            categoryBits=0x1,
            maskBits=0xFFFF
        )

        # Init default hexagon fixture and shape, used only for Hexagon Tracks
        self.fd_default_hexagon = fixtureDef(
            shape=polygonShape(vertices=
                               [(0, 0),
                                (1, 0),
                                (1, -1),
                                (0, -1)]),
            friction=FRICTION,
            categoryBits=0x1,
            maskBits=0xFFFF
        )
        self.default_hexagon = [(-0.5, 0), (-0.5, 0.25), (-0.25, 0.5), (0.25, 0.5), (0.5, 0.25), (0.5, 0)]

    #endregion

    # region Game Generation
    # ------------------------------------------ GAME GENERATION ------------------------------------------

    def generate_game(self):
        '''
            Generate the task (i.e. terrain + embodiment).
        '''
        self._generate_terrain()
        self._generate_clouds()
        self._generate_walker()

    def _generate_terrain(self):
        GRASS, STUMP, HEXA = 0, None, None
        cpt=1
        if self.stump_height:
            STUMP = cpt
            cpt += 1
        if self.hexa_shape:
            HEXA = cpt
            cpt += 1
        if self.stump_seq is not None:
            SEQ = cpt
            cpt += 1
        _STATES_ = cpt

        state = self.np_random.randint(1, _STATES_)
        velocity = 0.0
        y = TERRAIN_HEIGHT
        self.terrain = []
        self.terrain_x = []
        self.terrain_y = []
        x = 0
        max_x = TERRAIN_LENGTH * TERRAIN_STEP

        # Add startpad
        max_startpad_x = self.TERRAIN_STARTPAD * TERRAIN_STEP
        self.terrain_x.append(x)
        self.terrain_y.append(y)
        x += max_startpad_x
        self.terrain_x.append(x)
        self.terrain_y.append(y)
        oneshot = True

        # Generation of terrain
        while x < max_x:
            self.terrain_x.append(x)

            if state==GRASS and not oneshot:
                velocity = 0.8*velocity + 0.01*np.sign(TERRAIN_HEIGHT - y)
                if x > max_startpad_x: velocity += self.np_random.uniform(-self.roughness, self.roughness)/SCALE
                y += velocity
                x += self.obstacle_spacing

            elif state==STUMP and oneshot:
                stump_height = max(0.05, self.np_random.normal(self.stump_height[0], self.stump_height[1]))
                stump_width = TERRAIN_STEP
                if self.stump_width is not None:
                    stump_width *= max(0.05, np.random.normal(self.stump_width[0], self.stump_width[1]))
                poly = [
                    (x, y),
                    (x+stump_width, y),
                    (x+stump_width, y+stump_height * TERRAIN_STEP),
                    (x,y+stump_height * TERRAIN_STEP),
                    ]
                x += stump_width
                if self.stump_rot is not None:
                    anchor = (np.array(poly[0]) + np.array(poly[1]))/2
                    rotation = np.clip(self.np_random.normal(self.stump_rot[0], self.stump_rot[1]),0,2*np.pi)
                    poly = Rotate2D(np.array(poly), anchor, rotation).tolist()
                self.fd_polygon.shape.vertices=poly
                t = self.world.CreateStaticBody(
                    fixtures = self.fd_polygon,
                    userData=CustomUserData("grass", CustomUserDataObjectTypes.TERRAIN))
                t.color1, t.color2 = (1,1,1), (0.6,0.6,0.6)
                self.terrain.append(t)
            elif state==HEXA and oneshot:
                # first point do not move
                poly = []
                delta_pos = []
                for i in range(0,len(self.hexa_shape),2):
                    delta_pos.append(tuple(np.random.normal(self.hexa_shape[i:i+2],0.1)))
                for i,(b,d) in enumerate(zip(self.default_hexagon, delta_pos)):
                    if i != 0 and i != (len(self.default_hexagon)-1):
                        poly.append((x + (b[0]*TERRAIN_STEP) + (d[0]*TERRAIN_STEP),
                                     y + (b[1]*TERRAIN_STEP) + (d[1]*TERRAIN_STEP)))
                    else:
                        poly.append((x + (b[0]*TERRAIN_STEP) + (d[0]*TERRAIN_STEP),
                                     y + (b[1]*TERRAIN_STEP)))
                x += 1
                self.fd_default_hexagon.shape.vertices = poly
                t = self.world.CreateStaticBody(
                    fixtures=self.fd_default_hexagon)
                t.color1, t.color2 = (1.0, np.clip(delta_pos[0][1]/3,0,1), np.clip(delta_pos[-1][1]/3,0,1)), (0.6, 0.6, 0.6)
                self.terrain.append(t)

            elif state==SEQ and oneshot:
                for height, width in zip(self.stump_seq[0::2], self.stump_seq[1::2]):
                    stump_height = max(0.05, self.np_random.normal(height, 0.1))
                    stump_width = max(0.05, self.np_random.normal(width, 0.1))
                    poly = [
                        (x, y),
                        (x + stump_width, y),
                        (x + stump_width, y + stump_height * TERRAIN_STEP),
                        (x, y + stump_height * TERRAIN_STEP),
                    ]
                    x += stump_width
                    self.fd_polygon.shape.vertices = poly
                    t = self.world.CreateStaticBody(
                        fixtures=self.fd_polygon,
                        userData=CustomUserData("grass", CustomUserDataObjectTypes.TERRAIN))
                    t.color1, t.color2 = (1, 1, 1), (0.6, 0.6, 0.6)
                    self.terrain.append(t)

            oneshot = False
            self.terrain_y.append(y)
            if state==GRASS:
                state = self.np_random.randint(1, _STATES_)
                oneshot = True
            else:
                state = GRASS
                oneshot = False

        # Draw terrain
        self.terrain_poly = []
        assert len(self.terrain_x) == len(self.terrain_y)
        for i in range(len(self.terrain_x)-1):
            poly = [
                (self.terrain_x[i],   self.terrain_y[i]),
                (self.terrain_x[i+1], self.terrain_y[i+1])
                ]
            self.fd_edge.shape.vertices=poly
            t = self.world.CreateStaticBody(
                fixtures = self.fd_edge,
                userData=CustomUserData("grass", CustomUserDataObjectTypes.TERRAIN))
            color = (0.3, 1.0 if (i % 2) == 0 else 0.8, 0.3)
            t.color1 = color
            t.color2 = color
            self.terrain.append(t)
            color = (0.4, 0.6, 0.3)
            poly += [ (poly[1][0], 0), (poly[0][0], 0) ]
            self.terrain_poly.append( (poly, color) )
        self.terrain.reverse()

    def _generate_clouds(self):
        # Sorry for the clouds, couldn't resist
        self.cloud_poly   = []
        for i in range(TERRAIN_LENGTH//20):
            x = self.np_random.uniform(0, TERRAIN_LENGTH)*TERRAIN_STEP
            y = VIEWPORT_H/SCALE*3/4
            poly = [
                (x+15*TERRAIN_STEP*math.sin(3.14*2*a/5)+self.np_random.uniform(0,5*TERRAIN_STEP),
                 y+ 5*TERRAIN_STEP*math.cos(3.14*2*a/5)+self.np_random.uniform(0,5*TERRAIN_STEP) )
                for a in range(5) ]
            x1 = min( [p[0] for p in poly] )
            x2 = max( [p[0] for p in poly] )
            self.cloud_poly.append( (poly,x1,x2) )

    def _generate_walker(self):
        init_x = TERRAIN_STEP*self.TERRAIN_STARTPAD/2
        if hasattr(self.walker_body, "old_morphology") and self.walker_body.old_morphology:
            init_y = TERRAIN_HEIGHT + 2 * self.walker_body.LEG_H
        else:
            init_y = TERRAIN_HEIGHT + self.walker_body.AGENT_CENTER_HEIGHT

        self.walker_body.draw(
            self.world,
            init_x,
            init_y,
            self.np_random.uniform(-INITIAL_RANDOM, INITIAL_RANDOM)
        )

    #endregion

Functions

def Rotate2D(pts, cnt, ang=0.7853981633974483)

pts = {} Rotates points(nx2) about center cnt(2) by angle ang(1) in radian

Expand source code
def Rotate2D(pts,cnt,ang=np.pi/4):
    '''pts = {} Rotates points(nx2) about center cnt(2) by angle ang(1) in radian'''
    m1 = pts-cnt
    m2 = np.array([[np.cos(ang),np.sin(ang)],[-np.sin(ang),np.cos(ang)]])
    return np.dot(m1,m2)+cnt

Classes

class ContactDetector (env)

Custom contact detector.

Expand source code
class ContactDetector(contactListener):
    '''
        Custom contact detector.
    '''
    def __init__(self, env):
        contactListener.__init__(self)
        self.env = env
    def BeginContact(self, contact):
        '''
            Triggered when contact is detected.

            Checks userData of each of the two fixtures colliding.
            Sets `userData.has_contact` to True on the body if `body.userData.check_contact == True`.
            If `userData.is_contact_critical == True`, `env.critical_contact` is set to True, stopping the episode.
        '''
        for body in [contact.fixtureA.body, contact.fixtureB.body]:
            if body.userData.object_type == CustomUserDataObjectTypes.BODY_OBJECT and body.userData.check_contact:
                body.userData.has_contact = True
                if body.userData.is_contact_critical:
                    self.env.head_contact = True

    def EndContact(self, contact):
        '''
            Triggered when contact ends.

            Sets `userData.has_contact` to False on the body if `body.userData.check_contact == True`.
        '''
        for body in [contact.fixtureA.body, contact.fixtureB.body]:
            if body.userData.object_type == CustomUserDataObjectTypes.BODY_OBJECT and body.userData.check_contact:
                body.userData.has_contact = False

Ancestors

  • Box2D.Box2D.b2ContactListener

Methods

def BeginContact(self, contact)

Triggered when contact is detected.

Checks userData of each of the two fixtures colliding. Sets userData.has_contact to True on the body if body.userData.check_contact == True. If userData.is_contact_critical == True, env.critical_contact is set to True, stopping the episode.

Expand source code
def BeginContact(self, contact):
    '''
        Triggered when contact is detected.

        Checks userData of each of the two fixtures colliding.
        Sets `userData.has_contact` to True on the body if `body.userData.check_contact == True`.
        If `userData.is_contact_critical == True`, `env.critical_contact` is set to True, stopping the episode.
    '''
    for body in [contact.fixtureA.body, contact.fixtureB.body]:
        if body.userData.object_type == CustomUserDataObjectTypes.BODY_OBJECT and body.userData.check_contact:
            body.userData.has_contact = True
            if body.userData.is_contact_critical:
                self.env.head_contact = True
def EndContact(self, contact)

Triggered when contact ends.

Sets userData.has_contact to False on the body if body.userData.check_contact == True.

Expand source code
def EndContact(self, contact):
    '''
        Triggered when contact ends.

        Sets `userData.has_contact` to False on the body if `body.userData.check_contact == True`.
    '''
    for body in [contact.fixtureA.body, contact.fixtureB.body]:
        if body.userData.object_type == CustomUserDataObjectTypes.BODY_OBJECT and body.userData.check_contact:
            body.userData.has_contact = False
class LidarCallback (agent_mask_filter)

Callback function triggered when lidar detects an object.

Args

agent_mask_filter
Mask filter used to avoid detecting collisions with the agent's body
Expand source code
class LidarCallback(Box2D.b2.rayCastCallback):
    '''
        Callback function triggered when lidar detects an object.
    '''
    def __init__(self, agent_mask_filter):
        '''
            Args:
                agent_mask_filter: Mask filter used to avoid detecting collisions with the agent's body
        '''
        Box2D.b2.rayCastCallback.__init__(self)
        self.agent_mask_filter = agent_mask_filter
        self.fixture = None
    def ReportFixture(self, fixture, point, normal, fraction):
        '''
            Triggered when a body is detected by the lidar.

            Returns:
                Distance to object detected.
        '''
        if (fixture.filterData.categoryBits & self.agent_mask_filter) == 0:
            return -1
        self.p2 = point
        self.fraction = fraction
        return fraction

Ancestors

  • Box2D.Box2D.b2RayCastCallback

Methods

def ReportFixture(self, fixture, point, normal, fraction)

Triggered when a body is detected by the lidar.

Returns

Distance to object detected.

Expand source code
def ReportFixture(self, fixture, point, normal, fraction):
    '''
        Triggered when a body is detected by the lidar.

        Returns:
            Distance to object detected.
    '''
    if (fixture.filterData.categoryBits & self.agent_mask_filter) == 0:
        return -1
    self.p2 = point
    self.fraction = fraction
    return fraction
class ParametricContinuousStumpTracks (walker_type, **walker_args)

The Stump Tracks: a procedurally generated Gym environment.

Creates a Stump Tracks environment with an embodiment.

walker_type: Embodiment

:type walker_type: BodiesEnum walker_args: kwargs controlling the agent (e.g. number of body for a millipede)

Expand source code
class ParametricContinuousStumpTracks(gym.Env, EzPickle):
    '''
        The Stump Tracks: a procedurally generated Gym environment.
    '''
    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second' : FPS
    }

    def __init__(self, walker_type, **walker_args):
        '''
            Creates a Stump Tracks environment with an embodiment.

                walker_type: Embodiment
            :type walker_type: BodiesEnum
                walker_args: kwargs controlling the agent (e.g. number of body for a millipede)
        '''

        super(ParametricContinuousStumpTracks, self).__init__()

        # Seed env and init Box2D
        self.seed()
        self.viewer = None

        self.world = Box2D.b2World()
        self.terrain = []

        self.prev_shaping = None

        # Create agent
        body_type = BodiesEnum.get_body_type(walker_type)
        if body_type == BodyTypesEnum.SWIMMER or body_type == BodyTypesEnum.AMPHIBIAN:
            self.walker_body = BodiesEnum[walker_type].value(SCALE, density=1.0, **walker_args)
        elif body_type == BodyTypesEnum.WALKER:
            self.walker_body = BodiesEnum[walker_type].value(SCALE, **walker_args,
                                                                reset_on_hull_critical_contact=True)
        else:
            self.walker_body = BodiesEnum[walker_type].value(SCALE, **walker_args)

        # Adapt startpad to walker's width
        self.TERRAIN_STARTPAD = INITIAL_TERRAIN_STARTPAD if \
            self.walker_body.AGENT_WIDTH / TERRAIN_STEP + 5 <= INITIAL_TERRAIN_STARTPAD else \
            self.walker_body.AGENT_WIDTH / TERRAIN_STEP + 5  # in steps
        self.create_terrain_fixtures()

        # Set observation / action spaces
        self._generate_walker()  # To get state / action sizes
        agent_action_size = self.walker_body.get_action_size()
        self.action_space = spaces.Box(np.array([-1] * agent_action_size),
                                       np.array([1] * agent_action_size), dtype=np.float32)

        agent_state_size = self.walker_body.get_state_size()
        high = np.array([np.inf] * (agent_state_size +
                                    4 +  # head infos
                                    NB_LIDAR))  # lidars infos
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def set_environment(self, roughness=None, stump_height=None, stump_width=None, stump_rot=None,
                        obstacle_spacing=None, poly_shape=None, stump_seq=None):
        '''
            Set the parameters controlling the PCG algorithm to generate a task.
            Call this method before `reset()`.

            Args:
                roughness: Input vector controlling the CPPN
                stump_height: Tuple specifying mean and std of a normal distribution from which the height of each stump is sampled
                stump_width: Tuple specifying mean and std of a normal distribution from which the width of each stump is sampled
                stump_rot: Tuple specifying mean and std of a normal distribution from which the rotation degree of each stump is sampled
                obstacle_spacing: Spacing between stumps
                poly_shape: Shape of polygon stumps
        '''
        self.roughness = roughness if roughness else 0
        self.obstacle_spacing = max(0.01, obstacle_spacing) if obstacle_spacing is not None else 8.0
        self.stump_height = [stump_height, 0.1] if stump_height is not None else None
        self.stump_width = stump_width
        self.stump_rot = stump_rot
        self.hexa_shape = poly_shape
        self.stump_seq = stump_seq
        if poly_shape is not None:
            self.hexa_shape = np.interp(poly_shape,[0,4],[0,4]).tolist()
            assert(len(poly_shape) == 12)
            self.hexa_shape = self.hexa_shape[0:12]

    def _destroy(self):
        # if not self.terrain: return
        self.world.contactListener = None
        for t in self.terrain:
            self.world.DestroyBody(t)
        self.terrain = []

        self.walker_body.destroy(self.world)

    def reset(self):
        self._destroy()
        self.world.contactListener_bug_workaround = ContactDetector(self)
        self.world.contactListener = self.world.contactListener_bug_workaround
        self.head_contact = False
        self.prev_shaping = None
        self.scroll = 0.0
        self.lidar_render = 0

        self.generate_game()

        self.drawlist = self.terrain + self.walker_body.get_elements_to_render()

        self.lidar = [LidarCallback(self.walker_body.reference_head_object.fixtures[0].filterData.maskBits)
                      for _ in range(NB_LIDAR)]
        self.episodic_reward = 0

        return self.step(np.array([0] * self.action_space.shape[0]))[0]

    def step(self, action):
        self.walker_body.activate_motors(action)

        self.world.Step(1.0/FPS, 6*30, 2*30)

        head = self.walker_body.reference_head_object
        pos = head.position
        vel = head.linearVelocity

        for i in range(NB_LIDAR):
            self.lidar[i].fraction = 1.0
            self.lidar[i].p1 = pos
            self.lidar[i].p2 = (
                pos[0] + math.sin(1.5*i/NB_LIDAR)*LIDAR_RANGE,
                pos[1] - math.cos(1.5*i/NB_LIDAR)*LIDAR_RANGE)
            self.world.RayCast(self.lidar[i], self.lidar[i].p1, self.lidar[i].p2)
        state = [
            head.angle,        # Normal angles up to 0.5 here, but sure more is possible.
            2.0*head.angularVelocity/FPS,
            0.3*vel.x*(VIEWPORT_W/SCALE)/FPS,  # Normalized to get -1..1 range
            0.3*vel.y*(VIEWPORT_H/SCALE)/FPS]

        # add leg-related state
        state.extend(self.walker_body.get_motors_state())

        if self.walker_body.body_type == BodyTypesEnum.CLIMBER:
            state.extend(self.walker_body.get_sensors_state())

        state += [l.fraction for l in self.lidar]

        self.scroll = pos.x - RENDERING_VIEWER_W/SCALE/5

        shaping  = 130*pos[0]/SCALE  # moving forward is a way to receive reward (normalized to get 300 on completion)
        if not (hasattr(self.walker_body, "remove_reward_on_head_angle") and self.walker_body.remove_reward_on_head_angle):
            shaping -= 5.0*abs(state[0])  # keep head straight, other than that and falling, any behavior is unpunished

        reward = 0
        if self.prev_shaping is not None:
            reward = shaping - self.prev_shaping
        self.prev_shaping = shaping

        for a in action:
            reward -= self.walker_body.TORQUE_PENALTY * 80 * np.clip(np.abs(a), 0, 1) # 80 => Original torque
            # normalized to about -50.0 using heuristic, more optimal agent should spend less

        done = False
        if self.head_contact or pos[0] < 0:
            reward = -100
            done   = True
        if pos[0] > (TERRAIN_LENGTH-TERRAIN_END)*TERRAIN_STEP:
            done   = True
        self.episodic_reward += reward
        return np.array(state), reward, done, {"success": self.episodic_reward > 230}

    def render(self, mode='human', draw_lidars=True):
        #self.scroll = 1
        from gym.envs.classic_control import rendering
        if self.viewer is None:
            self.viewer = rendering.Viewer(RENDERING_VIEWER_W, RENDERING_VIEWER_H)
        self.viewer.set_bounds(self.scroll, RENDERING_VIEWER_W/SCALE + self.scroll, 0, RENDERING_VIEWER_H/SCALE)

        self.viewer.draw_polygon( [
            (self.scroll,                  0),
            (self.scroll+RENDERING_VIEWER_W/SCALE, 0),
            (self.scroll+RENDERING_VIEWER_W/SCALE, RENDERING_VIEWER_H/SCALE),
            (self.scroll,                  RENDERING_VIEWER_H/SCALE),
            ], color=(0.9, 0.9, 1.0) )
        for poly,x1,x2 in self.cloud_poly:
            if x2 < self.scroll/2: continue
            if x1 > self.scroll/2 + RENDERING_VIEWER_W/SCALE: continue
            self.viewer.draw_polygon( [(p[0]+self.scroll/2, p[1]) for p in poly], color=(1,1,1))
        for poly, color in self.terrain_poly:
            if poly[1][0] < self.scroll: continue
            if poly[0][0] > self.scroll + RENDERING_VIEWER_W/SCALE: continue
            self.viewer.draw_polygon(poly, color=color)

        for obj in self.drawlist:
            for f in obj.fixtures:
                trans = f.body.transform
                if type(f.shape) is circleShape:
                    t = rendering.Transform(translation=trans*f.shape.pos)
                    self.viewer.draw_circle(f.shape.radius, 30, color=obj.color1).add_attr(t)
                    self.viewer.draw_circle(f.shape.radius, 30, color=obj.color2, filled=False, linewidth=2).add_attr(t)
                else:
                    path = [trans*v for v in f.shape.vertices]
                    self.viewer.draw_polygon(path, color=obj.color1)
                    path.append(path[0])
                    self.viewer.draw_polyline(path, color=obj.color2, linewidth=2)

        # Draw lidars
        if draw_lidars:
            for i in range(len(self.lidar)):
                l = self.lidar[i]
                self.viewer.draw_polyline([l.p1, l.p2], color=(1, 0, 0), linewidth=1)

        flagy1 = TERRAIN_HEIGHT
        flagy2 = flagy1 + 50/SCALE
        x = TERRAIN_STEP*3
        self.viewer.draw_polyline( [(x, flagy1), (x, flagy2)], color=(0,0,0), linewidth=2 )
        f = [(x, flagy2), (x, flagy2-10/SCALE), (x+25/SCALE, flagy2-5/SCALE)]
        self.viewer.draw_polygon(f, color=(0.9,0.2,0) )
        self.viewer.draw_polyline(f + [f[0]], color=(0,0,0), linewidth=2 )

        return self.viewer.render(return_rgb_array = mode=='rgb_array')

    def _SET_RENDERING_VIEWPORT_SIZE(self, width, height=None, keep_ratio=True):
        '''
            Set rendering viewport's size (i.e. image size).

            Args:
                width: viewport's width
                height: viewport's height
                keep_ratio: Whether height must be automatically calculated to keep the same ratio as the environment's viewport size.
        '''
        global RENDERING_VIEWER_W, RENDERING_VIEWER_H
        RENDERING_VIEWER_W = width
        if keep_ratio or height is None:
            RENDERING_VIEWER_H = int(RENDERING_VIEWER_W / (VIEWPORT_W / VIEWPORT_H))
        else:
            RENDERING_VIEWER_H = height

    def close(self):
        self._destroy()
        if self.viewer is not None:
            self.viewer.close()
            self.viewer = None

    #region Fixtures Initialization
    # ------------------------------------------ FIXTURES INITIALIZATION ------------------------------------------

    def create_terrain_fixtures(self):
        '''
            Create fixtures used to generate terrain.
        '''
        self.fd_polygon = fixtureDef(
            shape=polygonShape(vertices=
                               [(0, 0),
                                (1, 0),
                                (1, -1),
                                (0, -1)]),
            friction=FRICTION,
            categoryBits=0x1,
            maskBits=0xFFFF
        )

        self.fd_edge = fixtureDef(
            shape=edgeShape(vertices=
                            [(0, 0),
                             (1, 1)]),
            friction=FRICTION,
            categoryBits=0x1,
            maskBits=0xFFFF
        )

        # Init default hexagon fixture and shape, used only for Hexagon Tracks
        self.fd_default_hexagon = fixtureDef(
            shape=polygonShape(vertices=
                               [(0, 0),
                                (1, 0),
                                (1, -1),
                                (0, -1)]),
            friction=FRICTION,
            categoryBits=0x1,
            maskBits=0xFFFF
        )
        self.default_hexagon = [(-0.5, 0), (-0.5, 0.25), (-0.25, 0.5), (0.25, 0.5), (0.5, 0.25), (0.5, 0)]

    #endregion

    # region Game Generation
    # ------------------------------------------ GAME GENERATION ------------------------------------------

    def generate_game(self):
        '''
            Generate the task (i.e. terrain + embodiment).
        '''
        self._generate_terrain()
        self._generate_clouds()
        self._generate_walker()

    def _generate_terrain(self):
        GRASS, STUMP, HEXA = 0, None, None
        cpt=1
        if self.stump_height:
            STUMP = cpt
            cpt += 1
        if self.hexa_shape:
            HEXA = cpt
            cpt += 1
        if self.stump_seq is not None:
            SEQ = cpt
            cpt += 1
        _STATES_ = cpt

        state = self.np_random.randint(1, _STATES_)
        velocity = 0.0
        y = TERRAIN_HEIGHT
        self.terrain = []
        self.terrain_x = []
        self.terrain_y = []
        x = 0
        max_x = TERRAIN_LENGTH * TERRAIN_STEP

        # Add startpad
        max_startpad_x = self.TERRAIN_STARTPAD * TERRAIN_STEP
        self.terrain_x.append(x)
        self.terrain_y.append(y)
        x += max_startpad_x
        self.terrain_x.append(x)
        self.terrain_y.append(y)
        oneshot = True

        # Generation of terrain
        while x < max_x:
            self.terrain_x.append(x)

            if state==GRASS and not oneshot:
                velocity = 0.8*velocity + 0.01*np.sign(TERRAIN_HEIGHT - y)
                if x > max_startpad_x: velocity += self.np_random.uniform(-self.roughness, self.roughness)/SCALE
                y += velocity
                x += self.obstacle_spacing

            elif state==STUMP and oneshot:
                stump_height = max(0.05, self.np_random.normal(self.stump_height[0], self.stump_height[1]))
                stump_width = TERRAIN_STEP
                if self.stump_width is not None:
                    stump_width *= max(0.05, np.random.normal(self.stump_width[0], self.stump_width[1]))
                poly = [
                    (x, y),
                    (x+stump_width, y),
                    (x+stump_width, y+stump_height * TERRAIN_STEP),
                    (x,y+stump_height * TERRAIN_STEP),
                    ]
                x += stump_width
                if self.stump_rot is not None:
                    anchor = (np.array(poly[0]) + np.array(poly[1]))/2
                    rotation = np.clip(self.np_random.normal(self.stump_rot[0], self.stump_rot[1]),0,2*np.pi)
                    poly = Rotate2D(np.array(poly), anchor, rotation).tolist()
                self.fd_polygon.shape.vertices=poly
                t = self.world.CreateStaticBody(
                    fixtures = self.fd_polygon,
                    userData=CustomUserData("grass", CustomUserDataObjectTypes.TERRAIN))
                t.color1, t.color2 = (1,1,1), (0.6,0.6,0.6)
                self.terrain.append(t)
            elif state==HEXA and oneshot:
                # first point do not move
                poly = []
                delta_pos = []
                for i in range(0,len(self.hexa_shape),2):
                    delta_pos.append(tuple(np.random.normal(self.hexa_shape[i:i+2],0.1)))
                for i,(b,d) in enumerate(zip(self.default_hexagon, delta_pos)):
                    if i != 0 and i != (len(self.default_hexagon)-1):
                        poly.append((x + (b[0]*TERRAIN_STEP) + (d[0]*TERRAIN_STEP),
                                     y + (b[1]*TERRAIN_STEP) + (d[1]*TERRAIN_STEP)))
                    else:
                        poly.append((x + (b[0]*TERRAIN_STEP) + (d[0]*TERRAIN_STEP),
                                     y + (b[1]*TERRAIN_STEP)))
                x += 1
                self.fd_default_hexagon.shape.vertices = poly
                t = self.world.CreateStaticBody(
                    fixtures=self.fd_default_hexagon)
                t.color1, t.color2 = (1.0, np.clip(delta_pos[0][1]/3,0,1), np.clip(delta_pos[-1][1]/3,0,1)), (0.6, 0.6, 0.6)
                self.terrain.append(t)

            elif state==SEQ and oneshot:
                for height, width in zip(self.stump_seq[0::2], self.stump_seq[1::2]):
                    stump_height = max(0.05, self.np_random.normal(height, 0.1))
                    stump_width = max(0.05, self.np_random.normal(width, 0.1))
                    poly = [
                        (x, y),
                        (x + stump_width, y),
                        (x + stump_width, y + stump_height * TERRAIN_STEP),
                        (x, y + stump_height * TERRAIN_STEP),
                    ]
                    x += stump_width
                    self.fd_polygon.shape.vertices = poly
                    t = self.world.CreateStaticBody(
                        fixtures=self.fd_polygon,
                        userData=CustomUserData("grass", CustomUserDataObjectTypes.TERRAIN))
                    t.color1, t.color2 = (1, 1, 1), (0.6, 0.6, 0.6)
                    self.terrain.append(t)

            oneshot = False
            self.terrain_y.append(y)
            if state==GRASS:
                state = self.np_random.randint(1, _STATES_)
                oneshot = True
            else:
                state = GRASS
                oneshot = False

        # Draw terrain
        self.terrain_poly = []
        assert len(self.terrain_x) == len(self.terrain_y)
        for i in range(len(self.terrain_x)-1):
            poly = [
                (self.terrain_x[i],   self.terrain_y[i]),
                (self.terrain_x[i+1], self.terrain_y[i+1])
                ]
            self.fd_edge.shape.vertices=poly
            t = self.world.CreateStaticBody(
                fixtures = self.fd_edge,
                userData=CustomUserData("grass", CustomUserDataObjectTypes.TERRAIN))
            color = (0.3, 1.0 if (i % 2) == 0 else 0.8, 0.3)
            t.color1 = color
            t.color2 = color
            self.terrain.append(t)
            color = (0.4, 0.6, 0.3)
            poly += [ (poly[1][0], 0), (poly[0][0], 0) ]
            self.terrain_poly.append( (poly, color) )
        self.terrain.reverse()

    def _generate_clouds(self):
        # Sorry for the clouds, couldn't resist
        self.cloud_poly   = []
        for i in range(TERRAIN_LENGTH//20):
            x = self.np_random.uniform(0, TERRAIN_LENGTH)*TERRAIN_STEP
            y = VIEWPORT_H/SCALE*3/4
            poly = [
                (x+15*TERRAIN_STEP*math.sin(3.14*2*a/5)+self.np_random.uniform(0,5*TERRAIN_STEP),
                 y+ 5*TERRAIN_STEP*math.cos(3.14*2*a/5)+self.np_random.uniform(0,5*TERRAIN_STEP) )
                for a in range(5) ]
            x1 = min( [p[0] for p in poly] )
            x2 = max( [p[0] for p in poly] )
            self.cloud_poly.append( (poly,x1,x2) )

    def _generate_walker(self):
        init_x = TERRAIN_STEP*self.TERRAIN_STARTPAD/2
        if hasattr(self.walker_body, "old_morphology") and self.walker_body.old_morphology:
            init_y = TERRAIN_HEIGHT + 2 * self.walker_body.LEG_H
        else:
            init_y = TERRAIN_HEIGHT + self.walker_body.AGENT_CENTER_HEIGHT

        self.walker_body.draw(
            self.world,
            init_x,
            init_y,
            self.np_random.uniform(-INITIAL_RANDOM, INITIAL_RANDOM)
        )

Ancestors

  • gym.core.Env
  • typing.Generic
  • gym.utils.ezpickle.EzPickle

Class variables

var action_space : gym.spaces.space.Space[~ActType]
var metadata : Dict[str, Any]
var observation_space : gym.spaces.space.Space[~ObsType]
var render_mode : Union[str, NoneType]
var spec : EnvSpec

Methods

def close(self)

Override close in your subclass to perform any necessary cleanup.

Environments will automatically :meth:close() themselves when garbage collected or when the program exits.

Expand source code
def close(self):
    self._destroy()
    if self.viewer is not None:
        self.viewer.close()
        self.viewer = None
def create_terrain_fixtures(self)

Create fixtures used to generate terrain.

Expand source code
def create_terrain_fixtures(self):
    '''
        Create fixtures used to generate terrain.
    '''
    self.fd_polygon = fixtureDef(
        shape=polygonShape(vertices=
                           [(0, 0),
                            (1, 0),
                            (1, -1),
                            (0, -1)]),
        friction=FRICTION,
        categoryBits=0x1,
        maskBits=0xFFFF
    )

    self.fd_edge = fixtureDef(
        shape=edgeShape(vertices=
                        [(0, 0),
                         (1, 1)]),
        friction=FRICTION,
        categoryBits=0x1,
        maskBits=0xFFFF
    )

    # Init default hexagon fixture and shape, used only for Hexagon Tracks
    self.fd_default_hexagon = fixtureDef(
        shape=polygonShape(vertices=
                           [(0, 0),
                            (1, 0),
                            (1, -1),
                            (0, -1)]),
        friction=FRICTION,
        categoryBits=0x1,
        maskBits=0xFFFF
    )
    self.default_hexagon = [(-0.5, 0), (-0.5, 0.25), (-0.25, 0.5), (0.25, 0.5), (0.5, 0.25), (0.5, 0)]
def generate_game(self)

Generate the task (i.e. terrain + embodiment).

Expand source code
def generate_game(self):
    '''
        Generate the task (i.e. terrain + embodiment).
    '''
    self._generate_terrain()
    self._generate_clouds()
    self._generate_walker()
def render(self, mode='human', draw_lidars=True)

Compute the render frames as specified by render_mode attribute during initialization of the environment.

The set of supported modes varies per environment. (And some third-party environments may not support rendering at all.) By convention, if render_mode is:

  • None (default): no render is computed.
  • human: render return None. The environment is continuously rendered in the current display or terminal. Usually for human consumption.
  • rgb_array: return a single frame representing the current state of the environment. A frame is a numpy.ndarray with shape (x, y, 3) representing RGB values for an x-by-y pixel image.
  • rgb_array_list: return a list of frames representing the states of the environment since the last reset. Each frame is a numpy.ndarray with shape (x, y, 3), as with rgb_array.
  • ansi: Return a strings (str) or StringIO.StringIO containing a terminal-style text representation for each time step. The text can include newlines and ANSI escape sequences (e.g. for colors).

Note

Make sure that your class's metadata 'render_modes' key includes the list of supported modes. It's recommended to call super() in implementations to use the functionality of this method.

Expand source code
def render(self, mode='human', draw_lidars=True):
    #self.scroll = 1
    from gym.envs.classic_control import rendering
    if self.viewer is None:
        self.viewer = rendering.Viewer(RENDERING_VIEWER_W, RENDERING_VIEWER_H)
    self.viewer.set_bounds(self.scroll, RENDERING_VIEWER_W/SCALE + self.scroll, 0, RENDERING_VIEWER_H/SCALE)

    self.viewer.draw_polygon( [
        (self.scroll,                  0),
        (self.scroll+RENDERING_VIEWER_W/SCALE, 0),
        (self.scroll+RENDERING_VIEWER_W/SCALE, RENDERING_VIEWER_H/SCALE),
        (self.scroll,                  RENDERING_VIEWER_H/SCALE),
        ], color=(0.9, 0.9, 1.0) )
    for poly,x1,x2 in self.cloud_poly:
        if x2 < self.scroll/2: continue
        if x1 > self.scroll/2 + RENDERING_VIEWER_W/SCALE: continue
        self.viewer.draw_polygon( [(p[0]+self.scroll/2, p[1]) for p in poly], color=(1,1,1))
    for poly, color in self.terrain_poly:
        if poly[1][0] < self.scroll: continue
        if poly[0][0] > self.scroll + RENDERING_VIEWER_W/SCALE: continue
        self.viewer.draw_polygon(poly, color=color)

    for obj in self.drawlist:
        for f in obj.fixtures:
            trans = f.body.transform
            if type(f.shape) is circleShape:
                t = rendering.Transform(translation=trans*f.shape.pos)
                self.viewer.draw_circle(f.shape.radius, 30, color=obj.color1).add_attr(t)
                self.viewer.draw_circle(f.shape.radius, 30, color=obj.color2, filled=False, linewidth=2).add_attr(t)
            else:
                path = [trans*v for v in f.shape.vertices]
                self.viewer.draw_polygon(path, color=obj.color1)
                path.append(path[0])
                self.viewer.draw_polyline(path, color=obj.color2, linewidth=2)

    # Draw lidars
    if draw_lidars:
        for i in range(len(self.lidar)):
            l = self.lidar[i]
            self.viewer.draw_polyline([l.p1, l.p2], color=(1, 0, 0), linewidth=1)

    flagy1 = TERRAIN_HEIGHT
    flagy2 = flagy1 + 50/SCALE
    x = TERRAIN_STEP*3
    self.viewer.draw_polyline( [(x, flagy1), (x, flagy2)], color=(0,0,0), linewidth=2 )
    f = [(x, flagy2), (x, flagy2-10/SCALE), (x+25/SCALE, flagy2-5/SCALE)]
    self.viewer.draw_polygon(f, color=(0.9,0.2,0) )
    self.viewer.draw_polyline(f + [f[0]], color=(0,0,0), linewidth=2 )

    return self.viewer.render(return_rgb_array = mode=='rgb_array')
def reset(self)

Resets the environment to an initial state and returns the initial observation.

This method can reset the environment's random number generator(s) if seed is an integer or if the environment has not yet initialized a random number generator. If the environment already has a random number generator and :meth:reset is called with seed=None, the RNG should not be reset. Moreover, :meth:reset should (in the typical use case) be called with an integer seed right after initialization and then never again.

Args

seed : optional int
The seed that is used to initialize the environment's PRNG. If the environment does not already have a PRNG and seed=None (the default option) is passed, a seed will be chosen from some source of entropy (e.g. timestamp or /dev/urandom). However, if the environment already has a PRNG and seed=None is passed, the PRNG will not be reset. If you pass an integer, the PRNG will be reset even if it already exists. Usually, you want to pass an integer right after the environment has been initialized and then never again. Please refer to the minimal example above to see this paradigm in action.
options : optional dict
Additional information to specify how the environment is reset (optional, depending on the specific environment)

Returns

observation (object): Observation of the initial state. This will be an element of :attr:observation_space (typically a numpy array) and is analogous to the observation returned by :meth:step. info (dictionary): This dictionary contains auxiliary information complementing observation. It should be analogous to the info returned by :meth:step.

Expand source code
def reset(self):
    self._destroy()
    self.world.contactListener_bug_workaround = ContactDetector(self)
    self.world.contactListener = self.world.contactListener_bug_workaround
    self.head_contact = False
    self.prev_shaping = None
    self.scroll = 0.0
    self.lidar_render = 0

    self.generate_game()

    self.drawlist = self.terrain + self.walker_body.get_elements_to_render()

    self.lidar = [LidarCallback(self.walker_body.reference_head_object.fixtures[0].filterData.maskBits)
                  for _ in range(NB_LIDAR)]
    self.episodic_reward = 0

    return self.step(np.array([0] * self.action_space.shape[0]))[0]
def seed(self, seed=None)
Expand source code
def seed(self, seed=None):
    self.np_random, seed = seeding.np_random(seed)
    return [seed]
def set_environment(self, roughness=None, stump_height=None, stump_width=None, stump_rot=None, obstacle_spacing=None, poly_shape=None, stump_seq=None)

Set the parameters controlling the PCG algorithm to generate a task. Call this method before reset().

Args

roughness
Input vector controlling the CPPN
stump_height
Tuple specifying mean and std of a normal distribution from which the height of each stump is sampled
stump_width
Tuple specifying mean and std of a normal distribution from which the width of each stump is sampled
stump_rot
Tuple specifying mean and std of a normal distribution from which the rotation degree of each stump is sampled
obstacle_spacing
Spacing between stumps
poly_shape
Shape of polygon stumps
Expand source code
def set_environment(self, roughness=None, stump_height=None, stump_width=None, stump_rot=None,
                    obstacle_spacing=None, poly_shape=None, stump_seq=None):
    '''
        Set the parameters controlling the PCG algorithm to generate a task.
        Call this method before `reset()`.

        Args:
            roughness: Input vector controlling the CPPN
            stump_height: Tuple specifying mean and std of a normal distribution from which the height of each stump is sampled
            stump_width: Tuple specifying mean and std of a normal distribution from which the width of each stump is sampled
            stump_rot: Tuple specifying mean and std of a normal distribution from which the rotation degree of each stump is sampled
            obstacle_spacing: Spacing between stumps
            poly_shape: Shape of polygon stumps
    '''
    self.roughness = roughness if roughness else 0
    self.obstacle_spacing = max(0.01, obstacle_spacing) if obstacle_spacing is not None else 8.0
    self.stump_height = [stump_height, 0.1] if stump_height is not None else None
    self.stump_width = stump_width
    self.stump_rot = stump_rot
    self.hexa_shape = poly_shape
    self.stump_seq = stump_seq
    if poly_shape is not None:
        self.hexa_shape = np.interp(poly_shape,[0,4],[0,4]).tolist()
        assert(len(poly_shape) == 12)
        self.hexa_shape = self.hexa_shape[0:12]
def step(self, action)

Run one timestep of the environment's dynamics.

When end of episode is reached, you are responsible for calling :meth:reset to reset this environment's state. Accepts an action and returns either a tuple (observation, reward, terminated, truncated, info).

Args

action : ActType
an action provided by the agent

Returns

observation (object): this will be an element of the environment's :attr:observation_space.
This may, for instance, be a numpy array containing the positions and velocities of certain objects.
reward (float): The amount of reward returned as a result of taking the action.
terminated (bool): whether a terminal state (as defined under the MDP of the task) is reached.
In this case further step() calls could return undefined results.
truncated (bool): whether a truncation condition outside the scope of the MDP is satisfied.
Typically a timelimit, but could also be used to indicate agent physically going out of bounds.
Can be used to end the episode prematurely before a terminal state is reached.
info (dictionary): info contains auxiliary diagnostic information (helpful for debugging, learning, and logging).
This might, for instance, contain
metrics that describe the agent's performance state, variables that are hidden from observations, or individual reward terms that are combined to produce the total reward. It also can contain information that distinguishes truncation and termination, however this is deprecated in favour of returning two booleans, and will be removed in a future version.
(deprecated)
done (bool): A boolean value for if the episode has ended, in which case further :meth:step calls will return undefined results.
A done signal may be emitted for different reasons
Maybe the task underlying the environment was solved successfully, a certain timelimit was exceeded, or the physics simulation has entered an invalid state.
Expand source code
def step(self, action):
    self.walker_body.activate_motors(action)

    self.world.Step(1.0/FPS, 6*30, 2*30)

    head = self.walker_body.reference_head_object
    pos = head.position
    vel = head.linearVelocity

    for i in range(NB_LIDAR):
        self.lidar[i].fraction = 1.0
        self.lidar[i].p1 = pos
        self.lidar[i].p2 = (
            pos[0] + math.sin(1.5*i/NB_LIDAR)*LIDAR_RANGE,
            pos[1] - math.cos(1.5*i/NB_LIDAR)*LIDAR_RANGE)
        self.world.RayCast(self.lidar[i], self.lidar[i].p1, self.lidar[i].p2)
    state = [
        head.angle,        # Normal angles up to 0.5 here, but sure more is possible.
        2.0*head.angularVelocity/FPS,
        0.3*vel.x*(VIEWPORT_W/SCALE)/FPS,  # Normalized to get -1..1 range
        0.3*vel.y*(VIEWPORT_H/SCALE)/FPS]

    # add leg-related state
    state.extend(self.walker_body.get_motors_state())

    if self.walker_body.body_type == BodyTypesEnum.CLIMBER:
        state.extend(self.walker_body.get_sensors_state())

    state += [l.fraction for l in self.lidar]

    self.scroll = pos.x - RENDERING_VIEWER_W/SCALE/5

    shaping  = 130*pos[0]/SCALE  # moving forward is a way to receive reward (normalized to get 300 on completion)
    if not (hasattr(self.walker_body, "remove_reward_on_head_angle") and self.walker_body.remove_reward_on_head_angle):
        shaping -= 5.0*abs(state[0])  # keep head straight, other than that and falling, any behavior is unpunished

    reward = 0
    if self.prev_shaping is not None:
        reward = shaping - self.prev_shaping
    self.prev_shaping = shaping

    for a in action:
        reward -= self.walker_body.TORQUE_PENALTY * 80 * np.clip(np.abs(a), 0, 1) # 80 => Original torque
        # normalized to about -50.0 using heuristic, more optimal agent should spend less

    done = False
    if self.head_contact or pos[0] < 0:
        reward = -100
        done   = True
    if pos[0] > (TERRAIN_LENGTH-TERRAIN_END)*TERRAIN_STEP:
        done   = True
    self.episodic_reward += reward
    return np.array(state), reward, done, {"success": self.episodic_reward > 230}