Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add parameterized queries #33

Merged
merged 5 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
## 2.2.0 [unreleased]

### Features
1. [#33](https://github.com/influxdata/influxdb-client-dart/pull/33): Add support for Parameterized Queries

### Documentation
1. [#33](https://github.com/influxdata/influxdb-client-dart/pull/33): Add Parameterized Queries example


## 2.1.0 [2022-01-20]

### Bug Fixes
Expand Down
48 changes: 48 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,54 @@ main() async {


```

#### Parameterized queries
InfluxDB Cloud supports [Parameterized Queries](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/)
that let you dynamically change values in a query using the InfluxDB API. Parameterized queries make Flux queries more
reusable and can also be used to help prevent injection attacks.

InfluxDB Cloud inserts the params object into the Flux query as a Flux record named `params`. Use dot or bracket
notation to access parameters in the `params` record in your Flux query. Parameterized Flux queries support only `int`
, `float`, and `string` data types. To convert the supported data types into
other [Flux basic data types, use Flux type conversion functions](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/#supported-parameter-data-types).

Parameterized query example:
> :warning: Parameterized Queries are supported only in InfluxDB Cloud, currently there is no support in InfluxDB OSS.

```dart

import 'package:influxdb_client/api.dart';

void main() async {
var client = InfluxDBClient(
url: 'http://localhost:8086',
token: 'my-token',
org: 'my-org',
bucket: 'my-bucket',
);

var queryService = client.getQueryService();

var queryService = client.getQueryService();
var queryString = '''
from(bucket: params.bucketParam)
|> range(start: duration(v: params.startParam))
|> filter(fn: (r) => r["_measurement"] == "weather"
and r["location"] == "Prague")''';
var queryParams = {'bucketParam':'my-bucket', 'startParam':'-10d'};
var query = Query(query: queryString, params: queryParams);

// Using string for query and Map for params
var recordMap = await queryService.query(queryString, params: queryParams);

// Using Query class
var recordClass = await queryService.query(query);

client.close();
}

```

### Delete points

The [DeleteService](lib/client/delete_service.dart) supports deletes
Expand Down
1 change: 1 addition & 0 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
## Queries
- [query_example.dart](query_example.dart) - How to query data into `Stream` and `CSV string`
- [query_lines_example.dart](query_lines_example.dart) - How to query data into `CSV Stream`
- [parameterized_query_example.dart](parameterized_query_example.dart) - How to use parameterized Flux queries

## Management API
- [create_bucket_example.dart](create_bucket_example.dart) - How to create Buckets
Expand Down
71 changes: 71 additions & 0 deletions example/parameterized_query_example.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import 'dart:async';
import 'dart:math';
import 'package:influxdb_client/api.dart';

/*
* warning: Parameterized Queries are supported only in InfluxDB Cloud, currently there is no support in InfluxDB OSS.
*/

void main() async {
// Create InfluxDBClient
var client = InfluxDBClient(
url: 'https://us-west-2-1.aws.cloud2.influxdata.com',
token: 'my-token',
org: 'my-org',
bucket: 'my-bucket',
debug: true);

// Create write service
var writeApi = client.getWriteService(WriteOptions().merge(
precision: WritePrecision.s,
batchSize: 100,
flushInterval: 5000,
gzip: true));

// Write data to InfluxDB
var data = List<Point>.empty(growable: true);
var random = Random();
for (var i = 0; i < 5; i++) {
var temperature = random.nextInt(30);
data.add(Point('weather')
.addTag('location', 'Prague')
.addField('temperature', temperature)
.time(DateTime.now().subtract(Duration(days: i)).toUtc()));
}
await writeApi.write(data);

// Create Query service and query
var queryService = client.getQueryService();
var queryString = '''
from(bucket: params.bucketParam)
|> range(start: duration(v: params.startParam))
|> filter(fn: (r) => r["_measurement"] == "weather"
and r["location"] == "Prague")''';
var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-10d'};

// Using string for query and Map for params
print(
'\n\n------------------------------ Map params ------------------------------\n');
var recordStream = await queryService.query(queryString, params: queryParams);
print(
'\n --------------------- Result --------------------- \n');
await recordStream.forEach((record) {
print(
'\t\t Temperature in ${record['location']} at ${record['_time']} is ${record['_value']} °C');
});

// Using Query class
print(
'\n\n------------------------------ Query class ------------------------------\n');
var query = Query(query: queryString, params: queryParams);
recordStream = await queryService.query(query);
print(
'\n --------------------- Result --------------------- \n');
await recordStream.forEach((record) {
print(
'\t\t Temperature in ${record['location']} at ${record['_time']} is ${record['_value']} °C');
});

await Future.delayed(Duration(seconds: 10));
client.close();
}
32 changes: 23 additions & 9 deletions lib/client/query_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ class QueryService extends DefaultService {
///
/// Use this with care, all response is stored in memory.
/// Result CSV format can be changed using [dialect].
Future<String> queryRaw(String fluxQuery, {Dialect? dialect}) async {
var query = Query(dialect: dialect, query: fluxQuery);
Future<String> queryRaw(fluxQuery,
{Dialect? dialect, Map<String, Object>? params}) async {
var query = fluxQuery is Query ? fluxQuery : Query(query: fluxQuery);

query.params = params ?? query.params;
query.dialect = dialect ?? query.dialect;

var uri = _buildUri(influxDB.url!, '/api/v2/query', {'org': influxDB.org});
var body = jsonEncode(query);
Map<String, String> headers = {};
Expand All @@ -51,19 +56,28 @@ class QueryService extends DefaultService {
/// Streams the result of [fluxQuery] using [Dialect].
///
/// Each line is CSV parsed list of objects.
Future<Stream<List<dynamic>>> queryLines(String fluxQuery,
{Dialect? dialect}) async {
var q = Query(query: fluxQuery, dialect: dialect);
var response = await _send('/api/v2/query', {'org': influxDB.org}, q);
Future<Stream<List<dynamic>>> queryLines(fluxQuery,
{Dialect? dialect, Map<String, Object>? params}) async {
var query = fluxQuery is Query ? fluxQuery : Query(query: fluxQuery);

query.params = params ?? query.params;
query.dialect = dialect ?? query.dialect;

var response = await _send('/api/v2/query', {'org': influxDB.org}, query);
return utf8.decoder
.bind((response as StreamedResponse).stream)
.transform(CsvToListConverter());
}

/// Streams the result of query into [Stream<FluxRecord>]
Future<Stream<FluxRecord>> query(String fluxQuery) async {
var q = Query(query: fluxQuery, dialect: DEFAULT_dialect);
var response = await _send('/api/v2/query', {'org': influxDB.org}, q);
Future<Stream<FluxRecord>> query(fluxQuery,
{Map<String, Object>? params}) async {
var query = fluxQuery is Query ? fluxQuery : Query(query: fluxQuery);

query.params = params ?? query.params;
query.dialect = query.dialect ?? DEFAULT_dialect;

var response = await _send('/api/v2/query', {'org': influxDB.org}, query);
return utf8.decoder
.bind((response as StreamedResponse).stream)
.transform(CsvToListConverter())
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ environment:
sdk: '>=2.12.0 <3.0.0'

dependencies:
archive: '>=2.0.13 <=3.1.2'
archive: '>=2.0.13 <4.0.0'
http: '>=0.13.0 <0.14.0'
csv: '>=4.1.0 <6.0.0'
universal_io: ^2.0.4
Expand Down
159 changes: 159 additions & 0 deletions test/query_test.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'package:csv/csv.dart';
import 'package:http/http.dart';
import 'package:http/testing.dart';
import 'package:influxdb_client/api.dart';
import 'package:test/test.dart';
import 'commons_test.dart';

Expand Down Expand Up @@ -78,6 +79,44 @@ void main() {
expect(resp, oneTable);
});

test('queryRawParameterized', () async {
var mockClient = MockClient((request) async {
return Response(oneTable, 200);
});

client.client = mockClient;

var query = '''from(bucket: params.bucketParam)
|> range(start: duration(v: params.startParam), stop: now())
|> filter(fn: (r) => r._measurement == 'mem')
|> filter(fn: (r) => r._field == 'used')
''';
var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'};

var resp =
await client.getQueryService().queryRaw(query, params: queryParams);
expect(resp, oneTable);
});

test('queryRawParameterizedClass', () async {
var mockClient = MockClient((request) async {
return Response(oneTable, 200);
});

client.client = mockClient;

var queryString = '''from(bucket: params.bucketParam)
|> range(start: duration(v: params.startParam), stop: now())
|> filter(fn: (r) => r._measurement == 'mem')
|> filter(fn: (r) => r._field == 'used')
''';
var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'};
var query = Query(query: queryString, params: queryParams);

var resp = await client.getQueryService().queryRaw(query);
expect(resp, oneTable);
});

test('queryOneTableStream', () async {
var mockClient = MockClient((request) async {
return Response(oneTable, 200);
Expand All @@ -102,6 +141,55 @@ void main() {
expect(list.length, 9);
});

test('queryStreamParameterized', () async {
var mockClient = MockClient((request) async {
return Response(oneTable, 200);
});

client.client = mockClient;

var query = '''from(bucket: params.bucketParam)
|> range(start: duration(v: params.startParam), stop: now())
|> filter(fn: (r) => r._measurement == 'mem')
|> filter(fn: (r) => r._field == 'used')
''';
var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'};
var resp =
await client.getQueryService().queryLines(query, params: queryParams);

var list = await resp.toList();

list.forEach((element) {
print('-> $element');
});
expect(list.length, 9);
});

test('queryStreamParameterizedClass', () async {
var mockClient = MockClient((request) async {
return Response(oneTable, 200);
});

client.client = mockClient;

var queryString = '''from(bucket: params.bucketParam)
|> range(start: duration(v: params.startParam), stop: now())
|> filter(fn: (r) => r._measurement == 'mem')
|> filter(fn: (r) => r._field == 'used')
''';
var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'};
var query = Query(query: queryString, params: queryParams);
var resp =
await client.getQueryService().queryLines(query, params: queryParams);

var list = await resp.toList();

list.forEach((element) {
print('-> $element');
});
expect(list.length, 9);
});

test('queryOneTableFluxRecord', () async {
var mockClient = MockClient((request) async {
return Response(oneTable, 200);
Expand Down Expand Up @@ -136,6 +224,77 @@ void main() {
expect(res[4]['_time'], '2019-11-12T08:09:09Z');
});

test('queryFluxRecordParameterized', () async {
var mockClient = MockClient((request) async {
return Response(oneTable, 200);
});

client.client = mockClient;

var query = '''from(bucket: params.bucketParam)
|> range(start: duration(v: params.startParam), stop: now())
|> filter(fn: (r) => r._measurement == 'mem')
|> filter(fn: (r) => r._field == 'used')
''';
var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'};
var resp = await client.getQueryService().query(query, params: queryParams);

var res = await resp.toList();
for (var r in res) {
print(r);
expect(r['table'], 0);
expect(r['_measurement'], 'mem');
expect(r['host'], 'mac.local');
}

expect(res[0]['_value'], 11125907456);
expect(res[0]['_time'], '2019-11-12T08:09:05Z');
expect(res[1]['_value'], 11127103488);
expect(res[1]['_time'], '2019-11-12T08:09:06Z');
expect(res[2]['_value'], 11127291904);
expect(res[2]['_time'], '2019-11-12T08:09:07Z');
expect(res[3]['_value'], 11126190080);
expect(res[3]['_time'], '2019-11-12T08:09:08Z');
expect(res[4]['_value'], 11127832576);
expect(res[4]['_time'], '2019-11-12T08:09:09Z');
});

test('queryFluxRecordParameterizedClass', () async {
var mockClient = MockClient((request) async {
return Response(oneTable, 200);
});

client.client = mockClient;

var queryString = '''from(bucket: params.bucketParam)
|> range(start: duration(v: params.startParam), stop: now())
|> filter(fn: (r) => r._measurement == 'mem')
|> filter(fn: (r) => r._field == 'used')
''';
var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'};
var query = Query(query: queryString, params: queryParams);
var resp = await client.getQueryService().query(query, params: queryParams);

var res = await resp.toList();
for (var r in res) {
print(r);
expect(r['table'], 0);
expect(r['_measurement'], 'mem');
expect(r['host'], 'mac.local');
}

expect(res[0]['_value'], 11125907456);
expect(res[0]['_time'], '2019-11-12T08:09:05Z');
expect(res[1]['_value'], 11127103488);
expect(res[1]['_time'], '2019-11-12T08:09:06Z');
expect(res[2]['_value'], 11127291904);
expect(res[2]['_time'], '2019-11-12T08:09:07Z');
expect(res[3]['_value'], 11126190080);
expect(res[3]['_time'], '2019-11-12T08:09:08Z');
expect(res[4]['_value'], 11127832576);
expect(res[4]['_time'], '2019-11-12T08:09:09Z');
});

test('queryMultipleTableFluxRecord', () async {
var mockClient = MockClient((request) async {
return Response(multipleQueries, 200);
Expand Down