Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use slice for Subscriptions instead of map #131

Merged
merged 1 commit into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions autopaho/cmd/rpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func getCmConfig(cfg config) autopaho.ClientConfig {
fmt.Println("mqtt connection up")
ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
if _, err := cm.Subscribe(ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
cfg.topic: {QoS: cfg.qos},
Subscriptions: []paho.SubscribeOptions{
{Topic: cfg.topic, QoS: cfg.qos},
},
}); err != nil {
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
Expand Down
4 changes: 2 additions & 2 deletions autopaho/cmd/rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func listener(rTopic string) {
})

_, err = cm.Subscribe(ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
rTopic: {QoS: 0},
Subscriptions: []paho.SubscribeOptions{
{Topic: rTopic, QoS: 0},
},
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions autopaho/examples/docker/subscriber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func main() {
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
fmt.Println("mqtt connection up")
if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
cfg.topic: {QoS: cfg.qos},
Subscriptions: []paho.SubscribeOptions{
{Topic: cfg.topic, QoS: cfg.qos},
},
}); err != nil {
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
Expand Down
4 changes: 2 additions & 2 deletions autopaho/extensions/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func NewHandler(ctx context.Context, opts HandlerOpts) (*Handler, error) {
opts.Router.RegisterHandler(h.responseTopic, h.responseHandler)

_, err := opts.Conn.Subscribe(ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
h.responseTopic: {QoS: 1},
Subscriptions: []paho.SubscribeOptions{
{Topic: h.responseTopic, QoS: 1},
},
})
if err != nil {
Expand Down
10 changes: 2 additions & 8 deletions packets/packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,7 @@ func NewControlPacket(t byte) *ControlPacket {
cp.Content = &Pubcomp{Properties: &Properties{}}
case SUBSCRIBE:
cp.Flags = 2
cp.Content = &Subscribe{
Subscriptions: make(map[string]SubOptions),
Properties: &Properties{},
}
cp.Content = &Subscribe{Properties: &Properties{}}
case SUBACK:
cp.Content = &Suback{Properties: &Properties{}}
case UNSUBSCRIBE:
Expand Down Expand Up @@ -220,10 +217,7 @@ func ReadPacket(r io.Reader) (*ControlPacket, error) {
cp.Content = &Pubcomp{Properties: &Properties{}}
case SUBSCRIBE:
cp.Flags = 2
cp.Content = &Subscribe{
Subscriptions: make(map[string]SubOptions),
Properties: &Properties{},
}
cp.Content = &Subscribe{Properties: &Properties{}}
case SUBACK:
cp.Content = &Suback{Properties: &Properties{}}
case UNSUBSCRIBE:
Expand Down
5 changes: 1 addition & 4 deletions packets/packets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,7 @@ func TestNewControlPacket(t *testing.T) {
args: SUBSCRIBE,
want: &ControlPacket{
FixedHeader: FixedHeader{Type: SUBSCRIBE, Flags: 2},
Content: &Subscribe{
Properties: &Properties{},
Subscriptions: make(map[string]SubOptions),
},
Content: &Subscribe{Properties: &Properties{}},
},
},
{
Expand Down
5 changes: 2 additions & 3 deletions packets/pingreq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package packets

import (
"bytes"
"fmt"
"io"
"net"
)
Expand All @@ -12,10 +11,10 @@ type Pingreq struct {
}

func (p *Pingreq) String() string {
return fmt.Sprintf("PINGREQ")
return "PINGREQ"
}

//Unpack is the implementation of the interface required function for a packet
// Unpack is the implementation of the interface required function for a packet
func (p *Pingreq) Unpack(r *bytes.Buffer) error {
return nil
}
Expand Down
5 changes: 2 additions & 3 deletions packets/pingresp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package packets

import (
"bytes"
"fmt"
"io"
"net"
)
Expand All @@ -12,10 +11,10 @@ type Pingresp struct {
}

func (p *Pingresp) String() string {
return fmt.Sprintf("PINGRESP")
return "PINGRESP"
}

//Unpack is the implementation of the interface required function for a packet
// Unpack is the implementation of the interface required function for a packet
func (p *Pingresp) Unpack(r *bytes.Buffer) error {
return nil
}
Expand Down
14 changes: 8 additions & 6 deletions packets/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import (
// Subscribe is the Variable Header definition for a Subscribe control packet
type Subscribe struct {
Properties *Properties
Subscriptions map[string]SubOptions
Subscriptions []SubOptions
PacketID uint16
}

func (s *Subscribe) String() string {
var b strings.Builder

fmt.Fprintf(&b, "SUBSCRIBE: PacketID:%d Subscriptions:\n", s.PacketID)
for sub, o := range s.Subscriptions {
fmt.Fprintf(&b, "\t%s: QOS:%d RetainHandling:%X NoLocal:%t RetainAsPublished:%t\n", sub, o.QoS, o.RetainHandling, o.NoLocal, o.RetainAsPublished)
for _, o := range s.Subscriptions {
fmt.Fprintf(&b, "\t%s: QOS:%d RetainHandling:%X NoLocal:%t RetainAsPublished:%t\n", o.Topic, o.QoS, o.RetainHandling, o.NoLocal, o.RetainAsPublished)
}
fmt.Fprintf(&b, "Properties:\n%s", s.Properties)

Expand All @@ -29,6 +29,7 @@ func (s *Subscribe) String() string {

// SubOptions is the struct representing the options for a subscription
type SubOptions struct {
Topic string
QoS byte
RetainHandling byte
NoLocal bool
Expand Down Expand Up @@ -87,7 +88,8 @@ func (s *Subscribe) Unpack(r *bytes.Buffer) error {
if err = so.Unpack(r); err != nil {
return err
}
s.Subscriptions[t] = so
so.Topic = t
s.Subscriptions = append(s.Subscriptions, so)
}

return nil
Expand All @@ -98,8 +100,8 @@ func (s *Subscribe) Buffers() net.Buffers {
var b bytes.Buffer
writeUint16(s.PacketID, &b)
var subs bytes.Buffer
for t, o := range s.Subscriptions {
writeString(t, &subs)
for _, o := range s.Subscriptions {
writeString(o.Topic, &subs)
subs.WriteByte(o.Pack())
}
idvp := s.Properties.Pack(SUBSCRIBE)
Expand Down
12 changes: 6 additions & 6 deletions paho/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,20 +623,20 @@ func (c *Client) Authenticate(ctx context.Context, a *Auth) (*AuthResponse, erro
// is returned from the function, along with any errors.
func (c *Client) Subscribe(ctx context.Context, s *Subscribe) (*Suback, error) {
if !c.serverProps.WildcardSubAvailable {
for t := range s.Subscriptions {
if strings.ContainsAny(t, "#+") {
for _, sub := range s.Subscriptions {
if strings.ContainsAny(sub.Topic, "#+") {
// Using a wildcard in a subscription when not supported
return nil, fmt.Errorf("cannot subscribe to %s, server does not support wildcards", t)
return nil, fmt.Errorf("cannot subscribe to %s, server does not support wildcards", sub.Topic)
}
}
}
if !c.serverProps.SubIDAvailable && s.Properties != nil && s.Properties.SubscriptionIdentifier != nil {
return nil, fmt.Errorf("cannot send subscribe with subID set, server does not support subID")
}
if !c.serverProps.SharedSubAvailable {
for t := range s.Subscriptions {
if strings.HasPrefix(t, "$share") {
return nil, fmt.Errorf("cannont subscribe to %s, server does not support shared subscriptions", t)
for _, sub := range s.Subscriptions {
if strings.HasPrefix(sub.Topic, "$share") {
return nil, fmt.Errorf("cannont subscribe to %s, server does not support shared subscriptions", sub.Topic)
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions paho/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ func TestClientSubscribe(t *testing.T) {
go c.PingHandler.Start(c.Conn, 30*time.Second)

s := &Subscribe{
Subscriptions: map[string]SubscribeOptions{
"test/1": {QoS: 1},
"test/2": {QoS: 2},
"test/3": {QoS: 0},
Subscriptions: []SubscribeOptions{
{Topic: "test/1", QoS: 1},
{Topic: "test/2", QoS: 2},
{Topic: "test/3", QoS: 0},
},
}

Expand Down
4 changes: 2 additions & 2 deletions paho/cmd/chat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func main() {
}()

if _, err := c.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
*topic: {QoS: byte(*qos), NoLocal: true},
Subscriptions: []paho.SubscribeOptions{
{Topic: *topic, QoS: byte(*qos), NoLocal: true},
},
}); err != nil {
log.Fatalln(err)
Expand Down
4 changes: 2 additions & 2 deletions paho/cmd/rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func listener(server, rTopic, username, password string) {
fmt.Printf("Connected to %s\n", server)

_, err = c.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
rTopic: paho.SubscribeOptions{QoS: 0},
Subscriptions: []paho.SubscribeOptions{
{Topic: rTopic, QoS: 0},
},
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions paho/cmd/stdoutsub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func main() {
}()

sa, err := c.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
*topic: {QoS: byte(*qos)},
Subscriptions: []paho.SubscribeOptions{
{Topic: *topic, QoS: byte(*qos)},
},
})
if err != nil {
Expand Down
22 changes: 12 additions & 10 deletions paho/cp_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ type (
// Subscribe is a representation of a MQTT subscribe packet
Subscribe struct {
Properties *SubscribeProperties
Subscriptions map[string]SubscribeOptions
Subscriptions []SubscribeOptions
}

// SubscribeOptions is the struct representing the options for a subscription
SubscribeOptions struct {
Topic string
QoS byte
RetainHandling byte
NoLocal bool
Expand All @@ -35,16 +36,17 @@ func (s *Subscribe) InitProperties(prop *packets.Properties) {
}
}

// PacketSubOptionsFromSubscribeOptions returns a map of string to packet
// PacketSubOptionsFromSubscribeOptions returns a slice of packet
// library SubOptions for the paho Subscribe on which it is called
func (s *Subscribe) PacketSubOptionsFromSubscribeOptions() map[string]packets.SubOptions {
r := make(map[string]packets.SubOptions)
for k, v := range s.Subscriptions {
r[k] = packets.SubOptions{
QoS: v.QoS,
NoLocal: v.NoLocal,
RetainAsPublished: v.RetainAsPublished,
RetainHandling: v.RetainHandling,
func (s *Subscribe) PacketSubOptionsFromSubscribeOptions() []packets.SubOptions {
r := make([]packets.SubOptions, len(s.Subscriptions))
for i, sub := range s.Subscriptions {
r[i] = packets.SubOptions{
Topic: sub.Topic,
QoS: sub.QoS,
NoLocal: sub.NoLocal,
RetainAsPublished: sub.RetainAsPublished,
RetainHandling: sub.RetainHandling,
}
}

Expand Down
4 changes: 2 additions & 2 deletions paho/extensions/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func NewHandler(ctx context.Context, c *paho.Client) (*Handler, error) {
c.Router.RegisterHandler(fmt.Sprintf("%s/responses", c.ClientID), h.responseHandler)

_, err := c.Subscribe(ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
fmt.Sprintf("%s/responses", c.ClientID): {QoS: 1},
Subscriptions: []paho.SubscribeOptions{
{Topic: fmt.Sprintf("%s/responses", c.ClientID), QoS: 1},
},
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions paho/message_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type CPContext struct {
}

// MIDs is the default MIDService provided by this library.
// It uses a map of uint16 to *CPContext to track responses
// to messages with a messageid
// It uses a slice of *CPContext to track responses
// to messages with a messageid tracking the last used message id
type MIDs struct {
sync.Mutex
lastMid uint16
Expand Down