diff --git a/dimos/agents/skills/navigation.py b/dimos/agents/skills/navigation.py index d88bec452e..a72465eb23 100644 --- a/dimos/agents/skills/navigation.py +++ b/dimos/agents/skills/navigation.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math import time from typing import Any @@ -37,6 +38,14 @@ logger = setup_logger() +def _make_position_goal(x: float, y: float, yaw_rad: float, frame_id: str) -> PoseStamped: + return PoseStamped( + position=make_vector3(x, y, 0.0), + orientation=Quaternion.from_euler(make_vector3(0.0, 0.0, yaw_rad)), + frame_id=frame_id, + ) + + class NavigationSkillContainer(Module): _latest_image: Image | None = None _latest_odom: PoseStamped | None = None @@ -243,6 +252,69 @@ def _navigate_using_semantic_map(self, query: str) -> str: message = f"Found a location in the semantic map matching '{query}'." return self._navigate_to(goal_pose, message) + @skill + def navigate_to_position( + self, + x: float, + y: float, + yaw_deg: float = 0.0, + ) -> str: + """Navigate to an absolute position in the map frame. + + Use this to act on a pose returned by another tool — pass its x, y, and yaw_deg straight in. Positions are interpreted in the map frame. + + Args: + x: target x in meters + y: target y in meters + yaw_deg: final heading in degrees, 0 = facing +x + """ + if not self._skill_started: + raise ValueError(f"{self} has not been started.") + + goal = _make_position_goal(x, y, math.radians(yaw_deg), "map") + return self._navigate_to(goal, f"Heading to ({x:.2f}, {y:.2f})") + + @skill + def rotate_toward_position( + self, + x: float, + y: float, + ) -> str: + """Rotate in place to face an absolute position in the map frame. + + Pass the target x and y from a pose's fields. Yaw is computed against the robot's current odometry. Positions are interpreted in the map frame. + + Args: + x: target x in meters + y: target y in meters + """ + if not self._skill_started: + raise ValueError(f"{self} has not been started.") + + if self._latest_odom is None: + return "No odometry data received yet, cannot rotate." + + cur_x = self._latest_odom.position.x + cur_y = self._latest_odom.position.y + yaw_rad = math.atan2(y - cur_y, x - cur_x) + + goal = _make_position_goal(cur_x, cur_y, yaw_rad, "map") + return self._navigate_to(goal, f"Rotating to face ({x:.2f}, {y:.2f})") + + @skill + def current_pose(self) -> PoseStamped: + """Return the robot's current pose. + + Use this to reason about a target pose relative to the robot (e.g. distance or bearing). Compare only against poses with the same frame_id. + """ + if not self._skill_started: + raise ValueError(f"{self} has not been started.") + + if self._latest_odom is None: + raise RuntimeError("No odometry data received yet.") + + return self._latest_odom + @skill def stop_navigation(self) -> str: """Immediatly stop moving.""" @@ -273,8 +345,4 @@ def _get_goal_pose_from_result(self, result: dict[str, Any]) -> PoseStamped | No pos_y = first.get("pos_y", 0) theta = first.get("rot_z", 0) - return PoseStamped( - position=make_vector3(pos_x, pos_y, 0), - orientation=Quaternion.from_euler(make_vector3(0, 0, theta)), - frame_id="map", - ) + return _make_position_goal(pos_x, pos_y, theta, "map") diff --git a/dimos/agents/skills/test_navigation.py b/dimos/agents/skills/test_navigation.py index f206d63ba0..a4fd98c0d9 100644 --- a/dimos/agents/skills/test_navigation.py +++ b/dimos/agents/skills/test_navigation.py @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math from typing import Any from langchain_core.messages import HumanMessage +import pytest from dimos.agents.skills.navigation import NavigationSkillContainer from dimos.core.core import rpc @@ -117,6 +119,21 @@ def _navigate_using_semantic_map(self, query): return f"Successfuly arrived at '{query}'" +class MockedPositionNavSkill(NavigationSkillContainer): + """Direct-instantiation harness for navigate_to_position / + rotate_toward_position / current_pose tests. Skips the heavy parent + __init__ and records goals at the _navigate_to boundary.""" + + def __init__(self, latest_odom: PoseStamped | None = None) -> None: + self._skill_started = True + self._latest_odom = latest_odom + self.recorded_goals: list[PoseStamped] = [] + + def _navigate_to(self, pose: PoseStamped, message: str) -> str: + self.recorded_goals.append(pose) + return message + + def test_stop_movement(agent_setup) -> None: history = agent_setup( blueprints=[ @@ -159,3 +176,65 @@ def test_go_to_semantic_location(agent_setup) -> None: ) assert "success" in history[-1].content.lower() + + +def test_navigate_to_position_sets_goal_with_yaw() -> None: + skill = MockedPositionNavSkill() + skill.navigate_to_position(x=3.0, y=4.0, yaw_deg=90.0) + + assert len(skill.recorded_goals) == 1 + goal = skill.recorded_goals[0] + assert goal.frame_id == "map" + assert goal.position.x == pytest.approx(3.0) + assert goal.position.y == pytest.approx(4.0) + assert math.degrees(goal.yaw) == pytest.approx(90.0, abs=0.1) + + +def test_rotate_toward_position_computes_yaw_from_odom() -> None: + skill = MockedPositionNavSkill( + latest_odom=PoseStamped( + position=(0.0, 0.0, 0.0), + orientation=(0.0, 0.0, 0.0, 1.0), + frame_id="map", + ) + ) + skill.rotate_toward_position(x=0.0, y=1.0) + + assert len(skill.recorded_goals) == 1 + goal = skill.recorded_goals[0] + assert goal.position.x == pytest.approx(0.0) + assert goal.position.y == pytest.approx(0.0) + assert math.degrees(goal.yaw) == pytest.approx(90.0, abs=0.1) + + +def test_rotate_toward_position_without_odom_returns_message() -> None: + skill = MockedPositionNavSkill() + + result = skill.rotate_toward_position(x=1.0, y=0.0) + + assert "no odometry" in result.lower() + assert skill.recorded_goals == [] + + +def test_current_pose_returns_pose_stamped() -> None: + odom = PoseStamped( + position=(1.0, 2.0, 0.0), + orientation=(0.0, 0.0, 0.0, 1.0), + frame_id="map", + ) + skill = MockedPositionNavSkill(latest_odom=odom) + + result = skill.current_pose() + + assert isinstance(result, PoseStamped) + assert result is odom + assert result.position.x == pytest.approx(1.0) + assert result.position.y == pytest.approx(2.0) + assert result.frame_id == "map" + + +def test_current_pose_without_odom_raises() -> None: + skill = MockedPositionNavSkill() + + with pytest.raises(RuntimeError, match="No odometry"): + skill.current_pose() diff --git a/dimos/msgs/geometry_msgs/PoseStamped.py b/dimos/msgs/geometry_msgs/PoseStamped.py index acf0af8b32..b29c45eff5 100644 --- a/dimos/msgs/geometry_msgs/PoseStamped.py +++ b/dimos/msgs/geometry_msgs/PoseStamped.py @@ -14,9 +14,10 @@ from __future__ import annotations +import json import math import time -from typing import TYPE_CHECKING, BinaryIO, TypeAlias +from typing import TYPE_CHECKING, Any, BinaryIO, TypeAlias if TYPE_CHECKING: from rerun._baseclasses import Archetype @@ -82,6 +83,27 @@ def __str__(self) -> str: f"euler=[{math.degrees(self.roll):.1f}, {math.degrees(self.pitch):.1f}, {math.degrees(self.yaw):.1f}])" ) + def agent_encode(self) -> list[dict[str, Any]]: + """Render the pose for an LLM agent. + + Returns an MCP text content block whose text is a JSON object with + ``frame_id``, ``x``, ``y``, ``z``, ``roll_deg``, ``pitch_deg``, + ``yaw_deg``. Pass these straight into navigation skills (e.g. + ``navigate_to_position``) to act on the pose. To reason about the + pose relative to the robot, the agent should fetch its own pose + (e.g. via ``current_pose``) and combine the two. + """ + payload = { + "frame_id": self.frame_id, + "x": round(self.x, 3), + "y": round(self.y, 3), + "z": round(self.z, 3), + "roll_deg": round(math.degrees(self.roll), 1), + "pitch_deg": round(math.degrees(self.pitch), 1), + "yaw_deg": round(math.degrees(self.yaw), 1), + } + return [{"type": "text", "text": json.dumps(payload)}] + def to_rerun(self) -> Archetype: """Convert to rerun Transform3D format. diff --git a/dimos/msgs/geometry_msgs/test_PoseStamped.py b/dimos/msgs/geometry_msgs/test_PoseStamped.py index a486f33303..7b503598da 100644 --- a/dimos/msgs/geometry_msgs/test_PoseStamped.py +++ b/dimos/msgs/geometry_msgs/test_PoseStamped.py @@ -12,10 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json +import math import pickle import time +import pytest + from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Vector3 import Vector3 def test_lcm_encode_decode() -> None: @@ -53,3 +59,58 @@ def test_pickle_encode_decode() -> None: assert isinstance(pose_dest, PoseStamped) assert pose_dest is not pose_source assert pose_dest == pose_source + + +def test_agent_encode_returns_absolute_fields() -> None: + """Test that agent_encode returns frame_id, position, and full orientation.""" + + pose = PoseStamped( + position=Vector3(1.0, 2.0, 3.0), + orientation=Quaternion.from_euler( + Vector3(math.radians(10.0), math.radians(20.0), math.radians(45.0)) + ), + frame_id="map", + ) + encoded = pose.agent_encode() + + assert len(encoded) == 1 + assert encoded[0]["type"] == "text" + data = json.loads(encoded[0]["text"]) + + assert set(data.keys()) == { + "frame_id", + "x", + "y", + "z", + "roll_deg", + "pitch_deg", + "yaw_deg", + } + assert data["frame_id"] == "map" + assert data["x"] == pytest.approx(1.0) + assert data["y"] == pytest.approx(2.0) + assert data["z"] == pytest.approx(3.0) + assert data["roll_deg"] == pytest.approx(10.0, abs=0.1) + assert data["pitch_deg"] == pytest.approx(20.0, abs=0.1) + assert data["yaw_deg"] == pytest.approx(45.0, abs=0.1) + + +def test_agent_encode_rounds_values() -> None: + """Test that agent_encode rounds position to 3 dp and angles to 1 dp.""" + + pose = PoseStamped( + position=Vector3(1.23456, 2.34567, 3.45678), + orientation=Quaternion.from_euler( + Vector3(math.radians(10.123), math.radians(20.456), math.radians(45.789)) + ), + frame_id="map", + ) + encoded = pose.agent_encode() + data = json.loads(encoded[0]["text"]) + + assert data["x"] == pytest.approx(1.235) + assert data["y"] == pytest.approx(2.346) + assert data["z"] == pytest.approx(3.457) + assert data["roll_deg"] == pytest.approx(10.1, abs=0.05) + assert data["pitch_deg"] == pytest.approx(20.5, abs=0.05) + assert data["yaw_deg"] == pytest.approx(45.8, abs=0.05)