Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.beam.sdk.schemas.Schema.TypeName.INT16;
import static org.apache.beam.sdk.schemas.Schema.TypeName.INT32;
import static org.apache.beam.sdk.schemas.Schema.TypeName.INT64;
import static org.apache.beam.sdk.schemas.Schema.TypeName.MAP;
import static org.apache.beam.sdk.schemas.Schema.TypeName.STRING;
import static org.apache.beam.sdk.util.RowJsonValueExtractors.booleanValueExtractor;
import static org.apache.beam.sdk.util.RowJsonValueExtractors.byteValueExtractor;
Expand Down Expand Up @@ -57,6 +58,9 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -95,7 +99,8 @@
})
public class RowJson {
private static final ImmutableSet<TypeName> SUPPORTED_TYPES =
ImmutableSet.of(BYTE, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, STRING, DECIMAL, DATETIME);
ImmutableSet.of(
BYTE, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, STRING, DECIMAL, DATETIME, MAP);
private static final ImmutableSet<String> KNOWN_LOGICAL_TYPE_IDENTIFIERS =
ImmutableSet.of(
SqlTypes.DATE.getIdentifier(),
Expand Down Expand Up @@ -160,6 +165,14 @@ private static ImmutableList<UnsupportedField> findUnsupportedFields(
return findUnsupportedFields(fieldType.getCollectionElementType(), fieldName + "[]");
}

if (fieldTypeName.isMapType()) {
if (!STRING.equals(fieldType.getMapKeyType().getTypeName())) {
return ImmutableList.of(
new UnsupportedField(fieldName + ".key", fieldType.getMapKeyType().getTypeName()));
}
return findUnsupportedFields(fieldType.getMapValueType(), fieldName + "{}");
}

if (fieldTypeName.isLogicalType()) {
if (KNOWN_LOGICAL_TYPE_IDENTIFIERS.contains(fieldType.getLogicalType().getIdentifier())) {
return ImmutableList.of();
Expand Down Expand Up @@ -303,6 +316,10 @@ private Object extractJsonNodeValue(FieldValue fieldValue) {
return jsonArrayToList(fieldValue);
}

if (fieldValue.isMapType()) {
return jsonObjectToMap(fieldValue);
}

if (fieldValue.typeName().isLogicalType()) {
String identifier = fieldValue.type().getLogicalType().getIdentifier();
if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
Expand Down Expand Up @@ -365,6 +382,32 @@ private Object jsonArrayToList(FieldValue arrayFieldValue) {
.collect(toImmutableList());
}

private Map<String, Object> jsonObjectToMap(FieldValue mapFieldValue) {
if (!mapFieldValue.isJsonObject()) {
throw new UnsupportedRowJsonException(
"Expected JSON object for field '"
+ mapFieldValue.name()
+ "'. Instead got "
+ mapFieldValue.jsonNodeType().name());
}

Map<String, Object> result = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> fields = mapFieldValue.jsonValue().fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
String key = field.getKey();
JsonNode value = field.getValue();

Object extractedValue =
extractJsonNodeValue(
FieldValue.of(
mapFieldValue.name() + "['" + key + "']", mapFieldValue.mapValueType(), value));

result.put(key, extractedValue);
}
return result;
}

private static Object extractJsonPrimitiveValue(FieldValue fieldValue) {
try {
return JSON_VALUE_GETTERS.get(fieldValue.typeName()).extractValue(fieldValue.jsonValue());
Expand Down Expand Up @@ -440,6 +483,18 @@ Schema rowSchema() {
return type().getRowSchema();
}

boolean isMapType() {
return TypeName.MAP.equals(type().getTypeName());
}

FieldType mapKeyType() {
return type().getMapKeyType();
}

FieldType mapValueType() {
return type().getMapValueType();
}

static FieldValue of(String name, FieldType type, JsonNode jsonValue) {
return new AutoValue_RowJson_RowJsonDeserializer_FieldValue(name, type, jsonValue);
}
Expand Down Expand Up @@ -538,6 +593,14 @@ private void writeValue(JsonGenerator gen, FieldType type, Object value) throws
case ROW:
writeRow((Row) value, type.getRowSchema(), gen);
break;
case MAP:
gen.writeStartObject();
for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) value).entrySet()) {
gen.writeFieldName(entry.getKey().toString());
writeValue(gen, type.getMapValueType(), entry.getValue());
}
gen.writeEndObject();
break;
case LOGICAL_TYPE:
String identifier = type.getLogicalType().getIdentifier();
if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,14 @@ static ValueExtractor<BigDecimal> decimalValueExtractor() {
*/
static ValueExtractor<DateTime> datetimeValueExtractor() {
return ValidatingValueExtractor.<DateTime>builder()
.setExtractor(jsonNode -> DateTime.parse(jsonNode.textValue()))
.setExtractor(
jsonNode -> {
String text = jsonNode.textValue();
if (text.contains(" ")) {
text = text.replace(' ', 'T');
}
return DateTime.parse(text);
})
.setValidator(JsonNode::isTextual)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
Expand Down Expand Up @@ -80,7 +82,8 @@ public static Collection<Object[]> data() {
makeArrayOfArraysTestCase(),
makeNestedRowTestCase(),
makeDoublyNestedRowTestCase(),
makeNullsTestCase());
makeNullsTestCase(),
makeMapFieldTestCase());
}

private static Object[] makeFlatRowTestCase() {
Expand Down Expand Up @@ -244,6 +247,21 @@ private static Object[] makeNullsTestCase() {
return new Object[] {"Nulls", schema, rowString, expectedRow};
}

private static Object[] makeMapFieldTestCase() {
Schema schema =
Schema.builder().addMapField("f_map", FieldType.STRING, FieldType.INT32).build();

String rowString = "{\n" + "\"f_map\" : {\"key1\": 1, \"key2\": 2}\n" + "}";

Map<String, Integer> expectedMap = new HashMap<>();
expectedMap.put("key1", 1);
expectedMap.put("key2", 2);

Row expectedRow = Row.withSchema(schema).addValues(expectedMap).build();

return new Object[] {"Map field", schema, rowString, expectedRow};
}

@Test
public void testDeserialize() throws IOException {
Row parsedRow =
Expand Down Expand Up @@ -564,6 +582,12 @@ public void testSupportedDatetimeConversions() throws Exception {
testSupportedConversion(FieldType.DATETIME, quoted(DATETIME_STRING), DATETIME_VALUE);
}

@Test
public void testSupportedDatetimeWithSpaceConversions() throws Exception {
String datetimeWithSpace = DATETIME_STRING.replace('T', ' ');
testSupportedConversion(FieldType.DATETIME, quoted(datetimeWithSpace), DATETIME_VALUE);
}

private void testSupportedConversion(
FieldType fieldType, String jsonFieldValue, Object expectedRowFieldValue) throws Exception {

Expand Down
Loading