From 4f96a27d2d2d9b5e706cd25afb52927d9a5144d4 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 9 Jun 2026 21:16:35 -0400 Subject: [PATCH] [SQL] Support positional parameters --- .../sql/impl/CalciteQueryPlanner.java | 93 +++++++++++++++++-- ...BeamSqlAliasTest => BeamSqlAliasTest.java} | 12 +-- .../sql/BeamSqlDslParametersTest.java | 78 ++++++++++++++++ 3 files changed, 168 insertions(+), 15 deletions(-) rename sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/{BeamSqlAliasTest => BeamSqlAliasTest.java} (92%) create mode 100644 sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslParametersTest.java diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index 606a3c5f71a2..aa6f4d121871 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -51,6 +51,11 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexBuilder; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexDynamicParam; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexShuttle; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable; @@ -180,8 +185,8 @@ public SqlNode parse(String sqlStatement) throws ParseException { public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters) throws ParseException, SqlConversionException { Preconditions.checkArgument( - queryParameters.getKind() == Kind.NONE, - "Beam SQL Calcite dialect does not yet support query parameters."); + queryParameters.getKind() == Kind.NONE || queryParameters.getKind() == Kind.POSITIONAL, + "Beam SQL Calcite dialect only supports positional query parameters."); BeamRelNode beamRelNode; try { SqlNode parsed = planner.parse(sqlStatement); @@ -191,28 +196,35 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa // root of original logical plan RelRoot root = planner.rel(validated); + RelNode relNode = root.rel; + if (queryParameters.getKind() == Kind.POSITIONAL) { + relNode = + bindParameters( + relNode, + new ParameterBinder(root.rel.getCluster().getRexBuilder(), queryParameters)); + } LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel)); RelTraitSet desiredTraits = - root.rel + relNode .getTraitSet() .replace(BeamLogicalConvention.INSTANCE) .replace(root.collation) .simplify(); // beam physical plan - root.rel + relNode .getCluster() .setMetadataProvider( ChainedRelMetadataProvider.of( ImmutableList.of( NonCumulativeCostImpl.SOURCE, RelMdNodeStats.SOURCE, - root.rel.getCluster().getMetadataProvider()))); + relNode.getCluster().getMetadataProvider()))); - root.rel.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance); + relNode.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance); RelMetadataQuery.THREAD_PROVIDERS.set( - JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider())); - root.rel.getCluster().invalidateMetadataQuery(); - beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, root.rel); + JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider())); + relNode.getCluster().invalidateMetadataQuery(); + beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, relNode); LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode)); } catch (RelConversionException | CannotPlanException e) { throw new SqlConversionException( @@ -225,6 +237,15 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa return beamRelNode; } + private static RelNode bindParameters(RelNode rel, RexShuttle binder) { + RelNode newRel = rel.accept(binder); + java.util.List newInputs = new java.util.ArrayList<>(); + for (RelNode input : newRel.getInputs()) { + newInputs.add(bindParameters(input, binder)); + } + return newRel.copy(newRel.getTraitSet(), newInputs); + } + // It needs to be public so that the generated code in Calcite can access it. public static class NonCumulativeCostImpl implements MetadataHandler { @@ -265,4 +286,58 @@ public RelOptCost getNonCumulativeCost(RelNode rel, RelMetadataQuery mq) { return ((BeamRelNode) rel).beamComputeSelfCost(rel.getCluster().getPlanner(), bmq); } } + + private static class ParameterBinder extends RexShuttle { + private final RexBuilder rexBuilder; + private final List positionalParams; + + ParameterBinder(RexBuilder rexBuilder, QueryParameters params) { + this.rexBuilder = rexBuilder; + this.positionalParams = params.getKind() == Kind.POSITIONAL ? params.positional() : null; + } + + @Override + public RexNode visitDynamicParam(RexDynamicParam dynamicParam) { + if (positionalParams != null) { + int index = dynamicParam.getIndex(); + if (index < 0 || index >= positionalParams.size()) { + throw new IllegalArgumentException( + "Index out of bounds for positional parameter: " + index); + } + Object val = positionalParams.get(index); + return makeLiteral(cleanValue(val), dynamicParam.getType()); + } + return super.visitDynamicParam(dynamicParam); + } + + private RexNode makeLiteral(Object val, RelDataType type) { + if (val == null) { + return rexBuilder.makeNullLiteral(type); + } + return rexBuilder.makeLiteral(val, type, true); + } + + @SuppressWarnings("JavaUtilDate") // explicit java.util.Date support + private Object cleanValue(Object value) { + if (value instanceof org.joda.time.ReadableInstant) { + return ((org.joda.time.ReadableInstant) value).getMillis(); + } + if (value instanceof java.time.LocalDate) { + return (int) ((java.time.LocalDate) value).toEpochDay(); + } + if (value instanceof java.time.LocalTime) { + return (int) (((java.time.LocalTime) value).toNanoOfDay() / 1_000_000L); + } + if (value instanceof java.time.LocalDateTime) { + return ((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.UTC).toEpochMilli(); + } + if (value instanceof java.sql.Timestamp) { + return ((java.sql.Timestamp) value).getTime(); + } + if (value instanceof java.util.Date) { + return ((java.util.Date) value).getTime(); + } + return value; + } + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest.java similarity index 92% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest rename to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest.java index 790312b7e756..de3c8e6f301e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.extensions.sql; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.Serializable; import java.util.HashMap; import java.util.List; @@ -33,8 +35,6 @@ import org.apache.beam.sdk.values.Row; import org.junit.Rule; import org.junit.Test; -import org.testcontainers.shaded.com.fasterxml.jackson.databind.MapperFeature; -import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; public class BeamSqlAliasTest implements Serializable { @@ -42,10 +42,10 @@ public class BeamSqlAliasTest implements Serializable { @Test public void testSqlWithAliasIsNotIgnoredWithOptimizers() { - String ID = "id"; - String EVENT = "event"; + final String id = "id"; + final String event = "event"; - Schema inputType = Schema.builder().addStringField(ID).addStringField(EVENT).build(); + Schema inputType = Schema.builder().addStringField(id).addStringField(event).build(); String sql = "select event as event_name, count(*) as c\n" + "from PCOLLECTION\n" + "group by event"; @@ -91,4 +91,4 @@ public void processElement(DoFn.ProcessContext c) pipeline.run().waitUntilFinish(); } -} \ No newline at end of file +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslParametersTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslParametersTest.java new file mode 100644 index 000000000000..9166fd16e0a6 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslParametersTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Arrays; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.joda.time.Instant; +import org.junit.Test; + +/** Tests for query parameters in Beam SQL. */ +public class BeamSqlDslParametersTest extends BeamSqlDslBase { + + @Test + public void testPositionalParameters() { + String sql = "SELECT f_int, f_string FROM PCOLLECTION WHERE f_int = ? AND f_string = ?"; + + PCollection result = + boundedInput1.apply( + "testPositionalParameters", + SqlTransform.query(sql).withPositionalParameters(Arrays.asList(1, "string_row1"))); + + Row expectedRow = + Row.withSchema(Schema.builder().addInt32Field("f_int").addStringField("f_string").build()) + .addValues(1, "string_row1") + .build(); + + PAssert.that(result).containsInAnyOrder(expectedRow); + + pipeline.run(); + } + + @Test + public void testDateTimeParameters() { + String sql = + "SELECT f_int FROM PCOLLECTION WHERE f_date = ? AND f_time = ? AND f_datetime = ? AND f_timestamp = ?"; + + PCollection result = + boundedInput1.apply( + "testDateTimeParameters", + SqlTransform.query(sql) + .withPositionalParameters( + Arrays.asList( + LocalDate.of(2017, 1, 1), + LocalTime.of(1, 1, 3), + LocalDateTime.of(2017, 1, 1, 1, 1, 3), + new Instant(parseTimestampWithoutTimeZone("2017-01-01 01:01:03"))))); + + Row expectedRow = + Row.withSchema(Schema.builder().addInt32Field("f_int").build()).addValues(1).build(); + + PAssert.that(result).containsInAnyOrder(expectedRow); + + pipeline.run(); + } +}