From a1819b12af41ca21675d72e56d6037695b72b15f Mon Sep 17 00:00:00 2001 From: teyeheimans Date: Thu, 17 Oct 2024 16:14:51 +0200 Subject: [PATCH] Implemented lookup management (#63) Added lookup() method to de DruidClient which returns a LookupBuilder. With this class you can create a new lookup in a chained way. It is also possible to list, update and delete lookups. --- CHANGELOG.md | 4 + README.md | 748 +++++++++++++- examples/Lookups.php | 40 + examples/Spatial.php | 3 +- src/DruidClient.php | 12 + src/Lookups/JdbcLookup.php | 119 +++ src/Lookups/KafkaLookup.php | 45 + src/Lookups/LookupBuilder.php | 580 +++++++++++ src/Lookups/LookupInterface.php | 14 + src/Lookups/MapLookup.php | 26 + src/Lookups/ParseSpecs/CsvParseSpec.php | 56 ++ .../ParseSpecs/CustomJsonParseSpec.php | 35 + src/Lookups/ParseSpecs/ParseSpecInterface.php | 14 + .../ParseSpecs/SimpleJsonParseSpec.php | 20 + src/Lookups/ParseSpecs/TsvParseSpec.php | 63 ++ src/Lookups/UriLookup.php | 48 + src/Lookups/UriPrefixLookup.php | 74 ++ src/Tasks/IndexTaskBuilder.php | 2 +- tests/DruidClientTest.php | 10 + tests/Lookups/JdbcLookupTest.php | 81 ++ tests/Lookups/KafkaLookupTest.php | 53 + tests/Lookups/LookupBuilderTest.php | 918 ++++++++++++++++++ tests/Lookups/MapLookupTest.php | 23 + tests/Lookups/ParseSpecs/CsvParseSpecTest.php | 50 + .../ParseSpecs/CustomJsonParseSpecTest.php | 27 + .../ParseSpecs/SimpleJsonParseSpecTest.php | 17 + tests/Lookups/ParseSpecs/TsvParseSpecTest.php | 55 ++ tests/Lookups/UriLookupTest.php | 59 ++ tests/Lookups/UriPrefixLookupTest.php | 59 ++ 29 files changed, 3239 insertions(+), 16 deletions(-) create mode 100644 examples/Lookups.php create mode 100644 src/Lookups/JdbcLookup.php create mode 100644 src/Lookups/KafkaLookup.php create mode 100644 src/Lookups/LookupBuilder.php create mode 100644 src/Lookups/LookupInterface.php create mode 100644 src/Lookups/MapLookup.php create mode 100644 src/Lookups/ParseSpecs/CsvParseSpec.php create mode 100644 
src/Lookups/ParseSpecs/CustomJsonParseSpec.php create mode 100644 src/Lookups/ParseSpecs/ParseSpecInterface.php create mode 100644 src/Lookups/ParseSpecs/SimpleJsonParseSpec.php create mode 100644 src/Lookups/ParseSpecs/TsvParseSpec.php create mode 100644 src/Lookups/UriLookup.php create mode 100644 src/Lookups/UriPrefixLookup.php create mode 100644 tests/Lookups/JdbcLookupTest.php create mode 100644 tests/Lookups/KafkaLookupTest.php create mode 100644 tests/Lookups/LookupBuilderTest.php create mode 100644 tests/Lookups/MapLookupTest.php create mode 100644 tests/Lookups/ParseSpecs/CsvParseSpecTest.php create mode 100644 tests/Lookups/ParseSpecs/CustomJsonParseSpecTest.php create mode 100644 tests/Lookups/ParseSpecs/SimpleJsonParseSpecTest.php create mode 100644 tests/Lookups/ParseSpecs/TsvParseSpecTest.php create mode 100644 tests/Lookups/UriLookupTest.php create mode 100644 tests/Lookups/UriPrefixLookupTest.php diff --git a/CHANGELOG.md b/CHANGELOG.md index e899fb7..0e80670 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +**v4.1.0** + +Added support to manage lookups. See [DruidClient::lookup()](README.md#druidclientlookup) + **v4.0.0** - Removed GroupByV1. GroupBy v1 is a legacy engine and has not been supported since 2021. diff --git a/README.md b/README.md index d2ad5b0..a80a350 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,7 @@ DRUID_ROUTER_URL=http://druid-router.url:8080 - Support for building metricSpec and DimensionSpec in CompactTaskBuilder - Implement hadoop based batch ingestion (indexing) - Implement Avro Stream and Avro OCF input formats. + ## Examples There are several examples which are written on the single-server tutorial of druid. See [this](examples/README.md) page @@ -93,6 +94,7 @@ for more information. 
- [DruidClient](#druidclient) - [DruidClient::auth()](#druidclientauth) - [DruidClient::query()](#druidclientquery) + - [DruidClient::lookup()](#druidclientlookup) - [DruidClient::cancelQuery()](#druidclientcancelquery) - [DruidClient::compact()](#druidclientcompact) - [DruidClient::reindex()](#druidclientreindex) @@ -147,7 +149,7 @@ for more information. - [whereIn()](#wherein) - [orWhereIn()](#orwherein) - [whereArrayContains()](#wherearraycontains) - - [orWhereArrayContains()](#orwherearraycontains) + - [orWhereArrayContains()](#orwherearraycontains) - [whereBetween()](#wherebetween) - [orWhereBetween()](#orwherebetween) - [whereColumn()](#wherecolumn) @@ -201,6 +203,30 @@ for more information. - [scan()](#scan) - [timeseries()](#timeseries) - [search()](#search) + - [LookupBuilder: Generic Methods](#lookupbuilder-generic-methods) + - [all()](#all) + - [names()](#names) + - [introspect()](#introspect) + - [keys()](#keys) + - [values()](#values) + - [tiers()](#tiers) + - [store()](#store) + - [delete()](#delete) + - [LookupBuilder: Building Lookups](#lookupbuilder-building-lookups) + - [uri()](#uri) + - [uriPrefix()](#uriprefix) + - [kafka()](#kafka) + - [jdbc()](#jdbc) + - [map()](#map) + - [maxHeapPercentage()](#maxheappercentage) + - [pollPeriod()](#pollperiod) + - [injective()](#injective) + - [firstCacheTimeout()](#firstcachetimeout) + - [LookupBuilder: Lookup Parse Specifications](#lookupbuilder-lookup-parse-specifications) + - [tsv()](#tsv) + - [csv()](#csv) + - [json()](#json) + - [customJson()](#customjson) - [Metadata](#metadata) - [intervals](#metadata-intervals) - [interval](#metadata-interval) @@ -322,7 +348,7 @@ Example of using a custom guzzle client: ```php -// Create a custom guzzle client which uses an http proxy. +// Create a custom guzzle client which uses an HTTP proxy. 
$guzzleClient = new GuzzleHttp\Client([ 'proxy' => 'tcp://localhost:8125', 'timeout' => 30, @@ -331,7 +357,7 @@ $guzzleClient = new GuzzleHttp\Client([ // Create a new DruidClient, which uses our custom Guzzle Client $druidClient = new DruidClient( - ['router_url' => 'http://druid.router.com'], + ['router_url' => 'https://druid.router.com'], $guzzleClient ); @@ -387,6 +413,40 @@ to do this. See [QueryBuilder: Data Sources](#querybuilder-data-sources) See the following chapters for more information about the query builder. +#### `DruidClient::lookup()` + +The `lookup()` method gives you a `LookupBuilder` instance, which allows you to create/update, list or delete lookups. + +A lookup is a key-value list which is kept in-memory in druid. During queries, you can use these lists to transform +certain data. +For example, change a `user_id` to a human-readable `name`. + +See also [LookupBuilder: Generic Methods](#lookupbuilder-generic-methods). + +Example: + +```php +$client = new DruidClient(['router_url' => 'https://router.url:8080']); + +// Store a lookup, which is populated by fetching data from a database +$client->lookup()->jdbc( + connectUri: 'jdbc:mysql://localhost:3306/my_database', + username: 'username', + password: 'p4ssw0rd!', + table: 'users', + keyColumn: 'id', + valueColumn: 'name', + filter: "state='active'", + tsColumn: 'updated_at', + ) + ->pollPeriod('PT30M') // renew our data every 30 minutes + ->store( + lookupName: 'usernames', + tier: 'company' + ); + +``` + #### `DruidClient::cancelQuery()` The `cancelQuery()` method gives you the ability to cancel a query. 
To cancel a query, you must know its unique @@ -967,11 +1027,11 @@ To select a _dimension_, you can use one of the methods below: This method has the following arguments: -| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | -|-----------------|-----------------------|-------------------|-----------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| string or array | Required | `$dimension` | country_iso | The dimension which you want to select | -| string | Optional | `$as` | country | The name where the result will be available by in the result set. | -| string | Optional | `$outputType` | string | The output type of the data. If left unspecified, we will use `string`. | +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|-----------------|-----------------------|---------------|-------------|-------------------------------------------------------------------------| +| string or array | Required | `$dimension` | country_iso | The dimension which you want to select | +| string | Optional | `$as` | country | The name where the result will be available by in the result set. | +| string | Optional | `$outputType` | string | The output type of the data. If left unspecified, we will use `string`. | This method allows you to select a dimension in various way's, as shown in the example above. @@ -1006,7 +1066,6 @@ $builder->select([ ]) ``` - **Change the output type of the dimension:** ```php @@ -1684,7 +1743,6 @@ Example: $builder->whereArrayContains('features', 'myNewFeature'); ``` - #### `orWhereArrayContains()` Same as `whereArrayContains()`, but now we will join previous added filters with a `or` instead of an `and`. 
@@ -1800,7 +1858,7 @@ This method has the following arguments: | string | Required | `$dimension` | "flags" | The dimension where you want to filter on | | int | Required | `$flags` | 64 | The flags which should match in the given dimension (comparing with a bitwise AND) | | string | Optional | `$boolean` | "and" | This influences how this filter will be joined with previous added filters. Should both filters apply ("and") or one or the other ("or") ? Default is "and". | -| boolean | Optional | `$useJavascript` | true | Older versions do not yet support the bitwiseAnd expression. Set this parameter to `true` to use an javacript alternative instead. | +| boolean | Optional | `$useJavascript` | true | Older versions do not yet support the bitwiseAnd expression. Set this parameter to `true` to use an javascript alternative instead. | #### `orWhereFlags()` @@ -1977,7 +2035,7 @@ We also support using a `Closure` to group various havings in 1 filter. It will ```php $builder->having(function (FilterBuilder $filterBuilder) { - $filterBuilder->orHaving('sumKats', '>', 0); + $filterBuilder->orHaving('sumCats', '>', 0); $filterBuilder->orHaving('sumDogs', '>', 0); }); $builder->having('sumKids', '=', 0); @@ -2697,6 +2755,670 @@ The `searchRegex()` method has the following arguments: |----------|-----------------------|--------------|-------------|----------------------------------------------------------------| | string | Required | `$pattern` | "^Wiki" | A regular expression where the dimension should match against. | +## LookupBuilder: Generic Methods + +With the Lookup builder you can do everything related to lookups in Druid. See the coming chapters for more information. + +#### `all()` + +This method fetches **all** lookup configurations from **all** tiers and returns them in a large array. 
+ +See also this page: https://druid.apache.org/docs/latest/api-reference/lookups-api/#get-all-lookups + +For example: + +```php +$config = $client->lookup()->all(); + +var_export($config); +``` + +The code above could return something like this: + +```php +array ( + '__default' => + array ( + 'test_map' => + array ( + 'version' => '2024-10-14T15:16:55.000Z', + 'lookupExtractorFactory' => + array ( + 'type' => 'map', + 'map' => + array ( + 'test1' => 'Test Number 1', + 'test2' => 'Test Number 2', + 'test3' => 'Test Number 3', + ), + ), + ), + 'usernames' => + array ( + 'version' => '2024-10-15T11:21:30.000Z', + 'lookupExtractorFactory' => + array ( + 'type' => 'cachedNamespace', + 'extractionNamespace' => + array ( + 'type' => 'jdbc', + 'connectorConfig' => + array ( + 'connectURI' => 'jdbc:mysql://database.example.com:3306/my_db_name', + 'user' => 'userN4me', + 'password' => 'p4ssw0rd!', + ), + 'table' => 'users', + 'keyColumn' => 'id', + 'valueColumn' => 'username', + 'filter' => 'status = "active"', + 'pollPeriod' => 'P15M', + 'jitterSeconds' => 300, + ), + 'injective' => false, + 'firstCacheTimeout' => 0, + ), + ), + ), +) +``` + +#### `names()` + +This method returns all lookup names defined within the given tier. +See also: https://druid.apache.org/docs/latest/api-reference/lookups-api/#list-lookup-names + +The `names()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|--------------|---------------|-----------------------------------------------------------------------------------| +| string | Optional | `$tier` | "my_business" | The name of the tier where we want to search in. 
By default this is `"__default"` | + +Example: + +```php + +$names = $client->lookup()->names(); + +var_export($names); +``` + +This will result a list of all lookups which are defined, for example: + +```php +array ( + 0 => 'usernames', + 1 => 'departments' +) +``` + +#### `introspect()` + +The `introspect()` method allows you fetch the current content of a lookup (so the key/value list). +See also: https://druid.apache.org/docs/latest/querying/lookups/#introspect-a-lookup + +The `introspect()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|---------------|----------------|---------------------------------------------------------------------| +| string | Required | `$lookupName` | "countryNames" | The name of the lookup where you want to retrieve the contents for. | + +Example: + +```php + +$names = $client->lookup()->introspect('countryNames'); + +var_export($names); +``` + +The result will be a key/value list of the contents of the lookup. For example: + +```php +array( + 'nl' => 'The Netherlands', + 'be' => 'Belgium', + 'de' => 'Germany' +) +``` + +#### `keys()` + +The `keys()` method allows you to fetch all keys for a given lookup. +See also: https://druid.apache.org/docs/latest/querying/lookups/#introspect-a-lookup + +The `keys()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|---------------|----------------|-----------------------------------------------------------------| +| string | Required | `$lookupName` | "countryNames" | The name of the lookup where you want to retrieve the keys for. | + +Example: + +```php + +$keys = $client->lookup()->keys('countryNames'); + +var_export($keys); +``` + +The result will be a keys of the contents of the lookup. 
For example: + +```php +array( + 0 => 'nl', + 1 => 'be', + 2 => 'de' +) +``` + +#### `values()` + +The `values()` method allows you to fetch all values for a given lookup. +See also: https://druid.apache.org/docs/latest/querying/lookups/#introspect-a-lookup + +The `values()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|---------------|----------------|-------------------------------------------------------------------| +| string | Required | `$lookupName` | "countryNames" | The name of the lookup where you want to retrieve the values for. | + +Example: + +```php + +$values = $client->lookup()->values('countryNames'); + +var_export($values); +``` + +The result will be a list of all values of the lookup. For example: + +```php +array( + 0 => 'The Netherlands', + 1 => 'Belgium', + 2 => 'Germany' +) +``` + +#### `tiers()` + +The `tiers()` method will return a list of all tier names known in the druid configuration. + +The `tiers()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|--------------|-------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------| +| boolean | Optional | `$discover` | `false` | When set to true (default), we will also discover all tiers which are active in the cluster, and not only in the ones known in the dynamic configuration. | + +Example: + +```php + +$tiers = $client->lookup()->tiers(); + +var_export($tiers); +``` + +The result will be a list of all tier names. For example: + +```php +array( + 0 => '__default', + 1 => 'project1', +) +``` + +#### `store()` + +The `store()` method stores the configured lookup in druid. If it did not exist yet, it will be created, otherwise it +will be updated. 
Be aware that the version number needs to be unique for the given lookup name. + +The `store()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|-------------|-----------------------|----------------|-----------------|------------------------------------------------------------------------------------------------| +| string | Required | `$lookupName` | "country_names" | The name of the lookup to store. | +| string | Optional | `$tier` | "__default" | The name of the tier where we should store the lookup. Default value is `"__default"` | +| string/null | Optional | `$versionName` | "v10" | The version name. When not given (or `null`), we will use the current datetime as the version. | + +The store method requires a lookup configuration to be defined. This can be done by one of the lookup builder methods: + +- [uri()](#uri) +- [uriPrefix()](#uriprefix) +- [kafka()](#kafka) +- [jdbc()](#jdbc) +- [map()](#map) + +These builder methods should be called _before_ calling the `store()` method. + +Example: + +```php +// Store a lookup, which is populated by fetching data from a database +$client->lookup()->jdbc( + connectUri: 'jdbc:mysql://localhost:3306/my_database', + username: 'username', + password: 'p4ssw0rd!', + table: 'users', + keyColumn: 'id', + valueColumn: 'name', + filter: "state='active'", + tsColumn: 'updated_at', + ) + ->pollPeriod('PT30M') // renew our data every 30 minutes + ->store( + lookupName: 'usernames', + tier: 'company' + ); +``` + +The store method does not have any result. If the store fails, it will throw an exception. + +#### `delete()` + +The `delete()` method will delete a lookup from the cluster. If it was last lookup in the tier, then tier is deleted as +well. 
+See also: https://druid.apache.org/docs/latest/api-reference/lookups-api/#delete-lookup + +The `delete()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|---------------|----------------|--------------------------------------------------| +| string | Required | `$lookupName` | "countryNames" | The name of the lookup which you want to delete. | + +Example: + +```php +$client->lookup()->delete('countryNames'); +``` + +The delete method has no result. When the delete action fails, it will throw an exception. + +## LookupBuilder: Building Lookups + +The following methods allow you to _build_ a lookup so you can store it. + +#### `uri()` + +The `uri()` method allows you to define that lookup data should be read from a file. The file can be an "on disk file", +HDFS, S3 or GCS path. + +See also: https://druid.apache.org/docs/latest/querying/lookups-cached-global#uri-lookup + +It is required to also specify how we can parse the file. You can do this with one of the +[Parse Specification](#lookupbuilder-lookup-parse-specifications) methods. + +Also, if you want the lookup to be updated periodically, you should define a poll period. See [pollPeriod](#pollperiod) + +Finally, you can also define the max heap percentage, See: [maxHeapPercentage](#maxheappercentage) + +The `uri()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|--------------|----------------------------------|--------------------------------------------------------------| +| string | Required | `$uri` | "s3://bucket/some/key/file.json" | The path of the file which contains the data for the lookup. 
| + +Example: + +```php + +$client->lookup() + // define the location of the file + ->uri("s3://bucket/some/key/countries.tsv") + // define how the file should be parsed + ->tsv(["id", "country_name", "country_iso", "size"], "country_iso", "country_name") + // Refresh the lookup every hour + ->pollPeriod('PT60M') + // store the lookup + ->store("country_iso_to_name"); +``` + +#### `uriPrefix()` + +The `uriPrefix()` method allows you to define that lookup data should be read from one or multiple files. +The file can be an "on disk file", HDFS, S3 or GCS path. + +See also: https://druid.apache.org/docs/latest/querying/lookups-cached-global#uri-lookup + +It is required to also specify how we can parse the file(s). You can do this with one of the +[Parse Specification](#lookupbuilder-lookup-parse-specifications) methods. + +Also, if you want the lookup to be updated periodically, you should define a poll period. See [pollPeriod](#pollperiod) + +Finally, you can also define the max heap percentage, See: [maxHeapPercentage](#maxheappercentage) + +The `uriPrefix()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|--------------|----------------------|---------------------------------------------------------------------------------------| +| string | Required | `$uriPrefix` | "/mount/disk/files/" | The path prefix of where the file(s) can be found | +| string | Optional | `$fileRegex` | `".*\.json"` | Optional regex for matching the file name under uriPrefix. By default `".*"` is used. | + +Example: + +```php + +$client->lookup() + // define the location of the file(s) + ->uriPrefix("s3://bucket/some/key/", ".*\.json") + // define how the file should be parsed + ->customJson("country_iso", "country_title") + // store the lookup + ->store("country_iso_to_name"); +``` + +#### `kafka()` + +The `kafka()` method allows you to fetch the data for a lookup from a kafka topic. 
+ +See for more information about how this works: https://druid.apache.org/docs/latest/querying/kafka-extraction-namespace + +It is also possible to specify that each key/value pair in the lookup is unique. When this is the case, you can use the +[injective](#injective) method to specify this. + +The `kafka()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|--------------|-----------------------|--------------------|-------------------------------------------|------------------------------------------------------------------------| +| string | Required | `$kafkaTopic` | "customers" | The Kafka topic to read the data from | +| string/array | Required | `$servers` | "kafka1.service:9092,kafka2.service:9092" | A string or array with the server(s) | +| array | Optional | `$kafkaProperties` | `["enable.auto.commit" => true]` | Extra consumer properties. | +| int | Optional | `$connectTimeout` | 30 | How long to wait for an initial connection. Default is 0 (do not wait) | + +Example: + +```php + +$client->lookup() + // configure that our data comes from kafka + ->kafka( + "customers", + "kafka1.service:9092,kafka2.service:9092" + ) + // define that this data is one-to-one (unique) + ->injective() + // store the lookup + ->store("customer_id_to_name"); +``` + +#### `jdbc()` + +The `jdbc()` method allows you to fetch the data for a lookup from a database. + +See also: https://druid.apache.org/docs/latest/querying/lookups-cached-global/#jdbc-lookup + +If you want the lookup to be updated periodically, you should define a poll period. 
See [pollPeriod](#pollperiod) + +Finally, you can also define the max heap percentage, See: [maxHeapPercentage](#maxheappercentage) + +The `jdbc()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|-------------|-----------------------|-----------------------|-------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| string | Required | `$connectUri` | "jdbc:mysql://localhost:3306/druid" | The URI where to connect to. | +| string/null | Required | `$username` | "johnny" | The username for the connection, or null when not used. | +| string/null | Required | `$password` | "fooBarBaz123;" | The password for the connection, or null when not used. | +| string | Required | `$table` | "customers" | The table which contains the key/value pairs. | +| string | Required | `$keyColumn` | "id" | The column from the table which is used as key for the lookup. | +| string | Required | `$valueColumn` | "company_name" | The column from the table which is used as value for the lookup. | +| string | Optional | `$filter` | status = 'active' and sector='it' | Specify a filter (like a where statement) which should be used in the query to fetch the data from the database. | +| string | Optional | `$tsColumn` | "updated_at" | Specify a column which contains a datetime. Druid will use this to only fetch rows from the database which have been changed since the last poll request. This reduces database load and is highly recommended! | +| int | Optional | `$jitterSeconds` | 300 | How much jitter to add (in seconds) up to maximum as a delay (actual value will be used as random from 0 to jitterSeconds), used to distribute db load more evenly. 
| +| int | Optional | `$loadTimeoutSeconds` | 60 | How much time (in seconds) it can take to query and populate lookup values. It will be helpful in lookup updates. On lookup update, it will wait maximum of loadTimeoutSeconds for new lookup to come up and continue serving from old lookup until new lookup successfully loads. | + +Example: + +```php +// Store a lookup, which is populated by fetching data from a database +$client->lookup()->jdbc( + connectUri: 'jdbc:mysql://localhost:3306/my_database', + username: 'username', + password: 'p4ssw0rd!', + table: 'users', + keyColumn: 'id', + valueColumn: 'name', + filter: "state='active'", + tsColumn: 'updated_at', + ) + ->pollPeriod('PT30M') // renew our data every 30 minutes + ->maxHeapPercentage(10) // 10% of JVM heap size + ->store( + lookupName: 'usernames', + tier: 'company' + ); +``` + +#### `map()` + +#### `maxHeapPercentage()` + +With this method you can set the maximum percentage of heap size that the lookup should consume. +If the lookup grows beyond this size, warning messages will be logged in the respective service logs. + +This method only applies for [jdbc](#jdbc), [uri](#uri) and [uriPrefix](#uriprefix) lookups. + +The `maxHeapPercentage()` has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|----------------------|-------------|---------------------------------------------------------------------| +| int | Required | `$maxHeapPercentage` | 10 | The maximum percentage of heap size that the lookup should consume. | + +Example: + +```php +// Store a lookup, which is populated by fetching data from a database +$client->lookup() + ->jdbc( ... ) + ->maxHeapPercentage(10) // 10% of JVM heap size + ->store( ... ); +``` + +#### `pollPeriod()` + +With this method you can specify at which interval the data should be refreshed. 
+ +This method only applies for [jdbc](#jdbc), [uri](#uri) and [uriPrefix](#uriprefix) lookups. + +The `pollPeriod()` has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|------------|-----------------------|--------------|-------------|--------------------------------------------------------------------------------------------------------------------| +| int/string | Required | `$period` | 600000 | Period between polling for updates. This can either be in milliseconds or in ISO 8601 format periods like "PT15M" | + +Example: + +```php +// Store a lookup, which is populated by fetching data from a database +$client->lookup() + ->jdbc( ... ) + ->pollPeriod('PT30M') // renew our data every 30 minutes + ->store( ... ); +``` + +#### `injective()` + +If the underlying lookup data is injective (keys and values are unique) then optimizations can occur internally by +setting this to true. + +This applies for all lookup types except for the `map()` lookup type. + +The `injective()` has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|--------------|-------------|------------------------------------------------------------------------------------------------------------------------------------------------------------| +| boolean | Optional | `$injective` | false | Set injective to true or false. By default druid handles the data as NOT injective (false). When this method is called, we will set it by default to true. | + +Example: + +```php +// Store a lookup, which is populated by fetching data from a database +$client->lookup() + ->jdbc( ... ) + ->injective() // keys and values are unique (one-to-one) + ->store( ... ); +``` + +#### `firstCacheTimeout()` + +How long to wait (in ms) for the first run of the cache to populate. 0 indicates to not wait + +This applies for all lookup types. 
+ +The `firstCacheTimeout()` has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|------------------------|-------------|----------------------------------------------------------------------------------------------| +| int | Required | `$firstCacheTimeoutMs` | 5000 | How long to wait (in ms) for the first run of the cache to populate. 0 indicates to not wait | + +Example: + +```php +// Store a lookup, which is populated by fetching data from a database +$client->lookup() + ->jdbc( ... ) + ->firstCacheTimeout(5000) // ms to wait before first run. + ->store( ... ); +``` + +## LookupBuilder: Lookup Parse Specifications + +When a lookup is filled with data from a file, we need to know how to parse the file. +This can be done with the following parse specification types. + +These method only apply on the [uri](#uri) and [uriPrefix](#uriprefix) lookup types. + +#### `tsv()` + +With this method you can indicate that the file which is going to be processed is a TSV file. +TSV are files where the content is seperated most commonly by tabs. + +The `tsv()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|------------|-----------------------|-------------------|-----------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| array/null | Required | `$columns` | `['id', 'type', 'age', 'company_name']` | List of columns in the tsv file. Set to `null` if the file already has a header row which can be used as column names. In this case you should set `$hasHeaderRow` to `true` | +| string | Optional | `$keyColumn` | "id" | The name of the column containing the key. When not specified the first column is used. 
| +| string | Optional | `$valueColumn` | "company_name" | The name of the column containing the value. When not specified the second column is used. | +| string | Optional | `$delimiter` | "\t" | The delimiter in the file. By default this is a tab (`\t`) | +| string | Optional | `$listDelimiter` | "\n" | The list delimiter in the file. By default this is a "Start of Header" (`\x01`) | +| boolean | Optional | `$hasHeaderRow` | true | Set to `true` to indicate that column information can be extracted from the input files header row | +| int | Optional | `$skipHeaderRows` | 2 | Number of header rows to be skipped. | + +If both skipHeaderRows and hasHeaderRow options are set, skipHeaderRows is first applied. For example, if you set +skipHeaderRows to 2 and hasHeaderRow to true, Druid will skip the first two lines and then extract column information +from the third line. + +Example: + +```php + +// Create our lookup based on TSV files +$client->lookup() + ->uri('/path/to/my/file.tsv') + ->tsv( + ['id', 'type', 'age', 'company_name'], + 'id', + 'company_name', + "\t", + "\n" + ) + ->pollPeriod('PT5M') // Refresh every 5 minutes + ->store('company_names'); +``` + +#### `csv()` + +With this method you can indicate that the file which is going to be processed is a CSV file. +CSV files are files where the content is most commonly separated by commas. + +The `csv()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|------------|-----------------------|-------------------|-----------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| array/null | Required | `$columns` | `['id', 'type', 'age', 'company_name']` | List of columns in the csv file. Set to `null` if the file already has a header row which can be used as column names. 
In this case you should set `$hasHeaderRow` to `true` | +| string | Optional | `$keyColumn` | "id" | The name of the column containing the key. When not specified the first column is used. | +| string | Optional | `$valueColumn` | "company_name" | The name of the column containing the value. When not specified the second column is used. | +| boolean | Optional | `$hasHeaderRow` | true | Set to `true` to indicate that column information can be extracted from the input files header row | +| int | Optional | `$skipHeaderRows` | 2 | Number of header rows to be skipped. | + +If both skipHeaderRows and hasHeaderRow options are set, skipHeaderRows is first applied. For example, if you set +skipHeaderRows to 2 and hasHeaderRow to true, Druid will skip the first two lines and then extract column information +from the third line. + +Example: + +```php + +// Create our lookup based on CSV files +$client->lookup() + ->uri('s3://my_bucket/path/to/my/file.csv') + ->csv( + ['id', 'type', 'age', 'company_name'], + 'id', + 'company_name' + ) + ->pollPeriod('PT5M') // Refresh every 5 minutes + ->store('company_names'); +``` + +#### `json()` + +With this method you can indicate that the file which is going to be processed is a JSON file. + +This method does not take any arguments. + +Example JSON content: + +``` +{ "foo": "bar" } +{ "baz": "bat" } +{ "buck": "truck" } +``` + +Example: + +```php + +// Create our lookup based on JSON files +$client->lookup() + ->uri('s3://my_bucket/path/to/my/companies.json') + ->json() + ->pollPeriod('PT5M') // Refresh every 5 minutes + ->store('company_names'); +``` + +#### `customJson()` + +With this method you can indicate that the file which is going to be processed is a JSON file.
+ +The `customJson()` method has the following arguments: + +| **Type** | **Optional/Required** | **Argument** | **Example** | **Description** | +|----------|-----------------------|-------------------|---------------|-----------------------------| +| string | Required | `$keyFieldName` | "id" | The field name of the key | +| string | Required | `$valueFieldName` | "company_name" | The field name of the value | + +Example JSON content: + +``` +{ "key": "foo", "value": "bar", "somethingElse": "something" } +{ "key": "baz", "value": "bat", "somethingElse": "something" } +{ "key": "buck", "somethingElse": "something", "value": "truck" } +``` + +Example: + +```php + +// Create our lookup based on JSON files +$client->lookup() + ->uri('/mount/disk/path/users.json') + ->customJson('id', 'username') + ->store('lookup_usernames'); +``` + ## QueryBuilder: Execute The Query The following methods allow you to execute the query which you have build using the other methods. There are various @@ -3232,7 +3954,7 @@ It will retrieve the structure for the last known interval, or for the interval Example: ```php -// Retrieve the strucutre of our dataSource +// Retrieve the structure of our dataSource $structure = $client->metadata()->structure('wikipedia'); ``` diff --git a/examples/Lookups.php b/examples/Lookups.php new file mode 100644 index 0000000..c57e62f --- /dev/null +++ b/examples/Lookups.php @@ -0,0 +1,40 @@ + 'http://127.0.0.1:8888']); + + // Enable this to see some more data + //$client->setLogger(new ConsoleLogger()); + + // Store a lookup. + $client->lookup() + ->map([ + 'nl' => 'The Netherlands', + 'de' => 'Germany', + 'be' => 'Belgium', + 'fr' => 'France', + 'it' => 'Italy', + 'es' => 'Spain', + ])->store('country_iso_to_name'); + + // List all lookup names + $names = $client->lookup()->names(); + + // Display the result as a console table.
+ new ConsoleTable(array_map(fn($name) => ["lookup name" => $name], $names)); +} catch (Throwable $exception) { + echo "Something went wrong during retrieving druid data\n"; + echo $exception->getMessage() . "\n"; + echo $exception->getTraceAsString(); +} \ No newline at end of file diff --git a/examples/Spatial.php b/examples/Spatial.php index a83bb59..f597234 100644 --- a/examples/Spatial.php +++ b/examples/Spatial.php @@ -22,8 +22,7 @@ ->select('Mountain') ->select('Country') ->select('Location') - ->whereSpatialRadius('Location', [28,84], 0.8) - ; + ->whereSpatialRadius('Location', [28, 84], 0.8); // Execute the query. $response = $builder->execute(); diff --git a/src/DruidClient.php b/src/DruidClient.php index 522095a..d63571a 100644 --- a/src/DruidClient.php +++ b/src/DruidClient.php @@ -12,6 +12,7 @@ use Level23\Druid\Queries\QueryBuilder; use Psr\Http\Message\ResponseInterface; use Level23\Druid\Tasks\KillTaskBuilder; +use Level23\Druid\Lookups\LookupBuilder; use GuzzleHttp\Exception\ServerException; use Level23\Druid\Queries\QueryInterface; use Level23\Druid\Tasks\IndexTaskBuilder; @@ -472,6 +473,17 @@ public function index(string $dataSource, InputSourceInterface $inputSource): In return new IndexTaskBuilder($this, $dataSource, $inputSource); } + /** + * Return a LookupBuilder instance. With this class you can do your lookup management, such as store, list and + * delete lookups. + * + * @return \Level23\Druid\Lookups\LookupBuilder + */ + public function lookup(): LookupBuilder + { + return new LookupBuilder($this); + } + /** * Create a re-index task for druid. * diff --git a/src/Lookups/JdbcLookup.php b/src/Lookups/JdbcLookup.php new file mode 100644 index 0000000..0d96516 --- /dev/null +++ b/src/Lookups/JdbcLookup.php @@ -0,0 +1,119 @@ + '2015-01-01 00:00:00'. If tsColumn is set, the + * caching service will attempt to only poll values that were written after the last sync. If tsColumn is not set, + * the entire table is pulled every time. 
+ * + * @see https://druid.apache.org/docs/latest/configuration/#jdbc-connections-to-external-databases + * + * @param string $connectUri The JDBC connect Uri. You can selectively allow JDBC properties in + * connectURI. See JDBC connections security config for more details. + * @param string|null $username + * @param string|null $password + * @param string $table The table which contains the key value pairs + * @param string $keyColumn The column in table which contains the keys + * @param string $valueColumn The column in table which contains the values + * @param string|null $filter The filter to use when selecting lookups, this is used to create a + * where clause on lookup population. For example "age >= 18" + * @param string|null $tsColumn The column in table which contains when the key was updated + * @param int|null $jitterSeconds How much jitter to add (in seconds) up to maximum as a delay (actual + * value will be used as random from 0 to jitterSeconds), used to + * distribute db load more evenly + * @param int|null $loadTimeoutSeconds How much time (in seconds) it can take to query and populate lookup + * values. It will be helpful in lookup updates. On lookup update, it + * will wait maximum of loadTimeoutSeconds for new lookup to come up + * and continue serving from old lookup until new lookup successfully + * loads. + * @param string|int|null $pollPeriod The pollPeriod value specifies the period in ISO 8601 format between + * checks for replacement data for the lookup. For example PT15M. When + * not given, it is only once. + * @param int|null $maxHeapPercentage The maximum percentage of heap size that the lookup should consume. + * If the lookup grows beyond this size, warning messages will be + * logged in the respective service logs. 
+ * @param bool $injective If the underlying map is injective (keys and values are unique) then + * optimizations can occur internally by setting this to true + * @param int $firstCacheTimeoutMs How long to wait (in ms) for the first run of the cache to populate. + * 0 indicates to not wait + */ + public function __construct( + protected string $connectUri, + protected string|null $username, + protected string|null $password, + protected string $table, + protected string $keyColumn, + protected string $valueColumn, + protected ?string $filter = null, + protected ?string $tsColumn = null, + protected ?int $jitterSeconds = null, + protected ?int $loadTimeoutSeconds = null, + protected null|int|string $pollPeriod = null, + protected ?int $maxHeapPercentage = null, + protected bool $injective = false, + protected int $firstCacheTimeoutMs = 0 + ) { + + } + + public function toArray(): array + { + $response = [ + 'type' => 'jdbc', + 'connectorConfig' => [ + 'connectURI' => $this->connectUri, + ], + 'table' => $this->table, + 'keyColumn' => $this->keyColumn, + 'valueColumn' => $this->valueColumn, + ]; + + if ($this->username !== null) { + $response['connectorConfig']['user'] = $this->username; + } + if ($this->password !== null) { + $response['connectorConfig']['password'] = $this->password; + } + + if ($this->filter !== null) { + $response['filter'] = $this->filter; + } + + if ($this->tsColumn) { + $response['tsColumn'] = $this->tsColumn; + } + + if ($this->pollPeriod !== null) { + $response['pollPeriod'] = $this->pollPeriod; + } + + if ($this->jitterSeconds !== null) { + $response['jitterSeconds'] = $this->jitterSeconds; + } + + if ($this->loadTimeoutSeconds !== null) { + $response['loadTimeoutSeconds'] = $this->loadTimeoutSeconds; + } + + if ($this->maxHeapPercentage !== null) { + $response['maxHeapPercentage'] = $this->maxHeapPercentage; + } + + return [ + 'type' => 'cachedNamespace', + 'extractionNamespace' => $response, + 'injective' => $this->injective, + 
'firstCacheTimeout' => $this->firstCacheTimeoutMs, + ]; + } +} \ No newline at end of file diff --git a/src/Lookups/KafkaLookup.php b/src/Lookups/KafkaLookup.php new file mode 100644 index 0000000..cc9a27f --- /dev/null +++ b/src/Lookups/KafkaLookup.php @@ -0,0 +1,45 @@ + $servers + * @param array $kafkaProperties Kafka consumer properties + * @param int $connectTimeout How long to wait for an initial connection + * @param bool $isOneToOne The map is a one-to-one (like injective) + */ + public function __construct( + protected string $kafkaTopic, + protected string|array $servers, + protected array $kafkaProperties = [], + protected int $connectTimeout = 0, + protected bool $isOneToOne = false + ) { + $this->kafkaProperties['bootstrap.servers'] = + is_array($this->servers) + ? implode(',', $this->servers) + : $this->servers; + } + + /** + * @return array|int|bool> + */ + public function toArray(): array + { + return [ + 'type' => 'kafka', + 'kafkaTopic' => $this->kafkaTopic, + 'kafkaProperties' => $this->kafkaProperties, + 'connectTimeout' => $this->connectTimeout, + 'isOneToOne' => $this->isOneToOne, + ]; + } +} \ No newline at end of file diff --git a/src/Lookups/LookupBuilder.php b/src/Lookups/LookupBuilder.php new file mode 100644 index 0000000..da9f9a7 --- /dev/null +++ b/src/Lookups/LookupBuilder.php @@ -0,0 +1,580 @@ +|null + */ + protected ?string $lookupClass = null; + + /** + * @var array + */ + protected array $parameters = []; + + protected ?ParseSpecInterface $parseSpec = null; + + public function __construct(protected DruidClient $druidClient) + { + + } + + /** + * This will create or update a lookup. + * Assign a unique version identifier each time you update a lookup extractor factory. Otherwise, the call will + * fail. If no version was specified, we will automatically use the current date and time as version number. 
+ * + * @see https://druid.apache.org/docs/latest/api-reference/lookups-api + * @see https://druid.apache.org/docs/latest/querying/lookups-cached-global + * @see https://druid.apache.org/docs/latest/querying/kafka-extraction-namespace + * + * @param string $lookupName + * @param string $tier + * @param string|null $versionName + * + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function store( + string $lookupName, + string $tier = '__default', + string $versionName = null + ): void { + if ($this->lookupClass === null) { + throw new InvalidArgumentException('No lookup defined. Please define the lookup by using the map, kafka, jdbc, uri or uriPrefix methods!'); + } + + if ($this->lookupClass === UriLookup::class || $this->lookupClass === UriPrefixLookup::class) { + + if ($this->parseSpec === null) { + throw new InvalidArgumentException('Using an URI lookup, but there is no parseSpec defined! Use the csv, tsv, simpleJson or customJson methods to define the parseSpec.'); + } + + $parameters = $this->parameters; + $parameters[] = $this->pollPeriod; + $parameters[] = $this->maxHeapPercentage; + $parameters[] = $this->injective; + $parameters[] = $this->firstCacheTimeoutMs; + $lookup = new $this->lookupClass( + $this->parseSpec, + ...$parameters + ); + } elseif ($this->lookupClass === JdbcLookup::class) { + $parameters = $this->parameters; + $parameters[] = $this->pollPeriod; + $parameters[] = $this->maxHeapPercentage; + $parameters[] = $this->injective; + $parameters[] = $this->firstCacheTimeoutMs; + + $lookup = new $this->lookupClass(...$parameters); + } elseif ($this->lookupClass === KafkaLookup::class) { + $parameters = $this->parameters; + $parameters[] = $this->injective; + + $lookup = new $this->lookupClass(...$parameters); + } else { + $lookup = new $this->lookupClass( + ...$this->parameters + ); + } + + $payload = [ + 'version' => $versionName ?? 
(new DateTime())->format('Y-m-d\TH:i:s.000\Z'), + 'lookupExtractorFactory' => $lookup->toArray(), + ]; + + $this->druidClient->executeRawRequest( + 'post', + $this->druidClient->config('coordinator_url') . '/druid/coordinator/v1/lookups/config/' . $tier . '/' . $lookupName, + $payload, + ); + } + + /** + * Return all keys for the given lookup. + * + * @param string $lookupName + * + * @return array + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function keys(string $lookupName): array + { + /** + * Druid facilitates an endpoint for the keys: + * /druid/v1/lookups/introspect//keys + * + * Unfortunately the response is not valid json. Therefore, we cannot use it. + * + * @see https://github.com/apache/druid/issues/17361 + */ + $all = $this->introspect($lookupName); + + return array_keys($all); + } + + /** + * Return all values for the given lookup. + * + * @param string $lookupName + * + * @return array + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function values(string $lookupName): array + { + /** + * Druid facilitates an endpoint for the values: + * /druid/v1/lookups/introspect//values + * + * Unfortunately the response is not valid json. Therefore, we cannot use it. + * + * @see https://github.com/apache/druid/issues/17361 + */ + $all = $this->introspect($lookupName); + + return array_values($all); + } + + /** + * Return the content of the lookup + * + * @param string $lookupName + * + * @return array + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function introspect(string $lookupName): array + { + /** @var array $response */ + $response = $this->druidClient->executeRawRequest( + 'get', + $this->druidClient->config('broker_url') . '/druid/v1/lookups/introspect/' . 
$lookupName, + ); + + return $response; + } + + /** + * Delete the given lookup in the given tier. When this fails an exception is thrown. + * + * @param string $lookupName + * @param string $tier + * + * @return void + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function delete(string $lookupName, string $tier = '__default'): void + { + $this->druidClient->executeRawRequest( + 'delete', + $this->druidClient->config('coordinator_url') . '/druid/coordinator/v1/lookups/config/' . $tier . '/' . $lookupName, + ); + } + + /** + * Return all tiers and all of their lookups in one large configuration array. + * + * + * @return array + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function all(): array + { + /** @var array $response */ + $response = $this->druidClient->executeRawRequest( + 'get', + $this->druidClient->config('coordinator_url') . '/druid/coordinator/v1/lookups/config/all', + ); + + return $response; + } + + /** + * Return a list of known tier names in the dynamic configuration. + * + * @return array + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function tiers(bool $discover = true): array + { + /** @var array $response */ + $response = $this->druidClient->executeRawRequest( + 'get', + $this->druidClient->config('coordinator_url') . '/druid/coordinator/v1/lookups/config?discover=' . ($discover ? 'true' : 'false'), + ); + + return $response; + } + + /** + * For Uri and JDBC lookups it is possible to define a first cache timeout. + * With this method you can set it. + * + * @param int $ms + * + * @return $this + */ + public function firstCacheTimeout(int $ms): self + { + $this->firstCacheTimeoutMs = $ms; + + return $this; + } + + /** + * For Uri and JDBC lookups it is possible to define if the content is injective or not. 
+ * Injective means that each key item points to a unique value. So each key and value is unique. + * If so, druid can make internal optimizations. + * + * @param bool $injective + * + * @return $this + */ + public function injective(bool $injective = true): self + { + $this->injective = $injective; + + return $this; + } + + /** + * Set the polling period for the lookup to configure. This is applied for JDBC, URI and URIPrefix lookups. + * + * @param string|int $period Period between polling for updates. For example PT10M for every 10 minutes, or use + * milliseconds like 600000. When not given, the data is fetched only once. + * + * @return $this + */ + public function pollPeriod(int|string $period): self + { + $this->pollPeriod = $period; + + return $this; + } + + /** + * Set the max heap percentage for the lookup to configure. This is applied for JDBC, URI and URIPrefix lookups. + * + * @param int $maxHeapPercentage The maximum percentage of heap size that the lookup should consume. If the lookup + * grows beyond this size, warning messages will be logged in the respective service + * logs. + * + * @return $this + */ + public function maxHeapPercentage(int $maxHeapPercentage): self + { + $this->maxHeapPercentage = $maxHeapPercentage; + + return $this; + } + + /** + * Return all lookup names defined under the given tier. + * + * @param string $tier + * + * @return array + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function names(string $tier = '__default'): array + { + /** @var array $response */ + $response = $this->druidClient->executeRawRequest( + 'get', + $this->druidClient->config('coordinator_url') . '/druid/coordinator/v1/lookups/config/' . $tier, + ); + + return $response; + } + + /** + * Return the lookup as it is currently configured in Druid.
+ * + * @param string $name + * @param string $tier + * + * @return array + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function get(string $name, string $tier = '__default'): array + { + /** @var array $response */ + $response = $this->druidClient->executeRawRequest( + 'get', + $this->druidClient->config('coordinator_url') . '/druid/coordinator/v1/lookups/config/' . $tier . '/' . $name, + ); + + return $response; + } + + /** + * Configure a map lookup. + * + * @param array $map + * + * @return \Level23\Druid\Lookups\LookupBuilder + */ + public function map(array $map): self + { + $this->lookupClass = MapLookup::class; + $this->parameters = [$map]; + + return $this; + } + + /** + * Configure a kafka lookup. + * + * @see https://druid.apache.org/docs/latest/querying/kafka-extraction-namespace + * + * @param string $kafkaTopic The Kafka topic to read the data from + * @param string|array $servers The kafka server(s), for example ["kafka1.service:9092", + * "kafka2.service:9092"] + * @param array $kafkaProperties Other optional kafka properties. + * @param int $connectTimeout How long to wait for an initial connection + * + * @return $this + */ + public function kafka( + string $kafkaTopic, + string|array $servers, + array $kafkaProperties = [], + int $connectTimeout = 0 + ): LookupBuilder { + $this->lookupClass = KafkaLookup::class; + $this->parameters = [$kafkaTopic, $servers, $kafkaProperties, $connectTimeout]; + + return $this; + } + + /** + * Configure a new JDBC lookup. + * + * @see https://druid.apache.org/docs/latest/querying/lookups-cached-global/#jdbc-lookup + * + * @param string $connectUri The URI where to connect to. For example + * "jdbc:mysql://localhost:3306/druid" + * @param string|null $username The username for the connection, or null when not used. + * @param string|null $password The password for the connection, or null when not used. 
+ * @param string $table The table where to retrieve the data from. + * @param string $keyColumn The column from the table which is used as key for the lookup. + * @param string $valueColumn The column from the table which is used as value from the lookup. + * @param string|null $filter Specify a filter (like a where statement) which should be used in the + * query to fetch the data from the database. + * @param string|null $tsColumn Specify a column which contains a datetime. Druid will use this to + * only fetch rows from the database which have been changed since the + * last poll request. This reduces database load and is highly + * recommended! + * @param int|null $jitterSeconds How much jitter to add (in seconds) up to maximum as a delay (actual + * value will be used as random from 0 to jitterSeconds), used to + * distribute db load more evenly. + * @param int|null $loadTimeoutSeconds How much time (in seconds) it can take to query and populate lookup + * values. It will be helpful in lookup updates. On lookup update, it + * will wait maximum of loadTimeoutSeconds for new lookup to come up and + * continue serving from old lookup until new lookup successfully loads. + * + * @return $this + */ + public function jdbc( + string $connectUri, + string|null $username, + string|null $password, + string $table, + string $keyColumn, + string $valueColumn, + ?string $filter = null, + ?string $tsColumn = null, + ?int $jitterSeconds = null, + ?int $loadTimeoutSeconds = null + ): self { + $this->lookupClass = JdbcLookup::class; + $this->parameters = [ + $connectUri, + $username, + $password, + $table, + $keyColumn, + $valueColumn, + $filter, + $tsColumn, + $jitterSeconds, + $loadTimeoutSeconds, + ]; + + return $this; + } + + /** + * Configure a new URI lookup. Do not forget to specify the file specification by calling the `csv`, `tsv`, `json` + * or `customJson` methods. + * + * @param string $uri URI for the lookup file. Can be a file, HDFS, S3 or GCS path. 
+ * + * @return $this + */ + public function uri(string $uri): self + { + $this->lookupClass = UriLookup::class; + $this->parameters = [$uri]; + + return $this; + } + + /** + * Configure a new URI lookup for files matching a given pattern. + * + * Do not forget to specify the file specification by calling the `csv`, `tsv`, `json` + * or `customJson` methods. + * + * @param string $uriPrefix A URI prefix that specifies a directory or other searchable resource where lookup + * files are located + * @param string|null $fileRegex Regex for matching the file name under uriPrefix, for example "*.json" + * + * @return $this + */ + public function uriPrefix(string $uriPrefix, ?string $fileRegex = null): self + { + $this->lookupClass = UriPrefixLookup::class; + $this->parameters = [$uriPrefix, $fileRegex]; + + return $this; + } + + /** + * Specify that the file which is being parsed by a URI or URIPrefix lookup is a CSV file. + * If both skipHeaderRows and hasHeaderRow options are set, skipHeaderRows is first applied. For example, if you + * set skipHeaderRows to 2 and hasHeaderRow to true, Druid will skip the first two lines and then extract column + * information from the third line. + * + * @param array|null $columns The list of columns in the csv file, or use null and set + * $hasHeaderRow to true to fetch it automatically. 
+ * @param string|null $keyColumn The name of the column containing the key + * @param string|null $valueColumn The name of the column containing the value + * @param bool $hasHeaderRow Set to true to indicate that column information can be extracted + * from the input files' header row + * @param int $skipHeaderRows Number of header rows to be skipped + * + * @return $this + */ + public function csv( + ?array $columns, + ?string $keyColumn = null, + ?string $valueColumn = null, + bool $hasHeaderRow = false, + int $skipHeaderRows = 0 + ): LookupBuilder { + + $this->parseSpec = new CsvParseSpec($columns, $keyColumn, $valueColumn, $hasHeaderRow, $skipHeaderRows); + + return $this; + } + + /** + * Specify that the file which is being parsed by a URI or URIPrefix lookup is a JSON file. + * + * @param string $keyFieldName The field name of the key + * @param string $valueFieldName The field name of the value + * + * @return $this + */ + public function customJson( + string $keyFieldName, + string $valueFieldName + ): LookupBuilder { + $this->parseSpec = new CustomJsonParseSpec($keyFieldName, $valueFieldName); + + return $this; + } + + /** + * Specify that the file which is being parsed by a URI or URIPrefix lookup is a JSON file containing key => value + * items. For example: + * + * ``` + * {"foo": "bar"} + * {"baz": "bat"} + * {"buck": "truck"} + * ``` + * + * @return $this + */ + public function json(): LookupBuilder + { + $this->parseSpec = new SimpleJsonParseSpec(); + + return $this; + } + + /** + * Specify that the file which is being parsed by a URI or URIPrefix lookup is a TSV file. + * + * If both skipHeaderRows and hasHeaderRow options are set, skipHeaderRows is first applied. For example, if you + * set skipHeaderRows to 2 and hasHeaderRow to true, Druid will skip the first two lines and then extract column + * information from the third line. 
+ * + * @param array|null $columns The list of columns in the TSV file, or use null and set + * $hasHeaderRow to true to fetch it automatically. + * @param string|null $keyColumn The name of the column containing the key + * @param string|null $valueColumn The name of the column containing the value + * @param string $delimiter The delimiter in the file + * @param string $listDelimiter The list delimiter in the file + * @param bool $hasHeaderRow Set to true to indicate that column information can be extracted + * from the input files' header row + * @param int $skipHeaderRows Number of header rows to be skipped + * + * @return $this + */ + public function tsv( + ?array $columns, + ?string $keyColumn = null, + ?string $valueColumn = null, + string $delimiter = "\t", + string $listDelimiter = "\x01", + bool $hasHeaderRow = false, + int $skipHeaderRows = 0 + ): LookupBuilder { + $this->parseSpec = new TsvParseSpec( + $columns, + $keyColumn, + $valueColumn, + $delimiter, + $listDelimiter, + $hasHeaderRow, + $skipHeaderRows + ); + + return $this; + } +} \ No newline at end of file diff --git a/src/Lookups/LookupInterface.php b/src/Lookups/LookupInterface.php new file mode 100644 index 0000000..d2364ea --- /dev/null +++ b/src/Lookups/LookupInterface.php @@ -0,0 +1,14 @@ +|bool|int> + */ + public function toArray(): array; +} \ No newline at end of file diff --git a/src/Lookups/MapLookup.php b/src/Lookups/MapLookup.php new file mode 100644 index 0000000..0a0e4b8 --- /dev/null +++ b/src/Lookups/MapLookup.php @@ -0,0 +1,26 @@ + $map + */ + public function __construct(protected array $map) + { + + } + + public function toArray(): array + { + return [ + 'type' => 'map', + 'map' => $this->map, + ]; + } +} \ No newline at end of file diff --git a/src/Lookups/ParseSpecs/CsvParseSpec.php b/src/Lookups/ParseSpecs/CsvParseSpec.php new file mode 100644 index 0000000..6f40591 --- /dev/null +++ b/src/Lookups/ParseSpecs/CsvParseSpec.php @@ -0,0 +1,56 @@ +|null $columns + * @param 
string|null $keyColumn + * @param string|null $valueColumn + * @param bool $hasHeaderRow + * @param int $skipHeaderRows + */ + public function __construct( + protected ?array $columns, + protected ?string $keyColumn = null, + protected ?string $valueColumn = null, + protected bool $hasHeaderRow = false, + protected int $skipHeaderRows = 0 + ) { + + } + + /** + * @return array|string|int> + */ + public function toArray(): array + { + $response = [ + 'format' => 'csv', + 'hasHeaderRow' => $this->hasHeaderRow, + ]; + + if ($this->columns !== null) { + $response['columns'] = $this->columns; + } + + if ($this->keyColumn !== null) { + $response['keyColumn'] = $this->keyColumn; + } + if ($this->valueColumn !== null) { + $response['valueColumn'] = $this->valueColumn; + } + if ($this->skipHeaderRows !== null) { + $response['skipHeaderRows'] = $this->skipHeaderRows; + } + + return $response; + } +} \ No newline at end of file diff --git a/src/Lookups/ParseSpecs/CustomJsonParseSpec.php b/src/Lookups/ParseSpecs/CustomJsonParseSpec.php new file mode 100644 index 0000000..f888011 --- /dev/null +++ b/src/Lookups/ParseSpecs/CustomJsonParseSpec.php @@ -0,0 +1,35 @@ + + */ + public function toArray(): array + { + return [ + 'format' => 'customJson', + 'keyFieldName' => $this->keyFieldName, + 'valueFieldName' => $this->valueFieldName, + ]; + } +} \ No newline at end of file diff --git a/src/Lookups/ParseSpecs/ParseSpecInterface.php b/src/Lookups/ParseSpecs/ParseSpecInterface.php new file mode 100644 index 0000000..09d7293 --- /dev/null +++ b/src/Lookups/ParseSpecs/ParseSpecInterface.php @@ -0,0 +1,14 @@ +|bool|int> + */ + public function toArray(): array; +} \ No newline at end of file diff --git a/src/Lookups/ParseSpecs/SimpleJsonParseSpec.php b/src/Lookups/ParseSpecs/SimpleJsonParseSpec.php new file mode 100644 index 0000000..ecccfa8 --- /dev/null +++ b/src/Lookups/ParseSpecs/SimpleJsonParseSpec.php @@ -0,0 +1,20 @@ + + */ + public function toArray(): array + { + return [ + 
'format' => 'simpleJson', + ]; + } +} \ No newline at end of file diff --git a/src/Lookups/ParseSpecs/TsvParseSpec.php b/src/Lookups/ParseSpecs/TsvParseSpec.php new file mode 100644 index 0000000..a49e4e4 --- /dev/null +++ b/src/Lookups/ParseSpecs/TsvParseSpec.php @@ -0,0 +1,63 @@ +|null $columns + * @param string|null $keyColumn + * @param string|null $valueColumn + * @param string|null $delimiter + * @param string|null $listDelimiter + * @param bool $hasHeaderRow + * @param int $skipHeaderRows + */ + public function __construct( + protected null|array $columns, + protected ?string $keyColumn = null, + protected ?string $valueColumn = null, + protected ?string $delimiter = null, + protected ?string $listDelimiter = null, + protected bool $hasHeaderRow = false, + protected int $skipHeaderRows = 0 + ) { + + } + + /** + * @return array|string|int|null> + */ + public function toArray(): array + { + $response = [ + 'format' => 'tsv', + 'columns' => $this->columns, + 'hasHeaderRow' => $this->hasHeaderRow, + ]; + + if ($this->keyColumn !== null) { + $response['keyColumn'] = $this->keyColumn; + } + if ($this->valueColumn !== null) { + $response['valueColumn'] = $this->valueColumn; + } + if ($this->delimiter !== null) { + $response['delimiter'] = $this->delimiter; + } + if ($this->listDelimiter !== null) { + $response['listDelimiter'] = $this->listDelimiter; + } + if ($this->skipHeaderRows !== null) { + $response['skipHeaderRows'] = $this->skipHeaderRows; + } + + return $response; + } +} \ No newline at end of file diff --git a/src/Lookups/UriLookup.php b/src/Lookups/UriLookup.php new file mode 100644 index 0000000..3344594 --- /dev/null +++ b/src/Lookups/UriLookup.php @@ -0,0 +1,48 @@ + 'uri', + 'uri' => $this->uri, + 'namespaceParseSpec' => $this->parseSpec->toArray(), + ]; + + if ($this->pollPeriod !== null) { + $response['pollPeriod'] = $this->pollPeriod; + } + + if ($this->maxHeapPercentage !== null) { + $response['maxHeapPercentage'] = $this->maxHeapPercentage; + } + 
+ return [ + 'type' => 'cachedNamespace', + 'extractionNamespace' => $response, + 'injective' => $this->injective, + 'firstCacheTimeout' => $this->firstCacheTimeoutMs, + ]; + } +} \ No newline at end of file diff --git a/src/Lookups/UriPrefixLookup.php b/src/Lookups/UriPrefixLookup.php new file mode 100644 index 0000000..2363b94 --- /dev/null +++ b/src/Lookups/UriPrefixLookup.php @@ -0,0 +1,74 @@ + 'uri', + 'uriPrefix' => $this->uriPrefix, + 'namespaceParseSpec' => $this->parseSpec->toArray(), + ]; + + if ($this->fileRegex !== null) { + $response['fileRegex'] = $this->fileRegex; + } + + if ($this->pollPeriod !== null) { + $response['pollPeriod'] = $this->pollPeriod; + } + + if ($this->maxHeapPercentage !== null) { + $response['maxHeapPercentage'] = $this->maxHeapPercentage; + } + + return [ + 'type' => 'cachedNamespace', + 'extractionNamespace' => $response, + 'injective' => $this->injective, + 'firstCacheTimeout' => $this->firstCacheTimeoutMs, + ]; + } +} \ No newline at end of file diff --git a/src/Tasks/IndexTaskBuilder.php b/src/Tasks/IndexTaskBuilder.php index 1a05dec..8f86e90 100644 --- a/src/Tasks/IndexTaskBuilder.php +++ b/src/Tasks/IndexTaskBuilder.php @@ -202,7 +202,7 @@ protected function buildTask(array|TaskContext $context): TaskInterface throw new InvalidArgumentException('You have to specify an timestamp column!'); } - if ($this->granularityType == ArbitraryGranularity::class) { + if ($this->granularityType === ArbitraryGranularity::class) { $granularity = new ArbitraryGranularity( $this->queryGranularity, $this->rollup, diff --git a/tests/DruidClientTest.php b/tests/DruidClientTest.php index 46ea767..9b96578 100644 --- a/tests/DruidClientTest.php +++ b/tests/DruidClientTest.php @@ -18,6 +18,7 @@ use Level23\Druid\Queries\QueryBuilder; use Psr\Http\Message\ResponseInterface; use Level23\Druid\Tasks\KillTaskBuilder; +use Level23\Druid\Lookups\LookupBuilder; use Level23\Druid\Tasks\IndexTaskBuilder; use Level23\Druid\Queries\QueryInterface; use 
GuzzleHttp\Exception\ServerException; @@ -123,6 +124,15 @@ public function testQuery(): void $client->query('randomDataSource', 'quarter'); } + public function testLookup(): void + { + $client = new DruidClient([]); + + $instance = $client->lookup(); + + $this->assertInstanceOf(LookupBuilder::class, $instance); + } + /** * @runInSeparateProcess * @preserveGlobalState disabled diff --git a/tests/Lookups/JdbcLookupTest.php b/tests/Lookups/JdbcLookupTest.php new file mode 100644 index 0000000..808fd7b --- /dev/null +++ b/tests/Lookups/JdbcLookupTest.php @@ -0,0 +1,81 @@ +assertEquals( + [ + 'type' => 'cachedNamespace', + 'extractionNamespace' => [ + 'type' => 'jdbc', + 'connectorConfig' => [ + 'connectURI' => 'jdbc:mysql://localhost:3306/druid', + ], + 'table' => 'countries', + 'keyColumn' => 'country_iso', + 'valueColumn' => 'country_name', + ], + 'injective' => false, + 'firstCacheTimeout' => 0, + ], + $lookup->toArray() + ); + + $lookup = new JdbcLookup( + 'jdbc:mysql://localhost:3306/druid', + 'myUser', + 'p4ssw0rd!', + 'countries', + 'country_iso', + 'country_name', + "region = 'eu'", + "updated_at", + 150, + 30, + 'PT15M', + 10 + ); + + $this->assertEquals( + [ + 'type' => 'cachedNamespace', + 'extractionNamespace' => [ + 'type' => 'jdbc', + 'connectorConfig' => [ + 'connectURI' => 'jdbc:mysql://localhost:3306/druid', + 'user' => 'myUser', + 'password' => 'p4ssw0rd!', + ], + 'table' => 'countries', + 'keyColumn' => 'country_iso', + 'valueColumn' => 'country_name', + 'filter' => "region = 'eu'", + 'tsColumn' => 'updated_at', + 'pollPeriod' => 'PT15M', + 'jitterSeconds' => 150, + 'loadTimeoutSeconds' => 30, + 'maxHeapPercentage' => 10, + ], + 'injective' => false, + 'firstCacheTimeout' => 0, + + ], + $lookup->toArray() + ); + } +} diff --git a/tests/Lookups/KafkaLookupTest.php b/tests/Lookups/KafkaLookupTest.php new file mode 100644 index 0000000..c5de146 --- /dev/null +++ b/tests/Lookups/KafkaLookupTest.php @@ -0,0 +1,53 @@ +assertEquals( + [ + 'type' => 'kafka', 
+ 'kafkaTopic' => 'operators', + 'kafkaProperties' => [ + 'bootstrap.servers' => 'kafka1.server:9092,kafka2.server:9092', + ], + 'connectTimeout' => 0, + 'isOneToOne' => false, + ], + $lookup->toArray() + ); + + $lookup = new KafkaLookup( + 'countries', + 'kafka3.server:9092,kafka4.server:9092', + ['enable.auto.commit' => true], + 6000, + true + ); + + $this->assertEquals( + [ + 'type' => 'kafka', + 'kafkaTopic' => 'countries', + 'kafkaProperties' => [ + 'bootstrap.servers' => 'kafka3.server:9092,kafka4.server:9092', + 'enable.auto.commit' => true, + ], + 'connectTimeout' => 6000, + 'isOneToOne' => true, + ], + $lookup->toArray() + ); + } +} diff --git a/tests/Lookups/LookupBuilderTest.php b/tests/Lookups/LookupBuilderTest.php new file mode 100644 index 0000000..d4597a0 --- /dev/null +++ b/tests/Lookups/LookupBuilderTest.php @@ -0,0 +1,918 @@ +expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('No lookup defined. Please define the lookup by using the map, kafka, jdbc, uri or uriPrefix methods!'); + + $client->lookup()->store('samples'); + } + + /** + * @throws \Level23\Druid\Exceptions\QueryResponseException + * @throws \GuzzleHttp\Exception\GuzzleException + */ + public function testStoreWithoutParseSpec(): void + { + $client = new DruidClient([]); + + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Using an URI lookup, but there is no parseSpec defined! 
Use the csv, tsv, simpleJson or customJson methods to define the parseSpec.'); + + $client->lookup()->uri('/path/to/file.json')->store('file_names'); + } + + /** + * @testWith ["countries"] + * ["operators", "mobile"] + * ["countries", null, "v1"] + * + * @param string $lookupName + * @param string|null $tier + * @param string|null $version + * + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function testStoreUri(string $lookupName, ?string $tier = null, ?string $version = null): void + { + $client = Mockery::mock(DruidClient::class); + + $lookup = new UriLookup( + new SimpleJsonParseSpec(), + '/path/to/countries.json', + 'PT30M', + 10, + true, + 60000 + ); + + $client->shouldReceive('config') + ->once() + ->with('coordinator_url') + ->andReturn('https://coordinator.example.com'); + + $client->shouldReceive('executeRawRequest') + ->once() + ->with( + 'post', + 'https://coordinator.example.com/druid/coordinator/v1/lookups/config/' . ($tier ?? '__default') . '/' . $lookupName, + [ + 'version' => $version ?? 
(new DateTime())->format('Y-m-d\TH:i:s.000\Z'), + 'lookupExtractorFactory' => $lookup->toArray(), + ] + ); + + $builder = new LookupBuilder($client); + + $builder = $builder->uri('/path/to/countries.json') + ->json() + ->pollPeriod('PT30M') + ->maxHeapPercentage(10) + ->injective() + ->firstCacheTimeout(60000); + + if ($tier) { + $builder->store($lookupName, $tier, $version); + } else { + $builder->store( + lookupName: $lookupName, + versionName: $version + ); + } + } + + /** + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function testStoreJdbc(): void + { + $client = Mockery::mock(DruidClient::class); + + $lookup = new JdbcLookup( + 'jdbc:mysql://localhost:3306/druid', + 'druid', + 'pssswrrdd', + 'users', + 'id', + 'name', + "state='active'", + 'updated_at', + null, + null, + 'PT30M', + 20, + false, + 15000 + ); + + $client->shouldReceive('config') + ->once() + ->with('coordinator_url') + ->andReturn('https://coordinator.example.com'); + + $client->shouldReceive('executeRawRequest') + ->once() + ->withArgs(function ($method, $url, $parameters) use ($lookup) { + $this->assertEquals('post', $method); + $this->assertEquals( + 'https://coordinator.example.com/druid/coordinator/v1/lookups/config/company/usernames', + $url + ); + + $this->assertEquals( + [ + 'version' => (new DateTime())->format('Y-m-d\TH:i:s.000\Z'), + 'lookupExtractorFactory' => $lookup->toArray(), + ], + $parameters + ); + + return true; + }); + + $builder = new LookupBuilder($client); + + $builder->jdbc( + connectUri: 'jdbc:mysql://localhost:3306/druid', + username: 'druid', + password: 'pssswrrdd', + table: 'users', + keyColumn: 'id', + valueColumn: 'name', + filter: "state='active'", + tsColumn: 'updated_at', + ) + ->pollPeriod('PT30M') + ->maxHeapPercentage(20) + ->injective(false) + ->firstCacheTimeout(15000) + ->store( + lookupName: 'usernames', + tier: 'company' + ); + } + + /** + * @throws 
\GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function testStoreKafka(): void + { + $client = Mockery::mock(DruidClient::class); + + $lookup = new KafkaLookup( + 'clients', + 'kafka.service:9092', + ['enable.auto.commit' => true], + 30, + true + ); + + $client->shouldReceive('config') + ->once() + ->with('coordinator_url') + ->andReturn('https://coordinator.example.com'); + + $client->shouldReceive('executeRawRequest') + ->once() + ->with( + 'post', + 'https://coordinator.example.com/druid/coordinator/v1/lookups/config/__default/client_names', + [ + 'version' => (new DateTime())->format('Y-m-d\TH:i:s.000\Z'), + 'lookupExtractorFactory' => $lookup->toArray(), + ] + ); + + $builder = new LookupBuilder($client); + + $builder + ->kafka( + 'clients', + 'kafka.service:9092', + ['enable.auto.commit' => true], + 30 + ) + ->injective() + ->store( + lookupName: 'client_names' + ); + } + + /** + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function testStoreMap(): void + { + $client = Mockery::mock(DruidClient::class); + + $lookup = new MapLookup( + ['foo' => 'bar', 'zoo' => 'baz'], + ); + + $client->shouldReceive('config') + ->once() + ->with('coordinator_url') + ->andReturn('https://coordinator.example.com'); + + $client->shouldReceive('executeRawRequest') + ->once() + ->with( + 'post', + 'https://coordinator.example.com/druid/coordinator/v1/lookups/config/__default/foo_bar', + [ + 'version' => (new DateTime())->format('Y-m-d\TH:i:s.000\Z'), + 'lookupExtractorFactory' => $lookup->toArray(), + ] + ); + + $builder = new LookupBuilder($client); + + $builder + ->map(['foo' => 'bar', 'zoo' => 'baz']) + ->store( + lookupName: 'foo_bar' + ); + } + + /** + * @throws \Level23\Druid\Exceptions\QueryResponseException + * @throws \GuzzleHttp\Exception\GuzzleException + */ + public function testKeys(): void + { + $builder = 
Mockery::mock(LookupBuilder::class); + $builder->makePartial(); + + $builder->shouldReceive('introspect') + ->once() + ->with('countries') + ->andReturn([ + 'nl' => 'The Netherlands', + 'de' => 'Germany', + 'be' => 'Belgium', + ]); + + $response = $builder->keys('countries'); + + $this->assertEquals(['nl', 'de', 'be'], $response); + } + + /** + * @throws \Level23\Druid\Exceptions\QueryResponseException + * @throws \GuzzleHttp\Exception\GuzzleException + */ + public function testValues(): void + { + $builder = Mockery::mock(LookupBuilder::class); + $builder->makePartial(); + + $builder->shouldReceive('introspect') + ->once() + ->with('countries') + ->andReturn([ + 'nl' => 'The Netherlands', + 'de' => 'Germany', + 'be' => 'Belgium', + ]); + + $response = $builder->values('countries'); + + $this->assertEquals(['The Netherlands', 'Germany', 'Belgium'], $response); + } + + /** + * @throws \Level23\Druid\Exceptions\QueryResponseException + * @throws \GuzzleHttp\Exception\GuzzleException + */ + public function testIntrospect(): void + { + $data = [ + 'nl' => 'The Netherlands', + 'de' => 'Germany', + 'be' => 'Belgium', + ]; + + $client = Mockery::mock(DruidClient::class); + + $builder = new LookupBuilder($client); + + $client->shouldReceive('config') + ->once() + ->with('broker_url') + ->andReturn('https://broker.example.com'); + + $client->shouldReceive('executeRawRequest') + ->once() + ->with('get', 'https://broker.example.com/druid/v1/lookups/introspect/countries') + ->andReturn($data); + + $response = $builder->introspect('countries'); + + $this->assertEquals($data, $response); + } + + /** + * @testWith ["countries", "globeData"] + * ["titles"] + * @param string $lookupName + * @param string|null $tier + * + * @return void + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function testDelete(string $lookupName, string|null $tier = null): void + { + $client = Mockery::mock(DruidClient::class); + 
+ $client->shouldReceive('config') + ->once() + ->with('coordinator_url') + ->andReturn('https://coordinator.example.com'); + + $client->shouldReceive('executeRawRequest') + ->once() + ->with('delete', + 'https://coordinator.example.com/druid/coordinator/v1/lookups/config/' . ($tier ?? '__default') . '/' . $lookupName); + + $builder = new LookupBuilder($client); + if ($tier) { + $builder->delete($lookupName, $tier); + } else { + $builder->delete($lookupName); + } + } + + /** + * @throws \Level23\Druid\Exceptions\QueryResponseException + * @throws \GuzzleHttp\Exception\GuzzleException + */ + public function testAll(): void + { + $all = [ + '__default' => + [ + 'test_map' => + [ + 'version' => '2024-10-14T15:16:55.000Z', + 'lookupExtractorFactory' => + [ + 'type' => 'map', + 'map' => + [ + 'test1' => 'Test Number 1', + 'test2' => 'Test Number 2', + 'test3' => 'Test Number 3', + ], + ], + ], + 'usernames' => + [ + 'version' => '2024-10-15T11:21:30.000Z', + 'lookupExtractorFactory' => + [ + 'type' => 'cachedNamespace', + 'extractionNamespace' => + [ + 'type' => 'jdbc', + 'connectorConfig' => + [ + 'connectURI' => 'jdbc:mysql://database.example.com:3306/my_db_name', + 'user' => 'userN4me', + 'password' => 'p4ssw0rd!', + ], + 'table' => 'users', + 'keyColumn' => 'id', + 'valueColumn' => 'username', + 'filter' => 'status = "active"', + 'pollPeriod' => 'P15M', + 'jitterSeconds' => 300, + ], + 'injective' => false, + 'firstCacheTimeout' => 0, + ], + ], + ], + ]; + $client = Mockery::mock(DruidClient::class); + + $builder = new LookupBuilder($client); + + $client->shouldReceive('config') + ->once() + ->with('coordinator_url') + ->andReturn('https://coordinator.example.com'); + + $client->shouldReceive('executeRawRequest') + ->once() + ->with('get', 'https://coordinator.example.com/druid/coordinator/v1/lookups/config/all') + ->andReturn($all); + + $response = $builder->all(); + + $this->assertEquals($all, $response); + } + + /** + * @testWith [true] + * [false] + * + * @param 
bool $discover + * + * @return void + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function testTiers(bool $discover): void + { + $data = [ + '__default', + 'tier1', + 'tier2', + ]; + $client = Mockery::mock(DruidClient::class); + + $builder = new LookupBuilder($client); + + $client->shouldReceive('config') + ->once() + ->with('coordinator_url') + ->andReturn('https://coordinator.example.com'); + + $client->shouldReceive('executeRawRequest') + ->once() + ->with('get', + 'https://coordinator.example.com/druid/coordinator/v1/lookups/config?discover=' . ($discover ? 'true' : 'false')) + ->andReturn($data); + + $response = $builder->tiers($discover); + + $this->assertEquals($data, $response); + } + + /** + * @throws \ReflectionException + */ + public function testFirstCacheTimeout(): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $this->assertEquals(0, $this->getProperty($builder, 'firstCacheTimeoutMs')); + + $result = $builder->firstCacheTimeout(1200); + $this->assertEquals($result, $builder); + + $this->assertEquals(1200, $this->getProperty($builder, 'firstCacheTimeoutMs')); + } + + /** + * @throws \ReflectionException + */ + public function testInjective(): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $this->assertFalse($this->getProperty($builder, 'injective')); + + $result = $builder->injective(); + $this->assertEquals($result, $builder); + $this->assertTrue($this->getProperty($builder, 'injective')); + + $builder->injective(false); + $this->assertFalse($this->getProperty($builder, 'injective')); + } + + /** + * @throws \ReflectionException + */ + public function testPollPeriod(): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $this->assertEquals(null, $this->getProperty($builder, 'pollPeriod')); + + $result = $builder->pollPeriod(60000); + $this->assertEquals($result, $builder); + 
$this->assertEquals(60000, $this->getProperty($builder, 'pollPeriod')); + + $builder->pollPeriod('PT15M'); + $this->assertEquals('PT15M', $this->getProperty($builder, 'pollPeriod')); + } + + /** + * @throws \ReflectionException + */ + public function testMaxHeapPercentage(): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $this->assertEquals(null, $this->getProperty($builder, 'maxHeapPercentage')); + + $percentage = rand(1, 99); + $result = $builder->maxHeapPercentage($percentage); + $this->assertEquals($result, $builder); + $this->assertEquals($percentage, $this->getProperty($builder, 'maxHeapPercentage')); + } + + /** + * @testWith ["__default"] + * [] + * ["production"] + * + * @param string|null $tier + * + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function testNames(?string $tier = null): void + { + $data = [ + 'usernames', + 'countries', + ]; + $client = Mockery::mock(DruidClient::class); + + $builder = new LookupBuilder($client); + + $client->shouldReceive('config') + ->once() + ->with('coordinator_url') + ->andReturn('https://coordinator.example.com'); + + $client->shouldReceive('executeRawRequest') + ->once() + ->with('get', + 'https://coordinator.example.com/druid/coordinator/v1/lookups/config/' . ($tier ?? '__default')) + ->andReturn($data); + + $response = $tier ? 
$builder->names($tier) : $builder->names(); + + $this->assertEquals($data, $response); + } + + /** + * @testWith ["test_map", "__default"] + * ["countries"] + * + * @param string $name + * @param string|null $tier + * + * @throws \GuzzleHttp\Exception\GuzzleException + * @throws \Level23\Druid\Exceptions\QueryResponseException + */ + public function testGet(string $name, ?string $tier = null): void + { + $data = [ + 'version' => '2024-10-14T15:16:55.000Z', + 'lookupExtractorFactory' => + [ + 'type' => 'map', + 'map' => + [ + 'test1' => 'Test Nummer 1', + 'test2' => 'Test Nummer 2', + 'test3' => 'Test Nummer 3', + ], + ], + ]; + $client = Mockery::mock(DruidClient::class); + + $builder = new LookupBuilder($client); + + $client->shouldReceive('config') + ->once() + ->with('coordinator_url') + ->andReturn('https://coordinator.example.com'); + + $client->shouldReceive('executeRawRequest') + ->once() + ->with('get', + 'https://coordinator.example.com/druid/coordinator/v1/lookups/config/' . ($tier ?? '__default') . '/' . $name) + ->andReturn($data); + + $response = $tier ? 
$builder->get($name, $tier) : $builder->get($name); + + $this->assertEquals($data, $response); + } + + /** + * @throws \ReflectionException + */ + public function testMap(): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $data = ['foo' => 'FooBar', 'bar' => 'BarBaz']; + $result = $builder->map($data); + + $this->assertEquals($result, $builder); + + $this->assertEquals(MapLookup::class, $this->getProperty($builder, 'lookupClass')); + $this->assertEquals([$data], $this->getProperty($builder, 'parameters')); + } + + /** + * @throws \ReflectionException + */ + public function testKafka(): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $result = $builder->kafka( + 'users', + 'kafka.server1:9092,kafka.server2.9092', + ['enable.auto.commit' => true], + 30 + ); + + $this->assertEquals($result, $builder); + + $this->assertEquals(KafkaLookup::class, $this->getProperty($builder, 'lookupClass')); + $this->assertEquals([ + 'users', + 'kafka.server1:9092,kafka.server2.9092', + ['enable.auto.commit' => true], + 30, + ], $this->getProperty($builder, 'parameters')); + + $builder->kafka( + 'groups', + ['kafka12.server:9092', 'kafka13.server.9092'] + ); + + $this->assertEquals(KafkaLookup::class, $this->getProperty($builder, 'lookupClass')); + $this->assertEquals([ + 'groups', + ['kafka12.server:9092', 'kafka13.server.9092'], + [], + 0, + ], $this->getProperty($builder, 'parameters')); + } + + /** + * @throws \ReflectionException + */ + public function testJdbc(): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $result = $builder->jdbc( + 'jdbc:mysql://localhost:3306/my_database', + null, + null, + 'users', + 'id', + 'username', + "state='active'", + 'updated_at', + 300, + 30 + ); + + $this->assertEquals($result, $builder); + + $this->assertEquals(JdbcLookup::class, $this->getProperty($builder, 'lookupClass')); + $this->assertEquals([ + 'jdbc:mysql://localhost:3306/my_database', + null, + 
null, + 'users', + 'id', + 'username', + "state='active'", + 'updated_at', + 300, + 30, + ], $this->getProperty($builder, 'parameters')); + + $builder->jdbc( + 'jdbc:mysql://localhost:3306/other_db', + 'pietertje', + 'PAARSWOORD', + 'countries', + 'id', + 'title', + ); + + $this->assertEquals(JdbcLookup::class, $this->getProperty($builder, 'lookupClass')); + $this->assertEquals([ + 'jdbc:mysql://localhost:3306/other_db', + 'pietertje', + 'PAARSWOORD', + 'countries', + 'id', + 'title', + null, + null, + null, + null, + ], $this->getProperty($builder, 'parameters')); + } + + /** + * @throws \ReflectionException + */ + public function testUri(): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $result = $builder->uri('/var/mount/files/content.json'); + $this->assertEquals($result, $builder); + + $this->assertEquals(UriLookup::class, $this->getProperty($builder, 'lookupClass')); + $this->assertEquals([ + '/var/mount/files/content.json', + ], $this->getProperty($builder, 'parameters')); + } + + /** + * @throws \ReflectionException + */ + public function testUriPrefix(): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $result = $builder->uriPrefix('/var/mount/files/'); + $this->assertEquals($result, $builder); + + $this->assertEquals(UriPrefixLookup::class, $this->getProperty($builder, 'lookupClass')); + $this->assertEquals([ + '/var/mount/files/', + null, + ], $this->getProperty($builder, 'parameters')); + + $builder->uriPrefix("s3://bucket/some/key/prefix/", "renames-[0-9]*\\.gz"); + + $this->assertEquals(UriPrefixLookup::class, $this->getProperty($builder, 'lookupClass')); + $this->assertEquals([ + "s3://bucket/some/key/prefix/", + "renames-[0-9]*\\.gz", + ], $this->getProperty($builder, 'parameters')); + } + + /** + * @testWith [true] + * [false] + * + * @runInSeparateProcess + * @preserveGlobalState disabled + * + * @param bool $withDefaults + */ + public function testCsv(bool $withDefaults): void + { + 
$client = new DruidClient([]); + $builder = $client->lookup(); + + $parseSpec = Mockery::mock('overload:' . CsvParseSpec::class, ParseSpecInterface::class); + $parseSpec->shouldReceive('__construct') + ->once() + ->with( + $withDefaults ? ['id', 'first', 'last', 'username', 'address'] : null, + 'id', + 'username', + !$withDefaults, + $withDefaults ? 0 : 2 + ); + + if ($withDefaults) { + $result = $builder->csv( + ['id', 'first', 'last', 'username', 'address'], + 'id', + 'username' + ); + } else { + $result = $builder->csv( + null, + 'id', + 'username', + true, + 2 + ); + } + $this->assertEquals($result, $builder); + } + + /** + * @testWith [true] + * [false] + * + * @runInSeparateProcess + * @preserveGlobalState disabled + * + * @param bool $withDefaults + */ + public function testTsv(bool $withDefaults): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $parseSpec = Mockery::mock('overload:' . TsvParseSpec::class, ParseSpecInterface::class); + $parseSpec->shouldReceive('__construct') + ->once() + ->with( + $withDefaults ? ['id', 'first', 'last', 'username', 'address'] : null, + 'id', + 'username', + $withDefaults ? "\t" : ";", + $withDefaults ? "\x01" : "\n", + !$withDefaults, + $withDefaults ? 0 : 5 + ); + + if ($withDefaults) { + $result = $builder->tsv( + ['id', 'first', 'last', 'username', 'address'], + 'id', + 'username' + ); + } else { + $result = $builder->tsv( + null, + 'id', + 'username', + ";", + "\n", + true, + 5 + ); + } + $this->assertEquals($result, $builder); + } + + /** + * @runInSeparateProcess + * @preserveGlobalState disabled + */ + public function testCustomJson(): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $parseSpec = Mockery::mock('overload:' . 
CustomJsonParseSpec::class, ParseSpecInterface::class); + $parseSpec->shouldReceive('__construct') + ->once() + ->with('id', 'name'); + + $result = $builder->customJson('id', 'name'); + $this->assertEquals($result, $builder); + } + + /** + * @runInSeparateProcess + * @preserveGlobalState disabled + */ + public function testJson(): void + { + $client = new DruidClient([]); + $builder = $client->lookup(); + + $parseSpec = Mockery::mock('overload:' . SimpleJsonParseSpec::class, ParseSpecInterface::class); + $parseSpec->shouldReceive('__construct') + ->once(); + + $result = $builder->json(); + $this->assertEquals($result, $builder); + } +} \ No newline at end of file diff --git a/tests/Lookups/MapLookupTest.php b/tests/Lookups/MapLookupTest.php new file mode 100644 index 0000000..cbdb5f8 --- /dev/null +++ b/tests/Lookups/MapLookupTest.php @@ -0,0 +1,23 @@ + 'Foo', 'bar' => 'Baz']); + + $this->assertEquals( + [ + 'type' => 'map', + 'map' => ['foo' => 'Foo', 'bar' => 'Baz'], + ], + $lookup->toArray() + ); + } +} diff --git a/tests/Lookups/ParseSpecs/CsvParseSpecTest.php b/tests/Lookups/ParseSpecs/CsvParseSpecTest.php new file mode 100644 index 0000000..e5b1046 --- /dev/null +++ b/tests/Lookups/ParseSpecs/CsvParseSpecTest.php @@ -0,0 +1,50 @@ +assertEquals( + [ + 'format' => 'csv', + 'hasHeaderRow' => false, + 'columns' => ['id', 'name', 'alias', 'costs', 'title'], + 'keyColumn' => 'id', + 'valueColumn' => 'title', + 'skipHeaderRows' => 0, + ], + $parseSpec->toArray() + ); + + $parseSpec = new CsvParseSpec( + null, + 'id', + 'title', + true, + 2 + ); + + $this->assertEquals( + [ + 'format' => 'csv', + 'hasHeaderRow' => true, + 'keyColumn' => 'id', + 'valueColumn' => 'title', + 'skipHeaderRows' => 2, + ], + $parseSpec->toArray() + ); + } +} diff --git a/tests/Lookups/ParseSpecs/CustomJsonParseSpecTest.php b/tests/Lookups/ParseSpecs/CustomJsonParseSpecTest.php new file mode 100644 index 0000000..8da9cf1 --- /dev/null +++ 
b/tests/Lookups/ParseSpecs/CustomJsonParseSpecTest.php @@ -0,0 +1,27 @@ +assertEquals( + [ + 'format' => 'customJson', + 'keyFieldName' => 'id', + 'valueFieldName' => 'title', + ], + $parseSpec->toArray() + ); + } +} diff --git a/tests/Lookups/ParseSpecs/SimpleJsonParseSpecTest.php b/tests/Lookups/ParseSpecs/SimpleJsonParseSpecTest.php new file mode 100644 index 0000000..6563da1 --- /dev/null +++ b/tests/Lookups/ParseSpecs/SimpleJsonParseSpecTest.php @@ -0,0 +1,17 @@ +assertEquals(['format' => 'simpleJson'], $parseSpec->toArray()); + } +} diff --git a/tests/Lookups/ParseSpecs/TsvParseSpecTest.php b/tests/Lookups/ParseSpecs/TsvParseSpecTest.php new file mode 100644 index 0000000..7b212d6 --- /dev/null +++ b/tests/Lookups/ParseSpecs/TsvParseSpecTest.php @@ -0,0 +1,55 @@ +assertEquals( + [ + 'format' => 'tsv', + 'hasHeaderRow' => false, + 'columns' => ['id', 'name', 'alias', 'costs', 'title'], + 'keyColumn' => 'id', + 'valueColumn' => 'title', + 'skipHeaderRows' => 0, + ], + $parseSpec->toArray() + ); + + $parseSpec = new TsvParseSpec( + null, + 'id', + 'title', + "\t", + "\n", + true, + 3 + ); + + $this->assertEquals( + [ + 'format' => 'tsv', + 'hasHeaderRow' => true, + 'keyColumn' => 'id', + 'valueColumn' => 'title', + 'skipHeaderRows' => 3, + 'columns' => null, + 'delimiter' => "\t", + 'listDelimiter' => "\n", + ], + $parseSpec->toArray() + ); + } +} diff --git a/tests/Lookups/UriLookupTest.php b/tests/Lookups/UriLookupTest.php new file mode 100644 index 0000000..1ac4044 --- /dev/null +++ b/tests/Lookups/UriLookupTest.php @@ -0,0 +1,59 @@ +assertEquals([ + 'type' => 'cachedNamespace', + 'extractionNamespace' => [ + 'type' => 'uri', + 'uri' => '/mount/files/data.json', + 'namespaceParseSpec' => $parseSpec->toArray(), + ], + 'injective' => false, + 'firstCacheTimeout' => 0, + ], $lookup->toArray()); + + $parseSpec = new CsvParseSpec( + ['key', 'value'], 'key', 'value' + ); + + $lookup = new UriLookup( + $parseSpec, + "s3://bucket/some/key/prefix/renames-0003.gz", + 
'PT15M', + 15, + true, + 6000 + ); + + $this->assertEquals([ + 'type' => 'cachedNamespace', + 'extractionNamespace' => [ + 'type' => 'uri', + 'uri' => 's3://bucket/some/key/prefix/renames-0003.gz', + 'namespaceParseSpec' => $parseSpec->toArray(), + 'pollPeriod' => 'PT15M', + 'maxHeapPercentage' => 15, + ], + 'injective' => true, + 'firstCacheTimeout' => 6000, + ], $lookup->toArray()); + } +} diff --git a/tests/Lookups/UriPrefixLookupTest.php b/tests/Lookups/UriPrefixLookupTest.php new file mode 100644 index 0000000..14766dd --- /dev/null +++ b/tests/Lookups/UriPrefixLookupTest.php @@ -0,0 +1,59 @@ +assertEquals([ + 'type' => 'cachedNamespace', + 'extractionNamespace' => [ + 'type' => 'uri', + 'uriPrefix' => 's3://bucket/some/key/prefix/', + 'fileRegex' => 'renames-[0-9]*\.gz', + 'namespaceParseSpec' => $parseSpec->toArray(), + ], + 'injective' => false, + 'firstCacheTimeout' => 0, + ], $lookup->toArray()); + + $lookup = new UriPrefixLookup( + $parseSpec, + '/mount/files/', + null, + 'PT15M', + 15, + true, + 6000 + ); + + $this->assertEquals([ + 'type' => 'cachedNamespace', + 'extractionNamespace' => [ + 'type' => 'uri', + 'uriPrefix' => '/mount/files/', + 'namespaceParseSpec' => $parseSpec->toArray(), + 'pollPeriod' => 'PT15M', + 'maxHeapPercentage' => 15, + ], + 'injective' => true, + 'firstCacheTimeout' => 6000, + ], $lookup->toArray()); + } +}