Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http reader support multiple prefix #678

Merged
merged 1 commit into from
Aug 8, 2018

Conversation

mockerzzz
Copy link
Contributor

@mockerzzz mockerzzz commented Jul 31, 2018

@mockerzzz mockerzzz force-pushed the prefix_socket_reader branch 5 times, most recently from 531a518 to d05b58f Compare August 6, 2018 05:13
@@ -319,19 +319,21 @@ func Test_SendData(t *testing.T) {
"senders": senders,
}

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

下面没等这个进程跑起来就会结束

wg.Add(1)
go func() {
got, err := httpReader.ReadLine()
assert.NoError(t, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

goroutine里面用到的变量,如果没有当作参数传进去,就会出现并发错误,此处要把httpReader和t当作参数传入函数。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

下同

@@ -136,11 +142,10 @@ func (r *Reader) Close() error {
return nil
}
log.Debugf("Runner[%v] %q daemon is stopping", r.meta.RunnerName, r.Name())

close(r.readChan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里在http 仍然在serve的时候就close channel,可能会导致panic, send on closed channel

@mockerzzz mockerzzz force-pushed the prefix_socket_reader branch 3 times, most recently from 88df7d1 to 150500c Compare August 6, 2018 06:45
r.server.Shutdown(context.Background())
err := r.bufQueue.Close()
close(r.readChan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里要确认一下,如果http server在shutdown的之前,postData函数被阻塞了,比如数据来不及读导致channel传不进去数据,那么这里close会导致panic吗

r.readChan <- Details{
Content: line,
Path: path,
}
}

This comment was marked as resolved.

@wonderflow
Copy link
Contributor

尝试实际运行起来,然后用大量的请求去请求这个http reader,然后请求不要停,紧接着logkit关掉,看看现象

"github.com/qiniu/logkit/reader"
. "github.com/qiniu/logkit/utils/models"
"sync"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

移到最上面的分组


server *http.Server
}

func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
address, _ := conf.GetStringOr(reader.KeyHTTPServiceAddress, reader.DefaultHTTPServiceAddress)
path, _ := conf.GetStringOr(reader.KeyHTTPServicePath, reader.DefaultHTTPServicePath)
paths := strings.Split(path, ",")

This comment was marked as resolved.

if val[0] == '/' {
validPaths = append(validPaths, val)
} else {
log.Warnf("path[%v] not begin with '/',this path ignored", val)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里直接错误返回吧,不用Warn,反正是初始配置的时候


server *http.Server
}

func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
address, _ := conf.GetStringOr(reader.KeyHTTPServiceAddress, reader.DefaultHTTPServiceAddress)
path, _ := conf.GetStringOr(reader.KeyHTTPServicePath, reader.DefaultHTTPServicePath)
paths := strings.Split(path, ",")
for _, val := range paths {
if val[0] != '/' {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你这样写如果val是空就panic了,先trimeSpace,空的continue,然后检查 strings.HasPrefix()


server *http.Server
}

func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
address, _ := conf.GetStringOr(reader.KeyHTTPServiceAddress, reader.DefaultHTTPServiceAddress)
path, _ := conf.GetStringOr(reader.KeyHTTPServicePath, reader.DefaultHTTPServicePath)
paths := strings.Split(path, ",")
for _, val := range paths {
if !strings.HasPrefix(val, "/") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

空的过滤一下

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

考虑: /api,/ap2,,

@wonderflow
Copy link
Contributor

文档也对应更新一下 mars上

r.server.Shutdown(context.Background())
err := r.bufQueue.Close()
//Note:确保不会往已关闭的channel中写入message
Copy link
Contributor

@unknwon unknwon Aug 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里描述不对吧,应该是和 wg.Done 一个作用,因为是 wait 完才 close 的 channel

@wonderflow
Copy link
Contributor

LGTM

@unknwon
Copy link
Contributor

unknwon commented Aug 8, 2018

LGTM

@wonderflow wonderflow merged commit c7f9ff7 into qiniu:master Aug 8, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants