Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## 1.10.0 [unreleased]

### Breaking Changes

1. [#365](https://github.com/InfluxCommunity/influxdb3-java/pull/365): Adds partial writes support and aligns write routing with v3 defaults.
See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more.
For InfluxDB Clustered version, set `useV2Api=true` for writing.

## 1.9.0 [2026-04-23]

### Features
Expand Down
56 changes: 54 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ dependencies {

## Usage

### Create client

To start with the client, import the `com.influxdb.v3.client` package and create a `InfluxDBClient` by:

```java
Expand All @@ -73,6 +75,7 @@ import java.util.List;
import java.util.stream.Stream;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.InfluxDBPartialWriteException;
import com.influxdb.v3.client.query.QueryOptions;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.write.WriteOptions;
Expand All @@ -90,7 +93,9 @@ public class IOxExample {
}
```

to insert data, you can use code like this:
### Write

To insert data, you can use code like this:

```java
//
Expand All @@ -113,14 +118,61 @@ client.writePoint(
orderedTagWrite
);

//
// Write with partial acceptance (default behavior)
//
WriteOptions partialWrite = new WriteOptions.Builder().build();
try {
client.writeRecords(List.of(
"temperature,region=west value=20.0",
"temperature,region=west value=\"bad\""
), partialWrite);
} catch (InfluxDBPartialWriteException e) {
// Inspect failed line details.
e.lineErrors().forEach(line ->
System.out.printf("line=%s msg=%s lp=%s%n", line.lineNumber(), line.errorMessage(), line.originalLine()));
}

//
// Write via v2 compatibility endpoint (InfluxDB Clustered)
//
WriteOptions useV2 = new WriteOptions.Builder()
.useV2Api(true)
.build();
client.writeRecord("temperature,location=north value=60.0", useV2);

//
// Write by LineProtocol
//
String record = "temperature,location=north value=60.0";
client.writeRecord(record);
```

to query your data, you can use code like this:
#### Accept partial writes and inspect failed lines

Partial writes are enabled by default.
`acceptPartial` can be configured in three ways: client defaults via `WriteOptions`, connection string / environment variable / system property (`writeAcceptPartial` / `INFLUX_WRITE_ACCEPT_PARTIAL` / `influx.writeAcceptPartial`), or per-write `WriteOptions`.

Set `acceptPartial(false)` to disable partial writes.
With InfluxDB Core/Enterprise, when a write request fails due to one or more invalid lines, the error message starts with:

- `partial write of line protocol occurred` when partial writes are enabled.
- `parsing failed for write_lp endpoint` when partial writes are disabled.

When partial writes are disabled, any rejected line causes all lines to be rejected.
InfluxDB Clustered does not return this structured partial-write error format.

#### Compatibility with InfluxDB Clustered

For InfluxDB Clustered, enable `useV2Api` for writes.
Like other write options, this can be configured in client code, connection string / environment variable / system property (`writeUseV2Api` / `INFLUX_WRITE_USE_V2_API` / `influx.writeUseV2Api`), or per-write `WriteOptions`.

If `useV2Api` is set, `acceptPartial` is ignored because this compatibility mode does not support partial-write controls.
Any rejected line causes all lines to be rejected.

### Query

To query your data, you can use code like this:

```java
//
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/influxdb/v3/client/InfluxDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) {
* <li>precision - timestamp precision when writing data</li>
* <li>gzipThreshold - payload size size for gzipping data</li>
* <li>writeNoSync - skip waiting for WAL persistence on write</li>
* <li>writeAcceptPartial - accept partial writes</li>
* <li>writeUseV2Api - use v2 compatibility write endpoint</li>
* </ul>
*
* @param connectionString connection string
Expand Down Expand Up @@ -590,6 +592,8 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
* <li>INFLUX_PRECISION - timestamp precision when writing data</li>
* <li>INFLUX_GZIP_THRESHOLD - payload size size for gzipping data</li>
* <li>INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write</li>
* <li>INFLUX_WRITE_ACCEPT_PARTIAL - accept partial writes</li>
* <li>INFLUX_WRITE_USE_V2_API - use v2 compatibility write endpoint</li>
* </ul>
* Supported system properties:
* <ul>
Expand All @@ -601,6 +605,8 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
* <li>influx.precision - timestamp precision when writing data</li>
* <li>influx.gzipThreshold - payload size size for gzipping data</li>
* <li>influx.writeNoSync - skip waiting for WAL persistence on write</li>
* <li>influx.writeAcceptPartial - accept partial writes</li>
* <li>influx.writeUseV2Api - use v2 compatibility write endpoint</li>
* </ul>
*
* @return instance of {@link InfluxDBClient}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.influxdb.v3.client;

import java.net.http.HttpHeaders;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* HTTP exception for partial write errors returned by InfluxDB 3 write endpoint.
* Contains parsed line-level write errors so callers can decide how to handle failed lines.
*/
public class InfluxDBPartialWriteException extends InfluxDBApiHttpException {

private final List<LineError> lineErrors;

/**
* Construct a new InfluxDBPartialWriteException.
*
* @param message detail message
* @param headers response headers
* @param statusCode response status code
* @param lineErrors line-level errors parsed from response body
*/
public InfluxDBPartialWriteException(
@Nullable final String message,
@Nullable final HttpHeaders headers,
final int statusCode,
@Nonnull final List<LineError> lineErrors) {
super(message, headers, statusCode);
this.lineErrors = List.copyOf(lineErrors);
}

/**
* Line-level write errors.
*
* @return immutable list of line errors
*/
@Nonnull
public List<LineError> lineErrors() {
return lineErrors;
}

/**
* Represents one failed line from a partial write response.
*/
public static final class LineError {

private final Integer lineNumber;
private final String errorMessage;
private final String originalLine;

/**
* @param lineNumber line number in the write payload; may be null if not provided by server
* @param errorMessage line-level error message
* @param originalLine original line protocol row; may be null if not provided by server
*/
public LineError(@Nullable final Integer lineNumber,
@Nonnull final String errorMessage,
@Nullable final String originalLine) {
this.lineNumber = lineNumber;
this.errorMessage = errorMessage;
this.originalLine = originalLine;
}

/**
* @return line number or null if server didn't provide it
*/
@Nullable
public Integer lineNumber() {
return lineNumber;
}

/**
* @return line-level error message
*/
@Nonnull
public String errorMessage() {
return errorMessage;
}

/**
* @return original line protocol row or null if server didn't provide it
*/
@Nullable
public String originalLine() {
return originalLine;
}
}
}
78 changes: 77 additions & 1 deletion src/main/java/com/influxdb/v3/client/config/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
* <li><code>defaultTags</code> - defaultTags added when writing points to InfluxDB</li>
* <li><code>gzipThreshold</code> - threshold when gzip compression is used for writing points to InfluxDB</li>
* <li><code>writeNoSync</code> - skip waiting for WAL persistence on write</li>
* <li><code>writeAcceptPartial</code> - accept partial writes</li>
* <li><code>writeUseV2Api</code> - use v2 compatibility write endpoint</li>
* <li><code>timeout</code> - <i>deprecated in 1.4.0</i> timeout when connecting to InfluxDB,
* please use more informative properties <code>writeTimeout</code> and <code>queryTimeout</code></li>
* <li><code>writeTimeout</code> - timeout when writing data to InfluxDB</li>
Expand Down Expand Up @@ -107,6 +109,8 @@ public final class ClientConfig {
private final WritePrecision writePrecision;
private final Integer gzipThreshold;
private final Boolean writeNoSync;
private final Boolean writeAcceptPartial;
private final Boolean writeUseV2Api;
private final Map<String, String> defaultTags;
@Deprecated
private final Duration timeout;
Expand Down Expand Up @@ -208,6 +212,26 @@ public Boolean getWriteNoSync() {
return writeNoSync;
}

/**
* Accept partial writes?
*
* @return accept partial writes
*/
@Nonnull
public Boolean getWriteAcceptPartial() {
return writeAcceptPartial;
}

/**
* Use v2 compatibility write endpoint?
*
* @return use v2 compatibility write endpoint
*/
@Nonnull
public Boolean getWriteUseV2Api() {
return writeUseV2Api;
}

/**
* Gets default tags used when writing points.
* @return default tags
Expand Down Expand Up @@ -370,6 +394,8 @@ public boolean equals(final Object o) {
&& writePrecision == that.writePrecision
&& Objects.equals(gzipThreshold, that.gzipThreshold)
&& Objects.equals(writeNoSync, that.writeNoSync)
&& Objects.equals(writeAcceptPartial, that.writeAcceptPartial)
&& Objects.equals(writeUseV2Api, that.writeUseV2Api)
&& Objects.equals(defaultTags, that.defaultTags)
&& Objects.equals(timeout, that.timeout)
&& Objects.equals(writeTimeout, that.writeTimeout)
Expand All @@ -388,7 +414,7 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
return Objects.hash(host, Arrays.hashCode(token), authScheme, organization,
database, writePrecision, gzipThreshold, writeNoSync,
database, writePrecision, gzipThreshold, writeNoSync, writeAcceptPartial, writeUseV2Api,
timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation,
proxy, proxyUrl, authenticator, headers,
defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors);
Expand All @@ -403,6 +429,8 @@ public String toString() {
.add("writePrecision=" + writePrecision)
.add("gzipThreshold=" + gzipThreshold)
.add("writeNoSync=" + writeNoSync)
.add("writeAcceptPartial=" + writeAcceptPartial)
.add("writeUseV2Api=" + writeUseV2Api)
.add("timeout=" + timeout)
.add("writeTimeout=" + writeTimeout)
.add("queryTimeout=" + queryTimeout)
Expand Down Expand Up @@ -432,6 +460,8 @@ public static final class Builder {
private WritePrecision writePrecision;
private Integer gzipThreshold;
private Boolean writeNoSync;
private Boolean writeAcceptPartial;
private Boolean writeUseV2Api;
private Map<String, String> defaultTags;
@Deprecated
private Duration timeout;
Expand Down Expand Up @@ -554,6 +584,32 @@ public Builder writeNoSync(@Nullable final Boolean writeNoSync) {
return this;
}

/**
* Sets whether to accept partial writes.
*
* @param writeAcceptPartial accept partial writes
* @return this
*/
@Nonnull
public Builder writeAcceptPartial(@Nullable final Boolean writeAcceptPartial) {

this.writeAcceptPartial = writeAcceptPartial;
return this;
}

/**
* Sets whether to use v2 compatibility write endpoint.
*
* @param writeUseV2Api use v2 compatibility write endpoint
* @return this
*/
@Nonnull
public Builder writeUseV2Api(@Nullable final Boolean writeUseV2Api) {

this.writeUseV2Api = writeUseV2Api;
return this;
}

/**
* Sets default tags to be written with points.
*
Expand Down Expand Up @@ -800,6 +856,12 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform
if (parameters.containsKey("writeNoSync")) {
this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync")));
}
if (parameters.containsKey("writeAcceptPartial")) {
this.writeAcceptPartial(Boolean.parseBoolean(parameters.get("writeAcceptPartial")));
}
if (parameters.containsKey("writeUseV2Api")) {
this.writeUseV2Api(Boolean.parseBoolean(parameters.get("writeUseV2Api")));
}
if (parameters.containsKey("disableGRPCCompression")) {
this.disableGRPCCompression(Boolean.parseBoolean(parameters.get("disableGRPCCompression")));
}
Expand Down Expand Up @@ -855,6 +917,14 @@ public ClientConfig build(@Nonnull final Map<String, String> env, final Properti
if (writeNoSync != null) {
this.writeNoSync(Boolean.parseBoolean(writeNoSync));
}
final String writeAcceptPartial = get.apply("INFLUX_WRITE_ACCEPT_PARTIAL", "influx.writeAcceptPartial");
if (writeAcceptPartial != null) {
this.writeAcceptPartial(Boolean.parseBoolean(writeAcceptPartial));
}
final String writeUseV2Api = get.apply("INFLUX_WRITE_USE_V2_API", "influx.writeUseV2Api");
if (writeUseV2Api != null) {
this.writeUseV2Api(Boolean.parseBoolean(writeUseV2Api));
}
final String writeTimeout = get.apply("INFLUX_WRITE_TIMEOUT", "influx.writeTimeout");
if (writeTimeout != null) {
long to = Long.parseLong(writeTimeout);
Expand Down Expand Up @@ -911,6 +981,12 @@ private ClientConfig(@Nonnull final Builder builder) {
writePrecision = builder.writePrecision != null ? builder.writePrecision : WriteOptions.DEFAULT_WRITE_PRECISION;
gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : WriteOptions.DEFAULT_GZIP_THRESHOLD;
writeNoSync = builder.writeNoSync != null ? builder.writeNoSync : WriteOptions.DEFAULT_NO_SYNC;
writeAcceptPartial = builder.writeAcceptPartial != null
? builder.writeAcceptPartial
: WriteOptions.DEFAULT_ACCEPT_PARTIAL;
writeUseV2Api = builder.writeUseV2Api != null
? builder.writeUseV2Api
: WriteOptions.DEFAULT_USE_V2_API;
defaultTags = builder.defaultTags;
timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT);
writeTimeout = builder.writeTimeout != null
Expand Down
Loading
Loading