Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ SqlCreate SqlCreateDatabase(Span s, boolean replace) :
}

/**
* USE DATABASE ( catalog_name '.' )? database_name
* USE [ DATABASE ] ( catalog_name '.' )? database_name
*/
SqlCall SqlUseDatabase(Span s, String scope) :
{
Expand All @@ -391,7 +391,7 @@ SqlCall SqlUseDatabase(Span s, String scope) :
<USE> {
s.add(this);
}
<DATABASE>
[ <DATABASE> ]
databaseName = CompoundIdentifier()
{
return new SqlUseDatabase(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class BeamCalciteTable extends AbstractQueryableTable
private final Map<String, String> pipelineOptionsMap;
private @Nullable PipelineOptions pipelineOptions;

BeamCalciteTable(
public BeamCalciteTable(
BeamSqlTable beamTable,
Map<String, String> pipelineOptionsMap,
@Nullable PipelineOptions pipelineOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,23 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptUtil;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Contains the metadata of tables/UDF functions, and exposes APIs to
* query/validate/optimize/translate SQL statements.
*/
@Internal
public class BeamSqlEnv {
private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnv.class);

JdbcConnection connection;
QueryPlanner planner;

Expand Down Expand Up @@ -116,6 +122,31 @@ public BeamRelNode parseQuery(String query, QueryParameters queryParameters)
return planner.convertToBeamRel(query, queryParameters);
}

public QueryPlanner getPlanner() {
return planner;
}

public RelBuilder getRelBuilder() {
return planner.getRelBuilder();
}

public BeamRelNode convertToBeamRel(RelNode relNode) {
return planner.convertToBeamRel(relNode, QueryParameters.ofNone());
}

public RelNode parseLogicalPlan(String query) throws ParseException {
return planner.parseToRel(query, QueryParameters.ofNone());
}

public void registerSchemaFunction(String name, Function function) {
connection.getCurrentSchemaPlus().add(name, function);
}

public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable
getOperatorTable() {
return planner.getOperatorTable();
}

public boolean isDdl(String sqlStatement) throws ParseException {
return planner.parse(sqlStatement).getKind().belongsTo(SqlKind.DDL);
}
Expand Down Expand Up @@ -196,6 +227,7 @@ public BeamSqlEnvBuilder setCurrentSchema(String name) {

/** Set the ruleSet used for query optimizer. */
public BeamSqlEnvBuilder setRuleSets(Collection<RuleSet> ruleSets) {
LOG.info("Setting BeamSqlEnv rulesets to: {}", ruleSets);
this.ruleSets = ruleSets;
return this;
}
Expand Down Expand Up @@ -262,6 +294,7 @@ public BeamSqlEnv build() {

configureSchemas(jdbcConnection);

LOG.info("Instantiating planner with ruleSets: {}", ruleSets);
QueryPlanner planner = instantiatePlanner(jdbcConnection, ruleSets);

// The planner may choose to add its own builtin functions to the schema, so load user-defined
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand All @@ -39,9 +41,29 @@ public interface QueryPlanner {
BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters)
throws ParseException, SqlConversionException;

/** It parses and validate the input query, then convert into a {@link BeamRelNode} tree. */
BeamRelNode convertToBeamRel(RelNode sqlStatement, QueryParameters queryParameters)
throws SqlConversionException;

/**
* Parses and validates {@code sqlStatement} and returns the logical {@link RelNode} (standard
* Calcite convention), WITHOUT converting to Beam physical rels. This lets callers rewrite the
* logical plan (e.g. inject custom rels) before {@link #convertToBeamRel(RelNode,
* QueryParameters)}.
*/
default RelNode parseToRel(String sqlStatement, QueryParameters queryParameters)
throws ParseException, SqlConversionException {
throw new UnsupportedOperationException(
"parseToRel is not implemented by " + getClass().getName());
}

/** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */
SqlNode parse(String sqlStatement) throws ParseException;

RelBuilder getRelBuilder();

org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable getOperatorTable();

@AutoOneOf(QueryParameters.Kind.class)
abstract class QueryParameters {
public enum Kind {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to you under
the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
-->

# Correlated sub-query decorrelation pre-pass

## Context

`CalciteQueryPlanner.convertToBeamRel(RelNode, ...)` is the single chokepoint
through which every SQL query the Beam SQL extension plans is converted from a
Calcite logical tree into a `BeamRelNode` tree by a single Volcano program.

The SqlToRel converter (driven by `Planner.rel`) already decorrelates most
queries: `SqlToRelConverter` is configured with `withExpand(true)`, and the
default `Planner.rel` path calls `RelDecorrelator.decorrelateQuery` once on the
post-`SqlToRelConverter` tree. For the common correlated-scalar shape this is
enough — the result is a `Join` + `Aggregate[SINGLE_VALUE]` that existing Beam
rules already cover.

However, some correlated shapes survive that first pass:

- An un-expanded `RexSubQuery` inside a PROJECT/JOIN/FILTER condition (e.g. some
correlated `EXISTS`/`IN` forms), or
- a residual `LogicalCorrelate` the SqlToRel decorrelate pass could not lower.

The Beam Volcano ruleset (`BeamRuleSets.LOGICAL_OPTIMIZATIONS` /
`BEAM_CONVERTERS`) has **no** converter rule for a general `LogicalCorrelate`.
The only consumer is `BeamUnnestRule`, which matches strictly
`LogicalCorrelate(_, Uncollect)` (the `UNNEST` shape). So any other residual
correlate reaching Volcano fails planning with a `CannotPlanException`, surfaced
as `SqlConversionException: Unable to convert relNode to Beam: ...`.

## Design

Add a private `normalizeForVolcano(RelNode)` pre-pass that runs at the very top
of `convertToBeamRel(RelNode, ...)`, **strictly before** both:

1. the Volcano `program.run(...)`, and
2. the metadata-provider swap (`setMetadataProvider` / `setMetadataQuerySupplier`
/ `RRelMetadataQuery.THREAD_PROVIDERS`).

Running it before the metadata swap is deliberate: the pass uses stock Calcite
metadata only, so it cannot trigger the Beam cost path (`BeamCostModel`,
`RelMdNodeStats`, `BeamRelMetadataQuery`) and the cost recursion the Volcano
search guards against.

The pass does two things:

1. A short-lived `HepPlanner` with `FILTER_SUB_QUERY_TO_CORRELATE`,
`PROJECT_SUB_QUERY_TO_CORRELATE`, and `JOIN_SUB_QUERY_TO_CORRELATE` to turn any
residual `RexSubQuery` into a `LogicalCorrelate`.
2. `RelDecorrelator.decorrelateQuery(rel, RelBuilder.create(config))` to lower
correlates into standard `Join`/`Aggregate`/`Project`/`Filter` nodes, using the
planner's configured `RelBuilder` so produced rels share the cluster type
factory and traits.

### Pre-flight safety gate

The entire pass is gated on the tree actually **referencing** a correlation
variable: `RelOptUtil.getVariablesUsed(rel).isEmpty()` ⇒ return the input
unchanged. This makes the pass a strict no-op on trees without a referenced
correlate.

The gate intentionally uses `getVariablesUsed`, **not** `getVariablesSet`:

| shape | `getVariablesSet` | `getVariablesUsed` |
|------------------------------------|-------------------|--------------------|
| `LogicalCorrelate(_, Uncollect)` (UNNEST) | non-empty (defines an id) | empty (body refs none) |
| correlated scalar / EXISTS / IN | non-empty | non-empty |

Because UNNEST *defines* a correlation id but does not *reference* one in its
body, `getVariablesUsed` is empty for it and the pass is skipped, leaving the
`LogicalCorrelate(_, Uncollect)` intact for `BeamUnnestRule`. A gate keyed on
`getVariablesSet` would wrongly run the decorrelator on UNNEST trees. This
structural guard is preferred over relying on test-only verification.

## Increments

This change is the smallest shippable increment:

- Single edit in `CalciteQueryPlanner.convertToBeamRel(RelNode, ...)` plus one
private helper. No ruleset changes, no SparkConnect-side changes.
- Deferred (explicitly out of scope): adding the PROJECT/JOIN
`*_SUB_QUERY_TO_CORRELATE` variants into the Volcano `LOGICAL_OPTIMIZATIONS`
ruleset; non-equi semi/anti-join lowering for correlated `EXISTS` without an
equi key (still blocked at `BeamJoinRel.extractJoinRexNodes`).

## Risks

1. **UNNEST regression.** Mitigated by the `getVariablesUsed` gate above and
verified by the `*Unnest*` test subset (and `BeamSqlDslArrayTest`).
2. **Idempotency.** For trees the SqlToRel pass already fully decorrelated, the
gate skips the pass entirely (no referenced correl var remains), so there is
no redundant second decorrelate and no shape churn.
3. **Cost-hang trap.** Avoided by placement strictly before the metadata-provider
swap and Volcano; the pass never touches the Beam cost model.
4. **Hand-built `RelNode` test fixtures** (join/aggregation DSL tests) flow
through this method too; with no referenced correl var they hit the gate and
are untouched. The filtered `*Join*` / `*CalciteQueryPlanner*` subset guards
this.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static SqlNode column(
}

/** Returns the schema in which to create an object. */
static Pair<CalciteSchema, String> schema(
public static Pair<CalciteSchema, String> schema(
CalcitePrepare.Context context, boolean mutable, SqlIdentifier id) {
CalciteSchema rootSchema = mutable ? context.getMutableRootSchema() : context.getRootSchema();
@Nullable CalciteSchema schema = null;
Expand All @@ -72,7 +72,7 @@ static Pair<CalciteSchema, String> schema(
return Pair.of(checkStateNotNull(schema, "Got null sub-schema for path '%s'", path), name(id));
}

private static @Nullable CalciteSchema childSchema(CalciteSchema rootSchema, List<String> path) {
public static @Nullable CalciteSchema childSchema(CalciteSchema rootSchema, List<String> path) {
@Nullable CalciteSchema schema = rootSchema;
for (String p : path) {
if (schema == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.beam.sdk.extensions.sql.meta.catalog;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -111,12 +110,16 @@ public Collection<String> databases() {

@Override
public boolean dropDatabase(String database, boolean cascade) {
checkState(!cascade, "%s does not support CASCADE.", getClass().getSimpleName());
MetaStore metaStore = metaStores.get(database);
if (!cascade && metaStore != null && !metaStore.getTables().isEmpty()) {
throw new IllegalStateException("Database '" + database + "' is not empty.");
}

boolean removed = databases.remove(database);
if (database.equals(currentDatabase)) {
currentDatabase = null;
}
metaStores.remove(database);
return removed;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public PCollection<String> expand(PCollection<Row> input) {

/** Write-side converter for {@link TextTable} with format {@code 'csv'}. */
@VisibleForTesting
static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>
public static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>
implements Serializable {

private CSVFormat csvFormat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ public void testUseDatabase() {
assertEquals("my_database2", catalogManager.currentCatalog().currentDatabase());
}

@Test
public void testUseDatabaseWithoutDatabaseKeyword() {
assertEquals(DEFAULT, catalogManager.currentCatalog().currentDatabase());
cli.execute("CREATE DATABASE my_database");
assertEquals(DEFAULT, catalogManager.currentCatalog().currentDatabase());
cli.execute("USE my_database");
assertEquals("my_database", catalogManager.currentCatalog().currentDatabase());
}

@Test
public void testUseDatabase_doesNotExist() {
assertEquals(DEFAULT, catalogManager.currentCatalog().currentDatabase());
Expand Down Expand Up @@ -126,6 +135,33 @@ public void testDropDatabase_nonexistent() {
cli.execute("DROP DATABASE my_database");
}

@Test
public void testDropDatabase_notEmpty_restrict() {
cli.execute("CREATE DATABASE db_1");
cli.execute("USE db_1");

TestTableProvider testTableProvider = new TestTableProvider();
catalogManager.registerTableProvider(testTableProvider);
cli.execute("CREATE EXTERNAL TABLE person(id int, name varchar, age int) TYPE 'test'");

thrown.expect(RuntimeException.class);
thrown.expectMessage("Database 'db_1' is not empty.");
cli.execute("DROP DATABASE db_1");
}

@Test
public void testDropDatabase_notEmpty_cascade() {
cli.execute("CREATE DATABASE db_1");
cli.execute("USE db_1");

TestTableProvider testTableProvider = new TestTableProvider();
catalogManager.registerTableProvider(testTableProvider);
cli.execute("CREATE EXTERNAL TABLE person(id int, name varchar, age int) TYPE 'test'");

cli.execute("DROP DATABASE db_1 CASCADE");
assertFalse(catalogManager.currentCatalog().databaseExists("db_1"));
}

@Test
public void testCreateInsertDropTableUsingDefaultDatabase() {
Catalog catalog = catalogManager.currentCatalog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,32 @@ public void testSelectArrayValue() {
pipeline.run();
}

// Spark call-form constructor ARRAY('aa', 'bb') (vs the bracket form ARRAY['aa','bb']). Parses to
// SqlLibraryOperators.ARRAY and is resolved by the operator-table registration in
// CalciteQueryPlanner; this test verifies it also LOWERS end-to-end (RexImpTable).
@Test
public void testSelectArrayFunctionForm() {
PCollection<Row> input = pCollectionOf2Elements();

Schema resultType =
Schema.builder()
.addInt32Field("f_int")
.addArrayField("f_arr", Schema.FieldType.STRING)
.build();

PCollection<Row> result =
input.apply(
"sqlQuery",
SqlTransform.query("SELECT 42, ARRAY ('aa', 'bb') as `f_arr` FROM PCOLLECTION"));

PAssert.that(result)
.containsInAnyOrder(
Row.withSchema(resultType).addValues(42, Arrays.asList("aa", "bb")).build(),
Row.withSchema(resultType).addValues(42, Arrays.asList("aa", "bb")).build());

pipeline.run();
}

@Test
public void testProjectArrayField() {
PCollection<Row> input = pCollectionOf2Elements();
Expand Down
Loading
Loading