[Bug]: conn.Next not safe #648

Closed
3 tasks done
zhongweikang opened this issue Nov 6, 2024 · 29 comments
Labels: bug (Something isn't working), needs fix, pending development (Requested PR owner to improve code and waiting for the result)

Comments

@zhongweikang
Contributor

Actions I've taken before I'm here

  • I've thoroughly read the documentation on this issue but still have no clue.
  • I've searched the GitHub Issues but didn't find any duplicate issues that have been resolved.
  • I've searched the internet for this issue but didn't find anything helpful.

What happened?

reopened: #647

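@panjf2000 I saw that the previous issue was closed, so, with apologies, I'm filing it again.

I read the documentation beforehand and have studied the gnet source closely, but I still think there is a problem.

The documentation says: the []byte buf returned by Next() is not allowed to be passed to a new goroutine

In the problem described in #647, the byte slice returned by Next() is not passed to a new goroutine. The thread-safety problem is caused by the internal implementation of Next itself.

The result is already unsafe the moment Next() returns.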

Major version of gnet

v2

Specific version of gnet

v2

Operating system

Linux, macOS

OS version

Linux 3.10.0

Go version

go1.17.12

Relevant log output

Code snippets (optional)

No response

How to Reproduce

Reproduces intermittently when multiple event loops run concurrently.

Does this issue reproduce with the latest release?

It can reproduce with the latest release

@zhongweikang zhongweikang added the bug Something isn't working label Nov 6, 2024
@zhongweikang
Contributor Author

	// Problematic code
	defer c.inboundBuffer.Discard(n) //nolint:errcheck
	if len(head) >= n {
		return head[:n], err
	}
	// Possible fix
	defer c.inboundBuffer.Discard(n) //nolint:errcheck
	if len(head) >= n {
		c.loop.cache.Reset()
		c.loop.cache.Write(head[:n])
		return c.loop.cache.Bytes(), err
	}

@zhongweikang
Contributor Author

	defer c.inboundBuffer.Discard(n) // this line recycles head into sync.Pool
	if len(head) >= n {
		return head[:n], err // this line returns head directly
	}

A byte slice that has been recycled into sync.Pool may be reused by another goroutine.
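
The hazard described above can be reproduced in isolation. The following is a minimal sketch, not gnet code: `freeList`, `nextUnsafe`, and `demoAliasing` are hypothetical names, and the single-slot pool stands in for sync.Pool only so that the reuse is deterministic (sync.Pool itself gives no guarantee about which object Get returns).

```go
package main

import "fmt"

// freeList is a hypothetical single-slot pool used only for this sketch.
// Unlike sync.Pool, it always hands back the most recently returned slice,
// which makes the aliasing deterministic.
type freeList struct{ slot []byte }

func (f *freeList) Put(b []byte) { f.slot = b }

func (f *freeList) Get(size int) []byte {
	if cap(f.slot) >= size {
		b := f.slot[:size]
		f.slot = nil
		return b
	}
	return make([]byte, size)
}

// nextUnsafe mimics the pattern from the issue: it returns a view into the
// buffer and releases that same buffer to the pool as it returns.
func nextUnsafe(pool *freeList, buf []byte, n int) []byte {
	defer pool.Put(buf) // the buffer is recycled when the function returns
	return buf[:n]      // but the caller still holds a view into it
}

// demoAliasing returns the caller's view before and after another user of
// the pool has taken the recycled buffer and written to it.
func demoAliasing() (before, after string) {
	pool := &freeList{}
	buf := pool.Get(5)
	copy(buf, "hello")

	view := nextUnsafe(pool, buf, 5)
	before = string(view)

	// Another user of the pool (for example another event loop) takes the
	// recycled buffer and writes its own data into it.
	other := pool.Get(5)
	copy(other, "WORLD")

	after = string(view) // the caller's view changed underneath it
	return before, after
}

func main() {
	before, after := demoAliasing()
	fmt.Println(before, after) // prints "hello WORLD"
}
```

No new goroutine ever receives `view` here, which matches the point made in the issue description: the slice becomes shared simply because its backing storage was recycled.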

@panjf2000
Owner

I took another look and this is indeed possible, but only when head holds all of the data and len(head) == n, because inboundBuffer.Discard only puts the internal buffer back into sync.Pool once all of the data has been consumed. That should be the only case. Have you actually run into this problem in practice?
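
The condition described in this comment can be illustrated with a toy buffer. This is a sketch under stated assumptions: `toyBuffer` and `releasedAfterNext` are hypothetical, and the `released` flag merely stands in for "storage handed back to the pool"; it is not how gnet's buffer is implemented.

```go
package main

import "fmt"

// toyBuffer is a hypothetical stand-in for the inbound buffer. The released
// flag marks the point at which real code would return storage to a pool.
type toyBuffer struct {
	data     []byte
	released bool
}

// Peek returns up to n unread bytes without consuming them.
func (b *toyBuffer) Peek(n int) []byte {
	if n > len(b.data) {
		n = len(b.data)
	}
	return b.data[:n]
}

// Discard consumes n bytes and releases the storage only once the buffer
// has been drained completely.
func (b *toyBuffer) Discard(n int) {
	if n >= len(b.data) {
		b.data = nil
		b.released = true
		return
	}
	b.data = b.data[n:]
}

// releasedAfterNext reports whether reading n bytes drains the buffer and
// therefore releases the storage that the peeked slice points into.
func releasedAfterNext(payload string, n int) bool {
	b := &toyBuffer{data: []byte(payload)}
	_ = b.Peek(n)
	b.Discard(n)
	return b.released
}

func main() {
	fmt.Println(releasedAfterNext("abcdef", 3)) // partial read: false
	fmt.Println(releasedAfterNext("abcdef", 6)) // full read: true
}
```

Only the read that consumes everything triggers the release, which would explain why the problem reproduces so rarely.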

@panjf2000
Owner

This should only have become a problem after inboundBuffer was switched from ring.Buffer to elastic.RingBuffer. With the former it should have been fine.

@zhongweikang
Contributor Author

I took another look and this is indeed possible, but only when head holds all of the data and len(head) == n, because inboundBuffer.Discard only puts the internal buffer back into sync.Pool once all of the data has been consumed. That should be the only case. Have you actually run into this problem in practice?

I ran into it just recently and eventually traced it to this spot. It does reproduce only rarely.

@panjf2000
Owner

I think a change like this should fix the problem:

diff --git a/connection_unix.go b/connection_unix.go
index cfce3a92..82c1d89b 100644
--- a/connection_unix.go
+++ b/connection_unix.go
@@ -326,6 +326,11 @@ func (c *conn) Next(n int) (buf []byte, err error) {
 	head, tail := c.inboundBuffer.Peek(n)
 	defer c.inboundBuffer.Discard(n) //nolint:errcheck
 	if len(head) >= n {
+		if inBufferLen == n && len(tail) == 0 {
+			c.loop.cache.Reset()
+			c.loop.cache.Write(head)
+			return c.loop.cache.Bytes(), err
+		}
 		return head[:n], err
 	}
 	c.loop.cache.Reset()


@panjf2000
Owner

A simpler approach:

diff --git a/connection_unix.go b/connection_unix.go
index cfce3a92..3ccac4ab 100644
--- a/connection_unix.go
+++ b/connection_unix.go
@@ -325,11 +325,11 @@ func (c *conn) Next(n int) (buf []byte, err error) {
 	}
 	head, tail := c.inboundBuffer.Peek(n)
 	defer c.inboundBuffer.Discard(n) //nolint:errcheck
-	if len(head) >= n {
-		return head[:n], err
-	}
 	c.loop.cache.Reset()
 	c.loop.cache.Write(head)
+	if len(head) >= n {
+		return c.loop.cache.Bytes(), err
+	}
 	c.loop.cache.Write(tail)
 	if inBufferLen >= n {
 		return c.loop.cache.Bytes(), err

However, this approach copies the data on every call, which costs a little performance.

@panjf2000 panjf2000 added pending development Requested PR owner to improve code and waiting for the result needs fix labels Nov 6, 2024
@zhongweikang
Contributor Author

A simpler approach:

diff --git a/connection_unix.go b/connection_unix.go
index cfce3a92..3ccac4ab 100644
--- a/connection_unix.go
+++ b/connection_unix.go
@@ -325,11 +325,11 @@ func (c *conn) Next(n int) (buf []byte, err error) {
 	}
 	head, tail := c.inboundBuffer.Peek(n)
 	defer c.inboundBuffer.Discard(n) //nolint:errcheck
-	if len(head) >= n {
-		return head[:n], err
-	}
 	c.loop.cache.Reset()
 	c.loop.cache.Write(head)
+	if len(head) >= n {
+		return c.loop.cache.Bytes(), err
+	}
 	c.loop.cache.Write(tail)
 	if inBufferLen >= n {
 		return c.loop.cache.Bytes(), err

However, this approach copies the data on every call, which costs a little performance.

As I understand it, that cost is unavoidable. It only copies the data and does not allocate new memory, so it seems acceptable to me.
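
The "copy without allocating" point can be checked with a small sketch. It assumes the per-loop cache behaves like a standard bytes.Buffer; `copyInto` and `reusesStorage` are hypothetical helpers written for this illustration.

```go
package main

import (
	"bytes"
	"fmt"
)

// copyInto resets the scratch buffer and copies src into it, mirroring the
// Reset/Write/Bytes sequence in the proposed fix.
func copyInto(cache *bytes.Buffer, src []byte) []byte {
	cache.Reset()
	cache.Write(src)
	return cache.Bytes()
}

// reusesStorage reports whether two consecutive copies of equally sized
// payloads land in the same backing array, that is, whether the second
// copy reused the storage allocated by the first.
func reusesStorage() bool {
	var cache bytes.Buffer
	first := copyInto(&cache, []byte("first payload!"))
	p1 := &first[0]
	second := copyInto(&cache, []byte("second payload"))
	p2 := &second[0]
	return p1 == p2
}

func main() {
	fmt.Println(reusesStorage()) // prints "true"
}
```

The same property has a flip side: the slice returned by Bytes() is overwritten by the next Reset and Write, so a caller that needs the data later has to copy it out first.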

@panjf2000
Owner

Would you be able to open a PR?
@zhongweikang

@zhongweikang
Contributor Author

Would you be able to open a PR? @zhongweikang

Sure.

@zhongweikang
Contributor Author

A simpler approach:

diff --git a/connection_unix.go b/connection_unix.go
index cfce3a92..3ccac4ab 100644
--- a/connection_unix.go
+++ b/connection_unix.go
@@ -325,11 +325,11 @@ func (c *conn) Next(n int) (buf []byte, err error) {
 	}
 	head, tail := c.inboundBuffer.Peek(n)
 	defer c.inboundBuffer.Discard(n) //nolint:errcheck
-	if len(head) >= n {
-		return head[:n], err
-	}
 	c.loop.cache.Reset()
 	c.loop.cache.Write(head)
+	if len(head) >= n {
+		return c.loop.cache.Bytes(), err
+	}
 	c.loop.cache.Write(tail)
 	if inBufferLen >= n {
 		return c.loop.cache.Bytes(), err

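However, this approach copies the data on every call, which costs a little performance.

That doesn't look right: the caller asked for n bytes, but this returns the whole of head.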

diff --git a/connection_unix.go b/connection_unix.go
index cfce3a9..1bf11e9 100644
--- a/connection_unix.go
+++ b/connection_unix.go
@@ -325,10 +325,11 @@ func (c *conn) Next(n int) (buf []byte, err error) {
        }
        head, tail := c.inboundBuffer.Peek(n)
        defer c.inboundBuffer.Discard(n) //nolint:errcheck
+       c.loop.cache.Reset()
        if len(head) >= n {
-               return head[:n], err
+               c.loop.cache.Write(head[:n])
+               return c.loop.cache.Bytes(), err
        }
-       c.loop.cache.Reset()
        c.loop.cache.Write(head)
        c.loop.cache.Write(tail)
        if inBufferLen >= n {

Would this work?

@panjf2000
Owner

You're right, I overlooked that earlier. Let's go with your latest version.

@panjf2000
Owner

Come to think of it, the tail path further down seems to have the same problem, so let's fix that at the same time!

zhongweikang pushed a commit to zhongweikang/gnet that referenced this issue Nov 6, 2024
@zhongweikang
Contributor Author

A simpler approach:

diff --git a/connection_unix.go b/connection_unix.go
index cfce3a92..3ccac4ab 100644
--- a/connection_unix.go
+++ b/connection_unix.go
@@ -325,11 +325,11 @@ func (c *conn) Next(n int) (buf []byte, err error) {
 	}
 	head, tail := c.inboundBuffer.Peek(n)
 	defer c.inboundBuffer.Discard(n) //nolint:errcheck
-	if len(head) >= n {
-		return head[:n], err
-	}
 	c.loop.cache.Reset()
 	c.loop.cache.Write(head)
+	if len(head) >= n {
+		return c.loop.cache.Bytes(), err
+	}
 	c.loop.cache.Write(tail)
 	if inBufferLen >= n {
 		return c.loop.cache.Bytes(), err

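However, this approach copies the data on every call, which costs a little performance.

I tested it, and your code is fine after all. I misread it earlier.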

	head, tail := c.inboundBuffer.Peek(n) // Peek returns at most n bytes
	defer c.inboundBuffer.Discard(n)
	c.loop.cache.Reset()
	if len(head) >= n {
		c.loop.cache.Write(head) // len(head) never exceeds n, so head is equivalent to head[:n] here
		return c.loop.cache.Bytes(), err
	}

The same reasoning applies to the tail path below, so I've written the PR following your code.
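
The agreed approach can be sketched end to end with a toy ring buffer. Everything here is an assumption made for illustration: `ring`, `next`, and `demoNext` are hypothetical, the ring has a fixed size, and the sketch always writes both head and tail rather than returning early as the diff above does. It is not the code from the PR.

```go
package main

import (
	"bytes"
	"fmt"
)

// ring is a hypothetical fixed-size ring buffer used only for this sketch.
type ring struct {
	buf  []byte
	r, n int // read index and number of unread bytes
}

// Write appends p; the sketch assumes p always fits.
func (q *ring) Write(p []byte) {
	for _, c := range p {
		q.buf[(q.r+q.n)%len(q.buf)] = c
		q.n++
	}
}

// Peek returns at most n unread bytes as two views: head runs up to the end
// of the storage, and tail continues from the start when the data wraps.
func (q *ring) Peek(n int) (head, tail []byte) {
	if n > q.n {
		n = q.n
	}
	end := q.r + n
	if end <= len(q.buf) {
		return q.buf[q.r:end], nil
	}
	return q.buf[q.r:], q.buf[:end-len(q.buf)]
}

// Discard consumes up to n unread bytes.
func (q *ring) Discard(n int) {
	if n > q.n {
		n = q.n
	}
	q.r = (q.r + n) % len(q.buf)
	q.n -= n
}

// next copies the peeked bytes into cache before discarding them, so the
// returned slice never points into the ring's storage.
func next(q *ring, cache *bytes.Buffer, n int) []byte {
	head, tail := q.Peek(n)
	defer q.Discard(n)
	cache.Reset()
	cache.Write(head) // len(head) <= n, so head and head[:n] are the same
	cache.Write(tail)
	return cache.Bytes()
}

func demoNext() (first, second string) {
	q := &ring{buf: make([]byte, 8)}
	var cache bytes.Buffer

	q.Write([]byte("abcdef"))
	got := next(q, &cache, 6) // contiguous read: tail is empty
	q.Write([]byte("ghijkl")) // wraps and overwrites part of the ring storage
	first = string(got)       // unaffected, because got lives in cache

	second = string(next(q, &cache, 6)) // wrapped read: head "gh", tail "ijkl"
	return first, second
}

func main() {
	first, second := demoNext()
	fmt.Println(first, second) // prints "abcdef ghijkl"
}
```

Because Peek caps its result at n bytes, writing head without slicing it is equivalent to writing head[:n], which is the observation that settled the discussion.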

@panjf2000
Owner

All right, you had me confused there for a moment too. Looked at this way, it should be fine.

zhongweikang pushed a commit to zhongweikang/gnet that referenced this issue Nov 6, 2024