Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 2
"modification": 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ public void processElement(

try {
// see if table already exists with a spec
spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec();
spec =
TableCache.getAndRefreshIfStale(
catalogConfig, TableIdentifier.parse(tableIdentifier))
.spec();

} catch (NoSuchTableException ignored) {
// no partition to apply
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,13 @@ class CreateReadTasksDoFn
this.scanConfig = scanConfig;
}

@Setup
public void setup() {
TableCache.setup(scanConfig);
}

@ProcessElement
public void process(
@Element KV<String, List<SnapshotInfo>> element,
OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
throws IOException, ExecutionException {
// force refresh because the table must be updated before scanning snapshots
Table table = TableCache.getRefreshed(element.getKey());
Table table = TableCache.getRefreshed(scanConfig.getCatalogConfig(), element.getKey());

// scan snapshots individually and assign commit timestamp to files
for (SnapshotInfo snapshot : element.getValue()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ public PCollection<Row> expand(PBegin input) {
TableIdentifier tableId =
checkStateNotNull(getTableIdentifier(), "Must set a table to read from.");

Table table = getCatalogConfig().catalog().loadTable(tableId);
Table table = TableCache.get(getCatalogConfig(), tableId);

IcebergScanConfig scanConfig =
IcebergScanConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ public enum ScanType {
@Pure
public Table getTable() {
if (cachedTable == null) {
cachedTable =
getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier()));
cachedTable = TableCache.get(getCatalogConfig(), TableIdentifier.parse(getTableIdentifier()));
}
return cachedTable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ class IncrementalScanSource extends PTransform<PBegin, PCollection<Row>> {
@Override
public PCollection<Row> expand(PBegin input) {
Table table =
scanConfig
.getCatalogConfig()
.catalog()
.loadTable(TableIdentifier.parse(scanConfig.getTableIdentifier()));
TableCache.get(
scanConfig.getCatalogConfig(), TableIdentifier.parse(scanConfig.getTableIdentifier()));

PCollection<KV<String, List<SnapshotInfo>>> snapshots =
MoreObjects.firstNonNull(scanConfig.getStreaming(), false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,14 @@ class ReadFromTasks extends DoFn<KV<ReadTaskDescriptor, ReadTask>, Row> {
this.scanConfig = scanConfig;
}

@Setup
public void setup() {
TableCache.setup(scanConfig);
}

@ProcessElement
public void process(
@Element KV<ReadTaskDescriptor, ReadTask> element,
RestrictionTracker<OffsetRange, Long> tracker,
OutputReceiver<Row> out)
throws IOException, ExecutionException, InterruptedException {
ReadTask readTask = element.getValue();
Table table = TableCache.get(scanConfig.getTableIdentifier());
Table table = TableCache.get(scanConfig.getCatalogConfig(), scanConfig.getTableIdentifier());

List<FileScanTask> fileScanTasks = readTask.getFileScanTasks();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -248,7 +246,7 @@ static String getPartitionDataPath(
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);

private final Catalog catalog;
private final IcebergCatalogConfig catalogConfig;
private final String filePrefix;
private final long maxFileSize;
private final int maxNumWriters;
Expand All @@ -260,46 +258,11 @@ static String getPartitionDataPath(
private final Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
totalSerializableDataFiles = Maps.newHashMap();

static final class LastRefreshedTable {
final Table table;
volatile Instant lastRefreshTime;
static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(2);

LastRefreshedTable(Table table, Instant lastRefreshTime) {
this.table = table;
this.lastRefreshTime = lastRefreshTime;
}

/**
* Refreshes the table metadata if it is considered stale (older than 2 minutes).
*
* <p>This method first performs a non-synchronized check on the table's freshness. This
* provides a lock-free fast path that avoids synchronization overhead in the common case where
* the table does not need to be refreshed. If the table might be stale, it then enters a
* synchronized block to ensure that only one thread performs the refresh operation.
*/
void refreshIfStale() {
// Fast path: Avoid entering the synchronized block if the table is not stale.
if (lastRefreshTime.isAfter(Instant.now().minus(STALENESS_THRESHOLD))) {
return;
}
synchronized (this) {
if (lastRefreshTime.isBefore(Instant.now().minus(STALENESS_THRESHOLD))) {
table.refresh();
lastRefreshTime = Instant.now();
}
}
}
}

@VisibleForTesting
static final Cache<TableIdentifier, LastRefreshedTable> LAST_REFRESHED_TABLE_CACHE =
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();

private boolean isClosed = false;

RecordWriterManager(Catalog catalog, String filePrefix, long maxFileSize, int maxNumWriters) {
this.catalog = catalog;
RecordWriterManager(
IcebergCatalogConfig catalogConfig, String filePrefix, long maxFileSize, int maxNumWriters) {
this.catalogConfig = catalogConfig;
this.filePrefix = filePrefix;
this.maxFileSize = maxFileSize;
this.maxNumWriters = maxNumWriters;
Expand All @@ -308,9 +271,9 @@ void refreshIfStale() {
/**
* Returns an Iceberg {@link Table}.
*
* <p>First attempts to fetch the table from the {@link #LAST_REFRESHED_TABLE_CACHE}. If it's not
* there, we attempt to load it using the Iceberg API. If the table doesn't exist at all, we
* attempt to create it, inferring the table schema from the record schema.
* <p>First attempts to fetch the table from the shared {@link TableCache}. If it's not there, we
* attempt to load it using the Iceberg API. If the table doesn't exist at all, we attempt to
* create it, inferring the table schema from the record schema.
*
* <p>Note that this is a best-effort operation that depends on the {@link Catalog}
* implementation. Although it is expected, some implementations may not support creating a table
Expand All @@ -319,13 +282,13 @@ void refreshIfStale() {
@VisibleForTesting
Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
TableIdentifier identifier = destination.getTableIdentifier();
@Nullable
LastRefreshedTable lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier);
if (lastRefreshedTable != null && lastRefreshedTable.table != null) {
lastRefreshedTable.refreshIfStale();
return lastRefreshedTable.table;
}
return TableCache.getAndRefreshIfStale(
catalogConfig, identifier, () -> loadOrCreateTable(destination, dataSchema));
}

private Table loadOrCreateTable(IcebergDestination destination, Schema dataSchema) {
Catalog catalog = catalogConfig.catalog();
TableIdentifier identifier = destination.getTableIdentifier();
Namespace namespace = identifier.namespace();
@Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
PartitionSpec partitionSpec =
Expand All @@ -336,53 +299,48 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
? createConfig.getTableProperties()
: Maps.newHashMap();

@Nullable Table table = null;
synchronized (LAST_REFRESHED_TABLE_CACHE) {
// Create namespace if it does not exist yet
if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) {
SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog;
if (!supportsNamespaces.namespaceExists(namespace)) {
try {
supportsNamespaces.createNamespace(namespace);
LOG.info("Created new namespace '{}'.", namespace);
} catch (AlreadyExistsException ignored) {
// race condition: another worker already created this namespace
}
// Create namespace if it does not exist yet
if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) {
SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog;
if (!supportsNamespaces.namespaceExists(namespace)) {
try {
supportsNamespaces.createNamespace(namespace);
LOG.info("Created new namespace '{}'.", namespace);
} catch (AlreadyExistsException ignored) {
// race condition: another worker already created this namespace
}
}
}

// If table exists, just load it
// Note: the implementation of catalog.tableExists() will load the table to check its
// existence. We don't use it here to avoid double loadTable() calls.
// If table exists, just load it
// Note: the implementation of catalog.tableExists() will load the table to check its
// existence. We don't use it here to avoid double loadTable() calls.
try {
return catalog.loadTable(identifier);
} catch (NoSuchTableException e) { // Otherwise, create the table
org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
try {
table = catalog.loadTable(identifier);
} catch (NoSuchTableException e) { // Otherwise, create the table
org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
try {
table =
catalog
.buildTable(identifier, tableSchema)
.withPartitionSpec(partitionSpec)
.withSortOrder(sortOrder)
.withProperties(tableProperties)
.create();
LOG.info(
"Created Iceberg table '{}' with schema: {}\n"
+ ", partition spec: {}, sort order: {}, table properties: {}",
identifier,
tableSchema,
partitionSpec,
sortOrder,
tableProperties);
} catch (AlreadyExistsException ignored) {
// race condition: another worker already created this table
table = catalog.loadTable(identifier);
}
Table table =
catalog
.buildTable(identifier, tableSchema)
.withPartitionSpec(partitionSpec)
.withSortOrder(sortOrder)
.withProperties(tableProperties)
.create();
LOG.info(
"Created Iceberg table '{}' with schema: {}\n"
+ ", partition spec: {}, sort order: {}, table properties: {}",
identifier,
tableSchema,
partitionSpec,
sortOrder,
tableProperties);
return table;
} catch (AlreadyExistsException ignored) {
// race condition: another worker already created this table
return catalog.loadTable(identifier);
}
}
lastRefreshedTable = new LastRefreshedTable(table, Instant.now());
LAST_REFRESHED_TABLE_CACHE.put(identifier, lastRefreshedTable);
return table;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public ScanSource(IcebergScanConfig scanConfig) {
}

private TableScan getTableScan() {
Table table = scanConfig.getTable();
Table table =
TableCache.getRefreshed(scanConfig.getCatalogConfig(), scanConfig.getTableIdentifier());
TableScan tableScan = table.newScan().project(scanConfig.getProjectedSchema());

if (scanConfig.getFilter() != null) {
Expand Down
Loading
Loading