From cec9809b3ae060619d7f9c0e3887dffc508aaea1 Mon Sep 17 00:00:00 2001 From: Brandt Newton Date: Mon, 8 Jun 2026 11:45:14 -0400 Subject: [PATCH 1/3] feat: add configurable bigtable retries --- .../bigtable/BigtableConfigTranslator.java | 41 ++++++---- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 77 ++++++++++++++++--- .../io/gcp/bigtable/BigtableReadOptions.java | 31 ++++++-- .../io/gcp/bigtable/BigtableWriteOptions.java | 31 ++++++-- .../changestreams/dao/DaoFactory.java | 6 +- .../BigtableConfigTranslatorTest.java | 8 +- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 2 +- 7 files changed, 144 insertions(+), 52 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java index 78fef141515f..3f63a6e5462e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java @@ -234,16 +234,16 @@ private static BigtableDataSettings configureWriteSettings( // with the old behavior. long initialRpcTimeout = writeOptions.getAttemptTimeout() != null - ? writeOptions.getAttemptTimeout().getMillis() + ? writeOptions.getAttemptTimeout().get().getMillis() : (fromBigtableOptions != null && fromBigtableOptions.getAttemptTimeout() != null - ? fromBigtableOptions.getAttemptTimeout().getMillis() + ? fromBigtableOptions.getAttemptTimeout().get().getMillis() : Duration.ofMinutes(6).toMillis()); long totalTimeout = writeOptions.getOperationTimeout() != null - ? writeOptions.getOperationTimeout().getMillis() + ? writeOptions.getOperationTimeout().get().getMillis() : (fromBigtableOptions != null && fromBigtableOptions.getOperationTimeout() != null - ? fromBigtableOptions.getOperationTimeout().getMillis() + ? fromBigtableOptions.getOperationTimeout().get().getMillis() : retrySettings.getTotalTimeout().toMillis()); retrySettings @@ -334,26 +334,26 @@ private static BigtableDataSettings configureReadSettings( // Options set directly on readOptions overrides Options set in BigtableOptions long initialRpcTimeout = readOptions.getAttemptTimeout() != null - ? readOptions.getAttemptTimeout().getMillis() + ? readOptions.getAttemptTimeout().get().getMillis() : (optionsFromBigtableOptions != null && optionsFromBigtableOptions.getAttemptTimeout() != null - ? optionsFromBigtableOptions.getAttemptTimeout().getMillis() + ? optionsFromBigtableOptions.getAttemptTimeout().get().getMillis() : retrySettings.getInitialRpcTimeout().toMillis()); long totalTimeout = readOptions.getOperationTimeout() != null - ? readOptions.getOperationTimeout().getMillis() + ? readOptions.getOperationTimeout().get().getMillis() : (optionsFromBigtableOptions != null && optionsFromBigtableOptions.getOperationTimeout() != null - ? optionsFromBigtableOptions.getOperationTimeout().getMillis() + ? optionsFromBigtableOptions.getOperationTimeout().get().getMillis() : retrySettings.getTotalTimeout().toMillis()); long waitTimeout = readOptions.getWaitTimeout() != null - ? readOptions.getWaitTimeout().getMillis() + ? readOptions.getWaitTimeout().get().getMillis() : (optionsFromBigtableOptions != null && optionsFromBigtableOptions.getWaitTimeout() != null - ? optionsFromBigtableOptions.getWaitTimeout().getMillis() + ? optionsFromBigtableOptions.getWaitTimeout().get().getMillis() : settings.stubSettings().readRowsSettings().getWaitTimeout().toMillis()); retrySettings @@ -464,14 +464,19 @@ static BigtableReadOptions translateToBigtableReadOptions( BigtableReadOptions readOptions, BigtableOptions options) { BigtableReadOptions.Builder builder = readOptions.toBuilder(); builder.setWaitTimeout( - org.joda.time.Duration.millis(options.getRetryOptions().getReadPartialRowTimeoutMillis())); + ValueProvider.StaticValueProvider.of( + org.joda.time.Duration.millis( + options.getRetryOptions().getReadPartialRowTimeoutMillis()))); if (options.getCallOptionsConfig().getReadStreamRpcAttemptTimeoutMs().isPresent()) { builder.setAttemptTimeout( - org.joda.time.Duration.millis( - options.getCallOptionsConfig().getReadStreamRpcAttemptTimeoutMs().get())); + ValueProvider.StaticValueProvider.of( + org.joda.time.Duration.millis( + options.getCallOptionsConfig().getReadStreamRpcAttemptTimeoutMs().get()))); } builder.setOperationTimeout( - org.joda.time.Duration.millis(options.getCallOptionsConfig().getReadStreamRpcTimeoutMs())); + ValueProvider.StaticValueProvider.of( + org.joda.time.Duration.millis( + options.getCallOptionsConfig().getReadStreamRpcTimeoutMs()))); return builder.build(); } @@ -482,14 +487,16 @@ static BigtableWriteOptions translateToBigtableWriteOptions( // configure timeouts if (options.getCallOptionsConfig().getMutateRpcAttemptTimeoutMs().isPresent()) { builder.setAttemptTimeout( - org.joda.time.Duration.millis( - options.getCallOptionsConfig().getMutateRpcAttemptTimeoutMs().get())); + ValueProvider.StaticValueProvider.of( + org.joda.time.Duration.millis( + options.getCallOptionsConfig().getMutateRpcAttemptTimeoutMs().get()))); } if (options.getBulkOptions().isEnableBulkMutationThrottling()) { builder.setThrottlingTargetMs(options.getBulkOptions().getBulkMutationRpcTargetMs()); } builder.setOperationTimeout( - org.joda.time.Duration.millis(options.getCallOptionsConfig().getMutateRpcTimeoutMs())); + ValueProvider.StaticValueProvider.of( + org.joda.time.Duration.millis(options.getCallOptionsConfig().getMutateRpcTimeoutMs()))); // configure batch size builder.setMaxElementsPerBatch(options.getBulkOptions().getBulkMaxRowKeyCount()); builder.setMaxBytesPerBatch(options.getBulkOptions().getBulkMaxRequestSize()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 100078d32e70..126977961aa8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -705,14 +705,24 @@ public Read withEmulator(String emulatorHost) { * *

Does not modify this object. */ - public Read withAttemptTimeout(Duration timeout) { - checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive"); + public Read withAttemptTimeout(ValueProvider timeout) { BigtableReadOptions readOptions = getBigtableReadOptions(); return toBuilder() .setBigtableReadOptions(readOptions.toBuilder().setAttemptTimeout(timeout).build()) .build(); } + /** + * Returns a new {@link BigtableIO.Read} with the attempt timeout. Attempt timeout controls the + * timeout for each remote call. + * + *

Does not modify this object. + */ + public Read withAttemptTimeout(Duration timeout) { + checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive"); + return withAttemptTimeout(StaticValueProvider.of(timeout)); + } + /** * Returns a new {@link BigtableIO.Read} with the operation timeout. Operation timeout has * ultimate control over how long the logic should keep trying the remote call until it gives up @@ -720,14 +730,25 @@ public Read withAttemptTimeout(Duration timeout) { * *

Does not modify this object. */ - public Read withOperationTimeout(Duration timeout) { - checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive"); + public Read withOperationTimeout(ValueProvider timeout) { BigtableReadOptions readOptions = getBigtableReadOptions(); return toBuilder() .setBigtableReadOptions(readOptions.toBuilder().setOperationTimeout(timeout).build()) .build(); } + /** + * Returns a new {@link BigtableIO.Read} with the operation timeout. Operation timeout has + * ultimate control over how long the logic should keep trying the remote call until it gives up + * completely. + * + *

Does not modify this object. + */ + public Read withOperationTimeout(Duration timeout) { + checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive"); + return withOperationTimeout(StaticValueProvider.of(timeout)); + } + Read withServiceFactory(BigtableServiceFactory factory) { return toBuilder().setServiceFactory(factory).build(); } @@ -1043,14 +1064,24 @@ public Write withEmulator(String emulatorHost) { * *

Does not modify this object. */ - public Write withAttemptTimeout(Duration timeout) { - checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive"); + public Write withAttemptTimeout(ValueProvider timeout) { BigtableWriteOptions options = getBigtableWriteOptions(); return toBuilder() .setBigtableWriteOptions(options.toBuilder().setAttemptTimeout(timeout).build()) .build(); } + /** + * Returns a new {@link BigtableIO.Write} with the attempt timeout. Attempt timeout controls the + * timeout for each remote call. + * + *

Does not modify this object. + */ + public Write withAttemptTimeout(Duration timeout) { + checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive"); + return withAttemptTimeout(StaticValueProvider.of(timeout)); + } + /** * Returns a new {@link BigtableIO.Write} with the operation timeout. Operation timeout has * ultimate control over how long the logic should keep trying the remote call until it gives up @@ -1058,14 +1089,25 @@ public Write withAttemptTimeout(Duration timeout) { * *

Does not modify this object. */ - public Write withOperationTimeout(Duration timeout) { - checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive"); + public Write withOperationTimeout(ValueProvider timeout) { BigtableWriteOptions options = getBigtableWriteOptions(); return toBuilder() .setBigtableWriteOptions(options.toBuilder().setOperationTimeout(timeout).build()) .build(); } + /** + * Returns a new {@link BigtableIO.Write} with the operation timeout. Operation timeout has + * ultimate control over how long the logic should keep trying the remote call until it gives up + * completely. + * + *

Does not modify this object. + */ + public Write withOperationTimeout(Duration timeout) { + checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive"); + return withOperationTimeout(StaticValueProvider.of(timeout)); + } + /** * Returns a new {@link BigtableIO.Write} with the max elements a batch can have. After this * many elements are accumulated, they will be wrapped up in a batch and sent to Bigtable. @@ -2183,7 +2225,7 @@ static ReadChangeStream create() { abstract @Nullable Duration getBacklogReplicationAdjustment(); - abstract @Nullable Duration getReadChangeStreamTimeout(); + abstract @Nullable ValueProvider getReadChangeStreamTimeout(); abstract @Nullable Boolean getValidateConfig(); @@ -2403,10 +2445,23 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) { * *

Does not modify this object. */ - public ReadChangeStream withReadChangeStreamTimeout(Duration timeout) { + public ReadChangeStream withReadChangeStreamTimeout(ValueProvider timeout) { return toBuilder().setReadChangeStreamTimeout(timeout).build(); } + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that overrides timeout for ReadChangeStream + * remote calls. + * + *

This is useful to override the default of 15s timeout if the checkpoint duration is longer + * than 15s. + * + *

Does not modify this object. + */ + public ReadChangeStream withReadChangeStreamTimeout(Duration timeout) { + return withReadChangeStreamTimeout(StaticValueProvider.of(timeout)); + } + /** * Disables validation that the table being read and the metadata table exists, and that the app * profile used is single cluster and single row transaction enabled. Set this option if the @@ -2599,7 +2654,7 @@ abstract ReadChangeStream.Builder setExistingPipelineOptions( abstract ReadChangeStream.Builder setBacklogReplicationAdjustment(Duration adjustment); - abstract ReadChangeStream.Builder setReadChangeStreamTimeout(Duration timeout); + abstract ReadChangeStream.Builder setReadChangeStreamTimeout(ValueProvider timeout); abstract ReadChangeStream.Builder setValidateConfig(boolean validateConfig); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java index af4b7da18b17..ff1b82af295f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java @@ -53,15 +53,15 @@ abstract class BigtableReadOptions implements Serializable { abstract @Nullable Integer getMaxBufferElementCount(); /** Returns the attempt timeout of the reads. */ - abstract @Nullable Duration getAttemptTimeout(); + abstract @Nullable ValueProvider getAttemptTimeout(); /** Returns the operation timeout of the reads. */ - abstract @Nullable Duration getOperationTimeout(); + abstract @Nullable ValueProvider getOperationTimeout(); /** * Watchdog will kill the stream after waiting this much time for the next response from server. */ - abstract @Nullable Duration getWaitTimeout(); + abstract @Nullable ValueProvider getWaitTimeout(); abstract Builder toBuilder(); @@ -82,11 +82,23 @@ abstract static class Builder { abstract Builder setKeyRanges(ValueProvider> keyRanges); - abstract Builder setAttemptTimeout(Duration timeout); + abstract Builder setAttemptTimeout(ValueProvider timeout); - abstract Builder setOperationTimeout(Duration timeout); + Builder setAttemptTimeout(Duration timeout) { + return setAttemptTimeout(ValueProvider.StaticValueProvider.of(timeout)); + } + + abstract Builder setOperationTimeout(ValueProvider timeout); + + Builder setOperationTimeout(Duration timeout) { + return setOperationTimeout(ValueProvider.StaticValueProvider.of(timeout)); + } + + abstract Builder setWaitTimeout(ValueProvider timeout); - abstract Builder setWaitTimeout(Duration timeout); + Builder setWaitTimeout(Duration timeout) { + return setWaitTimeout(ValueProvider.StaticValueProvider.of(timeout)); + } abstract BigtableReadOptions build(); } @@ -152,9 +164,12 @@ void validate() { } } - if (getAttemptTimeout() != null && getOperationTimeout() != null) { + if (getAttemptTimeout() != null + && getAttemptTimeout().isAccessible() + && getOperationTimeout() != null + && getOperationTimeout().isAccessible()) { checkArgument( - getAttemptTimeout().isShorterThan(getOperationTimeout()), + getAttemptTimeout().get().isShorterThan(getOperationTimeout().get()), "attempt timeout can't be longer than operation timeout"); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java index a63cc575809b..6b26bc468ded 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java @@ -37,10 +37,10 @@ abstract class BigtableWriteOptions implements Serializable { abstract @Nullable ValueProvider getTableId(); /** Returns the attempt timeout for writes. */ - abstract @Nullable Duration getAttemptTimeout(); + abstract @Nullable ValueProvider getAttemptTimeout(); /** Returns the operation timeout for writes. */ - abstract @Nullable Duration getOperationTimeout(); + abstract @Nullable ValueProvider getOperationTimeout(); /** Returns the max number of elements of a batch. */ abstract @Nullable Long getMaxElementsPerBatch(); @@ -61,7 +61,7 @@ abstract class BigtableWriteOptions implements Serializable { abstract @Nullable Boolean getFlowControl(); /** Returns the time to wait when closing the writer. */ - abstract @Nullable Duration getCloseWaitTimeout(); + abstract @Nullable ValueProvider getCloseWaitTimeout(); abstract Builder toBuilder(); @@ -74,9 +74,17 @@ abstract static class Builder { abstract Builder setTableId(ValueProvider tableId); - abstract Builder setAttemptTimeout(Duration timeout); + abstract Builder setAttemptTimeout(ValueProvider timeout); - abstract Builder setOperationTimeout(Duration timeout); + Builder setAttemptTimeout(Duration timeout) { + return setAttemptTimeout(ValueProvider.StaticValueProvider.of(timeout)); + } + + abstract Builder setOperationTimeout(ValueProvider timeout); + + Builder setOperationTimeout(Duration timeout) { + return setOperationTimeout(ValueProvider.StaticValueProvider.of(timeout)); + } abstract Builder setMaxElementsPerBatch(long size); @@ -90,7 +98,11 @@ abstract static class Builder { abstract Builder setFlowControl(boolean enableFlowControl); - abstract Builder setCloseWaitTimeout(Duration timeout); + abstract Builder setCloseWaitTimeout(ValueProvider timeout); + + Builder setCloseWaitTimeout(Duration timeout) { + return setCloseWaitTimeout(ValueProvider.StaticValueProvider.of(timeout)); + } abstract BigtableWriteOptions build(); } @@ -130,9 +142,12 @@ void validate() { getTableId() != null && (!getTableId().isAccessible() || !getTableId().get().isEmpty()), "Could not obtain Bigtable table id"); - if (getAttemptTimeout() != null && getOperationTimeout() != null) { + if (getAttemptTimeout() != null + && getAttemptTimeout().isAccessible() + && getOperationTimeout() != null + && getOperationTimeout().isAccessible()) { checkArgument( - getAttemptTimeout().isShorterThan(getOperationTimeout()), + getAttemptTimeout().get().isShorterThan(getOperationTimeout().get()), "attempt timeout can't be longer than operation timeout"); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java index 1545c8eb86d7..c85b693545bf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java @@ -47,7 +47,7 @@ public class DaoFactory implements Serializable, AutoCloseable { private final String metadataTableId; private final String changeStreamName; - private @Nullable Duration readChangeStreamTimeout; + private @Nullable ValueProvider readChangeStreamTimeout; public DaoFactory( BigtableConfig changeStreamConfig, @@ -76,7 +76,7 @@ public void close() { } } - public void setReadChangeStreamTimeout(@Nullable Duration readChangeStreamTimeout) { + public void setReadChangeStreamTimeout(@Nullable ValueProvider readChangeStreamTimeout) { this.readChangeStreamTimeout = readChangeStreamTimeout; } @@ -117,7 +117,7 @@ public synchronized ChangeStreamDao getChangeStreamDao() throws IOException { checkArgumentNotNull(changeStreamConfig.getAppProfileId()); if (readChangeStreamTimeout != null) { BigtableChangeStreamAccessor.setReadChangeStreamTimeout( - org.threeten.bp.Duration.ofMillis(readChangeStreamTimeout.getMillis())); + org.threeten.bp.Duration.ofMillis(readChangeStreamTimeout.get().getMillis())); } BigtableDataClient dataClient = BigtableChangeStreamAccessor.getOrCreate(changeStreamConfig).getDataClient(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java index 802f04506efe..cfa063c9827b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java @@ -104,8 +104,8 @@ public void testBigtableOptionsToBigtableReadOptions() throws Exception { assertNotNull(fromBigtableOptions.getAttemptTimeout()); assertNotNull(fromBigtableOptions.getOperationTimeout()); - assertEquals(org.joda.time.Duration.millis(100), fromBigtableOptions.getAttemptTimeout()); - assertEquals(org.joda.time.Duration.millis(1000), fromBigtableOptions.getOperationTimeout()); + assertEquals(org.joda.time.Duration.millis(100), fromBigtableOptions.getAttemptTimeout().get()); + assertEquals(org.joda.time.Duration.millis(1000), fromBigtableOptions.getOperationTimeout().get()); } @Test @@ -145,8 +145,8 @@ public void testBigtableOptionsToBigtableWriteOptions() throws Exception { assertNotNull(fromBigtableOptions.getMaxOutstandingElements()); assertNotNull(fromBigtableOptions.getMaxOutstandingBytes()); - assertEquals(org.joda.time.Duration.millis(200), fromBigtableOptions.getAttemptTimeout()); - assertEquals(org.joda.time.Duration.millis(2000), fromBigtableOptions.getOperationTimeout()); + assertEquals(org.joda.time.Duration.millis(200), fromBigtableOptions.getAttemptTimeout().get()); + assertEquals(org.joda.time.Duration.millis(2000), fromBigtableOptions.getOperationTimeout().get()); assertEquals(20, (long) fromBigtableOptions.getMaxBytesPerBatch()); assertEquals(100, (long) fromBigtableOptions.getMaxElementsPerBatch()); assertEquals(5 * 100, (long) fromBigtableOptions.getMaxOutstandingElements()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 8d9a0f1bcfa5..85e56c379d86 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -2151,7 +2151,7 @@ public void testReadChangeStreamBuildsCorrectly() { assertEquals("change-stream-name", readChangeStream.getChangeStreamName()); assertEquals(startTime, readChangeStream.getStartTime()); assertEquals(Duration.standardMinutes(1), readChangeStream.getBacklogReplicationAdjustment()); - assertEquals(Duration.standardMinutes(1), readChangeStream.getReadChangeStreamTimeout()); + assertEquals(Duration.standardMinutes(1), readChangeStream.getReadChangeStreamTimeout().get()); assertEquals(false, readChangeStream.getCreateOrUpdateMetadataTable()); assertEquals( BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS, From 179902f8143628bc54f39dfc5d5fd822f7b7770c Mon Sep 17 00:00:00 2001 From: Brandt Newton Date: Mon, 8 Jun 2026 13:45:31 -0400 Subject: [PATCH 2/3] fix imports --- .../apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java | 2 +- .../beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index f7aa50a7437f..69944609a57e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -129,7 +129,7 @@ public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) { projectId, instanceId, writeOptions.getTableId().get(), - writeOptions.getCloseWaitTimeout()); + writeOptions.getCloseWaitTimeout().get()); } @VisibleForTesting diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java index c85b693545bf..100636983b9a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java @@ -26,6 +26,7 @@ import java.io.Serializable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig; +import org.apache.beam.sdk.options.ValueProvider; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; From 3d882073c37cf54914086151c4d0e478e3cc3ad8 Mon Sep 17 00:00:00 2001 From: Brandt Newton Date: Mon, 8 Jun 2026 15:01:47 -0400 Subject: [PATCH 3/3] fmt --- .../sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java | 3 ++- .../sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java index 100636983b9a..a45f4e14c16f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java @@ -77,7 +77,8 @@ public void close() { } } - public void setReadChangeStreamTimeout(@Nullable ValueProvider readChangeStreamTimeout) { + public void setReadChangeStreamTimeout( + @Nullable ValueProvider readChangeStreamTimeout) { this.readChangeStreamTimeout = readChangeStreamTimeout; } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java index cfa063c9827b..fe004d5a56ff 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java @@ -105,7 +105,8 @@ public void testBigtableOptionsToBigtableReadOptions() throws Exception { assertNotNull(fromBigtableOptions.getOperationTimeout()); assertEquals(org.joda.time.Duration.millis(100), fromBigtableOptions.getAttemptTimeout().get()); - assertEquals(org.joda.time.Duration.millis(1000), fromBigtableOptions.getOperationTimeout().get()); + assertEquals( + org.joda.time.Duration.millis(1000), fromBigtableOptions.getOperationTimeout().get()); } @Test @@ -146,7 +147,8 @@ public void testBigtableOptionsToBigtableWriteOptions() throws Exception { assertNotNull(fromBigtableOptions.getMaxOutstandingBytes()); assertEquals(org.joda.time.Duration.millis(200), fromBigtableOptions.getAttemptTimeout().get()); - assertEquals(org.joda.time.Duration.millis(2000), fromBigtableOptions.getOperationTimeout().get()); + assertEquals( + org.joda.time.Duration.millis(2000), fromBigtableOptions.getOperationTimeout().get()); assertEquals(20, (long) fromBigtableOptions.getMaxBytesPerBatch()); assertEquals(100, (long) fromBigtableOptions.getMaxElementsPerBatch()); assertEquals(5 * 100, (long) fromBigtableOptions.getMaxOutstandingElements());