diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c2775d239f..3ed7580d17e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Features +- [#745](https://github.com/influxdata/influxdb-client-java/pull/745): New example `WriteHttpExceptionHandled.java` showing how to make use of `InfluxException.headers()` when HTTP Errors are returned from server. Also, now writes selected headers to client log. - [#719](https://github.com/influxdata/influxdb-client-java/issues/719): `InfluxQLQueryService` header changes. - `Accept` header can now be defined when making `InfluxQLQuery` calls. Supoorted MIME types: - `application/csv` diff --git a/client-core/src/test/java/com/influxdb/exceptions/InfluxExceptionTest.java b/client-core/src/test/java/com/influxdb/exceptions/InfluxExceptionTest.java index df1619a379f..74afe1e8a1c 100644 --- a/client-core/src/test/java/com/influxdb/exceptions/InfluxExceptionTest.java +++ b/client-core/src/test/java/com/influxdb/exceptions/InfluxExceptionTest.java @@ -322,6 +322,32 @@ void messageContainsHttpErrorCode() { .matches((Predicate) throwable -> throwable.toString().equals("com.influxdb.exceptions.InfluxException: HTTP status code: 501; Message: Wrong query")); } + @Test + void exceptionContainsHttpResponseHeaders() { + Assertions.assertThatThrownBy(() -> { + Response response = errorResponse( + "not found", + 404, + 15, + "not-json", + "X-Platform-Error-Code", + Map.of("Retry-After", "145", + "Trace-ID", "1234567989ABCDEF0", + "X-Influxdb-Build", "OSS")); + throw new InfluxException(new HttpException(response)); + } + ).matches((Predicate) throwable -> ((InfluxException) throwable).status() == 404) + .matches((Predicate) throwable -> throwable.getMessage().equals( + "HTTP status code: 404; Message: not found" + )) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().size() == 5) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().get("Retry-After").equals("145")) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().get("X-Influxdb-Build").equals("OSS")) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().get("X-Influx-Reference").equals("15")) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().get("X-Platform-Error-Code").equals("not found")) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().get("Trace-ID").equals("1234567989ABCDEF0")); + } + @Nonnull private Response errorResponse(@Nullable final String influxError) { return errorResponse(influxError, 500); diff --git a/client/src/main/java/com/influxdb/client/write/events/WriteErrorEvent.java b/client/src/main/java/com/influxdb/client/write/events/WriteErrorEvent.java index 99220c19691..cd58a0c70f8 100644 --- a/client/src/main/java/com/influxdb/client/write/events/WriteErrorEvent.java +++ b/client/src/main/java/com/influxdb/client/write/events/WriteErrorEvent.java @@ -23,8 +23,10 @@ import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Stream; import javax.annotation.Nonnull; +import com.influxdb.exceptions.InfluxException; import com.influxdb.utils.Arguments; /** @@ -55,6 +57,21 @@ public Throwable getThrowable() { @Override public void logEvent() { - LOG.log(Level.SEVERE, "The error occurred during writing of data", throwable); + if (throwable instanceof InfluxException ie) { + String selectHeaders = Stream.of("trace-id", + "trace-sampled", + "X-Influxdb-Build", + "X-Influxdb-Request-ID", + "X-Influxdb-Version") + .filter(name -> ie.headers().get(name) != null) + .reduce("", (message, name) -> message.concat(String.format("%s: %s\n", + name, ie.headers().get(name)))); + LOG.log(Level.SEVERE, + String.format("An error occurred during writing of data. Select Response Headers:\n%s", selectHeaders), + throwable); + } else { + LOG.log(Level.SEVERE, "An error occurred during writing of data", throwable); + + } } } diff --git a/client/src/test/java/com/influxdb/client/ITWriteApiBlocking.java b/client/src/test/java/com/influxdb/client/ITWriteApiBlocking.java index d9910f221eb..0b53b91e267 100644 --- a/client/src/test/java/com/influxdb/client/ITWriteApiBlocking.java +++ b/client/src/test/java/com/influxdb/client/ITWriteApiBlocking.java @@ -21,9 +21,12 @@ */ package com.influxdb.client; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; import java.time.Instant; import java.util.Arrays; import java.util.List; +import java.util.function.Predicate; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; @@ -186,4 +189,22 @@ void defaultTags() { Assertions.assertThat(query.get(0).getRecords().get(0).getValueByKey("sensor-version")).isEqualTo("1.23a"); Assertions.assertThat(query.get(0).getRecords().get(0).getValueByKey("env-var")).isEqualTo(System.getenv(envKey)); } -} \ No newline at end of file + + + @Test + public void httpErrorHeaders(){ + Assertions.assertThatThrownBy(() -> { + influxDBClient.getWriteApiBlocking().writeRecord(WritePrecision.MS, "asdf"); + }).isInstanceOf(InfluxException.class) + .matches((Predicate) throwable -> throwable.getMessage().equals( + "HTTP status code: 400; Message: unable to parse 'asdf': missing fields" + )) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().keySet().size() == 6) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().get("X-Influxdb-Build").equals("OSS")) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().get("X-Influxdb-Version") != null) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().get("X-Platform-Error-Code") != null) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().get("Content-Length") != null) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().get("Content-Type") != null) + .matches((Predicate) throwable -> ((InfluxException) throwable).headers().get("Date") != null); + } +} diff --git a/client/src/test/java/com/influxdb/client/ITWriteQueryApi.java b/client/src/test/java/com/influxdb/client/ITWriteQueryApi.java index 51835c7cccf..fc35d6624e4 100644 --- a/client/src/test/java/com/influxdb/client/ITWriteQueryApi.java +++ b/client/src/test/java/com/influxdb/client/ITWriteQueryApi.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -41,6 +42,7 @@ import com.influxdb.client.write.Point; import com.influxdb.client.write.events.WriteErrorEvent; import com.influxdb.client.write.events.WriteSuccessEvent; +import com.influxdb.exceptions.InfluxException; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; @@ -860,4 +862,34 @@ public void queryParameters() { client.close(); } + @Test + public void handlesWriteApiHttpError(){ + + InfluxDBClient client = InfluxDBClientFactory.create(influxDB_URL, token.toCharArray()); + WriteApi writeApi = influxDBClient.makeWriteApi(); + AtomicReference called = new AtomicReference<>(false); + + writeApi.listenEvents(WriteErrorEvent.class, (error) -> { + called.set(true); + Assertions.assertThat(error).isInstanceOf(WriteErrorEvent.class); + Assertions.assertThat(error.getThrowable()).isInstanceOf(InfluxException.class); + if(error.getThrowable() instanceof InfluxException ie){ + Assertions.assertThat(ie.headers()).isNotNull(); + Assertions.assertThat(ie.headers().keySet()).hasSize(6); + Assertions.assertThat(ie.headers().get("Content-Length")).isNotNull(); + Assertions.assertThat(ie.headers().get("Content-Type")).contains("application/json"); + Assertions.assertThat(ie.headers().get("Date")).isNotNull(); + Assertions.assertThat(ie.headers().get("X-Influxdb-Build")).isEqualTo("OSS"); + Assertions.assertThat(ie.headers().get("X-Influxdb-Version")).startsWith("v"); + Assertions.assertThat(ie.headers().get("X-Platform-Error-Code")).isNotNull(); + } + }); + + writeApi.writeRecord(bucket.getName(), organization.getId(), WritePrecision.MS, "asdf"); + writeApi.flush(); + writeApi.close(); + Assertions.assertThat(called.get()).as("WriteErrorEvent should have occurred") + .isEqualTo(true); + } + } \ No newline at end of file diff --git a/examples/README.md b/examples/README.md index b4406a74e0e..9624f17378e 100644 --- a/examples/README.md +++ b/examples/README.md @@ -18,6 +18,7 @@ This directory contains Java, Kotlin and Scala examples. - [InfluxDBEnterpriseExample.java](src/main/java/example/InfluxDBEnterpriseExample.java) - How to use `consistency` parameter for InfluxDB Enterprise - [RecordRowExample.java](src/main/java/example/RecordRowExample.java) - How to use `FluxRecord.getRow()` (List) instead of `FluxRecord.getValues()` (Map), in case of duplicity column names +- [WriteHttpExceptionHandled](src/main/java/example/WriteHttpExceptionHandled.java) - How to work with HTTP Exceptions for debugging and recovery. ## Kotlin diff --git a/examples/src/main/java/example/WriteHttpExceptionHandled.java b/examples/src/main/java/example/WriteHttpExceptionHandled.java new file mode 100644 index 00000000000..a5140271b8f --- /dev/null +++ b/examples/src/main/java/example/WriteHttpExceptionHandled.java @@ -0,0 +1,128 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package example; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.WriteApi; +import com.influxdb.client.WriteApiBlocking; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.events.WriteErrorEvent; +import com.influxdb.exceptions.InfluxException; + +import javax.annotation.Nonnull; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.logging.Logger; + +public class WriteHttpExceptionHandled { + + static Logger Log = Logger.getLogger(WriteHttpExceptionHandled.class.getName()); + + public static String resolveProperty(final String property, final String fallback) { + return System.getProperty(property, System.getenv(property)) == null + ? fallback : System.getProperty(property, System.getenv(property)); + } + + private static final String influxUrl = resolveProperty("INFLUX_URL", "http://localhost:8086"); + private static final char[] token = resolveProperty("INFLUX_TOKEN","my-token").toCharArray(); + private static final String org = resolveProperty("INFLUX_ORG","my-org"); + private static final String bucket = resolveProperty("INFLUX_DATABASE","my-bucket"); + + public static void main(String[] args) { + + InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxUrl, token, org, bucket); + + WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking(); + WriteApi writeApi = influxDBClient.makeWriteApi(); + + // InfluxExceptions in Rx streams can be handled in an EventListener + writeApi.listenEvents(WriteErrorEvent.class, (error) -> { + if (error.getThrowable() instanceof InfluxException ie) { + Log.warning("\n*** Custom event handler\n******\n" + + influxExceptionString(ie) + + "******\n"); + } + }); + + // the following call will cause an HTTP 400 error + writeApi.writeRecords(WritePrecision.MS, List.of("invalid", "clumsy", "broken", "unusable")); + writeApi.close(); + + + Log.info("\nWriting invalid records to InfluxDB blocking - can handle caught InfluxException.\n"); + try { + writeApiBlocking.writeRecord(WritePrecision.MS, "asdf"); + } catch (InfluxException e) { + Log.info(influxExceptionString(e)); + } + + // Note when writing batches with one bad record: + // Cloud v3.x - The bad record is ignored. + // OSS v2.x - returns exception + Log.info("Writing Batch with 1 bad record."); + Instant now = Instant.now(); + + List lpData = List.of( + String.format("temperature,location=north value=60.0 %d", now.toEpochMilli()), + String.format("temperature,location=south value=65.0 %d", now.minus(1, ChronoUnit.SECONDS).toEpochMilli()), + String.format("temperature,location=north value=59.8 %d", now.minus(2, ChronoUnit.SECONDS).toEpochMilli()), + String.format("temperature,location=south value=64.8 %d", now.minus(3, ChronoUnit.SECONDS).toEpochMilli()), + String.format("temperature,location=north value=59.7 %d", now.minus(4, ChronoUnit.SECONDS).toEpochMilli()), + "asdf", + String.format("temperature,location=north value=59.9 %d", now.minus(6, ChronoUnit.SECONDS).toEpochMilli()), + String.format("temperature,location=south value=64.9 %d", now.minus(7, ChronoUnit.SECONDS).toEpochMilli()), + String.format("temperature,location=north value=60.1 %d", now.minus(8, ChronoUnit.SECONDS).toEpochMilli()), + String.format("temperature,location=south value=65.1 %d", now.minus(9, ChronoUnit.SECONDS).toEpochMilli()) + ); + + try { + writeApiBlocking.writeRecords(WritePrecision.MS, lpData); + } catch (InfluxException e) { + Log.info(influxExceptionString(e)); + } + + try { + writeApi.writeRecords(WritePrecision.MS, lpData); + } catch (Exception exception) { + if (exception instanceof InfluxException) { + Log.info(influxExceptionString((InfluxException) exception)); + } + } + Log.info("Done"); + } + + private static String influxExceptionString(@Nonnull InfluxException e) { + StringBuilder sBuilder = new StringBuilder().append("Handling InfluxException:\n"); + sBuilder.append(" ").append(e.getMessage()); + String headers = e.headers() + .keySet() + .stream() + .reduce("\n", (set, key) -> set.concat( + String.format(" %s: %s\n", key, e.headers().get(key))) + ); + sBuilder.append("\n HTTP Response Headers:"); + sBuilder.append(headers); + return sBuilder.toString(); + } +}