From 87ee770b4dfc14070e7941e4d0ed1d062380f55d Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 14 Nov 2018 18:48:31 -0800 Subject: [PATCH] encode record-store keys in pubsub Instead of using bare keys (binary), use `/record/base64url(key)`. This: 1. Pubsub keys must be UTF-8. Record-store keys definitely are not. 2. The prefix may help with validators in the future (namespace *everything*). See: https://github.com/ipfs/interop/pull/39 --- pubsub.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/pubsub.go b/pubsub.go index 514dcae..e7ca5ce 100644 --- a/pubsub.go +++ b/pubsub.go @@ -3,6 +3,7 @@ package namesys import ( "bytes" "context" + "encoding/base64" "errors" "sync" "time" @@ -48,6 +49,13 @@ type PubsubValueStore struct { Validator record.Validator } +// KeyToTopic converts a binary record key to a pubsub topic key. +func KeyToTopic(key string) string { + // Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs. + // Encodes to "/record/base64url(key)" + return "/record/" + base64.RawURLEncoding.EncodeToString([]byte(key)) +} + // NewPubsubPublisher constructs a new Publisher that publishes IPNS records through pubsub. // The constructor interface is complicated by the need to bootstrap the pubsub topic. // This could be greatly simplified if the pubsub implementation handled bootstrap itself @@ -69,6 +77,10 @@ func NewPubsubValueStore(ctx context.Context, host p2phost.Host, cr routing.Cont // Publish publishes an IPNS record through pubsub with default TTL func (p *PubsubValueStore) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) error { + // Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs. + // Encode to "/record/base64url(key)" + topic := KeyToTopic(key) + p.mx.Lock() _, bootstraped := p.subs[key] @@ -76,13 +88,13 @@ func (p *PubsubValueStore) PutValue(ctx context.Context, key string, value []byt p.subs[key] = nil p.mx.Unlock() - bootstrapPubsub(p.ctx, p.cr, p.host, key) + bootstrapPubsub(p.ctx, p.cr, p.host, topic) } else { p.mx.Unlock() } log.Debugf("PubsubPublish: publish value for key", key) - return p.ps.Publish(key, value) + return p.ps.Publish(topic, value) } func (p *PubsubValueStore) isBetter(key string, val []byte) bool { @@ -115,15 +127,17 @@ func (p *PubsubValueStore) Subscribe(key string) error { return nil } + topic := KeyToTopic(key) + // Ignore the error. We have to check again anyways to make sure the // record hasn't expired. // // Also, make sure to do this *before* subscribing. - p.ps.RegisterTopicValidator(key, func(ctx context.Context, msg *pubsub.Message) bool { + p.ps.RegisterTopicValidator(topic, func(ctx context.Context, msg *pubsub.Message) bool { return p.isBetter(key, msg.GetData()) }) - sub, err := p.ps.Subscribe(key) + sub, err := p.ps.Subscribe(topic) if err != nil { p.mx.Unlock() return err @@ -146,7 +160,7 @@ func (p *PubsubValueStore) Subscribe(key string) error { if !bootstraped { // TODO: Deal with publish then resolve case? Cancel behaviour changes. - go bootstrapPubsub(ctx, p.cr, p.host, key) + go bootstrapPubsub(ctx, p.cr, p.host, topic) } return nil }