Allow strongly-consistent schema updates in BigQueryIO#38854
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances BigQueryIO's schema update capabilities by introducing a strongly-consistent mode for Storage API writes. This ensures that data integrity is maintained during schema evolution by preventing the ingestion of rows with new schemas until the updated table schema is detected. The changes include new configuration options for timeouts, improved schema change detection logic, and internal refactoring to support more robust error handling and row splitting during schema transitions. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a consistent auto-schema update mechanism for BigQuery Storage API writes, allowing users to specify a timeout for waiting for schema updates. However, there are several critical compilation issues in the current implementation. These include a stray 'T' character in StorageApiDynamicDestinationsTableRow.java, a missing semicolon in StorageApiWritesShardedRecords.java, a call to a non-existent method bufferMismatchedRows, and calls to updateSchemaFromTable() on the MessageConverter interface where it is not defined.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| public void updateSchemaFromTable() throws IOException, InterruptedException { | ||
| SCHEMA_CACHE.refreshSchema( | ||
| delegate.get().tableReference, datasetService, writeStreamService, bigQueryOptions); | ||
| T |
| org.joda.time.Instant targetExpirationTime = startMismatchTime.plus(autoUpdateSchemaStrictTimeout); | ||
| Iterable<SplittingIterable.Value> mismatchedRows = | ||
| Iterables.transform(messages, SplittingIterable.Value::getSchemaMismatchedRowsOnly); | ||
| bufferMismatchedRows(mismatchedRows, targetExpirationTime) |
| // Start off with the base schema. As we get notified of schema updates, we | ||
| // will update the descriptor. | ||
| if (autoUpdateSchemaStrictTimeout != null) { | ||
| messageConverter.updateSchemaFromTable(); |
There was a problem hiding this comment.
The method updateSchemaFromTable() is called on messageConverter at lines 864 and 1116, but this method is not defined in the StorageApiDynamicDestinations.MessageConverter interface. This will cause a compilation failure. To resolve this, please add updateSchemaFromTable() to the MessageConverter interface in StorageApiDynamicDestinations.java and provide appropriate implementations in all classes implementing this interface.
| org.joda.time.Instant targetExpirationTime = startMismatchTime.plus(autoUpdateSchemaStrictTimeout); | ||
| Iterable<SplittingIterable.Value> mismatchedRows = | ||
| Iterables.transform(messages, SplittingIterable.Value::getSchemaMismatchedRowsOnly); | ||
| bufferMismatchedRows(mismatchedRows, targetExpirationTime) |
There was a problem hiding this comment.
Code Review
This pull request introduces a strict timeout option for automatic schema updates in BigQueryIO (autoSchemaUpdateStrictTimeout), adding a new SchemaChangeDetectorHelper and updating several Storage API writing classes to support local retries and buffering of mismatched rows. The review feedback highlights several critical compilation issues that must be addressed, including a stray T character in StorageApiDynamicDestinationsTableRow.java, a missing semicolon and an undefined method bufferMismatchedRows in StorageApiWritesShardedRecords.java, and a missing interface method updateSchemaFromTable in StorageApiDynamicDestinations.java. Additionally, the reviewer recommended removing redundant loop conditions in SplittingIterable.java and simplifying a fully-qualified Preconditions call.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| delegate.get().tableReference, datasetService, writeStreamService, bigQueryOptions); | ||
| T | ||
| // Recycle the internal MessageConverter so that we pick up the new schema from the cache. |
There was a problem hiding this comment.
There is a stray T character on line 154 which will cause a compilation failure. It should be removed.
| delegate.get().tableReference, datasetService, writeStreamService, bigQueryOptions); | |
| T | |
| // Recycle the internal MessageConverter so that we pick up the new schema from the cache. | |
| delegate.get().tableReference, datasetService, writeStreamService, bigQueryOptions); | |
| // Recycle the internal MessageConverter so that we pick up the new schema from the cache. |
| org.joda.time.Instant targetExpirationTime = startMismatchTime.plus(autoUpdateSchemaStrictTimeout); | ||
| Iterable<SplittingIterable.Value> mismatchedRows = | ||
| Iterables.transform(messages, SplittingIterable.Value::getSchemaMismatchedRowsOnly); | ||
| bufferMismatchedRows(mismatchedRows, targetExpirationTime) |
There was a problem hiding this comment.
There is a missing semicolon at the end of this statement. Additionally, the method bufferMismatchedRows is called here but is not defined anywhere in this class or the PR changes, which will cause a compilation failure.
| bufferMismatchedRows(mismatchedRows, targetExpirationTime) | |
| bufferMismatchedRows(mismatchedRows, targetExpirationTime); |
| TableSchema getTableSchema(); | ||
|
|
||
| DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns) throws Exception; |
There was a problem hiding this comment.
The MessageConverter interface does not define updateSchemaFromTable(), but it is called on messageConverter in StorageApiWritesShardedRecords.java. Adding a default no-op implementation to the interface will prevent compilation failures for other implementations.
TableSchema getTableSchema();
DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns) throws Exception;
default void updateSchemaFromTable() throws IOException, InterruptedException {}| for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i < getProtoRows().getSerializedRowsCount(); i++) { | ||
| if (getSchemaMismatchedRows().get(i)) { | ||
| inserts.addSerializedRows(getProtoRows().getSerializedRows(i)); | ||
| timestamps.add(getTimestamps().get(i)); | ||
| failsafeTableRows.add(getFailsafeTableRows().get(i)); | ||
| } | ||
| } |
There was a problem hiding this comment.
The loop condition contains ! getSchemaMismatchedRows().isEmpty() && which is redundant and evaluated on every iteration. It can be safely removed.
| for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i < getProtoRows().getSerializedRowsCount(); i++) { | |
| if (getSchemaMismatchedRows().get(i)) { | |
| inserts.addSerializedRows(getProtoRows().getSerializedRows(i)); | |
| timestamps.add(getTimestamps().get(i)); | |
| failsafeTableRows.add(getFailsafeTableRows().get(i)); | |
| } | |
| } | |
| for (int i = 0; i < getProtoRows().getSerializedRowsCount(); i++) { | |
| if (getSchemaMismatchedRows().get(i)) { | |
| inserts.addSerializedRows(getProtoRows().getSerializedRows(i)); | |
| timestamps.add(getTimestamps().get(i)); | |
| failsafeTableRows.add(getFailsafeTableRows().get(i)); | |
| } | |
| } |
| for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i < getProtoRows().getSerializedRowsCount(); i++) { | ||
| if (!getSchemaMismatchedRows().get(i)) { | ||
| inserts.addSerializedRows(getProtoRows().getSerializedRows(i)); | ||
| timestamps.add(getTimestamps().get(i)); | ||
| failsafeTableRows.add(getFailsafeTableRows().get(i)); | ||
| } | ||
| } |
There was a problem hiding this comment.
The loop condition contains ! getSchemaMismatchedRows().isEmpty() && which is redundant and evaluated on every iteration. It can be safely removed.
| for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i < getProtoRows().getSerializedRowsCount(); i++) { | |
| if (!getSchemaMismatchedRows().get(i)) { | |
| inserts.addSerializedRows(getProtoRows().getSerializedRows(i)); | |
| timestamps.add(getTimestamps().get(i)); | |
| failsafeTableRows.add(getFailsafeTableRows().get(i)); | |
| } | |
| } | |
| for (int i = 0; i < getProtoRows().getSerializedRowsCount(); i++) { | |
| if (!getSchemaMismatchedRows().get(i)) { | |
| inserts.addSerializedRows(getProtoRows().getSerializedRows(i)); | |
| timestamps.add(getTimestamps().get(i)); | |
| failsafeTableRows.add(getFailsafeTableRows().get(i)); | |
| } | |
| } |
| org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState( | ||
| !createRetryManagerResult.getSchemaMismatchSeen()); |
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a consistent schema update mechanism for the BigQuery Storage Write API in Apache Beam, allowing rows with mismatched schemas to be buffered and retried until the updated schema is observed or a timeout is reached. The review feedback highlights critical issues, including missing side input accessors for dynamic destinations during timer executions (which would cause runtime failures), a potential integer overflow in the sharding logic, and the need to prevent unnecessary empty RPC calls when all rows are buffered. Additionally, suggestions were made to support standard Java maps in schema checks and to clean up leftover debugging print statements.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| PipelineOptions pipelineOptions, | ||
| MultiOutputReceiver o) | ||
| throws Exception { | ||
| // TODO XXX SET DYNAMIC DESTINATIONS |
There was a problem hiding this comment.
The side input accessor for dynamicDestinations is not set during the timer execution. If dynamicDestinations relies on side inputs, calling dynamicDestinations.getTable(...) on line 238 will fail at runtime. You should add OnTimerContext (or WindowContext) to the onTimer method parameters and use it to set the side input accessor on dynamicDestinations before accessing any tables.
| currentTimerValue.readLater(); | ||
| minPendingTimestamp.readLater(); | ||
|
|
||
| // TODOTDO XXX add context for side inputs. |
There was a problem hiding this comment.
The side input accessor for dynamicDestinations is not set during the timer execution. If dynamicDestinations relies on side inputs, calling dynamicDestinations.getTable(...) on line 1272 will fail at runtime. You should add OnTimerContext to the onMismatchedRowsTimer method parameters and use it to set the side input accessor on dynamicDestinations before accessing the table destination.
| } | ||
| } while (schemaMismatchSeen && BackOffUtils.next(Sleeper.DEFAULT, backoff)); | ||
|
|
||
| pendingMessages.clear(); |
There was a problem hiding this comment.
If all rows in the batch are mismatched, they are buffered for later retry, leaving rowsToProcess empty. If we proceed without checking, an empty operation is added to the retryManager, resulting in an unnecessary empty RPC call to BigQuery. We should return early if there are no rows to process.
pendingMessages.clear();
if (rowsToProcess.getProtoRows().getSerializedRowsCount() == 0) {
return 0;
}| Instant now = Instant.now(); | ||
| System.err.println("PROCESS " + now); |
| idleTimer, | ||
| o, | ||
| processMismatchedRows); | ||
| System.err.println("PROCESS DONE AFTER " + java.time.Duration.between(now, Instant.now())); |
| @StateId("currentMismatchedRowTimerValue") ValueState<Long> currentTimerValue, | ||
| @StateId("minPendingTimestamp") ValueState<Long> minPendingTimestamp) | ||
| throws Exception { | ||
| System.err.println("RETRY TIMER " + Instant.now()); |
| } else if (value instanceof TableRow) { | ||
| return missingUnknownField((TableRow) value, fieldDescriptor.getMessageType()); | ||
| } |
There was a problem hiding this comment.
Check for AbstractMap instead of TableRow to ensure that nested records represented as standard Java maps (e.g., LinkedHashMap) are also correctly checked for missing unknown fields.
| } else if (value instanceof TableRow) { | |
| return missingUnknownField((TableRow) value, fieldDescriptor.getMessageType()); | |
| } | |
| } else if (value instanceof AbstractMap) { | |
| return missingUnknownField((AbstractMap<String, Object>) value, fieldDescriptor.getMessageType()); | |
| } |
| ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); | ||
| buffer.putInt(++shardNumber % NUM_DEFAULT_SHARDS); |
There was a problem hiding this comment.
Using an instance variable shardNumber and incrementing it sequentially can lead to negative values if it overflows (since % in Java with a negative operand returns a negative result). It is simpler, thread-safe, and robust to use ThreadLocalRandom directly to generate the shard number.
| ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); | |
| buffer.putInt(++shardNumber % NUM_DEFAULT_SHARDS); | |
| ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); | |
| buffer.putInt(ThreadLocalRandom.current().nextInt(NUM_DEFAULT_SHARDS)); |
7e959e9 to
c3ca6b5
Compare
e426391 to
4ac6c18
Compare
|
RE: @ahmedabu98 |
No description provided.