From 7d299c018e40fd198fcc120ced4972e7af081687 Mon Sep 17 00:00:00 2001 From: Jesse Hodges Date: Mon, 20 Apr 2026 11:34:49 -0500 Subject: [PATCH] if the gateway does not exist, continue the janitor process --- sqlmesh/core/snapshot/evaluator.py | 19 +++++++++++++-- tests/core/test_snapshot_evaluator.py | 35 +++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index b1ffd4dc26..4df9ecb695 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -528,12 +528,27 @@ def cleanup( target_snapshots = [ t for t in target_snapshots if t.snapshot.is_model and not t.snapshot.is_symbolic ] + available_gateways = set(self.adapters.keys()) + skipped = [] + filtered_targets = [] + for t in target_snapshots: + gw = t.snapshot.model_gateway + if gw and gw not in available_gateways: + skipped.append((t.snapshot.snapshot_id, gw)) + else: + filtered_targets.append(t) + if skipped: + logger.warning( + "Skipping cleanup of %d snapshot(s) with unavailable gateway(s): %s", + len(skipped), + ", ".join(f"{sid} (gateway={gw})" for sid, gw in skipped), + ) snapshots_to_dev_table_only = { - t.snapshot.snapshot_id: t.dev_table_only for t in target_snapshots + t.snapshot.snapshot_id: t.dev_table_only for t in filtered_targets } with self.concurrent_context(): concurrent_apply_to_snapshots( - [t.snapshot for t in target_snapshots], + [t.snapshot for t in filtered_targets], lambda s: self._cleanup_snapshot( s, snapshots_to_dev_table_only[s.snapshot_id], diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index f3fae15e8a..d7aa9e4a80 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -4503,6 +4503,41 @@ def test_multiple_engine_cleanup(snapshot: Snapshot, adapters, make_snapshot): ) +def test_cleanup_skips_unavailable_gateway(snapshot: Snapshot, adapters, make_snapshot): + engine_adapters = {"default": adapters[0]} + evaluator = SnapshotEvaluator(engine_adapters) + + model_with_missing_gw = load_sql_based_model( + parse( # type: ignore + """ + MODEL ( + name test_schema.test_model, + kind FULL, + gateway nonexistent_gateway, + ); + SELECT a::int FROM tbl; + """ + ), + ) + + snapshot_missing_gw = make_snapshot(model_with_missing_gw) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + snapshot_missing_gw.categorize_as(SnapshotChangeCategory.BREAKING) + + evaluator.create([snapshot], {}, DeployabilityIndex.all_deployable()) + + evaluator.cleanup( + [ + SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True), + SnapshotTableCleanupTask(snapshot=snapshot_missing_gw.table_info, dev_table_only=True), + ], + ) + + engine_adapters["default"].drop_table.assert_called_once_with( + f"sqlmesh__db.db__model__{snapshot.version}__dev", cascade=True + ) + + def test_multi_engine_python_model_with_macros(adapters, make_snapshot): engine_adapters = {"default": adapters[0], "secondary": adapters[1]} evaluator = SnapshotEvaluator(engine_adapters)