Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,12 +528,27 @@ def cleanup(
target_snapshots = [
t for t in target_snapshots if t.snapshot.is_model and not t.snapshot.is_symbolic
]
available_gateways = set(self.adapters.keys())
skipped = []
filtered_targets = []
for t in target_snapshots:
gw = t.snapshot.model_gateway
if gw and gw not in available_gateways:
skipped.append((t.snapshot.snapshot_id, gw))
else:
filtered_targets.append(t)
if skipped:
logger.warning(
"Skipping cleanup of %d snapshot(s) with unavailable gateway(s): %s",
len(skipped),
", ".join(f"{sid} (gateway={gw})" for sid, gw in skipped),
)
snapshots_to_dev_table_only = {
t.snapshot.snapshot_id: t.dev_table_only for t in target_snapshots
t.snapshot.snapshot_id: t.dev_table_only for t in filtered_targets
}
with self.concurrent_context():
concurrent_apply_to_snapshots(
[t.snapshot for t in target_snapshots],
[t.snapshot for t in filtered_targets],
lambda s: self._cleanup_snapshot(
s,
snapshots_to_dev_table_only[s.snapshot_id],
Expand Down
35 changes: 35 additions & 0 deletions tests/core/test_snapshot_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4503,6 +4503,41 @@ def test_multiple_engine_cleanup(snapshot: Snapshot, adapters, make_snapshot):
)


def test_cleanup_skips_unavailable_gateway(snapshot: Snapshot, adapters, make_snapshot):
engine_adapters = {"default": adapters[0]}
evaluator = SnapshotEvaluator(engine_adapters)

model_with_missing_gw = load_sql_based_model(
parse( # type: ignore
"""
MODEL (
name test_schema.test_model,
kind FULL,
gateway nonexistent_gateway,
);
SELECT a::int FROM tbl;
"""
),
)

snapshot_missing_gw = make_snapshot(model_with_missing_gw)
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
snapshot_missing_gw.categorize_as(SnapshotChangeCategory.BREAKING)

evaluator.create([snapshot], {}, DeployabilityIndex.all_deployable())

evaluator.cleanup(
[
SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True),
SnapshotTableCleanupTask(snapshot=snapshot_missing_gw.table_info, dev_table_only=True),
],
)

engine_adapters["default"].drop_table.assert_called_once_with(
f"sqlmesh__db.db__model__{snapshot.version}__dev", cascade=True
)


def test_multi_engine_python_model_with_macros(adapters, make_snapshot):
engine_adapters = {"default": adapters[0], "secondary": adapters[1]}
evaluator = SnapshotEvaluator(engine_adapters)
Expand Down