bug: InfrahubBatch.execute() orphans in-flight tasks when re-raising #1001

@PhillSimonds

Description

Component

Python SDK

Infrahub SDK version

1.20.0 (infrahub_sdk/batch.py on develop)

Current Behavior

InfrahubBatch.execute() creates one asyncio.Task per BatchTask and iterates with asyncio.as_completed(). When return_exceptions=False (the default) and any task raises, the generator re-raises immediately. The remaining N − 1 tasks are still running on the event loop; nothing awaits or cancels them.

async def execute(self) -> AsyncGenerator:
    tasks = [
        asyncio.create_task(execute_batch_task_in_pool(...))
        for batch_task in self._tasks
    ]

    for completed_task in asyncio.as_completed(tasks):
        node, result = await completed_task
        if isinstance(result, Exception) and not self.return_exceptions:
            raise result   # siblings still running, no cleanup
        yield node, result
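Nothing in this orphaning is SDK-specific. Below is a minimal plain-asyncio sketch of the loop above. It makes one simplification: the failing coroutine raises directly instead of returning the exception for execute() to re-raise. `execute_like_sdk`, `fail_fast`, and `slow_write` are hypothetical names, and the `side_effects` list stands in for server writes.

```python
import asyncio
from collections.abc import AsyncIterator


async def demo() -> tuple[list[str], list[str]]:
    side_effects: list[str] = []

    async def fail_fast() -> str:
        await asyncio.sleep(0)
        raise RuntimeError("fast failure")

    async def slow_write(name: str) -> str:
        await asyncio.sleep(0.05)
        side_effects.append(name)  # stands in for a write such as node.save()
        return name

    async def execute_like_sdk() -> AsyncIterator[str]:
        # Hypothetical stand-in for execute(): one task per unit of work, no cleanup.
        tasks = [asyncio.create_task(fail_fast())]
        tasks += [asyncio.create_task(slow_write(f"slow-{i}")) for i in range(3)]
        for completed in asyncio.as_completed(tasks):
            yield await completed  # the first failure propagates; siblings keep running

    try:
        async for _ in execute_like_sdk():
            pass
    except RuntimeError:
        pass

    seen_by_caller = list(side_effects)  # snapshot taken when the caller sees the error
    await asyncio.sleep(0.1)  # the orphaned siblings finish in the background
    return seen_by_caller, side_effects
```

Running `asyncio.run(demo())` should return an empty snapshot alongside all three writes. The caller has already handled the RuntimeError by the time the orphaned siblings do their work.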

When the orphaned tasks eventually finish:

  • Successful: their work was performed (e.g., node.save() did write to the server) but no caller observes it. The caller has been told the batch failed and won't retry.
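  • Failed: when the orphaned task object is garbage-collected, asyncio reports "Task exception was never retrieved" through the event loop's exception handler (logged to stderr by default). The caller has no other way to discover these failures.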
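The "never retrieved" report can be observed directly. The sketch below assumes CPython. It installs a custom handler with `loop.set_exception_handler` so the message is captured instead of reaching stderr, then lets a failed task be collected without anyone awaiting it. The explicit `gc.collect()` makes sure the task is finalized even if a reference cycle keeps it alive. The function names are hypothetical.

```python
import asyncio
import gc
from typing import Any


async def fail() -> None:
    raise RuntimeError("orphaned failure")


async def collect_unretrieved_messages() -> list[str]:
    messages: list[str] = []
    loop = asyncio.get_running_loop()

    def handler(loop: asyncio.AbstractEventLoop, context: dict[str, Any]) -> None:
        # Replaces the default handler, which would log the report to stderr.
        messages.append(context["message"])

    loop.set_exception_handler(handler)
    task = asyncio.create_task(fail())
    await asyncio.sleep(0)  # let the task run and fail
    await asyncio.sleep(0)
    del task  # drop our reference without ever awaiting the task
    gc.collect()  # finalize the task even if it sits in a reference cycle
    return messages
```

`asyncio.run(collect_unretrieved_messages())` should return a list containing "Task exception was never retrieved", which is the same message the orphaned failures produce.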

This surfaces under transient server slowness. Two identical batch-driven builds against separate Infrahub deployments can end up in divergent states. One is complete. The other is missing a portion of the intended writes plus every downstream object that depended on those writes. Both builds exit with status 0. Logs from such a run show proportional counts of ServerNotResponsiveError and "Task exception was never retrieved" in the affected window, which suggests both error families are two views of the same underlying event.

Expected Behavior

After execute() returns or raises, no asyncio.Task it created is still running. Specifically:

  • When a task raises and return_exceptions=False, sibling tasks should be cancelled (or otherwise drained) before the exception propagates to the caller.
  • This must hold for all generator exit paths: explicit raise, caller break, generator close.

Steps to Reproduce

import asyncio
import pytest
from infrahub_sdk.batch import InfrahubBatch

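# Needs an async pytest plugin, e.g. pytest-asyncio with asyncio_mode = "auto" or an @pytest.mark.asyncio marker.
async def test_orphan() -> None: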
    side_effects: list[str] = []

    async def raise_fast() -> None:
        await asyncio.sleep(0)
        raise RuntimeError("fast failure")

    async def slow_side_effect(name: str) -> str:
        await asyncio.sleep(0.1)
        side_effects.append(name)
        return name

    batch = InfrahubBatch(max_concurrent_execution=10, return_exceptions=False)
    batch.add(task=raise_fast)
    for i in range(5):
        batch.add(task=slow_side_effect, name=f"slow-{i}")

    with pytest.raises(RuntimeError):
        async for _ in batch.execute():
            pass

    await asyncio.sleep(0.3)
    # Currently fails: side_effects == ['slow-0', 'slow-1', 'slow-2', 'slow-3', 'slow-4']
    assert side_effects == []
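The reproduction above exercises only the explicit-raise path. The caller-break and generator-close paths can be checked by diffing `asyncio.all_tasks()` before and after consumption. The sketch below has no SDK dependency. `buggy_execute` is a hypothetical stand-in that copies the current pattern (tasks plus `as_completed`, no cleanup), and `stray_tasks_after_break` is a hypothetical check.

```python
import asyncio
from collections.abc import AsyncIterator


async def buggy_execute(delays: list[float]) -> AsyncIterator[float]:
    # Hypothetical stand-in for the current execute() loop: no cleanup on any exit path.
    tasks = [asyncio.create_task(asyncio.sleep(d, result=d)) for d in delays]
    for completed in asyncio.as_completed(tasks):
        yield await completed


async def stray_tasks_after_break(delays: list[float]) -> int:
    """Consume one item, break, close the generator, and count the tasks left running."""
    before = asyncio.all_tasks()
    agen = buggy_execute(delays)
    async for _ in agen:
        break  # caller break: the generator is left suspended at its yield
    await agen.aclose()  # explicit close: nothing cancels the sibling tasks either
    stray = asyncio.all_tasks() - before
    # Tidy up so the check itself does not leak tasks.
    for task in stray:
        task.cancel()
    await asyncio.gather(*stray, return_exceptions=True)
    return len(stray)
```

With this pattern, `asyncio.run(stray_tasks_after_break([0.0, 0.5, 0.5]))` reports the two slow siblings as still running. An implementation that meets the expected behavior should report zero.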

Additional Information

Related but separate defect: with return_exceptions=True, failures are yielded as (node, ExceptionInstance) in the same tuple shape as successes. A naive consumer that records node per yield silently treats failed tasks as successful. Will file separately; that one needs a small API redesign rather than a one-line fix.
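Here is a minimal sketch of that consumer-side pitfall. It assumes only the (node, result) tuple shape described above. `fake_execute` is a hypothetical stand-in for execute() with return_exceptions=True, and the nodes are plain strings for brevity.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Any


async def fake_execute() -> AsyncIterator[tuple[str, Any]]:
    # Hypothetical stand-in: a failure arrives in the same shape as a success.
    yield "node-a", "saved"
    yield "node-b", RuntimeError("save failed")
    yield "node-c", "saved"


async def naive_record(results: AsyncIterator[tuple[str, Any]]) -> list[str]:
    # Records every yielded node, so node-b is wrongly counted as a success.
    return [node async for node, _ in results]


async def partition(results: AsyncIterator[tuple[str, Any]]) -> tuple[list[str], list[str]]:
    succeeded: list[str] = []
    failed: list[str] = []
    async for node, result in results:
        # In this shape, the type of the second element is the only failure signal.
        if isinstance(result, Exception):
            failed.append(node)
        else:
            succeeded.append(node)
    return succeeded, failed
```

The naive consumer reports all three nodes. `partition` separates node-b. Every consumer has to remember to perform that check.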

Metadata

Assignees: No one assigned
Labels: type/bug (Something isn't working as expected)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
