Skip to content

Commit

Permalink
feat: better expose HTTP Error header information in error handling o…
Browse files Browse the repository at this point in the history
…n write (#745)

* feat: expose select HttpError headers in warning log.

* chore: move logging code for Http Headers to more sensible WriteErrorEvent

* docs: adds example of working with HTTP Errors on write.

* chore: add license to new example

* chore: remove unused imports and commented code

* chore: remove commented line of code

* chore: add WriteApi EventListener to example.

* docs: update CHANGELOG.md

* test: add handlesWriteApiHttpError test
  • Loading branch information
karel-rehor authored Aug 6, 2024
1 parent 486dd13 commit 9ab894c
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features

- [#745](https://github.com/influxdata/influxdb-client-java/pull/745): New example `WriteHttpExceptionHandled.java` showing how to make use of `InfluxException.headers()` when HTTP Errors are returned from server. Also, now writes selected headers to client log.
- [#719](https://github.com/influxdata/influxdb-client-java/issues/719): `InfluxQLQueryService` header changes.
- `Accept` header can now be defined when making `InfluxQLQuery` calls. Supoorted MIME types:
- `application/csv`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,32 @@ void messageContainsHttpErrorCode() {
.matches((Predicate<Throwable>) throwable -> throwable.toString().equals("com.influxdb.exceptions.InfluxException: HTTP status code: 501; Message: Wrong query"));
}

@Test
void exceptionContainsHttpResponseHeaders() {
Assertions.assertThatThrownBy(() -> {
Response<Object> response = errorResponse(
"not found",
404,
15,
"not-json",
"X-Platform-Error-Code",
Map.of("Retry-After", "145",
"Trace-ID", "1234567989ABCDEF0",
"X-Influxdb-Build", "OSS"));
throw new InfluxException(new HttpException(response));
}
).matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).status() == 404)
.matches((Predicate<Throwable>) throwable -> throwable.getMessage().equals(
"HTTP status code: 404; Message: not found"
))
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().size() == 5)
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().get("Retry-After").equals("145"))
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().get("X-Influxdb-Build").equals("OSS"))
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().get("X-Influx-Reference").equals("15"))
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().get("X-Platform-Error-Code").equals("not found"))
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().get("Trace-ID").equals("1234567989ABCDEF0"));
}

@Nonnull
private Response<Object> errorResponse(@Nullable final String influxError) {
return errorResponse(influxError, 500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@

import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

import com.influxdb.exceptions.InfluxException;
import com.influxdb.utils.Arguments;

/**
Expand Down Expand Up @@ -55,6 +57,21 @@ public Throwable getThrowable() {

@Override
public void logEvent() {
LOG.log(Level.SEVERE, "The error occurred during writing of data", throwable);
if (throwable instanceof InfluxException ie) {
String selectHeaders = Stream.of("trace-id",
"trace-sampled",
"X-Influxdb-Build",
"X-Influxdb-Request-ID",
"X-Influxdb-Version")
.filter(name -> ie.headers().get(name) != null)
.reduce("", (message, name) -> message.concat(String.format("%s: %s\n",
name, ie.headers().get(name))));
LOG.log(Level.SEVERE,
String.format("An error occurred during writing of data. Select Response Headers:\n%s", selectHeaders),
throwable);
} else {
LOG.log(Level.SEVERE, "An error occurred during writing of data", throwable);

}
}
}
23 changes: 22 additions & 1 deletion client/src/test/java/com/influxdb/client/ITWriteApiBlocking.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
*/
package com.influxdb.client;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
Expand Down Expand Up @@ -186,4 +189,22 @@ void defaultTags() {
Assertions.assertThat(query.get(0).getRecords().get(0).getValueByKey("sensor-version")).isEqualTo("1.23a");
Assertions.assertThat(query.get(0).getRecords().get(0).getValueByKey("env-var")).isEqualTo(System.getenv(envKey));
}
}


@Test
public void httpErrorHeaders(){
Assertions.assertThatThrownBy(() -> {
influxDBClient.getWriteApiBlocking().writeRecord(WritePrecision.MS, "asdf");
}).isInstanceOf(InfluxException.class)
.matches((Predicate<Throwable>) throwable -> throwable.getMessage().equals(
"HTTP status code: 400; Message: unable to parse 'asdf': missing fields"
))
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().keySet().size() == 6)
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().get("X-Influxdb-Build").equals("OSS"))
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().get("X-Influxdb-Version") != null)
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().get("X-Platform-Error-Code") != null)
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().get("Content-Length") != null)
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().get("Content-Type") != null)
.matches((Predicate<Throwable>) throwable -> ((InfluxException) throwable).headers().get("Date") != null);
}
}
32 changes: 32 additions & 0 deletions client/src/test/java/com/influxdb/client/ITWriteQueryApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -41,6 +42,7 @@
import com.influxdb.client.write.Point;
import com.influxdb.client.write.events.WriteErrorEvent;
import com.influxdb.client.write.events.WriteSuccessEvent;
import com.influxdb.exceptions.InfluxException;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;

Expand Down Expand Up @@ -860,4 +862,34 @@ public void queryParameters() {
client.close();
}

@Test
public void handlesWriteApiHttpError(){

InfluxDBClient client = InfluxDBClientFactory.create(influxDB_URL, token.toCharArray());
WriteApi writeApi = influxDBClient.makeWriteApi();
AtomicReference<Boolean> called = new AtomicReference<>(false);

writeApi.listenEvents(WriteErrorEvent.class, (error) -> {
called.set(true);
Assertions.assertThat(error).isInstanceOf(WriteErrorEvent.class);
Assertions.assertThat(error.getThrowable()).isInstanceOf(InfluxException.class);
if(error.getThrowable() instanceof InfluxException ie){
Assertions.assertThat(ie.headers()).isNotNull();
Assertions.assertThat(ie.headers().keySet()).hasSize(6);
Assertions.assertThat(ie.headers().get("Content-Length")).isNotNull();
Assertions.assertThat(ie.headers().get("Content-Type")).contains("application/json");
Assertions.assertThat(ie.headers().get("Date")).isNotNull();
Assertions.assertThat(ie.headers().get("X-Influxdb-Build")).isEqualTo("OSS");
Assertions.assertThat(ie.headers().get("X-Influxdb-Version")).startsWith("v");
Assertions.assertThat(ie.headers().get("X-Platform-Error-Code")).isNotNull();
}
});

writeApi.writeRecord(bucket.getName(), organization.getId(), WritePrecision.MS, "asdf");
writeApi.flush();
writeApi.close();
Assertions.assertThat(called.get()).as("WriteErrorEvent should have occurred")
.isEqualTo(true);
}

}
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ This directory contains Java, Kotlin and Scala examples.
- [InfluxDBEnterpriseExample.java](src/main/java/example/InfluxDBEnterpriseExample.java) - How to use `consistency` parameter for InfluxDB Enterprise
- [RecordRowExample.java](src/main/java/example/RecordRowExample.java) - How to use `FluxRecord.getRow()` (List) instead of `FluxRecord.getValues()` (Map),
in case of duplicity column names
- [WriteHttpExceptionHandled](src/main/java/example/WriteHttpExceptionHandled.java) - How to work with HTTP Exceptions for debugging and recovery.

## Kotlin

Expand Down
128 changes: 128 additions & 0 deletions examples/src/main/java/example/WriteHttpExceptionHandled.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package example;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.events.WriteErrorEvent;
import com.influxdb.exceptions.InfluxException;

import javax.annotation.Nonnull;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.logging.Logger;

public class WriteHttpExceptionHandled {

static Logger Log = Logger.getLogger(WriteHttpExceptionHandled.class.getName());

public static String resolveProperty(final String property, final String fallback) {
return System.getProperty(property, System.getenv(property)) == null
? fallback : System.getProperty(property, System.getenv(property));
}

private static final String influxUrl = resolveProperty("INFLUX_URL", "http://localhost:8086");
private static final char[] token = resolveProperty("INFLUX_TOKEN","my-token").toCharArray();
private static final String org = resolveProperty("INFLUX_ORG","my-org");
private static final String bucket = resolveProperty("INFLUX_DATABASE","my-bucket");

public static void main(String[] args) {

InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxUrl, token, org, bucket);

WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking();
WriteApi writeApi = influxDBClient.makeWriteApi();

// InfluxExceptions in Rx streams can be handled in an EventListener
writeApi.listenEvents(WriteErrorEvent.class, (error) -> {
if (error.getThrowable() instanceof InfluxException ie) {
Log.warning("\n*** Custom event handler\n******\n"
+ influxExceptionString(ie)
+ "******\n");
}
});

// the following call will cause an HTTP 400 error
writeApi.writeRecords(WritePrecision.MS, List.of("invalid", "clumsy", "broken", "unusable"));
writeApi.close();


Log.info("\nWriting invalid records to InfluxDB blocking - can handle caught InfluxException.\n");
try {
writeApiBlocking.writeRecord(WritePrecision.MS, "asdf");
} catch (InfluxException e) {
Log.info(influxExceptionString(e));
}

// Note when writing batches with one bad record:
// Cloud v3.x - The bad record is ignored.
// OSS v2.x - returns exception
Log.info("Writing Batch with 1 bad record.");
Instant now = Instant.now();

List<String> lpData = List.of(
String.format("temperature,location=north value=60.0 %d", now.toEpochMilli()),
String.format("temperature,location=south value=65.0 %d", now.minus(1, ChronoUnit.SECONDS).toEpochMilli()),
String.format("temperature,location=north value=59.8 %d", now.minus(2, ChronoUnit.SECONDS).toEpochMilli()),
String.format("temperature,location=south value=64.8 %d", now.minus(3, ChronoUnit.SECONDS).toEpochMilli()),
String.format("temperature,location=north value=59.7 %d", now.minus(4, ChronoUnit.SECONDS).toEpochMilli()),
"asdf",
String.format("temperature,location=north value=59.9 %d", now.minus(6, ChronoUnit.SECONDS).toEpochMilli()),
String.format("temperature,location=south value=64.9 %d", now.minus(7, ChronoUnit.SECONDS).toEpochMilli()),
String.format("temperature,location=north value=60.1 %d", now.minus(8, ChronoUnit.SECONDS).toEpochMilli()),
String.format("temperature,location=south value=65.1 %d", now.minus(9, ChronoUnit.SECONDS).toEpochMilli())
);

try {
writeApiBlocking.writeRecords(WritePrecision.MS, lpData);
} catch (InfluxException e) {
Log.info(influxExceptionString(e));
}

try {
writeApi.writeRecords(WritePrecision.MS, lpData);
} catch (Exception exception) {
if (exception instanceof InfluxException) {
Log.info(influxExceptionString((InfluxException) exception));
}
}
Log.info("Done");
}

private static String influxExceptionString(@Nonnull InfluxException e) {
StringBuilder sBuilder = new StringBuilder().append("Handling InfluxException:\n");
sBuilder.append(" ").append(e.getMessage());
String headers = e.headers()
.keySet()
.stream()
.reduce("\n", (set, key) -> set.concat(
String.format(" %s: %s\n", key, e.headers().get(key)))
);
sBuilder.append("\n HTTP Response Headers:");
sBuilder.append(headers);
return sBuilder.toString();
}
}

0 comments on commit 9ab894c

Please sign in to comment.