wss:// and netpoll #121

Closed
macabre2077 opened this issue Oct 24, 2020 · 20 comments

@macabre2077

macabre2077 commented Oct 24, 2020

I was having issues with my app and found that using wss:// with nginx as a proxy leads to missing messages. I am using the chat example with netpoll.

Running the Go code below with ws:// prints one count per request sent, whereas with wss:// it prints only the first 2-3 messages.

The high-level example with wsutil works as expected, so I think the issue might be in netpoll.
Hopefully that makes sense to you. Thanks

A slightly modified chat example from gobwas/ws-examples:

server.go

	poller, err := netpoll.New(nil)
	if err != nil {
		log.Fatal(err)
	}

	pool := gopool.NewPool(128, 1, 1)

	handle := func(conn net.Conn) {
		userSeq := 0

		hs, err := ws.Upgrade(conn)

		if err != nil {
			panic(err)
		}

		logger.Debugf("%s: established websocket connection: %+v", nameConn(conn), hs)

		desc := netpoll.Must(netpoll.HandleRead(conn))

		// Subscribe to events about conn.
		pErr := poller.Start(desc, func(ev netpoll.Event) {
			fmt.Println(userSeq)

			userSeq++
		})

		if pErr != nil {
			panic(pErr)
		}
	}

	// Create incoming connections listener.
	ln, err := net.Listen("tcp", ":8088")

	if err != nil {
		log.Fatal(err)
	}

	log.Printf("websocket is listening on %s", ln.Addr().String())

	// Create netpoll descriptor for the listener.
	// We use OneShot here to manually resume events stream when we want to.
	acceptDesc := netpoll.Must(netpoll.HandleListener(
		ln, netpoll.EventRead|netpoll.EventOneShot,
	))

	// accept is a channel to signal about next incoming connection Accept()
	// results.
	accept := make(chan error, 1)

	// Subscribe to events about listener.
	err = poller.Start(acceptDesc, func(e netpoll.Event) {
		err := pool.ScheduleTimeout(time.Millisecond, func() {
			conn, err := ln.Accept()
			if err != nil {
				accept <- err
				return
			}

			accept <- nil
			handle(conn)
		})
		if err == nil {
			err = <-accept
		}
		if err != nil {
			// No free goroutine in the pool within 1ms: cool down and retry.
			if err == gopool.ErrScheduleTimeout {
				goto cooldown
			}
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				goto cooldown
			}

			log.Fatalf("accept error: %v", err)

		cooldown:
			delay := 5 * time.Millisecond
			log.Printf("accept error: %v; retrying in %s", err, delay)
			time.Sleep(delay)
		}

		err = poller.Resume(acceptDesc)

		if err != nil {
			log.Fatal(err)
		}
	})

	if err != nil {
		log.Fatal(err)
	}

client.js

process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = 0

const WebSocket = require('ws');
const ws = new WebSocket( 'wss://vm.dev/' );

ws.on('open', function open() {
    for (let i = 0; i < 10; i++) {
        ws.send("ping");
    }
})

Nginx config

server {
    listen 80;
    listen 443 ssl;

    # ... ssl config here ...

    location / {
        proxy_pass http://192.168.0.100:8088/;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
        proxy_set_header Host $host;
        proxy_cache_bypass $http_upgrade;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
macabre2077 changed the title from "wss:// and Nginx" to "wss:// and netpoll" on Oct 24, 2020
@macabre2077
Author

I get the same issue when using tls.Server and then upgrading, as suggested in #64.
I made a repo that reproduces it: https://github.com/macabre2077/ws-examples

@IbrahimKoutabli

IbrahimKoutabli commented Nov 1, 2020

I have the same issue with or without TLS. If I send 10-15 messages, the server picks up only the first 7-8; then, if I send some more messages, it delivers the previously missed ones.

Does anyone have any idea why?

I have a setup similar to the original issue, taken from the example and modified a bit.

@navossoc
Contributor

navossoc commented Nov 1, 2020

@IbrahimKoutabli have you tried checking TCPConn.SetNoDelay?

@emplorium

@navossoc On the server or the client? I tried calling SetNoDelay on the server conn, to no avail. Any advice?

Thanks

@emplorium

Since it defaults to true, isn't it already no-delay?

@navossoc
Contributor

navossoc commented Nov 1, 2020

For best results, set it on both sides, so you can make sure nothing is buffered.
I'm not sure this is your problem at all; I'm just guessing, but it looks like it.

Are you using any kind of proxy, reverse proxy, or CDN?
For this test, I suggest a direct connection between the two computers.

I'm using the zero-copy approach.
In my main Accept loop I use AcceptTCP, which gives me a *net.TCPConn.

If you have a net.Conn, you can also type-assert it, like conn.(*net.TCPConn), and then call SetNoDelay(true).
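For reference, both ways of reaching SetNoDelay mentioned above (AcceptTCP on a *net.TCPListener, or a type assertion on a net.Conn) can be sketched with only the standard library. This is a minimal sketch: `enableNoDelay`, `tcpPair`, and `pipePair` are hypothetical helpers, not part of gobwas/ws or easygo. Note that Go already enables no-delay on new TCP connections by default, so an explicit call mostly documents intent.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// enableNoDelay disables Nagle's algorithm if conn is backed by TCP.
// net.Conn is an interface, so this is a type assertion, not a conversion.
func enableNoDelay(conn net.Conn) error {
	tc, ok := conn.(*net.TCPConn)
	if !ok {
		return errors.New("enableNoDelay: connection is not a *net.TCPConn")
	}
	return tc.SetNoDelay(true)
}

// tcpPair opens a loopback TCP connection. The server side comes from
// AcceptTCP, so it is already a *net.TCPConn; the client side is a net.Conn.
func tcpPair() (client net.Conn, server *net.TCPConn, err error) {
	ln, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
	if err != nil {
		return nil, nil, err
	}
	defer ln.Close()
	if client, err = net.Dial("tcp", ln.Addr().String()); err != nil {
		return nil, nil, err
	}
	if server, err = ln.AcceptTCP(); err != nil {
		client.Close()
		return nil, nil, err
	}
	return client, server, nil
}

// pipePair returns an in-memory net.Conn pair that is not backed by TCP.
func pipePair() (net.Conn, net.Conn) { return net.Pipe() }

func main() {
	client, server, err := tcpPair()
	if err != nil {
		panic(err)
	}
	defer client.Close()
	defer server.Close()

	fmt.Println(server.SetNoDelay(true)) // direct call on *net.TCPConn
	fmt.Println(enableNoDelay(client))   // via type assertion

	a, b := pipePair()
	defer a.Close()
	defer b.Close()
	fmt.Println(enableNoDelay(a)) // error: net.Pipe is not TCP
}
```

As lesismal points out further down, this only affects when bytes leave the sender; it cannot change how many WebSocket messages the application ends up reading.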

@emplorium

emplorium commented Nov 1, 2020

Thanks

I haven't deployed it yet, so I'm not using any proxies; I'm just running it locally.

I'm using a net.Conn and type-asserting it with conn.(*net.TCPConn), but why do I need to call SetNoDelay(true) manually when, according to the docs, it defaults to true?

So if it defaults to true (and I set it to true manually anyway), why aren't all the messages sent to the server being processed?

@emplorium

emplorium commented Nov 1, 2020

handle := func(conn net.Conn) {
		newConn := conn.(*net.TCPConn)
		newConn.SetNoDelay(true)
		safeConn := deadliner{newConn, *ioTimeout}
		// Zero-copy upgrade to WebSocket connection.
		// The handshake details are not used here, so discard them.
		_, err := ws.Upgrade(safeConn)
		if err != nil {
			log.Printf("%s: upgrade error: %v", nameConn(conn), err)
			conn.Close()
			return
		}
		user := chat.Register(safeConn)
		desc := netpoll.Must(netpoll.HandleRead(conn))

		// Subscribe to events about conn.
		poller.Start(desc, func(ev netpoll.Event) {
			if ev&(netpoll.EventReadHup|netpoll.EventHup) != 0 {
				poller.Stop(desc)
				chat.Remove(user)
				return
			}
			pool.Schedule(func() {
				if err := user.Receive(); err != nil {
					// When receive failed, we can only disconnect broken
					// connection and stop to receive events about it.
					poller.Stop(desc)
					chat.Remove(user)
				}
			})
		})
	}
	ln, err := net.Listen("tcp", *addr)
	if err != nil {
		log.Fatal(err)
	}
	defer ln.Close()
	// Create netpoll descriptor for the listener.
	// We use OneShot here to manually resume events stream when we want to.
	acceptDesc := netpoll.Must(netpoll.HandleListener(
		ln, netpoll.EventRead|netpoll.EventOneShot,
	))
	accept := make(chan error, 1)

	// Subscribe to events about listener.
	poller.Start(acceptDesc, func(e netpoll.Event) {
		err := pool.ScheduleTimeout(time.Millisecond, func() {
			conn, err := ln.Accept()
			if err != nil {
				accept <- err
				return
			}
			accept <- nil
			handle(conn)
		})
		if err == nil {
			err = <-accept
		}
		if err != nil {
			// No free goroutine in the pool within 1ms: cool down and retry.
			if err == gopool.ErrScheduleTimeout {
				goto cooldown
			}
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				goto cooldown
			}

			log.Fatalf("accept error: %v", err)

		cooldown:
			delay := 5 * time.Millisecond
			log.Printf("accept error: %v; retrying in %s", err, delay)
			time.Sleep(delay)
		}

		poller.Resume(acceptDesc)
	})
	<-exit
}

@emplorium

Could it be something to do with the poller?

@lesismal

lesismal commented Apr 4, 2021

SetNoDelay should not affect the application-layer protocol logic.

@lesismal

lesismal commented Apr 4, 2021

Which package does "netpoll" refer to?

@macabre2077
Author

@lesismal netpoll is from github.com/mailru/easygo

@lesismal

lesismal commented Apr 6, 2021

netpoll.HandleRead calls conn.File(), which returns an *os.File with a duplicated fd. I guess you use the original safeConn in user.Receive(), while netpoll is watching the duplicated fd.
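The duplicated descriptor can be observed directly on Linux: `TCPConn.File` returns an `*os.File` whose fd is a dup of the connection's, so the two numbers differ even though both refer to the same socket (bytes read through one are gone for the other). This is a sketch assuming Linux; `rawFD`, `connAndFileFDs`, and `tcpPair` are hypothetical helpers, and the fd numbers are read through `RawConn.Control`.

```go
package main

import (
	"fmt"
	"net"
	"syscall"
)

// syscallConner is satisfied by both *net.TCPConn and *os.File.
type syscallConner interface {
	SyscallConn() (syscall.RawConn, error)
}

// rawFD reads the descriptor number without taking ownership of it.
func rawFD(c syscallConner) (uintptr, error) {
	rc, err := c.SyscallConn()
	if err != nil {
		return 0, err
	}
	var fd uintptr
	if err := rc.Control(func(s uintptr) { fd = s }); err != nil {
		return 0, err
	}
	return fd, nil
}

// connAndFileFDs returns the connection's own fd and the fd of the *os.File
// from File(), which is what a poller built on File() would register.
func connAndFileFDs(tc *net.TCPConn) (connFD, fileFD uintptr, err error) {
	if connFD, err = rawFD(tc); err != nil {
		return 0, 0, err
	}
	f, err := tc.File() // a dup of the socket descriptor
	if err != nil {
		return 0, 0, err
	}
	defer f.Close() // closes only the duplicate
	fileFD, err = rawFD(f)
	return connFD, fileFD, err
}

// tcpPair opens a loopback TCP connection for the demonstration.
func tcpPair() (client, server *net.TCPConn, err error) {
	ln, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
	if err != nil {
		return nil, nil, err
	}
	defer ln.Close()
	if client, err = net.DialTCP("tcp", nil, ln.Addr().(*net.TCPAddr)); err != nil {
		return nil, nil, err
	}
	if server, err = ln.AcceptTCP(); err != nil {
		client.Close()
		return nil, nil, err
	}
	return client, server, nil
}

func main() {
	client, server, err := tcpPair()
	if err != nil {
		panic(err)
	}
	defer client.Close()
	defer server.Close()
	connFD, fileFD, err := connAndFileFDs(server)
	if err != nil {
		panic(err)
	}
	fmt.Printf("conn fd=%d, File() fd=%d\n", connFD, fileFD)
}
```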

@lesismal

lesismal commented Apr 6, 2021

None of the examples above show the complete code. From your descriptions, I guess it is the TCP "sticky packet" problem (several messages coalesced into one read):
For example, the client sends 10 ws messages back to back. At the TCP layer, more than one message may arrive at the same time, but user.Receive() may read only one ws message and return, leaving bytes in the TCP receive buffer. easygo/netpoll uses epoll in ET (edge-triggered) mode, which does not fire again if you read only part of the data after a read event.
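To illustrate the coalescing described above: a single read may return several complete WebSocket frames plus part of the next one, so a handler woken by an edge-triggered event has to decode every complete frame and keep the leftover bytes for the next read. The sketch is a hypothetical, minimal RFC 6455 frame splitter (no control frames, fragmentation, or size limits); `frame`, `parseFrame`, `drainFrames`, and `encodeMaskedText` are illustrative names, not gobwas/ws APIs.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// frame is a minimal decoded WebSocket frame (RFC 6455, section 5.2).
type frame struct {
	fin     bool
	opcode  byte
	payload []byte
}

// parseFrame decodes one frame from the front of buf. ok is false when buf
// does not yet hold a complete frame; n is the number of bytes consumed.
func parseFrame(buf []byte) (f frame, n int, ok bool) {
	if len(buf) < 2 {
		return f, 0, false
	}
	f.fin, f.opcode = buf[0]&0x80 != 0, buf[0]&0x0f
	masked := buf[1]&0x80 != 0
	length, pos := uint64(buf[1]&0x7f), 2
	switch length {
	case 126: // 16-bit extended length
		if len(buf) < pos+2 {
			return f, 0, false
		}
		length, pos = uint64(binary.BigEndian.Uint16(buf[pos:])), pos+2
	case 127: // 64-bit extended length
		if len(buf) < pos+8 {
			return f, 0, false
		}
		length, pos = binary.BigEndian.Uint64(buf[pos:]), pos+8
	}
	var key [4]byte // stays all zeros (XOR no-op) for unmasked frames
	if masked {
		if len(buf) < pos+4 {
			return f, 0, false
		}
		copy(key[:], buf[pos:])
		pos += 4
	}
	if uint64(len(buf)-pos) < length {
		return f, 0, false // payload not fully received yet
	}
	f.payload = make([]byte, length)
	for i := range f.payload {
		f.payload[i] = buf[pos+i] ^ key[i%4]
	}
	return f, pos + int(length), true
}

// drainFrames decodes every complete frame in buf and returns the incomplete
// tail, which must be kept and prefixed to the next read.
func drainFrames(buf []byte) (frames []frame, rest []byte) {
	for {
		f, n, ok := parseFrame(buf)
		if !ok {
			return frames, buf
		}
		frames = append(frames, f)
		buf = buf[n:]
	}
}

// encodeMaskedText builds a final, masked text frame as a client sends it.
func encodeMaskedText(payload string, key [4]byte) []byte {
	if len(payload) > 125 {
		panic("encodeMaskedText: payload too long for this sketch")
	}
	out := append([]byte{0x81, 0x80 | byte(len(payload))}, key[:]...)
	for i := 0; i < len(payload); i++ {
		out = append(out, payload[i]^key[i%4])
	}
	return out
}

func main() {
	// Ten "ping" frames that arrived in one read, plus 3 bytes of an eleventh.
	key := [4]byte{0x12, 0x34, 0x56, 0x78}
	var burst []byte
	for i := 0; i < 10; i++ {
		burst = append(burst, encodeMaskedText("ping", key)...)
	}
	burst = append(burst, encodeMaskedText("ping", key)[:3]...)

	frames, rest := drainFrames(burst)
	fmt.Println(len(frames), string(frames[0].payload), len(rest)) // 10 ping 3
}
```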

@lesismal

lesismal commented Apr 6, 2021

I checked the code: you should not use netpoll with gobwas/ws like this, because Upgrade may block:

https://github.com/gobwas/ws/blob/master/server.go#L452

If many connections send bytes one at a time, or send only half of a ws upgrade request, all of your gopool's tasks can be blocked, and then your service becomes unavailable.

Also refer to this: "When using epoll somehow wsutil helpers is not the best way to deal with frames".

For epoll with 1M ws connections, you may try this: poller-ws-server
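One way to bound the blocking Upgrade problem described above, without changing the design, is to set a deadline on the connection before the handshake so a slow or half-open client can hold a pool goroutine for at most that long. This is a mitigation, not the non-blocking parsing lesismal argues for, and it appears to be the purpose of the `deadliner` wrapper in the earlier snippet. In this sketch, `readRequestLine` is a hypothetical stand-in for `ws.Upgrade`, `net.Pipe` stands in for a real client, and `os.ErrDeadlineExceeded` assumes Go 1.15 or later.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// readRequestLine is a hypothetical stand-in for a blocking handshake such as
// ws.Upgrade: it blocks until a full request line has arrived.
func readRequestLine(conn net.Conn) (string, error) {
	return bufio.NewReader(conn).ReadString('\n')
}

// handshakeWithDeadline bounds how long one client can hold the calling
// goroutine during the handshake, then clears the deadline.
func handshakeWithDeadline(conn net.Conn, d time.Duration) (string, error) {
	if err := conn.SetDeadline(time.Now().Add(d)); err != nil {
		return "", err
	}
	line, err := readRequestLine(conn)
	if err != nil {
		return "", err
	}
	return line, conn.SetDeadline(time.Time{})
}

// simulate runs the handshake against an in-memory client that sends sent.
func simulate(sent string, timeout time.Duration) (string, error) {
	server, client := net.Pipe()
	defer server.Close()
	defer client.Close()
	go client.Write([]byte(sent))
	return handshakeWithDeadline(server, timeout)
}

// fullLineSucceeds checks that a well-behaved client completes the handshake.
func fullLineSucceeds(timeoutMs int) bool {
	line, err := simulate("GET / HTTP/1.1\r\n", time.Duration(timeoutMs)*time.Millisecond)
	return err == nil && line == "GET / HTTP/1.1\r\n"
}

// slowClientTimesOut checks that a client sending half a request line is cut
// off by the deadline instead of blocking forever.
func slowClientTimesOut(timeoutMs int) bool {
	_, err := simulate("GET / HT", time.Duration(timeoutMs)*time.Millisecond)
	return errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	fmt.Println(fullLineSucceeds(1000), slowClientTimesOut(50))
}
```

A pool goroutine is still occupied for up to the timeout per slow client, so a large number of such clients can still exhaust the pool; the deadline only turns "forever" into "bounded".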

@sunfuze

sunfuze commented Apr 15, 2021

@macabre2077 easygo/netpoll uses epoll in ET (edge-triggered) mode by default. You can change it to LT (level-triggered) mode. In both LT and ET mode, read events for one fd can be triggered concurrently, so if you don't want to add locking to ensure read order, you can use EPOLLONESHOT. Code snippet:

		desc := netpoll.Must(netpoll.HandleReadOnce(conn))

		// Subscribe to events about conn.
		poller.Start(desc, func(ev netpoll.Event) {
			if ev&(netpoll.EventReadHup|netpoll.EventHup) != 0 {
				poller.Stop(desc)
				chat.Remove(user)
				return
			}
			pool.Schedule(func() {
				if err := user.Receive(); err != nil {
					// When receive failed, we can only disconnect broken
					// connection and stop to receive events about it.
					poller.Stop(desc)
					chat.Remove(user)
					return
				}
				// In oneshot mode, Resume must be called after every read operation.
				poller.Resume(desc)
			})
		})
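A toy model of why this change helps, assuming (as the thread does) that the handler reads exactly one message per wakeup. Each number in `batches` is how many messages arrived together behind one readiness notification; the functions are hypothetical and only count messages, they do not touch epoll or netpoll. Under edge-triggered delivery the leftovers wait for the next arrival, which matches the "previously missed messages show up later" symptom reported above; with level-triggered oneshot plus `Resume` after each read, re-arming reports the fd again while data is still buffered.

```go
package main

import "fmt"

// edgeTriggered: one wakeup per arriving batch, one message read per wakeup.
// Unread messages stay pending until more data produces a new edge.
func edgeTriggered(batches []int) (handled, pending int) {
	for _, n := range batches {
		pending += n
		if n > 0 { // new data arrived: one edge, one wakeup
			pending--
			handled++
		}
	}
	return handled, pending
}

// levelOneShot: level-triggered + oneshot (HandleReadOnce in the snippet).
// After each single-message read the handler calls Resume; re-arming while
// data is still buffered produces another wakeup, so the buffer is drained.
func levelOneShot(batches []int) (handled, pending int) {
	for _, n := range batches {
		pending += n
		for pending > 0 { // wakeup, read one message, Resume, repeat
			pending--
			handled++
		}
	}
	return handled, pending
}

func main() {
	// Ten messages that happened to arrive in seven batches.
	batches := []int{1, 1, 1, 1, 1, 2, 3}
	fmt.Println(edgeTriggered(batches)) // 7 3
	fmt.Println(levelOneShot(batches))  // 10 0
}
```

The alternative that keeps edge-triggered mode is the one lesismal describes: on every wakeup, keep reading and decoding until no complete message remains.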

@lesismal

It is not suitable to use netpoll with gobwas/ws: epoll relies on non-blocking fds, but gobwas/ws's upgrade relies on a blocking fd, so the upgrade will block task pool goroutines.

@lesismal

Many people do not understand the relationship between synchronous I/O, asynchronous I/O, and task pools. Similar problems exist in 1m-go-websockets.

@gobwas
Owner

gobwas commented Apr 25, 2021

@lesismal By design of this library, you should upgrade first and only then register your connection in epoll. Otherwise you would have to implement something like the pico http parser from the C world, which I believe would probably be overkill.

@lesismal

So I wrote this real-epoll-http/ws, plus examples.

6 participants