Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,12 @@ tests/test_data/rapl/*
credentials*
.codecarbon.config*
scripts/agent-vm.personal.config.sh

# Added by ggshield
.cache_ggshield

# Added by ggshield
.cache_ggshield

# Added by ggshield
.cache_ggshield
1 change: 1 addition & 0 deletions codecarbon/integrations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Optional integrations for frameworks and platforms."""
13 changes: 13 additions & 0 deletions codecarbon/integrations/fastapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""FastAPI integration: middleware and lifespan helpers."""

from codecarbon.integrations.fastapi.lifespan import create_codecarbon_lifespan
from codecarbon.integrations.fastapi.middleware import (
CodeCarbonMiddleware,
add_codecarbon_middleware,
)

__all__ = [
"CodeCarbonMiddleware",
"add_codecarbon_middleware",
"create_codecarbon_lifespan",
]
119 changes: 119 additions & 0 deletions codecarbon/integrations/fastapi/_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Configurable response headers from emissions measurements."""

from __future__ import annotations

from collections.abc import Callable, Mapping, Sequence
from typing import Union

from starlette.requests import Request
from starlette.responses import Response

from codecarbon.output_methods.emissions_data import EmissionsData

HeaderConfig = Union[bool, str, Sequence[str], Mapping[str, str], None]
HeaderFormatter = Callable[[EmissionsData, Request], Mapping[str, str]]

FIELD_UNITS: dict[str, str] = {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be transformed in an Enum ? I have the feeling it could be leveraged elsewhere for labels and it could be maintained from the core package, not only the fastapi integration. Thoughts ?

"emissions": "kg",
"emissions_rate": "kg-per-s",
"duration": "s",
"energy_consumed": "kwh",
"cpu_energy": "kwh",
"gpu_energy": "kwh",
"ram_energy": "kwh",
"water_consumed": "l",
"cpu_power": "w",
"gpu_power": "w",
"ram_power": "w",
"cpu_utilization_percent": "percent",
"gpu_utilization_percent": "percent",
"ram_utilization_percent": "percent",
"ram_used_gb": "gb",
"pue": "ratio",
"wue": "l-per-kwh",
}

HEADER_PRESETS: dict[str, dict[str, str]] = {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, a collection of Enums here would make sense no ?

"emissions": {"emissions": "X-CodeCarbon-Emissions-kg"},
"default": {
"emissions": "X-CodeCarbon-Emissions-kg",
"energy_consumed": "X-CodeCarbon-Energy-Consumed-kwh",
"duration": "X-CodeCarbon-Duration-s",
"emissions_rate": "X-CodeCarbon-Emissions-Rate-kg-per-s",
},
"energy": {
"emissions": "X-CodeCarbon-Emissions-kg",
"energy_consumed": "X-CodeCarbon-Energy-Consumed-kwh",
"cpu_energy": "X-CodeCarbon-Cpu-Energy-kwh",
"gpu_energy": "X-CodeCarbon-Gpu-Energy-kwh",
"ram_energy": "X-CodeCarbon-Ram-Energy-kwh",
"duration": "X-CodeCarbon-Duration-s",
},
"power": {
"emissions": "X-CodeCarbon-Emissions-kg",
"cpu_power": "X-CodeCarbon-Cpu-Power-w",
"gpu_power": "X-CodeCarbon-Gpu-Power-w",
"ram_power": "X-CodeCarbon-Ram-Power-w",
"duration": "X-CodeCarbon-Duration-s",
},
}

FULL_HEADER_FIELDS: tuple[str, ...] = tuple(FIELD_UNITS.keys())


def _auto_header_name(field: str) -> str:
unit = FIELD_UNITS.get(field, "")
title = "-".join(part.capitalize() for part in field.split("_"))
suffix = f"-{unit}" if unit else ""
return f"X-CodeCarbon-{title}{suffix}"


def resolve_header_mapping(config: HeaderConfig) -> dict[str, str]:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure this defensive method could be converted in a way more straightforward (and clear) one, if the unit tests demonstrate resilience to each corner case :)

"""Normalize ``response_headers`` settings to ``{field_name: header_name}``.

Args:
config: ``None`` or ``False`` for no headers; ``True`` for the emissions preset;
a preset name (``emissions``, ``default``, ``energy``, ``power``, ``full``);
a sequence of field names (auto header names); or an explicit mapping.

Returns:
Mapping from :class:`~codecarbon.output_methods.emissions_data.EmissionsData`
attribute names to HTTP header names.

Raises:
ValueError: If ``config`` is a string that is not a known preset (other than
``full``).
"""
if config is None or config is False:
return {}
if config is True:
return dict(HEADER_PRESETS["emissions"])
if isinstance(config, str):
preset = HEADER_PRESETS.get(config)
if preset is None:
if config == "full":
return {field: _auto_header_name(field) for field in FULL_HEADER_FIELDS}
raise ValueError(f"Unknown response_headers preset: {config!r}")
return dict(preset)
if isinstance(config, Mapping):
return dict(config)
return {field: _auto_header_name(field) for field in config}


def apply_response_headers(
response: Response,
emissions_data: EmissionsData,
header_mapping: Mapping[str, str],
) -> None:
"""Write selected emission fields onto an HTTP response as headers.

Args:
response: Outgoing Starlette response (headers are updated in place).
emissions_data: Values read via ``getattr`` for each key in ``header_mapping``.
header_mapping: Field name to HTTP header name; unknown fields are skipped.
"""
for field, header_name in header_mapping.items():
if not hasattr(emissions_data, field):
continue
value = getattr(emissions_data, field)
response.headers[header_name] = str(value)
135 changes: 135 additions & 0 deletions codecarbon/integrations/fastapi/_routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Route naming and endpoint filter helpers for FastAPI/Starlette."""

from collections.abc import Callable, Iterable
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from starlette.requests import Request

DEFAULT_EXCLUDE: frozenset[str] = frozenset(
{
"/docs",
"/redoc",
"/openapi.json",
"/health",
"/healthz",
"/ready",
"/live",
}
)

HTTP_METHODS = frozenset(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, this could benefit from being an Enum

{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS", "TRACE", "CONNECT"}
)


def get_endpoint_path(request: "Request") -> str:
"""Return the mounted route template or the raw URL path.

Args:
request: Current Starlette/FastAPI request.

Returns:
Route template such as ``/items/{item_id}``, or ``request.url.path``.
"""
route = request.scope.get("route")
if route is not None:
return route.path
return request.url.path


def build_endpoint_key(request: "Request") -> str:
"""Build a stable endpoint identifier such as ``GET /predict``.

Args:
request: Current Starlette/FastAPI request.

Returns:
HTTP method plus route template or URL path.
"""
return f"{request.method} {get_endpoint_path(request)}"


def is_method_pattern(pattern: str) -> bool:
"""Return True when ``pattern`` is ``METHOD /path``."""
method, _, path = pattern.partition(" ")
return method in HTTP_METHODS and path.startswith("/")


def matches_exclude(
pattern: str,
url_path: str,
endpoint_key: str,
endpoint_path: str,
) -> bool:
"""Return True when an exclude pattern matches the request."""
if is_method_pattern(pattern):
return endpoint_key == pattern
if not pattern.startswith("/"):
return endpoint_key == pattern
return (
url_path == pattern
or url_path.startswith(f"{pattern}/")
or endpoint_path == pattern
)


def matches_include(pattern: str, endpoint_key: str, endpoint_path: str) -> bool:
"""Return True when an include pattern matches the request."""
if is_method_pattern(pattern):
return endpoint_key == pattern
if pattern.startswith("/"):
return endpoint_path == pattern
return endpoint_key == pattern


def should_track_request(
request: "Request",
include: Iterable[str] | None,
exclude: Iterable[str],
) -> bool:
"""Return True when the request should be measured.

Patterns use one of two forms:

* ``METHOD /route/template`` — one HTTP method on one route (e.g. ``GET /predict``)
* ``/route/template`` — any method on that route, or a URL path prefix when excluding

Args:
request: Current Starlette/FastAPI request.
include: When set, only matching endpoints are tracked.
exclude: Endpoints or URL prefixes to skip.

Returns:
True when CodeCarbon should track this request.
"""
url_path = request.url.path
endpoint_key = build_endpoint_key(request)
endpoint_path = get_endpoint_path(request)
for pattern in exclude:
if matches_exclude(pattern, url_path, endpoint_key, endpoint_path):
return False
if include is None:
return True
return any(
matches_include(pattern, endpoint_key, endpoint_path) for pattern in include
)


def build_task_name(
request: "Request",
formatter: Callable[["Request"], str] | None = None,
) -> str:
"""Derive a stable label like ``GET /items/{item_id}`` for task-scoped tracking.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could lead to task override if resource is being accessed by 2 parallel or sequential requests. Maybe we could add some random here if the goal of this method is to provide a name for the codecarbon task concept


Args:
request: Current Starlette/FastAPI request.
formatter: Optional function that returns the task name instead of the default.

Returns:
Method plus route template when a route is mounted on the request scope,
otherwise method plus the raw URL path.
"""
if formatter is not None:
return formatter(request)
return build_endpoint_key(request)
38 changes: 38 additions & 0 deletions codecarbon/integrations/fastapi/lifespan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Lifespan helpers for sharing one ``EmissionsTracker`` across requests."""

from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

from codecarbon import EmissionsTracker


@asynccontextmanager
async def create_codecarbon_lifespan(
app: Any,
*,
project_name: str = "codecarbon-fastapi",
**tracker_kwargs: Any,
) -> AsyncIterator[None]:
"""Start a tracker for the app lifetime and expose it on ``app.state``.

Args:
app: Starlette/FastAPI application with ``state`` namespace.
project_name: ``project_name`` for :class:`~codecarbon.EmissionsTracker`.
**tracker_kwargs: Extra constructor kwargs for the tracker.

Yields:
``None`` while the app runs.
"""
merged = dict(tracker_kwargs)
merged.setdefault("allow_multiple_runs", True)
tracker = EmissionsTracker(project_name=project_name, **merged)
tracker.start()
app.state.codecarbon_tracker = tracker
try:
yield
finally:
tracker.stop()
app.state.codecarbon_tracker = None
Loading
Loading