Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alex.parallel plus denylist #74

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
432af6a
first draft
alex-goodisman Apr 24, 2024
561931c
include parallelismkey in unit tests
alex-goodisman Apr 24, 2024
e5e4881
oops
alex-goodisman Apr 24, 2024
3ff48d3
fix modulo
alex-goodisman Apr 24, 2024
e70dbc3
denylist first draft
alex-goodisman Apr 4, 2024
a1a3e73
add missing error checks
alex-goodisman Apr 4, 2024
ffd2f17
fix nix vendorHash
alex-goodisman Apr 4, 2024
67786e8
create tests
alex-goodisman Apr 4, 2024
dc0057e
fix tests 1
alex-goodisman Apr 4, 2024
463c09b
fix tests 2
alex-goodisman Apr 4, 2024
15726a7
dont try to jsonParse an empty string
alex-goodisman Apr 4, 2024
c4992ef
fix tests 4
alex-goodisman Apr 4, 2024
c5f5a00
add actual oplog deny test
alex-goodisman Apr 4, 2024
11c54c7
fix tests 5
alex-goodisman Apr 4, 2024
df75220
fix tests 6
alex-goodisman Apr 4, 2024
64e6100
refactor to only filter on db name
alex-goodisman Apr 5, 2024
92d3c57
fix nix again
alex-goodisman Apr 5, 2024
3e3b851
fix tests 7
alex-goodisman Apr 5, 2024
257798e
use sync map
alex-goodisman Apr 5, 2024
ef62f07
use sync map 2
alex-goodisman Apr 5, 2024
90060c9
move staleness metric to write side & partition
alex-goodisman Apr 24, 2024
72c72eb
ordinal now a label
alex-goodisman Apr 24, 2024
3f4077a
seconds
alex-goodisman Apr 24, 2024
f51929f
lastprocessedtimestamp now sharded
alex-goodisman Apr 24, 2024
db74286
comment
alex-goodisman Apr 24, 2024
c8f83ed
more breakpoint metrics
alex-goodisman Apr 26, 2024
7ffa61f
trigger gha maybe
alex-goodisman Apr 26, 2024
9f19b02
oops
alex-goodisman Apr 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ buildGoModule {
'';

# update: set value to an empty string and run `nix build`. This will download Go, fetch the dependencies and calculates their hash.
vendorHash = "sha256-ceToA2DC1bhmg9WIeNSAfoNoU7sk9PrQqgqt5UbpivQ=";
vendorHash = "sha256-Vh7O0iMPG6nAvcyv92h5TVZS2awnR0vz75apyzJeu4c=";

nativeBuildInputs = [ installShellFiles ];
doCheck = false;
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/deckarep/golang-set v1.7.1
github.com/go-redis/redis/v7 v7.4.1
github.com/go-redis/redis/v8 v8.11.5
github.com/gorilla/websocket v1.4.2
github.com/juju/mgo/v2 v2.0.0-20210302023703-70d5d206e208
github.com/juju/replicaset v0.0.0-20210302050932-0303c8575745
Expand All @@ -29,10 +30,10 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/protobuf v1.4.3 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gomodule/redigo v1.8.5 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/juju/clock v0.0.0-20190205081909-9c5c9712527c // indirect
github.com/juju/errors v0.0.0-20200330140219-3fe23663418f // indirect
github.com/juju/loggo v0.0.0-20200526014432-9ce3a2e09b5e // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
Expand Down
88 changes: 88 additions & 0 deletions integration-tests/acceptance/denylist_http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"bytes"
"encoding/json"
"io"
"net/http"
"os"
"reflect"
"testing"
)

func doRequest(method string, path string, t *testing.T, expectedCode int) interface{} {
req, err := http.NewRequest(method, os.Getenv("OTR_URL")+path, &bytes.Buffer{})
if err != nil {
t.Fatalf("Error creating req: %s", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := (&http.Client{}).Do(req)
if err != nil {
t.Fatalf("Error sending request: %s", err)
}

defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error eceiving response body: %s", err)
}

if resp.StatusCode != expectedCode {
t.Fatalf("Expected status code %d, but got %d.\nBody was: %s", expectedCode, resp.StatusCode, respBody)
}

if expectedCode == 200 {
var data interface{}
err = json.Unmarshal(respBody, &data)
if err != nil {
t.Fatalf("Error parsing JSON response: %s", err)
}

return data
}
return nil
}

// Test the /denylist HTTP operations
func TestDenyList(t *testing.T) {
// GET empty list of rules
data := doRequest("GET", "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{}) {
t.Fatalf("Expected empty list from blank GET, but got %#v", data)
}
// PUT new rule
doRequest("PUT", "/denylist/abc", t, 201)
// GET list with new rule in it
data = doRequest("GET", "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{"abc"}) {
t.Fatalf("Expected singleton from GET, but got %#v", data)
}
// GET existing rule
data = doRequest("GET", "/denylist/abc", t, 200)
if !reflect.DeepEqual(data, "abc") {
t.Fatalf("Expected matched body from GET, but got %#v", data)
}
// PUT second rule
doRequest("PUT", "/denylist/def", t, 201)
// GET second rule
data = doRequest("GET", "/denylist/def", t, 200)
if !reflect.DeepEqual(data, "def") {
t.Fatalf("Expected matched body from GET, but got %#v", data)
}
// GET list with both rules
data = doRequest("GET", "/denylist", t, 200)
// check both permutations, in case the server reordered them
if !reflect.DeepEqual(data, []interface{}{"abc", "def"}) && !reflect.DeepEqual(data, []interface{}{"def", "abc"}) {
t.Fatalf("Expected doubleton from GET, but got %#v", data)
}
// DELETE first rule
doRequest("DELETE", "/denylist/abc", t, 204)
// GET first rule
doRequest("GET", "/denylist/abc", t, 404)
// GET list with only second rule
data = doRequest("GET", "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{"def"}) {
t.Fatalf("Expected singleton from GET, but got %#V", data)
}
}
72 changes: 72 additions & 0 deletions integration-tests/acceptance/denylist_oplog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"context"
"testing"

"github.com/tulip/oplogtoredis/integration-tests/helpers"
"go.mongodb.org/mongo-driver/bson"
)

func TestDenyOplog(t *testing.T) {
harness := startHarness()
defer harness.stop()

_, err := harness.mongoClient.Collection("Foo").InsertOne(context.Background(), bson.M{
"_id": "id1",
"f": "1",
})
if err != nil {
panic(err)
}

expectedMessage1 := helpers.OTRMessage{
Event: "i",
Document: map[string]interface{}{
"_id": "id1",
},
Fields: []string{"_id", "f"},
}

harness.verify(t, map[string][]helpers.OTRMessage{
"tests.Foo": {expectedMessage1},
"tests.Foo::id1": {expectedMessage1},
})

doRequest("PUT", "/denylist/tests", t, 201)

_, err = harness.mongoClient.Collection("Foo").InsertOne(context.Background(), bson.M{
"_id": "id2",
"g": "2",
})
if err != nil {
panic(err)
}

// second message should not have been received, since it got denied
harness.verify(t, map[string][]helpers.OTRMessage{})

doRequest("DELETE", "/denylist/tests", t, 204)

_, err = harness.mongoClient.Collection("Foo").InsertOne(context.Background(), bson.M{
"_id": "id3",
"h": "3",
})
if err != nil {
panic(err)
}

expectedMessage3 := helpers.OTRMessage{
Event: "i",
Document: map[string]interface{}{
"_id": "id3",
},
Fields: []string{"_id", "h"},
}

// back to normal now that the deny rule is gone
harness.verify(t, map[string][]helpers.OTRMessage{
"tests.Foo": {expectedMessage3},
"tests.Foo::id3": {expectedMessage3},
})
}
11 changes: 10 additions & 1 deletion lib/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
package config

import (
"time"
"strings"
"time"

"github.com/kelseyhightower/envconfig"
)

Expand All @@ -21,6 +22,7 @@ type oplogtoredisConfiguration struct {
MongoConnectTimeout time.Duration `default:"10s" split_words:"true"`
MongoQueryTimeout time.Duration `default:"5s" split_words:"true"`
OplogV2ExtractSubfieldChanges bool `default:"false" envconfig:"OPLOG_V2_EXTRACT_SUBFIELD_CHANGES"`
WriteParallelism int `default:"1" split_words:"true"`
}

var globalConfig *oplogtoredisConfiguration
Expand Down Expand Up @@ -131,6 +133,13 @@ func OplogV2ExtractSubfieldChanges() bool {
return globalConfig.OplogV2ExtractSubfieldChanges
}

// WriteParallelism controls how many parallel write loops will be run (sharded based on a hash
// of the database name.) Each parallel loop has its own redis connection and internal buffer.
// Healthz endpoint will report fail if anyone of them dies.
func WriteParallelism() int {
return globalConfig.WriteParallelism
}

// ParseEnv parses the current environment variables and updates the stored
// configuration. It is *not* threadsafe, and should just be called once
// at the start of the program.
Expand Down
97 changes: 97 additions & 0 deletions lib/denylist/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package denylist

import (
"encoding/json"
"net/http"
"sync"
)

// CollectionEndpoint serves the endpoints for the whole Denylist at /denylist
func CollectionEndpoint(denylist *sync.Map) func(http.ResponseWriter, *http.Request) {
return func(response http.ResponseWriter, request *http.Request) {
switch request.Method {
case "GET":
listDenylistKeys(response, denylist)
default:
http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
}
}
}

// SingleEndpoint serves the endpoints for particular Denylist entries at /denylist/...
func SingleEndpoint(denylist *sync.Map) func(http.ResponseWriter, *http.Request) {
return func(response http.ResponseWriter, request *http.Request) {
switch request.Method {
case "GET":
getDenylistEntry(response, request, denylist)
case "PUT":
createDenylistEntry(response, request, denylist)
case "DELETE":
deleteDenylistEntry(response, request, denylist)
default:
http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
}
}
}

// GET /denylist
func listDenylistKeys(response http.ResponseWriter, denylist *sync.Map) {
keys := []interface{}{}

denylist.Range(func(key interface{}, value interface{}) bool {
keys = append(keys, key)
return true
})

response.Header().Set("Content-Type", "application/json")
response.WriteHeader(http.StatusOK)
err := json.NewEncoder(response).Encode(keys)
if err != nil {
http.Error(response, "couldn't encode result", http.StatusInternalServerError)
return
}
}

// GET /denylist/...
func getDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
id := request.URL.Path
_, exists := denylist.Load(id)
if !exists {
http.Error(response, "denylist entry not found with that id", http.StatusNotFound)
return
}

response.Header().Set("Content-Type", "application/json")
response.WriteHeader(http.StatusOK)
err := json.NewEncoder(response).Encode(id)
if err != nil {
http.Error(response, "couldn't encode result", http.StatusInternalServerError)
return
}
}

// PUT /denylist/...
func createDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
id := request.URL.Path
_, exists := denylist.Load(id)
if exists {
response.WriteHeader(http.StatusNoContent)
return
}

denylist.Store(id, true)
response.WriteHeader(http.StatusCreated)
}

// DELETE /denylist/...
func deleteDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
id := request.URL.Path
_, exists := denylist.Load(id)
if !exists {
http.Error(response, "denylist entry not found with that id", http.StatusNotFound)
return
}

denylist.Delete(id)
response.WriteHeader(http.StatusNoContent)
}
16 changes: 15 additions & 1 deletion lib/oplog/processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package oplog

import (
"bytes"
"crypto/sha256"
"encoding/binary"
"encoding/json"
"strings"

Expand Down Expand Up @@ -78,6 +81,16 @@ func processOplogEntry(op *oplogEntry) (*redispub.Publication, error) {
return nil, errors.Wrap(err, "marshalling outgoing message")
}

hash := sha256.Sum256([]byte(op.Database))
intSlice := hash[len(hash)-8:]

var hashInt uint64

err = binary.Read(bytes.NewReader(intSlice), binary.LittleEndian, &hashInt)
if err != nil {
panic(errors.Wrap(err, "decoding database hash as uint64"))
}

// We need to publish on both the full-collection channel and the
// single-document channel
return &redispub.Publication{
Expand All @@ -92,7 +105,8 @@ func processOplogEntry(op *oplogEntry) (*redispub.Publication, error) {
Msg: msgJSON,
OplogTimestamp: op.Timestamp,

TxIdx: op.TxIdx,
TxIdx: op.TxIdx,
ParallelismKey: int(hashInt),
}, nil
}

Expand Down
Loading
Loading