Tactics2d
Train racing agent
Initializing search
    GitHub
    GitHub
    • Home
    • Release Notes
    • Installation
    • Dataset Support
      • CitySim
      • Dragon Lake Parking
      • DriveInsight
      • highD (LevelX)
      • inD (LevelX)
      • rounD (LevelX)
      • exiD (LevelX)
      • uniD (LevelX)
      • NuPlan
      • Waymo Open Motion Dataset (WOMD)
      • tactics2d.controller
      • tactics2d.dataset_parser
      • tactics2d.envs
      • tactics2d.geometry
      • tactics2d.interpolator
      • tactics2d.map
      • tactics2d.participant
      • tactics2d.physics
      • tactics2d.sensor
      • tactics2d.traffic
      • Graph-based Planning Algorithms
      • Sampling-based Planning Algorithms
      • Routing Demo on WOMD Map
      • LimSim PDP Behavior Tutorial
      • LimSim PDP Visualization Demo
    • Community
    • Publications

    Copyright (C) 2024, Tactics2D Authors. Released under the GNU GPLv3. SPDX-License-Identifier: GPL-3.0-or-later

    In [ ]:
    Copied!
    """Train racing agent implementation."""
    
    """Train racing agent implementation."""
    In [ ]:
    Copied!
    import sys
    
    import sys
    In [ ]:
    Copied!
    # Extend the module search path with the current directory, ./rllib, and the
    # parent directory (presumably so the `rllib` import below resolves -- confirm).
    sys.path.append(".")
    sys.path.append("./rllib")
    sys.path.append("..")
    
    sys.path.append(".") sys.path.append("./rllib") sys.path.append("..")
    In [ ]:
    Copied!
    from collections import deque
    
    from collections import deque
    In [ ]:
    Copied!
    import gymnasium as gym
    import numpy as np
    import torch
    import torch.nn as nn
    import tqdm
    import wandb
    from rllib.algorithms.ppo import *
    
    import gymnasium as gym import numpy as np import torch import torch.nn as nn import tqdm import wandb from rllib.algorithms.ppo import *
    In [ ]:
    Copied!
    # Use the GPU when CUDA is available; otherwise fall back to the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    In [ ]:
    Copied!
    # Start a Weights & Biases run under the "tactics2d-racing" project; this runs
    # at import time, before any training code is called.
    wandb.init(project="tactics2d-racing")
    
    wandb.init(project="tactics2d-racing")

    ========================= Define the network

    In [ ]:
    Copied!
    def orthogonal_init(layer, gain: float = np.sqrt(2), constant: float = 0.0):
        """Initialize a layer's weight orthogonally and its bias to a constant.

        Args:
            layer: Module exposing a ``weight`` tensor and, optionally, a ``bias``
                tensor (for example ``nn.Linear`` or ``nn.Conv2d``).
            gain: Scaling factor forwarded to ``nn.init.orthogonal_``.
            constant: Value every bias element is set to.

        Returns:
            The same ``layer`` object, modified in place, so the call can be
            nested directly inside a container constructor.
        """
        nn.init.orthogonal_(layer.weight.data, gain)
        # Layers created with ``bias=False`` expose ``bias`` as ``None``; skip
        # the bias initialization for them instead of raising AttributeError.
        if getattr(layer, "bias", None) is not None:
            nn.init.constant_(layer.bias.data, constant)
        return layer
    
    def orthogonal_init(layer, gain: float = np.sqrt(2), constant: float = 0.0): nn.init.orthogonal_(layer.weight.data, gain) nn.init.constant_(layer.bias.data, constant) return layer
    In [ ]:
    Copied!
    class ImageEncoder(nn.Module):
        """Convolutional feature extractor whose output is flattened.

        Every consecutive pair of entries in ``channels`` produces one stage:
        an orthogonally initialized ``nn.Conv2d`` (kernel 3, stride 1), followed
        by ``nn.Tanh`` and ``nn.MaxPool2d(2)``. A single ``nn.Flatten`` is placed
        after the last stage.
        """

        def __init__(self, channels: list) -> None:
            super().__init__()
            stages = []
            for n_in, n_out in zip(channels, channels[1:]):
                conv = orthogonal_init(nn.Conv2d(n_in, n_out, 3, 1))
                stages.extend((conv, nn.Tanh(), nn.MaxPool2d(2)))
            stages.append(nn.Flatten())
            self.net = nn.Sequential(*stages)

        def forward(self, x):
            """Apply the convolutional stack to ``x`` and return the flattened features."""
            return self.net(x)
    
    class ImageEncoder(nn.Module): def __init__(self, channels: list) -> None: super().__init__() in_channels, out_channels = channels[:-1], channels[1:] self.net = nn.Sequential() for in_channel, out_channel in zip(in_channels, out_channels): self.net.append(orthogonal_init(nn.Conv2d(in_channel, out_channel, 3, 1))) self.net.append(nn.Tanh()) self.net.append(nn.MaxPool2d(2)) self.net.append(nn.Flatten()) def forward(self, x): return self.net(x)
    In [ ]:
    Copied!
    class AgentActor(PPOActor):
        """PPO actor with an image encoder in front of a fully connected head.

        The encoder is built for 12 input channels and the head expects 6400
        flattened features. Every linear layer, including the output layer of
        size ``action_dim``, is followed by ``nn.Tanh``.
        """

        def __init__(self, state_dim, action_dim, hidden_size, continuous):
            super().__init__(state_dim, action_dim, hidden_size, continuous)

            self.encoder = ImageEncoder([12, 4, 16, 64])

            # Layer widths of the head, from encoder output to action output.
            widths = (6400, 640, 128, 32, action_dim)
            head = []
            for fan_in, fan_out in zip(widths, widths[1:]):
                head.append(orthogonal_init(nn.Linear(fan_in, fan_out)))
                head.append(nn.Tanh())
            self.net = nn.Sequential(*head)

        def forward(self, state):
            """Encode ``state`` and map the features through the head."""
            features = self.encoder(state)
            return self.net(features)
    
    class AgentActor(PPOActor): def __init__(self, state_dim, action_dim, hidden_size, continuous): super().__init__(state_dim, action_dim, hidden_size, continuous) self.encoder = ImageEncoder([12, 4, 16, 64]) self.net = nn.Sequential( orthogonal_init(nn.Linear(6400, 640)), nn.Tanh(), orthogonal_init(nn.Linear(640, 128)), nn.Tanh(), orthogonal_init(nn.Linear(128, 32)), nn.Tanh(), orthogonal_init(nn.Linear(32, action_dim)), nn.Tanh(), ) def forward(self, state): state = self.encoder(state) return self.net(state)
    In [ ]:
    Copied!
    class AgentCritic(PPOCritic):
        """PPO critic with an image encoder in front of a fully connected head.

        The encoder is built for 12 input channels and the head expects 6400
        flattened features. Hidden linear layers are followed by ``nn.Tanh``;
        the final layer outputs a single value with no activation.
        """

        def __init__(self, state_dim, hidden_size):
            super().__init__(state_dim, hidden_size)

            self.encoder = ImageEncoder([12, 4, 16, 64])

            # Widths of the hidden part of the head; the scalar output layer
            # is appended separately because it has no activation.
            widths = (6400, 640, 128, 32)
            hidden = []
            for fan_in, fan_out in zip(widths, widths[1:]):
                hidden += [orthogonal_init(nn.Linear(fan_in, fan_out)), nn.Tanh()]
            self.net = nn.Sequential(*hidden, orthogonal_init(nn.Linear(32, 1)))

        def forward(self, state):
            """Encode ``state`` and return the head's output."""
            features = self.encoder(state)
            return self.net(features)
    
    class AgentCritic(PPOCritic): def __init__(self, state_dim, hidden_size): super().__init__(state_dim, hidden_size) self.encoder = ImageEncoder([12, 4, 16, 64]) self.net = nn.Sequential( orthogonal_init(nn.Linear(6400, 640)), nn.Tanh(), orthogonal_init(nn.Linear(640, 128)), nn.Tanh(), orthogonal_init(nn.Linear(128, 32)), nn.Tanh(), orthogonal_init(nn.Linear(32, 1)), ) def forward(self, state): state = self.encoder(state) return self.net(state)

    ========================= Define the environment wrapper

    In [ ]:
    Copied!
    class RacingWrapper(gym.Wrapper):
        """Environment wrapper that stacks frames and reshapes actions and rewards.

        * Observations: the last four frames are scaled to [0, 1], moved to
          channel-first layout and concatenated along the channel axis.
        * Actions: a two-element action ``(steer, accel)`` is expanded to the
          three-element ``[steer, gas, brake]`` list passed to the wrapped env.
        * Rewards: the raw reward is clipped to ``[0, 1]``, small penalties are
          applied for barely moving or moving very far in one step, and ``-20``
          is returned (which also truncates the episode in ``step``) when the
          car is farther than ``40 / 6`` from the nearby track centreline.

        The wrapped environment must expose ``unwrapped.car.hull.position`` and
        ``unwrapped.track`` (entries unpacked as ``(_, beta, x, y)``).
        """

        def __init__(self, env):
            super().__init__(env)
            self.history_states = self._blank_history()

            self.prev_position = (0, 0)
            self.track_len = 0
            # Index into the track of the next checkpoint the car has to pass.
            self.next_closet_point = 0

        @staticmethod
        def _blank_history():
            """Return four independent all-zero frames of shape (1, 3, 96, 96)."""
            return [np.zeros((1, 3, 96, 96)) for _ in range(4)]

        @staticmethod
        def _distance_to_segment(px, py, ax, ay, bx, by):
            """Return the Euclidean distance from point P to the segment A-B."""
            seg_x, seg_y = bx - ax, by - ay
            seg_len_sq = seg_x ** 2 + seg_y ** 2
            if seg_len_sq == 0:
                # Degenerate segment: fall back to the distance to A.
                return float(np.hypot(px - ax, py - ay))
            # Projection parameter of P onto A-B, clamped onto the segment.
            t = ((px - ax) * seg_x + (py - ay) * seg_y) / seg_len_sq
            t = min(1.0, max(0.0, t))
            return float(np.hypot(px - (ax + t * seg_x), py - (ay + t * seg_y)))

        def _process_action(self, action):
            """Expand ``(steer, accel)`` into ``[steer, gas, brake]``.

            A positive ``accel`` becomes gas, a negative one becomes brake. The
            brake magnitude is passed as a non-negative number; forwarding the
            raw negative value would hand the environment a negative brake.
            """
            steer, accel = action[0], action[1]
            gas = 0 if accel < 0 else accel
            brake = -accel if accel < 0 else 0
            return [steer, gas, brake]

        def _process_observation(self, state):
            """Push a new frame into the history, dropping the oldest one."""
            # Add a leading batch axis, move channels first, scale to [0, 1].
            state = state[None, ...].transpose((0, 3, 1, 2)) / 255.0
            self.history_states.pop(0)
            self.history_states.append(state)

        def is_pass(self, x, y):
            """Return whether ``(x, y)`` lies on the passed side of the next checkpoint.

            The test is the sign of the 2-D cross product between the vectors
            from the car to the checkpoint and from the car to a second point
            offset from the checkpoint by ``(cos(beta), sin(beta))``.
            """
            _, beta, ckpt_x, ckpt_y = self.env.unwrapped.track[self.next_closet_point]
            line_x, line_y = ckpt_x + np.cos(beta), ckpt_y + np.sin(beta)
            cross = (ckpt_x - x) * (line_y - y) - (ckpt_y - y) * (line_x - x)
            return bool(cross >= 0)

        def _process_reward(self, reward):
            """Shape the raw reward using the car's motion and track distance.

            Returns ``-20`` when the car is off the track, otherwise the clipped
            and penalized reward (never above 1).
            """
            reward = max(0, reward)
            x, y = self.env.unwrapped.car.hull.position
            track = self.env.unwrapped.track

            moved_sq = (self.prev_position[0] - x) ** 2 + (self.prev_position[1] - y) ** 2
            if moved_sq < 0.0001:
                # Penalize standing (almost) still.
                reward -= 0.1
            if moved_sq > 0.8:
                # Penalize very large displacement within a single step.
                reward -= 0.2

            self.prev_position = (x, y)
            if self.is_pass(x, y):
                self.next_closet_point += 1
            # Stay on the last checkpoint once the end of the track is reached.
            if self.next_closet_point == self.track_len:
                self.next_closet_point = self.track_len - 1

            _, _, x1, y1 = track[self.next_closet_point]
            if self.next_closet_point == 0:
                _, _, x2, y2 = track[self.next_closet_point + 1]
            else:
                _, _, x2, y2 = track[self.next_closet_point - 1]

            # Distance from the car to the centreline segment between the next
            # checkpoint and its neighbour.
            dist = self._distance_to_segment(x, y, x1, y1, x2, y2)

            # NOTE(review): 40 / 6 presumably mirrors the environment's track
            # half-width -- confirm against the CarRacing constants.
            if dist > 40 / 6:
                return -20

            return min(reward, 1)

        def step(self, action):
            """Step the wrapped env and return the stacked observation and shaped reward."""
            action = self._process_action(action)
            state, reward, terminated, truncated, info = self.env.step(action)
            self._process_observation(state)
            processed_state = np.concatenate(self.history_states, axis=1)

            reward = self._process_reward(reward)
            # The off-track penalty (-20) ends the episode.
            if reward < -10:
                truncated = True

            return processed_state, reward, terminated, truncated, info

        def reset(self, *, seed=None, options=None):
            """Reset the wrapped env, the frame history and the checkpoint tracking.

            Args:
                seed: Optional seed forwarded to the wrapped environment.
                options: Optional reset options forwarded to the wrapped environment.

            Returns:
                ``(stacked_observation, info)``.
            """
            state, info = self.env.reset(seed=seed, options=options)
            # Drop frames of the previous episode so they cannot leak into the
            # first observations of the new one.
            self.history_states = self._blank_history()
            self._process_observation(state)
            processed_state = np.concatenate(self.history_states, axis=1)

            x, y = self.env.unwrapped.car.hull.position
            self.prev_position = (x, y)
            waypoints = np.array([[point[2], point[3]] for point in self.env.unwrapped.track])
            self.track_len = len(waypoints)
            distance = np.sqrt((waypoints[:, 0] - x) ** 2 + (waypoints[:, 1] - y) ** 2)
            # Start tracking from the checkpoint nearest to the spawn position.
            self.next_closet_point = int(np.argmin(distance))

            return processed_state, info
    
    class RacingWrapper(gym.Wrapper): def __init__(self, env): super().__init__(env) self.history_states = [np.zeros((1, 3, 96, 96))] * 4 self.prev_position = (0, 0) self.track_len = 0 self.next_closet_point = 0 def _process_action(self, action): # action = action.cpu().detach().numpy()[0] return [action[0], 0 if action[1] < 0 else action[1], 0 if action[1] > 0 else action[1]] def _process_observation(self, state): state = state[None, ...].transpose((0, 3, 1, 2)) / 255.0 self.history_states.pop(0) self.history_states.append(state) def is_pass(self, x, y): _, beta, ckpt_x, ckpt_y = self.env.track[self.next_closet_point] ckpt_line = (ckpt_x + np.cos(beta), ckpt_y + np.sin(beta)) if (ckpt_x - x) * (ckpt_line[1] - y) - (ckpt_y - y) * (ckpt_line[0] - x) >= 0: return True else: return False def _process_reward(self, reward): reward = max(0, reward) # car position x, y = self.env.unwrapped.car.hull.position if (self.prev_position[0] - x) ** 2 + (self.prev_position[1] - y) ** 2 < 0.0001: reward -= 0.1 if (self.prev_position[0] - x) ** 2 + (self.prev_position[1] - y) ** 2 > 0.8: reward -= 0.2 self.prev_position = (x, y) if self.is_pass(x, y): self.next_closet_point += 1 if self.next_closet_point == self.track_len: self.next_closet_point = self.track_len - 1 _, _, x1, y1 = self.env.track[self.next_closet_point] if self.next_closet_point == 0: _, _, x2, y2 = self.env.track[self.next_closet_point + 1] else: _, _, x2, y2 = self.env.track[self.next_closet_point - 1] dist = ( (x - x1) ** 2 + (y - y1) ** 2 - ((x - x1) * (x2 - x1) + (y - y1) * (y2 - y1)) / ((x1 - x2) ** 2 + (y1 - y2) ** 2) ) if dist < 0: dist = 1 dist = np.sqrt(dist) if dist > 40 / 6: return -20 return min(reward, 1) def step(self, action): action = self._process_action(action) state, reward, terminated, truncated, info = self.env.step(action) self._process_observation(state) processed_state = np.concatenate(self.history_states, axis=1) reward = self._process_reward(reward) if reward < -10: truncated = True return 
processed_state, reward, terminated, truncated, info def reset(self): state, info = self.env.reset() self._process_observation(state) processed_state = np.concatenate(self.history_states, axis=1) x, y = self.env.unwrapped.car.hull.position self.prev_position = (x, y) waypoints = np.array([[point[2], point[3]] for point in self.env.track]) self.track_len = len(waypoints) distance = np.sqrt((waypoints[:, 0] - x) ** 2 + (waypoints[:, 1] - y) ** 2) self.next_closet_point = np.argsort(distance)[0] return processed_state, info
    In [ ]:
    Copied!
    def trainer(num_epoch: int = 100, steps_per_epoch: int = 2048):
        """Train a PPO agent on the wrapped ``CarRacing-v2`` environment.

        Args:
            num_epoch: Number of outer epochs to run.
            steps_per_epoch: Number of environment steps per epoch.

        Each step queries the agent for an action, steps the environment, pushes
        the transition to the agent and calls ``agent.train()``. Losses, rewards,
        values and log-probabilities are logged to Weights & Biases. The
        environment is always closed on exit, including when an exception occurs.
        """
        env = RacingWrapper(gym.make("CarRacing-v2", render_mode="human"))
        try:
            state, info = env.reset()
            total_reward = 0
            # Returns of the most recent (up to 100) finished episodes.
            rewards = deque(maxlen=100)

            agent_configs = PPOConfig(
                {
                    "debug": True,
                    "state_space": env.observation_space,
                    "action_dim": 2,
                    "actor_net": AgentActor,
                    "actor_kwargs": {
                        "state_dim": 6400,
                        "action_dim": 2,
                        "hidden_size": 32,
                        "continuous": True,
                    },
                    "critic_net": AgentCritic,
                    "critic_kwargs": {"state_dim": 6400, "hidden_size": 32},
                    "vf_coef": 1,
                    "gae_lambda": 0.97,
                    "adv_norm": False,
                }
            )
            agent = PPO(agent_configs, device)

            wandb.config.update(agent_configs.__dict__)
            for i in range(num_epoch):
                for _ in tqdm.tqdm(range(steps_per_epoch)):
                    action, log_prob, value = agent.get_action(state)
                    next_state, reward, terminated, truncated, info = env.step(action[0])
                    env.render()

                    transition = (
                        (next_state, [reward], [terminated], [truncated], [info]),
                        state,
                        action,
                        log_prob,
                        value,
                    )
                    agent.push(transition)

                    state = next_state
                    total_reward += reward

                    # ``agent.train()`` may return None; losses are only logged
                    # when it returns a result.
                    train_result = agent.train()
                    if train_result is not None:
                        loss, loss_clip, loss_vf, loss_entropy = train_result
                        wandb.log(
                            {
                                "loss": loss,
                                "loss_clip": loss_clip,
                                "loss_vf": loss_vf,
                                "loss_entropy": loss_entropy,
                            }
                        )

                    if terminated or truncated:
                        rewards.append(total_reward)
                        # NOTE(review): the "reward" key is also used for the
                        # per-step reward logged below, so both series share
                        # one chart.
                        wandb.log({"mean_reward": np.mean(rewards), "reward": total_reward})
                        state, info = env.reset()
                        total_reward = 0

                    wandb.log(
                        {
                            "reward": reward,
                            "value": value,
                            "log_prob_0": log_prob[0][0],
                            "log_prob_1": log_prob[0][1],
                        }
                    )

                # No episode may have finished yet; avoid numpy's empty-mean warning.
                mean_reward = np.mean(rewards) if rewards else float("nan")
                print(f"epoch {i}, mean reward: {mean_reward}")
        finally:
            # Release the environment's resources even if training fails.
            env.close()
    
    def trainer(): num_epoch = 100 env = gym.make("CarRacing-v2", render_mode="human") env = RacingWrapper(env) state, info = env.reset() done = False total_reward = 0 rewards = deque(maxlen=100) agent_configs = PPOConfig( { "debug": True, "state_space": env.observation_space, "action_dim": 2, "actor_net": AgentActor, "actor_kwargs": { "state_dim": 6400, "action_dim": 2, "hidden_size": 32, "continuous": True, }, "critic_net": AgentCritic, "critic_kwargs": {"state_dim": 6400, "hidden_size": 32}, "vf_coef": 1, "gae_lambda": 0.97, "adv_norm": False, } ) agent = PPO(agent_configs, device) wandb.config.update(agent_configs.__dict__) for i in range(num_epoch): for t in tqdm.tqdm(range(2048)): action, log_prob, value = agent.get_action(state) next_state, reward, terminated, truncated, info = env.step(action[0]) env.render() transition = ( (next_state, [reward], [terminated], [truncated], [info]), state, action, log_prob, value, ) agent.push(transition) state = next_state done = terminated or truncated total_reward += reward train_result = agent.train() if train_result is not None: loss, loss_clip, loss_vf, loss_entropy = train_result wandb.log( { "loss": loss, "loss_clip": loss_clip, "loss_vf": loss_vf, "loss_entropy": loss_entropy, } ) if done: rewards.append(total_reward) wandb.log({"mean_reward": np.mean(rewards), "reward": total_reward}) state, info = env.reset() done = False total_reward = 0 wandb.log( { "reward": reward, "value": value, "log_prob_0": log_prob[0][0], "log_prob_1": log_prob[0][1], } ) print(f"epoch {i}, mean reward: {np.mean(rewards)}")
    In [ ]:
    Copied!
    # Start training only when this file is executed as a script, not on import.
    if __name__ == "__main__":
        trainer()
    
    if __name__ == "__main__": trainer()
    Made with Material for MkDocs