diff --git a/mgr/dataflow_test.go b/mgr/dataflow_test.go index d4b3925e8..dc076158e 100644 --- a/mgr/dataflow_test.go +++ b/mgr/dataflow_test.go @@ -8,6 +8,7 @@ import ( "strings" "testing" "time" + "sync" "github.com/json-iterator/go" "github.com/stretchr/testify/assert" @@ -234,7 +235,7 @@ func Test_getTransformer(t *testing.T) { func Test_SendData(t *testing.T) { c := conf.MapConf{ - reader.KeyHTTPServiceAddress: ":8000", + reader.KeyHTTPServiceAddress: "127.0.0.1:8000", reader.KeyHTTPServicePath: "/logkit/data", } readConf := conf.MapConf{ @@ -318,20 +319,25 @@ func Test_SendData(t *testing.T) { "sampleLog": testInput, "senders": senders, } - + var wg sync.WaitGroup + wg.Add(1) + go func() { + for _, exp := range testJsonExp { + got, err := httpReader.ReadLine() + assert.NoError(t, err) + for _, e := range exp { + if !strings.Contains(got, e) { + t.Fatalf("exp: %v contains %v, but not", got, e) + } + } + } + wg.Done() + }() err = SendData(senderConfig) if err != nil { t.Error(err) } - for _, exp := range testJsonExp { - got, err := httpReader.ReadLine() - assert.NoError(t, err) - for _, e := range exp { - if !strings.Contains(got, e) { - t.Fatalf("exp: %v contains %v, but not", got, e) - } - } - } + wg.Wait() } func Test_getSendersConfig(t *testing.T) { diff --git a/reader/http/http.go b/reader/http/http.go index 71eee8448..4444e3213 100644 --- a/reader/http/http.go +++ b/reader/http/http.go @@ -8,6 +8,8 @@ import ( "fmt" "io" "net/http" + "strings" + "sync" "sync/atomic" "time" @@ -16,7 +18,6 @@ import ( "github.com/qiniu/log" "github.com/qiniu/logkit/conf" - "github.com/qiniu/logkit/queue" "github.com/qiniu/logkit/reader" . "github.com/qiniu/logkit/utils/models" ) @@ -37,16 +38,22 @@ func init() { reader.RegisterConstructor(reader.ModeHTTP, NewReader) } +type Details struct { + Content string + Path string +} + type Reader struct { meta *reader.Meta // Note: 原子操作,用于表示 reader 整体的运行状态 status int32 - bufQueue queue.BackendQueue - readChan <-chan []byte + readChan chan Details - address string - path string + currentPath string + address string + paths []string + wg sync.WaitGroup server *http.Server } @@ -54,10 +61,18 @@ type Reader struct { func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) { address, _ := conf.GetStringOr(reader.KeyHTTPServiceAddress, reader.DefaultHTTPServiceAddress) path, _ := conf.GetStringOr(reader.KeyHTTPServicePath, reader.DefaultHTTPServicePath) + paths := strings.Split(path, ",") + for _, val := range paths { + if strings.TrimSpace(val) == "" { + log.Infof("path[%v] have space,space have ignored", path) + continue + } + if !strings.HasPrefix(val, "/") { + return nil, fmt.Errorf("path[%v] is incorrect,it's beginning must be '/'", val) + } + } address, _ = RemoveHttpProtocal(address) - bq := queue.NewDiskQueue(Hash("Reader<"+address+">_buffer"), meta.BufFile(), DefaultMaxBytesPerFile, 0, - DefaultMaxBytesPerFile, DefaultSyncEvery, DefaultSyncEvery, time.Second*2, DefaultWriteSpeedLimit, false, 0) err := CreateDirIfNotExist(meta.BufFile()) if err != nil { return nil, err @@ -65,10 +80,9 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) { return &Reader{ meta: meta, status: reader.StatusInit, - bufQueue: bq, - readChan: bq.ReadChan(), + readChan: make(chan Details, len(paths)), address: address, - path: path, + paths: paths, }, nil } @@ -97,7 +111,9 @@ func (r *Reader) Start() error { } e := echo.New() - e.POST(r.path, r.postData()) + for _, path := range r.paths { + e.POST(path, r.postData()) + } r.server = &http.Server{ Handler: e, @@ -113,15 +129,21 @@ func (r *Reader) Start() error { } func (r *Reader) Source() string { - return r.address + return r.address + r.currentPath } func (r *Reader) ReadLine() (string, error) { timer := time.NewTimer(time.Second) defer timer.Stop() select { - case data := <-r.readChan: - return string(data), nil + case data, ok := <-r.readChan: + // Note:防止waitgroup.wait()已经通过的情况下再次调用waitgroup.done() + if ok { + //Note:确保所有数据被读取后,再关闭channel + r.wg.Done() + } + r.currentPath = data.Path + return data.Content, nil case <-timer.C: } @@ -136,11 +158,12 @@ func (r *Reader) Close() error { return nil } log.Debugf("Runner[%v] %q daemon is stopping", r.meta.RunnerName, r.Name()) - r.server.Shutdown(context.Background()) - err := r.bufQueue.Close() + //Note:确保所有数据被读取后,再关闭channel + r.wg.Wait() + close(r.readChan) atomic.StoreInt32(&r.status, reader.StatusStopped) - return err + return nil } func (r *Reader) postData() echo.HandlerFunc { @@ -167,10 +190,10 @@ func (r *Reader) pickUpData(req *http.Request) (err error) { } } br := bufio.NewReader(reqBody) - return r.storageData(br) + return r.storageData(br, req.RequestURI) } -func (r *Reader) storageData(br *bufio.Reader) (err error) { +func (r *Reader) storageData(br *bufio.Reader, path string) (err error) { for { line, err := r.readLine(br) if err != nil { @@ -182,7 +205,14 @@ func (r *Reader) storageData(br *bufio.Reader) (err error) { if line == "" { continue } - r.bufQueue.Put([]byte(line)) + if atomic.LoadInt32(&r.status) == reader.StatusStopped || atomic.LoadInt32(&r.status) == reader.StatusStopping { + return err + } + r.wg.Add(1) + r.readChan <- Details{ + Content: line, + Path: path, + } } return } diff --git a/reader/http/http_test.go b/reader/http/http_test.go index 2dd9be6e7..bf23183cc 100644 --- a/reader/http/http_test.go +++ b/reader/http/http_test.go @@ -8,6 +8,8 @@ import ( "os" "testing" "time" + "strings" + "sync" "github.com/stretchr/testify/assert" @@ -29,8 +31,8 @@ func TestNewHttpReader(t *testing.T) { defer os.RemoveAll("./meta") c := conf.MapConf{ - reader.KeyHTTPServiceAddress: ":7110", - reader.KeyHTTPServicePath: "/logkit/data", + reader.KeyHTTPServiceAddress: "127.0.0.1:7110", + reader.KeyHTTPServicePath: "/logkit/aaa,/logkit/bbb,/logkit/ccc,/logkit/ddd", } hhttpReader, err := NewReader(meta, c) assert.NoError(t, err) @@ -47,38 +49,52 @@ func TestNewHttpReader(t *testing.T) { "zxcvbnm,./.,mnbvcxz", "asdfghjkl;';lkjhgfdsa", } + paths := strings.Split("/logkit/aaa,/logkit/bbb,/logkit/ccc,/logkit/ddd", ",") // 测试正常发送 - req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110/logkit/data", nil) - assert.NoError(t, err) - for _, val := range testData { + var wg sync.WaitGroup + for index, val := range testData { + req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110"+paths[index], nil) + assert.NoError(t, err) + wg.Add(1) + go func(httpReader *Reader, t *testing.T, index int, val string) { + got, err := httpReader.ReadLine() + assert.NoError(t, err) + assert.Equal(t, val, got) + assert.Equal(t, "127.0.0.1:7110"+paths[index], httpReader.Source()) + wg.Done() + }(httpReader, t, index, val) req.Body = ioutil.NopCloser(bytes.NewReader([]byte(val))) resp, err := http.DefaultClient.Do(req) assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) - got, err := httpReader.ReadLine() - assert.NoError(t, err) - assert.Equal(t, val, got) + wg.Wait() } // 测试 gzip 发送 - req, err = http.NewRequest(http.MethodPost, "http://127.0.0.1:7110/logkit/data", nil) - req.Header.Set(ContentTypeHeader, ApplicationGzip) - req.Header.Set(ContentEncodingHeader, "gzip") - assert.NoError(t, err) - for _, val := range testData { + for index, val := range testData { + req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110"+paths[index], nil) + req.Header.Set(ContentTypeHeader, ApplicationGzip) + req.Header.Set(ContentEncodingHeader, "gzip") + assert.NoError(t, err) + wg.Add(1) var buf bytes.Buffer g := gzip.NewWriter(&buf) - _, err := g.Write([]byte(val)) + _, err = g.Write([]byte(val)) assert.NoError(t, err) g.Close() byteVal := buf.Bytes() + go func(httpReader *Reader, t *testing.T, index int, val string) { + got, err := httpReader.ReadLine() + assert.NoError(t, err) + assert.Equal(t, val, got) + assert.Equal(t, "127.0.0.1:7110"+paths[index], httpReader.Source()) + wg.Done() + }(httpReader, t, index, val) req.Body = ioutil.NopCloser(bytes.NewReader(byteVal)) resp, err := http.DefaultClient.Do(req) assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) - got, err := httpReader.ReadLine() - assert.NoError(t, err) - assert.Equal(t, val, got) + wg.Wait() } } diff --git a/reader/rest_reader_models.go b/reader/rest_reader_models.go index 216b689e2..79660fa5d 100644 --- a/reader/rest_reader_models.go +++ b/reader/rest_reader_models.go @@ -1103,6 +1103,7 @@ var ModeKeyOptions = map[string][]Option{ Description: "监听地址前缀(http_service_path)", ToolTip: "监听的请求地址,如 /data ", }, + OptionDataSourceTag, }, ModeScript: { { diff --git a/sender/http/http_test.go b/sender/http/http_test.go index 2ce6bead0..195488338 100644 --- a/sender/http/http_test.go +++ b/sender/http/http_test.go @@ -5,13 +5,14 @@ import ( "compress/gzip" "io/ioutil" "os" - "strings" "testing" "time" + "strings" + "sync" "github.com/stretchr/testify/assert" - "github.com/json-iterator/go" + "github.com/qiniu/logkit/conf" "github.com/qiniu/logkit/reader" "github.com/qiniu/logkit/reader/http" @@ -31,7 +32,7 @@ func TestHTTPSender(t *testing.T) { defer os.RemoveAll("./meta") c := conf.MapConf{ - reader.KeyHTTPServiceAddress: ":8000", + reader.KeyHTTPServiceAddress: "127.0.0.1:8000", reader.KeyHTTPServicePath: "/logkit/data", } reader, err := http.NewReader(meta, c) @@ -130,18 +131,29 @@ func TestHTTPSender(t *testing.T) { httpSender, err := NewSender(senderConf) assert.NoError(t, err) + var wg sync.WaitGroup for _, val := range testData { - httpSender.Send(val.input) - for _, exp := range val.jsonExp { - got, err := httpReader.ReadLine() - assert.NoError(t, err) - for _, e := range exp { - if !strings.Contains(got, e) { - t.Fatalf("exp: %v contains %v, but not", got, e) + wg.Add(1) + go func(httpReader *http.Reader, val struct { + input []Data + jsonExp [][]string + csvExp []map[string]string + bodyJSONExp string + }, t *testing.T) { + for _, exp := range val.jsonExp { + got, err := httpReader.ReadLine() + assert.NoError(t, err) + for _, e := range exp { + if !strings.Contains(got, e) { + t.Fatalf("exp: %v contains %v, but not", got, e) + } } } - } + wg.Done() + }(httpReader, val, t) + httpSender.Send(val.input) } + wg.Wait() // gzip = false, protocol = json senderConf = conf.MapConf{ @@ -156,17 +168,27 @@ func TestHTTPSender(t *testing.T) { assert.NoError(t, err) for _, val := range testData { - httpSender.Send(val.input) - for _, exp := range val.jsonExp { - got, err := httpReader.ReadLine() - assert.NoError(t, err) - for _, e := range exp { - if !strings.Contains(got, e) { - t.Fatalf("exp: %v contains %v, but not", got, e) + wg.Add(1) + go func(httpReader *http.Reader, val struct { + input []Data + jsonExp [][]string + csvExp []map[string]string + bodyJSONExp string + }, t *testing.T) { + for _, exp := range val.jsonExp { + got, err := httpReader.ReadLine() + assert.NoError(t, err) + for _, e := range exp { + if !strings.Contains(got, e) { + t.Fatalf("exp: %v contains %v, but not", got, e) + } } } - } + wg.Done() + }(httpReader, val, t) + httpSender.Send(val.input) } + wg.Wait() // gzip = true, protocol = csv, csvHead = true senderConf = conf.MapConf{ @@ -181,27 +203,37 @@ func TestHTTPSender(t *testing.T) { assert.NoError(t, err) for _, val := range testData { - httpSender.Send(val.input) - head, err := httpReader.ReadLine() - assert.NoError(t, err) - headArray := strings.Split(head, "\t") - for _, expMap := range val.csvExp { - gotStr, err := httpReader.ReadLine() + wg.Add(1) + go func(httpReader *http.Reader, val struct { + input []Data + jsonExp [][]string + csvExp []map[string]string + bodyJSONExp string + }, t *testing.T) { + head, err := httpReader.ReadLine() assert.NoError(t, err) - gotArray := strings.Split(gotStr, "\t") - assert.Equal(t, len(expMap), len(gotArray)) - assert.Equal(t, len(expMap), len(headArray)) - for i, head := range headArray { - exp, ok := expMap[head] - assert.Equal(t, true, ok) - if head == "d" && len(exp) != len(gotArray[i]) { - t.Fatalf("error: exp %v, but got %v", exp, gotArray[i]) - } else { - assert.Equal(t, exp, gotArray[i]) + headArray := strings.Split(head, "\t") + for _, expMap := range val.csvExp { + gotStr, err := httpReader.ReadLine() + assert.NoError(t, err) + gotArray := strings.Split(gotStr, "\t") + assert.Equal(t, len(expMap), len(gotArray)) + assert.Equal(t, len(expMap), len(headArray)) + for i, head := range headArray { + exp, ok := expMap[head] + assert.Equal(t, true, ok) + if head == "d" && len(exp) != len(gotArray[i]) { + t.Fatalf("error: exp %v, but got %v", exp, gotArray[i]) + } else { + assert.Equal(t, exp, gotArray[i]) + } } } - } + wg.Done() + }(httpReader, val, t) + httpSender.Send(val.input) } + wg.Wait() // gzip = false, protocol = csv, csvHead = true senderConf = conf.MapConf{ @@ -216,27 +248,37 @@ func TestHTTPSender(t *testing.T) { assert.NoError(t, err) for _, val := range testData { - httpSender.Send(val.input) - head, err := httpReader.ReadLine() - assert.NoError(t, err) - headArray := strings.Split(head, "\t") - for _, expMap := range val.csvExp { - gotStr, err := httpReader.ReadLine() + wg.Add(1) + go func(httpReader *http.Reader, val struct { + input []Data + jsonExp [][]string + csvExp []map[string]string + bodyJSONExp string + }, t *testing.T) { + head, err := httpReader.ReadLine() assert.NoError(t, err) - gotArray := strings.Split(gotStr, "\t") - assert.Equal(t, len(expMap), len(gotArray)) - assert.Equal(t, len(expMap), len(headArray)) - for i, head := range headArray { - exp, ok := expMap[head] - assert.Equal(t, true, ok) - if head == "d" && len(exp) != len(gotArray[i]) { - t.Fatalf("error: exp %v, but got %v", exp, gotArray[i]) - } else { - assert.Equal(t, exp, gotArray[i]) + headArray := strings.Split(head, "\t") + for _, expMap := range val.csvExp { + gotStr, err := httpReader.ReadLine() + assert.NoError(t, err) + gotArray := strings.Split(gotStr, "\t") + assert.Equal(t, len(expMap), len(gotArray)) + assert.Equal(t, len(expMap), len(headArray)) + for i, head := range headArray { + exp, ok := expMap[head] + assert.Equal(t, true, ok) + if head == "d" && len(exp) != len(gotArray[i]) { + t.Fatalf("error: exp %v, but got %v", exp, gotArray[i]) + } else { + assert.Equal(t, exp, gotArray[i]) + } } } - } + wg.Done() + }(httpReader, val, t) + httpSender.Send(val.input) } + wg.Wait() // gzip = true, protocol = csv, csvHead = false senderConf = conf.MapConf{ @@ -251,15 +293,25 @@ func TestHTTPSender(t *testing.T) { assert.NoError(t, err) for _, val := range testData { - httpSender.Send(val.input) - for i := 0; i < len(val.csvExp); i++ { - _, err := httpReader.ReadLine() + wg.Add(1) + go func(httpReader *http.Reader, val struct { + input []Data + jsonExp [][]string + csvExp []map[string]string + bodyJSONExp string + }, t *testing.T) { + for i := 0; i < len(val.input); i++ { + _, err := httpReader.ReadLine() + assert.NoError(t, err) + } + got, err := httpReader.ReadLine() assert.NoError(t, err) - } - got, err := httpReader.ReadLine() - assert.NoError(t, err) - assert.Equal(t, "", got) + assert.Equal(t, "", got) + wg.Done() + }(httpReader, val, t) + httpSender.Send(val.input) } + wg.Wait() // gzip = true, protocol = body_json, csvHead = false senderConf = conf.MapConf{ @@ -274,24 +326,34 @@ func TestHTTPSender(t *testing.T) { assert.NoError(t, err) for _, val := range testData { - err = httpSender.Send(val.input) - if err != nil { - t.Fatal(err) - } - got, err := httpReader.ReadLine() - assert.NoError(t, err) - var exps, datas []Data + wg.Add(1) + go func(httpReader *http.Reader, val struct { + input []Data + jsonExp [][]string + csvExp []map[string]string + bodyJSONExp string + }, t *testing.T) { + got, err := httpReader.ReadLine() + assert.NoError(t, err) + var exps, datas []Data - err = jsoniter.Unmarshal([]byte(got), &datas) - if err != nil { - t.Fatal(err) - } - err = jsoniter.Unmarshal([]byte(val.bodyJSONExp), &exps) + err = jsoniter.Unmarshal([]byte(got), &datas) + if err != nil { + t.Fatal(err) + } + err = jsoniter.Unmarshal([]byte(val.bodyJSONExp), &exps) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, exps, datas) + wg.Done() + }(httpReader, val, t) + err = httpSender.Send(val.input) if err != nil { t.Fatal(err) } - assert.Equal(t, exps, datas) } + wg.Wait() } func TestGzipData(t *testing.T) {