Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Server/src/transport/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class SessionDetails(BaseModel):
hash: str
unity_version: str
connected_at: str
project_path: str | None = None


class SessionList(BaseModel):
Expand Down
1 change: 1 addition & 0 deletions Server/src/transport/plugin_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ async def get_sessions(cls, user_id: str | None = None) -> SessionList:
hash=session.project_hash,
unity_version=session.unity_version,
connected_at=session.connected_at.isoformat(),
project_path=session.project_path,
)
for session_id, session in sessions.items()
}
Expand Down
168 changes: 164 additions & 4 deletions Server/src/transport/unity_instance_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
into the request-scoped state, allowing tools to access it via ctx.get_state("unity_instance").
"""
from threading import RLock
import asyncio
import logging
import os
import time
from urllib.parse import unquote, urlparse

from fastmcp.server.middleware import Middleware, MiddlewareContext

Expand Down Expand Up @@ -47,6 +50,28 @@ def set_unity_instance_middleware(middleware: 'UnityInstanceMiddleware') -> None
_unity_instance_middleware = middleware


def _file_uri_to_path(uri: str) -> str | None:
"""Convert a ``file://`` URI to a local filesystem path (UNC- and drive-aware)."""
if not uri.startswith("file://"):
return None
parsed = urlparse(uri)
host = (parsed.netloc or "").strip()
path = unquote(parsed.path or "")
if host and host.lower() != "localhost":
path = f"//{host}{path}" # UNC: file://server/share/... -> //server/share/...
elif os.name == "nt" and len(path) >= 3 and path[0] == "/" and path[2] == ":":
path = path[1:] # drive-letter form /C:/... -> C:/...
return path or None


def _strip_assets(project_path: str) -> str:
"""Return the Unity project root (stdio reports ``.../Assets``, HTTP the root)."""
normalized = project_path.replace("\\", "/").rstrip("/")
if normalized.lower().endswith("/assets"):
return normalized[: -len("/assets")]
return normalized


class UnityInstanceMiddleware(Middleware):
"""
Middleware that manages per-session Unity instance selection.
Expand All @@ -58,6 +83,8 @@ class UnityInstanceMiddleware(Middleware):
def __init__(self):
super().__init__()
self._active_by_key: dict[str, str] = {}
self._root_dirs_by_key: dict[str, list[str]] = {}
self._root_dir_tasks_by_key: dict[str, asyncio.Task] = {}
self._lock = RLock()
self._metadata_lock = RLock()
self._unity_managed_tool_names: set[str] = set()
Expand Down Expand Up @@ -224,6 +251,110 @@ async def _resolve_instance_value(self, value: str, ctx) -> str:
"Read mcpforunity://instances for current sessions."
)

async def _resolve_launch_dirs(self, ctx) -> list[str]:
"""Discover the directories the client is working in, client-agnostic.

The package's own ``UNITY_MCP_PROJECT_DIR`` override wins (a single dir);
otherwise the ``file://`` MCP roots the client advertises - the
protocol-native signal for what the session is working on, of which there
may be several. Empty when neither is available.
"""
explicit = os.environ.get("UNITY_MCP_PROJECT_DIR")
if explicit:
return [explicit]
return await self._client_root_dirs(ctx)

async def _client_root_dirs(self, ctx) -> list[str]:
"""The client's ``file://`` MCP roots, resolved once per session.

Cached because each lookup is a round-trip to the client and a session's
working directory does not change underneath us; this keeps the no-match
path off the wire on every subsequent tool call.
"""
key = await self.get_session_key(ctx)
with self._lock:
cached = self._root_dirs_by_key.get(key)
if cached is not None:
return cached
# Memoize the in-flight probe so concurrent first calls share one
# round-trip instead of each issuing their own.
task = self._root_dir_tasks_by_key.get(key)
if task is None:
task = asyncio.create_task(self._fetch_client_root_dirs(ctx))
self._root_dir_tasks_by_key[key] = task
dirs = await task
with self._lock:
self._root_dir_tasks_by_key.pop(key, None)
self._root_dirs_by_key[key] = dirs
return dirs
Comment thread
imurashka marked this conversation as resolved.

@staticmethod
async def _fetch_client_root_dirs(ctx) -> list[str]:
"""Query MCP roots and return their local paths (``file://`` only).

The caller caches the result per session, so a client that does not
support roots costs at most one failed probe, not one per tool call.
"""
list_roots = getattr(ctx, "list_roots", None)
if not callable(list_roots):
return []
try:
roots = await list_roots()
except Exception:
# Client does not implement roots, or the request failed; not fatal.
return []
dirs: list[str] = []
for root in roots or []:
path = _file_uri_to_path(str(getattr(root, "uri", "") or ""))
if path:
dirs.append(path)
return dirs

@staticmethod
def _select_instance_by_launch_dir(
candidates: list[tuple[str | None, str | None]],
launch_dirs: list[str],
) -> str | None:
"""Pick the single connected editor whose project matches a launch dir.

Each candidate's project path is normalized to the project root (Unity
reports the ``Assets`` folder over stdio but the project root over HTTP)
and matched by path lineage against every launch directory - the project
root contains, equals, or is contained by one of them. This routes
per-checkout and git-worktree setups (identical project names, different
paths) without an explicit ``unity_instance``. Returns the id only when
exactly one editor matches; otherwise None, so the caller keeps its
"ask the user to choose" behavior.
"""
launch_reals = [
os.path.normcase(os.path.realpath(d)) for d in launch_dirs if d
]
if not launch_reals:
return None

matches: set[str] = set()
for inst_id, project_path in candidates:
if not inst_id:
continue
project_root = _strip_assets(project_path) if project_path else ""
if not project_root:
# A connected instance we cannot place (e.g. an older plugin that
# does not report project_path). Refuse to guess among the rest.
return None
project_real = os.path.normcase(os.path.realpath(project_root))
for launch_real in launch_reals:
try:
shared = os.path.commonpath([launch_real, project_real])
except ValueError:
continue # e.g. paths on different Windows drives
if shared in (launch_real, project_real):
matches.add(inst_id)
break

if len(matches) == 1:
return next(iter(matches))
return None
Comment thread
imurashka marked this conversation as resolved.

async def _maybe_autoselect_instance(self, ctx) -> str | None:
"""
Auto-select the sole Unity instance when no active instance is set.
Expand All @@ -241,13 +372,17 @@ async def _maybe_autoselect_instance(self, ctx) -> str | None:
try:
sessions_data = await PluginHub.get_sessions()
sessions = sessions_data.sessions or {}
ids: list[str] = []
candidates: list[tuple[str | None, str | None]] = []
for session_info in sessions.values():
project = getattr(
session_info, "project", None) or "Unknown"
hash_value = getattr(session_info, "hash", None)
if hash_value:
ids.append(f"{project}@{hash_value}")
candidates.append((
f"{project}@{hash_value}",
getattr(session_info, "project_path", None),
))
ids = [inst_id for inst_id, _ in candidates]
if len(ids) == 1:
chosen = ids[0]
await self.set_active_instance(ctx, chosen)
Expand All @@ -257,6 +392,17 @@ async def _maybe_autoselect_instance(self, ctx) -> str | None:
)
return chosen
if len(ids) > 1:
launch_dirs = await self._resolve_launch_dirs(ctx)
chosen = self._select_instance_by_launch_dir(
candidates, launch_dirs)
if chosen:
await self.set_active_instance(ctx, chosen)
logger.info(
"Auto-selected Unity instance %s via launch directory "
"(of %d running) over PluginHub.",
chosen, len(ids),
)
return chosen
logger.info(
"Multiple Unity instances found (%d). Pass unity_instance on any tool call "
"or call set_active_instance to choose one. Available: %s",
Expand Down Expand Up @@ -284,8 +430,11 @@ async def _maybe_autoselect_instance(self, ctx) -> str | None:

pool = get_unity_connection_pool()
instances = pool.discover_all_instances(force_refresh=True)
ids = [getattr(inst, "id", None) for inst in instances]
ids = [inst_id for inst_id in ids if inst_id]
candidates = [
(getattr(inst, "id", None), getattr(inst, "path", None))
for inst in instances
]
ids = [inst_id for inst_id, _ in candidates if inst_id]
if len(ids) == 1:
chosen = ids[0]
await self.set_active_instance(ctx, chosen)
Expand All @@ -295,6 +444,17 @@ async def _maybe_autoselect_instance(self, ctx) -> str | None:
)
return chosen
if len(ids) > 1:
launch_dirs = await self._resolve_launch_dirs(ctx)
chosen = self._select_instance_by_launch_dir(
candidates, launch_dirs)
if chosen:
await self.set_active_instance(ctx, chosen)
logger.info(
"Auto-selected Unity instance %s via launch directory "
"(of %d running) via stdio discovery.",
chosen, len(ids),
)
return chosen
logger.info(
"Multiple Unity instances found (%d). Pass unity_instance on any tool call "
"or call set_active_instance to choose one. Available: %s",
Expand Down
Loading