Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexBuilder;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexDynamicParam;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexShuttle;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable;
Expand Down Expand Up @@ -180,8 +185,8 @@ public SqlNode parse(String sqlStatement) throws ParseException {
public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters)
throws ParseException, SqlConversionException {
Preconditions.checkArgument(
queryParameters.getKind() == Kind.NONE,
"Beam SQL Calcite dialect does not yet support query parameters.");
queryParameters.getKind() == Kind.NONE || queryParameters.getKind() == Kind.POSITIONAL,
"Beam SQL Calcite dialect only supports positional query parameters.");
BeamRelNode beamRelNode;
try {
SqlNode parsed = planner.parse(sqlStatement);
Expand All @@ -191,28 +196,35 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa

// root of original logical plan
RelRoot root = planner.rel(validated);
RelNode relNode = root.rel;
if (queryParameters.getKind() == Kind.POSITIONAL) {
relNode =
bindParameters(
relNode,
new ParameterBinder(root.rel.getCluster().getRexBuilder(), queryParameters));
}
LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logging statement still prints the original plan, root.rel, which contains unbound dynamic parameters (?). It would be more helpful to log the plan after parameter binding (relNode), so that the fully resolved query plan is visible in the logs.

Suggested change
LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(relNode));

RelTraitSet desiredTraits =
root.rel
relNode
.getTraitSet()
.replace(BeamLogicalConvention.INSTANCE)
.replace(root.collation)
.simplify();
// beam physical plan
root.rel
relNode
.getCluster()
.setMetadataProvider(
ChainedRelMetadataProvider.of(
ImmutableList.of(
NonCumulativeCostImpl.SOURCE,
RelMdNodeStats.SOURCE,
root.rel.getCluster().getMetadataProvider())));
relNode.getCluster().getMetadataProvider())));

root.rel.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance);
relNode.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance);
RelMetadataQuery.THREAD_PROVIDERS.set(
JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider()));
root.rel.getCluster().invalidateMetadataQuery();
beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, root.rel);
JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider()));
relNode.getCluster().invalidateMetadataQuery();
beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, relNode);
LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode));
} catch (RelConversionException | CannotPlanException e) {
throw new SqlConversionException(
Expand All @@ -225,6 +237,15 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa
return beamRelNode;
}

/**
 * Applies {@code binder} to {@code rel} and, recursively, to every input beneath it.
 *
 * <p>The node itself is visited first. Each input of the visited node is then processed the same
 * way, and the visited node is copied with its own trait set and the processed inputs.
 *
 * @param rel root of the (sub)tree to process
 * @param binder shuttle handed to {@code accept} on every node of the tree
 * @return the result of {@code copy} on the visited node, given the processed inputs
 */
private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
  final RelNode visited = rel.accept(binder);
  final java.util.List<RelNode> reboundInputs = new java.util.ArrayList<>();
  // Inputs are taken from the visited node, not the original one, and rebound depth-first.
  visited.getInputs().forEach(child -> reboundInputs.add(bindParameters(child, binder)));
  return visited.copy(visited.getTraitSet(), reboundInputs);
}
Comment on lines +240 to +247

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The bindParameters method attempts to call rel.accept(binder) where binder is a RexShuttle. However, in Apache Calcite, RelNode only has an accept(RelShuttle) method, and RexShuttle does not implement RelShuttle. This will cause a compilation error.

To apply a RexShuttle to a RelNode's expressions, you should use a RelShuttle that visits the RelNode and applies the RexShuttle to its expressions (for example, by overriding visit methods or using rel.accept(new RelShuttleImpl() { ... })). Since doing this manually for all relational operators is verbose, you can use rel.accept(new RelShuttleImpl() { ... }) and override the visit methods, or use a custom RelShuttle implementation that handles expression transformation.


// It needs to be public so that the generated code in Calcite can access it.
public static class NonCumulativeCostImpl
implements MetadataHandler<BuiltInMetadata.NonCumulativeCost> {
Expand Down Expand Up @@ -265,4 +286,58 @@ public RelOptCost getNonCumulativeCost(RelNode rel, RelMetadataQuery mq) {
return ((BeamRelNode) rel).beamComputeSelfCost(rel.getCluster().getPlanner(), bmq);
}
}

/**
 * {@link RexShuttle} that replaces every {@link RexDynamicParam} with a literal built from the
 * positional query parameter at the same index.
 *
 * <p>When the supplied {@link QueryParameters} are not positional, dynamic parameters are handed
 * to the default {@link RexShuttle} handling unchanged.
 */
private static class ParameterBinder extends RexShuttle {
  private final RexBuilder rexBuilder;
  // Null when the query parameters are not positional.
  private final List<?> positionalParams;

  /**
   * @param rexBuilder builder used to create the replacement literals
   * @param params query parameters; only their positional values are used
   */
  ParameterBinder(RexBuilder rexBuilder, QueryParameters params) {
    this.rexBuilder = rexBuilder;
    this.positionalParams = params.getKind() == Kind.POSITIONAL ? params.positional() : null;
  }

  /**
   * Substitutes the dynamic parameter with a literal of the parameter's own type.
   *
   * @throws IllegalArgumentException if the parameter index has no matching positional value
   * @throws ArithmeticException if a {@link java.time.LocalDate} value's epoch day does not fit
   *     in an {@code int}
   */
  @Override
  public RexNode visitDynamicParam(RexDynamicParam dynamicParam) {
    if (positionalParams == null) {
      return super.visitDynamicParam(dynamicParam);
    }
    int index = dynamicParam.getIndex();
    if (index < 0 || index >= positionalParams.size()) {
      throw new IllegalArgumentException(
          "Index out of bounds for positional parameter: " + index);
    }
    Object val = positionalParams.get(index);
    return makeLiteral(cleanValue(val), dynamicParam.getType());
  }

  /** Builds a null literal for {@code null} values, otherwise a literal of the given type. */
  private RexNode makeLiteral(Object val, RelDataType type) {
    if (val == null) {
      return rexBuilder.makeNullLiteral(type);
    }
    return rexBuilder.makeLiteral(val, type, true);
  }

  /**
   * Converts date/time values to plain numbers and returns every other value as is.
   *
   * <p>NOTE(review): the numeric forms below (epoch day / millis of day as {@code Integer},
   * epoch millis as {@code Long}) are presumably what {@code RexBuilder.makeLiteral} expects for
   * the corresponding SQL types - confirm against the Calcite version in use.
   */
  @SuppressWarnings("JavaUtilDate") // explicit java.util.Date support
  private Object cleanValue(Object value) {
    if (value instanceof org.joda.time.ReadableInstant) {
      return ((org.joda.time.ReadableInstant) value).getMillis();
    }
    if (value instanceof java.time.LocalDate) {
      // toEpochDay() is a long; toIntExact fails loudly instead of silently wrapping to an
      // unrelated day number for dates outside the int range.
      return Math.toIntExact(((java.time.LocalDate) value).toEpochDay());
    }
    if (value instanceof java.time.LocalTime) {
      // Millis of day is below 86_400_000, so the narrowing cast cannot overflow.
      return (int) (((java.time.LocalTime) value).toNanoOfDay() / 1_000_000L);
    }
    if (value instanceof java.time.LocalDateTime) {
      return ((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.UTC).toEpochMilli();
    }
    if (value instanceof java.util.Date) {
      // Also covers java.sql.Timestamp, which is a java.util.Date.
      return ((java.util.Date) value).getTime();
    }
    return value;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.extensions.sql;

import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
Expand All @@ -33,19 +35,17 @@
import org.apache.beam.sdk.values.Row;
import org.junit.Rule;
import org.junit.Test;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.MapperFeature;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduced in #30902; this test has never been exercised.

import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;

public class BeamSqlAliasTest implements Serializable {

@Rule public final transient TestPipeline pipeline = TestPipeline.create();

@Test
public void testSqlWithAliasIsNotIgnoredWithOptimizers() {
String ID = "id";
String EVENT = "event";
final String id = "id";
final String event = "event";

Schema inputType = Schema.builder().addStringField(ID).addStringField(EVENT).build();
Schema inputType = Schema.builder().addStringField(id).addStringField(event).build();

String sql =
"select event as event_name, count(*) as c\n" + "from PCOLLECTION\n" + "group by event";
Expand Down Expand Up @@ -91,4 +91,4 @@ public void processElement(DoFn<Row, String>.ProcessContext c)

pipeline.run().waitUntilFinish();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql;

import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;
import org.junit.Test;

/** Verifies that {@code ?} placeholders in Beam SQL queries are bound from positional parameters. */
public class BeamSqlDslParametersTest extends BeamSqlDslBase {

  /** Binds an integer and a string placeholder and expects the one row matching both. */
  @Test
  public void testPositionalParameters() {
    final String query = "SELECT f_int, f_string FROM PCOLLECTION WHERE f_int = ? AND f_string = ?";
    final Schema outputSchema =
        Schema.builder().addInt32Field("f_int").addStringField("f_string").build();

    final PCollection<Row> rows =
        boundedInput1.apply(
            "testPositionalParameters",
            SqlTransform.query(query).withPositionalParameters(Arrays.asList(1, "string_row1")));

    final Row expected = Row.withSchema(outputSchema).addValues(1, "string_row1").build();
    PAssert.that(rows).containsInAnyOrder(expected);

    pipeline.run();
  }

  /** Binds date, time, datetime and timestamp placeholders and expects the one matching row. */
  @Test
  public void testDateTimeParameters() {
    final String query =
        "SELECT f_int FROM PCOLLECTION WHERE f_date = ? AND f_time = ? AND f_datetime = ? AND f_timestamp = ?";
    final Schema outputSchema = Schema.builder().addInt32Field("f_int").build();

    // One value per placeholder, in the order the placeholders appear in the query.
    final LocalDate dateParam = LocalDate.of(2017, 1, 1);
    final LocalTime timeParam = LocalTime.of(1, 1, 3);
    final LocalDateTime dateTimeParam = LocalDateTime.of(2017, 1, 1, 1, 1, 3);
    final Instant timestampParam =
        new Instant(parseTimestampWithoutTimeZone("2017-01-01 01:01:03"));

    final PCollection<Row> rows =
        boundedInput1.apply(
            "testDateTimeParameters",
            SqlTransform.query(query)
                .withPositionalParameters(
                    Arrays.asList(dateParam, timeParam, dateTimeParam, timestampParam)));

    final Row expected = Row.withSchema(outputSchema).addValues(1).build();
    PAssert.that(rows).containsInAnyOrder(expected);

    pipeline.run();
  }
}
Loading