This repository contains C++ code of the HyperLogLog algorithm as a User Defined Function (UDF) for HP Vertica. It was created by the Analytics Infrastructure team at Criteo.
The algorithm is implemented as three C++ UDAFs (User Defined Aggregate Function):
- HllDistinctCount(VARBINARY)
- HllCreateSynopsis(INT)
- HllCombine(VARBINARY)
In the following sections we describe HyperLogLog together with the tweaks to the original algorithm, so that even someone not acquainted with the algorithm might easily get understanding of how it works.
HyperLogLog is an algorithm used for estimating cardinality of a multiset. Multiset is a bag values which don't have to unique. There might be repetitions in a mulitset, meaning that the total number of elements might be different from the number of unique elements. It's commonly known that cardinality of every data set can be easily calculated with a trivial algorithm whose complexity is linear with the number of elements. However, given the size of some of the data sets in use at Criteo, this approach might be a no go, since we would not be able to fit the data in the memory. This is why we opted to use HyperLogLog, or actually one of its variations, originally described by Flajolet et al. in [1]. Our implementation is strongly based on the HyperLogLog++ algorithm (described in [2]), as well as on some improvements implemented in Redis ([4], [5]) and Druid ([2],[3]).
HyperLogLog provides a way to compute approximate DISTINCT COUNT, known more generally as cardinality of a data set (or multiset). The algorithm achieves this in two phases. First, it aggregates the actual data in a single run over the whole set, creating a so called synopsis. Second, it uses it to give an estimate of the cardinality of the data set with a very low median relative error. An invaluable advantage of the algorithm is that it uses constant space, as opposed to linear space required by the trivial algorithm.
The algorithm is based on a few observations. Imagine that we draw numbers of a fixed length. Without loss of generality we might assume they are 64 bits long. Now, let's look at the bits they start with. If the numbers are distributed uniformly, then: 50% of all numbers will start with 0 and the remaining 50% will start with 1. Along these lines, 25% of the numbers will start with 11, 12.5% will start with 111, 6.125% will start with 1111 etc.
Intuitively, if someone tells you that she has been drawing 64-bit numbers and the longest 0-sequence at the beginning was a single zero, we'd say she has been doing that only for short time. Conversely, if she says the longest sequence was 59 zeros in a row, we'd say she put a lot of effort and time to draw the numbers. Generally speaking, if the longest observed sequence of zeros was q, then a good estimate of the total count of numbers drawn would be 2^(q+1).
Of course, if we are unlucky, we will get a long sequence of leading zeros right with the first number. In turn, this will heavily impact accuracy of our guess. In order to reduce variability we can split the stream of numbers into m substreams. To this end, we will take the first p bits of a number as a stream's index, and then look at the remaining 64-p bits (note: 2^p = m). This technique is called stochastic averaging and was depicted in details in [1]. Every bucket will contain length q of the longest encountered 000...01 sequence, i.e. q-1 zero bits followed by a one bit. If we don't see a hash falling in a particular bucket, its value will be equal to zero. Intuitively, increasing the number of buckets increases accuracy of the estimate.
Imagine that we bake cookies 🍪 A lot of them. We have a log like this where we keep tracks of the cookies baked so far:
timestamp | factory_name | cookie_name |
---|---|---|
2016-04-02T12:23:24 | Paris | Chocolate Chip |
2016-04-02T12:23:32 | New York | Peanut Butter and Oatmeal |
2016-04-02T12:23:56 | Paris | Caramel Chip |
... | ... | ... |
We want to provide distinct count of cookies by factory or any grain of timestamps above a day in a table that does not have the cookie_name column. Instead, we keep an aggregation of that column as follows:
day | factory_name | cookies_summary |
---|---|---|
2016-04-02 | Paris | <some magic summary of data> |
2016-04-02 | New York | <another magic summary of data> |
... | ... | ... |
Using the HyperLogLog algorithm we might estimate cardinality of different cookies made at various factories on a particular day. Below we describe steps necessary to create a synopsis and to calculate distinct count.
First we hash the cookies grouped by day and factory_name using hashing function. Each cookie name is transformed via this function to a 64 bit random value. Thus, for every combination of factory_name and day we have a set of 64 bit hashes.
For every combination of (factory_name, day) we are going to summarize this set of hashes to a single array of buckets, called a synopsis. Number of buckets is tightly coupled with accuracy of the estimate: the higher it is, the closer the estimate is to the actual cardinality. Let's imagine we pick 16 buckets (substreams). Since log_2(16) = 4, for each hash we will look at the first 4 bits to assign it to a respective bucket.
For the following hash: 0001 00000 1 11111101010010010 We have:
- In italic, the bucket (1),
- In bold, the first occurrence of 1 in the value bits.
Hence, in the bucket number 1 we store 6, since this is length of the 00...001 pattern in the part following the bucket index bits.
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 6 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
As we process the hashes, every time we see a hash having a bigger number of leading zeros than the length stored in the corresponding bucket, we update the value. Once the whole data set is processed, we obtain a filled array.
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
3 | 6 | 3 | 2 | 4 | 5 | 9 | 9 | 7 | 6 | 5 | 3 | 4 | 3 | 4 | 1 |
This array is a synopsis of the whole data set.
Our estimate is a bias corrected harmonic mean calculated with the following formula:
In the above formula:
E
is the estimatealpha_m
is a constant to correct the biasm
is total number of buckets,M[j]
is value stored in the j-th bucket, i.e. length of the longest 0..01 sequence that fell into the j-th bucket
HP's Vertica boasts a HyperLogLog implementation described in [6] and [7]. In order to estimate its performance we ran a couple of benchmarks. TODO: describe the benchmarks.
Using Vertica's own implementation turned out to be not acceptable for us for three of reasons.
- It does not allow creating synopses outside of Vertica. Our goal is to be able to run the first step (synopsis creation) in Hadoop, while there is no obvious way how this could be achieved.
- Basic benchmarking job we did proved its performance to be disappointing.
- Vertica's implementation does not allow any elasticity in terms of precision or bucket compression used. As a consequence, the synopsis has a constant size of 49154 bytes, which is probably an equivalent of 16 bits of precision with a 2-byte header. In our opinion this precision is an overkill in many applications. Further in this readme we show that we are able to achieve better accuracy than the native implementation with as little as 1/5 of required storage.
As pointed out in [5] HyperLogLog has a large error for small cardinalities. For example, when number of buckets is equal to 16384 (i.e. precision is 14 bits), the algorithm will yield an estimate of ~11000 for no elements, i.e. when all the buckets are equal to zero. In other words, the smallest possible estimate given by the algorithm is 11k.
An important observation made by Google in [6] is that the algorithm always overestimates the real cardinality for smaller sets. In our implementation we took into account the remedy suggested by Google, i.e. we composed raw HyperLogLog, BiasCorrection and LinearCounting together. To return an estimate we first calculate a raw HyperLogLog estimate and then apply the following logic (note: m means the number of buckets):
- If the raw estimate is between
$+\infty$ and 5m, we leave it as it. - If the raw estimate is below 5m, but above a certain threshold determined experimentally, we subtract a bias from the raw estimate. Below we describe in details how this is achieved step by step.
- Otherwise, we try to use LinearCounting if it's applicable. LinearCounting returns a valuable result when at least one of the original HLL buckets is empty. If it's not the case, our final result is a raw estimate shifted by bias.
According to [6] there is no significant difference between various hash functions. However, length of the hash makes a huge difference when the cardinality approaches the number of unique values it can generate. For instance for 32 bit hashes and cardinalities close to 2^32 the hash collisions would become more common. As a result, an accurate estimation would become impossible. Following [6], In our HyperLogLog implementation we use 64 bit MurMurHash.
LinearCounting is a different cardinality estimator that uses a bitmap of N bits. For every new element a hash is computed and its log2(N) first bits are used to index a bit in the bitmap and flip it to 1. In our case LinearCounting relies on the original buckets from HyperLogLog, since as a side effect they work like the LC's bitmap. To calculate the estimate, we look at every bucket and check whether its value is equal to zero or greater than zero. In fact, in our case LinearCounting it just another formula applied to the buckets from HyperLogLog, which yields better results for small cardinalities, i.e. when many buckets are empty.
The formula is defined as follows:
E = U * log(m/U)
where:
E
is the estimateU
is number of empty buckets (i.e. those buckets where there were no hashes falling into)m
is number of bucketslog
is a natural logarithm
LinearCounting is inaccurate for high cardinalities, but for the small ones is just right. This means that as the cardinality grows, we have to switch from LinearCounting to raw HyperLogLog. In [1] the authors set a threshold of 2.5m, but above this threshold the result is sill highly biased. To prevent it we implemented another improvement proposed the Google engineers [2] called...
Google Engineers studied the problem of the bias in their paper. Below one can see how the raw estimate (red) is shifted from the real cardinality (black).
In the appendix [7] to [6] Google published empirical bias values. In our implementation we use these data to return a more accurate estimate. To this end, we keep two arrays: rawEstimateData[][]
and biasData[][]
whose values are taken from [7]. For the given number of precision bits p we look in rawEstimateData[p][]
for the 6 values that are closest to the raw estimate. Then, we use their indices to look for corresponding bias values in biasData[p][]
. We calculate mean value of these 6 values and subtract it from the raw estimate.
In our baseline implementations we used 8 bits per bucket. This approach makes the algorithm easier to debug, but is also an unnecessary loss of space. Recall that every bucket keeps length of the longest sequence of zeros among hashes falling into that bucket. Since we use 64 bit hashes, out of which p first bits indicate which bucket to use, each bucket will store a value between 0 and 64-p. The highest number we could represent with 6 bits is 63. This means that without impacting accuracy we can allocate 6 bits per bucket effectively reducing the storage space by 1/4.
The idea is to split the buckets into groups of 4 and to store them in 3 bytes (since 4 * 6 bits = 24 bits = 3 bytes). In order to deserialize an array of bytes into buckets, we iterate over groups of three bytes which get put in four buckets. Likewise, to serialize the buckets, we iterate over groups of 4 buckets which get stored in three bytes. In our implementation we assume that the synopsis will be serialized and deserialized on little endian machines.
We lay out the compacted buckets in memory as follows.
Byte 0 Byte 1 Byte 2
+--------+--------+--------+---//
|00000011|11112222|22333333|4444
+--------+--------+--------+---//
As pointed out in [2] and originally in [11], buckets' values increase in similar pace. For instance, if we had 8 buckets and we stored there 8*2^10 unique values, each register would store a value close to 10. Obviously, it's not entirely true, because, unless we are incredibly lucky, there will be some deviation, but we bet it won't be significant.
Following [2] we came up with the idea of compacting the buckets to 5 and 4 bits. At first glance these buckets will be too short, because they can represent sequences of 2^5 = 32 and 2^4 = 16 respectively, i.e. 2^31 and 2^15 = 32,768 uniques. The latter case is particularly worrying because it would clearly put strong limitations on where HyperLogLog could by applied. When serializing the synopsis with 4 bits per bucket, if we encountered bucket values higher than 15, we would have to clip them to 15 anyway, because this would be the highest value we could represent. In turn, this would impact the estimate and effectively mean losing unique values. For instance, even with 16k registers we couldn't get higher estimate than ~536M.
A solution to this problem suggested by the Druid folks in [2] is to introduce an offset value and store positive differences from it. The offset value itself also has to be stored in the synopsis. To this end, we had to introduce a header like follows.
struct HLLHdr {
char magic[2] = {'H','L'};
uint8_t format;
uint8_t offset;
char padding[4] = {'\0','\0','\0','\0'}; // padding to reach 8 bytes in length
};
The reference value is stored as uint8_t offset
in the header during synopsis' serialization and is calculated as the lowest value among all the buckets. When deserializing a synopsis, in order to calculate effective value of a bucket, one has to sum up its value with the offset.
For instance, if we had 4 registers with values 3,5,7 and 4, the offset would be 3 and we would store 0,2,4,1 in each respective bucket. If the variance of bucket values is small, i.e. if the spread is smaller than 32 and 16 for 6 and 5 bits respectively, this solution should prevent bucket clipping. Conversely, if any of the buckets is equal to zero, the offset will bring no profit at all. Later on we present results of queries run on real data in order to check whether this impacts the accuracy.
To fit the buckets nicely in the array of bytes when using 5 bits per bucket, we split the buckets into groups of 8 and arrange them in 5 bytes as depicted below. Please note that serialization and deserialization operations with 5 bits per bucket do not get automatically vectorized by gcc 4.8.5 (as opposed to 8, 6 and 4-bits-per-bucket operations).
Byte 0 Byte 1 Byte 2 Byte 3 Byte 4
+--------+--------+--------+--------+--------+---//
+00000111|11222223|33334444|45555566|66677777|...
+--------+--------+--------+--------+--------+---//
Arranging buckets with 4 bits per bucket is straightforward:
Byte 0 Byte 1
+--------+--------+---//
+00001111|22223333|...
+--------+--------+---//
When the number of meaningful buckets is bellow a certain threshold, storing synopsis sparsely can reduce the IO and operations greatly. In the sparse format the synospsis is a list of bucket ID (2 bytes) and bucket value (1 byte), only for buckets where the values is non-zero.
Byte 0 Byte 1 Byte 2
+--------+--------+--------+---//
| <bucket ID> | value |
+--------+--------+--------+---//
This repository contains implementation of two User Defined Aggregation Functions in Vertica: HllCreateSynopsis and HllDistinctCount. The former is used to aggregate input data and creates a synopsis in the format described above. These synopsis can be further aggregated to summarize unions of two or more data sets. The latter function, HllDistinctCount, is used to compute an equivalent of an SQL DISTINCT COUNT statement from the outcome of HllCreateSynopsis.
Both HllDistinctCount and HllCreateSynopsis expect two parameters:
name | possible values | description |
---|---|---|
hllLeadingBits | 1...16 | Number of bits used cut off from each hash value used to specify which buckets a number falls into. This parameter is inherent to the HyperLogLog algorithm. In general the higher it is, the more accurate is the HLL's estimate. Importantly, synopsis' size is exponentially proportional to this value. |
bitsPerBucket | 4,5,6,8 | Number of bits used to put serialize synopsis in HllCreateSynopsis. Synopsis' size is proportional to this value. While bitsPerBucket={6,8} won't impact accuracy, when the value is set to 4 or 5, the accuracy might be effected, although this behaviour is unlikely. |
It is worthwhile to note that the smaller the synopsis is, the faster the algorithm will be. For precise numbers please refer to the Latency and accuracy benchmarks
below.
From the Vertica's point of view a synopsis is just a VARBINARY. Its size depends on the number of precision bits and compactness (number of bits per bucket) and is fixed for these two parameters, i.e. no matter what the stored cardinality is, the synopsis will have constant size. The table below characterizes synopsis size for different values of parameters. In the rows there are different supported compactness values. In the columns there are various precision sizes (i.e. binary logarithm of number of buckets). Please note that the highest supported precision is 16 bits.
p=10 | p=11 | p=12 | p=13 | p=14 | p=15 | |
---|---|---|---|---|---|---|
8 bits per bucket | 1032 | 2056 | 4104 | 8200 | 16392 | 32776 |
6 bits per bucket | 776 | 1544 | 3080 | 6152 | 12296 | 24584 |
5 bits per bucket | 648 | 1288 | 2568 | 5128 | 10248 | 20488 |
4 bits per bucket | 520 | 1032 | 2056 | 4104 | 8200 | 16392 |
A general formula for the synopsis' size is (note a constant 8 comes from the header size):
S = B * 2^p + 8
Where:
S
is synopsis sizeB
is number of bits per bucketp
is number of precision bits
This representation has never been tested on a big-endian machine. Hence, it's safe to assume that it only works on little-endian machines.
To use our implementation of HyperLogLog we need to register the UDFs in Vertica, so that it can access them.
SET ROLE pseudosuperuser;
DROP LIBRARY libhll CASCADE;
CREATE LIBRARY libhll AS '/path/to/libhll.so';
CREATE AGGREGATE FUNCTION HllCreateSynopsis AS LANGUAGE 'C++' NAME 'HllCreateSynopsisFactory' LIBRARY libhll;
CREATE AGGREGATE FUNCTION HllDistinctCount AS LANGUAGE 'C++' NAME 'HllDistinctCountFactory' LIBRARY libhll;
CREATE AGGREGATE FUNCTION HllCombine AS LANGUAGE 'C++' NAME 'HllCombineFactory' LIBRARY libhll;
Computation of DISTINCT COUNT using this HyperLogLog UDF constists of two steps. First we first have to aggregate some data and create a synopsis:
DROP TABLE test_schema.agg_clicks;
CREATE TABLE test_schema.agg_clicks
AS
SELECT
HOUR(TO_TIMESTAMP(click_ts)) AS hour,
banner_id,
zone_id,
client_id,
network_id,
HllCreateSynopsis(user_id_fast USING PARAMETERS hllLeadingBits=:precision, bitsPerBucket=:bitsperbucket) AS Synopsis
FROM
test_schema.fact_clicks
WHERE
client_id > :minrange AND client_id < :maxrange
GROUP BY
HOUR(TO_TIMESTAMP(click_ts)),
banner_id,
zone_id,
client_id,
network_id;
Secondly, we can calculate DISTINCT COUNT based on the synopsis:
SELECT
client_id,
HllDistinctCount(synopsis USING PARAMETERS hllLeadingBits=:precision)
FROM
test_schema.agg_clicks
GROUP BY
client_id;
To measure latency and accuracy we ran the queries from the listings above on some real data used at Criteo. They were run a cluster of three nodes on a table containing around 364M rows. In our query we used one third of the whole table.
TODO: add benchmarks on partitioned and ordered data (projection)
Version | Precision | Mean Error (%) | Error std. dev (%) | Max error (%) |
---|---|---|---|---|
Vertica's HLL | 16? | 0.632771 | 2.550527 | 18.825070 |
6 bits per bucket | 10 | 2.098286 | 1.744262 | 7.385960 |
. | 11 | 1.616222 | 1.317503 | 6.722578 |
. | 12 | 1.107659 | 0.877167 | 4.169899 |
. | 13 | 0.761509 | 0.577894 | 2.859550 |
. | 14 | 0.531887 | 0.414764 | 2.041547 |
. | 15 | 0.362217 | 0.301651 | 1.335048 |
5 bits per bucket | 10 | 2.098286 | 1.744262 | 7.385960 |
. | 11 | 1.616222 | 1.317503 | 6.722578 |
. | 12 | 1.107659 | 0.877167 | 4.169899 |
. | 13 | 0.761509 | 0.577894 | 2.859550 |
. | 14 | 0.531887 | 0.414764 | 2.041547 |
. | 15 | 0.362217 | 0.301651 | 1.335048 |
4 bits per bucket | 10 | 2.098278 | 1.744529 | 7.385960 |
. | 11 | 1.616220 | 1.317493 | 6.722137 |
. | 12 | 1.107642 | 0.877198 | 4.169899 |
. | 13 | 0.761514 | 0.577893 | 2.859550 |
. | 14 | 0.531884 | 0.414764 | 2.041547 |
. | 15 | 0.362218 | 0.301651 | 1.335048 |
Version | Precision | Synopsis creation (s) | Distinct Count (s) | Total (s) |
---|---|---|---|---|
Vertica's HLL | 16? | 676 | 505 | 1181 |
8 bits/bucket | 10 | 126 | 10 | 136 |
. | 11 | 147 | 12 | 159 |
. | 12 | 163 | 18 | 181 |
. | 13 | 233 | 36 | 269 |
. | 14 | 324 | 81 | 405 |
. | 15 | 489 | 129 | 618 |
6 b/b | 10 | 124 | 11 | 135 |
. | 11 | 131 | 14 | 145 |
. | 12 | 152 | 23 | 175 |
. | 13 | 203 | 39 | 242 |
. | 14 | 293 | 77 | 370 |
. | 15 | 372 | 144 | 516 |
5 b/b | 10 | 123 | 11 | 134 |
. | 11 | 129 | 16 | 145 |
. | 12 | 155 | 24 | 179 |
. | 13 | 181 | 46 | 227 |
. | 14 | 275 | 87 | 362 |
. | 15 | 364 | 167 | 531 |
4 b/b | 10 | 124 | 11 | 135 |
. | 11 | 128 | 14 | 142 |
. | 12 | 148 | 23 | 171 |
. | 13 | 174 | 40 | 214 |
. | 14 | 257 | 69 | 326 |
. | 15 | 314 | 136 | 450 |
- Vertica's implementation has huge inaccuracy. For some values the error is as high as 18%. From our observations the highest errors occur when the distinct count is above 100k.
- With 4 bits per bucket and 14 bits of precision we achieve better accuracy than Vertica, while keeping the synopsis 6 times smaller and being 3.35x faster.
- Bucket clipping for 5 and 4 bits per bucket is negligible.
This project relies on cmake as the main building tool. Typically, you create a separate build directory where you run cmake and create target binaries.
$ git clone git@github.com:criteo/vertica-hyperloglog.git
$ cd vertica-udfs
$ mkdir build
$ cd build
$ cmake ..
$ make
$ file libhll.so
libhll.so: ELF 64-bit LSB shared object, x86-64
This code comes with a handful of tests based on the Google C++ Testing Framework. To build the tests together with the main code one have to enable BUILD_TESTS in cmake. This can be either done via ccmake, an ncurses interface to cmake...
$ cd build
$ ccmake ..
# toggle tests to ON, then press 'c', then press 'g'
$ make
$ file hll_test
hll_test: ELF 64-bit LSB executable, x86-64
... or directly from the command line:
$ cd build
$ cmake .. -DBUILD_TESTS=1
$ make
$ file hll_test
hll_test: ELF 64-bit LSB executable, x86-64
Once the tests are built, it's enough to run the output binary:
$ ./hll_test
[==========] Running 10 tests from 1 test case.
[----------] Global test environment set-up.
[----------] 10 tests from HllTest
[ RUN ] HllTest.TestErrorWithinRangeForDifferentBucketMasks
[ OK ] HllTest.TestErrorWithinRangeForDifferentBucketMasks (87 ms)
...
[----------] 10 tests from HllTest (486 ms total)
[----------] Global test environment tear-down
[==========] 10 tests from 1 test case ran. (486 ms total)
[ PASSED ] 10 tests.
This repository defines a derived Vertica docker container able to run a set of SQL queries put in the /tests/integration folder. As of today, the only outcome of those tests are the following: they whether run or not (return exit code 0 or not from vsql). To run the tests in a Docker container and auto destroy them, run the following command:
cd <path_to_dir_containing_dockerfile>
docker build -t fjehl/docker-vertica-itests .
docker run -v <full_path_of_vertica_rpm>:/tmp/vertica.rpm -v docker-vertica-itests:/opt/vertica --cap-add SYS_NICE --cap-add SYS_RESOURCE --rm --name docker-vertica-itests -ti fjehl/docker-vertica-itests
If the container gets somehow corrupted, destroy the volume with persistent storage (named docker-vertica-itests) with the following command:
docker volume rm docker-vertica-itests
This work is distributed under the Apache License, Version 2.0.
- P.Flajolet et al., HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm
- F.Yang, Druid: Fast, Cheap, and 98% Right: Cardinality Estimation for Big Data
- N.Ray and F.Yang, Druid: How We Scaled HyperLogLog: Three Real-World Optimizations
- Redis new data structure: the HyperLogLog
- Redis' HyperLogLog source code
- HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm, S.Heule et. al
- S.Heule et al., Appendix to HyperLogLog in Practice: Algorithmic Engineering of a State of the Art Cardinality Estimation Algorithm
- HP Vertica's documentation on APPROXIMATE_COUNT_DISTINCT_SYNOPSIS
- HP Vertica's documentation on APPROXIMATE_COUNT_DISTINCT_OF_SYNOPSIS
- T.Karnezos, HyperLogLog++: Google’s Take On Engineering HLL
- L.Chan, Presto: Interacting with petabytes of data at Facebook
- K-Y.Whang, A Linear-Time Probabilistic Counting Algorithm for Database Applications