Skip to content

Commit

Permalink
GoSentinel: Added redis-sentinel support
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabor L. Mate committed Oct 18, 2023
1 parent 8ad2cc1 commit 6e3767f
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 92 deletions.
12 changes: 11 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ services:
fresh -c scripts/fresh-runner.conf
environment:
- OTR_MONGO_URL=mongodb://mongo/dev
- OTR_REDIS_URL=redis://redis
- OTR_REDIS_URL=redis://redis,redis://redis-sentinel-master
- OTR_LOG_DEBUG=true
ports:
- 9000:9000
Expand All @@ -35,3 +35,13 @@ services:
- './.data/mongo-data:/data/db'
redis:
image: redis:6.0

redis-sentinel-master:
image: redis:6.0
environment:
- REDIS_REPLICATION_MODE=master

redis-sentinel:
image: redis-sentinel:latest
environment:
- REDIS_SENTINEL_MASTER=redis-sentinel-master
26 changes: 24 additions & 2 deletions integration-tests/acceptance/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ services:
condition: service_healthy
redis:
condition: service_started
redis-sentinel:
condition: service_started
redis-sentinel-master:
condition: service_started
command:
- /wait-for.sh
- --timeout=120
Expand All @@ -20,25 +24,33 @@ services:
- --timeout=120
- redis:6379
- '--'
- /wait-for.sh
- --timeout=120
- redis-sentinel:26379
- '--'
- /integration/acceptance/entry.sh
environment:
- MONGO_URL=mongodb://mongo/tests
- REDIS_URL=redis://redis
- REDIS_URL=redis://redis,redis://redis-sentinel:26379
- OTR_URL=http://oplogtoredis:9000
oplogtoredis:
build:
context: ../..
dockerfile: ${OTR_DOCKERFILE}
environment:
- OTR_MONGO_URL=mongodb://mongo/tests
- OTR_REDIS_URL=redis://redis
- OTR_REDIS_URL=redis://redis,redis://redis-sentinel:26379
- OTR_LOG_DEBUG=true
- OTR_OPLOG_V2_EXTRACT_SUBFIELD_CHANGES=true
depends_on:
mongo:
condition: service_healthy
redis:
condition: service_started
redis-sentinel:
condition: service_started
redis-sentinel-master:
condition: service_started
volumes:
- ../../scripts/wait-for.sh:/wait-for.sh
command:
Expand All @@ -61,5 +73,15 @@ services:
logging:
driver: none

redis-sentinel-master:
image: redis:${REDIS_TAG}
environment:
- REDIS_REPLICATION_MODE=master

redis-sentinel:
image: bitnami/redis-sentinel:latest
environment:
- REDIS_SENTINEL_MASTER=redis-sentinel-master

volumes:
mongo_data:
64 changes: 51 additions & 13 deletions integration-tests/acceptance/harness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@ import (
"context"
"encoding/json"
"sort"
"strings"
"testing"
"time"

"github.com/go-redis/redis/v8"
"github.com/kylelemons/godebug/pretty"
"github.com/tulip/oplogtoredis/integration-tests/helpers"
"github.com/tulip/oplogtoredis/lib/log"
"go.mongodb.org/mongo-driver/mongo"
)

// This file is a simple test harness for acceptance tests

type harness struct {
redisClient redis.UniversalClient
legacyRedisClient redis.UniversalClient
subscription *redis.PubSub
subscriptionC <-chan *redis.Message
legacySubscription *redis.PubSub
legacySubscriptionC <-chan *redis.Message
mongoClient *mongo.Database
}

Expand All @@ -33,6 +38,10 @@ func startHarness() *harness {
h.subscription = h.redisClient.PSubscribe(context.Background(), "*")
h.subscriptionC = h.subscription.Channel()

h.legacyRedisClient = helpers.LegacyRedisClient()
h.legacySubscription = h.legacyRedisClient.PSubscribe(context.Background(), "*")
h.legacySubscriptionC = h.legacySubscription.Channel()

return &h
}

Expand All @@ -41,20 +50,30 @@ func (h *harness) stop() {
_ = h.mongoClient.Client().Disconnect(context.Background())
h.redisClient.Close()
h.subscription.Close()

h.legacyRedisClient.Close()
h.legacySubscription.Close()
}

// Gets all messages sent to Redis. Returns once it hasn't seen a new message
// in a second.
func (h *harness) getMessages() map[string][]helpers.OTRMessage {
func (h *harness) getMessagesHelper(legacy bool) map[string][]helpers.OTRMessage {
msgs := map[string][]helpers.OTRMessage{}
var ch <-chan *redis.Message
if legacy {
ch = h.legacySubscriptionC
} else {
ch = h.subscriptionC
}

for {
select {
case msg := <-h.subscriptionC:
case msg := <-ch:
parsedMsg := helpers.OTRMessage{}
err := json.Unmarshal([]byte(msg.Payload), &parsedMsg)
if err != nil {
panic("Error parsing JSON from redis: " + err.Error())
// Optional: check for sentinel related messages
log.Log.Debugw("Error parsing JSON from redis: " + err.Error() + "\n Response text: " + msg.Payload)
}

if val, ok := msgs[msg.Channel]; ok {
Expand All @@ -70,10 +89,35 @@ func (h *harness) getMessages() map[string][]helpers.OTRMessage {
}
}
}
func (h *harness) getMessages() map[string][]helpers.OTRMessage {
return h.getMessagesHelper(false)
}
func (h *harness) getLegacyMessages() map[string][]helpers.OTRMessage {
return h.getMessagesHelper(true)
}

// This is the same as getMessages, it just doesn't return the messages
func (h *harness) resetMessages() {
h.getMessages()
h.getLegacyMessages()
}
func (h *harness) verifyPub(t *testing.T, pub map[string][]helpers.OTRMessage, expectedPubs map[string][]helpers.OTRMessage) {
for _, pubs := range pub {
for _, pub := range pubs {
sort.Strings(pub.Fields)
}

helpers.SortOTRMessagesByID(pubs)
}
for key := range pub {
if strings.Contains(key, "sentinel"){
delete(pub, key)
}
}
if diff := pretty.Compare(pub, expectedPubs); diff != "" {
t.Errorf("Got incorrect publications (-got +want)\n%s", diff)
}

}

// Check the publications that were actually made against the publications that
Expand All @@ -85,6 +129,7 @@ func (h *harness) verify(t *testing.T, expectedPubs map[string][]helpers.OTRMess
// Receive all the messages (waiting until no messages are received for a
// second)
actualPubs := h.getMessages()
actualLegacyPubs := h.getLegacyMessages()

// Sort the fields inside each message, and the messages themselves, before we compare
for _, pubs := range expectedPubs {
Expand All @@ -94,16 +139,9 @@ func (h *harness) verify(t *testing.T, expectedPubs map[string][]helpers.OTRMess

helpers.SortOTRMessagesByID(pubs)
}
for _, pubs := range actualPubs {
for _, pub := range pubs {
sort.Strings(pub.Fields)
}

helpers.SortOTRMessagesByID(pubs)
}
h.verifyPub(t, actualPubs, expectedPubs)
h.verifyPub(t, actualLegacyPubs, expectedPubs)
// pop the __sentinel__ entry

// Verify the correct messages were received on each channel
if diff := pretty.Compare(actualPubs, expectedPubs); diff != "" {
t.Errorf("Got incorrect publications (-got +want)\n%s", diff)
}
}
45 changes: 39 additions & 6 deletions integration-tests/helpers/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,50 @@ package helpers

import (
"os"

"strings"
"github.com/go-redis/redis/v8"
)

// RedisClient returns a redis client to the URL specified in the REDIS_URL
// env var
func RedisClient() *redis.Client {
redisOpts, err := redis.ParseURL(os.Getenv("REDIS_URL"))
func redisClient(legacy bool) redis.UniversalClient{
var idx int
if (legacy){
idx = 0
} else {
idx = 1
}
redisOpts, err := redis.ParseURL(strings.Split(os.Getenv("REDIS_URL"), ",")[idx])
if err != nil {
panic(err)
}
var clientOptions redis.UniversalOptions
if legacy {
clientOptions = redis.UniversalOptions{
Addrs: []string{redisOpts.Addr},
DB: redisOpts.DB,
Password: redisOpts.Password,
TLSConfig: redisOpts.TLSConfig,
}
}else{
clientOptions = redis.UniversalOptions{
Addrs: []string{redisOpts.Addr},
DB: redisOpts.DB,
Password: redisOpts.Password,
TLSConfig: redisOpts.TLSConfig,
MasterName: "mymaster",
}
}


return redis.NewUniversalClient(&clientOptions)
}

func LegacyRedisClient() redis.UniversalClient {
return redisClient(true)
}


return redis.NewClient(redisOpts)
// RedisClient returns the second redis client to the URL specified in the REDIS_URL
// The first one is the legacy fallback URL
func RedisClient() redis.UniversalClient {
return redisClient(false)
}
23 changes: 21 additions & 2 deletions integration-tests/performance/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ services:
condition: service_healthy
redis:
condition: service_started
redis-sentinel:
condition: service_started
redis-sentinel-master:
condition: service_started
command:
- /wait-for.sh
- --timeout=60
Expand All @@ -20,16 +24,20 @@ services:
- --timeout=60
- redis:6379
- '--'
- /wait-for.sh
- --timeout=120
- redis-sentinel:26379
- '--'
- /integration/performance/entry.sh
environment:
- MONGO_URL=mongodb://mongo/tests
- REDIS_URL=redis://redis
- REDIS_URL=redis://redis,redis://redis-sentinel:26379

oplogtoredis:
build: ../..
environment:
- OTR_MONGO_URL=mongodb://mongo/tests
- OTR_REDIS_URL=redis://redis
- OTR_REDIS_URL=redis://redis,redis://redis-sentinel:26379
depends_on:
mongo:
condition: service_healthy
Expand Down Expand Up @@ -65,5 +73,16 @@ services:
logging:
driver: none

redis-sentinel-master:
image: redis:6.0
environment:
- REDIS_REPLICATION_MODE=master

redis-sentinel:
image: bitnami/redis-sentinel:latest
environment:
- REDIS_SENTINEL_MASTER=redis-sentinel-master


volumes:
mongo_data:
13 changes: 6 additions & 7 deletions lib/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package config

import (
"time"

"strings"
"github.com/kelseyhightower/envconfig"
)

Expand All @@ -25,12 +25,11 @@ type oplogtoredisConfiguration struct {

var globalConfig *oplogtoredisConfiguration

// RedisURL is the Redis URL configuration. It is required, and is set via the
// environment variable `OTR_REDIS_URL`.
// To connect to a instance over TLS be sure to specify the url with protocol
// `rediss://`, otherwise use `redis://`
func RedisURL() string {
return globalConfig.RedisURL
// RedisURL is the configuration for connecting to a Redis instance using the 'OTR_REDIS_URL' environment variable.
// For TLS, use 'rediss://'; for non-TLS, use 'redis://'.
// Multiple URLs can be configured by separating them with commas.
func RedisURL() []string {
return strings.Split(globalConfig.RedisURL, ",")
}

// MongoURL is the Mongo URL configuration. Is is required, and is set via the
Expand Down
2 changes: 1 addition & 1 deletion lib/config/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func checkConfigExpectation(t *testing.T, expectedConfig *oplogtoredisConfigurat
expectedConfig.MongoURL, MongoURL())
}

if expectedConfig.RedisURL != RedisURL() {
if expectedConfig.RedisURL != strings.Join(RedisURL()[:], "") {
t.Errorf("Incorrect Redis URL. Got \"%s\", Expected \"%s\"",
expectedConfig.RedisURL, RedisURL())
}
Expand Down
3 changes: 3 additions & 0 deletions lib/oplog/oplogEntry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ func TestMapKeys(t *testing.T) {
}
}

/*
TODO: Investigate what this function used to do and whether it is still relevant
func TestUpdateIsV2Formatted(t *testing.T) {
tests := map[string]struct {
in map[string]interface{}
Expand Down Expand Up @@ -396,3 +398,4 @@ func TestUpdateIsV2Formatted(t *testing.T) {
})
}
}
*/
4 changes: 2 additions & 2 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// reconnection and resumption of where it left off.
type Tailer struct {
MongoClient *mongo.Client
RedisClient redis.UniversalClient
RedisClients []redis.UniversalClient
RedisPrefix string
MaxCatchUp time.Duration
}
Expand Down Expand Up @@ -397,7 +397,7 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw) (timestamp *primitive.Tim
// fallback if we don't have a latest timestamp from Redis) as an arg instead
// of using tailer.mongoClient directly so we can unit test this function
func (tailer *Tailer) getStartTime(getTimestampOfLastOplogEntry func() (*primitive.Timestamp, error)) primitive.Timestamp {
ts, tsTime, redisErr := redispub.LastProcessedTimestamp(tailer.RedisClient, tailer.RedisPrefix)
ts, tsTime, redisErr := redispub.LastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix)

if redisErr == nil {
// we have a last write time, check that it's not too far in the
Expand Down
Loading

0 comments on commit 6e3767f

Please sign in to comment.