diff --git a/CHANGELOG.md b/CHANGELOG.md index c3385a1..de1d772 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ ## 2.2.0 [unreleased] +### Features +1. [#33](https://github.com/influxdata/influxdb-client-dart/pull/33): Add support for Parameterized Queries + +### Documentation +1. [#33](https://github.com/influxdata/influxdb-client-dart/pull/33): Add Parameterized Queries example + + ## 2.1.0 [2022-01-20] ### Bug Fixes diff --git a/README.md b/README.md index bf71eb6..5470a08 100644 --- a/README.md +++ b/README.md @@ -242,6 +242,54 @@ main() async { ``` + +#### Parameterized queries +InfluxDB Cloud supports [Parameterized Queries](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/) +that let you dynamically change values in a query using the InfluxDB API. Parameterized queries make Flux queries more +reusable and can also be used to help prevent injection attacks. + +InfluxDB Cloud inserts the params object into the Flux query as a Flux record named `params`. Use dot or bracket +notation to access parameters in the `params` record in your Flux query. Parameterized Flux queries support only `int` +, `float`, and `string` data types. To convert the supported data types into +other [Flux basic data types, use Flux type conversion functions](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/#supported-parameter-data-types). + +Parameterized query example: +> :warning: Parameterized Queries are supported only in InfluxDB Cloud, currently there is no support in InfluxDB OSS. + +```dart + +import 'package:influxdb_client/api.dart'; + +void main() async { + var client = InfluxDBClient( + url: 'http://localhost:8086', + token: 'my-token', + org: 'my-org', + bucket: 'my-bucket', + ); + + var queryService = client.getQueryService(); + + var queryService = client.getQueryService(); + var queryString = ''' + from(bucket: params.bucketParam) + |> range(start: duration(v: params.startParam)) + |> filter(fn: (r) => r["_measurement"] == "weather" + and r["location"] == "Prague")'''; + var queryParams = {'bucketParam':'my-bucket', 'startParam':'-10d'}; + var query = Query(query: queryString, params: queryParams); + + // Using string for query and Map for params + var recordMap = await queryService.query(queryString, params: queryParams); + + // Using Query class + var recordClass = await queryService.query(query); + + client.close(); +} + +``` + ### Delete points The [DeleteService](lib/client/delete_service.dart) supports deletes diff --git a/example/README.md b/example/README.md index 652ddf9..e44591a 100644 --- a/example/README.md +++ b/example/README.md @@ -6,6 +6,7 @@ ## Queries - [query_example.dart](query_example.dart) - How to query data into `Stream` and `CSV string` - [query_lines_example.dart](query_lines_example.dart) - How to query data into `CSV Stream` +- [parameterized_query_example.dart](parameterized_query_example.dart) - How to use parameterized Flux queries ## Management API - [create_bucket_example.dart](create_bucket_example.dart) - How to create Buckets diff --git a/example/parameterized_query_example.dart b/example/parameterized_query_example.dart new file mode 100644 index 0000000..05d9c0a --- /dev/null +++ b/example/parameterized_query_example.dart @@ -0,0 +1,71 @@ +import 'dart:async'; +import 'dart:math'; +import 'package:influxdb_client/api.dart'; + +/* + * warning: Parameterized Queries are supported only in InfluxDB Cloud, currently there is no support in InfluxDB OSS. + */ + +void main() async { + // Create InfluxDBClient + var client = InfluxDBClient( + url: 'https://us-west-2-1.aws.cloud2.influxdata.com', + token: 'my-token', + org: 'my-org', + bucket: 'my-bucket', + debug: true); + + // Create write service + var writeApi = client.getWriteService(WriteOptions().merge( + precision: WritePrecision.s, + batchSize: 100, + flushInterval: 5000, + gzip: true)); + + // Write data to InfluxDB + var data = List.empty(growable: true); + var random = Random(); + for (var i = 0; i < 5; i++) { + var temperature = random.nextInt(30); + data.add(Point('weather') + .addTag('location', 'Prague') + .addField('temperature', temperature) + .time(DateTime.now().subtract(Duration(days: i)).toUtc())); + } + await writeApi.write(data); + + // Create Query service and query + var queryService = client.getQueryService(); + var queryString = ''' + from(bucket: params.bucketParam) + |> range(start: duration(v: params.startParam)) + |> filter(fn: (r) => r["_measurement"] == "weather" + and r["location"] == "Prague")'''; + var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-10d'}; + + // Using string for query and Map for params + print( + '\n\n------------------------------ Map params ------------------------------\n'); + var recordStream = await queryService.query(queryString, params: queryParams); + print( + '\n --------------------- Result --------------------- \n'); + await recordStream.forEach((record) { + print( + '\t\t Temperature in ${record['location']} at ${record['_time']} is ${record['_value']} °C'); + }); + + // Using Query class + print( + '\n\n------------------------------ Query class ------------------------------\n'); + var query = Query(query: queryString, params: queryParams); + recordStream = await queryService.query(query); + print( + '\n --------------------- Result --------------------- \n'); + await recordStream.forEach((record) { + print( + '\t\t Temperature in ${record['location']} at ${record['_time']} is ${record['_value']} °C'); + }); + + await Future.delayed(Duration(seconds: 10)); + client.close(); +} diff --git a/lib/client/query_service.dart b/lib/client/query_service.dart index dca125d..8590817 100644 --- a/lib/client/query_service.dart +++ b/lib/client/query_service.dart @@ -37,8 +37,13 @@ class QueryService extends DefaultService { /// /// Use this with care, all response is stored in memory. /// Result CSV format can be changed using [dialect]. - Future queryRaw(String fluxQuery, {Dialect? dialect}) async { - var query = Query(dialect: dialect, query: fluxQuery); + Future queryRaw(fluxQuery, + {Dialect? dialect, Map? params}) async { + var query = fluxQuery is Query ? fluxQuery : Query(query: fluxQuery); + + query.params = params ?? query.params; + query.dialect = dialect ?? query.dialect; + var uri = _buildUri(influxDB.url!, '/api/v2/query', {'org': influxDB.org}); var body = jsonEncode(query); Map headers = {}; @@ -51,19 +56,28 @@ class QueryService extends DefaultService { /// Streams the result of [fluxQuery] using [Dialect]. /// /// Each line is CSV parsed list of objects. - Future>> queryLines(String fluxQuery, - {Dialect? dialect}) async { - var q = Query(query: fluxQuery, dialect: dialect); - var response = await _send('/api/v2/query', {'org': influxDB.org}, q); + Future>> queryLines(fluxQuery, + {Dialect? dialect, Map? params}) async { + var query = fluxQuery is Query ? fluxQuery : Query(query: fluxQuery); + + query.params = params ?? query.params; + query.dialect = dialect ?? query.dialect; + + var response = await _send('/api/v2/query', {'org': influxDB.org}, query); return utf8.decoder .bind((response as StreamedResponse).stream) .transform(CsvToListConverter()); } /// Streams the result of query into [Stream] - Future> query(String fluxQuery) async { - var q = Query(query: fluxQuery, dialect: DEFAULT_dialect); - var response = await _send('/api/v2/query', {'org': influxDB.org}, q); + Future> query(fluxQuery, + {Map? params}) async { + var query = fluxQuery is Query ? fluxQuery : Query(query: fluxQuery); + + query.params = params ?? query.params; + query.dialect = query.dialect ?? DEFAULT_dialect; + + var response = await _send('/api/v2/query', {'org': influxDB.org}, query); return utf8.decoder .bind((response as StreamedResponse).stream) .transform(CsvToListConverter()) diff --git a/pubspec.yaml b/pubspec.yaml index 262d509..7befac9 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -7,7 +7,7 @@ environment: sdk: '>=2.12.0 <3.0.0' dependencies: - archive: '>=2.0.13 <=3.1.2' + archive: '>=2.0.13 <4.0.0' http: '>=0.13.0 <0.14.0' csv: '>=4.1.0 <6.0.0' universal_io: ^2.0.4 diff --git a/test/query_test.dart b/test/query_test.dart index 2bfd2da..39defac 100644 --- a/test/query_test.dart +++ b/test/query_test.dart @@ -1,6 +1,7 @@ import 'package:csv/csv.dart'; import 'package:http/http.dart'; import 'package:http/testing.dart'; +import 'package:influxdb_client/api.dart'; import 'package:test/test.dart'; import 'commons_test.dart'; @@ -78,6 +79,44 @@ void main() { expect(resp, oneTable); }); + test('queryRawParameterized', () async { + var mockClient = MockClient((request) async { + return Response(oneTable, 200); + }); + + client.client = mockClient; + + var query = '''from(bucket: params.bucketParam) + |> range(start: duration(v: params.startParam), stop: now()) + |> filter(fn: (r) => r._measurement == 'mem') + |> filter(fn: (r) => r._field == 'used') + '''; + var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'}; + + var resp = + await client.getQueryService().queryRaw(query, params: queryParams); + expect(resp, oneTable); + }); + + test('queryRawParameterizedClass', () async { + var mockClient = MockClient((request) async { + return Response(oneTable, 200); + }); + + client.client = mockClient; + + var queryString = '''from(bucket: params.bucketParam) + |> range(start: duration(v: params.startParam), stop: now()) + |> filter(fn: (r) => r._measurement == 'mem') + |> filter(fn: (r) => r._field == 'used') + '''; + var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'}; + var query = Query(query: queryString, params: queryParams); + + var resp = await client.getQueryService().queryRaw(query); + expect(resp, oneTable); + }); + test('queryOneTableStream', () async { var mockClient = MockClient((request) async { return Response(oneTable, 200); @@ -102,6 +141,55 @@ void main() { expect(list.length, 9); }); + test('queryStreamParameterized', () async { + var mockClient = MockClient((request) async { + return Response(oneTable, 200); + }); + + client.client = mockClient; + + var query = '''from(bucket: params.bucketParam) + |> range(start: duration(v: params.startParam), stop: now()) + |> filter(fn: (r) => r._measurement == 'mem') + |> filter(fn: (r) => r._field == 'used') + '''; + var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'}; + var resp = + await client.getQueryService().queryLines(query, params: queryParams); + + var list = await resp.toList(); + + list.forEach((element) { + print('-> $element'); + }); + expect(list.length, 9); + }); + + test('queryStreamParameterizedClass', () async { + var mockClient = MockClient((request) async { + return Response(oneTable, 200); + }); + + client.client = mockClient; + + var queryString = '''from(bucket: params.bucketParam) + |> range(start: duration(v: params.startParam), stop: now()) + |> filter(fn: (r) => r._measurement == 'mem') + |> filter(fn: (r) => r._field == 'used') + '''; + var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'}; + var query = Query(query: queryString, params: queryParams); + var resp = + await client.getQueryService().queryLines(query, params: queryParams); + + var list = await resp.toList(); + + list.forEach((element) { + print('-> $element'); + }); + expect(list.length, 9); + }); + test('queryOneTableFluxRecord', () async { var mockClient = MockClient((request) async { return Response(oneTable, 200); @@ -136,6 +224,77 @@ void main() { expect(res[4]['_time'], '2019-11-12T08:09:09Z'); }); + test('queryFluxRecordParameterized', () async { + var mockClient = MockClient((request) async { + return Response(oneTable, 200); + }); + + client.client = mockClient; + + var query = '''from(bucket: params.bucketParam) + |> range(start: duration(v: params.startParam), stop: now()) + |> filter(fn: (r) => r._measurement == 'mem') + |> filter(fn: (r) => r._field == 'used') + '''; + var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'}; + var resp = await client.getQueryService().query(query, params: queryParams); + + var res = await resp.toList(); + for (var r in res) { + print(r); + expect(r['table'], 0); + expect(r['_measurement'], 'mem'); + expect(r['host'], 'mac.local'); + } + + expect(res[0]['_value'], 11125907456); + expect(res[0]['_time'], '2019-11-12T08:09:05Z'); + expect(res[1]['_value'], 11127103488); + expect(res[1]['_time'], '2019-11-12T08:09:06Z'); + expect(res[2]['_value'], 11127291904); + expect(res[2]['_time'], '2019-11-12T08:09:07Z'); + expect(res[3]['_value'], 11126190080); + expect(res[3]['_time'], '2019-11-12T08:09:08Z'); + expect(res[4]['_value'], 11127832576); + expect(res[4]['_time'], '2019-11-12T08:09:09Z'); + }); + + test('queryFluxRecordParameterizedClass', () async { + var mockClient = MockClient((request) async { + return Response(oneTable, 200); + }); + + client.client = mockClient; + + var queryString = '''from(bucket: params.bucketParam) + |> range(start: duration(v: params.startParam), stop: now()) + |> filter(fn: (r) => r._measurement == 'mem') + |> filter(fn: (r) => r._field == 'used') + '''; + var queryParams = {'bucketParam': 'my-bucket', 'startParam': '-5s'}; + var query = Query(query: queryString, params: queryParams); + var resp = await client.getQueryService().query(query, params: queryParams); + + var res = await resp.toList(); + for (var r in res) { + print(r); + expect(r['table'], 0); + expect(r['_measurement'], 'mem'); + expect(r['host'], 'mac.local'); + } + + expect(res[0]['_value'], 11125907456); + expect(res[0]['_time'], '2019-11-12T08:09:05Z'); + expect(res[1]['_value'], 11127103488); + expect(res[1]['_time'], '2019-11-12T08:09:06Z'); + expect(res[2]['_value'], 11127291904); + expect(res[2]['_time'], '2019-11-12T08:09:07Z'); + expect(res[3]['_value'], 11126190080); + expect(res[3]['_time'], '2019-11-12T08:09:08Z'); + expect(res[4]['_value'], 11127832576); + expect(res[4]['_time'], '2019-11-12T08:09:09Z'); + }); + test('queryMultipleTableFluxRecord', () async { var mockClient = MockClient((request) async { return Response(multipleQueries, 200);