From 5bd292856d817bfc54c743f833f216e8b3718cdc Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 9 Jun 2026 14:51:30 -0400 Subject: [PATCH 1/3] support special characters in json tableid format --- .../IO_Iceberg_Integration_Tests.json | 2 +- .../apache/beam/sdk/io/iceberg/AddFiles.java | 8 +-- .../sdk/io/iceberg/AppendFilesToTables.java | 5 +- .../AssignDestinationsAndPartitions.java | 7 ++- .../beam/sdk/io/iceberg/FileWriteResult.java | 4 +- .../sdk/io/iceberg/IcebergCatalogConfig.java | 6 +- ...IcebergCdcReadSchemaTransformProvider.java | 3 +- .../IcebergReadSchemaTransformProvider.java | 3 +- .../sdk/io/iceberg/IcebergScanConfig.java | 6 +- .../beam/sdk/io/iceberg/IcebergUtils.java | 47 ++++++++++++++++ .../sdk/io/iceberg/IncrementalScanSource.java | 3 +- .../iceberg/OneTableDynamicDestinations.java | 6 +- .../iceberg/PortableIcebergDestinations.java | 3 +- .../beam/sdk/io/iceberg/SnapshotInfo.java | 3 +- .../beam/sdk/io/iceberg/TableCache.java | 3 +- .../sdk/io/iceberg/IcebergIOReadTest.java | 3 +- .../sdk/io/iceberg/IcebergIOWriteTest.java | 3 +- .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 56 +++++++++++++++++++ 18 files changed, 138 insertions(+), 33 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 7ab7bcd9a9c6..b73af5e61a43 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 2 + "modification": 1 } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index dc8106321db7..75fc6f8c92cc 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -524,7 +524,7 @@ static T transformValue(Transform transform, Type type, ByteBuffer } private Table getOrCreateTable(String filePath, FileFormat format) throws IOException { - TableIdentifier tableId = TableIdentifier.parse(identifier); + TableIdentifier tableId = IcebergUtils.parseTableIdentifier(identifier); try { return catalogConfig.catalog().loadTable(tableId); } catch (NoSuchTableException e) { @@ -544,7 +544,7 @@ private Table getOrCreateTable(String filePath, FileFormat format) throws IOExce } return builder.create(); } catch (AlreadyExistsException e2) { // if table already exists, just load it - return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + return catalogConfig.catalog().loadTable(IcebergUtils.parseTableIdentifier(identifier)); } } } @@ -679,7 +679,7 @@ public void process( return; } if (table == null) { - table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + table = catalogConfig.catalog().loadTable(IcebergUtils.parseTableIdentifier(identifier)); } PartitionSpec spec = checkStateNotNull(table.specs().get(batch.getKey().getKey())); @@ -764,7 +764,7 @@ public void process( } String commitId = commitHash(manifests); if (table == null) { - table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + table = catalogConfig.catalog().loadTable(IcebergUtils.parseTableIdentifier(identifier)); } table.refresh(); ensureNameMappingPresent(table); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index c7981d697de4..917b087e39e0 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -47,7 +47,6 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.slf4j.Logger; @@ -75,7 +74,7 @@ public PCollection> expand(PCollection new SerializableFunction() { @Override public String apply(FileWriteResult input) { - return input.getTableIdentifier().toString(); + return IcebergUtils.tableIdentifierToString(input.getTableIdentifier()); } })) .apply("Group metadata updates by table", GroupByKey.create()) @@ -128,7 +127,7 @@ public void processElement( BoundedWindow window) throws IOException { String tableStringIdentifier = element.getKey(); - Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); + Table table = getCatalog().loadTable(IcebergUtils.parseTableIdentifier(element.getKey())); Iterable fileWriteResults = element.getValue(); if (shouldSkip(table, fileWriteResults)) { return; diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java index e5d70d85d875..53540ae4e4a5 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java @@ -35,7 +35,6 @@ import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -147,7 +146,11 @@ public void processElement( try { // see if table already exists with a spec - spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec(); + spec = + catalogConfig + .catalog() + .loadTable(IcebergUtils.parseTableIdentifier(tableIdentifier)) + .spec(); } catch (NoSuchTableException ignored) { // no partition to apply diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java index b96b1d42c949..713b8029c15c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java @@ -44,7 +44,7 @@ abstract class FileWriteResult { @SchemaIgnore public TableIdentifier getTableIdentifier() { if (cachedTableIdentifier == null) { - cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString()); + cachedTableIdentifier = IcebergUtils.parseTableIdentifier(getTableIdentifierString()); } return cachedTableIdentifier; } @@ -70,7 +70,7 @@ abstract static class Builder { @SchemaIgnore public Builder setTableIdentifier(TableIdentifier tableId) { - return setTableIdentifierString(tableId.toString()); + return setTableIdentifierString(IcebergUtils.tableIdentifierToString(tableId)); } public abstract FileWriteResult build(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java index e01b174decb0..086ee869ad04 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java @@ -155,7 +155,7 @@ public void createTable( Schema tableSchema, @Nullable List partitionFields, Map properties) { - TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier); + TableIdentifier icebergIdentifier = IcebergUtils.parseTableIdentifier(tableIdentifier); org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema); PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema); try { @@ -172,7 +172,7 @@ public void createTable( } public @Nullable IcebergTableInfo loadTable(String tableIdentifier) { - TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier); + TableIdentifier icebergIdentifier = IcebergUtils.parseTableIdentifier(tableIdentifier); try { Table table = catalog().loadTable(icebergIdentifier); return new IcebergTableInfo(tableIdentifier, table); @@ -264,7 +264,7 @@ public void updatePartitionSpec( } public boolean dropTable(String tableIdentifier) { - TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier); + TableIdentifier icebergIdentifier = IcebergUtils.parseTableIdentifier(tableIdentifier); return catalog().dropTable(icebergIdentifier); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java index 31ff57a668bb..e029a85a812f 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCdcReadSchemaTransformProvider.java @@ -41,7 +41,6 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Enums; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional; -import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -109,7 +108,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { IcebergIO.ReadRows readRows = IcebergIO.readRows(configuration.getIcebergCatalog()) .withCdc() - .from(TableIdentifier.parse(configuration.getTable())) + .from(IcebergUtils.parseTableIdentifier(configuration.getTable())) .fromSnapshot(configuration.getFromSnapshot()) .toSnapshot(configuration.getToSnapshot()) .fromTimestamp(configuration.getFromTimestamp()) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index 63d6f792e56a..67ea60505072 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -93,7 +92,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .getPipeline() .apply( IcebergIO.readRows(configuration.getIcebergCatalog()) - .from(TableIdentifier.parse(configuration.getTable())) + .from(IcebergUtils.parseTableIdentifier(configuration.getTable())) .keeping(configuration.getKeep()) .dropping(configuration.getDrop()) .withFilter(configuration.getFilter())); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 45ecc7cf71c3..163e36c59381 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -69,7 +69,9 @@ public enum ScanType { public Table getTable() { if (cachedTable == null) { cachedTable = - getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier())); + getCatalogConfig() + .catalog() + .loadTable(IcebergUtils.parseTableIdentifier(getTableIdentifier())); } return cachedTable; } @@ -260,7 +262,7 @@ public abstract static class Builder { public abstract Builder setTableIdentifier(String tableIdentifier); public Builder setTableIdentifier(TableIdentifier tableIdentifier) { - return this.setTableIdentifier(tableIdentifier.toString()); + return this.setTableIdentifier(IcebergUtils.tableIdentifierToString(tableIdentifier)); } public Builder setTableIdentifier(String... names) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index d0d24532ff39..85835fd61e3b 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -46,6 +46,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.TableIdentifierParser; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; @@ -631,6 +633,51 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType return icebergValue; } + /** Serializes a table identifier without losing dots or other special characters in each part. */ + public static String tableIdentifierToString(TableIdentifier tableIdentifier) { + TableIdentifier identifier = checkArgumentNotNull(tableIdentifier); + return requiresJsonTableIdentifier(identifier) + ? TableIdentifierParser.toJson(identifier) + : identifier.toString(); + } + + /** Parses either Iceberg's JSON table identifier representation or the legacy dotted form. */ + public static TableIdentifier parseTableIdentifier(String table) { + if (startsWithJsonObject(table)) { + return TableIdentifierParser.fromJson(table); + } + + return TableIdentifier.parse(table); + } + + private static boolean startsWithJsonObject(@Nullable String value) { + if (value == null) { + return false; + } + + for (int i = 0; i < value.length(); i++) { + if (!Character.isWhitespace(value.charAt(i))) { + return value.charAt(i) == '{'; + } + } + + return false; + } + + private static boolean requiresJsonTableIdentifier(TableIdentifier identifier) { + if (startsWithJsonObject(identifier.toString())) { + return true; + } + + for (String level : identifier.namespace().levels()) { + if (level.contains(".")) { + return true; + } + } + + return identifier.name().contains("."); + } + static boolean isUnbounded(PCollection input) { return input.isBounded().equals(PCollection.IsBounded.UNBOUNDED); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java index 4df3eecb18e5..46952f15108a 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -56,7 +55,7 @@ public PCollection expand(PBegin input) { scanConfig .getCatalogConfig() .catalog() - .loadTable(TableIdentifier.parse(scanConfig.getTableIdentifier())); + .loadTable(IcebergUtils.parseTableIdentifier(scanConfig.getTableIdentifier())); PCollection>> snapshots = MoreObjects.firstNonNull(scanConfig.getStreaming(), false) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java index 861a8ad198a8..afca43fca15c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java @@ -41,13 +41,13 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable @VisibleForTesting TableIdentifier getTableIdentifier() { if (tableId == null) { - tableId = TableIdentifier.parse(checkStateNotNull(tableIdString)); + tableId = IcebergUtils.parseTableIdentifier(checkStateNotNull(tableIdString)); } return tableId; } OneTableDynamicDestinations(TableIdentifier tableId, Schema rowSchema) { - this.tableIdString = tableId.toString(); + this.tableIdString = IcebergUtils.tableIdentifierToString(tableId); this.rowSchema = rowSchema; } @@ -86,6 +86,6 @@ public void writeExternal(ObjectOutput out) throws IOException { @Override public void readExternal(ObjectInput in) throws IOException { tableIdString = in.readUTF(); - tableId = TableIdentifier.parse(tableIdString); + tableId = IcebergUtils.parseTableIdentifier(tableIdString); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java index f2cec9bdfec7..775020879f67 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; class PortableIcebergDestinations implements DynamicDestinations { @@ -84,7 +83,7 @@ public String getTableStringIdentifier(ValueInSingleWindow element) { @Override public IcebergDestination instantiateDestination(String dest) { return IcebergDestination.builder() - .setTableIdentifier(TableIdentifier.parse(dest)) + .setTableIdentifier(IcebergUtils.parseTableIdentifier(dest)) .setTableCreateConfig( IcebergTableCreateConfig.builder() .setSchema(getDataSchema()) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SnapshotInfo.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SnapshotInfo.java index bab5405cd4a5..de8c400c08c8 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SnapshotInfo.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SnapshotInfo.java @@ -94,7 +94,8 @@ public static Builder builder() { @SchemaIgnore public TableIdentifier getTableIdentifier() { if (cachedTableIdentifier == null) { - cachedTableIdentifier = TableIdentifier.parse(checkStateNotNull(getTableIdentifierString())); + cachedTableIdentifier = + IcebergUtils.parseTableIdentifier(checkStateNotNull(getTableIdentifierString())); } return cachedTableIdentifier; } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java index cb00d90f7fb3..fc03e1c3cf44 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java @@ -30,7 +30,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListenableFuture; import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.TableIdentifier; /** Utility to fetch and cache Iceberg {@link Table}s. */ class TableCache { @@ -45,7 +44,7 @@ class TableCache { public Table load(String identifier) { return checkStateNotNull(CATALOG_CACHE.get(identifier)) .catalog() - .loadTable(TableIdentifier.parse(identifier)); + .loadTable(IcebergUtils.parseTableIdentifier(identifier)); } @Override diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index f79991cee571..dd7a42e219cc 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -306,7 +306,8 @@ public void testProjectedSchemaWithNestedFields() { @Test public void testSimpleScan() throws Exception { TableIdentifier tableId = - TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + TableIdentifier.of( + "default", "table.with.dots" + Long.toString(UUID.randomUUID().hashCode(), 16)); Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); final Schema schema = icebergSchemaToBeamSchema(TestFixtures.SCHEMA); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index 52d92911f4e4..6580f4eeaf62 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -148,7 +148,8 @@ private IcebergIO.WriteRows applyDistribution(IcebergIO.WriteRows write) { @Test public void testSimpleAppend() throws Exception { TableIdentifier tableId = - TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + TableIdentifier.of( + "default", "table.with.dots" + Long.toString(UUID.randomUUID().hashCode(), 16)); Map catalogProps = ImmutableMap.builder() diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index c9026522dba3..313037a15518 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -19,11 +19,15 @@ import static org.apache.beam.sdk.io.iceberg.IcebergUtils.TypeAndMaxId; import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.parseTableIdentifier; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.tableIdentifierToString; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.math.BigDecimal; @@ -44,6 +48,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; @@ -60,6 +65,57 @@ @RunWith(Enclosed.class) public class IcebergUtilsTest { + @RunWith(JUnit4.class) + public static class TableIdentifierTests { + + @Test + public void parseTableIdentifierParsesJson() { + TableIdentifier identifier = + parseTableIdentifier( + " {\"namespace\": [\"dogs\", \"owners.and.handlers\"]," + + " \"name\": \"food.with.dots\"}"); + + assertArrayEquals( + new String[] {"dogs", "owners.and.handlers"}, identifier.namespace().levels()); + assertEquals("food.with.dots", identifier.name()); + } + + @Test + public void parseTableIdentifierParsesLegacyDottedString() { + TableIdentifier identifier = parseTableIdentifier("dogs.owners.and.handlers.food"); + + assertArrayEquals( + new String[] {"dogs", "owners", "and", "handlers"}, identifier.namespace().levels()); + assertEquals("food", identifier.name()); + } + + @Test + public void tableIdentifierToStringRoundTripsSpecialCharacters() { + TableIdentifier expected = + TableIdentifier.of("dogs", "owners.and.handlers", "food.with.dots"); + + assertEquals(expected, parseTableIdentifier(tableIdentifierToString(expected))); + } + + @Test + public void tableIdentifierToStringUsesLegacyFormWhenUnambiguous() { + assertEquals("dogs.food", tableIdentifierToString(TableIdentifier.of("dogs", "food"))); + } + + @Test + public void tableIdentifierToStringUsesJsonForLegacyStringsThatLookLikeJson() { + TableIdentifier expected = TableIdentifier.of("{dogs}", "food"); + + assertEquals(expected, parseTableIdentifier(tableIdentifierToString(expected))); + } + + @Test + public void parseTableIdentifierRejectsInvalidJsonIdentifier() { + assertThrows( + IllegalArgumentException.class, () -> parseTableIdentifier("{\"table_name\":\"food\"}")); + } + } + @RunWith(JUnit4.class) public static class RowToRecordTests { /** From d92eddfab21b2105e4585c64679e923ab74df220 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 9 Jun 2026 15:23:51 -0400 Subject: [PATCH 2/3] address comments --- .../beam/sdk/io/iceberg/IcebergUtils.java | 23 ++++++++++++------- .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 2 +- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index 85835fd61e3b..309205707a95 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -643,29 +643,36 @@ public static String tableIdentifierToString(TableIdentifier tableIdentifier) { /** Parses either Iceberg's JSON table identifier representation or the legacy dotted form. */ public static TableIdentifier parseTableIdentifier(String table) { - if (startsWithJsonObject(table)) { + if (looksLikeJsonObject(table)) { return TableIdentifierParser.fromJson(table); } return TableIdentifier.parse(table); } - private static boolean startsWithJsonObject(@Nullable String value) { + private static boolean looksLikeJsonObject(@Nullable String value) { if (value == null) { return false; } - for (int i = 0; i < value.length(); i++) { - if (!Character.isWhitespace(value.charAt(i))) { - return value.charAt(i) == '{'; - } + int start = 0; + while (start < value.length() && Character.isWhitespace(value.charAt(start))) { + start++; + } + if (start == value.length() || value.charAt(start) != '{') { + return false; + } + + int end = value.length() - 1; + while (end >= 0 && Character.isWhitespace(value.charAt(end))) { + end--; } - return false; + return end >= 0 && value.charAt(end) == '}'; } private static boolean requiresJsonTableIdentifier(TableIdentifier identifier) { - if (startsWithJsonObject(identifier.toString())) { + if (looksLikeJsonObject(identifier.toString())) { return true; } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index 313037a15518..3ed6c3d69c89 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -104,7 +104,7 @@ public void tableIdentifierToStringUsesLegacyFormWhenUnambiguous() { @Test public void tableIdentifierToStringUsesJsonForLegacyStringsThatLookLikeJson() { - TableIdentifier expected = TableIdentifier.of("{dogs}", "food"); + TableIdentifier expected = TableIdentifier.of("{dogs}"); assertEquals(expected, parseTableIdentifier(tableIdentifierToString(expected))); } From 2e267969b6a1b42baea0ee73f960e5b2e79455e9 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 9 Jun 2026 15:26:30 -0400 Subject: [PATCH 3/3] address comments --- .../apache/beam/sdk/io/iceberg/IcebergUtilsTest.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index 3ed6c3d69c89..3da31ecc2061 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -104,11 +104,19 @@ public void tableIdentifierToStringUsesLegacyFormWhenUnambiguous() { @Test public void tableIdentifierToStringUsesJsonForLegacyStringsThatLookLikeJson() { - TableIdentifier expected = TableIdentifier.of("{dogs}"); + TableIdentifier expected = TableIdentifier.of("{dogs}", "{food}"); assertEquals(expected, parseTableIdentifier(tableIdentifierToString(expected))); } + @Test + public void tableIdentifierToStringDoesNotUseJsonForPartialJsonLikeStrings() { + TableIdentifier expected = TableIdentifier.of("{dogs}", "food"); + + assertEquals("{dogs}.food", tableIdentifierToString(expected)); + assertEquals(expected, parseTableIdentifier(tableIdentifierToString(expected))); + } + @Test public void parseTableIdentifierRejectsInvalidJsonIdentifier() { assertThrows(