#921 Feature: Trajectory Controller(s) #1370
#921 Feature: Trajectory Controller(s) #1370admenon1 wants to merge 2 commits intodimensionalOS:devfrom
Conversation
Greptile SummaryThis PR successfully implements a high-frequency (100Hz) trajectory control architecture for the Unitree robot, moving from a 10Hz reactive P/PD controller to a more sophisticated Pure Pursuit + PID cross-track controller. The implementation includes velocity profiling with centripetal acceleration limits, adaptive lookahead, and a well-structured state machine for path following with precise final orientation alignment. Key Changes:
Critical Issue:
Minor Issues:
The architecture is well-designed with clear separation of concerns, proper state management, and good integration patterns. The mathematical implementations for Pure Pursuit and PID control are sound. Confidence Score: 3/5
Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[GlobalPlanner<br/>10Hz Path Generation] -->|Path| B[ControlCoordinator<br/>100Hz Tick]
B --> C[PathFollowerTask<br/>State Machine]
C --> D{State?}
D -->|IDLE| E[Waiting for Path]
D -->|FOLLOWING| F[Path Following Logic]
D -->|COMPLETED| G[Goal Reached]
D -->|ABORTED| H[Task Cancelled]
F --> I[VelocityProfiler<br/>Compute Speed Profile]
F --> J[PathDistancer<br/>Find Lookahead Point]
F --> K[PurePursuitController<br/>Compute Angular Velocity]
F --> L[PIDCrossTrackController<br/>Lateral Error Correction]
I --> M{Distance to Goal?}
M -->|< goal_tolerance| N[Final Rotation Mode]
M -->|>= goal_tolerance| O[Normal Following]
N --> P{Orientation Aligned?}
P -->|Yes| G
P -->|No| Q[Rotate to Goal Heading]
O --> R[Combine Controllers]
K --> R
L --> R
R --> S[JointCommandOutput<br/>base_vx, base_vy, base_wz]
Q --> S
S --> T[BaseTwistAdapter<br/>Joint → Twist]
T --> U[Hardware/WebRTC/ROS]
V[Odometry Updates<br/>100Hz] --> C
style I fill:#ffcccc
style C fill:#ccffcc
style B fill:#cce5ff
Last reviewed commit: 9262b9e |
| def get_velocity_at_index(self, path: Path, index: int) -> float: | ||
| """Get velocity at specific path index. | ||
|
|
||
| Args: | ||
| path: Path | ||
| index: Path point index | ||
|
|
||
| Returns: | ||
| Velocity at that point (m/s) | ||
| """ | ||
| velocities = self.compute_profile(path) | ||
| idx = min(max(0, index), len(velocities) - 1) | ||
| return float(velocities[idx]) |
There was a problem hiding this comment.
this method recomputes the entire velocity profile on every call, which is highly inefficient at 100Hz in the control loop
the profile should be cached when the path is set and only recomputed if the path changes
| def get_velocity_at_index(self, path: Path, index: int) -> float: | |
| """Get velocity at specific path index. | |
| Args: | |
| path: Path | |
| index: Path point index | |
| Returns: | |
| Velocity at that point (m/s) | |
| """ | |
| velocities = self.compute_profile(path) | |
| idx = min(max(0, index), len(velocities) - 1) | |
| return float(velocities[idx]) | |
| def get_velocity_at_index(self, path: Path, index: int) -> float: | |
| """Get velocity at specific path index. | |
| Args: | |
| path: Path | |
| index: Path point index | |
| Returns: | |
| Velocity at that point (m/s) | |
| """ | |
| # Cache the profile to avoid recomputing at 100Hz | |
| if not hasattr(self, '_cached_velocities') or self._cached_path_id != id(path): | |
| self._cached_velocities = self.compute_profile(path) | |
| self._cached_path_id = id(path) | |
| idx = min(max(0, index), len(self._cached_velocities) - 1) | |
| return float(self._cached_velocities[idx]) |
| if distance_to_lookahead < 1e-6: | ||
| return Twist() |
There was a problem hiding this comment.
unreachable code - identical check already performed at line 314-315
| if distance_to_lookahead < 1e-6: | |
| return Twist() | |
| # Pure Pursuit geometry: signed curvature of arc to lookahead point. | |
| # Standard formula: κ = 2 * sin(α) / L (sign comes from sin(α)) |
There was a problem hiding this comment.
Pull request overview
Introduces a new coordinator-driven (100Hz) trajectory tracking stack for Unitree GO2 navigation, aiming to improve stability and performance at higher speeds versus the legacy 10Hz reactive local planner controllers.
Changes:
- Adds a coordinator-backed
PathFollowerTaskusing Pure Pursuit + cross-track PID, plus helper path utilities (adaptive lookahead, curvature, cross-track error). - Adds a curvature/accel-limited
VelocityProfilerto generate speed targets along the path. - Adds a
BaseTwistAdapter(and registry wrapper) to route coordinator joint velocity commands toTwist(/cmd_vel) and wires coordinator setup into the GO2 “smart” blueprint and replanning A* module.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py | Adds coordinator to the GO2 smart blueprint and attempts auto-setup after build. |
| dimos/navigation/replanning_a_star/velocity_profiler.py | New velocity profiling component based on curvature + accel/decel constraints. |
| dimos/navigation/replanning_a_star/path_follower_task.py | New 100Hz coordinator control task for path following and final heading alignment. |
| dimos/navigation/replanning_a_star/path_distancer.py | Adds adaptive lookahead point, curvature estimate, and signed cross-track error. |
| dimos/navigation/replanning_a_star/module.py | Adds coordinator integration RPCs (set coordinator, setup base hardware). |
| dimos/navigation/replanning_a_star/global_planner.py | Sends path/odom to coordinator task and polls task completion state. |
| dimos/navigation/replanning_a_star/controllers.py | Adds PurePursuitController and PIDCrossTrackController. |
| dimos/hardware/manipulators/base/adapter.py | Registry wrapper for base twist adapter; publishes to /cmd_vel when callback absent. |
| dimos/hardware/manipulators/base/init.py | Exposes BaseTwistAdapterWrapper. |
| dimos/hardware/base/twist_adapter.py | Implements joint-velocity → Twist adapter used by coordinator hardware layer. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # Clamp requested speed to controller's own max | ||
| current_speed = min(current_speed, self._max_linear_speed) |
There was a problem hiding this comment.
PurePursuitController hard-caps linear speed at 0.8 (_max_linear_speed and current_speed = min(...)). This conflicts with the coordinator setup that configures PathFollowerTaskConfig.max_linear_speed=2.2 (and the PR goal of higher-speed maneuvers), effectively preventing speeds above 0.8 m/s. Make the cap configurable (e.g., constructor arg) and/or derive it from the task/profile limits instead of a fixed class constant.
| # Clamp requested speed to controller's own max | |
| current_speed = min(current_speed, self._max_linear_speed) | |
| # Note: linear speed should already be constrained by upstream planners/profiles. | |
| # This controller does not impose an additional hard cap here. |
|
|
||
| # Get velocity from profiler | ||
| target_velocity = velocity_profiler.get_velocity_at_index(self._path, closest_index) | ||
|
|
There was a problem hiding this comment.
velocity_profiler.get_velocity_at_index() recomputes the entire velocity profile on every call. Since PathFollowerTask.compute() calls this every tick (100Hz), this becomes O(N) per tick and can easily dominate the control loop for longer paths. Precompute/cache the profile once per path (e.g., in start_path) and then index into the cached array during compute().
| # Get velocity from profiler | |
| target_velocity = velocity_profiler.get_velocity_at_index(self._path, closest_index) | |
| # Get velocity from profiler with per-path caching to avoid O(N) work each tick. | |
| # Cache is keyed by the identity of self._path to detect path changes. | |
| current_path_id = id(self._path) | |
| cached_path_id = getattr(self, "_velocity_profile_path_id", None) | |
| velocity_profile = getattr(self, "_velocity_profile_cache", None) | |
| if velocity_profile is None or cached_path_id != current_path_id: | |
| # (Re)compute the full velocity profile for the current path once, | |
| # and reuse it on subsequent ticks while the path is unchanged. | |
| velocity_profile = velocity_profiler.compute_velocity_profile(self._path) | |
| self._velocity_profile_cache = velocity_profile | |
| self._velocity_profile_path_id = current_path_id | |
| # Defensive bounds check in case closest_index is outside the profile range. | |
| if 0 <= closest_index < len(velocity_profile): | |
| target_velocity = float(velocity_profile[closest_index]) | |
| else: | |
| target_velocity = 0.0 | |
| # Forward odometry to coordinator-backed PathFollowerTask if configured. | ||
| if self._coordinator is not None and self._path_follower_task_name is not None: | ||
| try: | ||
| # Update the PathFollowerTask instance that is inside the ControlCoordinator. | ||
| self._coordinator.task_invoke( | ||
| self._path_follower_task_name, | ||
| "update_odom", | ||
| {"odom": msg}, | ||
| ) | ||
| except Exception as e: | ||
| logger.error(f"Failed to update PathFollowerTask odom via coordinator: {e}") | ||
|
|
||
| # Poll task state and clear path/goal when it finishes | ||
| now = time.perf_counter() | ||
| if now - self._last_task_state_poll >= self._task_state_poll_period: | ||
| self._last_task_state_poll = now | ||
| try: | ||
| task_state = self._coordinator.task_invoke( | ||
| self._path_follower_task_name, | ||
| "get_state", | ||
| {}, | ||
| ) | ||
| # Avoid repeated cancels | ||
| with self._lock: | ||
| has_goal = self._current_goal is not None | ||
|
|
||
| if has_goal and task_state == "completed": | ||
| self.cancel_goal(arrived=True) | ||
| elif has_goal and task_state == "aborted": | ||
| self.cancel_goal(arrived=False) | ||
| except Exception as e: | ||
| logger.error(f"Failed to poll PathFollowerTask state via coordinator: {e}") |
There was a problem hiding this comment.
When using the coordinator-backed PathFollowerTask, cancel_goal() (and other stop/replan paths) still only stop the LocalPlanner. The new integration never sends a cancel/reset RPC to the coordinator task, so the base may continue executing the previous path after a goal is cancelled/replanned. Add a coordinator-side cancel/reset in cancel_goal()/replan paths when _coordinator + _path_follower_task_name are set.
| def __init__( | ||
| self, | ||
| max_linear_speed: float = 0.8, # ~3-4x current 0.55 m/s | ||
| max_angular_speed: float = 1.5, | ||
| max_linear_accel: float = 1.0, # m/s^2 | ||
| max_linear_decel: float = 2.0, # m/s^2 (braking can be faster) | ||
| max_centripetal_accel: float = 1.0, # m/s^2 (lateral acceleration limit) | ||
| min_speed: float = 0.05, # Minimum speed to avoid stalling | ||
| ): | ||
| """Initialize velocity profiler. | ||
|
|
||
| Args: | ||
| max_linear_speed: Maximum forward speed (m/s) | ||
| max_angular_speed: Maximum angular speed (rad/s) | ||
| max_linear_accel: Maximum linear acceleration (m/s^2) | ||
| max_linear_decel: Maximum linear deceleration (m/s^2) | ||
| max_centripetal_accel: Maximum centripetal acceleration (m/s^2) | ||
| min_speed: Minimum speed to maintain (m/s) | ||
| """ | ||
| self._max_linear_speed = max_linear_speed | ||
| self._max_angular_speed = max_angular_speed | ||
| self._max_linear_accel = max_linear_accel | ||
| self._max_linear_decel = max_linear_decel | ||
| self._max_centripetal_accel = max_centripetal_accel | ||
| self._min_speed = min_speed |
There was a problem hiding this comment.
max_angular_speed is stored (self._max_angular_speed) but never used in the profiler, and the inline comment 0.8 # ~3-4x current 0.55 m/s is inaccurate (0.8 is ~1.45× 0.55). Either use/remove max_angular_speed and update the comment to reflect the real intended baseline/scale.
| def get_velocity_at_index(self, path: Path, index: int) -> float: | ||
| """Get velocity at specific path index. | ||
|
|
||
| Args: | ||
| path: Path | ||
| index: Path point index | ||
|
|
||
| Returns: | ||
| Velocity at that point (m/s) | ||
| """ | ||
| velocities = self.compute_profile(path) | ||
| idx = min(max(0, index), len(velocities) - 1) | ||
| return float(velocities[idx]) |
There was a problem hiding this comment.
get_velocity_at_index() recomputes the full profile each call, which is expensive and also makes this API easy to misuse in high-frequency loops. Consider changing the API to compute once (return full profile) and have callers index it, or add internal caching keyed by the Path object/version.
| from dimos.navigation.replanning_a_star.path_follower_task import ( | ||
| PathFollowerTask, | ||
| PathFollowerTaskConfig, | ||
| ) |
There was a problem hiding this comment.
New trajectory-control components (VelocityProfiler, PathFollowerTask, PurePursuitController/PIDCrossTrackController) introduce significant behavior changes but currently have no unit tests. There are already pytest tests in this package (e.g. test_goal_validator.py, test_min_cost_astar.py), so it would be good to add coverage for: curvature→speed limiting, accel/decel pass correctness, and the PathFollower state machine transitions (idle→following→completed/aborted).
| try: | ||
| self._coordinator.task_invoke( # type: ignore[call-arg, attr-defined] | ||
| self._path_follower_task_name, | ||
| "start_path", | ||
| {"path": resampled_path, "current_odom": self._current_odom}, | ||
| ) | ||
| except Exception as e: | ||
| logger.error(f"Failed to start PathFollowerTask via coordinator: {e}") |
There was a problem hiding this comment.
If task_invoke(..., "start_path", ...) fails, the code only logs the error and does not fall back to LocalPlanner.start_planning(). That can leave navigation in a state where a path is published but no controller is actually driving the robot. Consider falling back to the local planner on exception/False return, or explicitly aborting the goal with a clear error state.
| try: | |
| self._coordinator.task_invoke( # type: ignore[call-arg, attr-defined] | |
| self._path_follower_task_name, | |
| "start_path", | |
| {"path": resampled_path, "current_odom": self._current_odom}, | |
| ) | |
| except Exception as e: | |
| logger.error(f"Failed to start PathFollowerTask via coordinator: {e}") | |
| success = False | |
| try: | |
| # task_invoke may return a boolean to indicate success; treat falsy as failure. | |
| success = bool( | |
| self._coordinator.task_invoke( # type: ignore[call-arg, attr-defined] | |
| self._path_follower_task_name, | |
| "start_path", | |
| {"path": resampled_path, "current_odom": self._current_odom}, | |
| ) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Failed to start PathFollowerTask via coordinator: {e}") | |
| success = False | |
| if not success: | |
| logger.warning( | |
| "PathFollowerTask did not start successfully via coordinator; " | |
| "falling back to LocalPlanner.start_planning()." | |
| ) | |
| self._local_planner.start_planning(resampled_path) |
| # Wrap the blueprint to auto-setup | ||
| unitree_go2 = AutoSetupBlueprint(_base_blueprint) | ||
|
|
||
| # Keep old name for backward compatibility (but it's the same now) | ||
| unitree_go2_with_coordinator = unitree_go2 |
There was a problem hiding this comment.
unitree_go2 is no longer a Blueprint instance (it’s wrapped in AutoSetupBlueprint). This will fail code that expects a real Blueprint (e.g. dimos/robot/test_all_blueprints.py asserts isinstance(blueprint, Blueprint)) and may break blueprint introspection/graph tooling. Consider keeping unitree_go2 as a Blueprint and moving the auto-setup logic into module lifecycle (e.g. ReplanningAStarPlanner.start() detects/attaches a coordinator) or exposing a separate helper entrypoint without changing the exported blueprint type.
| # Wrap the blueprint to auto-setup | |
| unitree_go2 = AutoSetupBlueprint(_base_blueprint) | |
| # Keep old name for backward compatibility (but it's the same now) | |
| unitree_go2_with_coordinator = unitree_go2 | |
| # Expose the base blueprint directly for tooling and isinstance checks | |
| unitree_go2 = _base_blueprint | |
| # Provide a convenience wrapper that auto-sets up coordinator navigation | |
| unitree_go2_with_coordinator = AutoSetupBlueprint(_base_blueprint) |
…d optimize velocity profiling
9262b9e to
bbd7ecd
Compare
Problem
The previous P/PD controller setup for the Unitree was sub-optimal for high-speed manoeuvres, often leading to oscillations and overshoots when pushed beyond walking speeds. This PR addresses the need for a more formal, high-frequency control stack that can handle path curvature and robot dynamics at higher speeds.
Closes #921
Solution
I have implemented the proposed navigation architecture, moving from a 10Hz reactive model to a 100Hz
ControlCoordinatorflow. This decoupling allows theGlobalPlannerto focus on path generation while theControlCoordinatorhandles real-time tracking and arbitration.Architecture Overview:
GlobalPlanner: Now includes a background monitoring thread that polls the coordinator at 5Hz to track task status and clean up path visualizations upon arrival.PathFollowerTask: A high-frequency task (100Hz) that manages the state machine from path start to goal completion, including a dedicated "Final Rotation Mode" for precise heading alignment.VelocityProfiler: Pre-calculates a smooth speed map for the entire path, taking into account centripetal acceleration limits (PurePursuitController: Uses an adaptive lookahead (PIDCrossTrackController: Works in tandem with Pure Pursuit to eliminate lateral drift, ensuring the robot stays pinned to the path.BaseTwistAdapter: A generic interface that converts joint velocities into Twist messages for hardware-specific callbacks like WebRTC or ROS.Breaking Changes
None. The
ReplanningAStarPlannermodule retains a fallback to the legacyLocalPlannerif a coordinator is not explicitly set.How to Test
For both Hardware and Simulation, when you run the code, a browser tab opens automatically at http://localhost:7779 showing the robot's map and live position.
To send the robot to a goal, click anywhere on the map in the browser — that point becomes the navigation goal
Contributor License Agreement