Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Pubsub.SeenMessagesStrategy #9543

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module github.com/ipfs/kubo

replace github.com/libp2p/go-libp2p-pubsub => github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230110155724-04bfcf58514f
Copy link
Member

@lidel lidel Jan 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYSA I've pushed this change, so we can see if CI passes against last commit from libp2p/go-libp2p-pubsub#513
(It is late Friday so will do proper review on Monday)

@smrz2001 In the future, you can point at an arbitrary commit from your own fork via:

$  go mod edit -replace github.com/libp2p/go-libp2p-pubsub=github.com/smrz2001/go-libp2p-pubsub@04bfcf58514f59bddb650fec5b2edac018c8c406

Copy link
Contributor Author

@smrz2001 smrz2001 Jan 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks so much, @lidel!! I didn't know we could point to fork commits like that!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix any CI issues that come up.


require (
bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc
contrib.go.opencensus.io/exporter/prometheus v0.4.0
Expand Down Expand Up @@ -134,6 +136,7 @@ require (
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
Expand Down Expand Up @@ -224,7 +227,6 @@ require (
github.com/whyrusleeping/cbor-gen v0.0.0-20200710004633-5379fc63235d // indirect
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/
github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302 h1:QV0ZrfBLpFc2KDk+a4LJefDczXnonRwrYrQJY/9L4dA=
github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302/go.mod h1:qBlWZqWeVx9BjvqBsnC/8RUlAYpIFmPvgROcw0n1scE=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -817,8 +819,6 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj
github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I=
github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE=
github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA=
Expand Down Expand Up @@ -1273,6 +1273,8 @@ github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:s
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY=
github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230110155724-04bfcf58514f h1:Z8HthHnvHtDafaSRNU6n+UodAqOuptMcDBuLprrv1Qg=
github.com/smrz2001/go-libp2p-pubsub v0.0.0-20230110155724-04bfcf58514f/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE=
Expand Down Expand Up @@ -1359,8 +1361,6 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
Expand Down
116 changes: 77 additions & 39 deletions test/integration/pubsub_msg_seen_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error

var bootstrapNode, consumerNode, producerNode *core.IpfsNode
var bootstrapPeerID, consumerPeerID, producerPeerID peer.ID
sendDupMsg := false

mn := mocknet.New()
bootstrapNode, err := mockNode(ctx, mn, false, "") // no need for PubSub configuration
Expand All @@ -98,25 +97,34 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error
t.Fatal(err)
}

// Used for logging the timeline
startTime := time.Time{}

// Used for overriding the message ID
sendMsgId := ""

// Set up the pubsub message ID generation override for the producer
core.RegisterFXOptionFunc(func(info core.FXNodeInfo) ([]fx.Option, error) {
var pubsubOptions []pubsub.Option
pubsubOptions = append(
pubsubOptions,
pubsub.WithSeenMessagesTTL(ttl),
pubsub.WithMessageIdFn(func(pmsg *pubsub_pb.Message) string {
now := time.Now().Format(time.StampMilli)
now := time.Now()
if startTime.Second() == 0 {
startTime = now
}
timeElapsed := now.Sub(startTime).Seconds()
msg := string(pmsg.Data)
var msgID string
from, _ := peer.IDFromBytes(pmsg.From)
if (from == producerPeerID) && sendDupMsg {
msgID = "DupMsg"
t.Logf("sending [%s] with duplicate message ID at [%s]", msg, now)
var msgId string
if from == producerPeerID {
msgId = sendMsgId
t.Logf("sending [%s] with message ID [%s] at T%fs", msg, msgId, timeElapsed)
} else {
msgID = pubsub.DefaultMsgIdFn(pmsg)
t.Logf("sending [%s] with unique message ID at [%s]", msg, now)
msgId = pubsub.DefaultMsgIdFn(pmsg)
}
return msgID
return msgId
}),
)
return append(
Expand Down Expand Up @@ -165,8 +173,8 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error
t.Fatal(err)
}
// Utility functions defined inline to include context in closure
now := func() string {
return time.Now().Format(time.StampMilli)
now := func() float64 {
return time.Now().Sub(startTime).Seconds()
}
ctr := 0
msgGen := func() string {
Expand All @@ -188,57 +196,87 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error
msg, err := consumerSubscription.Next(rxCtx)
if shouldFind {
if err != nil {
t.Logf("did not receive [%s] by [%s]", msgTxt, now())
t.Logf("expected but did not receive [%s] at T%fs", msgTxt, now())
t.Fatal(err)
}
t.Logf("received [%s] at [%s]", string(msg.Data()), now())
t.Logf("received [%s] at T%fs", string(msg.Data()), now())
if !bytes.Equal(msg.Data(), []byte(msgTxt)) {
t.Fatalf("consumed data [%s] does not match published data [%s]", string(msg.Data()), msgTxt)
}
} else {
if err == nil {
t.Logf("received [%s] at [%s]", string(msg.Data()), now())
t.Logf("not expected but received [%s] at T%fs", string(msg.Data()), now())
t.Fail()
}
t.Logf("did not receive [%s] by [%s]", msgTxt, now())
t.Logf("did not receive [%s] at T%fs", msgTxt, now())
}
}

// Send message 1 with the message ID we're going to duplicate later
sendDupMsg = true
const MsgId_1 = "MsgId_1"
const MsgId_2 = "MsgId_2"
const MsgId_3 = "MsgId_3"

// Send message 1 with the message ID we're going to duplicate
sentMsg1 := time.Now()
sendMsgId = MsgId_1
msgTxt := produceMessage()
consumeMessage(msgTxt, true) // should find message
// Should find the message because it's new
consumeMessage(msgTxt, true)

// Send message 2 with the same message ID as before
sendDupMsg = true
// Send message 2 with a duplicate message ID
sendMsgId = MsgId_1
msgTxt = produceMessage()
consumeMessage(msgTxt, false) // should NOT find message, because it got deduplicated (sent twice within the SeenMessagesTTL window)

// Wait for seen cache TTL time to let seen cache entries time out
time.Sleep(ttl)
// Should NOT find message because it got deduplicated (sent 2 times within the SeenMessagesTTL window).
consumeMessage(msgTxt, false)

// Send message 3 with a new message ID
//
// This extra step is necessary for testing the cache TTL because the PubSub code only garbage collects when a
// message ID was not already present in the cache. This means that message 2's cache entry, even though it has
// technically timed out, will still cause the message to be considered duplicate. When a message with a different
// ID passes through, it will be added to the cache and garbage collection will clean up message 2's entry. This is
// another bug in the pubsub/cache implementation that will be fixed once the code is refactored for this issue:
// https://github.com/libp2p/go-libp2p-pubsub/issues/502
sendDupMsg = false
sendMsgId = MsgId_2
msgTxt = produceMessage()
// Should find the message because it's new
consumeMessage(msgTxt, true)

// Wait till just before the SeenMessagesTTL window has passed since message 1 was sent
time.Sleep(time.Until(sentMsg1.Add(ttl - 100*time.Millisecond)))

// Send message 4 with a duplicate message ID
sendMsgId = MsgId_1
msgTxt = produceMessage()
// Should NOT find the message because it got deduplicated (sent 3 times within the SeenMessagesTTL window). This
// time, however, the expiration for the message should also get pushed out for a whole SeenMessagesTTL window since
// the default time cache now implements a sliding window algorithm.
consumeMessage(msgTxt, false)

// Send message 5 with a duplicate message ID. This will be a second after the last attempt above since NOT finding
// a message takes a second to determine. That would put this attempt at ~1 second after the SeenMessagesTTL window
// starting at message 1 has expired.
sentMsg5 := time.Now()
sendMsgId = MsgId_1
msgTxt = produceMessage()
consumeMessage(msgTxt, true) // should find message
// Should NOT find the message, because it got deduplicated (sent 2 times since the updated SeenMessagesTTL window
// started). This time again, the expiration should get pushed out for another SeenMessagesTTL window.
consumeMessage(msgTxt, false)

// Send message 6 with a message ID that hasn't been seen within a SeenMessagesTTL window
sendMsgId = MsgId_2
msgTxt = produceMessage()
// Should find the message since last read > SeenMessagesTTL, so it looks like a new message.
consumeMessage(msgTxt, true)

// Sleep for a full SeenMessagesTTL window to let cache entries time out
time.Sleep(time.Until(sentMsg5.Add(ttl + 100*time.Millisecond)))

// Send message 4 with the same message ID as before
sendDupMsg = true
// Send message 7 with a duplicate message ID
sendMsgId = MsgId_1
msgTxt = produceMessage()
consumeMessage(msgTxt, true) // should find message again (time since the last read > SeenMessagesTTL, so it looks like a new message).
// Should find the message this time since last read > SeenMessagesTTL, so it looks like a new message.
consumeMessage(msgTxt, true)

// Send message 5 with a new message ID
// Send message 8 with a brand new message ID
//
// This step is not strictly necessary, but has been added for good measure.
sendDupMsg = false
sendMsgId = MsgId_3
msgTxt = produceMessage()
consumeMessage(msgTxt, true) // should find message
// Should find the message because it's new
consumeMessage(msgTxt, true)
return nil
}