Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,16 +234,16 @@ private static BigtableDataSettings configureWriteSettings(
// with the old behavior.
long initialRpcTimeout =
writeOptions.getAttemptTimeout() != null
? writeOptions.getAttemptTimeout().getMillis()
? writeOptions.getAttemptTimeout().get().getMillis()
: (fromBigtableOptions != null && fromBigtableOptions.getAttemptTimeout() != null
? fromBigtableOptions.getAttemptTimeout().getMillis()
? fromBigtableOptions.getAttemptTimeout().get().getMillis()
: Duration.ofMinutes(6).toMillis());

long totalTimeout =
writeOptions.getOperationTimeout() != null
? writeOptions.getOperationTimeout().getMillis()
? writeOptions.getOperationTimeout().get().getMillis()
: (fromBigtableOptions != null && fromBigtableOptions.getOperationTimeout() != null
? fromBigtableOptions.getOperationTimeout().getMillis()
? fromBigtableOptions.getOperationTimeout().get().getMillis()
: retrySettings.getTotalTimeout().toMillis());

retrySettings
Expand Down Expand Up @@ -334,26 +334,26 @@ private static BigtableDataSettings configureReadSettings(
// Options set directly on readOptions overrides Options set in BigtableOptions
long initialRpcTimeout =
readOptions.getAttemptTimeout() != null
? readOptions.getAttemptTimeout().getMillis()
? readOptions.getAttemptTimeout().get().getMillis()
: (optionsFromBigtableOptions != null
&& optionsFromBigtableOptions.getAttemptTimeout() != null
? optionsFromBigtableOptions.getAttemptTimeout().getMillis()
? optionsFromBigtableOptions.getAttemptTimeout().get().getMillis()
: retrySettings.getInitialRpcTimeout().toMillis());

long totalTimeout =
readOptions.getOperationTimeout() != null
? readOptions.getOperationTimeout().getMillis()
? readOptions.getOperationTimeout().get().getMillis()
: (optionsFromBigtableOptions != null
&& optionsFromBigtableOptions.getOperationTimeout() != null
? optionsFromBigtableOptions.getOperationTimeout().getMillis()
? optionsFromBigtableOptions.getOperationTimeout().get().getMillis()
: retrySettings.getTotalTimeout().toMillis());

long waitTimeout =
readOptions.getWaitTimeout() != null
? readOptions.getWaitTimeout().getMillis()
? readOptions.getWaitTimeout().get().getMillis()
: (optionsFromBigtableOptions != null
&& optionsFromBigtableOptions.getWaitTimeout() != null
? optionsFromBigtableOptions.getWaitTimeout().getMillis()
? optionsFromBigtableOptions.getWaitTimeout().get().getMillis()
: settings.stubSettings().readRowsSettings().getWaitTimeout().toMillis());

retrySettings
Expand Down Expand Up @@ -464,14 +464,19 @@ static BigtableReadOptions translateToBigtableReadOptions(
BigtableReadOptions readOptions, BigtableOptions options) {
BigtableReadOptions.Builder builder = readOptions.toBuilder();
builder.setWaitTimeout(
org.joda.time.Duration.millis(options.getRetryOptions().getReadPartialRowTimeoutMillis()));
ValueProvider.StaticValueProvider.of(
org.joda.time.Duration.millis(
options.getRetryOptions().getReadPartialRowTimeoutMillis())));
if (options.getCallOptionsConfig().getReadStreamRpcAttemptTimeoutMs().isPresent()) {
builder.setAttemptTimeout(
org.joda.time.Duration.millis(
options.getCallOptionsConfig().getReadStreamRpcAttemptTimeoutMs().get()));
ValueProvider.StaticValueProvider.of(
org.joda.time.Duration.millis(
options.getCallOptionsConfig().getReadStreamRpcAttemptTimeoutMs().get())));
}
builder.setOperationTimeout(
org.joda.time.Duration.millis(options.getCallOptionsConfig().getReadStreamRpcTimeoutMs()));
ValueProvider.StaticValueProvider.of(
org.joda.time.Duration.millis(
options.getCallOptionsConfig().getReadStreamRpcTimeoutMs())));
return builder.build();
}

Expand All @@ -482,14 +487,16 @@ static BigtableWriteOptions translateToBigtableWriteOptions(
// configure timeouts
if (options.getCallOptionsConfig().getMutateRpcAttemptTimeoutMs().isPresent()) {
builder.setAttemptTimeout(
org.joda.time.Duration.millis(
options.getCallOptionsConfig().getMutateRpcAttemptTimeoutMs().get()));
ValueProvider.StaticValueProvider.of(
org.joda.time.Duration.millis(
options.getCallOptionsConfig().getMutateRpcAttemptTimeoutMs().get())));
}
if (options.getBulkOptions().isEnableBulkMutationThrottling()) {
builder.setThrottlingTargetMs(options.getBulkOptions().getBulkMutationRpcTargetMs());
}
builder.setOperationTimeout(
org.joda.time.Duration.millis(options.getCallOptionsConfig().getMutateRpcTimeoutMs()));
ValueProvider.StaticValueProvider.of(
org.joda.time.Duration.millis(options.getCallOptionsConfig().getMutateRpcTimeoutMs())));
// configure batch size
builder.setMaxElementsPerBatch(options.getBulkOptions().getBulkMaxRowKeyCount());
builder.setMaxBytesPerBatch(options.getBulkOptions().getBulkMaxRequestSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,29 +705,50 @@ public Read withEmulator(String emulatorHost) {
*
* <p>Does not modify this object.
*/
public Read withAttemptTimeout(Duration timeout) {
checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive");
public Read withAttemptTimeout(ValueProvider<Duration> timeout) {
BigtableReadOptions readOptions = getBigtableReadOptions();
return toBuilder()
.setBigtableReadOptions(readOptions.toBuilder().setAttemptTimeout(timeout).build())
.build();
}

/**
* Returns a new {@link BigtableIO.Read} with the attempt timeout. Attempt timeout controls the
* timeout for each remote call.
*
* <p>Does not modify this object.
*/
public Read withAttemptTimeout(Duration timeout) {
checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive");
return withAttemptTimeout(StaticValueProvider.of(timeout));
}

/**
* Returns a new {@link BigtableIO.Read} with the operation timeout. Operation timeout has
* ultimate control over how long the logic should keep trying the remote call until it gives up
* completely.
*
* <p>Does not modify this object.
*/
public Read withOperationTimeout(Duration timeout) {
checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive");
public Read withOperationTimeout(ValueProvider<Duration> timeout) {
BigtableReadOptions readOptions = getBigtableReadOptions();
return toBuilder()
.setBigtableReadOptions(readOptions.toBuilder().setOperationTimeout(timeout).build())
.build();
}

/**
* Returns a new {@link BigtableIO.Read} with the operation timeout. Operation timeout has
* ultimate control over how long the logic should keep trying the remote call until it gives up
* completely.
*
* <p>Does not modify this object.
*/
public Read withOperationTimeout(Duration timeout) {
checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive");
return withOperationTimeout(StaticValueProvider.of(timeout));
}

Read withServiceFactory(BigtableServiceFactory factory) {
return toBuilder().setServiceFactory(factory).build();
}
Expand Down Expand Up @@ -1043,29 +1064,50 @@ public Write withEmulator(String emulatorHost) {
*
* <p>Does not modify this object.
*/
public Write withAttemptTimeout(Duration timeout) {
checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive");
public Write withAttemptTimeout(ValueProvider<Duration> timeout) {
BigtableWriteOptions options = getBigtableWriteOptions();
return toBuilder()
.setBigtableWriteOptions(options.toBuilder().setAttemptTimeout(timeout).build())
.build();
}

/**
* Returns a new {@link BigtableIO.Write} with the attempt timeout. Attempt timeout controls the
* timeout for each remote call.
*
* <p>Does not modify this object.
*/
public Write withAttemptTimeout(Duration timeout) {
checkArgument(timeout.isLongerThan(Duration.ZERO), "attempt timeout must be positive");
return withAttemptTimeout(StaticValueProvider.of(timeout));
}

/**
* Returns a new {@link BigtableIO.Write} with the operation timeout. Operation timeout has
* ultimate control over how long the logic should keep trying the remote call until it gives up
* completely.
*
* <p>Does not modify this object.
*/
public Write withOperationTimeout(Duration timeout) {
checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive");
public Write withOperationTimeout(ValueProvider<Duration> timeout) {
BigtableWriteOptions options = getBigtableWriteOptions();
return toBuilder()
.setBigtableWriteOptions(options.toBuilder().setOperationTimeout(timeout).build())
.build();
}

/**
* Returns a new {@link BigtableIO.Write} with the operation timeout. Operation timeout has
* ultimate control over how long the logic should keep trying the remote call until it gives up
* completely.
*
* <p>Does not modify this object.
*/
public Write withOperationTimeout(Duration timeout) {
checkArgument(timeout.isLongerThan(Duration.ZERO), "operation timeout must be positive");
return withOperationTimeout(StaticValueProvider.of(timeout));
}

/**
* Returns a new {@link BigtableIO.Write} with the max elements a batch can have. After this
* many elements are accumulated, they will be wrapped up in a batch and sent to Bigtable.
Expand Down Expand Up @@ -2183,7 +2225,7 @@ static ReadChangeStream create() {

abstract @Nullable Duration getBacklogReplicationAdjustment();

abstract @Nullable Duration getReadChangeStreamTimeout();
abstract @Nullable ValueProvider<Duration> getReadChangeStreamTimeout();

abstract @Nullable Boolean getValidateConfig();

Expand Down Expand Up @@ -2403,10 +2445,23 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) {
*
* <p>Does not modify this object.
*/
public ReadChangeStream withReadChangeStreamTimeout(Duration timeout) {
public ReadChangeStream withReadChangeStreamTimeout(ValueProvider<Duration> timeout) {
return toBuilder().setReadChangeStreamTimeout(timeout).build();
}

/**
* Returns a new {@link BigtableIO.ReadChangeStream} that overrides timeout for ReadChangeStream
* remote calls.
*
* <p>This is useful to override the default of 15s timeout if the checkpoint duration is longer
* than 15s.
*
* <p>Does not modify this object.
*/
public ReadChangeStream withReadChangeStreamTimeout(Duration timeout) {
return withReadChangeStreamTimeout(StaticValueProvider.of(timeout));
}

/**
* Disables validation that the table being read and the metadata table exists, and that the app
* profile used is single cluster and single row transaction enabled. Set this option if the
Expand Down Expand Up @@ -2599,7 +2654,7 @@ abstract ReadChangeStream.Builder setExistingPipelineOptions(

abstract ReadChangeStream.Builder setBacklogReplicationAdjustment(Duration adjustment);

abstract ReadChangeStream.Builder setReadChangeStreamTimeout(Duration timeout);
abstract ReadChangeStream.Builder setReadChangeStreamTimeout(ValueProvider<Duration> timeout);

abstract ReadChangeStream.Builder setValidateConfig(boolean validateConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ abstract class BigtableReadOptions implements Serializable {
abstract @Nullable Integer getMaxBufferElementCount();

/** Returns the attempt timeout of the reads. */
abstract @Nullable Duration getAttemptTimeout();
abstract @Nullable ValueProvider<Duration> getAttemptTimeout();

/** Returns the operation timeout of the reads. */
abstract @Nullable Duration getOperationTimeout();
abstract @Nullable ValueProvider<Duration> getOperationTimeout();

/**
* Watchdog will kill the stream after waiting this much time for the next response from server.
*/
abstract @Nullable Duration getWaitTimeout();
abstract @Nullable ValueProvider<Duration> getWaitTimeout();

abstract Builder toBuilder();

Expand All @@ -82,11 +82,23 @@ abstract static class Builder {

abstract Builder setKeyRanges(ValueProvider<List<ByteKeyRange>> keyRanges);

abstract Builder setAttemptTimeout(Duration timeout);
abstract Builder setAttemptTimeout(ValueProvider<Duration> timeout);

abstract Builder setOperationTimeout(Duration timeout);
Builder setAttemptTimeout(Duration timeout) {
return setAttemptTimeout(ValueProvider.StaticValueProvider.of(timeout));
}

abstract Builder setOperationTimeout(ValueProvider<Duration> timeout);

Builder setOperationTimeout(Duration timeout) {
return setOperationTimeout(ValueProvider.StaticValueProvider.of(timeout));
}

abstract Builder setWaitTimeout(ValueProvider<Duration> timeout);

abstract Builder setWaitTimeout(Duration timeout);
Builder setWaitTimeout(Duration timeout) {
return setWaitTimeout(ValueProvider.StaticValueProvider.of(timeout));
}

abstract BigtableReadOptions build();
}
Expand Down Expand Up @@ -152,9 +164,12 @@ void validate() {
}
}

if (getAttemptTimeout() != null && getOperationTimeout() != null) {
if (getAttemptTimeout() != null
&& getAttemptTimeout().isAccessible()
&& getOperationTimeout() != null
&& getOperationTimeout().isAccessible()) {
checkArgument(
getAttemptTimeout().isShorterThan(getOperationTimeout()),
getAttemptTimeout().get().isShorterThan(getOperationTimeout().get()),
"attempt timeout can't be longer than operation timeout");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
projectId,
instanceId,
writeOptions.getTableId().get(),
writeOptions.getCloseWaitTimeout());
writeOptions.getCloseWaitTimeout().get());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ abstract class BigtableWriteOptions implements Serializable {
abstract @Nullable ValueProvider<String> getTableId();

/** Returns the attempt timeout for writes. */
abstract @Nullable Duration getAttemptTimeout();
abstract @Nullable ValueProvider<Duration> getAttemptTimeout();

/** Returns the operation timeout for writes. */
abstract @Nullable Duration getOperationTimeout();
abstract @Nullable ValueProvider<Duration> getOperationTimeout();

/** Returns the max number of elements of a batch. */
abstract @Nullable Long getMaxElementsPerBatch();
Expand All @@ -61,7 +61,7 @@ abstract class BigtableWriteOptions implements Serializable {
abstract @Nullable Boolean getFlowControl();

/** Returns the time to wait when closing the writer. */
abstract @Nullable Duration getCloseWaitTimeout();
abstract @Nullable ValueProvider<Duration> getCloseWaitTimeout();

abstract Builder toBuilder();

Expand All @@ -74,9 +74,17 @@ abstract static class Builder {

abstract Builder setTableId(ValueProvider<String> tableId);

abstract Builder setAttemptTimeout(Duration timeout);
abstract Builder setAttemptTimeout(ValueProvider<Duration> timeout);

abstract Builder setOperationTimeout(Duration timeout);
Builder setAttemptTimeout(Duration timeout) {
return setAttemptTimeout(ValueProvider.StaticValueProvider.of(timeout));
}

abstract Builder setOperationTimeout(ValueProvider<Duration> timeout);

Builder setOperationTimeout(Duration timeout) {
return setOperationTimeout(ValueProvider.StaticValueProvider.of(timeout));
}

abstract Builder setMaxElementsPerBatch(long size);

Expand All @@ -90,7 +98,11 @@ abstract static class Builder {

abstract Builder setFlowControl(boolean enableFlowControl);

abstract Builder setCloseWaitTimeout(Duration timeout);
abstract Builder setCloseWaitTimeout(ValueProvider<Duration> timeout);

Builder setCloseWaitTimeout(Duration timeout) {
return setCloseWaitTimeout(ValueProvider.StaticValueProvider.of(timeout));
}

abstract BigtableWriteOptions build();
}
Expand Down Expand Up @@ -130,9 +142,12 @@ void validate() {
getTableId() != null && (!getTableId().isAccessible() || !getTableId().get().isEmpty()),
"Could not obtain Bigtable table id");

if (getAttemptTimeout() != null && getOperationTimeout() != null) {
if (getAttemptTimeout() != null
&& getAttemptTimeout().isAccessible()
&& getOperationTimeout() != null
&& getOperationTimeout().isAccessible()) {
checkArgument(
getAttemptTimeout().isShorterThan(getOperationTimeout()),
getAttemptTimeout().get().isShorterThan(getOperationTimeout().get()),
"attempt timeout can't be longer than operation timeout");
}
}
Expand Down
Loading
Loading