-
Notifications
You must be signed in to change notification settings - Fork 827
feat: add proactive context compression to conversation managers #2239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,14 +1,33 @@ | ||
| """Abstract interface for conversation history management.""" | ||
|
|
||
| import logging | ||
| from abc import ABC, abstractmethod | ||
| from typing import TYPE_CHECKING, Any | ||
| from typing import TYPE_CHECKING, Any, TypedDict, Union | ||
|
|
||
| from ...hooks.events import BeforeModelCallEvent | ||
| from ...hooks.registry import HookProvider, HookRegistry | ||
| from ...types.content import Message | ||
|
|
||
| if TYPE_CHECKING: | ||
| from ...agent.agent import Agent | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| DEFAULT_COMPRESSION_THRESHOLD = 0.7 | ||
| DEFAULT_CONTEXT_WINDOW_LIMIT = 200_000 | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: This constant introduces implicit behavior that diverges from the design doc, which states: "Users must set contextWindowLimit on their model config until we ship per-provider default lookup tables." A model with a 32k context window using this 200k fallback would never trigger proactive compression until the real overflow occurs — defeating the purpose of the feature. The warning at line 140 is good, but it only fires once and at Suggestion: Consider one of:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will be resolved once we add the lookup table to Python (WIP) |
||
|
|
||
|
|
||
| class ProactiveCompressionConfig(TypedDict, total=False): | ||
| """Configuration for proactive compression when passed as an object. | ||
|
|
||
| Attributes: | ||
| compression_threshold: Ratio of context window usage that triggers proactive compression. | ||
| Value between 0 (exclusive) and 1 (inclusive). | ||
| Defaults to 0.7 (compress when 70% of the context window is used). | ||
| """ | ||
|
|
||
| compression_threshold: float | ||
|
|
||
|
|
||
| class ConversationManager(ABC, HookProvider): | ||
| """Abstract base class for managing conversation history. | ||
|
|
@@ -22,45 +41,122 @@ class ConversationManager(ABC, HookProvider): | |
|
|
||
| ConversationManager implements the HookProvider protocol, allowing derived classes to register hooks for agent | ||
| lifecycle events. Derived classes that override register_hooks must call the base implementation to ensure proper | ||
| hook registration. | ||
| hook registration chain. | ||
|
|
||
| The primary responsibility of a ConversationManager is overflow recovery: when the model encounters a context | ||
| window overflow, :meth:`reduce_context` is called with ``e`` set and MUST reduce the history enough for the next | ||
| model call to succeed. | ||
|
|
||
| Subclasses can enable proactive compression by passing ``proactive_compression`` in the constructor. | ||
| When enabled, the base class registers a ``BeforeModelCallEvent`` hook that checks projected input tokens | ||
| against the model's context window limit and calls :meth:`reduce_context` (without ``e``) when the | ||
| threshold is exceeded. This is a best-effort operation — errors are swallowed so the model call can | ||
| still proceed. | ||
|
|
||
| Example: | ||
| ```python | ||
| class MyConversationManager(ConversationManager): | ||
| def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None: | ||
| super().register_hooks(registry, **kwargs) | ||
| # Register additional hooks here | ||
| # Enable proactive compression with default threshold (0.7) | ||
| SlidingWindowConversationManager(window_size=50, proactive_compression=True) | ||
|
|
||
| # Enable proactive compression with custom threshold | ||
| SummarizingConversationManager(proactive_compression={"compression_threshold": 0.8}) | ||
| ``` | ||
| """ | ||
|
|
||
| def __init__(self) -> None: | ||
| def __init__(self, *, proactive_compression: Union[bool, "ProactiveCompressionConfig", None] = None) -> None: | ||
| """Initialize the ConversationManager. | ||
|
|
||
| Args: | ||
| proactive_compression: Enable proactive context compression before the model call. | ||
| - ``True``: compress when 70% of the context window is used (default threshold). | ||
| - ``{"compression_threshold": float}``: compress at the specified ratio (0, 1]. | ||
| - ``False`` or ``None``: disabled, only reactive overflow recovery is used. | ||
|
|
||
| Raises: | ||
| ValueError: If compression_threshold is not in the valid range (0, 1]. | ||
|
|
||
| Attributes: | ||
| removed_message_count: The messages that have been removed from the agent's messages array. | ||
| These represent messages provided by the user or LLM that have been removed, not messages | ||
| included by the conversation manager through something like summarization. | ||
| """ | ||
| # Resolve the threshold from proactive_compression parameter | ||
| if proactive_compression is True: | ||
| threshold: float | None = DEFAULT_COMPRESSION_THRESHOLD | ||
| elif isinstance(proactive_compression, dict): | ||
| threshold = proactive_compression.get("compression_threshold", DEFAULT_COMPRESSION_THRESHOLD) | ||
| else: | ||
| threshold = None | ||
|
|
||
| if threshold is not None and (threshold <= 0 or threshold > 1): | ||
| raise ValueError( | ||
| f"compression_threshold must be between 0 (exclusive) and 1 (inclusive), got {threshold}" | ||
| ) | ||
|
|
||
| self.removed_message_count = 0 | ||
| self._compression_threshold = threshold | ||
| self._context_window_limit_warned = False | ||
|
|
||
| def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None: | ||
| """Register hooks for agent lifecycle events. | ||
|
|
||
| Always registers a ``BeforeModelCallEvent`` hook for proactive compression. | ||
| When ``proactive_compression`` is not configured, the handler is a no-op (early return). | ||
|
|
||
| Derived classes that override this method must call the base implementation to ensure proper hook | ||
| registration chain. | ||
|
|
||
| Args: | ||
| registry: The hook registry to register callbacks with. | ||
| **kwargs: Additional keyword arguments for future extensibility. | ||
| """ | ||
| # Always subscribe — the threshold check happens inside the handler | ||
| registry.add_callback(BeforeModelCallEvent, self._on_before_model_call_threshold) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: The hook is now always registered even when Suggestion: This is likely acceptable for simplicity and consistency (matching the TypeScript approach), but consider adding a brief comment here explaining why "always register" was chosen over "register only when configured" — e.g., for dynamic enable/disable in the future, or to keep registration logic simple. |
||
|
|
||
| Example: | ||
| ```python | ||
| def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None: | ||
| super().register_hooks(registry, **kwargs) | ||
| registry.add_callback(SomeEvent, self.on_some_event) | ||
| ``` | ||
| def _on_before_model_call_threshold(self, event: BeforeModelCallEvent) -> None: | ||
| """Handle BeforeModelCallEvent for proactive compression. | ||
|
|
||
| When proactive compression is not configured, this is a no-op. | ||
| When configured, checks projected input tokens against the context window limit | ||
| and calls reduce_context() without error (best-effort) when threshold is exceeded. | ||
|
|
||
| Args: | ||
| event: The before model call event. | ||
| """ | ||
| pass | ||
| # Early return if proactive compression is not enabled | ||
| if self._compression_threshold is None: | ||
| return | ||
|
|
||
| context_window_limit = event.agent.model.context_window_limit | ||
| if context_window_limit is None: | ||
| context_window_limit = DEFAULT_CONTEXT_WINDOW_LIMIT | ||
| if not self._context_window_limit_warned: | ||
| self._context_window_limit_warned = True | ||
| logger.warning( | ||
| "context_window_limit=<%s> | context_window_limit not set on model, using default." | ||
| " Set context_window_limit in your model config for accurate proactive compression", | ||
| DEFAULT_CONTEXT_WINDOW_LIMIT, | ||
| ) | ||
|
|
||
| if event.projected_input_tokens is None: | ||
| logger.debug("projected_input_tokens=<None> | skipping proactive compression") | ||
| return | ||
|
|
||
| ratio = event.projected_input_tokens / context_window_limit | ||
| if ratio >= self._compression_threshold: | ||
| logger.debug( | ||
| "projected_tokens=<%s>, limit=<%s>, ratio=<%.2f>, compression_threshold=<%s>" | ||
| " | compression threshold exceeded, reducing context", | ||
| event.projected_input_tokens, | ||
| context_window_limit, | ||
| ratio, | ||
| self._compression_threshold, | ||
| ) | ||
| # Proactive compression is best-effort: swallow errors so the model call can still proceed. | ||
| try: | ||
|
opieter-aws marked this conversation as resolved.
|
||
| self.reduce_context(agent=event.agent) | ||
| except Exception: | ||
| logger.debug("proactive compression failed, will proceed with model call", exc_info=True) | ||
|
|
||
| def restore_from_session(self, state: dict[str, Any]) -> list[Message] | None: | ||
| """Restore the Conversation Manager's state from a session. | ||
|
|
@@ -99,22 +195,24 @@ def apply_management(self, agent: "Agent", **kwargs: Any) -> None: | |
|
|
||
| @abstractmethod | ||
| def reduce_context(self, agent: "Agent", e: Exception | None = None, **kwargs: Any) -> None: | ||
| """Called when the model's context window is exceeded. | ||
|
|
||
| This method should implement the specific strategy for reducing the window size when a context overflow occurs. | ||
| It is typically called after a ContextWindowOverflowException is caught. | ||
| """Reduce the conversation history. | ||
|
|
||
| Implementations might use strategies such as: | ||
| Called in two scenarios: | ||
| 1. **Reactive** (e is set): A context window overflow occurred. The implementation | ||
| MUST remove enough history for the next model call to succeed, or re-raise the error. | ||
| 2. **Proactive** (e is None): The compression threshold was exceeded. This is best-effort — | ||
| returning without reduction or raising is acceptable; the model call proceeds regardless. | ||
|
|
||
| - Removing the N oldest messages | ||
| - Summarizing older context | ||
| - Applying importance-based filtering | ||
| - Maintaining critical conversation markers | ||
| Implementations should modify ``agent.messages`` in-place. | ||
|
|
||
| Args: | ||
| agent: The agent whose conversation history will be reduced. | ||
| This list is modified in-place. | ||
| e: The exception that triggered the context reduction, if any. | ||
| When set, this is a reactive overflow recovery call — the implementation MUST | ||
| reduce enough history for the next model call to succeed. | ||
| When None, this is a proactive compression call — best-effort reduction to avoid | ||
| hitting the context window limit. | ||
| **kwargs: Additional keyword arguments for future extensibility. | ||
| """ | ||
| pass | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,7 +10,7 @@ | |
| from ...types.content import ContentBlock, Messages | ||
| from ...types.exceptions import ContextWindowOverflowException | ||
| from ...types.tools import ToolResultContent | ||
| from .conversation_manager import ConversationManager | ||
| from .conversation_manager import ConversationManager, ProactiveCompressionConfig | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -37,6 +37,7 @@ def __init__( | |
| should_truncate_results: bool = True, | ||
| *, | ||
| per_turn: bool | int = False, | ||
| proactive_compression: bool | ProactiveCompressionConfig | None = None, | ||
| ): | ||
| """Initialize the sliding window conversation manager. | ||
|
|
||
|
|
@@ -54,6 +55,10 @@ def __init__( | |
| manage message history and prevent the agent loop from slowing down. Start with | ||
| per_turn=True and adjust to a specific frequency (e.g., per_turn=5) if needed | ||
| for performance tuning. | ||
| proactive_compression: Enable proactive context compression before the model call. | ||
| - ``True``: compress when 70% of the context window is used (default threshold). | ||
| - ``{"compression_threshold": float}``: compress at the specified ratio (0, 1]. | ||
| - ``False`` or ``None``: disabled, only reactive overflow recovery is used. | ||
|
|
||
| Raises: | ||
| ValueError: If window_size is negative, or if per_turn is 0 or a negative integer. | ||
|
|
@@ -63,7 +68,7 @@ def __init__( | |
| if isinstance(per_turn, int) and not isinstance(per_turn, bool) and per_turn <= 0: | ||
| raise ValueError(f"per_turn must be a positive integer, True, or False, got {per_turn}") | ||
|
|
||
| super().__init__() | ||
| super().__init__(proactive_compression=proactive_compression) | ||
|
|
||
| self.window_size = window_size | ||
| self.should_truncate_results = should_truncate_results | ||
|
|
@@ -158,6 +163,12 @@ def apply_management(self, agent: "Agent", **kwargs: Any) -> None: | |
| def reduce_context(self, agent: "Agent", e: Exception | None = None, **kwargs: Any) -> None: | ||
| """Trim the oldest messages to reduce the conversation context size. | ||
|
|
||
| When ``e`` is set (reactive overflow recovery), attempts to truncate large tool results | ||
| first before falling back to message trimming. | ||
|
|
||
| When ``e`` is None (proactive compression or routine management), only trims messages | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: Docstring/code mismatch. The docstring states: "When Suggestion: Either:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in the latest push — tool result truncation is now guarded by |
||
| without attempting tool result truncation. | ||
|
|
||
| The method handles special cases where trimming the messages leads to: | ||
| - toolResult with no corresponding toolUse | ||
| - toolUse with no corresponding toolResult | ||
|
|
@@ -166,12 +177,14 @@ def reduce_context(self, agent: "Agent", e: Exception | None = None, **kwargs: A | |
| agent: The agent whose messages will be reduced. | ||
| This list is modified in-place. | ||
| e: The exception that triggered the context reduction, if any. | ||
| When set, this is a reactive overflow recovery call. | ||
| When None, this is a proactive or routine management call. | ||
| **kwargs: Additional keyword arguments for future extensibility. | ||
|
|
||
| Raises: | ||
| ContextWindowOverflowException: If the context cannot be reduced further and a context overflow | ||
| error was provided (e is not None). When called during routine window management (e is None), | ||
| logs a warning and returns without modification. | ||
| error was provided (e is not None). When called during routine window management or | ||
| proactive compression (e is None), logs a warning and returns without modification. | ||
| """ | ||
| messages = agent.messages | ||
|
|
||
|
|
@@ -181,16 +194,18 @@ def reduce_context(self, agent: "Agent", e: Exception | None = None, **kwargs: A | |
| messages[:] = [] | ||
| return | ||
|
|
||
| # Try to truncate the tool result first | ||
| oldest_message_idx_with_tool_results = self._find_oldest_message_with_tool_results(messages) | ||
| if oldest_message_idx_with_tool_results is not None and self.should_truncate_results: | ||
| logger.debug( | ||
| "message_index=<%s> | found message with tool results at index", oldest_message_idx_with_tool_results | ||
| ) | ||
| results_truncated = self._truncate_tool_results(messages, oldest_message_idx_with_tool_results) | ||
| if results_truncated: | ||
| logger.debug("message_index=<%s> | tool results truncated", oldest_message_idx_with_tool_results) | ||
| return | ||
| # Try to truncate the tool result first (only for reactive overflow, not proactive compression) | ||
| if e is not None: | ||
| oldest_message_idx_with_tool_results = self._find_oldest_message_with_tool_results(messages) | ||
| if oldest_message_idx_with_tool_results is not None and self.should_truncate_results: | ||
| logger.debug( | ||
| "message_index=<%s> | found message with tool results at index", | ||
| oldest_message_idx_with_tool_results, | ||
| ) | ||
| results_truncated = self._truncate_tool_results(messages, oldest_message_idx_with_tool_results) | ||
| if results_truncated: | ||
| logger.debug("message_index=<%s> | tool results truncated", oldest_message_idx_with_tool_results) | ||
| return | ||
|
|
||
| # Try to trim index id when tool result cannot be truncated anymore | ||
| # If the number of messages is less than the window_size, then we default to 2, otherwise, trim to window size | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.