Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 2
"modification": 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ static <W, T> T transformValue(Transform<W, T> transform, Type type, ByteBuffer
}

private Table getOrCreateTable(String filePath, FileFormat format) throws IOException {
TableIdentifier tableId = TableIdentifier.parse(identifier);
TableIdentifier tableId = IcebergUtils.parseTableIdentifier(identifier);
try {
return catalogConfig.catalog().loadTable(tableId);
} catch (NoSuchTableException e) {
Expand All @@ -544,7 +544,7 @@ private Table getOrCreateTable(String filePath, FileFormat format) throws IOExce
}
return builder.create();
} catch (AlreadyExistsException e2) { // if table already exists, just load it
return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
return catalogConfig.catalog().loadTable(IcebergUtils.parseTableIdentifier(identifier));
}
}
}
Expand Down Expand Up @@ -679,7 +679,7 @@ public void process(
return;
}
if (table == null) {
table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
table = catalogConfig.catalog().loadTable(IcebergUtils.parseTableIdentifier(identifier));
}

PartitionSpec spec = checkStateNotNull(table.specs().get(batch.getKey().getKey()));
Expand Down Expand Up @@ -764,7 +764,7 @@ public void process(
}
String commitId = commitHash(manifests);
if (table == null) {
table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
table = catalogConfig.catalog().loadTable(IcebergUtils.parseTableIdentifier(identifier));
}
table.refresh();
ensureNameMappingPresent(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.slf4j.Logger;
Expand Down Expand Up @@ -75,7 +74,7 @@ public PCollection<KV<String, SnapshotInfo>> expand(PCollection<FileWriteResult>
new SerializableFunction<FileWriteResult, String>() {
@Override
public String apply(FileWriteResult input) {
return input.getTableIdentifier().toString();
return IcebergUtils.tableIdentifierToString(input.getTableIdentifier());
}
}))
.apply("Group metadata updates by table", GroupByKey.create())
Expand Down Expand Up @@ -128,7 +127,7 @@ public void processElement(
BoundedWindow window)
throws IOException {
String tableStringIdentifier = element.getKey();
Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
Table table = getCatalog().loadTable(IcebergUtils.parseTableIdentifier(element.getKey()));
Iterable<FileWriteResult> fileWriteResults = element.getValue();
if (shouldSkip(table, fileWriteResults)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -147,7 +146,11 @@ public void processElement(

try {
// see if table already exists with a spec
spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec();
spec =
catalogConfig
.catalog()
.loadTable(IcebergUtils.parseTableIdentifier(tableIdentifier))
.spec();

} catch (NoSuchTableException ignored) {
// no partition to apply
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ abstract class FileWriteResult {
@SchemaIgnore
public TableIdentifier getTableIdentifier() {
if (cachedTableIdentifier == null) {
cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString());
cachedTableIdentifier = IcebergUtils.parseTableIdentifier(getTableIdentifierString());
}
return cachedTableIdentifier;
}
Expand All @@ -70,7 +70,7 @@ abstract static class Builder {

@SchemaIgnore
public Builder setTableIdentifier(TableIdentifier tableId) {
return setTableIdentifierString(tableId.toString());
return setTableIdentifierString(IcebergUtils.tableIdentifierToString(tableId));
}

public abstract FileWriteResult build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void createTable(
Schema tableSchema,
@Nullable List<String> partitionFields,
Map<String, String> properties) {
TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
TableIdentifier icebergIdentifier = IcebergUtils.parseTableIdentifier(tableIdentifier);
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
try {
Expand All @@ -172,7 +172,7 @@ public void createTable(
}

public @Nullable IcebergTableInfo loadTable(String tableIdentifier) {
TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
TableIdentifier icebergIdentifier = IcebergUtils.parseTableIdentifier(tableIdentifier);
try {
Table table = catalog().loadTable(icebergIdentifier);
return new IcebergTableInfo(tableIdentifier, table);
Expand Down Expand Up @@ -264,7 +264,7 @@ public void updatePartitionSpec(
}

public boolean dropTable(String tableIdentifier) {
TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
TableIdentifier icebergIdentifier = IcebergUtils.parseTableIdentifier(tableIdentifier);
return catalog().dropTable(icebergIdentifier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Enums;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

Expand Down Expand Up @@ -109,7 +108,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
IcebergIO.ReadRows readRows =
IcebergIO.readRows(configuration.getIcebergCatalog())
.withCdc()
.from(TableIdentifier.parse(configuration.getTable()))
.from(IcebergUtils.parseTableIdentifier(configuration.getTable()))
.fromSnapshot(configuration.getFromSnapshot())
.toSnapshot(configuration.getToSnapshot())
.fromTimestamp(configuration.getFromTimestamp())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand Down Expand Up @@ -93,7 +92,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
.getPipeline()
.apply(
IcebergIO.readRows(configuration.getIcebergCatalog())
.from(TableIdentifier.parse(configuration.getTable()))
.from(IcebergUtils.parseTableIdentifier(configuration.getTable()))
.keeping(configuration.getKeep())
.dropping(configuration.getDrop())
.withFilter(configuration.getFilter()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ public enum ScanType {
public Table getTable() {
if (cachedTable == null) {
cachedTable =
getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier()));
getCatalogConfig()
.catalog()
.loadTable(IcebergUtils.parseTableIdentifier(getTableIdentifier()));
}
return cachedTable;
}
Expand Down Expand Up @@ -260,7 +262,7 @@ public abstract static class Builder {
public abstract Builder setTableIdentifier(String tableIdentifier);

public Builder setTableIdentifier(TableIdentifier tableIdentifier) {
return this.setTableIdentifier(tableIdentifier.toString());
return this.setTableIdentifier(IcebergUtils.tableIdentifierToString(tableIdentifier));
}

public Builder setTableIdentifier(String... names) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -631,6 +633,58 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType
return icebergValue;
}

/**
 * Renders a {@link TableIdentifier} as a string, keeping dots and other special characters in
 * the individual parts intact.
 *
 * <p>When {@code requiresJsonTableIdentifier} reports that the plain form is not sufficient, the
 * identifier is written with {@link TableIdentifierParser#toJson(TableIdentifier)}; otherwise the
 * result of {@link TableIdentifier#toString()} is returned unchanged.
 *
 * @param tableIdentifier the identifier to serialize; validated with {@code checkArgumentNotNull}
 * @return the JSON form or the {@code toString()} form of the identifier
 */
public static String tableIdentifierToString(TableIdentifier tableIdentifier) {
  TableIdentifier checkedIdentifier = checkArgumentNotNull(tableIdentifier);
  if (requiresJsonTableIdentifier(checkedIdentifier)) {
    return TableIdentifierParser.toJson(checkedIdentifier);
  }
  return checkedIdentifier.toString();
}

/**
 * Builds a {@link TableIdentifier} from its string form.
 *
 * <p>Input that {@code looksLikeJsonObject} accepts (ignoring surrounding whitespace, it starts
 * with <code>{</code> and ends with <code>}</code>) is handed to {@link
 * TableIdentifierParser#fromJson(String)}. Anything else is treated as the legacy dotted form
 * and handed to {@link TableIdentifier#parse(String)}.
 *
 * @param table the JSON or dotted representation of a table identifier
 * @return the parsed identifier
 */
public static TableIdentifier parseTableIdentifier(String table) {
  return looksLikeJsonObject(table)
      ? TableIdentifierParser.fromJson(table)
      : TableIdentifier.parse(table);
}

/**
 * Cheap syntactic check for a JSON object: returns {@code true} when the first non-whitespace
 * character of {@code value} is <code>{</code> and the last non-whitespace character is
 * <code>}</code>. The content in between is not inspected.
 *
 * @param value the candidate string; {@code null} yields {@code false}
 * @return whether the trimmed value is delimited by a pair of curly braces
 */
private static boolean looksLikeJsonObject(@Nullable String value) {
  if (value == null) {
    return false;
  }

  final int length = value.length();

  // Locate the first character that is not whitespace.
  int first = 0;
  for (; first < length; first++) {
    if (!Character.isWhitespace(value.charAt(first))) {
      break;
    }
  }
  // Empty or all-whitespace input, or a leading character other than '{'.
  if (first == length || value.charAt(first) != '{') {
    return false;
  }

  // Locate the last character that is not whitespace.
  int last = length - 1;
  for (; last >= 0; last--) {
    if (!Character.isWhitespace(value.charAt(last))) {
      break;
    }
  }

  return last >= 0 && value.charAt(last) == '}';
}

/**
 * Decides whether {@code identifier} has to be serialized as JSON rather than with {@code
 * toString()}.
 *
 * <p>JSON is required when the {@code toString()} form would itself pass {@code
 * looksLikeJsonObject}, or when any namespace level or the table name contains a {@code '.'}
 * (presumably because the dotted form is split on dots when parsed back; see {@link
 * TableIdentifier#parse(String)}).
 *
 * @param identifier the identifier to inspect
 * @return {@code true} if the JSON representation must be used
 */
private static boolean requiresJsonTableIdentifier(TableIdentifier identifier) {
  boolean needsJson = looksLikeJsonObject(identifier.toString());

  if (!needsJson) {
    for (String namespaceLevel : identifier.namespace().levels()) {
      if (namespaceLevel.contains(".")) {
        needsJson = true;
        break;
      }
    }
  }

  return needsJson || identifier.name().contains(".");
}

static <T> boolean isUnbounded(PCollection<T> input) {
return input.isBounded().equals(PCollection.IsBounded.UNBOUNDED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

Expand All @@ -56,7 +55,7 @@ public PCollection<Row> expand(PBegin input) {
scanConfig
.getCatalogConfig()
.catalog()
.loadTable(TableIdentifier.parse(scanConfig.getTableIdentifier()));
.loadTable(IcebergUtils.parseTableIdentifier(scanConfig.getTableIdentifier()));

PCollection<KV<String, List<SnapshotInfo>>> snapshots =
MoreObjects.firstNonNull(scanConfig.getStreaming(), false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable
@VisibleForTesting
TableIdentifier getTableIdentifier() {
if (tableId == null) {
tableId = TableIdentifier.parse(checkStateNotNull(tableIdString));
tableId = IcebergUtils.parseTableIdentifier(checkStateNotNull(tableIdString));
}
return tableId;
}

OneTableDynamicDestinations(TableIdentifier tableId, Schema rowSchema) {
this.tableIdString = tableId.toString();
this.tableIdString = IcebergUtils.tableIdentifierToString(tableId);
this.rowSchema = rowSchema;
}

Expand Down Expand Up @@ -86,6 +86,6 @@ public void writeExternal(ObjectOutput out) throws IOException {
@Override
public void readExternal(ObjectInput in) throws IOException {
tableIdString = in.readUTF();
tableId = TableIdentifier.parse(tableIdString);
tableId = IcebergUtils.parseTableIdentifier(tableIdString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;

class PortableIcebergDestinations implements DynamicDestinations {
Expand Down Expand Up @@ -84,7 +83,7 @@ public String getTableStringIdentifier(ValueInSingleWindow<Row> element) {
@Override
public IcebergDestination instantiateDestination(String dest) {
return IcebergDestination.builder()
.setTableIdentifier(TableIdentifier.parse(dest))
.setTableIdentifier(IcebergUtils.parseTableIdentifier(dest))
.setTableCreateConfig(
IcebergTableCreateConfig.builder()
.setSchema(getDataSchema())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public static Builder builder() {
@SchemaIgnore
public TableIdentifier getTableIdentifier() {
if (cachedTableIdentifier == null) {
cachedTableIdentifier = TableIdentifier.parse(checkStateNotNull(getTableIdentifierString()));
cachedTableIdentifier =
IcebergUtils.parseTableIdentifier(checkStateNotNull(getTableIdentifierString()));
}
return cachedTableIdentifier;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListenableFuture;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

/** Utility to fetch and cache Iceberg {@link Table}s. */
class TableCache {
Expand All @@ -45,7 +44,7 @@ class TableCache {
public Table load(String identifier) {
return checkStateNotNull(CATALOG_CACHE.get(identifier))
.catalog()
.loadTable(TableIdentifier.parse(identifier));
.loadTable(IcebergUtils.parseTableIdentifier(identifier));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ public void testProjectedSchemaWithNestedFields() {
@Test
public void testSimpleScan() throws Exception {
TableIdentifier tableId =
TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
TableIdentifier.of(
"default", "table.with.dots" + Long.toString(UUID.randomUUID().hashCode(), 16));
Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
final Schema schema = icebergSchemaToBeamSchema(TestFixtures.SCHEMA);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ private IcebergIO.WriteRows applyDistribution(IcebergIO.WriteRows write) {
@Test
public void testSimpleAppend() throws Exception {
TableIdentifier tableId =
TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
TableIdentifier.of(
"default", "table.with.dots" + Long.toString(UUID.randomUUID().hashCode(), 16));

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
Expand Down
Loading
Loading