diff --git a/dimos/hardware/base/twist_adapter.py b/dimos/hardware/base/twist_adapter.py new file mode 100644 index 000000000..563a045ce --- /dev/null +++ b/dimos/hardware/base/twist_adapter.py @@ -0,0 +1,217 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Hardware adapter for mobile base using Twist commands. + +Maps joint commands [base_vx, base_vy, base_wz] to Twist messages +for integration with Unitree GO2 and other mobile robots. +""" + +from typing import Protocol + +from dimos.hardware.manipulators.spec import ( + ControlMode, + JointLimits, + ManipulatorInfo, +) +from dimos.msgs.geometry_msgs import Twist, Vector3 +from dimos.utils.logging_config import setup_logger + + +logger = setup_logger() + + +class TwistCommandCallback(Protocol): + """Protocol for receiving Twist commands.""" + + def __call__(self, twist: Twist) -> None: ... + + +class BaseTwistAdapter: + """Adapter that converts joint commands to Twist for mobile base. + + This adapter implements the ManipulatorAdapter protocol but outputs + Twist commands instead of direct hardware commands. The Twist is + sent to a callback (e.g., GO2Connection.move()). + + Joint mapping: + base_vx -> Twist.linear.x (forward velocity) + base_vy -> Twist.linear.y (lateral velocity) + base_wz -> Twist.angular.z (yaw velocity) + """ + + def __init__(self, twist_callback: TwistCommandCallback): + """Initialize base twist adapter. + + Args: + twist_callback: Callback function to receive Twist commands + """ + self._twist_callback = twist_callback + self._current_mode: ControlMode = ControlMode.VELOCITY + self._connected = True + + # ========================================================================= + # ManipulatorAdapter Protocol Implementation + # ========================================================================= + + # --- Connection --- + + def connect(self) -> bool: + """Connect to hardware (always succeeds for callback-based adapter).""" + self._connected = True + return True + + def disconnect(self) -> None: + """Disconnect from hardware.""" + self._connected = False + + def is_connected(self) -> bool: + """Check if connected.""" + return self._connected + + # --- Info --- + + def get_info(self) -> ManipulatorInfo: + """Get manipulator info.""" + return ManipulatorInfo( + vendor="DIMOS", + model="BaseTwistAdapter", + dof=3, + ) + + def get_dof(self) -> int: + """Get degrees of freedom.""" + return 3 + + def get_limits(self) -> JointLimits: + """Get joint limits for base velocities. + + - Linear: ±2.2 m/s (typical max for Unitree GO2) + - Angular: ±2.0 rad/s + """ + return JointLimits( + position_lower=[-2.2, -2.2, -2.0], # base_vx, base_vy, base_wz + position_upper=[2.2, 2.2, 2.0], + velocity_max=[2.2, 2.2, 2.0], + ) + + # --- Control Mode --- + + def set_control_mode(self, mode: ControlMode) -> bool: + """Set control mode (only VELOCITY supported).""" + if mode == ControlMode.VELOCITY: + self._current_mode = mode + return True + return False + + def get_control_mode(self) -> ControlMode: + """Get current control mode.""" + return self._current_mode + + # --- State Reading --- + + def read_joint_positions(self) -> list[float]: + """Read joint positions (not applicable for base, returns zeros).""" + return [0.0, 0.0, 0.0] + + def read_joint_velocities(self) -> list[float]: + """Read joint velocities (not available, returns zeros).""" + return [0.0, 0.0, 0.0] + + def read_joint_efforts(self) -> list[float]: + """Read joint efforts (not applicable, returns zeros).""" + return [0.0, 0.0, 0.0] + + def read_state(self) -> dict[str, int]: + """Minimal state dictionary (no real hardware state).""" + return {} + + def read_error(self) -> tuple[int, str]: + """No error reporting for this simple adapter.""" + return 0, "" + + # --- Motion Control (Joint Space) --- + + def write_joint_positions(self, positions: list[float], velocity: float = 1.0) -> bool: + """Write joint positions (not supported for base).""" + return False + + def write_joint_velocities(self, velocities: list[float]) -> bool: + """Write joint velocities as Twist command. + + Args: + velocities: [base_vx, base_vy, base_wz] in m/s and rad/s + + Returns: + True if command sent successfully + """ + if not self._connected: + return False + + if len(velocities) != 3: + return False + + # Convert to Twist + twist = Twist( + linear=Vector3(velocities[0], velocities[1], 0.0), # vx, vy, vz + angular=Vector3(0.0, 0.0, velocities[2]), # wx, wy, wz + ) + + # Send via callback + try: + self._twist_callback(twist) + return True + except Exception as e: + logger.error(f"BaseTwistAdapter error: {e}") + return False + + def write_stop(self) -> bool: + """Emergency stop (send zero velocities).""" + return self.write_joint_velocities([0.0, 0.0, 0.0]) + + # --- Servo Control (stubs to satisfy ManipulatorAdapter) --- + + def write_enable(self, enable: bool) -> bool: + """Enable/disable (for base, just toggles connected flag).""" + self._connected = enable + return True + + def read_enabled(self) -> bool: + """Whether the adapter is considered enabled.""" + return self._connected + + def write_clear_errors(self) -> bool: + """No error state to clear for this adapter.""" + return True + + # --- Optional Cartesian / Gripper / Force-Torque (unsupported) --- + + def read_cartesian_position(self) -> dict[str, float] | None: + return None + + def write_cartesian_position( + self, + pose: dict[str, float], + velocity: float = 1.0, + ) -> bool: + return False + + def read_gripper_position(self) -> float | None: + return None + + def write_gripper_position(self, position: float) -> bool: + return False + + def read_force_torque(self) -> list[float] | None: + return None \ No newline at end of file diff --git a/dimos/hardware/manipulators/base/__init__.py b/dimos/hardware/manipulators/base/__init__.py new file mode 100644 index 000000000..6563ae129 --- /dev/null +++ b/dimos/hardware/manipulators/base/__init__.py @@ -0,0 +1,22 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Base twist adapter for mobile robots. + +Provides BaseTwistAdapterWrapper for converting joint commands to Twist messages. +""" + +from dimos.hardware.manipulators.base.adapter import BaseTwistAdapterWrapper + +__all__ = ["BaseTwistAdapterWrapper"] \ No newline at end of file diff --git a/dimos/hardware/manipulators/base/adapter.py b/dimos/hardware/manipulators/base/adapter.py new file mode 100644 index 000000000..1ddbcc640 --- /dev/null +++ b/dimos/hardware/manipulators/base/adapter.py @@ -0,0 +1,87 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Base twist adapter for mobile robots. + +Wraps BaseTwistAdapter to integrate with the manipulator adapter registry. +""" + +from typing import Callable + +from dimos.hardware.base.twist_adapter import BaseTwistAdapter +from dimos.hardware.manipulators.registry import AdapterRegistry +from dimos.msgs.geometry_msgs import Twist +from dimos.core.transport import LCMTransport + + +class BaseTwistAdapterWrapper(BaseTwistAdapter): + """Wrapper for BaseTwistAdapter that works with adapter registry. + + This adapter accepts a twist_callback function that will be called + with Twist commands. The callback should be set up to call + GO2Connection.move() or similar. If no callback is provided, it will + publish directly to the `/cmd_vel` LCM topic. + + Args: + twist_callback: Callable that receives Twist commands + dof: Degrees of freedom (must be 3 for base) + cmd_vel_topic: LCM topic to publish Twist messages to when no callback + is provided (default: "/cmd_vel"). + """ + + def __init__( + self, + twist_callback: Callable[[Twist], None] | None = None, + dof: int = 3, + cmd_vel_topic: str = "/cmd_vel", + **kwargs, + ): + """Initialize base twist adapter wrapper. + + Args: + twist_callback: Callback function for Twist commands. If None, the + adapter publishes directly to `cmd_vel_topic` using LCM. + dof: Degrees of freedom (must be 3) + cmd_vel_topic: LCM topic to publish Twist messages to when no + callback is provided. + **kwargs: Ignored (for compatibility with registry) + """ + if dof != 3: + raise ValueError(f"BaseTwistAdapter requires dof=3, got {dof}") + + # Transport is stored on the instance so it can be pickled safely. + # LCMTransport implements __reduce__, so it can cross RPC boundaries. + self._transport: LCMTransport[Twist] | None = None + + # If no callback provided, publish Twist to LCM `/cmd_vel` + if twist_callback is None: + self._transport = LCMTransport(cmd_vel_topic, Twist) + twist_callback = self._publish_twist + + super().__init__(twist_callback) + + def _publish_twist(self, twist: Twist) -> None: + """Default callback: publish Twist to LCM `/cmd_vel`.""" + # Create transport if it doesn't exist + if self._transport is None: + self._transport = LCMTransport("/cmd_vel", Twist) + self._transport.broadcast(None, twist) + + +def register(registry: AdapterRegistry) -> None: + """Register this adapter with the registry.""" + registry.register("base_twist", BaseTwistAdapterWrapper) + + +__all__ = ["BaseTwistAdapterWrapper"] \ No newline at end of file diff --git a/dimos/navigation/replanning_a_star/controllers.py b/dimos/navigation/replanning_a_star/controllers.py index 865aafb8b..b5878c455 100644 --- a/dimos/navigation/replanning_a_star/controllers.py +++ b/dimos/navigation/replanning_a_star/controllers.py @@ -154,3 +154,249 @@ def _compute_angular_velocity(self, yaw_error: float) -> float: self._prev_angular_velocity = angular_velocity return float(angular_velocity) + + +class PIDCrossTrackController: + """PID controller on lateral (cross‑track) error to tighten path tracking. + + Produces an additional angular velocity term (Δω) that you add on top + of the base yaw command. + + The Integral term helps eliminate steady-state lateral bias, especially + useful when the robot consistently drifts to one side of the path. + """ + + def __init__( + self, + control_frequency: float, + k_p: float = 1.5, + k_i: float = 0.1, + k_d: float = 0.2, + max_correction: float = 0.6, + max_integral: float = 0.3, + ) -> None: + """Initialize PID cross-track controller. + + Args: + control_frequency: Control loop frequency (Hz) + k_p: Proportional gain + k_i: Integral gain + k_d: Derivative gain + max_correction: Maximum total correction (rad/s) + max_integral: Maximum integral term to prevent windup (rad/s) + """ + self._dt = 1.0 / control_frequency + self._k_p = k_p + self._k_i = k_i + self._k_d = k_d + self._max_correction = max_correction + self._max_integral = max_integral + + self._prev_error = 0.0 + self._integral = 0.0 + + def reset(self) -> None: + """Reset internal state (call when starting a new path).""" + self._prev_error = 0.0 + self._integral = 0.0 + + def compute_correction(self, cross_track_error: float) -> float: + """Return additional angular velocity (Δω) for given lateral error. + + Positive error means robot is to the left of the path; negative to the right. + + Args: + cross_track_error: Signed lateral distance from path (m) + + Returns: + Angular velocity correction (rad/s) + """ + error = float(cross_track_error) + + # Proportional term + p_term = self._k_p * error + + # Integral term (with windup protection) + self._integral += error * self._dt + # Clamp integral to prevent windup + self._integral = float(np.clip( + self._integral, + -self._max_integral / max(self._k_i, 1e-6), + self._max_integral / max(self._k_i, 1e-6), + )) + i_term = self._k_i * self._integral + + # Derivative term + derr = (error - self._prev_error) / self._dt + d_term = self._k_d * derr + + # Total PID correction + correction = p_term + i_term + d_term + correction = float(np.clip(correction, -self._max_correction, self._max_correction)) + + self._prev_error = error + return correction + + +class PurePursuitController: + """Pure Pursuit path-following controller with adaptive lookahead. + + Pure Pursuit is a geometric path-tracking algorithm that: + 1. Finds a lookahead point on the path + 2. Computes the arc to reach that point + 3. Uses the arc's curvature to determine angular velocity + + This controller is path-aware and handles high speeds better than P/PD controllers. + """ + + _global_config: GlobalConfig + _control_frequency: float + + # Lookahead parameters + _min_lookahead: float = 0.2 # Minimum lookahead distance (m) + _max_lookahead: float = 0.8 # Maximum lookahead distance (m) + _lookahead_gain: float = 0.25 # Gain for adaptive lookahead (speed-based) + + # Control parameters + _k_angular: float = 0.6 # Angular velocity gain + _max_angular_velocity: float = 1.2 # rad/s + _min_linear_velocity: float = 0.05 # m/s + _max_linear_speed: float = 0.8 # new: cap linear speed here too + + def __init__( + self, + global_config: GlobalConfig, + control_frequency: float, + min_lookahead: float = 0.3, + max_lookahead: float = 2.0, + lookahead_gain: float = 0.5, + max_linear_speed: float = 0.8, + ): + """Initialize Pure Pursuit controller. + + Args: + global_config: Global configuration + control_frequency: Control loop frequency (Hz) + min_lookahead: Minimum lookahead distance (m) + max_lookahead: Maximum lookahead distance (m) + lookahead_gain: Gain for speed-adaptive lookahead + """ + self._global_config = global_config + self._control_frequency = control_frequency + self._min_lookahead = min_lookahead + self._max_lookahead = max_lookahead + self._lookahead_gain = lookahead_gain + self._max_linear_speed = max_linear_speed + + def advance( + self, + lookahead_point: NDArray[np.float64], + current_odom: PoseStamped, + current_speed: float = 0.0, + path_curvature: float | None = None, + ) -> Twist: + """Compute control command using Pure Pursuit. + + Args: + lookahead_point: Lookahead point on path [x, y] + current_odom: Current robot pose + current_speed: Current forward speed (m/s) for adaptive lookahead + path_curvature: Curvature at lookahead point (1/m), optional + + Returns: + Twist command + """ + current_pos = np.array([current_odom.position.x, current_odom.position.y]) + robot_yaw = current_odom.orientation.euler[2] + + # Vector from robot to lookahead point + to_lookahead = lookahead_point - current_pos + distance_to_lookahead = np.linalg.norm(to_lookahead) + + if distance_to_lookahead < 1e-6: + return Twist() + + # Angle to lookahead point + angle_to_lookahead = np.arctan2(to_lookahead[1], to_lookahead[0]) + heading_error = angle_diff(angle_to_lookahead, robot_yaw) + + # Pure Pursuit geometry: signed curvature of arc to lookahead point. + # Standard formula: κ = 2 * sin(α) / L (sign comes from sin(α)) + if abs(heading_error) < 1e-6: + curvature = 0.0 + else: + curvature = 2.0 * np.sin(heading_error) / distance_to_lookahead + + # Clamp requested speed to controller's own max + current_speed = min(current_speed, self._max_linear_speed) + + # Compute angular velocity from curvature + # ω = v * κ (for differential drive) + # But we also add proportional correction for heading error + if current_speed > 0.1: + # Use curvature-based control when moving + angular_velocity = current_speed * curvature + # Add proportional correction + angular_velocity += self._k_angular * heading_error + else: + # When slow/stopped, use pure proportional control + angular_velocity = self._k_angular * heading_error + + # Limit angular velocity + angular_velocity = np.clip(angular_velocity, -self._max_angular_velocity, self._max_angular_velocity) + + # Base linear velocity on requested speed + linear_velocity = current_speed + + # Rotate-then-drive behavior: + # - if heading error > 90°, rotate in place (no forward motion) + # - if 45°–90°, move very slowly while turning + abs_heading = abs(heading_error) + if abs_heading > np.pi / 2.0: # > 90 degrees + linear_velocity = 0.0 + elif abs_heading > np.pi / 4.0: # 45–90 degrees + linear_velocity *= 0.3 + + # Also limit speed based on curvature if provided + if path_curvature is not None and path_curvature > 1e-6: + # v_max ≈ sqrt(a_max / κ) with a_max ≈ 0.8 m/s² (very conservative) + v_curv = float(np.sqrt(0.8 / path_curvature)) + linear_velocity = min(linear_velocity, v_curv) + + # Enforce bounds and minimum crawl speed + linear_velocity = float(np.clip(linear_velocity, 0.0, self._max_linear_speed)) + if 0.0 < linear_velocity < self._min_linear_velocity: + linear_velocity = self._min_linear_velocity + + return Twist( + linear=Vector3(linear_velocity, 0.0, 0.0), + angular=Vector3(0.0, 0.0, float(angular_velocity)), + ) + + def rotate(self, yaw_error: float) -> Twist: + """Rotate in place. + + Args: + yaw_error: Yaw error in radians + + Returns: + Twist command for rotation + """ + angular_velocity = self._k_angular * yaw_error + angular_velocity = np.clip(angular_velocity, -self._max_angular_velocity, self._max_angular_velocity) + + # In simulation, add small forward velocity + linear_x = 0.18 if self._global_config.simulation else 0.0 + + return Twist( + linear=Vector3(linear_x, 0.0, 0.0), + angular=Vector3(0.0, 0.0, float(angular_velocity)), + ) + + def reset_errors(self) -> None: + """Reset controller state.""" + pass + + def reset_yaw_error(self, value: float) -> None: + """Reset yaw error (not used in Pure Pursuit).""" + pass diff --git a/dimos/navigation/replanning_a_star/global_planner.py b/dimos/navigation/replanning_a_star/global_planner.py index df2680a4a..f0d071ec9 100644 --- a/dimos/navigation/replanning_a_star/global_planner.py +++ b/dimos/navigation/replanning_a_star/global_planner.py @@ -86,6 +86,16 @@ def __init__(self, global_config: GlobalConfig) -> None: self._replan_reason = None self._lock = RLock() + # Coordinator-based path follower (optional, for mobile base control). + # These are set via set_path_follower_task() when using the ControlCoordinator. + self._path_follower_task_name: str | None = None + self._coordinator: object | None = None + + # Polling bookkeeping for coordinator task state + self._last_task_state_poll: float = 0.0 + self._task_state_poll_period: float = 0.2 # seconds + self._replanning_in_progress: bool = False + def start(self) -> None: self._local_planner.start() self._disposables.add( @@ -115,6 +125,41 @@ def handle_odom(self, msg: PoseStamped) -> None: self._local_planner.handle_odom(msg) self._position_tracker.add_position(msg) + # Forward odometry to coordinator-backed PathFollowerTask if configured. + if self._coordinator is not None and self._path_follower_task_name is not None: + try: + # Update the PathFollowerTask instance that is inside the ControlCoordinator. + self._coordinator.task_invoke( + self._path_follower_task_name, + "update_odom", + {"odom": msg}, + ) + except Exception as e: + logger.error(f"Failed to update PathFollowerTask odom via coordinator: {e}") + + # Poll task state and clear path/goal when it finishes + now = time.perf_counter() + if ( + not self._replanning_in_progress + and now - self._last_task_state_poll >= self._task_state_poll_period + ): + self._last_task_state_poll = now + try: + task_state = self._coordinator.task_invoke( + self._path_follower_task_name, + "get_state", + {}, + ) + with self._lock: + has_goal = self._current_goal is not None + + if has_goal and task_state == "completed": + self.cancel_goal(arrived=True) + elif has_goal and task_state == "aborted": + self.cancel_goal(arrived=False) + except Exception as e: + logger.error(f"Failed to poll PathFollowerTask state via coordinator: {e}") + def handle_global_costmap(self, msg: OccupancyGrid) -> None: self._navigation_map.update(msg) @@ -137,9 +182,22 @@ def cancel_goal(self, *, but_will_try_again: bool = False, arrived: bool = False self._goal_reached = arrived self._replan_limiter.reset() + if but_will_try_again: + self._replanning_in_progress = True + self.path.on_next(Path()) self._local_planner.stop_planning() + if self._coordinator is not None and self._path_follower_task_name is not None: + try: + self._coordinator.task_invoke( # type: ignore[call-arg, attr-defined] + self._path_follower_task_name, + "cancel", + {}, + ) + except Exception as e: + logger.warning(f"Failed to cancel PathFollowerTask via coordinator: {e}") + if not but_will_try_again: self.goal_reached.on_next(Bool(arrived)) @@ -305,10 +363,27 @@ def _plan_path(self) -> None: self.path.on_next(resampled_path) - self._local_planner.start_planning(resampled_path) + # Prefer coordinator + PathFollowerTask when available, else fall back to LocalPlanner. + if ( + self._coordinator is not None + and self._path_follower_task_name is not None + and self._current_odom is not None + ): + try: + self._coordinator.task_invoke( # type: ignore[call-arg, attr-defined] + self._path_follower_task_name, + "start_path", + {"path": resampled_path, "current_odom": self._current_odom}, + ) + except Exception as e: + logger.error(f"Failed to start PathFollowerTask via coordinator: {e}") + else: + self._local_planner.start_planning(resampled_path) + + # Planning complete — re-enable task-state polling. + self._replanning_in_progress = False def _find_wide_path(self, goal: Vector3, robot_pos: Vector3) -> Path | None: - # sizes_to_try: list[float] = [2.2, 1.7, 1.3, 1] sizes_to_try: list[float] = [1.1] for size in sizes_to_try: @@ -346,3 +421,13 @@ def _find_safe_goal(self, goal: Vector3) -> Vector3 | None: logger.info("Found safe goal.", x=round(safe_goal.x, 2), y=round(safe_goal.y, 2)) return safe_goal + + def set_path_follower_task(self, coordinator: object, task_name: str) -> None: + """Configure coordinator-based PathFollowerTask usage. + + Args: + coordinator: ControlCoordinator RPC client proxy + task_name: Name of the task registered in the coordinator + """ + self._coordinator = coordinator + self._path_follower_task_name = task_name diff --git a/dimos/navigation/replanning_a_star/module.py b/dimos/navigation/replanning_a_star/module.py index d1d87cbbf..b602ea61c 100644 --- a/dimos/navigation/replanning_a_star/module.py +++ b/dimos/navigation/replanning_a_star/module.py @@ -12,18 +12,30 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math import os +import traceback +from typing import Callable from dimos_lcm.std_msgs import Bool, String from reactivex.disposable import Disposable +from dimos.control.components import HardwareComponent, HardwareType +from dimos.control.coordinator import ControlCoordinator from dimos.core import In, Module, Out, rpc from dimos.core.global_config import GlobalConfig, global_config +from dimos.hardware.manipulators.registry import adapter_registry from dimos.msgs.geometry_msgs import PoseStamped, Twist from dimos.msgs.nav_msgs import OccupancyGrid, Path from dimos.navigation.base import NavigationInterface, NavigationState from dimos.navigation.replanning_a_star.global_planner import GlobalPlanner +from dimos.navigation.replanning_a_star.path_follower_task import ( + PathFollowerTask, + PathFollowerTaskConfig, +) +from dimos.utils.logging_config import setup_logger +logger = setup_logger() class ReplanningAStarPlanner(Module, NavigationInterface): odom: In[PoseStamped] # TODO: Use TF. @@ -45,6 +57,10 @@ def __init__(self, cfg: GlobalConfig = global_config) -> None: self._global_config = cfg self._planner = GlobalPlanner(self._global_config) + self._coordinator: ControlCoordinator | None = None + self._path_follower_task: PathFollowerTask | None = None + self._local_planner_cmd_vel_disposable: Disposable | None = None + @rpc def start(self) -> None: super().start() @@ -60,7 +76,12 @@ def start(self) -> None: self._disposables.add(self._planner.path.subscribe(self.path.publish)) - self._disposables.add(self._planner.cmd_vel.subscribe(self.cmd_vel.publish)) + if self._path_follower_task is None: + self._local_planner_cmd_vel_disposable = self._planner.cmd_vel.subscribe(self.cmd_vel.publish) + self._disposables.add(self._local_planner_cmd_vel_disposable) + logger.warning("PathFollowerTask not set - using LocalPlanner fallback") + else: + logger.info("PathFollowerTask active - LocalPlanner cmd_vel disabled") self._disposables.add(self._planner.goal_reached.subscribe(self.goal_reached.publish)) @@ -96,6 +117,126 @@ def cancel_goal(self) -> bool: self._planner.cancel_goal() return True + @rpc + def set_coordinator(self, coordinator: ControlCoordinator) -> bool: + """Set the ControlCoordinator for path following. + + Args: + coordinator: ControlCoordinator instance + + Returns: + True if setup successful + """ + try: + self._coordinator = coordinator + + # Create path follower task + task_config = PathFollowerTaskConfig( + joint_names=["base_vx", "base_vy", "base_wz"], + priority=10, + max_linear_speed=1.3, + goal_tolerance=0.2, # match GlobalPlanner._goal_tolerance + orientation_tolerance=math.radians(15), # match _rotation_tolerance + ) + self._path_follower_task = PathFollowerTask( + name="path_follower", + config=task_config, + global_config=self._global_config, + ) + + # Add task to coordinator + if not coordinator.add_task(self._path_follower_task): + logger.error("Failed to add PathFollowerTask to coordinator") + return False + + # Tell the GlobalPlanner to use the coordinator-backed PathFollowerTask. + # NOTE: The actual task instance that runs lives inside the ControlCoordinator + # process; we refer to it by name via coordinator.task_invoke(). + self._planner.set_path_follower_task(coordinator, "path_follower") + + + # Disable LocalPlanner cmd_vel if it was subscribed + if self._local_planner_cmd_vel_disposable is not None: + self._local_planner_cmd_vel_disposable.dispose() + self._local_planner_cmd_vel_disposable = None + logger.info("Disabled LocalPlanner cmd_vel - PathFollowerTask is now active") + + logger.info("PathFollowerTask added to ControlCoordinator and connected to GlobalPlanner") + return True + except Exception as e: + logger.error(f"Failed to set coordinator: {e}") + logger.error(traceback.format_exc()) + return False + + @rpc + def on_system_modules(self, modules: list) -> None: + """Called by the framework after all modules are started. + + Auto-detects ControlCoordinator and wires up coordinator-based path + following. This runs after start(), so it properly overrides the + LocalPlanner fallback if a coordinator is present. + """ + from dimos.control.coordinator import ControlCoordinator as CoordinatorClass + + coordinator = None + for module in modules: + try: + if issubclass(module.actor_class, CoordinatorClass): + coordinator = module + break + except (AttributeError, TypeError): + continue + + if coordinator is not None: + logger.info("ControlCoordinator found - setting up coordinator-based path following") + if self.set_coordinator(coordinator): + self.setup_base_hardware_with_callback(coordinator, None) + else: + logger.info("No ControlCoordinator in blueprint - LocalPlanner fallback active") + + @rpc + def setup_base_hardware_with_callback( + self, + coordinator: ControlCoordinator, + twist_callback: Callable[[Twist], None] | None, + ) -> bool: + """Set up base hardware adapter with provided callback. + + Args: + coordinator: ControlCoordinator instance + twist_callback: Callback function that receives Twist commands + (should call GO2Connection.move() or a ROS publisher) + + Returns: + True if setup successful + """ + try: + # Create adapter + adapter = adapter_registry.create( + "base_twist", + twist_callback=twist_callback, + dof=3, + ) + + # Create hardware component + component = HardwareComponent( + hardware_id="base", + hardware_type=HardwareType.MANIPULATOR, + joints=["base_vx", "base_vy", "base_wz"], + ) + + # Add hardware to coordinator + if not coordinator.add_hardware(adapter, component): + logger.error("Failed to add base hardware to coordinator") + return False + + logger.info("Base hardware adapter set up successfully") + return True + + except Exception as e: + logger.error(f"Failed to set up base hardware: {e}") + return False + replanning_a_star_planner = ReplanningAStarPlanner.blueprint diff --git a/dimos/navigation/replanning_a_star/path_distancer.py b/dimos/navigation/replanning_a_star/path_distancer.py index 04d844267..5bec30eee 100644 --- a/dimos/navigation/replanning_a_star/path_distancer.py +++ b/dimos/navigation/replanning_a_star/path_distancer.py @@ -74,6 +74,116 @@ def find_closest_point_index(self, pos: NDArray[np.float64]) -> int: distances = np.linalg.norm(self._path - pos, axis=1) return int(np.argmin(distances)) + def find_adaptive_lookahead_point( + self, start_idx: int, current_speed: float, min_lookahead: float = 0.3, max_lookahead: float = 2.0 + ) -> NDArray[np.float64]: + """Find lookahead point with adaptive distance based on speed. + + Args: + start_idx: Starting path index + current_speed: Current robot speed (m/s) + min_lookahead: Minimum lookahead distance (m) + max_lookahead: Maximum lookahead distance (m) + + Returns: + Lookahead point [x, y] + """ + # Adaptive lookahead: faster = longer lookahead + lookahead_gain = 0.5 + adaptive_dist = min_lookahead + lookahead_gain * current_speed + adaptive_dist = np.clip(adaptive_dist, min_lookahead, max_lookahead) + + # Temporarily set lookahead distance + original_lookahead = self._lookahead_dist + self._lookahead_dist = adaptive_dist + try: + return self.find_lookahead_point(start_idx) + finally: + self._lookahead_dist = original_lookahead + + def get_curvature_at_index(self, index: int) -> float: + """Get curvature at a specific path index. + + Args: + index: Path point index + + Returns: + Curvature (1/m) at that point + """ + if len(self._path) < 3 or index < 0 or index >= len(self._path): + return 0.0 + + # Use three-point method + if index == 0: + p0, p1, p2 = self._path[0], self._path[1], self._path[2] + elif index == len(self._path) - 1: + p0, p1, p2 = ( + self._path[-3], + self._path[-2], + self._path[-1], + ) + else: + p0, p1, p2 = ( + self._path[index - 1], + self._path[index], + self._path[index + 1], + ) + + # Compute angle change + v1 = p1 - p0 + v2 = p2 - p1 + + norm1 = np.linalg.norm(v1) + norm2 = np.linalg.norm(v2) + + if norm1 < 1e-6 or norm2 < 1e-6: + return 0.0 + + v1 = v1 / norm1 + v2 = v2 / norm2 + + cos_angle = np.clip(np.dot(v1, v2), -1.0, 1.0) + angle = np.arccos(cos_angle) + + # Curvature = angle / arc_length + arc_length = (norm1 + norm2) / 2.0 + if arc_length > 1e-6: + return float(abs(angle) / arc_length) + + return 0.0 + + def get_signed_cross_track_error(self, pos: NDArray[np.float64]) -> float: + """Signed lateral distance from robot to path (left positive, right negative). + + Args: + pos: Robot position [x, y] + + Returns: + Signed cross-track error (m). Positive = left of path, negative = right of path. + """ + index = self.find_closest_point_index(pos) + p = self._path[index] + + # Choose a path tangent at this index + if index < len(self._path) - 1: + t = self._path[index + 1] - p + elif index > 0: + t = p - self._path[index - 1] + else: + return 0.0 + + norm_t = np.linalg.norm(t) + if norm_t < 1e-6: + return 0.0 + + t = t / norm_t # Normalize tangent + v = pos - p # Vector from path point to robot + + # 2D cross product z-component: t × v + # Positive = robot left of path, negative = robot right of path + cross_z = t[0] * v[1] - t[1] * v[0] + return float(cross_z) + def _make_cumulative_distance_array(array: NDArray[np.float64]) -> NDArray[np.float64]: """ diff --git a/dimos/navigation/replanning_a_star/path_follower_task.py b/dimos/navigation/replanning_a_star/path_follower_task.py new file mode 100644 index 000000000..ca7453a09 --- /dev/null +++ b/dimos/navigation/replanning_a_star/path_follower_task.py @@ -0,0 +1,377 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Path follower task for ControlCoordinator. + +Implements path-following control as a ControlTask that runs at 100Hz +within the ControlCoordinator tick loop. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Literal + +import numpy as np + +from dimos.control.task import ( + BaseControlTask, + ControlMode, + CoordinatorState, + JointCommandOutput, + ResourceClaim, +) +from dimos.core.global_config import GlobalConfig +from dimos.msgs.geometry_msgs import PoseStamped +from dimos.msgs.nav_msgs import Path +from dimos.navigation.replanning_a_star.controllers import ( + PurePursuitController, + PIDCrossTrackController, +) +from dimos.navigation.replanning_a_star.path_distancer import PathDistancer +from dimos.navigation.replanning_a_star.velocity_profiler import VelocityProfiler +from dimos.utils.logging_config import setup_logger +from dimos.utils.trigonometry import angle_diff + +logger = setup_logger() + + +@dataclass +class PathFollowerTaskConfig: + """Configuration for path follower task. + + Attributes: + joint_names: Joint names for base control ["base_vx", "base_vy", "base_wz"] + priority: Task priority for arbitration + max_linear_speed: Maximum forward speed (m/s) + goal_tolerance: Distance tolerance for goal (m) + orientation_tolerance: Orientation tolerance for goal (rad) + """ + + joint_names: list[str] = None # type: ignore[assignment] + priority: int = 10 + max_linear_speed: float = 0.8 + goal_tolerance: float = 0.3 + orientation_tolerance: float = 0.35 + + def __post_init__(self) -> None: + """Set default joint names if not provided.""" + if self.joint_names is None: + self.joint_names = ["base_vx", "base_vy", "base_wz"] + + +class PathFollowerTask(BaseControlTask): + """Path-following control task for mobile base. + + Follows a Path using Pure Pursuit controller with velocity profiling. + Outputs JointCommandOutput with base velocities [vx, vy, wz]. + + CRITICAL: Uses state.t_now from CoordinatorState, never calls time.time() + + State Machine: + IDLE ──start_path()──► FOLLOWING ──goal_reached──► COMPLETED + ▲ │ │ + │ cancel() reset() + │ ▼ │ + └─────reset()───── ABORTED ◄──────────────────────┘ + """ + + # Task states + State: type = Literal["idle", "following", "completed", "aborted"] + + def __init__( + self, + name: str, + config: PathFollowerTaskConfig, + global_config: GlobalConfig, + ): + """Initialize path follower task. + + Args: + name: Unique task name + config: Task configuration + global_config: Global configuration + """ + if len(config.joint_names) != 3: + raise ValueError( + f"PathFollowerTask '{name}' requires exactly 3 joints " + f"(base_vx, base_vy, base_wz), got {len(config.joint_names)}" + ) + + self._name = name + self._config = config + self._global_config = global_config + self._joint_names = frozenset(config.joint_names) + self._joint_names_list = list(config.joint_names) + + # State machine + self._state: PathFollowerTask.State = "idle" + self._path: Path | None = None + self._path_distancer: PathDistancer | None = None + self._velocity_profiler: VelocityProfiler | None = None + self._current_odom: PoseStamped | None = None + self._closest_index: int = 0 + + # Controller + self._controller = PurePursuitController( + global_config, + control_frequency=100.0, + min_lookahead=0.2, + max_lookahead=0.5, + lookahead_gain=0.3, + max_linear_speed=self._config.max_linear_speed, + ) + # Cross-track PID controller for tighter lateral tracking + self._cross_track_controller = PIDCrossTrackController( + control_frequency=100.0, + k_p=1.5, + k_i=0.1, + k_d=0.2, + max_correction=1.0, + max_integral=0.3, + ) + + logger.info( + f"PathFollowerTask {name} initialized for joints: {config.joint_names}" + ) + + @property + def name(self) -> str: + """Unique task identifier.""" + return self._name + + def claim(self) -> ResourceClaim: + """Declare resource requirements.""" + return ResourceClaim( + joints=self._joint_names, + priority=self._config.priority, + mode=ControlMode.VELOCITY, + ) + + def is_active(self) -> bool: + """Check if task should run this tick.""" + return self._state == "following" + + def compute(self, state: CoordinatorState) -> JointCommandOutput | None: + """Compute path-following command for this tick. + + CRITICAL: Uses state.t_now for timing, NOT time.time()! + + Args: + state: Current coordinator state + + Returns: + JointCommandOutput with [base_vx, base_vy, base_wz], or None if not active + """ + # Check if task is active and has a path and path distancer + if self._state != "following" or self._path is None or self._path_distancer is None: + return None + if self._current_odom is None: + return None + + path_distancer = self._path_distancer + velocity_profiler = self._velocity_profiler + current_odom = self._current_odom + closest_index = self._closest_index + + # Check if goal reached + current_pos = np.array([current_odom.position.x, current_odom.position.y]) + + # Distance to goal + distance_to_goal = path_distancer.distance_to_goal(current_pos) + + # --- FINAL ROTATION MODE: inside positional tolerance --- + if distance_to_goal < self._config.goal_tolerance and len(self._path.poses) > 0: + goal_yaw = self._path.poses[-1].orientation.euler[2] + robot_yaw = current_odom.orientation.euler[2] + yaw_error = angle_diff(goal_yaw, robot_yaw) + + # If orientation is also aligned, we are done + if abs(yaw_error) < self._config.orientation_tolerance: + self._state = "completed" + logger.info(f"PathFollowerTask {self._name} completed - goal reached") + return JointCommandOutput( + joint_names=self._joint_names_list, + velocities=[0.0, 0.0, 0.0], + mode=ControlMode.VELOCITY, + ) + + # Otherwise, rotate to the goal orientation + twist = self._controller.rotate(yaw_error) + + # Clamp yaw rate + max_wz = 1.2 + twist.angular.z = float(np.clip(twist.angular.z, -max_wz, max_wz)) + + # Return JointCommandOutput + return JointCommandOutput( + joint_names=self._joint_names_list, + velocities=[ + float(twist.linear.x), + float(twist.linear.y), + float(twist.angular.z), + ], + mode=ControlMode.VELOCITY, + ) + + # --- NORMAL PATH FOLLOWING (Pure Pursuit + cross-track PID) --- + # Update closest point on path + closest_index = path_distancer.find_closest_point_index(current_pos) + self._closest_index = closest_index + + # Get velocity from profiler + target_velocity = velocity_profiler.get_velocity_at_index(self._path, closest_index) + + # Get curvature for better control + curvature = path_distancer.get_curvature_at_index(closest_index) + + # Find adaptive lookahead point + lookahead_point = path_distancer.find_adaptive_lookahead_point( + closest_index, + target_velocity, + min_lookahead=0.3, + max_lookahead=2.0, + ) + + # Compute control command using Pure Pursuit + twist = self._controller.advance( + lookahead_point, + current_odom, + current_speed=target_velocity, + path_curvature=curvature, + ) + + # Cross-track correction: tighten lateral tracking around the path + # (except when very close to goal to avoid conflicting with orientation alignment) + cross_track_error = 0.0 + ct_correction = 0.0 + if distance_to_goal >= self._config.goal_tolerance: + cross_track_error = path_distancer.get_signed_cross_track_error(current_pos) + ct_correction = self._cross_track_controller.compute_correction(cross_track_error) + twist.angular.z -= ct_correction + + # Re-clamp yaw rate to match controller limits + max_wz = 1.2 + twist.angular.z = float(np.clip(twist.angular.z, -max_wz, max_wz)) + + # Convert Twist to JointCommandOutput + # Twist: linear.x (forward), linear.y (left), angular.z (yaw) + # Joints: base_vx (forward), base_vy (left), base_wz (yaw) + return JointCommandOutput( + joint_names=self._joint_names_list, + velocities=[ + float(twist.linear.x), # base_vx + float(twist.linear.y), # base_vy + float(twist.angular.z), # base_wz + ], + mode=ControlMode.VELOCITY, + ) + + def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: + """Handle preemption by higher-priority task. + + Args: + by_task: Name of preempting task + joints: Joints that were preempted + """ + if joints & self._joint_names: + logger.warning( + f"PathFollowerTask {self._name} preempted by {by_task} on joints {joints}" + ) + if self._state == "following": + self._state = "aborted" + + # ========================================================================= + # Task-specific methods + # ========================================================================= + + def start_path(self, path: Path, current_odom: PoseStamped) -> bool: + """Start following a new path. + + Args: + path: Path to follow + current_odom: Current robot pose + + Returns: + True if accepted, False if invalid + """ + if path is None or len(path.poses) < 2: + logger.warning(f"PathFollowerTask {self._name}: invalid path") + return False + + self._path = path + self._path_distancer = PathDistancer(path) + self._velocity_profiler = VelocityProfiler( + max_linear_speed=self._config.max_linear_speed, + max_angular_speed=2.0, + max_linear_accel=1.0, + max_linear_decel=2.0, + max_centripetal_accel=1.5, + min_speed=0.1, + ) + + self._cross_track_controller.reset() + + self._current_odom = current_odom + self._closest_index = self._path_distancer.find_closest_point_index( + np.array([current_odom.position.x, current_odom.position.y]) + ) + self._state = "following" + + logger.info( + f"PathFollowerTask {self._name} started following path with " + f"{len(path.poses)} points" + ) + return True + + def update_odom(self, odom: PoseStamped) -> None: + """Update current robot pose. + + Args: + odom: Current robot pose + """ + self._current_odom = odom + + def cancel(self) -> bool: + """Cancel current path following. + + Returns: + True if cancelled, False if not following + """ + if self._state != "following": + return False + self._state = "aborted" + logger.info(f"PathFollowerTask {self._name} cancelled") + return True + + def reset(self) -> bool: + """Reset to idle state. + + Returns: + True if reset, False if currently following + """ + if self._state == "following": + logger.warning(f"Cannot reset {self._name} while following") + return False + self._state = "idle" + self._path = None + self._path_distancer = None + self._velocity_profiler = None + self._current_odom = None + logger.info(f"PathFollowerTask {self._name} reset to IDLE") + return True + + def get_state(self) -> State: + """Get current state.""" + return self._state \ No newline at end of file diff --git a/dimos/navigation/replanning_a_star/velocity_profiler.py b/dimos/navigation/replanning_a_star/velocity_profiler.py new file mode 100644 index 000000000..e4e148244 --- /dev/null +++ b/dimos/navigation/replanning_a_star/velocity_profiler.py @@ -0,0 +1,275 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Velocity profiler for path-aware speed control.""" + +import numpy as np +from numpy.typing import NDArray + +from dimos.msgs.nav_msgs import Path + + +class VelocityProfiler: + """Computes optimal velocity profile along a path based on curvature and constraints. + + The profiler: + 1. Calculates curvature at each path point + 2. Determines max safe speed based on curvature (centripetal force limits) + 3. Applies acceleration/deceleration constraints + 4. Returns speed at each point + """ + + def __init__( + self, + max_linear_speed: float = 0.8, + max_angular_speed: float = 1.5, + max_linear_accel: float = 1.0, + max_linear_decel: float = 2.0, + max_centripetal_accel: float = 1.0, + min_speed: float = 0.05, + ): + """Initialize velocity profiler. + + Args: + max_linear_speed: Maximum forward speed (m/s) + max_angular_speed: Maximum angular speed (rad/s) + max_linear_accel: Maximum linear acceleration (m/s^2) + max_linear_decel: Maximum linear deceleration (m/s^2) + max_centripetal_accel: Maximum centripetal acceleration (m/s^2) + min_speed: Minimum speed to maintain (m/s) + """ + self._max_linear_speed = max_linear_speed + self._max_angular_speed = max_angular_speed + self._max_linear_accel = max_linear_accel + self._max_linear_decel = max_linear_decel + self._max_centripetal_accel = max_centripetal_accel + self._min_speed = min_speed + + self._cached_path_id: int | None = None + self._cached_profile: NDArray[np.float64] | None = None + + def compute_profile(self, path: Path) -> NDArray[np.float64]: + """Compute velocity profile for entire path. + + Args: + path: Path to profile + + Returns: + Array of velocities (m/s) for each path point + """ + if len(path.poses) < 2: + return np.array([self._min_speed]) + + # Convert path to numpy array + path_points = np.array([[p.position.x, p.position.y] for p in path.poses]) + + # Calculate curvatures + curvatures = self._compute_curvatures(path_points) + + # Calculate max speeds based on curvature + max_speeds = self._compute_max_speeds_from_curvature(curvatures) + + # Apply acceleration constraints (forward pass) + velocities = self._apply_acceleration_constraints( + path_points, max_speeds, forward=True + ) + + # Apply deceleration constraints (backward pass) + velocities = self._apply_acceleration_constraints( + path_points, velocities, forward=False + ) + + # Ensure minimum speed + velocities = np.maximum(velocities, self._min_speed) + + return velocities + + def get_velocity_at_index(self, path: Path, index: int) -> float: + """Get velocity at specific path index. + + Args: + path: Path + index: Path point index + + Returns: + Velocity at that point (m/s) + """ + # The profile is recomputed only when the path object changes. + path_id = id(path) + if self._cached_path_id != path_id or self._cached_profile is None: + self._cached_profile = self.compute_profile(path) + self._cached_path_id = path_id + + idx = min(max(0, index), len(self._cached_profile) - 1) + return float(self._cached_profile[idx]) + + def _compute_curvatures(self, path_points: NDArray[np.float64]) -> NDArray[np.float64]: + """Compute curvature at each path point. + + Curvature κ = |dθ/ds| where θ is heading and s is arc length. + For discrete points, we use: κ = |Δθ| / |Δs| + + Args: + path_points: Array of [x, y] points + + Returns: + Array of curvatures (1/m) at each point + """ + n = len(path_points) + if n < 3: + return np.zeros(n) + + curvatures = np.zeros(n) + + # First point: use first segment + if n > 1: + ds1 = np.linalg.norm(path_points[1] - path_points[0]) + if n > 2: + ds2 = np.linalg.norm(path_points[2] - path_points[1]) + dtheta = self._angle_between_segments( + path_points[0], path_points[1], path_points[2] + ) + if ds1 + ds2 > 1e-6: + curvatures[0] = abs(dtheta) / (ds1 + ds2) + + # Middle points: use three-point method + for i in range(1, n - 1): + p0 = path_points[i - 1] + p1 = path_points[i] + p2 = path_points[i + 1] + + ds1 = np.linalg.norm(p1 - p0) + ds2 = np.linalg.norm(p2 - p1) + dtheta = self._angle_between_segments(p0, p1, p2) + + if ds1 + ds2 > 1e-6: + curvatures[i] = abs(dtheta) / (ds1 + ds2) + + # Last point: use last segment + if n > 1: + ds1 = np.linalg.norm(path_points[n - 1] - path_points[n - 2]) + if n > 2: + ds2 = np.linalg.norm(path_points[n - 2] - path_points[n - 3]) + dtheta = self._angle_between_segments( + path_points[n - 3], path_points[n - 2], path_points[n - 1] + ) + if ds1 + ds2 > 1e-6: + curvatures[n - 1] = abs(dtheta) / (ds1 + ds2) + + return curvatures + + def _angle_between_segments( + self, p0: NDArray[np.float64], p1: NDArray[np.float64], p2: NDArray[np.float64] + ) -> float: + """Compute angle change between two path segments. + + Args: + p0, p1, p2: Three consecutive points + + Returns: + Angle change in radians + """ + v1 = p1 - p0 + v2 = p2 - p1 + + # Normalize + norm1 = np.linalg.norm(v1) + norm2 = np.linalg.norm(v2) + + if norm1 < 1e-6 or norm2 < 1e-6: + return 0.0 + + v1 = v1 / norm1 + v2 = v2 / norm2 + + # Angle between vectors + cos_angle = np.clip(np.dot(v1, v2), -1.0, 1.0) + angle = np.arccos(cos_angle) + + # Determine sign using cross product (for 2D) + cross = v1[0] * v2[1] - v1[1] * v2[0] + if cross < 0: + angle = -angle + + return float(angle) + + def _compute_max_speeds_from_curvature( + self, curvatures: NDArray[np.float64] + ) -> NDArray[np.float64]: + """Compute maximum safe speed based on curvature. + + From centripetal acceleration: a = v²/r = v²·κ + Therefore: v_max = sqrt(a_max / κ) + + Args: + curvatures: Curvature at each point (1/m) + + Returns: + Maximum safe speeds (m/s) + """ + max_speeds = np.full(len(curvatures), self._max_linear_speed) + + # For non-zero curvature, limit speed + nonzero_curvature = curvatures > 1e-6 + if np.any(nonzero_curvature): + # v_max = sqrt(a_max / κ) + curvature_limited = np.sqrt( + self._max_centripetal_accel / curvatures[nonzero_curvature] + ) + max_speeds[nonzero_curvature] = np.minimum( + max_speeds[nonzero_curvature], curvature_limited + ) + + return max_speeds + + def _apply_acceleration_constraints( + self, + path_points: NDArray[np.float64], + max_speeds: NDArray[np.float64], + forward: bool = True, + ) -> NDArray[np.float64]: + """Apply acceleration/deceleration constraints. + + Args: + path_points: Path points + max_speeds: Maximum speeds at each point + forward: If True, apply forward (acceleration), else backward (deceleration) + + Returns: + Velocities with acceleration constraints applied + """ + velocities = max_speeds.copy() + max_accel = self._max_linear_accel if forward else self._max_linear_decel + + if forward: + # Forward pass: can't accelerate too fast + for i in range(1, len(path_points)): + ds = np.linalg.norm(path_points[i] - path_points[i - 1]) + if ds > 1e-6: + # v² = v₀² + 2·a·s + # v = sqrt(v₀² + 2·a·s) + v_prev = velocities[i - 1] + v_max_from_accel = np.sqrt(v_prev**2 + 2 * max_accel * ds) + velocities[i] = min(velocities[i], v_max_from_accel) + else: + # Backward pass: can't decelerate too fast + for i in range(len(path_points) - 2, -1, -1): + ds = np.linalg.norm(path_points[i + 1] - path_points[i]) + if ds > 1e-6: + # v² = v₀² + 2·a·s (backward) + v_next = velocities[i + 1] + v_max_from_decel = np.sqrt(v_next**2 + 2 * max_accel * ds) + velocities[i] = min(velocities[i], v_max_from_decel) + + return velocities \ No newline at end of file diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py index 5d096444d..33938677c 100644 --- a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py @@ -19,13 +19,27 @@ from dimos.navigation.frontier_exploration import wavefront_frontier_explorer from dimos.navigation.replanning_a_star.module import replanning_a_star_planner from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic +from dimos.control.coordinator import control_coordinator +# Coordinator blueprint +coordinator = control_coordinator( + tick_rate=100.0, + hardware=[], # Will be added via auto-setup + tasks=[], # PathFollowerTask added via auto-setup +) + +# Blueprint with coordinator-based path following unitree_go2 = autoconnect( unitree_go2_basic, voxel_mapper(voxel_size=0.1), cost_mapper(), replanning_a_star_planner(), wavefront_frontier_explorer(), + coordinator, ).global_config(n_dask_workers=6, robot_model="unitree_go2") -__all__ = ["unitree_go2"] + +# Keep old name for backward compatibility (but it's the same now) +unitree_go2_with_coordinator = unitree_go2 + +__all__ = ["unitree_go2", "unitree_go2_with_coordinator"] \ No newline at end of file