diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl index 94c0161c492c..cb8eec438728 100644 --- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl +++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl @@ -381,7 +381,7 @@ SqlCreate SqlCreateDatabase(Span s, boolean replace) : } /** - * USE DATABASE ( catalog_name '.' )? database_name + * USE [ DATABASE ] ( catalog_name '.' )? database_name */ SqlCall SqlUseDatabase(Span s, String scope) : { @@ -391,7 +391,7 @@ SqlCall SqlUseDatabase(Span s, String scope) : { s.add(this); } - + [ ] databaseName = CompoundIdentifier() { return new SqlUseDatabase( diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java index 9f3ff6478ad6..c447b9b9624e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java @@ -53,7 +53,7 @@ public class BeamCalciteTable extends AbstractQueryableTable private final Map pipelineOptionsMap; private @Nullable PipelineOptions pipelineOptions; - BeamCalciteTable( + public BeamCalciteTable( BeamSqlTable beamTable, Map pipelineOptionsMap, @Nullable PipelineOptions pipelineOptions) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index d84783118bbd..7499c7a2189a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -47,10 +47,14 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptUtil; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Contains the metadata of tables/UDF functions, and exposes APIs to @@ -58,6 +62,8 @@ */ @Internal public class BeamSqlEnv { + private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnv.class); + JdbcConnection connection; QueryPlanner planner; @@ -116,6 +122,31 @@ public BeamRelNode parseQuery(String query, QueryParameters queryParameters) return planner.convertToBeamRel(query, queryParameters); } + public QueryPlanner getPlanner() { + return planner; + } + + public RelBuilder getRelBuilder() { + return planner.getRelBuilder(); + } + + public BeamRelNode convertToBeamRel(RelNode relNode) { + return planner.convertToBeamRel(relNode, QueryParameters.ofNone()); + } + + public RelNode parseLogicalPlan(String query) throws ParseException { + return planner.parseToRel(query, QueryParameters.ofNone()); + } + + public void registerSchemaFunction(String name, Function function) { + connection.getCurrentSchemaPlus().add(name, function); + } + + public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable + getOperatorTable() { + return planner.getOperatorTable(); + } + public boolean isDdl(String sqlStatement) throws ParseException { return planner.parse(sqlStatement).getKind().belongsTo(SqlKind.DDL); } @@ -196,6 +227,7 @@ public BeamSqlEnvBuilder setCurrentSchema(String name) { /** Set the ruleSet used for query optimizer. */ public BeamSqlEnvBuilder setRuleSets(Collection ruleSets) { + LOG.info("Setting BeamSqlEnv rulesets to: {}", ruleSets); this.ruleSets = ruleSets; return this; } @@ -262,6 +294,7 @@ public BeamSqlEnv build() { configureSchemas(jdbcConnection); + LOG.info("Instantiating planner with ruleSets: {}", ruleSets); QueryPlanner planner = instantiatePlanner(jdbcConnection, ruleSets); // The planner may choose to add its own builtin functions to the schema, so load user-defined diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index 606a3c5f71a2..11fb41ad1966 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -31,15 +31,21 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.calcite.v1_40_0.com.google.common.collect.Table; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.config.CalciteConnectionConfig; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.Contexts; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.ConventionTraitDef; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptCluster; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptCost; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptPlanner; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptPlanner.CannotPlanException; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptUtil; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitDef; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.hep.HepPlanner; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.hep.HepProgramBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.prepare.CalciteCatalogReader; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelRoot; @@ -51,6 +57,12 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.rules.CoreRules; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexBuilder; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexDynamicParam; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexShuttle; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable; @@ -59,10 +71,15 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParser; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserImplFactory; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.util.SqlOperatorTables; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.validate.SqlConformance; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.validate.SqlConformanceEnum; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql2rel.RelDecorrelator; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.FrameworkConfig; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Frameworks; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Planner; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Program; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelConversionException; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.ValidationException; @@ -85,11 +102,52 @@ public class CalciteQueryPlanner implements QueryPlanner { private final Planner planner; private final JdbcConnection connection; + private final FrameworkConfig config; + + // Cannot be final because of wacky initialization logic + private RelOptCluster relOptCluster; + private CalciteCatalogReader catalogReader; + private RelDataTypeFactory typeFactory; + private RelOptPlanner calcitePlanner; /** Called by {@link BeamSqlEnv}.instantiatePlanner() reflectively. */ public CalciteQueryPlanner(JdbcConnection connection, Collection ruleSets) { this.connection = connection; - this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets)); + this.config = defaultConfig(connection, ruleSets); + this.planner = Frameworks.getPlanner(config); + + Frameworks.withPlanner( + (cluster, relOptSchema, rootSchema) -> { + // CAPTURE THE COMPONENTS HERE + this.relOptCluster = cluster; + this.catalogReader = (CalciteCatalogReader) relOptSchema; + this.typeFactory = cluster.getTypeFactory(); + this.calcitePlanner = cluster.getPlanner(); + + // ... any other setup from the original lambda ... + // e.g., planner.setExecutor(executor); + + return null; + }, + config); + + if (this.relOptCluster == null || this.catalogReader == null) { + throw new IllegalStateException("Failed to initialize Calcite components"); + } + } + + /** + * Returns a RelBuilder instance configured with the same Calcite components used by this + * QueryPlanner. + */ + @Override + public RelBuilder getRelBuilder() { + return RelBuilder.create(config); + } + + @Override + public SqlOperatorTable getOperatorTable() { + return config.getOperatorTable(); } public static final Factory FACTORY = @@ -98,6 +156,7 @@ public CalciteQueryPlanner(JdbcConnection connection, Collection ruleSe public QueryPlanner createPlanner( JdbcConnection jdbcConnection, Collection ruleSets) { loadBuiltinFunctions(jdbcConnection); + LOG.info("Factory creating planner with ruleSets: {}", ruleSets); return new CalciteQueryPlanner(jdbcConnection, ruleSets); } @@ -115,12 +174,20 @@ private void loadBuiltinFunctions(JdbcConnection jdbcConnection) { public FrameworkConfig defaultConfig(JdbcConnection connection, Collection ruleSets) { final CalciteConnectionConfig config = connection.config(); + // Resolve the parser conformance. Calcite's Avatica JDBC connect path silently drops the + // {@code conformance} connection property (it is not in the driver's registered property set), + // so {@code config.conformance()} is always DEFAULT here even when callers set it via + // {@code BeamSqlPipelineOptions.calciteConnectionProperties}. We therefore read that map + // directly from the connection's pipeline options and let it override. This keeps the behavior + // opt-in: with no {@code conformance} property the connection's own (DEFAULT) value is used, so + // existing Beam SQL behavior is unchanged. + final SqlConformance conformance = resolveConformance(connection, config); final SqlParser.ConfigBuilder parserConfig = SqlParser.configBuilder() .setQuotedCasing(config.quotedCasing()) .setUnquotedCasing(config.unquotedCasing()) .setQuoting(config.quoting()) - .setConformance(config.conformance()) + .setConformance(conformance) .setCaseSensitive(config.caseSensitive()); final SqlParserImplFactory parserFactory = config.parserFactory(SqlParserImplFactory.class, null); @@ -145,6 +212,7 @@ public FrameworkConfig defaultConfig(JdbcConnection connection, Collection props = sqlOptions.getCalciteConnectionProperties(); + if (props == null) { + return config.conformance(); + } + String value = null; + for (Map.Entry e : props.entrySet()) { + if ("conformance".equalsIgnoreCase(e.getKey())) { + value = e.getValue(); + break; + } + } + if (value == null) { + return config.conformance(); + } + try { + return SqlConformanceEnum.valueOf(value.trim().toUpperCase()); + } catch (IllegalArgumentException ex) { + LOG.warn("Unrecognized calcite conformance '{}', using {}", value, config.conformance()); + return config.conformance(); + } + } + /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */ @Override public SqlNode parse(String sqlStatement) throws ParseException { @@ -174,15 +299,14 @@ public SqlNode parse(String sqlStatement) throws ParseException { /** * It parses and validate the input query, then convert into a {@link BeamRelNode} tree. Note that - * query parameters are not yet supported. + * query parameters are now supported for positional parameters. */ @Override public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters) throws ParseException, SqlConversionException { Preconditions.checkArgument( - queryParameters.getKind() == Kind.NONE, - "Beam SQL Calcite dialect does not yet support query parameters."); - BeamRelNode beamRelNode; + queryParameters.getKind() == Kind.NONE || queryParameters.getKind() == Kind.POSITIONAL, + "Beam SQL Calcite dialect only supports positional query parameters or no parameters."); try { SqlNode parsed = planner.parse(sqlStatement); TableResolutionUtils.setupCustomTableResolution(connection, parsed); @@ -191,38 +315,161 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa // root of original logical plan RelRoot root = planner.rel(validated); - LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel)); + RelNode relNode = root.rel; + if (queryParameters.getKind() == Kind.POSITIONAL) { + relNode = + bindParameters( + relNode, new ParameterBinder(relOptCluster.getRexBuilder(), queryParameters)); + } + return convertToBeamRel(relNode, queryParameters); + } catch (RelConversionException | CannotPlanException e) { + throw new SqlConversionException( + String.format("Unable to convert query %s", sqlStatement), e); + } catch (SqlParseException | ValidationException e) { + throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); + } finally { + planner.close(); + } + } + + private static RelNode bindParameters(RelNode rel, RexShuttle binder) { + RelNode newRel = rel.accept(binder); + java.util.List newInputs = new java.util.ArrayList<>(); + for (RelNode input : newRel.getInputs()) { + newInputs.add(bindParameters(input, binder)); + } + return newRel.copy(newRel.getTraitSet(), newInputs); + } + + @Override + public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) + throws ParseException, SqlConversionException { + Preconditions.checkArgument( + queryParameters.getKind() == Kind.NONE, + "Beam SQL Calcite dialect does not yet support query parameters."); + try { + SqlNode parsed = planner.parse(sqlStatement); + TableResolutionUtils.setupCustomTableResolution(connection, parsed); + SqlNode validated = planner.validate(parsed); + // root of original logical plan + RelRoot root = planner.rel(validated); + return root.rel; + } catch (RelConversionException e) { + throw new SqlConversionException( + String.format("Unable to convert query %s", sqlStatement), e); + } catch (SqlParseException | ValidationException e) { + throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); + } finally { + planner.close(); + } + } + + @Override + public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) { + RelNode beamRelNode; + try { + // Normalize correlated sub-queries into standard relational shapes (Join + Aggregate, etc.) + // BEFORE the Volcano program runs and BEFORE the metadata-provider swap below. Running this + // here keeps it off the cost-based metadata path (it uses stock Calcite metadata), so it + // cannot trigger the BeamCostModel / RelMdNodeStats recursion that the Volcano search guards + // against. The pre-pass is a no-op on trees that carry no referenced correlation variable + // (see normalizeForVolcano), which protects the UNNEST LogicalCorrelate(_, Uncollect) shape + // that BeamUnnestRule depends on. + relNode = normalizeForVolcano(relNode); + LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(relNode)); RelTraitSet desiredTraits = - root.rel + relNode .getTraitSet() .replace(BeamLogicalConvention.INSTANCE) - .replace(root.collation) + // .replace(root.collation) .simplify(); // beam physical plan - root.rel + relNode .getCluster() .setMetadataProvider( ChainedRelMetadataProvider.of( ImmutableList.of( NonCumulativeCostImpl.SOURCE, RelMdNodeStats.SOURCE, - root.rel.getCluster().getMetadataProvider()))); + relNode.getCluster().getMetadataProvider()))); - root.rel.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance); + relNode.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance); RelMetadataQuery.THREAD_PROVIDERS.set( - JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider())); - root.rel.getCluster().invalidateMetadataQuery(); - beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, root.rel); + JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider())); + relNode.getCluster().invalidateMetadataQuery(); + Program program = config.getPrograms().get(0); + LOG.info("Desired traits: {}", desiredTraits); + beamRelNode = + program.run( + relNode.getCluster().getPlanner(), + relNode, + desiredTraits, + ImmutableList.of(), + ImmutableList.of()); LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode)); - } catch (RelConversionException | CannotPlanException e) { + } catch (CannotPlanException e) { throw new SqlConversionException( - String.format("Unable to convert query %s", sqlStatement), e); - } catch (SqlParseException | ValidationException e) { - throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); + String.format("Unable to convert relNode to Beam: %s", relNode), e); } finally { planner.close(); } - return beamRelNode; + return (BeamRelNode) beamRelNode; + } + + /** + * Pre-Volcano normalization pass for correlated sub-queries. + * + *

The SqlToRel converter (driven by {@link Planner#rel}) already decorrelates most queries, + * but some correlated shapes (e.g. correlated EXISTS/IN inside a project or join condition) + * survive as a {@code RexSubQuery} or a residual {@code LogicalCorrelate}. The Beam Volcano + * ruleset has no converter rule for a general {@code LogicalCorrelate}, so such a residue would + * fail planning with a {@code CannotPlanException}. This pass rewrites those into standard + * relational nodes ({@code Join}, {@code Aggregate}, {@code Project}, {@code Filter}) that + * existing Beam converter rules already cover. + * + *

The pass: + * + *

    + *
  1. runs a short-lived {@link HepPlanner} with the three {@code *_SUB_QUERY_TO_CORRELATE} + * rules to turn any un-expanded {@code RexSubQuery} into a {@code LogicalCorrelate}, then + *
  2. runs {@link RelDecorrelator#decorrelateQuery(RelNode, RelBuilder)} to lower correlates + * into joins/aggregates, using the planner's configured {@link RelBuilder} so produced rels + * share the cluster's type factory. + *
+ * + *

It runs strictly before the Volcano {@code program.run(...)} and before the + * metadata-provider swap in {@link #convertToBeamRel(RelNode, QueryParameters)}, so it stays off + * the Beam cost path. + * + *

Pre-flight safety: the whole pass is gated on the tree actually referencing a + * correlation variable ({@link RelOptUtil#getVariablesUsed(RelNode)} non-empty). This makes it a + * strict no-op on trees without a referenced correlate. In particular the UNNEST shape {@code + * LogicalCorrelate(_, Uncollect)} defines a correlation id but does not + * reference one in its body, so {@code getVariablesUsed} is empty for it and the pass is + * skipped — leaving the correlate intact for {@code BeamUnnestRule}. (Note: {@code + * getVariablesSet} would be wrong here, as it is non-empty for UNNEST.) + */ + private RelNode normalizeForVolcano(RelNode rel) { + // No-op unless the tree references a correlation variable. Protects the UNNEST + // LogicalCorrelate(_, Uncollect) shape, which defines but does not reference a correl var. + if (RelOptUtil.getVariablesUsed(rel).isEmpty()) { + return rel; + } + + // (1) Convert any residual RexSubQuery (correlated EXISTS/IN in PROJECT/JOIN/FILTER that the + // SqlToRel converter left in place) into a LogicalCorrelate via a tiny HEP pass. + HepProgramBuilder hep = + new HepProgramBuilder() + .addRuleInstance(CoreRules.FILTER_SUB_QUERY_TO_CORRELATE) + .addRuleInstance(CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE) + .addRuleInstance(CoreRules.JOIN_SUB_QUERY_TO_CORRELATE); + HepPlanner hepPlanner = new HepPlanner(hep.build()); + hepPlanner.setRoot(rel); + RelNode noSubQuery = hepPlanner.findBestExp(); + + // (2) Decorrelate the LogicalCorrelate nodes into standard Join/Aggregate shapes. Uses the + // planner's RelBuilder so produced rels share the cluster's type factory + traits. + return RelDecorrelator.decorrelateQuery(noSubQuery, RelBuilder.create(config)); } // It needs to be public so that the generated code in Calcite can access it. @@ -265,4 +512,38 @@ public RelOptCost getNonCumulativeCost(RelNode rel, RelMetadataQuery mq) { return ((BeamRelNode) rel).beamComputeSelfCost(rel.getCluster().getPlanner(), bmq); } } + + private static class ParameterBinder extends RexShuttle { + private final RexBuilder rexBuilder; + private final List positionalParams; + + ParameterBinder(RexBuilder rexBuilder, QueryParameters params) { + this.rexBuilder = rexBuilder; + this.positionalParams = params.getKind() == Kind.POSITIONAL ? params.positional() : null; + } + + @Override + public RexNode visitDynamicParam(RexDynamicParam dynamicParam) { + if (positionalParams != null) { + int index = dynamicParam.getIndex(); + if (index < 0 || index >= positionalParams.size()) { + throw new IllegalArgumentException( + "Index out of bounds for positional parameter: " + index); + } + Object val = positionalParams.get(index); + RexNode literal = makeLiteral(val, dynamicParam.getType()); + return literal; + } + return super.visitDynamicParam(dynamicParam); + } + + private RexNode makeLiteral( + Object val, + org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelDataType type) { + if (val == null) { + return rexBuilder.makeNullLiteral(type); + } + return rexBuilder.makeLiteral(val, type, true); + } + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java index 0f0f8970a3ab..fc3b0b92ba35 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java @@ -22,7 +22,9 @@ import java.util.List; import java.util.Map; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -39,9 +41,29 @@ public interface QueryPlanner { BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters) throws ParseException, SqlConversionException; + /** It parses and validate the input query, then convert into a {@link BeamRelNode} tree. */ + BeamRelNode convertToBeamRel(RelNode sqlStatement, QueryParameters queryParameters) + throws SqlConversionException; + + /** + * Parses and validates {@code sqlStatement} and returns the logical {@link RelNode} (standard + * Calcite convention), WITHOUT converting to Beam physical rels. This lets callers rewrite the + * logical plan (e.g. inject custom rels) before {@link #convertToBeamRel(RelNode, + * QueryParameters)}. + */ + default RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) + throws ParseException, SqlConversionException { + throw new UnsupportedOperationException( + "parseToRel is not implemented by " + getClass().getName()); + } + /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */ SqlNode parse(String sqlStatement) throws ParseException; + RelBuilder getRelBuilder(); + + org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable getOperatorTable(); + @AutoOneOf(QueryParameters.Kind.class) abstract class QueryParameters { public enum Kind { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/decorrelation_design.md b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/decorrelation_design.md new file mode 100644 index 000000000000..1ce92cc4ad1c --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/decorrelation_design.md @@ -0,0 +1,111 @@ + + +# Correlated sub-query decorrelation pre-pass + +## Context + +`CalciteQueryPlanner.convertToBeamRel(RelNode, ...)` is the single chokepoint +through which every SQL query the Beam SQL extension plans is converted from a +Calcite logical tree into a `BeamRelNode` tree by a single Volcano program. + +The SqlToRel converter (driven by `Planner.rel`) already decorrelates most +queries: `SqlToRelConverter` is configured with `withExpand(true)`, and the +default `Planner.rel` path calls `RelDecorrelator.decorrelateQuery` once on the +post-`SqlToRelConverter` tree. For the common correlated-scalar shape this is +enough — the result is a `Join` + `Aggregate[SINGLE_VALUE]` that existing Beam +rules already cover. + +However, some correlated shapes survive that first pass: + +- An un-expanded `RexSubQuery` inside a PROJECT/JOIN/FILTER condition (e.g. some + correlated `EXISTS`/`IN` forms), or +- a residual `LogicalCorrelate` the SqlToRel decorrelate pass could not lower. + +The Beam Volcano ruleset (`BeamRuleSets.LOGICAL_OPTIMIZATIONS` / +`BEAM_CONVERTERS`) has **no** converter rule for a general `LogicalCorrelate`. +The only consumer is `BeamUnnestRule`, which matches strictly +`LogicalCorrelate(_, Uncollect)` (the `UNNEST` shape). So any other residual +correlate reaching Volcano fails planning with a `CannotPlanException`, surfaced +as `SqlConversionException: Unable to convert relNode to Beam: ...`. + +## Design + +Add a private `normalizeForVolcano(RelNode)` pre-pass that runs at the very top +of `convertToBeamRel(RelNode, ...)`, **strictly before** both: + +1. the Volcano `program.run(...)`, and +2. the metadata-provider swap (`setMetadataProvider` / `setMetadataQuerySupplier` + / `RRelMetadataQuery.THREAD_PROVIDERS`). + +Running it before the metadata swap is deliberate: the pass uses stock Calcite +metadata only, so it cannot trigger the Beam cost path (`BeamCostModel`, +`RelMdNodeStats`, `BeamRelMetadataQuery`) and the cost recursion the Volcano +search guards against. + +The pass does two things: + +1. A short-lived `HepPlanner` with `FILTER_SUB_QUERY_TO_CORRELATE`, + `PROJECT_SUB_QUERY_TO_CORRELATE`, and `JOIN_SUB_QUERY_TO_CORRELATE` to turn any + residual `RexSubQuery` into a `LogicalCorrelate`. +2. `RelDecorrelator.decorrelateQuery(rel, RelBuilder.create(config))` to lower + correlates into standard `Join`/`Aggregate`/`Project`/`Filter` nodes, using the + planner's configured `RelBuilder` so produced rels share the cluster type + factory and traits. + +### Pre-flight safety gate + +The entire pass is gated on the tree actually **referencing** a correlation +variable: `RelOptUtil.getVariablesUsed(rel).isEmpty()` ⇒ return the input +unchanged. This makes the pass a strict no-op on trees without a referenced +correlate. + +The gate intentionally uses `getVariablesUsed`, **not** `getVariablesSet`: + +| shape | `getVariablesSet` | `getVariablesUsed` | +|------------------------------------|-------------------|--------------------| +| `LogicalCorrelate(_, Uncollect)` (UNNEST) | non-empty (defines an id) | empty (body refs none) | +| correlated scalar / EXISTS / IN | non-empty | non-empty | + +Because UNNEST *defines* a correlation id but does not *reference* one in its +body, `getVariablesUsed` is empty for it and the pass is skipped, leaving the +`LogicalCorrelate(_, Uncollect)` intact for `BeamUnnestRule`. A gate keyed on +`getVariablesSet` would wrongly run the decorrelator on UNNEST trees. This +structural guard is preferred over relying on test-only verification. + +## Increments + +This change is the smallest shippable increment: + +- Single edit in `CalciteQueryPlanner.convertToBeamRel(RelNode, ...)` plus one + private helper. No ruleset changes, no SparkConnect-side changes. +- Deferred (explicitly out of scope): adding the PROJECT/JOIN + `*_SUB_QUERY_TO_CORRELATE` variants into the Volcano `LOGICAL_OPTIMIZATIONS` + ruleset; non-equi semi/anti-join lowering for correlated `EXISTS` without an + equi key (still blocked at `BeamJoinRel.extractJoinRexNodes`). + +## Risks + +1. **UNNEST regression.** Mitigated by the `getVariablesUsed` gate above and + verified by the `*Unnest*` test subset (and `BeamSqlDslArrayTest`). +2. **Idempotency.** For trees the SqlToRel pass already fully decorrelated, the + gate skips the pass entirely (no referenced correl var remains), so there is + no redundant second decorrelate and no shape churn. +3. **Cost-hang trap.** Avoided by placement strictly before the metadata-provider + swap and Volcano; the pass never touches the Beam cost model. +4. **Hand-built `RelNode` test fixtures** (join/aggregation DSL tests) flow + through this method too; with no referenced correl var they hit the gate and + are untouched. The filtered `*Join*` / `*CalciteQueryPlanner*` subset guards + this. diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java index 6d6be5d5a127..43b6d86d188a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java @@ -55,7 +55,7 @@ public static SqlNode column( } /** Returns the schema in which to create an object. */ - static Pair schema( + public static Pair schema( CalcitePrepare.Context context, boolean mutable, SqlIdentifier id) { CalciteSchema rootSchema = mutable ? context.getMutableRootSchema() : context.getRootSchema(); @Nullable CalciteSchema schema = null; @@ -72,7 +72,7 @@ static Pair schema( return Pair.of(checkStateNotNull(schema, "Got null sub-schema for path '%s'", path), name(id)); } - private static @Nullable CalciteSchema childSchema(CalciteSchema rootSchema, List path) { + public static @Nullable CalciteSchema childSchema(CalciteSchema rootSchema, List path) { @Nullable CalciteSchema schema = rootSchema; for (String p : path) { if (schema == null) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java index cdee6c930224..5efc3010b9c1 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.extensions.sql.meta.catalog; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import java.util.Collection; import java.util.Collections; @@ -111,12 +110,16 @@ public Collection databases() { @Override public boolean dropDatabase(String database, boolean cascade) { - checkState(!cascade, "%s does not support CASCADE.", getClass().getSimpleName()); + MetaStore metaStore = metaStores.get(database); + if (!cascade && metaStore != null && !metaStore.getTables().isEmpty()) { + throw new IllegalStateException("Database '" + database + "' is not empty."); + } boolean removed = databases.remove(database); if (database.equals(currentDatabase)) { currentDatabase = null; } + metaStores.remove(database); return removed; } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java index 3ddd78ab232b..9373ec931e6a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java @@ -238,7 +238,7 @@ public PCollection expand(PCollection input) { /** Write-side converter for {@link TextTable} with format {@code 'csv'}. */ @VisibleForTesting - static class RowToCsv extends PTransform, PCollection> + public static class RowToCsv extends PTransform, PCollection> implements Serializable { private CSVFormat csvFormat; diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java index 588caa78a2b7..2905f8efcaad 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java @@ -83,6 +83,15 @@ public void testUseDatabase() { assertEquals("my_database2", catalogManager.currentCatalog().currentDatabase()); } + @Test + public void testUseDatabaseWithoutDatabaseKeyword() { + assertEquals(DEFAULT, catalogManager.currentCatalog().currentDatabase()); + cli.execute("CREATE DATABASE my_database"); + assertEquals(DEFAULT, catalogManager.currentCatalog().currentDatabase()); + cli.execute("USE my_database"); + assertEquals("my_database", catalogManager.currentCatalog().currentDatabase()); + } + @Test public void testUseDatabase_doesNotExist() { assertEquals(DEFAULT, catalogManager.currentCatalog().currentDatabase()); @@ -126,6 +135,33 @@ public void testDropDatabase_nonexistent() { cli.execute("DROP DATABASE my_database"); } + @Test + public void testDropDatabase_notEmpty_restrict() { + cli.execute("CREATE DATABASE db_1"); + cli.execute("USE db_1"); + + TestTableProvider testTableProvider = new TestTableProvider(); + catalogManager.registerTableProvider(testTableProvider); + cli.execute("CREATE EXTERNAL TABLE person(id int, name varchar, age int) TYPE 'test'"); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Database 'db_1' is not empty."); + cli.execute("DROP DATABASE db_1"); + } + + @Test + public void testDropDatabase_notEmpty_cascade() { + cli.execute("CREATE DATABASE db_1"); + cli.execute("USE db_1"); + + TestTableProvider testTableProvider = new TestTableProvider(); + catalogManager.registerTableProvider(testTableProvider); + cli.execute("CREATE EXTERNAL TABLE person(id int, name varchar, age int) TYPE 'test'"); + + cli.execute("DROP DATABASE db_1 CASCADE"); + assertFalse(catalogManager.currentCatalog().databaseExists("db_1")); + } + @Test public void testCreateInsertDropTableUsingDefaultDatabase() { Catalog catalog = catalogManager.currentCatalog(); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java index 65d6d72d657c..e747ca4f83a5 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java @@ -68,6 +68,32 @@ public void testSelectArrayValue() { pipeline.run(); } + // Spark call-form constructor ARRAY('aa', 'bb') (vs the bracket form ARRAY['aa','bb']). Parses to + // SqlLibraryOperators.ARRAY and is resolved by the operator-table registration in + // CalciteQueryPlanner; this test verifies it also LOWERS end-to-end (RexImpTable). + @Test + public void testSelectArrayFunctionForm() { + PCollection input = pCollectionOf2Elements(); + + Schema resultType = + Schema.builder() + .addInt32Field("f_int") + .addArrayField("f_arr", Schema.FieldType.STRING) + .build(); + + PCollection result = + input.apply( + "sqlQuery", + SqlTransform.query("SELECT 42, ARRAY ('aa', 'bb') as `f_arr` FROM PCOLLECTION")); + + PAssert.that(result) + .containsInAnyOrder( + Row.withSchema(resultType).addValues(42, Arrays.asList("aa", "bb")).build(), + Row.withSchema(resultType).addValues(42, Arrays.asList("aa", "bb")).build()); + + pipeline.run(); + } + @Test public void testProjectArrayField() { PCollection input = pCollectionOf2Elements(); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java index 4a2ad664b4b3..57839e9084b4 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java @@ -90,6 +90,31 @@ public void testSelectMapField() { pipeline.run(); } + // Spark call-form constructor MAP('aa', 1) (vs the bracket form MAP['aa', 1]). Parses to + // SqlLibraryOperators.MAP and is resolved by the operator-table registration in + // CalciteQueryPlanner; this test verifies it also LOWERS end-to-end (RexImpTable). + @Test + public void testSelectMapFunctionForm() { + PCollection input = pCollectionOf2Elements(); + + Schema resultType = + Schema.builder() + .addInt32Field("f_int") + .addMapField("f_intStringMap", Schema.FieldType.STRING, Schema.FieldType.INT32) + .build(); + + PCollection result = + input.apply( + "sqlQuery", SqlTransform.query("SELECT 42, MAP('aa', 1) as `f_map` FROM PCOLLECTION")); + + PAssert.that(result) + .containsInAnyOrder( + Row.withSchema(resultType).addValues(42, ImmutableMap.of("aa", 1)).build(), + Row.withSchema(resultType).addValues(42, ImmutableMap.of("aa", 1)).build()); + + pipeline.run(); + } + @Test public void testSelectMapFieldKeyValueSameType() { PCollection input = pCollectionOf2Elements();