Allow strongly-consistent schema updates in BigQueryIO #38854

Open
reuvenlax wants to merge 2 commits into apache:master from reuvenlax:schema_update_handle_unknown_fields

Conversation

@reuvenlax

Contributor

No description provided.

@gemini-code-assist

Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances BigQueryIO's schema update capabilities by introducing a strongly-consistent mode for Storage API writes. This ensures that data integrity is maintained during schema evolution by preventing the ingestion of rows with new schemas until the updated table schema is detected. The changes include new configuration options for timeouts, improved schema change detection logic, and internal refactoring to support more robust error handling and row splitting during schema transitions.

Highlights

  • Strongly-consistent schema updates: Introduced a new 'withAutoSchemaUpdateConsistent' method in BigQueryIO to allow for strongly-consistent schema updates, ensuring rows with new schemas are not sent to BigQuery until the updated schema is observed.
  • Schema change detection: Added 'SchemaChangeDetectorHelper' to centralize logic for detecting and merging schema updates during BigQuery Storage API writes.
  • Timeout support: Added support for a strict timeout parameter when waiting for schema updates, after which rows are routed to the failed-rows collection.
  • SplittingIterable refactoring: Refactored 'SplittingIterable' to handle schema mismatches using a BitSet, allowing for more granular control over row processing during schema transitions.
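
As a rough illustration of the behavior described in these highlights, the routing decision for a single row might be sketched as below. This is a minimal sketch under stated assumptions, not the PR's implementation: the class `ConsistentSchemaRouter`, its methods, and the flat set of known field names are all hypothetical, and only the general idea (write when the schema is known, buffer until a timeout, then send to failed rows) comes from the summary.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; none of these names come from the PR. */
final class ConsistentSchemaRouter {
  /** Where a row goes after one routing decision. */
  enum Route { WRITE, BUFFER, FAILED }

  private final Set<String> knownFields = new HashSet<>();
  private final Duration strictTimeout;

  ConsistentSchemaRouter(Set<String> initialFields, Duration strictTimeout) {
    this.knownFields.addAll(initialFields);
    this.strictTimeout = strictTimeout;
  }

  /** Called once a refreshed table schema has been observed. */
  void onSchemaObserved(Set<String> tableFields) {
    knownFields.addAll(tableFields);
  }

  /** Routes a row, given when its schema mismatch was first seen. */
  Route route(Map<String, Object> row, Instant firstMismatch, Instant now) {
    if (knownFields.containsAll(row.keySet())) {
      return Route.WRITE;
    }
    // The row carries a field the observed schema lacks: hold it until the deadline.
    Instant deadline = firstMismatch.plus(strictTimeout);
    return now.isBefore(deadline) ? Route.BUFFER : Route.FAILED;
  }

  public static void main(String[] args) {
    ConsistentSchemaRouter router =
        new ConsistentSchemaRouter(Set.of("id"), Duration.ofMinutes(5));
    Instant t0 = Instant.parse("2026-06-09T00:00:00Z");
    Map<String, Object> row = Map.of("id", 1, "newCol", "x");

    System.out.println(router.route(row, t0, t0.plusSeconds(60)));  // BUFFER
    System.out.println(router.route(row, t0, t0.plusSeconds(600))); // FAILED
    router.onSchemaObserved(Set.of("id", "newCol"));
    System.out.println(router.route(row, t0, t0.plusSeconds(60)));  // WRITE
  }
}
```

The sketch compares top-level field names only; the real change works against BigQuery table schemas and proto descriptors.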

@reuvenlax

Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a consistent auto-schema update mechanism for BigQuery Storage API writes, allowing users to specify a timeout for waiting for schema updates. However, there are several critical compilation issues in the current implementation. These include a stray 'T' character in StorageApiDynamicDestinationsTableRow.java, a missing semicolon in StorageApiWritesShardedRecords.java, a call to a non-existent method bufferMismatchedRows, and calls to updateSchemaFromTable() on the MessageConverter interface where it is not defined.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

public void updateSchemaFromTable() throws IOException, InterruptedException {
SCHEMA_CACHE.refreshSchema(
delegate.get().tableReference, datasetService, writeStreamService, bigQueryOptions);
T

critical

There is a stray 'T' on line 154, which is a syntax error and will cause a compilation failure. This line should be removed.

org.joda.time.Instant targetExpirationTime = startMismatchTime.plus(autoUpdateSchemaStrictTimeout);
Iterable<SplittingIterable.Value> mismatchedRows =
Iterables.transform(messages, SplittingIterable.Value::getSchemaMismatchedRowsOnly);
bufferMismatchedRows(mismatchedRows, targetExpirationTime)

critical

There is a missing semicolon at the end of the bufferMismatchedRows call, which will cause a compilation failure.

Suggested change
- bufferMismatchedRows(mismatchedRows, targetExpirationTime)
+ bufferMismatchedRows(mismatchedRows, targetExpirationTime);

// Start off with the base schema. As we get notified of schema updates, we
// will update the descriptor.
if (autoUpdateSchemaStrictTimeout != null) {
messageConverter.updateSchemaFromTable();

critical

The method updateSchemaFromTable() is called on messageConverter at lines 864 and 1116, but this method is not defined in the StorageApiDynamicDestinations.MessageConverter interface. This will cause a compilation failure. To resolve this, please add updateSchemaFromTable() to the MessageConverter interface in StorageApiDynamicDestinations.java and provide appropriate implementations in all classes implementing this interface.

org.joda.time.Instant targetExpirationTime = startMismatchTime.plus(autoUpdateSchemaStrictTimeout);
Iterable<SplittingIterable.Value> mismatchedRows =
Iterables.transform(messages, SplittingIterable.Value::getSchemaMismatchedRowsOnly);
bufferMismatchedRows(mismatchedRows, targetExpirationTime)

high

The method bufferMismatchedRows is called here, but its definition is missing from this class and the rest of the PR. Please ensure that this method is implemented and included in the pull request.

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a strict timeout option for automatic schema updates in BigQueryIO (autoSchemaUpdateStrictTimeout), adding a new SchemaChangeDetectorHelper and updating several Storage API writing classes to support local retries and buffering of mismatched rows. The review feedback highlights several critical compilation issues that must be addressed, including a stray T character in StorageApiDynamicDestinationsTableRow.java, a missing semicolon and an undefined method bufferMismatchedRows in StorageApiWritesShardedRecords.java, and a missing interface method updateSchemaFromTable in StorageApiDynamicDestinations.java. Additionally, the reviewer recommended removing redundant loop conditions in SplittingIterable.java and simplifying a fully-qualified Preconditions call.

Comment on lines 153 to 155
delegate.get().tableReference, datasetService, writeStreamService, bigQueryOptions);
T
// Recycle the internal MessageConverter so that we pick up the new schema from the cache.

critical

There is a stray T character on line 154 which will cause a compilation failure. It should be removed.

Suggested change
- delegate.get().tableReference, datasetService, writeStreamService, bigQueryOptions);
- T
- // Recycle the internal MessageConverter so that we pick up the new schema from the cache.
+ delegate.get().tableReference, datasetService, writeStreamService, bigQueryOptions);
+ // Recycle the internal MessageConverter so that we pick up the new schema from the cache.

org.joda.time.Instant targetExpirationTime = startMismatchTime.plus(autoUpdateSchemaStrictTimeout);
Iterable<SplittingIterable.Value> mismatchedRows =
Iterables.transform(messages, SplittingIterable.Value::getSchemaMismatchedRowsOnly);
bufferMismatchedRows(mismatchedRows, targetExpirationTime)

critical

There is a missing semicolon at the end of this statement. Additionally, the method bufferMismatchedRows is called here but is not defined anywhere in this class or the PR changes, which will cause a compilation failure.

Suggested change
- bufferMismatchedRows(mismatchedRows, targetExpirationTime)
+ bufferMismatchedRows(mismatchedRows, targetExpirationTime);

Comment on lines +33 to 35
TableSchema getTableSchema();

DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns) throws Exception;

critical

The MessageConverter interface does not define updateSchemaFromTable(), but it is called on messageConverter in StorageApiWritesShardedRecords.java. Adding a default no-op implementation to the interface will prevent compilation failures for other implementations.

    TableSchema getTableSchema();

    DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns) throws Exception;

    default void updateSchemaFromTable() throws IOException, InterruptedException {}
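
To show why a default no-op keeps other implementations compiling, here is a minimal, self-contained sketch. The `Converter` interface and both implementing classes are simplified stand-ins invented for illustration; they are not the real `StorageApiDynamicDestinations.MessageConverter` or its implementations.

```java
import java.io.IOException;
import java.util.List;

final class DefaultMethodSketch {
  /** Simplified stand-in for the converter interface; not the real Beam type. */
  interface Converter {
    String describe();

    // Default no-op: implementations written before this method existed still compile.
    default void updateSchemaFromTable() throws IOException, InterruptedException {}
  }

  /** Pre-existing implementation that knows nothing about schema refreshes. */
  static final class StaticConverter implements Converter {
    @Override
    public String describe() {
      return "static";
    }
  }

  /** Implementation that opts in by overriding the default. */
  static final class RefreshingConverter implements Converter {
    int refreshCount = 0;

    @Override
    public String describe() {
      return "refreshing";
    }

    @Override
    public void updateSchemaFromTable() {
      refreshCount++; // a real implementation would re-read the table schema here
    }
  }

  public static void main(String[] args) throws Exception {
    RefreshingConverter refreshing = new RefreshingConverter();
    List<Converter> converters = List.of(new StaticConverter(), refreshing);
    for (Converter c : converters) {
      c.updateSchemaFromTable(); // safe to call on every implementation
    }
    System.out.println(refreshing.refreshCount); // 1
  }
}
```

The trade-off is that a no-op default hides the case where an implementation should refresh its schema but silently does not, so whether a default or an abstract method fits better depends on how many implementations need real behavior.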

Comment on lines +64 to +70
for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i < getProtoRows().getSerializedRowsCount(); i++) {
if (getSchemaMismatchedRows().get(i)) {
inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
timestamps.add(getTimestamps().get(i));
failsafeTableRows.add(getFailsafeTableRows().get(i));
}
}

medium

The loop condition contains ! getSchemaMismatchedRows().isEmpty() && which is redundant and evaluated on every iteration. It can be safely removed.

Suggested change
- for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i < getProtoRows().getSerializedRowsCount(); i++) {
- if (getSchemaMismatchedRows().get(i)) {
- inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
- timestamps.add(getTimestamps().get(i));
- failsafeTableRows.add(getFailsafeTableRows().get(i));
- }
- }
+ for (int i = 0; i < getProtoRows().getSerializedRowsCount(); i++) {
+ if (getSchemaMismatchedRows().get(i)) {
+ inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
+ timestamps.add(getTimestamps().get(i));
+ failsafeTableRows.add(getFailsafeTableRows().get(i));
+ }
+ }

Comment on lines +84 to +90
for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i < getProtoRows().getSerializedRowsCount(); i++) {
if (!getSchemaMismatchedRows().get(i)) {
inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
timestamps.add(getTimestamps().get(i));
failsafeTableRows.add(getFailsafeTableRows().get(i));
}
}

medium

The loop condition contains ! getSchemaMismatchedRows().isEmpty() && which is redundant and evaluated on every iteration. It can be safely removed.

Suggested change
- for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i < getProtoRows().getSerializedRowsCount(); i++) {
- if (!getSchemaMismatchedRows().get(i)) {
- inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
- timestamps.add(getTimestamps().get(i));
- failsafeTableRows.add(getFailsafeTableRows().get(i));
- }
- }
+ for (int i = 0; i < getProtoRows().getSerializedRowsCount(); i++) {
+ if (!getSchemaMismatchedRows().get(i)) {
+ inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
+ timestamps.add(getTimestamps().get(i));
+ failsafeTableRows.add(getFailsafeTableRows().get(i));
+ }
+ }
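
It may be worth checking whether the `isEmpty()` guard is neutral in both loops before removing it. The sketch below assumes that `getSchemaMismatchedRows()` returns a `java.util.BitSet` (the PR summary mentions a BitSet, but the accessor's type is not shown here); the `select` helper and its parameters are hypothetical. Under that assumption, dropping the guard leaves the mismatched-only loop unchanged, but the negated loop starts returning every row when no bits are set.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

final class MismatchGuardSketch {
  /**
   * Selects rows whose mismatch bit equals wantMismatched.
   * When guard is true, the loop is skipped entirely for an empty BitSet,
   * mirroring the "! getSchemaMismatchedRows().isEmpty() &&" loop condition.
   */
  static List<String> select(
      List<String> rows, BitSet mismatched, boolean wantMismatched, boolean guard) {
    List<String> out = new ArrayList<>();
    for (int i = 0; (!guard || !mismatched.isEmpty()) && i < rows.size(); i++) {
      if (mismatched.get(i) == wantMismatched) {
        out.add(rows.get(i));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> rows = List.of("a", "b");
    BitSet none = new BitSet(); // no mismatched rows at all

    // Mismatched-only loop: same result with or without the guard.
    System.out.println(select(rows, none, true, true));   // []
    System.out.println(select(rows, none, true, false));  // []

    // Negated loop: the guard changes the result when the BitSet is empty.
    System.out.println(select(rows, none, false, true));  // []
    System.out.println(select(rows, none, false, false)); // [a, b]
  }
}
```

Whether returning all rows for an empty BitSet is the intended behavior depends on how callers use the result, which this excerpt does not show.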

Comment on lines +1059 to +1060
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(
!createRetryManagerResult.getSchemaMismatchSeen());

medium

The fully-qualified class name for Preconditions is used here, but checkState is already statically imported at the top of the file. Simplifying this improves readability.

                checkState(!createRetryManagerResult.getSchemaMismatchSeen());

@github-actions

github-actions Bot commented Jun 9, 2026

Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment "assign set of reviewers".

@reuvenlax

Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a consistent schema update mechanism for the BigQuery Storage Write API in Apache Beam, allowing rows with mismatched schemas to be buffered and retried until the updated schema is observed or a timeout is reached. The review feedback highlights critical issues, including missing side input accessors for dynamic destinations during timer executions (which would cause runtime failures), a potential integer overflow in the sharding logic, and the need to prevent unnecessary empty RPC calls when all rows are buffered. Additionally, suggestions were made to support standard Java maps in schema checks and to clean up leftover debugging print statements.

PipelineOptions pipelineOptions,
MultiOutputReceiver o)
throws Exception {
// TODO XXX SET DYNAMIC DESTINATIONS

high

The side input accessor for dynamicDestinations is not set during the timer execution. If dynamicDestinations relies on side inputs, calling dynamicDestinations.getTable(...) on line 238 will fail at runtime. You should add OnTimerContext (or WindowContext) to the onTimer method parameters and use it to set the side input accessor on dynamicDestinations before accessing any tables.

currentTimerValue.readLater();
minPendingTimestamp.readLater();

// TODOTDO XXX add context for side inputs.

high

The side input accessor for dynamicDestinations is not set during the timer execution. If dynamicDestinations relies on side inputs, calling dynamicDestinations.getTable(...) on line 1272 will fail at runtime. You should add OnTimerContext to the onMismatchedRowsTimer method parameters and use it to set the side input accessor on dynamicDestinations before accessing the table destination.

}
} while (schemaMismatchSeen && BackOffUtils.next(Sleeper.DEFAULT, backoff));

pendingMessages.clear();

high

If all rows in the batch are mismatched, they are buffered for later retry, leaving rowsToProcess empty. If we proceed without checking, an empty operation is added to the retryManager, resulting in an unnecessary empty RPC call to BigQuery. We should return early if there are no rows to process.

        pendingMessages.clear();
        if (rowsToProcess.getProtoRows().getSerializedRowsCount() == 0) {
          return 0;
        }

Comment on lines +803 to +804
Instant now = Instant.now();
System.err.println("PROCESS " + now);

medium

Remove leftover debugging System.err.println statements and the unused now variable.

idleTimer,
o,
processMismatchedRows);
System.err.println("PROCESS DONE AFTER " + java.time.Duration.between(now, Instant.now()));

medium

Remove leftover debugging System.err.println statement.

@StateId("currentMismatchedRowTimerValue") ValueState<Long> currentTimerValue,
@StateId("minPendingTimestamp") ValueState<Long> minPendingTimestamp)
throws Exception {
System.err.println("RETRY TIMER " + Instant.now());

medium

Remove leftover debugging System.err.println statement.

Comment on lines +395 to +397
} else if (value instanceof TableRow) {
return missingUnknownField((TableRow) value, fieldDescriptor.getMessageType());
}

medium

Check for AbstractMap instead of TableRow to ensure that nested records represented as standard Java maps (e.g., LinkedHashMap) are also correctly checked for missing unknown fields.

Suggested change
- } else if (value instanceof TableRow) {
- return missingUnknownField((TableRow) value, fieldDescriptor.getMessageType());
- }
+ } else if (value instanceof AbstractMap) {
+ return missingUnknownField((AbstractMap<String, Object>) value, fieldDescriptor.getMessageType());
+ }
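
The effect of widening the check can be sketched with standard collections only. The `hasUnknownField` helper below is a hypothetical stand-in for `missingUnknownField`, and it compares against one flat set of known names rather than a nested proto descriptor, which is a simplifying assumption made so the example stays self-contained.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class NestedMapCheckSketch {
  /** Hypothetical stand-in: true if any key, at any nesting level, is not in knownFields. */
  static boolean hasUnknownField(Map<String, Object> row, Set<String> knownFields) {
    for (Map.Entry<String, Object> entry : row.entrySet()) {
      if (!knownFields.contains(entry.getKey())) {
        return true;
      }
      Object value = entry.getValue();
      // Matches LinkedHashMap, HashMap, TreeMap, and any other AbstractMap subclass.
      if (value instanceof AbstractMap) {
        @SuppressWarnings("unchecked")
        Map<String, Object> nested = (Map<String, Object>) value;
        if (hasUnknownField(nested, knownFields)) {
          return true;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Map<String, Object> address = new LinkedHashMap<>();
    address.put("city", "Zurich");
    address.put("zip", "8000"); // not part of the known schema below

    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", 1);
    row.put("address", address);

    System.out.println(hasUnknownField(row, Set.of("id", "address", "city")));        // true
    System.out.println(hasUnknownField(row, Set.of("id", "address", "city", "zip"))); // false
  }
}
```

A check against the `Map` interface would be broader still, since not every `Map` implementation extends `AbstractMap`.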

Comment on lines +114 to +115
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
buffer.putInt(++shardNumber % NUM_DEFAULT_SHARDS);

medium

Using an instance variable shardNumber and incrementing it sequentially can lead to negative values if it overflows (since % in Java with a negative operand returns a negative result). It is simpler, thread-safe, and robust to use ThreadLocalRandom directly to generate the shard number.

Suggested change
- ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
- buffer.putInt(++shardNumber % NUM_DEFAULT_SHARDS);
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+ buffer.putInt(ThreadLocalRandom.current().nextInt(NUM_DEFAULT_SHARDS));
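
The overflow concern can be reproduced in isolation. In this sketch the shard count of 20 is an illustrative value (the real `NUM_DEFAULT_SHARDS` is not shown in this excerpt) and the helper names are hypothetical. It contrasts the `%` remainder on a wrapped counter with `Math.floorMod` and with the suggested `ThreadLocalRandom` approach.

```java
import java.util.concurrent.ThreadLocalRandom;

final class ShardNumberSketch {
  static final int NUM_SHARDS = 20; // illustrative value, not taken from the PR

  /** Java's % keeps the sign of the dividend, so this can be negative. */
  static int remainderShard(int counter) {
    return counter % NUM_SHARDS;
  }

  /** floorMod returns a value in [0, NUM_SHARDS) for a positive divisor. */
  static int floorModShard(int counter) {
    return Math.floorMod(counter, NUM_SHARDS);
  }

  /** The reviewer's suggestion: no shared counter, so nothing to overflow. */
  static int randomShard() {
    return ThreadLocalRandom.current().nextInt(NUM_SHARDS);
  }

  public static void main(String[] args) {
    int counter = Integer.MAX_VALUE;
    counter++; // wraps around to Integer.MIN_VALUE

    System.out.println(remainderShard(counter)); // -8
    System.out.println(floorModShard(counter));  // 12
    System.out.println(randomShard());           // some value in [0, 20)
  }
}
```

If sequential shard assignment matters, `Math.floorMod` keeps the counter-based approach while avoiding negative results; the random variant trades ordering for simplicity.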

@reuvenlax reuvenlax force-pushed the schema_update_handle_unknown_fields branch from 7e959e9 to c3ca6b5 Compare June 9, 2026 18:48
@reuvenlax reuvenlax force-pushed the schema_update_handle_unknown_fields branch from e426391 to 4ac6c18 Compare June 9, 2026 22:22
@reuvenlax reuvenlax requested a review from ahmedabu98 June 10, 2026 00:04
@reuvenlax

Contributor Author

RE: @ahmedabu98
