diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java
index 78fef141515f..3f63a6e5462e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java
@@ -234,16 +234,16 @@ private static BigtableDataSettings configureWriteSettings(
// with the old behavior.
long initialRpcTimeout =
writeOptions.getAttemptTimeout() != null
- ? writeOptions.getAttemptTimeout().getMillis()
+ ? writeOptions.getAttemptTimeout().get().getMillis()
: (fromBigtableOptions != null && fromBigtableOptions.getAttemptTimeout() != null
- ? fromBigtableOptions.getAttemptTimeout().getMillis()
+ ? fromBigtableOptions.getAttemptTimeout().get().getMillis()
: Duration.ofMinutes(6).toMillis());
long totalTimeout =
writeOptions.getOperationTimeout() != null
- ? writeOptions.getOperationTimeout().getMillis()
+ ? writeOptions.getOperationTimeout().get().getMillis()
: (fromBigtableOptions != null && fromBigtableOptions.getOperationTimeout() != null
- ? fromBigtableOptions.getOperationTimeout().getMillis()
+ ? fromBigtableOptions.getOperationTimeout().get().getMillis()
: retrySettings.getTotalTimeout().toMillis());
retrySettings
@@ -334,26 +334,26 @@ private static BigtableDataSettings configureReadSettings(
// Options set directly on readOptions overrides Options set in BigtableOptions
long initialRpcTimeout =
readOptions.getAttemptTimeout() != null
- ? readOptions.getAttemptTimeout().getMillis()
+ ? readOptions.getAttemptTimeout().get().getMillis()
: (optionsFromBigtableOptions != null
&& optionsFromBigtableOptions.getAttemptTimeout() != null
- ? optionsFromBigtableOptions.getAttemptTimeout().getMillis()
+ ? optionsFromBigtableOptions.getAttemptTimeout().get().getMillis()
: retrySettings.getInitialRpcTimeout().toMillis());
long totalTimeout =
readOptions.getOperationTimeout() != null
- ? readOptions.getOperationTimeout().getMillis()
+ ? readOptions.getOperationTimeout().get().getMillis()
: (optionsFromBigtableOptions != null
&& optionsFromBigtableOptions.getOperationTimeout() != null
- ? optionsFromBigtableOptions.getOperationTimeout().getMillis()
+ ? optionsFromBigtableOptions.getOperationTimeout().get().getMillis()
: retrySettings.getTotalTimeout().toMillis());
long waitTimeout =
readOptions.getWaitTimeout() != null
- ? readOptions.getWaitTimeout().getMillis()
+ ? readOptions.getWaitTimeout().get().getMillis()
: (optionsFromBigtableOptions != null
&& optionsFromBigtableOptions.getWaitTimeout() != null
- ? optionsFromBigtableOptions.getWaitTimeout().getMillis()
+ ? optionsFromBigtableOptions.getWaitTimeout().get().getMillis()
: settings.stubSettings().readRowsSettings().getWaitTimeout().toMillis());
retrySettings
@@ -464,14 +464,19 @@ static BigtableReadOptions translateToBigtableReadOptions(
BigtableReadOptions readOptions, BigtableOptions options) {
BigtableReadOptions.Builder builder = readOptions.toBuilder();
builder.setWaitTimeout(
- org.joda.time.Duration.millis(options.getRetryOptions().getReadPartialRowTimeoutMillis()));
+ ValueProvider.StaticValueProvider.of(
+ org.joda.time.Duration.millis(
+ options.getRetryOptions().getReadPartialRowTimeoutMillis())));
if (options.getCallOptionsConfig().getReadStreamRpcAttemptTimeoutMs().isPresent()) {
builder.setAttemptTimeout(
- org.joda.time.Duration.millis(
- options.getCallOptionsConfig().getReadStreamRpcAttemptTimeoutMs().get()));
+ ValueProvider.StaticValueProvider.of(
+ org.joda.time.Duration.millis(
+ options.getCallOptionsConfig().getReadStreamRpcAttemptTimeoutMs().get())));
}
builder.setOperationTimeout(
- org.joda.time.Duration.millis(options.getCallOptionsConfig().getReadStreamRpcTimeoutMs()));
+ ValueProvider.StaticValueProvider.of(
+ org.joda.time.Duration.millis(
+ options.getCallOptionsConfig().getReadStreamRpcTimeoutMs())));
return builder.build();
}
@@ -482,14 +487,16 @@ static BigtableWriteOptions translateToBigtableWriteOptions(
// configure timeouts
if (options.getCallOptionsConfig().getMutateRpcAttemptTimeoutMs().isPresent()) {
builder.setAttemptTimeout(
- org.joda.time.Duration.millis(
- options.getCallOptionsConfig().getMutateRpcAttemptTimeoutMs().get()));
+ ValueProvider.StaticValueProvider.of(
+ org.joda.time.Duration.millis(
+ options.getCallOptionsConfig().getMutateRpcAttemptTimeoutMs().get())));
}
if (options.getBulkOptions().isEnableBulkMutationThrottling()) {
builder.setThrottlingTargetMs(options.getBulkOptions().getBulkMutationRpcTargetMs());
}
builder.setOperationTimeout(
- org.joda.time.Duration.millis(options.getCallOptionsConfig().getMutateRpcTimeoutMs()));
+ ValueProvider.StaticValueProvider.of(
+ org.joda.time.Duration.millis(options.getCallOptionsConfig().getMutateRpcTimeoutMs())));
// configure batch size
builder.setMaxElementsPerBatch(options.getBulkOptions().getBulkMaxRowKeyCount());
builder.setMaxBytesPerBatch(options.getBulkOptions().getBulkMaxRequestSize());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 100078d32e70..126977961aa8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -705,14 +705,24 @@ public Read withEmulator(String emulatorHost) {
*
*
Does not modify this object.
*/
- public Read withAttemptTimeout(Duration timeout) {
- checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive");
+ public Read withAttemptTimeout(ValueProvider timeout) {
BigtableReadOptions readOptions = getBigtableReadOptions();
return toBuilder()
.setBigtableReadOptions(readOptions.toBuilder().setAttemptTimeout(timeout).build())
.build();
}
+ /**
+ * Returns a new {@link BigtableIO.Read} with the attempt timeout. Attempt timeout controls the
+ * timeout for each remote call.
+ *
+ * Does not modify this object.
+ */
+ public Read withAttemptTimeout(Duration timeout) {
+ checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive");
+ return withAttemptTimeout(StaticValueProvider.of(timeout));
+ }
+
/**
* Returns a new {@link BigtableIO.Read} with the operation timeout. Operation timeout has
* ultimate control over how long the logic should keep trying the remote call until it gives up
@@ -720,14 +730,25 @@ public Read withAttemptTimeout(Duration timeout) {
*
*
Does not modify this object.
*/
- public Read withOperationTimeout(Duration timeout) {
- checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive");
+ public Read withOperationTimeout(ValueProvider timeout) {
BigtableReadOptions readOptions = getBigtableReadOptions();
return toBuilder()
.setBigtableReadOptions(readOptions.toBuilder().setOperationTimeout(timeout).build())
.build();
}
+ /**
+ * Returns a new {@link BigtableIO.Read} with the operation timeout. Operation timeout has
+ * ultimate control over how long the logic should keep trying the remote call until it gives up
+ * completely.
+ *
+ * Does not modify this object.
+ */
+ public Read withOperationTimeout(Duration timeout) {
+ checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive");
+ return withOperationTimeout(StaticValueProvider.of(timeout));
+ }
+
Read withServiceFactory(BigtableServiceFactory factory) {
return toBuilder().setServiceFactory(factory).build();
}
@@ -1043,14 +1064,24 @@ public Write withEmulator(String emulatorHost) {
*
*
Does not modify this object.
*/
- public Write withAttemptTimeout(Duration timeout) {
- checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive");
+ public Write withAttemptTimeout(ValueProvider timeout) {
BigtableWriteOptions options = getBigtableWriteOptions();
return toBuilder()
.setBigtableWriteOptions(options.toBuilder().setAttemptTimeout(timeout).build())
.build();
}
+ /**
+ * Returns a new {@link BigtableIO.Write} with the attempt timeout. Attempt timeout controls the
+ * timeout for each remote call.
+ *
+ * Does not modify this object.
+ */
+ public Write withAttemptTimeout(Duration timeout) {
+ checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive");
+ return withAttemptTimeout(StaticValueProvider.of(timeout));
+ }
+
/**
* Returns a new {@link BigtableIO.Write} with the operation timeout. Operation timeout has
* ultimate control over how long the logic should keep trying the remote call until it gives up
@@ -1058,14 +1089,25 @@ public Write withAttemptTimeout(Duration timeout) {
*
*
Does not modify this object.
*/
- public Write withOperationTimeout(Duration timeout) {
- checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive");
+ public Write withOperationTimeout(ValueProvider timeout) {
BigtableWriteOptions options = getBigtableWriteOptions();
return toBuilder()
.setBigtableWriteOptions(options.toBuilder().setOperationTimeout(timeout).build())
.build();
}
+ /**
+ * Returns a new {@link BigtableIO.Write} with the operation timeout. Operation timeout has
+ * ultimate control over how long the logic should keep trying the remote call until it gives up
+ * completely.
+ *
+ * Does not modify this object.
+ */
+ public Write withOperationTimeout(Duration timeout) {
+ checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive");
+ return withOperationTimeout(StaticValueProvider.of(timeout));
+ }
+
/**
* Returns a new {@link BigtableIO.Write} with the max elements a batch can have. After this
* many elements are accumulated, they will be wrapped up in a batch and sent to Bigtable.
@@ -2183,7 +2225,7 @@ static ReadChangeStream create() {
abstract @Nullable Duration getBacklogReplicationAdjustment();
- abstract @Nullable Duration getReadChangeStreamTimeout();
+ abstract @Nullable ValueProvider getReadChangeStreamTimeout();
abstract @Nullable Boolean getValidateConfig();
@@ -2403,10 +2445,23 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) {
*
* Does not modify this object.
*/
- public ReadChangeStream withReadChangeStreamTimeout(Duration timeout) {
+ public ReadChangeStream withReadChangeStreamTimeout(ValueProvider timeout) {
return toBuilder().setReadChangeStreamTimeout(timeout).build();
}
+ /**
+ * Returns a new {@link BigtableIO.ReadChangeStream} that overrides timeout for ReadChangeStream
+ * remote calls.
+ *
+ * This is useful to override the default of 15s timeout if the checkpoint duration is longer
+ * than 15s.
+ *
+ *
Does not modify this object.
+ */
+ public ReadChangeStream withReadChangeStreamTimeout(Duration timeout) {
+ return withReadChangeStreamTimeout(StaticValueProvider.of(timeout));
+ }
+
/**
* Disables validation that the table being read and the metadata table exists, and that the app
* profile used is single cluster and single row transaction enabled. Set this option if the
@@ -2599,7 +2654,7 @@ abstract ReadChangeStream.Builder setExistingPipelineOptions(
abstract ReadChangeStream.Builder setBacklogReplicationAdjustment(Duration adjustment);
- abstract ReadChangeStream.Builder setReadChangeStreamTimeout(Duration timeout);
+ abstract ReadChangeStream.Builder setReadChangeStreamTimeout(ValueProvider timeout);
abstract ReadChangeStream.Builder setValidateConfig(boolean validateConfig);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
index af4b7da18b17..ff1b82af295f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
@@ -53,15 +53,15 @@ abstract class BigtableReadOptions implements Serializable {
abstract @Nullable Integer getMaxBufferElementCount();
/** Returns the attempt timeout of the reads. */
- abstract @Nullable Duration getAttemptTimeout();
+ abstract @Nullable ValueProvider getAttemptTimeout();
/** Returns the operation timeout of the reads. */
- abstract @Nullable Duration getOperationTimeout();
+ abstract @Nullable ValueProvider getOperationTimeout();
/**
* Watchdog will kill the stream after waiting this much time for the next response from server.
*/
- abstract @Nullable Duration getWaitTimeout();
+ abstract @Nullable ValueProvider getWaitTimeout();
abstract Builder toBuilder();
@@ -82,11 +82,23 @@ abstract static class Builder {
abstract Builder setKeyRanges(ValueProvider> keyRanges);
- abstract Builder setAttemptTimeout(Duration timeout);
+ abstract Builder setAttemptTimeout(ValueProvider timeout);
- abstract Builder setOperationTimeout(Duration timeout);
+ Builder setAttemptTimeout(Duration timeout) {
+ return setAttemptTimeout(ValueProvider.StaticValueProvider.of(timeout));
+ }
+
+ abstract Builder setOperationTimeout(ValueProvider timeout);
+
+ Builder setOperationTimeout(Duration timeout) {
+ return setOperationTimeout(ValueProvider.StaticValueProvider.of(timeout));
+ }
+
+ abstract Builder setWaitTimeout(ValueProvider timeout);
- abstract Builder setWaitTimeout(Duration timeout);
+ Builder setWaitTimeout(Duration timeout) {
+ return setWaitTimeout(ValueProvider.StaticValueProvider.of(timeout));
+ }
abstract BigtableReadOptions build();
}
@@ -152,9 +164,12 @@ void validate() {
}
}
- if (getAttemptTimeout() != null && getOperationTimeout() != null) {
+ if (getAttemptTimeout() != null
+ && getAttemptTimeout().isAccessible()
+ && getOperationTimeout() != null
+ && getOperationTimeout().isAccessible()) {
checkArgument(
- getAttemptTimeout().isShorterThan(getOperationTimeout()),
+ getAttemptTimeout().get().isShorterThan(getOperationTimeout().get()),
"attempt timeout can't be longer than operation timeout");
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index f7aa50a7437f..69944609a57e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -129,7 +129,7 @@ public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
projectId,
instanceId,
writeOptions.getTableId().get(),
- writeOptions.getCloseWaitTimeout());
+ writeOptions.getCloseWaitTimeout().get());
}
@VisibleForTesting
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
index a63cc575809b..6b26bc468ded 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
@@ -37,10 +37,10 @@ abstract class BigtableWriteOptions implements Serializable {
abstract @Nullable ValueProvider getTableId();
/** Returns the attempt timeout for writes. */
- abstract @Nullable Duration getAttemptTimeout();
+ abstract @Nullable ValueProvider getAttemptTimeout();
/** Returns the operation timeout for writes. */
- abstract @Nullable Duration getOperationTimeout();
+ abstract @Nullable ValueProvider getOperationTimeout();
/** Returns the max number of elements of a batch. */
abstract @Nullable Long getMaxElementsPerBatch();
@@ -61,7 +61,7 @@ abstract class BigtableWriteOptions implements Serializable {
abstract @Nullable Boolean getFlowControl();
/** Returns the time to wait when closing the writer. */
- abstract @Nullable Duration getCloseWaitTimeout();
+ abstract @Nullable ValueProvider getCloseWaitTimeout();
abstract Builder toBuilder();
@@ -74,9 +74,17 @@ abstract static class Builder {
abstract Builder setTableId(ValueProvider tableId);
- abstract Builder setAttemptTimeout(Duration timeout);
+ abstract Builder setAttemptTimeout(ValueProvider timeout);
- abstract Builder setOperationTimeout(Duration timeout);
+ Builder setAttemptTimeout(Duration timeout) {
+ return setAttemptTimeout(ValueProvider.StaticValueProvider.of(timeout));
+ }
+
+ abstract Builder setOperationTimeout(ValueProvider timeout);
+
+ Builder setOperationTimeout(Duration timeout) {
+ return setOperationTimeout(ValueProvider.StaticValueProvider.of(timeout));
+ }
abstract Builder setMaxElementsPerBatch(long size);
@@ -90,7 +98,11 @@ abstract static class Builder {
abstract Builder setFlowControl(boolean enableFlowControl);
- abstract Builder setCloseWaitTimeout(Duration timeout);
+ abstract Builder setCloseWaitTimeout(ValueProvider timeout);
+
+ Builder setCloseWaitTimeout(Duration timeout) {
+ return setCloseWaitTimeout(ValueProvider.StaticValueProvider.of(timeout));
+ }
abstract BigtableWriteOptions build();
}
@@ -130,9 +142,12 @@ void validate() {
getTableId() != null && (!getTableId().isAccessible() || !getTableId().get().isEmpty()),
"Could not obtain Bigtable table id");
- if (getAttemptTimeout() != null && getOperationTimeout() != null) {
+ if (getAttemptTimeout() != null
+ && getAttemptTimeout().isAccessible()
+ && getOperationTimeout() != null
+ && getOperationTimeout().isAccessible()) {
checkArgument(
- getAttemptTimeout().isShorterThan(getOperationTimeout()),
+ getAttemptTimeout().get().isShorterThan(getOperationTimeout().get()),
"attempt timeout can't be longer than operation timeout");
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
index 1545c8eb86d7..a45f4e14c16f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
@@ -26,6 +26,7 @@
import java.io.Serializable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig;
+import org.apache.beam.sdk.options.ValueProvider;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
@@ -47,7 +48,7 @@ public class DaoFactory implements Serializable, AutoCloseable {
private final String metadataTableId;
private final String changeStreamName;
- private @Nullable Duration readChangeStreamTimeout;
+ private @Nullable ValueProvider readChangeStreamTimeout;
public DaoFactory(
BigtableConfig changeStreamConfig,
@@ -76,7 +77,8 @@ public void close() {
}
}
- public void setReadChangeStreamTimeout(@Nullable Duration readChangeStreamTimeout) {
+ public void setReadChangeStreamTimeout(
+ @Nullable ValueProvider readChangeStreamTimeout) {
this.readChangeStreamTimeout = readChangeStreamTimeout;
}
@@ -117,7 +119,7 @@ public synchronized ChangeStreamDao getChangeStreamDao() throws IOException {
checkArgumentNotNull(changeStreamConfig.getAppProfileId());
if (readChangeStreamTimeout != null) {
BigtableChangeStreamAccessor.setReadChangeStreamTimeout(
- org.threeten.bp.Duration.ofMillis(readChangeStreamTimeout.getMillis()));
+ org.threeten.bp.Duration.ofMillis(readChangeStreamTimeout.get().getMillis()));
}
BigtableDataClient dataClient =
BigtableChangeStreamAccessor.getOrCreate(changeStreamConfig).getDataClient();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java
index 802f04506efe..fe004d5a56ff 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslatorTest.java
@@ -104,8 +104,9 @@ public void testBigtableOptionsToBigtableReadOptions() throws Exception {
assertNotNull(fromBigtableOptions.getAttemptTimeout());
assertNotNull(fromBigtableOptions.getOperationTimeout());
- assertEquals(org.joda.time.Duration.millis(100), fromBigtableOptions.getAttemptTimeout());
- assertEquals(org.joda.time.Duration.millis(1000), fromBigtableOptions.getOperationTimeout());
+ assertEquals(org.joda.time.Duration.millis(100), fromBigtableOptions.getAttemptTimeout().get());
+ assertEquals(
+ org.joda.time.Duration.millis(1000), fromBigtableOptions.getOperationTimeout().get());
}
@Test
@@ -145,8 +146,9 @@ public void testBigtableOptionsToBigtableWriteOptions() throws Exception {
assertNotNull(fromBigtableOptions.getMaxOutstandingElements());
assertNotNull(fromBigtableOptions.getMaxOutstandingBytes());
- assertEquals(org.joda.time.Duration.millis(200), fromBigtableOptions.getAttemptTimeout());
- assertEquals(org.joda.time.Duration.millis(2000), fromBigtableOptions.getOperationTimeout());
+ assertEquals(org.joda.time.Duration.millis(200), fromBigtableOptions.getAttemptTimeout().get());
+ assertEquals(
+ org.joda.time.Duration.millis(2000), fromBigtableOptions.getOperationTimeout().get());
assertEquals(20, (long) fromBigtableOptions.getMaxBytesPerBatch());
assertEquals(100, (long) fromBigtableOptions.getMaxElementsPerBatch());
assertEquals(5 * 100, (long) fromBigtableOptions.getMaxOutstandingElements());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 8d9a0f1bcfa5..85e56c379d86 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -2151,7 +2151,7 @@ public void testReadChangeStreamBuildsCorrectly() {
assertEquals("change-stream-name", readChangeStream.getChangeStreamName());
assertEquals(startTime, readChangeStream.getStartTime());
assertEquals(Duration.standardMinutes(1), readChangeStream.getBacklogReplicationAdjustment());
- assertEquals(Duration.standardMinutes(1), readChangeStream.getReadChangeStreamTimeout());
+ assertEquals(Duration.standardMinutes(1), readChangeStream.getReadChangeStreamTimeout().get());
assertEquals(false, readChangeStream.getCreateOrUpdateMetadataTable());
assertEquals(
BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS,