Skip to content

Commit

Permalink
Segments sorted by non-time columns. (#16849)
Browse files Browse the repository at this point in the history
* Segments primarily sorted by non-time columns.

Currently, segments are always sorted by __time, followed by the sort
order provided by the user via dimensionsSpec or CLUSTERED BY. Sorting
by __time enables efficient execution of queries involving time-ordering
or granularity. Time-ordering is a simple matter of reading the rows in
stored order, and granular cursors can be generated in streaming fashion.

However, for various workloads, it's better for storage footprint and
query performance to sort by arbitrary orders that do not start with __time.
With this patch, users can sort segments by such orders.

For spec-based ingestion, users add "useExplicitSegmentSortOrder: true" to
dimensionsSpec. The "dimensions" list determines the sort order. To
define a sort order that includes "__time", users explicitly
include a dimension named "__time".

For SQL-based ingestion, users set the context parameter
"useExplicitSegmentSortOrder: true". The CLUSTERED BY clause is then
used as the explicit segment sort order.

In both cases, when the new "useExplicitSegmentSortOrder" parameter is
false (the default), __time is implicitly prepended to the sort order,
as it always was prior to this patch.

The new parameter is experimental for two main reasons. First, such
segments can cause errors when loaded by older servers, due to violating
their expectations that timestamps are always monotonically increasing.
Second, even on newer servers, not all queries can run on non-time-sorted
segments. Scan queries involving time-ordering and any query involving
granularity will not run. (To partially mitigate this, a currently-undocumented
SQL feature "sqlUseGranularity" is provided. When set to false the SQL planner
avoids using "granularity".)

Changes on the write path:

1) DimensionsSpec can now optionally contain a __time dimension, which
   controls the placement of __time in the sort order. If not present,
   __time is considered to be first in the sort order, as it has always
   been.

2) IncrementalIndex and IndexMerger are updated to sort facts more
   flexibly; not always by time first.

3) Metadata (stored in metadata.drd) gains a "sortOrder" field.

4) MSQ can generate range-based shard specs even when not all columns are
   singly-valued strings. It merely stops accepting new clustering key
   fields when it encounters the first one that isn't a singly-valued
   string. This is useful because it enables range shard specs on
   "someDim" to be created for clauses like "CLUSTERED BY someDim, __time".

Changes on the read path:

1) Add StorageAdapter#getSortOrder so query engines can tell how a
   segment is sorted.

2) Update QueryableIndexStorageAdapter, IncrementalIndexStorageAdapter,
   and VectorCursorGranularizer to throw errors when using granularities
   on non-time-ordered segments.

3) Update ScanQueryEngine to throw an error when using the time-ordering
  "order" parameter on non-time-ordered segments.

4) Update TimeBoundaryQueryRunnerFactory to perform a segment scan when
   running on a non-time-ordered segment.

5) Add "sqlUseGranularity" context parameter that causes the SQL planner
   to avoid using granularities other than ALL.

Other changes:

1) Rename DimensionsSpec "hasCustomDimensions" to "hasFixedDimensions"
   and change the meaning subtly: it now returns true if the DimensionsSpec
   represents an unchanging list of dimensions, or false if there is
   some discovery happening. This is what call sites had expected anyway.

* Fixups from CI.

* Fixes.

* Fix missing arg.

* Additional changes.

* Fix logic.

* Fixes.

* Fix test.

* Adjust test.

* Remove throws.

* Fix styles.

* Fix javadocs.

* Cleanup.

* Smoother handling of null ordering.

* Fix tests.

* Missed a spot on the merge.

* Fixups.

* Avoid needless Filters.and.

* Add timeBoundaryInspector to test.

* Fix tests.

* Fix FrameStorageAdapterTest.

* Fix various tests.

* Use forceSegmentSortByTime instead of useExplicitSegmentSortOrder.

* Pom fix.

* Fix doc.
  • Loading branch information
gianm authored Aug 23, 2024
1 parent 8c8a4b2 commit 0603d51
Show file tree
Hide file tree
Showing 153 changed files with 4,605 additions and 1,360 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.Cursors;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.QueryableIndexTimeBoundaryInspector;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
Expand Down Expand Up @@ -192,11 +194,11 @@ public void timeFloorUsingCursor(Blackhole blackhole)
try (final CursorHolder cursorHolder = adapter.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
final Cursor cursor = cursorHolder.asCursor();
final CursorGranularizer granularizer = CursorGranularizer.create(
adapter,
cursor,
QueryableIndexTimeBoundaryInspector.create(index),
Cursors.getTimeOrdering(index.getOrdering()),
Granularities.HOUR,
adapter.getInterval(),
false
adapter.getInterval()
);
final Sequence<Long> results =
Sequences.simple(granularizer.getBucketIterable())
Expand Down
2 changes: 1 addition & 1 deletion docs/ingestion/ingestion-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ A `dimensionsSpec` can have the following components:
| `spatialDimensions` | An array of [spatial dimensions](../querying/geo.md). | `[]` |
| `includeAllDimensions` | Note that this field only applies to string-based schema discovery where Druid ingests dimensions it discovers as strings. This is different from schema auto-discovery where Druid infers the type for data. You can set `includeAllDimensions` to true to ingest both explicit dimensions in the `dimensions` field and other dimensions that the ingestion task discovers from input data. In this case, the explicit dimensions will appear first in the order that you specify them, and the dimensions dynamically discovered will come after. This flag can be useful especially with auto schema discovery using [`flattenSpec`](./data-formats.md#flattenspec). If this is not set and the `dimensions` field is not empty, Druid will ingest only explicit dimensions. If this is not set and the `dimensions` field is empty, all discovered dimensions will be ingested. | false |
| `useSchemaDiscovery` | Configure Druid to use schema auto-discovery to discover some or all of the dimensions and types for your data. For any dimensions that aren't a uniform type, Druid ingests them as JSON. You can use this for native batch or streaming ingestion. | false |

| `forceSegmentSortByTime` | When set to true (the default), segments created by the ingestion job are sorted by `{__time, dimensions[0], dimensions[1], ...}`. When set to false, segments created by the ingestion job are sorted by `{dimensions[0], dimensions[1], ...}`. To include `__time` in the sort order when this parameter is set to `false`, you must include a dimension named `__time` with type `long` explicitly in the `dimensions` list.<br /><br />Setting this to `false` is an experimental feature; see [Sorting](partitioning.md#sorting) for details. | `true` |

#### Dimension objects

Expand Down
59 changes: 43 additions & 16 deletions docs/ingestion/partitioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,60 @@ Partitioning by time is important for two reasons:
The most common choices to balance these considerations are `hour` and `day`. For streaming ingestion, `hour` is especially
common, because it allows compaction to follow ingestion with less of a time delay.

The following table describes how to configure time chunk partitioning.

|Method|Configuration|
|------|------------|
|[SQL](../multi-stage-query/index.md)|[`PARTITIONED BY`](../multi-stage-query/concepts.md#partitioning)|
|[Kafka](../ingestion/kafka-ingestion.md) or [Kinesis](../ingestion/kinesis-ingestion.md)|`segmentGranularity` inside the [`granularitySpec`](ingestion-spec.md#granularityspec)|
|[Native batch](native-batch.md) or [Hadoop](hadoop.md)|`segmentGranularity` inside the [`granularitySpec`](ingestion-spec.md#granularityspec)|

## Secondary partitioning

Druid can partition segments within a particular time chunk further depending upon options that vary based on the ingestion type you have chosen. In general, secondary partitioning on a particular dimension improves locality. This means that rows with the same value for that dimension are stored together, decreasing access time.
Druid further partitions each time chunk into immutable segments. Secondary partitioning on a particular dimension improves locality. This means that rows with the same value for that dimension are stored together, decreasing access time.

To achieve the best performance and smallest overall footprint, partition your data on a "natural"
dimension that you often use as a filter when possible. Such partitioning often improves compression and query performance. For example, some cases have yielded threefold storage size decreases.
To achieve the best performance and smallest overall footprint, partition your data on a "natural" dimension that
you often use as a filter, or that achieves some alignment within your data. Such partitioning can improve compression
and query performance by significant multiples.

## Partitioning and sorting
The following table describes how to configure secondary partitioning.

Partitioning and sorting work well together. If you do have a "natural" partitioning dimension, consider placing it first in the `dimensions` list of your `dimensionsSpec`. This way Druid sorts rows within each segment by that column. This sorting configuration frequently improves compression more than using partitioning alone.
|Method|Configuration|
|------|------------|
|[SQL](../multi-stage-query/index.md)|[`CLUSTERED BY`](../multi-stage-query/concepts.md#clustering)|
|[Kafka](../ingestion/kafka-ingestion.md) or [Kinesis](../ingestion/kinesis-ingestion.md)|Upstream partitioning defines how Druid partitions the datasource. You can also alter clustering using [`REPLACE`](../multi-stage-query/concepts.md#replace) (with `CLUSTERED BY`) or [compaction](../data-management/compaction.md) after initial ingestion.|
|[Native batch](native-batch.md) or [Hadoop](hadoop.md)|[`partitionsSpec`](native-batch.md#partitionsspec) inside the `tuningConfig`|

Note that Druid always sorts rows within a segment by timestamp first, even before the first dimension listed in your `dimensionsSpec`. This sorting can preclude the efficacy of dimension sorting. To work around this limitation if necessary, set your `queryGranularity` equal to `segmentGranularity` in your [`granularitySpec`](./ingestion-spec.md#granularityspec). Druid will set all timestamps within the segment to the same value, letting you identify a [secondary timestamp](schema-design.md#secondary-timestamps) as the "real" timestamp.
## Sorting

## How to configure partitioning
Each segment is internally sorted to promote compression and locality.

Not all ingestion methods support an explicit partitioning configuration, and not all have equivalent levels of flexibility. If you are doing initial ingestion through a less-flexible method like
Kafka), you can use [reindexing](../data-management/update.md#reindex) or [compaction](../data-management/compaction.md) to repartition your data after initial ingestion. This is a powerful technique you can use to optimally partition any data older than a certain time threshold while you continuously add new data from a stream.
Partitioning and sorting work well together. If you do have a "natural" partitioning dimension, consider placing it
first in your sort order as well. This way, Druid sorts rows within each segment by that column. This sorting configuration
frequently improves compression and performance more than using partitioning alone.

The following table shows how each ingestion method handles partitioning:
The following table describes how to configure sorting.

|Method|How it works|
|Method|Configuration|
|------|------------|
|[Native batch](native-batch.md)|Configured using [`partitionsSpec`](native-batch.md#partitionsspec) inside the `tuningConfig`.|
|[SQL](../multi-stage-query/index.md)|Configured using [`PARTITIONED BY`](../multi-stage-query/concepts.md#partitioning) and [`CLUSTERED BY`](../multi-stage-query/concepts.md#clustering).|
|[Hadoop](hadoop.md)|Configured using [`partitionsSpec`](hadoop.md#partitionsspec) inside the `tuningConfig`.|
|[Kafka indexing service](../ingestion/kafka-ingestion.md)|Kafka topic partitioning defines how Druid partitions the datasource. You can also [reindex](../data-management/update.md#reindex) or [compact](../data-management/compaction.md) to repartition after initial ingestion.|
|[Kinesis indexing service](../ingestion/kinesis-ingestion.md)|Kinesis stream sharding defines how Druid partitions the datasource. You can also [reindex](../data-management/update.md#reindex) or [compact](../data-management/compaction.md) to repartition after initial ingestion.|
|[SQL](../multi-stage-query/index.md)|Uses order of fields in [`CLUSTERED BY`](../multi-stage-query/concepts.md#clustering) or [`segmentSortOrder`](../multi-stage-query/reference.md#context) in the query context|
|[Kafka](../ingestion/kafka-ingestion.md) or [Kinesis](../ingestion/kinesis-ingestion.md)|Uses order of fields in [`dimensionsSpec`](ingestion-spec.md#granularityspec)|
|[Native batch](native-batch.md) or [Hadoop](hadoop.md)|Uses order of fields in [`dimensionsSpec`](ingestion-spec.md#granularityspec)|

:::info
Druid implicitly sorts rows within a segment by `__time` first before any `dimensions` or `CLUSTERED BY` fields, unless
you set `forceSegmentSortByTime` to `false` in your
[query context](../multi-stage-query/reference.md#context-parameters) (for SQL) or in your
[`dimensionsSpec`](ingestion-spec.md#dimensionsspec) (for other ingestion forms).

Setting `forceSegmentSortByTime` to `false` is an experimental feature. Segments created with sort orders that
do not start with `__time` can only be read by Druid 31 or later. Additionally, at this time, certain queries are not
supported on such segments, including:

- Native queries with `granularity` other than `all`.
- Native `scan` query with ascending or descending time order.
- SQL queries that plan into an unsupported native query.
:::

## Learn more

Expand Down
10 changes: 7 additions & 3 deletions docs/multi-stage-query/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,13 @@ Clustering is important for two reasons:

1. Lower storage footprint due to improved locality, and therefore improved compressibility.
2. Better query performance due to dimension-based segment pruning, which removes segments from consideration when they
cannot possibly contain data matching a query's filter. This speeds up filters like `x = 'foo'` and `x IN ('foo',
'bar')`.
cannot possibly contain data matching a query's filter. This speeds up filters like `x = 'foo'` and
`x IN ('foo', 'bar')`.

To activate dimension-based pruning, these requirements must be met:

- Segments were generated by a `REPLACE` statement, not an `INSERT` statement.
- All `CLUSTERED BY` columns are single-valued string columns.
- `CLUSTERED BY` begins with single-valued string columns. These single-valued string columns are used for pruning.

If these requirements are _not_ met, Druid still clusters data during ingestion but will not be able to perform
dimension-based segment pruning at query time. You can tell if dimension-based segment pruning is possible by using the
Expand All @@ -188,6 +188,10 @@ available in the **Segments** view under the **Partitioning** column.

For more information about syntax, see [`CLUSTERED BY`](./reference.md#clustered-by).

For more information about the mechanics of clustering, refer to
[Secondary partitioning](../ingestion/partitioning#secondary-partitioning) and
[Sorting](../ingestion/partitioning#sorting).

### Rollup

[Rollup](../ingestion/rollup.md) is a technique that pre-aggregates data during ingestion to reduce the amount of data
Expand Down
12 changes: 9 additions & 3 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,13 @@ For more information about partitioning, see [Partitioning](concepts.md#partitio
### `CLUSTERED BY`

The `CLUSTERED BY <column list>` clause is optional for [INSERT](#insert) and [REPLACE](#replace). It accepts a list of
column names or expressions. Druid's segment generation only supports ascending order, so an `INSERT` or `REPLACE` query with
`CLUSTERED BY` columns in `DESC` ordering is not allowed.
column names or expressions.

This column list is used for [secondary partitioning](../ingestion/partitioning.md#secondary-partitioning) of segments
within a time chunk, and [sorting](../ingestion/partitioning.md#sorting) of rows within a segment. For sorting purposes,
Druid implicitly prepends `__time` to the `CLUSTERED BY` column list, unless
[`forceSegmentSortByTime`](#context) is set to `false`
(an experimental feature; see [Sorting](../ingestion/partitioning.md#sorting) for details).

For more information about clustering, see [Clustering](concepts.md#clustering).

Expand Down Expand Up @@ -397,7 +402,8 @@ The following table lists the context parameters for the MSQ task engine:
| `arrayIngestMode` | INSERT, REPLACE<br /><br /> Controls how ARRAY type values are stored in Druid segments. When set to `array` (recommended for SQL compliance), Druid will store all ARRAY typed values in [ARRAY typed columns](../querying/arrays.md), and supports storing both VARCHAR and numeric typed arrays. When set to `mvd` (the default, for backwards compatibility), Druid only supports VARCHAR typed arrays, and will store them as [multi-value string columns](../querying/multi-value-dimensions.md). See [`arrayIngestMode`] in the [Arrays](../querying/arrays.md) page for more details. | `mvd` (for backwards compatibility, recommended to use `array` for SQL compliance)|
| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. This is a hint to the MSQ engine and the actual joins in the query may proceed in a different way than specified. See [Joins](#joins) for more details. | `broadcast` |
| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
| `segmentSortOrder` | INSERT or REPLACE<br /><br />Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](#clustered-by) clause. When you set `segmentSortOrder`, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order.<br /><br />You provide the column list as comma-separated values or as a JSON array in string form. If your query includes `__time`, then this list must begin with `__time`. For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list |
| `segmentSortOrder` | INSERT or REPLACE<br /><br />Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](#clustered-by) clause. When you set `segmentSortOrder`, Druid uses the order from this context parameter instead. Provide the column list as comma-separated values or as a JSON array in string form.<br />< br/>For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city,country`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list |
| `forceSegmentSortByTime` | INSERT or REPLACE<br /><br />When set to `true` (the default), Druid prepends `__time` to [CLUSTERED BY](#clustered-by) when determining the sort order for individual segments. Druid also requires that `segmentSortOrder`, if provided, starts with `__time`.<br /><br />When set to `false`, Druid uses the [CLUSTERED BY](#clustered-by) alone to determine the sort order for individual segments, and does not require that `segmentSortOrder` begin with `__time`. Setting this parameter to `false` is an experimental feature; see [Sorting](../ingestion/partitioning#sorting) for details. | `true` |
| `maxParseExceptions`| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1. | 0 |
| `rowsPerSegment` | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 |
| `indexSpec` | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
Expand Down
Loading

0 comments on commit 0603d51

Please sign in to comment.