Skip to content

Commit

Permalink
Support for latest TDigest [RedisBloom 2.4.x] (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
chayim authored Nov 10, 2022
1 parent 05f9cca commit c298774
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 38 deletions.
48 changes: 48 additions & 0 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: Integration

on:
push:
paths-ignore:
- 'docs/**'
- '**/*.rst'
- '**/*.md'
branches:
- master
- main
- '[0-9].[0-9]'
pull_request:
branches:
- master
- main
- '[0-9].[0-9]'
schedule:
- cron: '0 1 * * *'

jobs:

lint:
name: Code linters
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: |
make checkfmt
make lint
integration:
services:
image: redislabs/rebloom:edge
port:
- 6379:6379
name: Build and test
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
with:
go-version: 1.18.x
- uses: actions/checkout@v3
- run: |
make get
make coverage
- name: Upload coverage
uses: codecov/codecov-action@v3
78 changes: 62 additions & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package redis_bloom_go
import (
"errors"
"fmt"
"github.com/gomodule/redigo/redis"
"strconv"
"strings"

"github.com/gomodule/redigo/redis"
)

// TODO: refactor this hard limit and revise client locking
Expand All @@ -24,8 +25,8 @@ type TDigestInfo struct {
capacity int64
mergedNodes int64
unmergedNodes int64
mergedWeight float64
unmergedWeight float64
mergedWeight int64
unmergedWeight int64
totalCompressions int64
}

Expand All @@ -50,12 +51,12 @@ func (info *TDigestInfo) UnmergedNodes() int64 {
}

// MergedWeight - returns the merged weight of TDigestInfo instance
func (info *TDigestInfo) MergedWeight() float64 {
func (info *TDigestInfo) MergedWeight() int64 {
return info.mergedWeight
}

// UnmergedWeight - returns the unmerged weight of TDigestInfo instance
func (info *TDigestInfo) UnmergedWeight() float64 {
func (info *TDigestInfo) UnmergedWeight() int64 {
return info.unmergedWeight
}

Expand Down Expand Up @@ -499,7 +500,7 @@ func (client *Client) CfInfo(key string) (map[string]int64, error) {
func (client *Client) TdCreate(key string, compression int64) (string, error) {
conn := client.Pool.Get()
defer conn.Close()
return redis.String(conn.Do("TDIGEST.CREATE", key, compression))
return redis.String(conn.Do("TDIGEST.CREATE", key, "COMPRESSION", compression))
}

// TdReset - Reset the sketch to zero - empty out the sketch and re-initialize it
Expand All @@ -521,11 +522,48 @@ func (client *Client) TdAdd(key string, samples map[float64]float64) (string, er
return redis.String(reply, err)
}

// TdMerge - Merges all of the values from 'from' to 'this' sketch
func (client *Client) TdMerge(toKey string, fromKey string) (string, error) {
// tdMerge - The internal representation of TdMerge. All underlying functions call this one,
// returning its results. It allows us to maintain interfaces.
// see https://redis.io/commands/tdigest.merge/
//
// The default values for compression is 100
func (client *Client) tdMerge(toKey string, compression int64, override bool, numKeys int64, fromKey ...string) (string, error) {
if numKeys < 1 {
return "", errors.New("a minimum of one key must be merged")
}

conn := client.Pool.Get()
defer conn.Close()
return redis.String(conn.Do("TDIGEST.MERGE", toKey, fromKey))
overidable := ""
if override {
overidable = "1"
}
return redis.String(conn.Do("TDIGEST.MERGE", toKey,
strconv.FormatInt(numKeys, 10),
strings.Join(fromKey, " "),
"COMPRESSION", compression,
overidable))
}

// TdMerge - Merges all of the values from 'from' to 'this' sketch
func (client *Client) TdMerge(toKey string, numKeys int64, fromKey ...string) (string, error) {
return client.tdMerge(toKey, 100, false, numKeys, fromKey...)
}

// TdMergeWithCompression - Merges all of the values from 'from' to 'this' sketch with specified compression
func (client *Client) TdMergeWithCompression(toKey string, compression int64, numKeys int64, fromKey ...string) (string, error) {
return client.tdMerge(toKey, compression, false, numKeys, fromKey...)
}

// TdMergeWithOverride - Merges all of the values from 'from' to 'this' sketch overriding the destination key if it exists
func (client *Client) TdMergeWithOverride(toKey string, override bool, numKeys int64, fromKey ...string) (string, error) {
return client.tdMerge(toKey, 100, true, numKeys, fromKey...)
}

// TdMergeWithCompressionAndOverride - Merges all of the values from 'from' to 'this' sketch with specified compression
// and overriding the destination key if it exists
func (client *Client) TdMergeWithCompressionAndOverride(toKey string, compression int64, numKeys int64, fromKey ...string) (string, error) {
return client.tdMerge(toKey, compression, true, numKeys, fromKey...)
}

// TdMin - Get minimum value from the sketch. Will return DBL_MAX if the sketch is empty
Expand All @@ -544,17 +582,22 @@ func (client *Client) TdMax(key string) (float64, error) {

// TdQuantile - Returns an estimate of the cutoff such that a specified fraction of the data added
// to this TDigest would be less than or equal to the cutoff
func (client *Client) TdQuantile(key string, quantile float64) (float64, error) {
func (client *Client) TdQuantile(key string, quantile float64) ([]float64, error) {
conn := client.Pool.Get()
defer conn.Close()
return redis.Float64(conn.Do("TDIGEST.QUANTILE", key, quantile))
return redis.Float64s(conn.Do("TDIGEST.QUANTILE", key, quantile))
}

// TdCdf - Returns the fraction of all points added which are <= value
func (client *Client) TdCdf(key string, value float64) (float64, error) {
// TdCdf - Returns the list of fractions of all points added which are <= values
func (client *Client) TdCdf(key string, values ...float64) ([]float64, error) {
conn := client.Pool.Get()
defer conn.Close()
return redis.Float64(conn.Do("TDIGEST.CDF", key, value))

args := make([]string, len(values))
for idx, obj := range values {
args[idx] = strconv.FormatFloat(obj, 'f', -1, 64)
}
return redis.Float64s(conn.Do("TDIGEST.CDF", key, strings.Join(args, " ")))
}

// TdInfo - Returns compression, capacity, total merged and unmerged nodes, the total
Expand Down Expand Up @@ -590,6 +633,9 @@ func ParseTDigestInfo(result interface{}, err error) (info TDigestInfo, outErr e
var key string
for i := 0; i < len(values); i += 2 {
key, outErr = redis.String(values[i], nil)
if outErr != nil {
return TDigestInfo{}, outErr
}
switch key {
case "Compression":
info.compression, outErr = redis.Int64(values[i+1], nil)
Expand All @@ -600,9 +646,9 @@ func ParseTDigestInfo(result interface{}, err error) (info TDigestInfo, outErr e
case "Unmerged nodes":
info.unmergedNodes, outErr = redis.Int64(values[i+1], nil)
case "Merged weight":
info.mergedWeight, outErr = redis.Float64(values[i+1], nil)
info.mergedWeight, outErr = redis.Int64(values[i+1], nil)
case "Unmerged weight":
info.unmergedWeight, outErr = redis.Float64(values[i+1], nil)
info.unmergedWeight, outErr = redis.Int64(values[i+1], nil)
case "Total compressions":
info.totalCompressions, outErr = redis.Int64(values[i+1], nil)
}
Expand Down
32 changes: 14 additions & 18 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package redis_bloom_go

import (
"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/assert"
"os"
"testing"
"time"

"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/assert"
)

func getTestConnectionDetails() (string, string) {
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestReserve(t *testing.T) {
"Expansion rate": 2,
"Number of filters": 1,
"Number of items inserted": 0,
"Size": 936,
"Size": 880,
})

err = client.Reserve(key, 0.1, 1000)
Expand Down Expand Up @@ -554,16 +555,10 @@ func TestClient_TdReset(t *testing.T) {
ret, err = client.TdReset(key)
assert.Nil(t, err)
assert.Equal(t, "OK", ret)

info, err := client.TdInfo(key)
assert.Nil(t, err)
assert.Equal(t, 0.0, info.UnmergedWeight())
assert.Equal(t, int64(0), info.TotalCompressions())
assert.Equal(t, int64(100), info.Compression())
assert.Equal(t, int64(610), info.Capacity())
}

func TestClient_TdMerge(t *testing.T) {
client.FlushAll()
key1 := "toKey"
key2 := "fromKey"
ret, err := client.TdCreate(key1, 10)
Expand All @@ -584,16 +579,17 @@ func TestClient_TdMerge(t *testing.T) {
assert.Equal(t, "OK", ret)

//Merge
ret, err = client.TdMerge(key1, key2)
ret, err = client.TdMerge(key1, 1, key2)
assert.Nil(t, err)
assert.Equal(t, "OK", ret)

// TODO <open question, do we need this>
// we should now have 10 weight on to-histogram
info, err := client.TdInfo(key1)
assert.Nil(t, err)
assert.Equal(t, 10.0, info.UnmergedWeight()+info.MergedWeight())
assert.Equal(t, int64(2), info.UnmergedNodes())
assert.Equal(t, int64(2), info.MergedNodes())
assert.Equal(t, int64(8), info.UnmergedWeight()+info.MergedWeight())
assert.Equal(t, int64(4), info.UnmergedNodes())
assert.Equal(t, int64(4), info.MergedNodes())
}

func TestClient_TdMinMax(t *testing.T) {
Expand Down Expand Up @@ -631,11 +627,11 @@ func TestClient_TdQuantile(t *testing.T) {

ans, err := client.TdQuantile(key, 1.0)
assert.Nil(t, err)
assert.Equal(t, 3.0, ans)
assert.Equal(t, 3.0, ans[0])

ans, err = client.TdQuantile(key, 0.0)
assert.Nil(t, err)
assert.Equal(t, 1.0, ans)
assert.Equal(t, 1.0, ans[0])
}

func TestClient_TdCdf(t *testing.T) {
Expand All @@ -652,9 +648,9 @@ func TestClient_TdCdf(t *testing.T) {

ans, err := client.TdCdf(key, 10.0)
assert.Nil(t, err)
assert.Equal(t, 1.0, ans)
assert.Equal(t, 1.0, ans[0])

ans, err = client.TdCdf(key, 0.0)
assert.Nil(t, err)
assert.Equal(t, 0.0, ans)
assert.Equal(t, 0.0, ans[0])
}
13 changes: 10 additions & 3 deletions example_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package redis_bloom_go_test

import (
"fmt"
"log"

redisbloom "github.com/RedisBloom/redisbloom-go"
"github.com/gomodule/redigo/redis"
"log"
)

// exemplifies the NewClient function
Expand Down Expand Up @@ -55,6 +56,7 @@ func ExampleNewClientFromPool() {
func ExampleClient_TdCreate() {
host := "localhost:6379"
var client = redisbloom.NewClient(host, "nohelp", nil)
client.FlushAll()

ret, err := client.TdCreate("key", 100)
if err != nil {
Expand All @@ -70,6 +72,7 @@ func ExampleClient_TdCreate() {
func ExampleClient_TdAdd() {
host := "localhost:6379"
var client = redisbloom.NewClient(host, "nohelp", nil)
client.FlushAll()

key := "example"
ret, err := client.TdCreate(key, 100)
Expand All @@ -92,6 +95,7 @@ func ExampleClient_TdAdd() {
func ExampleClient_TdMin() {
host := "localhost:6379"
var client = redisbloom.NewClient(host, "nohelp", nil)
client.FlushAll()

key := "example"
_, err := client.TdCreate(key, 10)
Expand All @@ -118,6 +122,7 @@ func ExampleClient_TdMin() {
func ExampleClient_TdMax() {
host := "localhost:6379"
var client = redisbloom.NewClient(host, "nohelp", nil)
client.FlushAll()

key := "example"
_, err := client.TdCreate(key, 10)
Expand All @@ -144,6 +149,7 @@ func ExampleClient_TdMax() {
func ExampleClient_TdQuantile() {
host := "localhost:6379"
var client = redisbloom.NewClient(host, "nohelp", nil)
client.FlushAll()

key := "example"
_, err := client.TdCreate(key, 10)
Expand All @@ -163,13 +169,14 @@ func ExampleClient_TdQuantile() {
}

fmt.Println(ans)
// Output: 5
// Output: [5]
}

// exemplifies the TdCdf function
func ExampleClient_TdCdf() {
host := "localhost:6379"
var client = redisbloom.NewClient(host, "nohelp", nil)
client.FlushAll()

key := "example"
_, err := client.TdCreate(key, 10)
Expand All @@ -189,5 +196,5 @@ func ExampleClient_TdCdf() {
}

fmt.Println(cdf)
// Output: 0.1
// Output: [0.2]
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.12

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gomodule/redigo v1.8.2
github.com/gomodule/redigo v1.8.9
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/stretchr/testify v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k=
github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down

0 comments on commit c298774

Please sign in to comment.