From 8a829308a718b7a320c9efcf9a1a04b2ba11fc98 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 9 Jun 2026 14:49:34 +0000 Subject: [PATCH] Support MAP type in RowJson and fix datetime parsing with spaces --- .../org/apache/beam/sdk/util/RowJson.java | 65 ++++++++++++++++++- .../beam/sdk/util/RowJsonValueExtractors.java | 9 ++- .../org/apache/beam/sdk/util/RowJsonTest.java | 26 +++++++- 3 files changed, 97 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java index c63f673ade21..ccfbd87d450d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java @@ -26,6 +26,7 @@ import static org.apache.beam.sdk.schemas.Schema.TypeName.INT16; import static org.apache.beam.sdk.schemas.Schema.TypeName.INT32; import static org.apache.beam.sdk.schemas.Schema.TypeName.INT64; +import static org.apache.beam.sdk.schemas.Schema.TypeName.MAP; import static org.apache.beam.sdk.schemas.Schema.TypeName.STRING; import static org.apache.beam.sdk.util.RowJsonValueExtractors.booleanValueExtractor; import static org.apache.beam.sdk.util.RowJsonValueExtractors.byteValueExtractor; @@ -57,6 +58,9 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.beam.sdk.schemas.Schema; @@ -95,7 +99,8 @@ }) public class RowJson { private static final ImmutableSet SUPPORTED_TYPES = - ImmutableSet.of(BYTE, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, STRING, DECIMAL, DATETIME); + ImmutableSet.of( + BYTE, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, STRING, DECIMAL, DATETIME, MAP); private static final ImmutableSet KNOWN_LOGICAL_TYPE_IDENTIFIERS = ImmutableSet.of( SqlTypes.DATE.getIdentifier(), @@ -160,6 +165,14 @@ private static ImmutableList findUnsupportedFields( return findUnsupportedFields(fieldType.getCollectionElementType(), fieldName + "[]"); } + if (fieldTypeName.isMapType()) { + if (!STRING.equals(fieldType.getMapKeyType().getTypeName())) { + return ImmutableList.of( + new UnsupportedField(fieldName + ".key", fieldType.getMapKeyType().getTypeName())); + } + return findUnsupportedFields(fieldType.getMapValueType(), fieldName + "{}"); + } + if (fieldTypeName.isLogicalType()) { if (KNOWN_LOGICAL_TYPE_IDENTIFIERS.contains(fieldType.getLogicalType().getIdentifier())) { return ImmutableList.of(); @@ -303,6 +316,10 @@ private Object extractJsonNodeValue(FieldValue fieldValue) { return jsonArrayToList(fieldValue); } + if (fieldValue.isMapType()) { + return jsonObjectToMap(fieldValue); + } + if (fieldValue.typeName().isLogicalType()) { String identifier = fieldValue.type().getLogicalType().getIdentifier(); if (SqlTypes.DATE.getIdentifier().equals(identifier)) { @@ -365,6 +382,32 @@ private Object jsonArrayToList(FieldValue arrayFieldValue) { .collect(toImmutableList()); } + private Map jsonObjectToMap(FieldValue mapFieldValue) { + if (!mapFieldValue.isJsonObject()) { + throw new UnsupportedRowJsonException( + "Expected JSON object for field '" + + mapFieldValue.name() + + "'. Instead got " + + mapFieldValue.jsonNodeType().name()); + } + + Map result = new HashMap<>(); + Iterator> fields = mapFieldValue.jsonValue().fields(); + while (fields.hasNext()) { + Map.Entry field = fields.next(); + String key = field.getKey(); + JsonNode value = field.getValue(); + + Object extractedValue = + extractJsonNodeValue( + FieldValue.of( + mapFieldValue.name() + "['" + key + "']", mapFieldValue.mapValueType(), value)); + + result.put(key, extractedValue); + } + return result; + } + private static Object extractJsonPrimitiveValue(FieldValue fieldValue) { try { return JSON_VALUE_GETTERS.get(fieldValue.typeName()).extractValue(fieldValue.jsonValue()); @@ -440,6 +483,18 @@ Schema rowSchema() { return type().getRowSchema(); } + boolean isMapType() { + return TypeName.MAP.equals(type().getTypeName()); + } + + FieldType mapKeyType() { + return type().getMapKeyType(); + } + + FieldType mapValueType() { + return type().getMapValueType(); + } + static FieldValue of(String name, FieldType type, JsonNode jsonValue) { return new AutoValue_RowJson_RowJsonDeserializer_FieldValue(name, type, jsonValue); } @@ -538,6 +593,14 @@ private void writeValue(JsonGenerator gen, FieldType type, Object value) throws case ROW: writeRow((Row) value, type.getRowSchema(), gen); break; + case MAP: + gen.writeStartObject(); + for (Map.Entry entry : ((Map) value).entrySet()) { + gen.writeFieldName(entry.getKey().toString()); + writeValue(gen, type.getMapValueType(), entry.getValue()); + } + gen.writeEndObject(); + break; case LOGICAL_TYPE: String identifier = type.getLogicalType().getIdentifier(); if (SqlTypes.DATE.getIdentifier().equals(identifier)) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java index f7a925d5c222..2179b20010dc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java @@ -189,7 +189,14 @@ static ValueExtractor decimalValueExtractor() { */ static ValueExtractor datetimeValueExtractor() { return ValidatingValueExtractor.builder() - .setExtractor(jsonNode -> DateTime.parse(jsonNode.textValue())) + .setExtractor( + jsonNode -> { + String text = jsonNode.textValue(); + if (text.contains(" ")) { + text = text.replace(' ', 'T'); + } + return DateTime.parse(text); + }) .setValidator(JsonNode::isTextual) .build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java index 328765bf7f15..81f69b62c53b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java @@ -30,6 +30,8 @@ import java.math.BigDecimal; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.VariableString; @@ -80,7 +82,8 @@ public static Collection data() { makeArrayOfArraysTestCase(), makeNestedRowTestCase(), makeDoublyNestedRowTestCase(), - makeNullsTestCase()); + makeNullsTestCase(), + makeMapFieldTestCase()); } private static Object[] makeFlatRowTestCase() { @@ -244,6 +247,21 @@ private static Object[] makeNullsTestCase() { return new Object[] {"Nulls", schema, rowString, expectedRow}; } + private static Object[] makeMapFieldTestCase() { + Schema schema = + Schema.builder().addMapField("f_map", FieldType.STRING, FieldType.INT32).build(); + + String rowString = "{\n" + "\"f_map\" : {\"key1\": 1, \"key2\": 2}\n" + "}"; + + Map expectedMap = new HashMap<>(); + expectedMap.put("key1", 1); + expectedMap.put("key2", 2); + + Row expectedRow = Row.withSchema(schema).addValues(expectedMap).build(); + + return new Object[] {"Map field", schema, rowString, expectedRow}; + } + @Test public void testDeserialize() throws IOException { Row parsedRow = @@ -564,6 +582,12 @@ public void testSupportedDatetimeConversions() throws Exception { testSupportedConversion(FieldType.DATETIME, quoted(DATETIME_STRING), DATETIME_VALUE); } + @Test + public void testSupportedDatetimeWithSpaceConversions() throws Exception { + String datetimeWithSpace = DATETIME_STRING.replace('T', ' '); + testSupportedConversion(FieldType.DATETIME, quoted(datetimeWithSpace), DATETIME_VALUE); + } + private void testSupportedConversion( FieldType fieldType, String jsonFieldValue, Object expectedRowFieldValue) throws Exception {