LimSim PDP Behavior Tutorial¶
This notebook demonstrates the LimSim-style Prediction-Decision-Planning (PDP) behavior model in Tactics2D. It focuses on the API and textual diagnostics rather than animation.
The implementation follows the non-LLM LimSim interaction pipeline at a Tactics2D-native level: local Region of Interest (RoI) selection, interaction grouping, MCT/MCTS-based joint behavior decisions, and a Frenet-style final trajectory planner on lane geometry. The model outputs normal Tactics2D Trajectory objects, so results can be evaluated, visualized, or reused downstream.
Compared with the official full LimSim planner, this implementation is still intermediate, but it now includes rolling prediction memory, vehicle-only WOMD control, background obstacle prediction, reference-path Frenet conversion, quartic/quintic trajectory sampling, lane-change candidates, stop/traffic-light/junction-aware map queries, and rolling-simulation metrics.
1. Import dependencies¶
The imports follow the same style as the other tutorial notebooks. If direct imports fail in a fresh local environment, install the repository in editable mode first with pip install -e ..
%matplotlib inline
import warnings
from math import hypot
from pathlib import Path
import sys
warnings.filterwarnings("ignore")
def find_repo_root(start):
current = Path(start).resolve()
for candidate in [current, *current.parents]:
if (candidate / "tactics2d" / "__init__.py").exists():
return candidate
raise RuntimeError("Cannot find the Tactics2D repository root.")
repo_root = find_repo_root(Path.cwd())
if str(repo_root) not in sys.path:
sys.path.insert(0, str(repo_root))
import numpy as np
from shapely.geometry import LineString
from tactics2d.behavior import LimSimBehaviorModel
from tactics2d.behavior.limsim.config import LimSimConfig
from tactics2d.behavior.limsim.evaluation import (
dimensions_from_participants,
evaluate_planning_result,
evaluate_rolling_result,
)
from tactics2d.behavior.limsim.frenet_planner import (
FrenetTrajectoryPlanner,
build_reference_path_from_agent,
)
from tactics2d.behavior.limsim.prediction import LimSimPredictor
from tactics2d.behavior.limsim.roi import RoISelector
from tactics2d.behavior.limsim.rolling import LimSimRollingRunner
from tactics2d.map.element import Lane, LaneRelationship, Map
from tactics2d.participant.element import Vehicle
from tactics2d.participant.trajectory import State, Trajectory
def build_lane(lane_id, x_left, x_right, y_start, y_end):
left_side = LineString([(x_left, y_start), (x_left, y_end)])
right_side = LineString([(x_right, y_start), (x_right, y_end)])
return Lane(
id_=lane_id,
left_side=left_side,
right_side=right_side,
custom_tags={
"centerline": np.array(
[[(x_left + x_right) / 2.0, y_start], [(x_left + x_right) / 2.0, y_end]]
)
},
)
def build_vehicle(agent_id, frame, x, y, heading=np.pi / 2, speed=5.0):
trajectory = Trajectory(id_=agent_id, fps=10, stable_freq=True)
trajectory.add_state(
State(
frame=frame,
x=x,
y=y,
heading=heading,
vx=speed * np.cos(heading),
vy=speed * np.sin(heading),
)
)
return Vehicle(agent_id, "vehicle", trajectory=trajectory, length=4.5, width=1.8)
2. What this model currently does¶
For one PDP update, the model performs the following steps:
- Select vehicle participants inside a Region of Interest (RoI).
- Keep nearby outer-band vehicles as background obstacles.
- Convert Tactics2D participants and map lanes into compact decision states.
- Build local interaction groups using distance, lane topology, successor-lane relation, and junction conflict cues.
- Run group-wise MCTS over LimSim-style actions:
KS,AC,DC,LCL, andLCR. - Convert each selected action into a final short-horizon trajectory with a Frenet-style planner.
- Optionally run repeated rolling PDP updates, reusing previous planned trajectories in the next prediction step.
The final planner is separate from the MCTS rollout model. MCTS uses a fast lane-following rollout to search action combinations, then the final planner samples polynomial Frenet trajectories and scores them against dynamic obstacles and map semantics. For non-lane-change actions (KS, AC, DC), both rollout and final planning keep the current lateral offset; only LCL and LCR intentionally change lateral targets.
map_ = Map(name="limsim_tutorial_map")
lane_a = build_lane("A", 0.0, 2.0, 0.0, 80.0)
lane_b = build_lane("B", 2.0, 4.0, 0.0, 80.0)
lane_a.add_related_lane("B", LaneRelationship.RIGHT_NEIGHBOR)
lane_b.add_related_lane("A", LaneRelationship.LEFT_NEIGHBOR)
map_.add_lane(lane_a)
map_.add_lane(lane_b)
participants = {
1: build_vehicle(1, 0, 1.0, 5.0, speed=5.0),
2: build_vehicle(2, 0, 1.0, 11.0, speed=2.0),
3: build_vehicle(3, 0, 3.0, 45.0, speed=4.0),
}
3. Run single-frame PDP planning¶
The returned result contains interaction groups, selected high-level actions, MCTS root nodes, and planned Tactics2D trajectories. In this small synthetic scene, vehicles 1 and 2 are close enough to be planned as one interaction group, while vehicle 3 is far away and can be planned independently.
This cell demonstrates one single-frame PDP call through LimSimBehaviorModel.plan(...). Rolling closed-loop behavior is covered later and visualized in limsim_pdp_visualization.ipynb.
config = LimSimConfig(horizon_steps=20, mcts_iterations=60, interaction_distance=20.0)
behavior_model = LimSimBehaviorModel(config)
result = behavior_model.plan(participants, map_, frame=0)
evaluation = evaluate_planning_result(
result,
dimensions=dimensions_from_participants(participants),
)
print("interaction groups:", result.groups)
print("selected actions:", {agent_id: action.value for agent_id, action in result.actions.items()})
print("evaluation:", {"actions": evaluation.action_counts, "collision": evaluation.has_collision})
scene_states = behavior_model.scene_builder.build(participants, map_, frame=0)
reference_path = build_reference_path_from_agent(scene_states[1], map_, config)
sampled = FrenetTrajectoryPlanner(config).sample_candidates(
scene_states[1], result.actions[1], reference_path, map_
)
print("frenet candidates for agent 1:", len(sampled))
print("best candidate cost:", round(min(candidate.cost for candidate in sampled), 3))
interaction groups: [[1, 2], [3]]
selected actions: {1: 'DC', 2: 'AC', 3: 'AC'}
evaluation: {'actions': {'DC': 1, 'AC': 2}, 'collision': False}
frenet candidates for agent 1: 3
best candidate cost: 128.976
4. Inspect planned and predicted trajectories¶
result.trajectories contains final Frenet-style planned trajectories after MCTS selects actions. LimSimPredictor provides the lightweight prediction step used by the reproduced pipeline: if a previous plan is available it can reuse the remaining planned trajectory; otherwise it rolls the participant forward with a lane-following rule.
This distinction matters in rolling PDP: each update executes only the next state, but the previous multi-step plan still helps the next prediction step avoid starting from scratch.
predictor = LimSimPredictor(config)
prediction = predictor.predict(participants, map_, frame=0)
for agent_id, trajectory in result.trajectories.items():
trace = trajectory.get_trace()
print("planned", agent_id, result.actions[agent_id].value, trace[:3], "...", trace[-3:])
for agent_id, trajectory in prediction.items():
print("predicted", agent_id, len(trajectory.frames), "states")
planned 1 DC [(1.0, 5.461641666666667), (1.0, 5.8529333333333335), (1.0, 6.182974999999999)] ... [(1.0, 8.135600000000002), (1.0, 8.233641666666667), (1.0, 8.333333333333334)] planned 2 AC [(1.0, 11.203368541666666), (1.0, 11.412963333333334), (1.0, 11.628051874999999)] ... [(1.0, 15.15323), (1.0, 15.393318541666668), (1.0, 15.633333333333333)] planned 3 AC [(3.0, 45.40336854166667), (3.0, 45.812963333333336), (3.0, 46.228051875000006)] ... [(3.0, 52.75323), (3.0, 53.19331854166667), (3.0, 53.63333333333333)] predicted 1 20 states predicted 2 20 states predicted 3 20 states
5. WOMD RoI examples without animation¶
For WOMD, use the existing parser and pass the parsed participants and map directly into the same behavior model. The examples below use small RoIs: vehicles inside the RoI are controlled by PDP, while nearby outer-band vehicles are kept as predicted background obstacles. Non-vehicle WOMD participants are filtered out to keep the reproduced LimSim PDP vehicle-centric for the current planner.
Two compact interaction cases are shown from the local interactive validation sample:
- ego-centered RoI: center follows vehicle ego
371fromframe=1799, radius15 m; this case containsAC,DC,LCL, andKSin rolling evaluation. - fixed physical RoI: center is fixed at vehicle
26's initial position in scenario10, radius12 m; this case highlights local acceleration interaction in a fixed road region.
The output is textual on purpose: RoI agents, background agents, interaction groups, selected actions, collision flag, and closest planned distance are the key signals for checking whether the interaction logic is behaving reasonably. The GIF version of these examples is in limsim_pdp_visualization.ipynb.
from tactics2d.dataset_parser import WOMDParser
def min_planned_distance(planning_result):
"""Return the closest pair distance among planned trajectories."""
closest = None
items = list(planning_result.trajectories.items())
for index, (agent_id, trajectory) in enumerate(items):
for other_id, other_trajectory in items[index + 1 :]:
shared_frames = sorted(set(trajectory.frames).intersection(other_trajectory.frames))
for frame_id in shared_frames:
state = trajectory.get_state(frame_id)
other_state = other_trajectory.get_state(frame_id)
distance = hypot(state.x - other_state.x, state.y - other_state.y)
if closest is None or distance < closest["distance"]:
closest = {
"distance": distance,
"agents": (agent_id, other_id),
"frame": frame_id,
}
return closest
def run_single_frame_case(case, participants, map_, config):
behavior_model = LimSimBehaviorModel(config)
if case["mode"] == "ego":
planning_result = behavior_model.plan(
participants,
map_,
case["frame"],
ego_id=case["ego_id"],
roi_radius=case["roi_radius"],
roi_outer_radius=case["roi_outer_radius"],
)
else:
anchor_state = participants[case["anchor_id"]].trajectory.get_state(case["frame"])
planning_result = behavior_model.plan(
participants,
map_,
case["frame"],
roi_center=anchor_state.location,
roi_radius=case["roi_radius"],
roi_outer_radius=case["roi_outer_radius"],
)
selected = {agent_id: participants[agent_id] for agent_id in planning_result.roi_agent_ids}
evaluation = evaluate_planning_result(
planning_result,
dimensions=dimensions_from_participants(selected),
)
closest = min_planned_distance(planning_result)
return planning_result, evaluation, closest
def run_rolling_case(case, participants, map_, config):
runner = LimSimRollingRunner(config)
kwargs = {
"start_frame": case["frame"],
"simulation_steps": case["rolling_steps"],
"roi_radius": case["roi_radius"],
"roi_outer_radius": case["roi_outer_radius"],
}
if case["mode"] == "ego":
kwargs["ego_id"] = case["ego_id"]
else:
anchor_state = participants[case["anchor_id"]].trajectory.get_state(case["frame"])
kwargs["roi_center"] = anchor_state.location
rolling_result = runner.run(participants, map_, **kwargs)
evaluation = evaluate_rolling_result(
rolling_result, dimensions_from_participants(rolling_result.participants)
)
return rolling_result, evaluation
folder = repo_root / "tactics2d" / "data" / "trajectory_sample" / "WOMD"
file_name = "uncompressed_scenario_validation_interactive_validation_interactive.tfrecord-00000-of-00150"
cases = [
{
"name": "ego-centered RoI",
"scenario_index": 0,
"mode": "ego",
"frame": 1799,
"ego_id": 371,
"roi_radius": 15.0,
"roi_outer_radius": 27.0,
"rolling_steps": 14,
},
{
"name": "fixed physical RoI",
"scenario_index": 10,
"mode": "fixed",
"frame": 0,
"anchor_id": 26,
"roi_radius": 12.0,
"roi_outer_radius": 21.6,
"rolling_steps": 14,
},
]
if (folder / file_name).exists():
parser = WOMDParser()
single_frame_config = LimSimConfig(horizon_steps=8, mcts_iterations=15, interaction_distance=16.0)
rolling_config = LimSimConfig(horizon_steps=20, mcts_iterations=25, interaction_distance=16.0)
parsed_cache = {}
for case in cases:
scenario_index = case["scenario_index"]
if scenario_index not in parsed_cache:
participants_womd, _ = parser.parse_trajectory(
scenario_index, file=file_name, folder=str(folder)
)
map_womd = parser.parse_map(scenario_index, file=file_name, folder=str(folder))
parsed_cache[scenario_index] = (participants_womd, map_womd)
participants_womd, map_womd = parsed_cache[scenario_index]
planning_result, planning_evaluation, closest = run_single_frame_case(
case, participants_womd, map_womd, single_frame_config
)
rolling_result, rolling_evaluation = run_rolling_case(
case, participants_womd, map_womd, rolling_config
)
print("\n", case["name"])
print("single-frame RoI agents:", planning_result.roi_agent_ids)
print("single-frame background agents:", planning_result.background_agent_ids)
print("single-frame groups:", planning_result.groups)
print("single-frame actions:", {agent_id: action.value for agent_id, action in planning_result.actions.items()})
print("single-frame collision:", planning_evaluation.has_collision)
if closest is not None:
print(
"closest planned distance:",
round(closest["distance"], 3),
"between",
closest["agents"],
"at frame",
closest["frame"],
)
print("rolling frames:", rolling_result.frames[:3], "...", rolling_result.frames[-3:])
print("rolling first RoI agents:", rolling_result.results[0].roi_agent_ids)
print("rolling first groups:", rolling_result.results[0].groups)
print("rolling action counts:", rolling_evaluation.action_counts)
print("rolling min distance:", round(rolling_evaluation.min_distance, 3))
print("rolling collisions:", rolling_evaluation.collision_count)
print("rolling action switches:", rolling_evaluation.action_switch_count)
print("rolling memory hits:", rolling_evaluation.memory_hit_count)
else:
print("WOMD sample file is not available in this checkout:", folder / file_name)
ego-centered RoI
single-frame RoI agents: [366, 367, 371]
single-frame background agents: [368]
single-frame groups: [[371, 366], [367]]
single-frame actions: {371: 'AC', 366: 'AC', 367: 'AC'}
single-frame collision: False
closest planned distance: 7.322 between (366, 371) at frame 1999
rolling frames: [1799, 1899, 1999] ... [2999, 3099, 3199]
rolling first RoI agents: [366, 367, 371]
rolling first groups: [[371, 366], [367]]
rolling action counts: {'KS': 3, 'AC': 27, 'LCL': 13, 'DC': 1}
rolling min distance: 5.038
rolling collisions: 0
rolling action switches: 3
rolling memory hits: 29
fixed physical RoI
single-frame RoI agents: [26, 29, 31, 33]
single-frame background agents: [32, 34]
single-frame groups: [[26, 33, 31], [29]]
single-frame actions: {26: 'AC', 33: 'AC', 31: 'AC', 29: 'AC'}
single-frame collision: False
closest planned distance: 4.386 between (26, 31) at frame 800
rolling frames: [0, 100, 200] ... [1200, 1300, 1400]
rolling first RoI agents: [26, 29, 31, 33]
rolling first groups: [[26, 33, 31], [29]]
rolling action counts: {'AC': 29, 'KS': 27}
rolling min distance: 4.169
rolling collisions: 0
rolling action switches: 19
rolling memory hits: 52
6. Current scope and limitations¶
This reproduction currently supports RoI selection, vehicle-only WOMD control, background obstacle prediction, interaction grouping, MCTS behavior selection, lane-topology-aware rollout, rolling prediction-memory reuse, Frenet-style final trajectory sampling, dynamic-obstacle scoring, rule-based prediction, and basic evaluation. It can be used to inspect whether agents are grouped reasonably, whether selected actions are plausible, and whether short-horizon planned trajectories collide.
Compared with the official LimSim planner, this is still an intermediate Tactics2D-native version. It uses planner-facing map semantic queries for lane width, speed limit, lane-change permission, stop targets, traffic-light state, and junction conflict checks, but it does not yet reproduce the exact official full planner and cost stack. The current layer is suitable for validating the PDP interaction pipeline and can be extended with richer stop/yield/traffic-light/junction trajectory rules as map semantics mature.