-
-
Notifications
You must be signed in to change notification settings - Fork 292
feat: add FastAPI middleware for per-request emissions tracking #1203
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
80ee9af
31e7221
3d3f368
c3d053d
b2cfeb6
bb6995c
c938690
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| """Optional integrations for frameworks and platforms.""" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| """FastAPI integration: middleware and lifespan helpers.""" | ||
|
|
||
| from codecarbon.integrations.fastapi.lifespan import create_codecarbon_lifespan | ||
| from codecarbon.integrations.fastapi.middleware import ( | ||
| CodeCarbonMiddleware, | ||
| add_codecarbon_middleware, | ||
| ) | ||
|
|
||
| __all__ = [ | ||
| "CodeCarbonMiddleware", | ||
| "add_codecarbon_middleware", | ||
| "create_codecarbon_lifespan", | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| """Configurable response headers from emissions measurements.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Callable, Mapping, Sequence | ||
| from typing import Union | ||
|
|
||
| from starlette.requests import Request | ||
| from starlette.responses import Response | ||
|
|
||
| from codecarbon.output_methods.emissions_data import EmissionsData | ||
|
|
||
| HeaderConfig = Union[bool, str, Sequence[str], Mapping[str, str], None] | ||
| HeaderFormatter = Callable[[EmissionsData, Request], Mapping[str, str]] | ||
|
|
||
| FIELD_UNITS: dict[str, str] = { | ||
| "emissions": "kg", | ||
| "emissions_rate": "kg-per-s", | ||
| "duration": "s", | ||
| "energy_consumed": "kwh", | ||
| "cpu_energy": "kwh", | ||
| "gpu_energy": "kwh", | ||
| "ram_energy": "kwh", | ||
| "water_consumed": "l", | ||
| "cpu_power": "w", | ||
| "gpu_power": "w", | ||
| "ram_power": "w", | ||
| "cpu_utilization_percent": "percent", | ||
| "gpu_utilization_percent": "percent", | ||
| "ram_utilization_percent": "percent", | ||
| "ram_used_gb": "gb", | ||
| "pue": "ratio", | ||
| "wue": "l-per-kwh", | ||
| } | ||
|
|
||
| HEADER_PRESETS: dict[str, dict[str, str]] = { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same, a collection of Enums here would make sense no ? |
||
| "emissions": {"emissions": "X-CodeCarbon-Emissions-kg"}, | ||
| "default": { | ||
| "emissions": "X-CodeCarbon-Emissions-kg", | ||
| "energy_consumed": "X-CodeCarbon-Energy-Consumed-kwh", | ||
| "duration": "X-CodeCarbon-Duration-s", | ||
| "emissions_rate": "X-CodeCarbon-Emissions-Rate-kg-per-s", | ||
| }, | ||
| "energy": { | ||
| "emissions": "X-CodeCarbon-Emissions-kg", | ||
| "energy_consumed": "X-CodeCarbon-Energy-Consumed-kwh", | ||
| "cpu_energy": "X-CodeCarbon-Cpu-Energy-kwh", | ||
| "gpu_energy": "X-CodeCarbon-Gpu-Energy-kwh", | ||
| "ram_energy": "X-CodeCarbon-Ram-Energy-kwh", | ||
| "duration": "X-CodeCarbon-Duration-s", | ||
| }, | ||
| "power": { | ||
| "emissions": "X-CodeCarbon-Emissions-kg", | ||
| "cpu_power": "X-CodeCarbon-Cpu-Power-w", | ||
| "gpu_power": "X-CodeCarbon-Gpu-Power-w", | ||
| "ram_power": "X-CodeCarbon-Ram-Power-w", | ||
| "duration": "X-CodeCarbon-Duration-s", | ||
| }, | ||
| } | ||
|
|
||
| FULL_HEADER_FIELDS: tuple[str, ...] = tuple(FIELD_UNITS.keys()) | ||
|
|
||
|
|
||
| def _auto_header_name(field: str) -> str: | ||
| unit = FIELD_UNITS.get(field, "") | ||
| title = "-".join(part.capitalize() for part in field.split("_")) | ||
| suffix = f"-{unit}" if unit else "" | ||
| return f"X-CodeCarbon-{title}{suffix}" | ||
|
|
||
|
|
||
| def resolve_header_mapping(config: HeaderConfig) -> dict[str, str]: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pretty sure this defensive method could be converted in a way more straightforward (and clear) one, if the unit tests demonstrate resilience to each corner case :) |
||
| """Normalize ``response_headers`` settings to ``{field_name: header_name}``. | ||
|
|
||
| Args: | ||
| config: ``None`` or ``False`` for no headers; ``True`` for the emissions preset; | ||
| a preset name (``emissions``, ``default``, ``energy``, ``power``, ``full``); | ||
| a sequence of field names (auto header names); or an explicit mapping. | ||
|
|
||
| Returns: | ||
| Mapping from :class:`~codecarbon.output_methods.emissions_data.EmissionsData` | ||
| attribute names to HTTP header names. | ||
|
|
||
| Raises: | ||
| ValueError: If ``config`` is a string that is not a known preset (other than | ||
| ``full``). | ||
| """ | ||
| if config is None or config is False: | ||
| return {} | ||
| if config is True: | ||
| return dict(HEADER_PRESETS["emissions"]) | ||
| if isinstance(config, str): | ||
| preset = HEADER_PRESETS.get(config) | ||
| if preset is None: | ||
| if config == "full": | ||
| return {field: _auto_header_name(field) for field in FULL_HEADER_FIELDS} | ||
| raise ValueError(f"Unknown response_headers preset: {config!r}") | ||
| return dict(preset) | ||
| if isinstance(config, Mapping): | ||
| return dict(config) | ||
| return {field: _auto_header_name(field) for field in config} | ||
|
|
||
|
|
||
| def apply_response_headers( | ||
| response: Response, | ||
| emissions_data: EmissionsData, | ||
| header_mapping: Mapping[str, str], | ||
| ) -> None: | ||
| """Write selected emission fields onto an HTTP response as headers. | ||
|
|
||
| Args: | ||
| response: Outgoing Starlette response (headers are updated in place). | ||
| emissions_data: Values read via ``getattr`` for each key in ``header_mapping``. | ||
| header_mapping: Field name to HTTP header name; unknown fields are skipped. | ||
| """ | ||
| for field, header_name in header_mapping.items(): | ||
| if not hasattr(emissions_data, field): | ||
| continue | ||
| value = getattr(emissions_data, field) | ||
| response.headers[header_name] = str(value) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| """Route naming and endpoint filter helpers for FastAPI/Starlette.""" | ||
|
|
||
| from collections.abc import Callable, Iterable | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| if TYPE_CHECKING: | ||
| from starlette.requests import Request | ||
|
|
||
| DEFAULT_EXCLUDE: frozenset[str] = frozenset( | ||
| { | ||
| "/docs", | ||
| "/redoc", | ||
| "/openapi.json", | ||
| "/health", | ||
| "/healthz", | ||
| "/ready", | ||
| "/live", | ||
| } | ||
| ) | ||
|
|
||
| HTTP_METHODS = frozenset( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same, this could benefit from being an Enum |
||
| {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS", "TRACE", "CONNECT"} | ||
| ) | ||
|
|
||
|
|
||
| def get_endpoint_path(request: "Request") -> str: | ||
| """Return the mounted route template or the raw URL path. | ||
|
|
||
| Args: | ||
| request: Current Starlette/FastAPI request. | ||
|
|
||
| Returns: | ||
| Route template such as ``/items/{item_id}``, or ``request.url.path``. | ||
| """ | ||
| route = request.scope.get("route") | ||
| if route is not None: | ||
| return route.path | ||
| return request.url.path | ||
|
|
||
|
|
||
| def build_endpoint_key(request: "Request") -> str: | ||
| """Build a stable endpoint identifier such as ``GET /predict``. | ||
|
|
||
| Args: | ||
| request: Current Starlette/FastAPI request. | ||
|
|
||
| Returns: | ||
| HTTP method plus route template or URL path. | ||
| """ | ||
| return f"{request.method} {get_endpoint_path(request)}" | ||
|
|
||
|
|
||
| def is_method_pattern(pattern: str) -> bool: | ||
| """Return True when ``pattern`` is ``METHOD /path``.""" | ||
| method, _, path = pattern.partition(" ") | ||
| return method in HTTP_METHODS and path.startswith("/") | ||
|
|
||
|
|
||
| def matches_exclude( | ||
| pattern: str, | ||
| url_path: str, | ||
| endpoint_key: str, | ||
| endpoint_path: str, | ||
| ) -> bool: | ||
| """Return True when an exclude pattern matches the request.""" | ||
| if is_method_pattern(pattern): | ||
| return endpoint_key == pattern | ||
| if not pattern.startswith("/"): | ||
| return endpoint_key == pattern | ||
| return ( | ||
| url_path == pattern | ||
| or url_path.startswith(f"{pattern}/") | ||
| or endpoint_path == pattern | ||
| ) | ||
|
|
||
|
|
||
| def matches_include(pattern: str, endpoint_key: str, endpoint_path: str) -> bool: | ||
| """Return True when an include pattern matches the request.""" | ||
| if is_method_pattern(pattern): | ||
| return endpoint_key == pattern | ||
| if pattern.startswith("/"): | ||
| return endpoint_path == pattern | ||
| return endpoint_key == pattern | ||
|
|
||
|
|
||
| def should_track_request( | ||
| request: "Request", | ||
| include: Iterable[str] | None, | ||
| exclude: Iterable[str], | ||
| ) -> bool: | ||
| """Return True when the request should be measured. | ||
|
|
||
| Patterns use one of two forms: | ||
|
|
||
| * ``METHOD /route/template`` — one HTTP method on one route (e.g. ``GET /predict``) | ||
| * ``/route/template`` — any method on that route, or a URL path prefix when excluding | ||
|
|
||
| Args: | ||
| request: Current Starlette/FastAPI request. | ||
| include: When set, only matching endpoints are tracked. | ||
| exclude: Endpoints or URL prefixes to skip. | ||
|
|
||
| Returns: | ||
| True when CodeCarbon should track this request. | ||
| """ | ||
| url_path = request.url.path | ||
| endpoint_key = build_endpoint_key(request) | ||
| endpoint_path = get_endpoint_path(request) | ||
| for pattern in exclude: | ||
| if matches_exclude(pattern, url_path, endpoint_key, endpoint_path): | ||
| return False | ||
| if include is None: | ||
| return True | ||
| return any( | ||
| matches_include(pattern, endpoint_key, endpoint_path) for pattern in include | ||
| ) | ||
|
|
||
|
|
||
| def build_task_name( | ||
| request: "Request", | ||
| formatter: Callable[["Request"], str] | None = None, | ||
| ) -> str: | ||
| """Derive a stable label like ``GET /items/{item_id}`` for task-scoped tracking. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could lead to task override if resource is being accessed by 2 parallel or sequential requests. Maybe we could add some random here if the goal of this method is to provide a name for the codecarbon task concept |
||
|
|
||
| Args: | ||
| request: Current Starlette/FastAPI request. | ||
| formatter: Optional function that returns the task name instead of the default. | ||
|
|
||
| Returns: | ||
| Method plus route template when a route is mounted on the request scope, | ||
| otherwise method plus the raw URL path. | ||
| """ | ||
| if formatter is not None: | ||
| return formatter(request) | ||
| return build_endpoint_key(request) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| """Lifespan helpers for sharing one ``EmissionsTracker`` across requests.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import AsyncIterator | ||
| from contextlib import asynccontextmanager | ||
| from typing import Any | ||
|
|
||
| from codecarbon import EmissionsTracker | ||
|
|
||
|
|
||
| @asynccontextmanager | ||
| async def create_codecarbon_lifespan( | ||
| app: Any, | ||
| *, | ||
| project_name: str = "codecarbon-fastapi", | ||
| **tracker_kwargs: Any, | ||
| ) -> AsyncIterator[None]: | ||
| """Start a tracker for the app lifetime and expose it on ``app.state``. | ||
|
|
||
| Args: | ||
| app: Starlette/FastAPI application with ``state`` namespace. | ||
| project_name: ``project_name`` for :class:`~codecarbon.EmissionsTracker`. | ||
| **tracker_kwargs: Extra constructor kwargs for the tracker. | ||
|
|
||
| Yields: | ||
| ``None`` while the app runs. | ||
| """ | ||
| merged = dict(tracker_kwargs) | ||
| merged.setdefault("allow_multiple_runs", True) | ||
| tracker = EmissionsTracker(project_name=project_name, **merged) | ||
| tracker.start() | ||
| app.state.codecarbon_tracker = tracker | ||
| try: | ||
| yield | ||
| finally: | ||
| tracker.stop() | ||
| app.state.codecarbon_tracker = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be transformed in an Enum ? I have the feeling it could be leveraged elsewhere for labels and it could be maintained from the core package, not only the fastapi integration. Thoughts ?