This repository has been archived by the owner on Nov 10, 2020. It is now read-only.

consistent failure: network i/o timeout #29

Open
jcw opened this issue Jan 22, 2016 · 7 comments

Comments


jcw commented Jan 22, 2016

The following code fails, always after some 42 or 43 seconds (go 1.5.3, Mac OSX 10.11.3):

package main

import (
    "flag"
    "time"

    "github.com/surge/glog"
    "github.com/surgemq/message"
    "github.com/surgemq/surgemq/service"
)

func main() {
    flag.Parse()
    defer glog.Flush()

    go func() {
        srv := service.Server{}
        glog.Info("starting MQTT server at :12345")
        glog.Fatal(srv.ListenAndServe("tcp://:12345"))
    }()

    // wait a bit, the internal MQTT server is still starting up
    time.Sleep(time.Second)

    client := &service.Client{}

    msg := message.NewConnectMessage()
    msg.SetVersion(4)
    msg.SetCleanSession(true)
    msg.SetClientId([]byte("me"))

    if err := client.Connect("tcp://:12345", msg); err != nil {
        glog.Fatal(err)
    }
    glog.Infoln("connected to port 12345")

    for t := range time.Tick(time.Second) {
        glog.Info(t)

        msg := message.NewPublishMessage()
        msg.SetTopic([]byte("abc"))
        msg.SetPayload([]byte("def"))

        e := client.Publish(msg, nil)
        if e != nil {
            glog.Fatal("publish: ", e)
        }
    }
}

Sample error output from go run blah.go -logtostderr:

...
I0122 21:40:55.862248   11954 blah.go:37/main] 2016-01-22 21:40:55.862186826 +0100 CET
I0122 21:40:56.865790   11954 blah.go:37/main] 2016-01-22 21:40:56.8656395 +0100 CET
I0122 21:40:57.864022   11954 blah.go:37/main] 2016-01-22 21:40:57.863648731 +0100 CET
E0122 21:40:58.861597   11954 sendrecv.go:77/receiver] (2/me) error reading from connection: read tcp 127.0.0.1:52627->127.0.0.1:12345: i/o timeout
I0122 21:40:58.862059   11954 blah.go:37/main] 2016-01-22 21:40:58.861395068 +0100 CET
F0122 21:40:58.862185   11954 blah.go:44/main] publish: (2/me) Error sending PUBLISH message: service: buffer is not ready
exit status 255

I can't figure out what is causing this problem. Can anyone else reproduce this on a different system?

Author

jcw commented Jan 23, 2016

A failure after 45 seconds would be 1.5 times the service.minKeepAlive in client.go - could that be a hint?

Author

jcw commented Jan 23, 2016

When this code is added, the problem goes away:

submsg := message.NewSubscribeMessage()
submsg.AddTopic([]byte("#"), 0)
client.Subscribe(submsg, nil, func(msg *message.PublishMessage) error {
    return nil
})

(but it's not enough to pass in nil as publish handler)

In short: it looks like the keep-alive timer is not properly reset for use with publish-only clients.

Contributor

zhenjl commented Jan 25, 2016

@jcw thanks for the detailed updates. I sincerely apologize for not having tracked this closely. TBH I haven't had time to look at this project for a while. I would be happy to merge a PR if you have a fix.

Author

jcw commented Jan 25, 2016

Unfortunately, I've not been able to chase this and reach a clear-cut conclusion. I'm not even sure that the above workaround is 100% correct - was hoping someone more versed in SurgeMQ than me would be able to look into it. I also ran into other issues with the last-Will (but am not ruling out my own confusion, to be honest).

For now, I've decided to use another package as client, with Mosquitto as server. Would love to revisit this later, because I'd prefer a single-exe solution with the server also in Go - but I'm afraid I can't contribute anything more of substance at the moment.

Hopefully the above small standalone example will eventually help someone find and fix this bug.


ankoh commented Jan 26, 2016

@jcw Going with Mosquitto and Eclipse Paho is the better approach for any real-world application, especially because SurgeMQ does not support ACK timeouts.

Note: If you're looking for experiments, I rewrote the broker here.
It focuses on a strict layered architecture, a delegation pattern instead of excessive lambdas, and idiomatic Go channels instead of the former custom ring buffer. It's a research project for my university that aims to implement message prioritisation. I'll probably finish it in February.

Author

jcw commented Jan 26, 2016

Thanks @ankoh - yes, I'm using Mosquitto w/ Paho now, which allows me to proceed with my project. A single-exe solution with the MQTT broker built-in would have been really nice, but this setup will have to do for now. Thanks for the pointer - I'll keep an eye on it.


ljy2010a commented Jul 21, 2016

I found out why this error occurs (->127.0.0.1:12345: i/o timeout): the net.Conn has a read deadline set via conn.SetReadDeadline, but the client sends no heartbeat. I saw that the Paho client implements one.

So I changed the code as shown below, and then ran into another error, #27.
To resolve that one, ackmsg.Ackbuf and ackmsg.Msgbuf should be set before processAcked.

package main

import (
    "flag"
    "fmt"
    "time"

    "github.com/surge/glog"
    "github.com/surgemq/message"
    "github.com/surgemq/surgemq/service"
)

func main() {

    addr := "tcp://127.0.0.1:12345"
    flag.Parse()
    defer glog.Flush()

    go func() {
        srv := service.Server{}
        glog.Info("starting MQTT server at :12345")
        glog.Fatal(srv.ListenAndServe(addr))
    }()

    // wait a bit, the internal MQTT server is still starting up
    time.Sleep(time.Second)

    // mqtt.ERROR = log.New(os.Stdout, "", 0)
    // opts := mqtt.NewClientOptions().AddBroker(addr).SetClientID("Paho")
    // opts.SetKeepAlive(2 * time.Second)
    // opts.SetPingTimeout(1 * time.Second)

    // c := mqtt.NewClient(opts)
    // if token := c.Connect(); token.Wait() && token.Error() != nil {
    //  panic(token.Error())
    // }

    client := &service.Client{}
    msg := message.NewConnectMessage()
    msg.SetVersion(4)
    KeepAlive := 40
    msg.SetKeepAlive(uint16(KeepAlive))
    msg.SetCleanSession(true)
    msg.SetClientId([]byte("Paho"))
    if err := client.Connect(addr, msg); err != nil {
        glog.Fatal(err)
    }

    go heartbeatProcess(client, KeepAlive)

    glog.Infoln("connected to port 12345")
    i := 0
    for range time.Tick(time.Second * 1) {
        i++
        glog.Info(i)
        text := fmt.Sprintf("%d", i)

        // token := c.Publish("topic", 0, false, text)
        // token.Wait()

        msg := message.NewPublishMessage()
        msg.SetTopic([]byte("topic"))
        msg.SetPayload([]byte(text))
        msg.SetRetain(false)
        err := client.Publish(msg, nil)
        if err != nil {
            glog.Fatal("publish: ", err)
        }
    }
}

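// heartbeatProcess pings the server once per KeepAlive interval (in seconds)
// so that the connection does not sit idle.
func heartbeatProcess(c *service.Client, KeepAlive int) {
    for range time.Tick(time.Duration(KeepAlive) * time.Second) {
        err := c.Ping(func(msg, ack message.Message, err error) error {
            glog.Info("Ping")
            return nil
        })
        if err != nil {
            glog.Error("ping: ", err)
        }
    }
}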
