Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
Expand Down Expand Up @@ -193,11 +194,41 @@ private PCollection<Row> standardJoin(
}

// Flatten the lhs and rhs fields into a single row.
FieldAccessDescriptor flattenFields =
FieldAccessDescriptor.withFieldNames(
org.apache.beam.sdk.schemas.transforms.Join.LHS_TAG + ".*",
org.apache.beam.sdk.schemas.transforms.Join.RHS_TAG + ".*");

// Reconcile the desired output schema (which carries the Calcite-derived field names and the
// correct top-level, outer-join-aware nullability) with the types the data actually carries.
// Calcite's join row-type derivation can report a different nullability than the rows hold --
// in particular it can mark the fields nested inside a struct column as nullable even when the
// joined rows still keep them NOT NULL. Forcing the Calcite schema verbatim then trips Select's
// type-equality guard. The flatten emits the lhs struct's fields followed by the rhs struct's
// fields, so walk the Calcite output positionally against those data fields, keeping Calcite's
// names and top-level nullability but adopting the data's (possibly deeper) field types.
Schema calciteSchema = CalciteUtils.toSchema(getRowType());
Schema joinedSchema = joinedRows.getSchema();
List<Schema.Field> dataFields = new java.util.ArrayList<>();
dataFields.addAll(
Preconditions.checkArgumentNotNull(joinedSchema.getField(0).getType().getRowSchema())
.getFields());
dataFields.addAll(
Preconditions.checkArgumentNotNull(joinedSchema.getField(1).getType().getRowSchema())
.getFields());
Schema.Builder reconciled = Schema.builder();
for (int i = 0; i < calciteSchema.getFieldCount(); i++) {
Schema.Field calciteField = calciteSchema.getField(i);
// Keep the data's type verbatim -- including the nullability of fields nested inside struct
// columns, which Calcite's join row-type derivation can report incorrectly -- but honour
// Calcite's top-level nullability so outer-join columns stay nullable.
reconciled.addField(
calciteField.withType(
dataFields.get(i).getType().withNullable(calciteField.getType().getNullable())));
}

return joinedRows.apply(
Select.<Row>fieldNames(
org.apache.beam.sdk.schemas.transforms.Join.LHS_TAG + ".*",
org.apache.beam.sdk.schemas.transforms.Join.RHS_TAG + ".*")
.withOutputSchema(CalciteUtils.toSchema(getRowType())));
Select.<Row>fieldAccess(flattenFields).withOutputSchema(reconciled.build()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,54 @@ private PCollection<Row> ordersUnbounded() {
.buildUnbounded();
}

@Test
public void testLeftOuterJoinWithNestedRows() {
Schema nestedSchema =
Schema.builder().addInt32Field("nested_int").addStringField("nested_str").build();
Schema lhsSchema = Schema.builder().addInt32Field("id").addStringField("val").build();
Schema rhsSchema =
Schema.builder().addInt32Field("id").addRowField("nested", nestedSchema).build();

PCollection<Row> lhs =
pipeline.apply(
"lhs",
org.apache.beam.sdk.transforms.Create.of(
Row.withSchema(lhsSchema).addValues(1, "lhs1").build(),
Row.withSchema(lhsSchema).addValues(2, "lhs2").build())
.withRowSchema(lhsSchema));

PCollection<Row> rhs =
pipeline.apply(
"rhs",
org.apache.beam.sdk.transforms.Create.of(
Row.withSchema(rhsSchema)
.addValues(1, Row.withSchema(nestedSchema).addValues(10, "nested1").build())
.build())
.withRowSchema(rhsSchema));

String sql =
"SELECT lhs.id, lhs.val, rhs.nested FROM lhs LEFT OUTER JOIN rhs ON lhs.id = rhs.id";

PCollection<Row> result =
PCollectionTuple.of("lhs", lhs).and("rhs", rhs).apply("sql", SqlTransform.query(sql));

Schema expectedSchema =
Schema.builder()
.addInt32Field("id")
.addStringField("val")
.addNullableField("nested", Schema.FieldType.row(nestedSchema))
.build();

PAssert.that(result)
.containsInAnyOrder(
Row.withSchema(expectedSchema)
.addValues(1, "lhs1", Row.withSchema(nestedSchema).addValues(10, "nested1").build())
.build(),
Row.withSchema(expectedSchema).addValues(2, "lhs2", null).build());

pipeline.run();
}

private PCollection<Row> queryFromOrderTables(String sql) {
return tuple(
"ORDER_DETAILS1",
Expand Down
Loading