From 73ee0deb602d3b37758aaec68044adcd70ca6873 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 9 Jun 2026 15:55:17 +0000 Subject: [PATCH] Reconcile schema nullability in BeamCoGBKJoinRel --- .../sql/impl/rel/BeamCoGBKJoinRel.java | 39 +++++++++++++-- .../extensions/sql/BeamSqlDslJoinTest.java | 48 +++++++++++++++++++ 2 files changed, 83 insertions(+), 4 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java index b16755d22180..5a9c40a11f35 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.Row; @@ -193,11 +194,41 @@ private PCollection standardJoin( } // Flatten the lhs and rhs fields into a single row. + FieldAccessDescriptor flattenFields = + FieldAccessDescriptor.withFieldNames( + org.apache.beam.sdk.schemas.transforms.Join.LHS_TAG + ".*", + org.apache.beam.sdk.schemas.transforms.Join.RHS_TAG + ".*"); + + // Reconcile the desired output schema (which carries the Calcite-derived field names and the + // correct top-level, outer-join-aware nullability) with the types the data actually carries. + // Calcite's join row-type derivation can report a different nullability than the rows hold -- + // in particular it can mark the fields nested inside a struct column as nullable even when the + // joined rows still keep them NOT NULL. Forcing the Calcite schema verbatim then trips Select's + // type-equality guard. The flatten emits the lhs struct's fields followed by the rhs struct's + // fields, so walk the Calcite output positionally against those data fields, keeping Calcite's + // names and top-level nullability but adopting the data's (possibly deeper) field types. + Schema calciteSchema = CalciteUtils.toSchema(getRowType()); + Schema joinedSchema = joinedRows.getSchema(); + List dataFields = new java.util.ArrayList<>(); + dataFields.addAll( + Preconditions.checkArgumentNotNull(joinedSchema.getField(0).getType().getRowSchema()) + .getFields()); + dataFields.addAll( + Preconditions.checkArgumentNotNull(joinedSchema.getField(1).getType().getRowSchema()) + .getFields()); + Schema.Builder reconciled = Schema.builder(); + for (int i = 0; i < calciteSchema.getFieldCount(); i++) { + Schema.Field calciteField = calciteSchema.getField(i); + // Keep the data's type verbatim -- including the nullability of fields nested inside struct + // columns, which Calcite's join row-type derivation can report incorrectly -- but honour + // Calcite's top-level nullability so outer-join columns stay nullable. + reconciled.addField( + calciteField.withType( + dataFields.get(i).getType().withNullable(calciteField.getType().getNullable()))); + } + return joinedRows.apply( - Select.fieldNames( - org.apache.beam.sdk.schemas.transforms.Join.LHS_TAG + ".*", - org.apache.beam.sdk.schemas.transforms.Join.RHS_TAG + ".*") - .withOutputSchema(CalciteUtils.toSchema(getRowType()))); + Select.fieldAccess(flattenFields).withOutputSchema(reconciled.build())); } @Override diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java index b210bb0258f5..2bdad7219e07 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java @@ -325,6 +325,54 @@ private PCollection ordersUnbounded() { .buildUnbounded(); } + @Test + public void testLeftOuterJoinWithNestedRows() { + Schema nestedSchema = + Schema.builder().addInt32Field("nested_int").addStringField("nested_str").build(); + Schema lhsSchema = Schema.builder().addInt32Field("id").addStringField("val").build(); + Schema rhsSchema = + Schema.builder().addInt32Field("id").addRowField("nested", nestedSchema).build(); + + PCollection lhs = + pipeline.apply( + "lhs", + org.apache.beam.sdk.transforms.Create.of( + Row.withSchema(lhsSchema).addValues(1, "lhs1").build(), + Row.withSchema(lhsSchema).addValues(2, "lhs2").build()) + .withRowSchema(lhsSchema)); + + PCollection rhs = + pipeline.apply( + "rhs", + org.apache.beam.sdk.transforms.Create.of( + Row.withSchema(rhsSchema) + .addValues(1, Row.withSchema(nestedSchema).addValues(10, "nested1").build()) + .build()) + .withRowSchema(rhsSchema)); + + String sql = + "SELECT lhs.id, lhs.val, rhs.nested FROM lhs LEFT OUTER JOIN rhs ON lhs.id = rhs.id"; + + PCollection result = + PCollectionTuple.of("lhs", lhs).and("rhs", rhs).apply("sql", SqlTransform.query(sql)); + + Schema expectedSchema = + Schema.builder() + .addInt32Field("id") + .addStringField("val") + .addNullableField("nested", Schema.FieldType.row(nestedSchema)) + .build(); + + PAssert.that(result) + .containsInAnyOrder( + Row.withSchema(expectedSchema) + .addValues(1, "lhs1", Row.withSchema(nestedSchema).addValues(10, "nested1").build()) + .build(), + Row.withSchema(expectedSchema).addValues(2, "lhs2", null).build()); + + pipeline.run(); + } + private PCollection queryFromOrderTables(String sql) { return tuple( "ORDER_DETAILS1",