Skip to content

Commit

Permalink
http reader support multiple prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
mockerzzz committed Aug 6, 2018
1 parent dca1753 commit 150500c
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 120 deletions.
28 changes: 17 additions & 11 deletions mgr/dataflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"testing"
"time"
"sync"

"github.com/json-iterator/go"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -234,7 +235,7 @@ func Test_getTransformer(t *testing.T) {

func Test_SendData(t *testing.T) {
c := conf.MapConf{
reader.KeyHTTPServiceAddress: ":8000",
reader.KeyHTTPServiceAddress: "127.0.0.1:8000",
reader.KeyHTTPServicePath: "/logkit/data",
}
readConf := conf.MapConf{
Expand Down Expand Up @@ -318,20 +319,25 @@ func Test_SendData(t *testing.T) {
"sampleLog": testInput,
"senders": senders,
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
for _, exp := range testJsonExp {
got, err := httpReader.ReadLine()
assert.NoError(t, err)
for _, e := range exp {
if !strings.Contains(got, e) {
t.Fatalf("exp: %v contains %v, but not", got, e)
}
}
}
wg.Done()
}()
err = SendData(senderConfig)
if err != nil {
t.Error(err)
}
for _, exp := range testJsonExp {
got, err := httpReader.ReadLine()
assert.NoError(t, err)
for _, e := range exp {
if !strings.Contains(got, e) {
t.Fatalf("exp: %v contains %v, but not", got, e)
}
}
}
wg.Wait()
}

func Test_getSendersConfig(t *testing.T) {
Expand Down
46 changes: 27 additions & 19 deletions reader/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"net/http"
"sync/atomic"
"time"
"strings"

"github.com/labstack/echo"

"github.com/qiniu/log"

"github.com/qiniu/logkit/conf"
"github.com/qiniu/logkit/queue"
"github.com/qiniu/logkit/reader"
. "github.com/qiniu/logkit/utils/models"
)
Expand All @@ -37,38 +37,41 @@ func init() {
reader.RegisterConstructor(reader.ModeHTTP, NewReader)
}

type Details struct {
Content string
Path string
}

type Reader struct {
meta *reader.Meta
// Note: 原子操作,用于表示 reader 整体的运行状态
status int32

bufQueue queue.BackendQueue
readChan <-chan []byte
readChan chan Details

address string
path string
currentPath string
address string
paths []string

server *http.Server
}

func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
address, _ := conf.GetStringOr(reader.KeyHTTPServiceAddress, reader.DefaultHTTPServiceAddress)
path, _ := conf.GetStringOr(reader.KeyHTTPServicePath, reader.DefaultHTTPServicePath)
paths := strings.Split(path, ",")
address, _ = RemoveHttpProtocal(address)

bq := queue.NewDiskQueue(Hash("Reader<"+address+">_buffer"), meta.BufFile(), DefaultMaxBytesPerFile, 0,
DefaultMaxBytesPerFile, DefaultSyncEvery, DefaultSyncEvery, time.Second*2, DefaultWriteSpeedLimit, false, 0)
err := CreateDirIfNotExist(meta.BufFile())
if err != nil {
return nil, err
}
return &Reader{
meta: meta,
status: reader.StatusInit,
bufQueue: bq,
readChan: bq.ReadChan(),
readChan: make(chan Details, len(paths)),
address: address,
path: path,
paths: paths,
}, nil
}

Expand Down Expand Up @@ -97,7 +100,9 @@ func (r *Reader) Start() error {
}

e := echo.New()
e.POST(r.path, r.postData())
for _, path := range r.paths {
e.POST(path, r.postData())
}

r.server = &http.Server{
Handler: e,
Expand All @@ -113,15 +118,16 @@ func (r *Reader) Start() error {
}

func (r *Reader) Source() string {
return r.address
return r.address + r.currentPath
}

func (r *Reader) ReadLine() (string, error) {
timer := time.NewTimer(time.Second)
defer timer.Stop()
select {
case data := <-r.readChan:
return string(data), nil
r.currentPath = data.Path
return data.Content, nil
case <-timer.C:
}

Expand All @@ -136,11 +142,10 @@ func (r *Reader) Close() error {
return nil
}
log.Debugf("Runner[%v] %q daemon is stopping", r.meta.RunnerName, r.Name())

r.server.Shutdown(context.Background())
err := r.bufQueue.Close()
close(r.readChan)
atomic.StoreInt32(&r.status, reader.StatusStopped)
return err
return nil
}

func (r *Reader) postData() echo.HandlerFunc {
Expand All @@ -167,10 +172,10 @@ func (r *Reader) pickUpData(req *http.Request) (err error) {
}
}
br := bufio.NewReader(reqBody)
return r.storageData(br)
return r.storageData(br, req.RequestURI)
}

func (r *Reader) storageData(br *bufio.Reader) (err error) {
func (r *Reader) storageData(br *bufio.Reader, path string) (err error) {
for {
line, err := r.readLine(br)
if err != nil {
Expand All @@ -182,7 +187,10 @@ func (r *Reader) storageData(br *bufio.Reader) (err error) {
if line == "" {
continue
}
r.bufQueue.Put([]byte(line))
r.readChan <- Details{
Content: line,
Path: path,
}
}
return
}
Expand Down
50 changes: 33 additions & 17 deletions reader/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"os"
"testing"
"time"
"strings"
"sync"

"github.com/stretchr/testify/assert"

Expand All @@ -29,8 +31,8 @@ func TestNewHttpReader(t *testing.T) {
defer os.RemoveAll("./meta")

c := conf.MapConf{
reader.KeyHTTPServiceAddress: ":7110",
reader.KeyHTTPServicePath: "/logkit/data",
reader.KeyHTTPServiceAddress: "127.0.0.1:7110",
reader.KeyHTTPServicePath: "/logkit/aaa,/logkit/bbb,/logkit/ccc,/logkit/ddd",
}
hhttpReader, err := NewReader(meta, c)
assert.NoError(t, err)
Expand All @@ -47,38 +49,52 @@ func TestNewHttpReader(t *testing.T) {
"zxcvbnm,./.,mnbvcxz",
"asdfghjkl;';lkjhgfdsa",
}
paths := strings.Split("/logkit/aaa,/logkit/bbb,/logkit/ccc,/logkit/ddd", ",")

// 测试正常发送
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110/logkit/data", nil)
assert.NoError(t, err)
for _, val := range testData {
var wg sync.WaitGroup
for index, val := range testData {
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110"+paths[index], nil)
assert.NoError(t, err)
wg.Add(1)
go func(httpReader *Reader, t *testing.T, index int, val string) {
got, err := httpReader.ReadLine()
assert.NoError(t, err)
assert.Equal(t, val, got)
assert.Equal(t, "127.0.0.1:7110"+paths[index], httpReader.Source())
wg.Done()
}(httpReader, t, index, val)
req.Body = ioutil.NopCloser(bytes.NewReader([]byte(val)))
resp, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
got, err := httpReader.ReadLine()
assert.NoError(t, err)
assert.Equal(t, val, got)
wg.Wait()
}

// 测试 gzip 发送
req, err = http.NewRequest(http.MethodPost, "http://127.0.0.1:7110/logkit/data", nil)
req.Header.Set(ContentTypeHeader, ApplicationGzip)
req.Header.Set(ContentEncodingHeader, "gzip")
assert.NoError(t, err)
for _, val := range testData {
for index, val := range testData {
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110"+paths[index], nil)
req.Header.Set(ContentTypeHeader, ApplicationGzip)
req.Header.Set(ContentEncodingHeader, "gzip")
assert.NoError(t, err)
wg.Add(1)
var buf bytes.Buffer
g := gzip.NewWriter(&buf)
_, err := g.Write([]byte(val))
_, err = g.Write([]byte(val))
assert.NoError(t, err)
g.Close()
byteVal := buf.Bytes()
go func(httpReader *Reader, t *testing.T, index int, val string) {
got, err := httpReader.ReadLine()
assert.NoError(t, err)
assert.Equal(t, val, got)
assert.Equal(t, "127.0.0.1:7110"+paths[index], httpReader.Source())
wg.Done()
}(httpReader, t, index, val)
req.Body = ioutil.NopCloser(bytes.NewReader(byteVal))
resp, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
got, err := httpReader.ReadLine()
assert.NoError(t, err)
assert.Equal(t, val, got)
wg.Wait()
}
}
1 change: 1 addition & 0 deletions reader/rest_reader_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,7 @@ var ModeKeyOptions = map[string][]Option{
Description: "监听地址前缀(http_service_path)",
ToolTip: "监听的请求地址,如 /data ",
},
OptionDataSourceTag,
},
ModeScript: {
{
Expand Down
Loading

0 comments on commit 150500c

Please sign in to comment.