Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http reader support multiple prefix #678

Merged
merged 1 commit into from
Aug 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions mgr/dataflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"testing"
"time"
"sync"

"github.com/json-iterator/go"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -234,7 +235,7 @@ func Test_getTransformer(t *testing.T) {

func Test_SendData(t *testing.T) {
c := conf.MapConf{
reader.KeyHTTPServiceAddress: ":8000",
reader.KeyHTTPServiceAddress: "127.0.0.1:8000",
reader.KeyHTTPServicePath: "/logkit/data",
}
readConf := conf.MapConf{
Expand Down Expand Up @@ -318,20 +319,25 @@ func Test_SendData(t *testing.T) {
"sampleLog": testInput,
"senders": senders,
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
for _, exp := range testJsonExp {
got, err := httpReader.ReadLine()
assert.NoError(t, err)
for _, e := range exp {
if !strings.Contains(got, e) {
t.Fatalf("exp: %v contains %v, but not", got, e)
}
}
}
wg.Done()
}()
err = SendData(senderConfig)
if err != nil {
t.Error(err)
}
for _, exp := range testJsonExp {
got, err := httpReader.ReadLine()
assert.NoError(t, err)
for _, e := range exp {
if !strings.Contains(got, e) {
t.Fatalf("exp: %v contains %v, but not", got, e)
}
}
}
wg.Wait()
}

func Test_getSendersConfig(t *testing.T) {
Expand Down
70 changes: 50 additions & 20 deletions reader/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -16,7 +18,6 @@ import (
"github.com/qiniu/log"

"github.com/qiniu/logkit/conf"
"github.com/qiniu/logkit/queue"
"github.com/qiniu/logkit/reader"
. "github.com/qiniu/logkit/utils/models"
)
Expand All @@ -37,38 +38,51 @@ func init() {
reader.RegisterConstructor(reader.ModeHTTP, NewReader)
}

type Details struct {
Content string
Path string
}

type Reader struct {
meta *reader.Meta
// Note: 原子操作,用于表示 reader 整体的运行状态
status int32

bufQueue queue.BackendQueue
readChan <-chan []byte
readChan chan Details

address string
path string
currentPath string
address string
paths []string
wg sync.WaitGroup

server *http.Server
}

func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
address, _ := conf.GetStringOr(reader.KeyHTTPServiceAddress, reader.DefaultHTTPServiceAddress)
path, _ := conf.GetStringOr(reader.KeyHTTPServicePath, reader.DefaultHTTPServicePath)
paths := strings.Split(path, ",")

This comment was marked as resolved.

for _, val := range paths {
if strings.TrimSpace(val) == "" {
log.Infof("path[%v] have space,space have ignored", path)
continue
}
if !strings.HasPrefix(val, "/") {
return nil, fmt.Errorf("path[%v] is incorrect,it's beginning must be '/'", val)
}
}
address, _ = RemoveHttpProtocal(address)

bq := queue.NewDiskQueue(Hash("Reader<"+address+">_buffer"), meta.BufFile(), DefaultMaxBytesPerFile, 0,
DefaultMaxBytesPerFile, DefaultSyncEvery, DefaultSyncEvery, time.Second*2, DefaultWriteSpeedLimit, false, 0)
err := CreateDirIfNotExist(meta.BufFile())
if err != nil {
return nil, err
}
return &Reader{
meta: meta,
status: reader.StatusInit,
bufQueue: bq,
readChan: bq.ReadChan(),
readChan: make(chan Details, len(paths)),
address: address,
path: path,
paths: paths,
}, nil
}

Expand Down Expand Up @@ -97,7 +111,9 @@ func (r *Reader) Start() error {
}

e := echo.New()
e.POST(r.path, r.postData())
for _, path := range r.paths {
e.POST(path, r.postData())
}

r.server = &http.Server{
Handler: e,
Expand All @@ -113,15 +129,21 @@ func (r *Reader) Start() error {
}

func (r *Reader) Source() string {
return r.address
return r.address + r.currentPath
}

func (r *Reader) ReadLine() (string, error) {
timer := time.NewTimer(time.Second)
defer timer.Stop()
select {
case data := <-r.readChan:
return string(data), nil
case data, ok := <-r.readChan:
// Note:防止waitgroup.wait()已经通过的情况下再次调用waitgroup.done()
if ok {
//Note:确保所有数据被读取后,再关闭channel
r.wg.Done()
}
r.currentPath = data.Path
return data.Content, nil
case <-timer.C:
}

Expand All @@ -136,11 +158,12 @@ func (r *Reader) Close() error {
return nil
}
log.Debugf("Runner[%v] %q daemon is stopping", r.meta.RunnerName, r.Name())

r.server.Shutdown(context.Background())
err := r.bufQueue.Close()
//Note:确保所有数据被读取后,再关闭channel
r.wg.Wait()
close(r.readChan)
atomic.StoreInt32(&r.status, reader.StatusStopped)
return err
return nil
}

func (r *Reader) postData() echo.HandlerFunc {
Expand All @@ -167,10 +190,10 @@ func (r *Reader) pickUpData(req *http.Request) (err error) {
}
}
br := bufio.NewReader(reqBody)
return r.storageData(br)
return r.storageData(br, req.RequestURI)
}

func (r *Reader) storageData(br *bufio.Reader) (err error) {
func (r *Reader) storageData(br *bufio.Reader, path string) (err error) {
for {
line, err := r.readLine(br)
if err != nil {
Expand All @@ -182,7 +205,14 @@ func (r *Reader) storageData(br *bufio.Reader) (err error) {
if line == "" {
continue
}
r.bufQueue.Put([]byte(line))
if atomic.LoadInt32(&r.status) == reader.StatusStopped || atomic.LoadInt32(&r.status) == reader.StatusStopping {
return err
}
r.wg.Add(1)
r.readChan <- Details{
Content: line,
Path: path,
}
}

This comment was marked as resolved.

return
}
Expand Down
50 changes: 33 additions & 17 deletions reader/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"os"
"testing"
"time"
"strings"
"sync"

"github.com/stretchr/testify/assert"

Expand All @@ -29,8 +31,8 @@ func TestNewHttpReader(t *testing.T) {
defer os.RemoveAll("./meta")

c := conf.MapConf{
reader.KeyHTTPServiceAddress: ":7110",
reader.KeyHTTPServicePath: "/logkit/data",
reader.KeyHTTPServiceAddress: "127.0.0.1:7110",
reader.KeyHTTPServicePath: "/logkit/aaa,/logkit/bbb,/logkit/ccc,/logkit/ddd",
}
hhttpReader, err := NewReader(meta, c)
assert.NoError(t, err)
Expand All @@ -47,38 +49,52 @@ func TestNewHttpReader(t *testing.T) {
"zxcvbnm,./.,mnbvcxz",
"asdfghjkl;';lkjhgfdsa",
}
paths := strings.Split("/logkit/aaa,/logkit/bbb,/logkit/ccc,/logkit/ddd", ",")

// 测试正常发送
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110/logkit/data", nil)
assert.NoError(t, err)
for _, val := range testData {
var wg sync.WaitGroup
for index, val := range testData {
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110"+paths[index], nil)
assert.NoError(t, err)
wg.Add(1)
go func(httpReader *Reader, t *testing.T, index int, val string) {
got, err := httpReader.ReadLine()
assert.NoError(t, err)
assert.Equal(t, val, got)
assert.Equal(t, "127.0.0.1:7110"+paths[index], httpReader.Source())
wg.Done()
}(httpReader, t, index, val)
req.Body = ioutil.NopCloser(bytes.NewReader([]byte(val)))
resp, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
got, err := httpReader.ReadLine()
assert.NoError(t, err)
assert.Equal(t, val, got)
wg.Wait()
}

// 测试 gzip 发送
req, err = http.NewRequest(http.MethodPost, "http://127.0.0.1:7110/logkit/data", nil)
req.Header.Set(ContentTypeHeader, ApplicationGzip)
req.Header.Set(ContentEncodingHeader, "gzip")
assert.NoError(t, err)
for _, val := range testData {
for index, val := range testData {
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110"+paths[index], nil)
req.Header.Set(ContentTypeHeader, ApplicationGzip)
req.Header.Set(ContentEncodingHeader, "gzip")
assert.NoError(t, err)
wg.Add(1)
var buf bytes.Buffer
g := gzip.NewWriter(&buf)
_, err := g.Write([]byte(val))
_, err = g.Write([]byte(val))
assert.NoError(t, err)
g.Close()
byteVal := buf.Bytes()
go func(httpReader *Reader, t *testing.T, index int, val string) {
got, err := httpReader.ReadLine()
assert.NoError(t, err)
assert.Equal(t, val, got)
assert.Equal(t, "127.0.0.1:7110"+paths[index], httpReader.Source())
wg.Done()
}(httpReader, t, index, val)
req.Body = ioutil.NopCloser(bytes.NewReader(byteVal))
resp, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
got, err := httpReader.ReadLine()
assert.NoError(t, err)
assert.Equal(t, val, got)
wg.Wait()
}
}
1 change: 1 addition & 0 deletions reader/rest_reader_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,7 @@ var ModeKeyOptions = map[string][]Option{
Description: "监听地址前缀(http_service_path)",
ToolTip: "监听的请求地址,如 /data ",
},
OptionDataSourceTag,
},
ModeScript: {
{
Expand Down
Loading