This repository has been archived by the owner on Feb 12, 2023. It is now read-only.
Jakub Bednář edited this page Sep 7, 2018 · 7 revisions

Improved streaming query API

Is there a way to tell that all query chunks have arrived?

Yes. The onComplete action is invoked after the stream ends successfully.

influxDB.query(new Query("SELECT * FROM disk", "telegraf"), 10_000,
    queryResult -> {
        System.out.println("result = " + queryResult);
    }, 
    () -> {
        System.out.println("The query successfully finished.");
    });

Is there a way to tell the system to stop sending more chunks once I've found what I'm looking for?

Yes. The onNext bi-consumer receives a Cancellable that can discontinue a streaming query.

influxDB.query(new Query("SELECT * FROM disk", "telegraf"), 10_000, (cancellable, queryResult) -> {

    // found what I'm looking for?
    if (foundRequest(queryResult)) {
        // yes => cancel the query and skip further processing
        cancellable.cancel();
        return;
    }

    // no => process this result and wait for the next one
    processResult(queryResult);
});
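
The callback contract from the two answers above can be tried out without a running InfluxDB server. The following is a minimal sketch, not the library's implementation: `CancellableStreamSketch`, its nested `Cancellable` interface, `stream`, and `collectUntil` are hypothetical names introduced for illustration. It assumes that onComplete is skipped once the consumer has cancelled, which is also what the Chunking Example assumes when it releases its latch by hand after calling cancel().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

public class CancellableStreamSketch {

    // Hypothetical stand-in for InfluxDB.Cancellable.
    public interface Cancellable {
        void cancel();
        boolean isCanceled();
    }

    // Delivers chunks one by one until the list is exhausted or the consumer cancels.
    // Assumption: onComplete runs only when the stream was not cancelled.
    public static <T> void stream(List<T> chunks,
                                  BiConsumer<Cancellable, T> onNext,
                                  Runnable onComplete) {
        final boolean[] canceled = {false};
        Cancellable cancellable = new Cancellable() {
            @Override public void cancel() { canceled[0] = true; }
            @Override public boolean isCanceled() { return canceled[0]; }
        };
        for (T chunk : chunks) {
            if (cancellable.isCanceled()) {
                return;
            }
            onNext.accept(cancellable, chunk);
        }
        if (!cancellable.isCanceled()) {
            onComplete.run();
        }
    }

    // Records every delivered chunk, cancels when `wanted` is seen,
    // and appends "complete" if onComplete fired.
    public static List<String> collectUntil(List<String> chunks, String wanted) {
        List<String> log = new ArrayList<>();
        stream(chunks, (cancellable, chunk) -> {
            log.add(chunk);
            if (chunk.equals(wanted)) {
                cancellable.cancel();
            }
        }, () -> log.add("complete"));
        return log;
    }

    public static void main(String[] args) {
        List<String> chunks = Arrays.asList("a", "b", "c");
        System.out.println(collectUntil(chunks, "b")); // prints [a, b]
        System.out.println(collectUntil(chunks, "z")); // prints [a, b, c, complete]
    }
}
```

In the first call the consumer cancels on "b", so "c" is never delivered and onComplete does not run. In the second call nothing matches, so every chunk is delivered and onComplete fires once at the end.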

Chunking Example

public class ChunkingExample {

    // Main thread
    public List<Integer> executeQuery() throws InterruptedException {

        // Placeholder: obtain a real client here, e.g. via InfluxDBFactory.connect(...)
        InfluxDB client = null;

        // Creates a CountDownLatch in order to wait for the asynchronous process
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // List to be filled
        List<Integer> entries = new ArrayList<>();

        CustomConsumer consumer = new CustomConsumer(entries, 20, countDownLatch);

        QueryBuilder queryBuilder = QueryBuilder.newQuery("Select * from things").forDatabase("myDatabase");
        Query query = queryBuilder.create();

        //
        // New method:
        // 
        // query(Query query, int chunkSize, BiConsumer<Cancellable, QueryResult> onNext, Runnable onComplete)
        //
        // onNext - consumer invoked for each received QueryResult; it can cancel the streaming query
        // onComplete - action invoked after the stream ends successfully
        //
        // Equivalent call with a method reference for onComplete:
        //
        // client.query(query, 10, consumer, countDownLatch::countDown);
        //
        client.query(query, 10000, consumer, consumer);

        // Wait for the async thread
        countDownLatch.await();

        // return entries to the UI.
        return entries;

    }

    // Asynchronous consumer
    public class CustomConsumer implements BiConsumer<InfluxDB.Cancellable, QueryResult>, Runnable {

        private final List<Integer> entries;

        private final int maxItems;

        private final CountDownLatch countDownLatch;

        public CustomConsumer(List<Integer> entries, int maxItems, CountDownLatch countDownLatch) {
            this.entries = entries;
            this.maxItems = maxItems;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void accept(InfluxDB.Cancellable cancellable, QueryResult response) {

            // process the resultset
            List<QueryResult.Result> results = response.getResults();
            if (results != null && !results.isEmpty() && results.get(0).getSeries() != null) {

                QueryResult.Series series = results.get(0).getSeries().get(0);

                for (List<Object> values : series.getValues()) {

                    // Assuming every value is an integer
                    Integer value = (Integer) values.get(1);

                    // Trivial filter; a real-world filter would be more complex.
                    if (value < 100) {
                        continue;
                    }

                    entries.add(value);

                    // we reached the maximum number of items
                    if (entries.size() == maxItems) {

                        // tell InfluxDB to stop sending data, then release the waiting thread
                        cancellable.cancel();
                        countDownLatch.countDown();
                        return;
                    }
                }
            }
        }

        // Invoked by the client when there are no more entries from InfluxDB.
        @Override
        public void run() {
            countDownLatch.countDown();
        }
    }
}

How to test?

  1. Use the prepared snapshot build, or
  2. Build the snapshot from sources

Add dependency to Bonitoo.io snapshot

<dependency>
   <groupId>org.influxdb</groupId>
   <artifactId>influxdb-java</artifactId>
   <version>2.13.async-SNAPSHOT</version>
   <scope>test</scope>
</dependency>

<repositories>
   <repository>
      <id>bonitoo-snapshot</id>
      <name>Bonitoo.io snapshot repository</name>
      <url>https://apitea.com/nexus/content/repositories/bonitoo-snapshot/</url>
      <releases>
         <enabled>false</enabled>
      </releases>
      <snapshots>
         <enabled>true</enabled>
      </snapshots>
   </repository>
</repositories>

Build from sources

git clone -b async-query https://github.com/bonitoo-io/influxdb-java.git
cd influxdb-java/
mvn clean install -DskipTests

Then reference the locally installed snapshot:

<dependency>
   <groupId>org.influxdb</groupId>
   <artifactId>influxdb-java</artifactId>
   <version>2.13-SNAPSHOT</version>
   <scope>test</scope>
</dependency>

influxdb-java-reactive

Java Reactive client for InfluxDB

public class ChunkingExample {

    // Main thread
    public List<Integer> executeQuery() {

        QueryBuilder queryBuilder = QueryBuilder.newQuery("Select * from things").forDatabase("myDatabase");
        Query query = queryBuilder.create();

        Single<List<Integer>> entries = executeQuery(query, 20);

        // return entries to the UI.
        return entries.blockingGet();
    }

    @Nonnull
    private Single<List<Integer>> executeQuery(@Nonnull final Query query, final int maxItems) {

        // Placeholder: obtain a real reactive client instance here
        InfluxDBReactive influxDBReactive = null;

        QueryOptions options = QueryOptions.builder().chunkSize(10_000).build();

        Single<List<Integer>> listSingle = influxDBReactive
                .query(query, options)
                //
                // Get first Series from first Result
                //
                .map(queryResult -> queryResult.getResults().get(0).getSeries().get(0))
                //
                // Iterate over Series Values
                //
                .flatMap(series -> Flowable.fromIterable(series.getValues()))
                //
                // Get the value at index 1 (index 0 holds the timestamp)
                //
                .map(values -> (Integer) values.get(1))
                //
                // Filter value
                //
                .filter(value -> value >= 100)
                //
                // Take the first maxItems values (20 in this example)
                //
                .take(maxItems)
                //
                // Map to List
                //
                .toList();

        return listSingle;
    }
}
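
To see what the operator chain above computes without an InfluxDB server or RxJava on the classpath, the same steps can be approximated with `java.util.stream`. This is only a sketch with a swapped-in technique: it is synchronous and works on an in-memory list, so it shows the filtering logic but not the streaming or back-pressure behaviour of the reactive client. `PipelineSketch` and `firstMatching` are hypothetical names, and the rows are made-up sample data shaped like `Series.getValues()` (time at index 0, value at index 1).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PipelineSketch {

    // Each row mimics one entry of Series.getValues(): [time, value].
    public static List<Integer> firstMatching(List<List<Object>> rows, int maxItems) {
        return rows.stream()
                // Get the value at index 1 (assumed to be an Integer, as in the example above)
                .map(row -> (Integer) row.get(1))
                // Keep only values of at least 100
                .filter(value -> value >= 100)
                // Stop after maxItems matches
                .limit(maxItems)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Object>> rows = Arrays.asList(
                Arrays.<Object>asList("t1", 50),
                Arrays.<Object>asList("t2", 150),
                Arrays.<Object>asList("t3", 200),
                Arrays.<Object>asList("t4", 300));
        System.out.println(firstMatching(rows, 2)); // prints [150, 200]
    }
}
```

Like `take(maxItems)` in the reactive version, `limit(maxItems)` stops consuming once enough matches are found, so the row for "t4" is never inspected.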