diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go
index 3fa8951ee264..129fc5cee3ea 100644
--- a/libbeat/beat/beat.go
+++ b/libbeat/beat/beat.go
@@ -185,7 +185,7 @@ func (b *Beat) launch(bt Creator) error {
 
 	// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
 	// but refine publisher to disconnect clients on stop automatically
-	// defer publisher.Stop()
+	defer publisher.Stop()
 
 	b.Publisher = publisher
 	beater, err := bt(b, sub)
diff --git a/libbeat/outputs/s3/common_test.go b/libbeat/outputs/s3/common_test.go
new file mode 100644
index 000000000000..285d0d17e14c
--- /dev/null
+++ b/libbeat/outputs/s3/common_test.go
@@ -0,0 +1,44 @@
+package s3out
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/stretchr/testify/assert"
+)
+
+func mkTempDir(t *testing.T) string {
+	// ioutil.TempDir creates the directory for us.
+	tempDir, err := ioutil.TempDir("", "testConsumer")
+	assert.Nil(t, err)
+	t.Logf("Created temporary directory %v", tempDir)
+	return tempDir
+}
+
+func rmTempDir(t *testing.T, tempDir string) {
+	t.Logf("Removing temporary directory %v", tempDir)
+	err := os.RemoveAll(tempDir)
+	assert.Nil(t, err)
+}
+
+func setupLogp(t *testing.T) {
+	if testing.Verbose() {
+		logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"s3"})
+	}
+}
+
+func getTestConfig(tempDir string) config {
+	return config{
+		AccessKeyId:        "testKeyId",
+		SecretAccessKey:    "testSecretKey",
+		Region:             "us-east-1",
+		Bucket:             "testBucket",
+		Prefix:             "testPrefix/",
+		TemporaryDirectory: tempDir,
+		SecondsPerChunk:    60 * 60 * 2,
+		RetryLimitSeconds:  60 * 60,
+	}
+}
diff --git a/libbeat/outputs/s3/config.go b/libbeat/outputs/s3/config.go
new file mode 100644
index 000000000000..22fd510b0dab
--- /dev/null
+++ b/libbeat/outputs/s3/config.go
@@ -0,0 +1,39 @@
+package s3out
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+)
+
+type config struct {
+	AccessKeyId        string `config:"access_key_id"`
+	SecretAccessKey    string `config:"secret_access_key"`
+	Region             string `config:"region"`
+	Bucket             string `config:"bucket"`
+	Prefix             string `config:"prefix"`
+	TemporaryDirectory string `config:"temporary_directory"`
+	SecondsPerChunk    int    `config:"seconds_per_chunk"`
+	RetryLimitSeconds  int    `config:"retry_limit_seconds"`
+}
+
+var (
+	defaultConfig = config{
+		Region:             "us-east-1",
+		TemporaryDirectory: filepath.Join(os.TempDir(), "beat_s3"),
+		SecondsPerChunk:    300,
+		RetryLimitSeconds:  60 * 30,
+	}
+)
+
+func (c *config) Validate() error {
+	if c.Bucket == "" {
+		return fmt.Errorf("must specify an s3 bucket")
+	}
+
+	if c.SecondsPerChunk < 1 {
+		return fmt.Errorf("seconds_per_chunk must be a positive integer")
+	}
+
+	return nil
+}
diff --git a/libbeat/outputs/s3/consumer.go b/libbeat/outputs/s3/consumer.go
new file mode 100644
index 000000000000..cd9f955ce12f
--- /dev/null
+++ b/libbeat/outputs/s3/consumer.go
@@ -0,0 +1,252 @@
+package s3out
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"regexp"
+	"time"
+
+	"github.com/elastic/beats/libbeat/logp"
+)
+
+type consumerAPI interface {
+	run()
+	appendLine(string)
+	shutdown()
+}
+
+type consumer struct {
+	lineChan         chan string
+	ticker           *time.Ticker
+	chunkDuration    time.Duration
+	chunkStartTime   time.Time
+	appType          string
+	timestampRegex   *regexp.Regexp
+	timestampFormat  string
+	baseFilePath     string
+	file             *os.File
+	uploader         *s3uploader
+	uploadThreadChan chan bool
+}
+
+type consumerOptions struct {
+	AppType        string `config:"appType"`
+	TimestampRegex string `config:"timestampRegex"`
+	TimestampFormat string `config:"timestampFormat"`
+}
+
+func (c *consumer) appendLine(line string) {
+	c.lineChan <- line
+}
+
+func (c *consumer) shutdown() {
+	close(c.lineChan)
+}
+
+func (c *consumer) run() {
+	debug("running consumer for app: %v", c.appType)
+
+	for {
+		select {
+		case <-c.ticker.C:
+			c.upload(true)
+		case line, ok := <-c.lineChan:
+			if ok {
+				c.append(line)
+			} else {
+				c.upload(false)
+				c.uploader.shutdown()
+				logp.Info("Waiting for s3 uploads for %v to complete...", c.appType)
+				<-c.uploadThreadChan
+				return
+			}
+		}
+	}
+}
+
+func (c *consumer) append(line string) {
+	timestamp, err := c.getLineTimestamp(line)
+	if err != nil {
+		logp.Err("%v", err)
+	}
+
+	if timestamp != nil {
+		if timestamp.Before(c.chunkStartTime) || timestamp.After(c.chunkStartTime.Add(c.chunkDuration)) {
+			c.upload(true)
+			c.chunkStartTime = *timestamp
+		}
+	}
+
+	fmt.Fprintln(c.file, line)
+
+	if timestamp != nil {
+		c.setModTime(c.file.Name(), *timestamp)
+	}
+}
+
+func (c *consumer) getLineTimestamp(line string) (*time.Time, error) {
+	if c.timestampRegex == nil {
+		return nil, nil
+	}
+
+	timestampStr := c.timestampRegex.FindString(line)
+	if timestampStr == "" {
+		return nil, fmt.Errorf("Could not find a timestamp in line for %v: %v", c.appType, line)
+	}
+
+	timestamp, err := time.Parse(c.timestampFormat, timestampStr)
+	if err != nil {
+		return nil, fmt.Errorf("Error parsing timestamp: %v", err)
+	}
+
+	return &timestamp, nil
+}
+
+func (c *consumer) setModTime(filePath string, timestamp time.Time) {
+	err := os.Chtimes(filePath, timestamp, timestamp)
+	if err != nil {
+		logp.Err("Error setting timestamp on %v: %v", filePath, err)
+	}
+}
+
+func (c *consumer) upload(createNewFile bool) {
+	fInfo, err := c.file.Stat()
+	if err != nil {
+		logp.Err("Error retrieving file info: %v", err)
+		return
+	}
+
+	if fInfo.Size() < 1 {
+		logp.Info("Chunk %v is empty, not uploading", c.file.Name())
+		return
+	}
+
+	err = c.file.Sync()
+	if err != nil {
+		logp.Err("Error syncing file %v: %v", c.file.Name(), err)
+		return
+	}
+
+	debug("Sending %v to uploader goroutine", c.file.Name())
+	c.uploader.fileChan <- c.file
+
+	if createNewFile {
+		c.createTempFile()
+	}
+}
+
+func (c *consumer) runUploader() {
+	go func() {
+		c.uploader.receiveAndUpload()
+		debug("receiveAndUpload returned, signalling run()")
+		close(c.uploadThreadChan)
+	}()
+}
+
+func (c *consumer) init() error {
+	c.runUploader()
+	if err := c.handleLeftoverChunks(); err != nil {
+		return err
+	}
+	if err := c.createTempFile(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *consumer) createTempFile() error {
+	tempFilePath := fmt.Sprintf("%s_%d", c.baseFilePath, time.Now().UTC().UnixNano())
+	file, err := os.Create(tempFilePath)
+	if err != nil {
+		logp.Err("Failed to create temporary file: %v", tempFilePath)
+		return err
+	}
+	logp.Info("Created new temporary file: %v", file.Name())
+	c.file = file
+	return nil
+}
+
+func (c *consumer) handleLeftoverChunks() error {
+	chunkPaths, err := filepath.Glob(fmt.Sprintf("%s_*", c.baseFilePath))
+	if err != nil {
+		return err
+	}
+
+	for _, filePath := range chunkPaths {
+		file, err := os.Open(filePath)
+		if err != nil {
+			logp.Err("Encountered error while accessing leftover chunk %v: %v", filePath, err.Error())
+			continue
+		}
+
+		fInfo, err := file.Stat()
+		if err != nil {
+			logp.Err(err.Error())
+			continue
+		}
+
+		if fInfo.Size() < 1 {
+			// It's empty, just delete it and move on
+			os.Remove(filePath)
+			continue
+		}
+
+		logp.Info("Found non-empty leftover chunk for %v, uploading it", c.appType)
+		// Put it directly in the upload queue; from here on it behaves like a
+		// chunk that failed to upload during the current execution of the program.
+		c.uploader.fileChan <- file
+	}
+
+	return nil
+}
+
+func removeFile(file *os.File) {
+	debug("Removing file %v", file.Name())
+	err := file.Close()
+	if err != nil {
+		logp.Err("Error closing file: %v", err)
+	}
+	err = os.Remove(file.Name())
+	if err != nil {
+		logp.Err("Error removing file %v: %v", file.Name(), err)
+	}
+}
+
+func newConsumer(c config, options *consumerOptions, s3Svc S3API) (*consumer, error) {
+	baseFilePath := filepath.Join(c.TemporaryDirectory, options.AppType)
+
+	newConsumer := &consumer{
+		lineChan:         make(chan string),
+		ticker:           time.NewTicker(time.Second * time.Duration(c.SecondsPerChunk)),
+		chunkDuration:    time.Second * time.Duration(c.SecondsPerChunk),
+		chunkStartTime:   time.Now(),
+		appType:          options.AppType,
+		timestampFormat:  options.TimestampFormat,
+		baseFilePath:     baseFilePath,
+		uploader:         newS3Uploader(c, options.AppType, s3Svc),
+		uploadThreadChan: make(chan bool),
+	}
+	var err error
+	if options.TimestampRegex != "" {
+		if options.TimestampFormat == "" {
+			logp.Err("timestampRegex specified without timestampFormat")
+			return nil, errors.New("Must specify timestampFormat with timestampRegex for s3 output")
+		}
+		newConsumer.timestampRegex, err = regexp.Compile(options.TimestampRegex)
+		if err != nil {
+			logp.Err("failed to initialize s3 consumer for %v", options.AppType)
+			return nil, err
+		}
+	}
+
+	err = newConsumer.init()
+	if err != nil {
+		logp.Err("failed to initialize s3 consumer for %v", options.AppType)
+		return nil, err
+	}
+
+	return newConsumer, nil
+}
diff --git a/libbeat/outputs/s3/consumer_test.go b/libbeat/outputs/s3/consumer_test.go
new file mode 100644
index 000000000000..fc414ac03c89
--- /dev/null
+++ b/libbeat/outputs/s3/consumer_test.go
@@ -0,0 +1,159 @@
+// +build !integration
+
+package s3out
+
+import (
+	"os"
+	"path"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+func getTestConsumerOptions(appType string) *consumerOptions {
+	return &consumerOptions{AppType: appType}
+}
+
+func TestShutdown(t *testing.T) {
+	tempDir := mkTempDir(t)
+	defer rmTempDir(t, tempDir)
+	setupLogp(t)
+	config := getTestConfig(tempDir)
+
+	s3SvcMock := new(s3Mock)
+	s3SvcMock.On("PutObject", mock.AnythingOfType("*s3.PutObjectInput")).Return(&s3.PutObjectOutput{}, nil)
+
+	consumer, err := newConsumer(config, getTestConsumerOptions("testLog"), s3SvcMock)
+	assert.Nil(t, err)
+	assert.NotNil(t, consumer)
+
+	consumerShutdown := make(chan bool)
+	go func(consumerShutdown chan<- bool) {
+		consumer.run()
+		consumerShutdown <- true
+	}(consumerShutdown)
+
+	select {
+	case <-consumerShutdown:
+		t.Error("Consumer shutdown before shutdown was called")
+	default:
+	}
+
+	consumer.appendLine("a log line")
+	consumer.shutdown()
+
+	select {
+	case <-consumerShutdown:
+	case <-time.After(time.Second * 5):
+		t.Error("Consumer failed to shutdown after shutdown was called")
+	}
+
+	// Make sure we upload any remaining data before shutting down
+	s3SvcMock.AssertCalled(t, "PutObject", mock.AnythingOfType("*s3.PutObjectInput"))
+	s3SvcMock.AssertNumberOfCalls(t, "PutObject", 1)
+}
+
+// Make sure we don't upload empty chunks to S3
+func TestEmptyChunk(t *testing.T) {
+	tempDir := mkTempDir(t)
+	defer rmTempDir(t, tempDir)
+	config := getTestConfig(tempDir)
+
+	s3SvcMock := new(s3Mock)
+	s3SvcMock.On("PutObject", mock.AnythingOfType("*s3.PutObjectInput")).Return(&s3.PutObjectOutput{}, nil)
+
+	consumer, err := newConsumer(config, getTestConsumerOptions("testLog"), s3SvcMock)
+	assert.Nil(t, err)
+	assert.NotNil(t, consumer)
+
+	consumerShutdown := make(chan bool)
+	go func(consumerShutdown chan<- bool) {
+		consumer.run()
+		consumerShutdown <- true
+	}(consumerShutdown)
+
+	select {
+	case <-consumerShutdown:
+		t.Error("Consumer shutdown before shutdown was called")
+	default:
+	}
+
+	consumer.upload(true)
+
+	s3SvcMock.AssertNotCalled(t, "PutObject", mock.AnythingOfType("*s3.PutObjectInput"))
+	consumer.shutdown()
+}
+
+func TestHandleLeftoverChunk(t *testing.T) {
+	tempDir := mkTempDir(t)
+	defer rmTempDir(t, tempDir)
+	setupLogp(t)
+	config := getTestConfig(tempDir)
+
+	s3SvcMock := new(s3Mock)
+	s3SvcMock.On("PutObject", mock.AnythingOfType("*s3.PutObjectInput")).Return(&s3.PutObjectOutput{}, nil)
+
+	consumer, err := newConsumer(config, getTestConsumerOptions("testLog"), s3SvcMock)
+	assert.Nil(t, err)
+	assert.NotNil(t, consumer)
+
+	consumer.append("a log line")
+
+	fInfo, err := os.Stat(consumer.file.Name())
+	assert.Nil(t, err)
+
+	// The new consumer should find the old file and upload it as is, using the
+	// file's last modified timestamp
+	secondConsumer, err := newConsumer(config, getTestConsumerOptions("testLog"), s3SvcMock)
+	assert.Nil(t, err)
+	assert.NotNil(t, secondConsumer)
+	// shutdown before run: run sees the closed line channel, flushes any
+	// outstanding uploads, and returns.
+	secondConsumer.shutdown()
+	secondConsumer.run()
+
+	s3SvcMock.AssertCalled(t, "PutObject", mock.AnythingOfType("*s3.PutObjectInput"))
+	s3SvcMock.AssertNumberOfCalls(t, "PutObject", 1)
+	putData := s3SvcMock.Calls[0].Arguments[0].(*s3.PutObjectInput)
+	assert.Equal(t, strconv.FormatInt(fInfo.ModTime().UTC().Unix(), 10), path.Base(*putData.Key))
+	consumer.shutdown()
+}
+
+func TestGetLineTimestamp(t *testing.T) {
+	tempDir := mkTempDir(t)
+	defer rmTempDir(t, tempDir)
+	setupLogp(t)
+	config := getTestConfig(tempDir)
+
+	timeFormat := "2006-01-02T15:04:05.000-0700"
+	options := getTestConsumerOptions("testLog")
+
+	logLine := "2016-09-20T14:59:14.736-0400 I NETWORK [conn1] end connection 127.0.0.1:62218 (0 connections now open)"
+	logTime, err := time.Parse(timeFormat, strings.Split(logLine, " ")[0])
+	assert.Nil(t, err)
+
+	consumer, err := newConsumer(config, options, nil)
+	assert.Nil(t, err)
+
+	// Shouldn't do anything because we don't have a regex
+	ts, err := consumer.getLineTimestamp(logLine)
+	assert.Nil(t, err)
+	assert.Nil(t, ts)
+
+	// Should return an error because we have a regex without a format
+	options.TimestampRegex = "^[[:digit:]]{4}-[[:digit:]]{2}-[[:digit:]]{2}T[[:digit:]]{2}\\:[[:digit:]]{2}\\:[[:digit:]]{2}\\.[[:digit:]]{3}[+-][[:digit:]]{4}"
+	consumer, err = newConsumer(config, options, nil)
+	assert.NotNil(t, err)
+
+	// Should parse the timestamp out of the line
+	options.TimestampFormat = timeFormat
+	consumer, err = newConsumer(config, options, nil)
+	assert.Nil(t, err)
+	ts, err = consumer.getLineTimestamp(logLine)
+	assert.Nil(t, err)
+	assert.NotNil(t, ts)
+	assert.True(t, ts.Equal(logTime))
+}
diff --git a/libbeat/outputs/s3/mocks_test.go b/libbeat/outputs/s3/mocks_test.go
new file mode 100644
index 000000000000..94d2193d5825
--- /dev/null
+++ b/libbeat/outputs/s3/mocks_test.go
@@ -0,0 +1,56 @@
+// +build !integration
+
+package s3out
+
+import (
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/stretchr/testify/mock"
+)
+
+// s3Mock is an autogenerated mock type for the S3API type
+type s3Mock struct {
+	mock.Mock
+}
+
+// PutObject provides a mock function with given fields: _a0
+func (_m *s3Mock) PutObject(_a0 *s3.PutObjectInput) (*s3.PutObjectOutput, error) {
+	ret := _m.Called(_a0)
+
+	var r0 *s3.PutObjectOutput
+	if rf, ok := ret.Get(0).(func(*s3.PutObjectInput) *s3.PutObjectOutput); ok {
+		r0 = rf(_a0)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*s3.PutObjectOutput)
+		}
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(*s3.PutObjectInput) error); ok {
+		r1 = rf(_a0)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// consumerMock is an autogenerated mock type for the consumerAPI type
+type consumerMock struct {
+	mock.Mock
+}
+
+// appendLine provides a mock function with given fields: _a0
+func (_m *consumerMock) appendLine(_a0 string) {
+	_m.Called(_a0)
+}
+
+// run provides a mock function with given fields:
+func (_m *consumerMock) run() {
+	_m.Called()
+}
+
+// shutdown provides a mock function with given fields:
+func (_m *consumerMock) shutdown() {
+	_m.Called()
+}
diff --git a/libbeat/outputs/s3/s3.go b/libbeat/outputs/s3/s3.go
new file mode 100644
index 000000000000..89c17b269972
--- /dev/null
+++ b/libbeat/outputs/s3/s3.go
@@ -0,0 +1,195 @@
+package s3out
+
+import (
+	"os"
+	"path/filepath"
+	"sync"
+
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/common/op"
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/elastic/beats/libbeat/outputs"
+	"github.com/elastic/go-ucfg"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
+)
+
+var debug = logp.MakeDebug("s3")
+
+func init() {
+	outputs.RegisterOutputPlugin("s3", New)
+}
+
+// S3API is the subset of github.com/aws/aws-sdk-go/service/s3/s3iface.S3API
+// that this output uses.
+type S3API interface {
+	PutObject(*s3.PutObjectInput) (*s3.PutObjectOutput, error)
+}
+
+type s3Output struct {
+	config       config
+	s3Svc        S3API
+	consumerLock *sync.RWMutex
+	consumerMap  map[string]consumerAPI
+	consumerWg   *sync.WaitGroup
+}
+
+// New instantiates a new s3 output instance.
+func New(_ string, cfg *common.Config, _ int) (outputs.Outputer, error) {
+	config := defaultConfig
+	if err := cfg.Unpack(&config); err != nil {
+		logp.Err("Error unpacking config for s3 output.")
+		return nil, err
+	}
+
+	if config.AccessKeyId != "" && config.SecretAccessKey != "" {
+		debug("Found aws credentials in config, setting environment variables")
+		os.Setenv("AWS_ACCESS_KEY_ID", config.AccessKeyId)
+		os.Setenv("AWS_SECRET_ACCESS_KEY", config.SecretAccessKey)
+	}
+
+	// disable bulk support in publisher pipeline
+	cfg.SetInt("flush_interval", -1, -1)
+	cfg.SetInt("bulk_max_size", -1, -1)
+
+	svc := s3.New(session.New(&aws.Config{Region: aws.String(config.Region)}))
+
+	output := &s3Output{
+		s3Svc:        svc,
+		consumerLock: new(sync.RWMutex),
+		consumerMap:  make(map[string]consumerAPI),
+		consumerWg:   &sync.WaitGroup{},
+	}
+
+	if err := output.init(config); err != nil {
+		logp.Err("Error calling init for s3 output.")
+		return nil, err
+	}
+
+	return output, nil
+}
+
+func (out *s3Output) init(config config) error {
+	out.config = config
+	tempDir := out.config.TemporaryDirectory
+	if err := os.MkdirAll(tempDir, 0700); err != nil {
+		logp.Err("Failed to create s3 temporary file directory: %v", tempDir)
+		return err
+	}
+	logp.Info("Created directory for temporary s3 files: %v", tempDir)
+
+	return nil
+}
+
+func (out *s3Output) PublishEvent(
+	sig op.Signaler,
+	opts outputs.Options,
+	data outputs.Data,
+) (err error) {
+
+	defer func() { op.Sig(sig, err) }()
+
+	options, err := getConsumerOptions(data)
+	if err != nil {
+		return err
+	}
+
+	message, err := getMessage(data)
+	if err != nil {
+		return err
+	}
+
+	consumer, err := out.getConsumer(options)
+	if err != nil {
+		return err
+	}
+
+	consumer.appendLine(message)
+
+	return err
+}
+
+func (out *s3Output) Close() error {
+	debug("Close called on s3 outputter, shutting down")
+	out.consumerLock.RLock()
+	for _, consumer := range out.consumerMap {
+		consumer.shutdown()
+	}
+	out.consumerLock.RUnlock()
+	out.consumerWg.Wait()
+	return nil
+}
+
+func getConsumerOptions(data outputs.Data) (*consumerOptions, error) {
+	options := &consumerOptions{}
+
+	s3FieldsInterface, err := data.Event.GetValue("fields.s3")
+	if err == nil {
+		config, err := ucfg.NewFrom(s3FieldsInterface.(common.MapStr))
+		if err == nil {
+			config.Unpack(options)
+		}
+	}
+
+	if options.AppType == "" {
+		logp.Info("Could not retrieve appType for s3 output, using source for appType instead")
+		sourceInterface, err := data.Event.GetValue("source")
+		if err != nil {
+			logp.Err("Could not get the source of event for s3 output and appType not set, bailing out")
+			return nil, err
+		}
+		source := sourceInterface.(string)
+		options.AppType = filepath.Base(source)
+	}
+
+	debug("Built consumer options: %v", options)
+	return options, nil
+}
+
+func getMessage(data outputs.Data) (string, error) {
+	messageInterface, err := data.Event.GetValue("message")
+	if err != nil {
+		logp.Err("Could not get message for s3 output. Malformed event?")
Malformed event?") + return "", err + } + return messageInterface.(string), nil +} + +func (out *s3Output) getConsumer(options *consumerOptions) (consumer consumerAPI, err error) { + out.consumerLock.RLock() + consumer = out.consumerMap[options.AppType] + out.consumerLock.RUnlock() + + if consumer != nil { + return + } + + out.consumerLock.Lock() + defer out.consumerLock.Unlock() + consumer = out.consumerMap[options.AppType] + + // It is possible that another goroutine got the lock and instantiated + // a consumer before us, double check now that we have the write lock. + if consumer != nil { + return + } + + consumer, err = newConsumer(out.config, options, out.s3Svc) + if err != nil { + logp.Err("Error creating consumer for appType %v: %v", options.AppType, err) + return + } + + out.consumerMap[options.AppType] = consumer + + // This WaitGroup is used to wait for all consumers to upload any remaining chunks when Close() is called. + out.consumerWg.Add(1) + go func() { + defer out.consumerWg.Done() + consumer.run() + }() + + return +} diff --git a/libbeat/outputs/s3/s3_test.go b/libbeat/outputs/s3/s3_test.go new file mode 100644 index 000000000000..8e4924afba3e --- /dev/null +++ b/libbeat/outputs/s3/s3_test.go @@ -0,0 +1,200 @@ +// +build !integration + +package s3out + +import ( + "os" + "path/filepath" + "sync" + "testing" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs" + "github.com/stretchr/testify/assert" +) + +func TestOutputInit(t *testing.T) { + tempDir := mkTempDir(t) + defer rmTempDir(t, tempDir) + outputTempDir := filepath.Join(tempDir, "s3Out") + + output := &s3Output{ + s3Svc: nil, + consumerLock: new(sync.RWMutex), + consumerMap: make(map[string]consumerAPI), + consumerWg: &sync.WaitGroup{}, + } + + config := defaultConfig + config.TemporaryDirectory = outputTempDir + + err := output.init(config) + assert.Nil(t, err) + + fInfo, err := os.Stat(outputTempDir) + assert.Nil(t, err) + assert.True(t, fInfo.IsDir()) +} + +func TestGetConsumer(t *testing.T) { + tempDir := mkTempDir(t) + defer rmTempDir(t, tempDir) + outputTempDir := filepath.Join(tempDir, "s3Out") + + output := &s3Output{ + s3Svc: nil, + consumerLock: new(sync.RWMutex), + consumerMap: make(map[string]consumerAPI), + consumerWg: &sync.WaitGroup{}, + } + + config := defaultConfig + config.TemporaryDirectory = outputTempDir + + err := output.init(config) + assert.Nil(t, err) + + options := &consumerOptions{AppType: "myApp"} + myConsumerMock := new(consumerMock) + output.consumerMap[options.AppType] = myConsumerMock + outConsumer, err := output.getConsumer(options) + assert.Nil(t, err) + assert.Equal(t, outConsumer, myConsumerMock) + + otherOptions := &consumerOptions{AppType: "otherApp"} + realConsumer, err := output.getConsumer(otherOptions) + realConsumer.shutdown() + assert.Nil(t, err) + assert.NotNil(t, realConsumer) +} + +func TestGetMessage(t *testing.T) { + myMessage := "some message" + data := outputs.Data{ + Event: common.MapStr{}, + } + + ret, err := getMessage(data) + assert.NotNil(t, err) + + data.Event["message"] = myMessage + ret, err = getMessage(data) + assert.Nil(t, err) + assert.Equal(t, myMessage, ret) +} + +func TestGetConsumerOptions(t *testing.T) { + data := outputs.Data{ + Event: common.MapStr{}, + } + + ret, err := getConsumerOptions(data) + assert.NotNil(t, err) + + data.Event["source"] = filepath.Join("var", "log", "myApp.log") + ret, err = getConsumerOptions(data) + assert.Nil(t, err) + assert.Equal(t, "myApp.log", ret.AppType) + assert.Empty(t, 
+	assert.Empty(t, ret.TimestampFormat)
+	assert.Empty(t, ret.TimestampRegex)
+
+	data.Event["fields"] = common.MapStr{}
+	data.Event["fields"].(common.MapStr)["s3"] = common.MapStr{}
+	data.Event["fields"].(common.MapStr)["s3"].(common.MapStr)["appType"] = "myApp"
+
+	ret, err = getConsumerOptions(data)
+	assert.Nil(t, err)
+	assert.Equal(t, "myApp", ret.AppType)
+	assert.Empty(t, ret.TimestampFormat)
+	assert.Empty(t, ret.TimestampRegex)
+
+	data.Event["fields"].(common.MapStr)["s3"].(common.MapStr)["timestampRegex"] = "someRegex"
+	data.Event["fields"].(common.MapStr)["s3"].(common.MapStr)["timestampFormat"] = "someFormat"
+	ret, err = getConsumerOptions(data)
+	assert.Nil(t, err)
+	assert.Equal(t, "myApp", ret.AppType)
+	assert.Equal(t, "someFormat", ret.TimestampFormat)
+	assert.Equal(t, "someRegex", ret.TimestampRegex)
+}
+
+func TestPublishEvent(t *testing.T) {
+	tempDir := mkTempDir(t)
+	defer rmTempDir(t, tempDir)
+	outputTempDir := filepath.Join(tempDir, "s3Out")
+
+	output := &s3Output{
+		s3Svc:        nil,
+		consumerLock: new(sync.RWMutex),
+		consumerMap:  make(map[string]consumerAPI),
+		consumerWg:   &sync.WaitGroup{},
+	}
+
+	config := defaultConfig
+	config.TemporaryDirectory = outputTempDir
+
+	err := output.init(config)
+	assert.Nil(t, err)
+
+	consumerMock1 := new(consumerMock)
+	output.consumerMap["myApp1"] = consumerMock1
+	consumerMock2 := new(consumerMock)
+	output.consumerMap["myApp2"] = consumerMock2
+
+	data := outputs.Data{
+		Event: common.MapStr{},
+	}
+	data.Event["source"] = filepath.Join("var", "log", "myApp.log")
+	data.Event["fields"] = common.MapStr{}
+	data.Event["fields"].(common.MapStr)["s3"] = common.MapStr{}
+	data.Event["fields"].(common.MapStr)["s3"].(common.MapStr)["appType"] = "myApp1"
+	data.Event["message"] = "Hello myApp1!"
+
+	consumerMock1.On("appendLine", "Hello myApp1!").Return()
+	consumerMock2.On("appendLine", "Hello myApp2!").Return()
+
+	err = output.PublishEvent(nil, outputs.Options{}, data)
+	assert.Nil(t, err)
+	consumerMock1.AssertNumberOfCalls(t, "appendLine", 1)
+	consumerMock2.AssertNotCalled(t, "appendLine")
+
+	data.Event["fields"].(common.MapStr)["s3"] = common.MapStr{}
+	data.Event["fields"].(common.MapStr)["s3"].(common.MapStr)["appType"] = "myApp2"
+	data.Event["message"] = "Hello myApp2!"
+
+	err = output.PublishEvent(nil, outputs.Options{}, data)
+	assert.Nil(t, err)
+	consumerMock1.AssertNumberOfCalls(t, "appendLine", 1)
+	consumerMock2.AssertNumberOfCalls(t, "appendLine", 1)
+}
+
+func TestClose(t *testing.T) {
+	tempDir := mkTempDir(t)
+	defer rmTempDir(t, tempDir)
+	outputTempDir := filepath.Join(tempDir, "s3Out")
+
+	output := &s3Output{
+		s3Svc:        nil,
+		consumerLock: new(sync.RWMutex),
+		consumerMap:  make(map[string]consumerAPI),
+		consumerWg:   &sync.WaitGroup{},
+	}
+
+	config := defaultConfig
+	config.TemporaryDirectory = outputTempDir
+
+	err := output.init(config)
+	assert.Nil(t, err)
+
+	consumerMock1 := new(consumerMock)
+	output.consumerMap["myApp1"] = consumerMock1
+	consumerMock2 := new(consumerMock)
+	output.consumerMap["myApp2"] = consumerMock2
+
+	consumerMock1.On("shutdown").Return()
+	consumerMock2.On("shutdown").Return()
+
+	err = output.Close()
+	assert.Nil(t, err)
+	consumerMock1.AssertNumberOfCalls(t, "shutdown", 1)
+	consumerMock2.AssertNumberOfCalls(t, "shutdown", 1)
+}
diff --git a/libbeat/outputs/s3/s3uploader.go b/libbeat/outputs/s3/s3uploader.go
new file mode 100644
index 000000000000..f86a377b59e3
--- /dev/null
+++ b/libbeat/outputs/s3/s3uploader.go
@@ -0,0 +1,121 @@
+package s3out
+
+import (
+	"errors"
+	"os"
+	"path"
+	"strconv"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/elastic/beats/libbeat/logp"
+)
+
+const retryInterval = time.Second * 30
+
+type s3uploader struct {
+	fileChan     chan *os.File
+	shutdownChan chan bool
+	retryLimit   time.Duration
+	appType      string
+	bucket       string
+	prefix       string
+	s3Svc        S3API
+}
+
+func newS3Uploader(c config, appType string, s3Svc S3API) *s3uploader {
+	// Both retry_limit_seconds and seconds_per_chunk are expressed in seconds.
+	retryLimit := time.Second * time.Duration(c.RetryLimitSeconds)
+	uploadInterval := time.Second * time.Duration(c.SecondsPerChunk)
+	channelSize := int64(retryLimit / uploadInterval)
+	debug("computed channel size to be %v; uploadInterval: %v, retryLimit: %v", channelSize, uploadInterval, retryLimit)
+
+	return &s3uploader{
+		fileChan:     make(chan *os.File, channelSize),
+		shutdownChan: make(chan bool),
+		retryLimit:   retryLimit,
+		appType:      appType,
+		bucket:       c.Bucket,
+		prefix:       c.Prefix,
+		s3Svc:        s3Svc,
+	}
+}
+
+func (s *s3uploader) shutdown() {
+	close(s.fileChan)
+	close(s.shutdownChan)
+}
+
+func (s *s3uploader) receiveAndUpload() {
+	debug("receiveAndUpload goroutine started for s3uploader")
+
+	for {
+		// Wait until we have something to do.
+		f, ok := <-s.fileChan
+		if !ok {
+			debug("receiveAndUpload goroutine exiting")
+			break
+		}
+
+		if err := s.tryUpload(f); err != nil {
+			logp.Err("tryUpload returned an error, shutting down s3 uploader goroutine. (%v)", err)
+			break
+		}
+	}
+}
+
+func (s *s3uploader) tryUpload(file *os.File) error {
+	tryUntil := time.Now().Add(s.retryLimit)
+	for {
+		err := s.s3Put(file)
+		if err == nil {
+			removeFile(file)
+			break
+		}
+
+		now := time.Now()
+		if now.Add(retryInterval).After(tryUntil) {
+			logp.Err("Failed to upload %v for too long, dropping the chunk", file.Name())
+			removeFile(file)
+			break
+		}
+
+		logp.Err("Failed to upload %v, will try again in %v and give up in %v", file.Name(), retryInterval, tryUntil.Sub(now))
+		select {
+		case <-s.shutdownChan:
+			return errors.New("S3 upload failed during shutdown, abandoning current and future uploads. We will try to recover them on the next run.")
+		case <-time.After(retryInterval):
+		}
+	}
+
+	return nil
+}
+
+func (s *s3uploader) s3Put(file *os.File) error {
+	fInfo, err := file.Stat()
+	if err != nil {
+		return err
+	}
+
+	_, err = file.Seek(0, 0)
+	if err != nil {
+		return err
+	}
+
+	timeStamp := strconv.FormatInt(fInfo.ModTime().UTC().Unix(), 10)
+
+	debug("Uploading %v to s3", fInfo.Name())
+	response, err := s.s3Svc.PutObject(&s3.PutObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(path.Join(s.prefix, s.appType, timeStamp)),
+		Body:   file,
+	})
+	if err != nil {
+		return err
+	}
+	debug(response.String())
+
+	return nil
+}
diff --git a/libbeat/outputs/s3/s3uploader_test.go b/libbeat/outputs/s3/s3uploader_test.go
new file mode 100644
index 000000000000..f000854e1d00
--- /dev/null
+++ b/libbeat/outputs/s3/s3uploader_test.go
@@ -0,0 +1,86 @@
+// +build !integration
+
+package s3out
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+func TestEvictOldFiles(t *testing.T) {
+	tempDir := mkTempDir(t)
+	defer rmTempDir(t, tempDir)
+	setupLogp(t)
+	testConfig := getTestConfig(tempDir)
+	testConfig.RetryLimitSeconds = 0
+
+	blockMockChan := make(chan time.Time)
+	s3SvcMock := new(s3Mock)
+	s3SvcMock.On("PutObject", mock.AnythingOfType("*s3.PutObjectInput")).Return(&s3.PutObjectOutput{}, nil).Once()
+	s3SvcMock.On("PutObject", mock.AnythingOfType("*s3.PutObjectInput")).Return(nil, errors.New("We are investigating increased API error rates in the US-EAST-1 Region.")).WaitUntil(blockMockChan)
+
+	uploader := newS3Uploader(testConfig, "testEvictOldFiles", s3SvcMock)
+	// Needed to avoid deadlock at the end of the test
+	uploader.fileChan = make(chan *os.File, 1)
+
+	files := make([]*os.File, 4)
+	for i := 0; i < 4; i++ {
+		file, err := os.Create(filepath.Join(tempDir, fmt.Sprintf("file%v", i)))
+		if err != nil || file == nil {
+			t.Logf("%v; %v", file, err)
+			t.FailNow()
+		}
+		files[i] = file
+	}
+
+	files[0].WriteString("One file")
+	files[1].WriteString("Two file")
+	files[2].WriteString("Red file")
+	files[3].WriteString("Blue file")
+
+	go uploader.receiveAndUpload()
+
+	debug("Sending first file")
+	uploader.fileChan <- files[0]
+
+	debug("Sending second file")
+	uploader.fileChan <- files[1]
+	debug("Allowing second api call to fail")
+	blockMockChan <- time.Now()
+
+	debug("Sending third file")
+	uploader.retryLimit = time.Hour
+	uploader.fileChan <- files[2]
+
+	debug("Allowing third api call to fail")
+	blockMockChan <- time.Now()
+
+	debug("Sending fourth file")
+	uploader.fileChan <- files[3]
+
+	uploader.shutdown()
+
+	// The first file should be deleted since it uploaded
+	_, err := os.Stat(files[0].Name())
+	assert.True(t, os.IsNotExist(err))
+
+	// The second file should be deleted since it timed out
+	_, err = os.Stat(files[1].Name())
+	assert.True(t, os.IsNotExist(err))
+
+	// The third should be left on disk since we shutdown before it timed out
+	_, err = os.Stat(files[2].Name())
+	assert.Nil(t, err)
+
+	// The fourth file should be left on disk and we shouldn't have even tried to
+	// upload it since the previous one failed and shutdown was called
+	_, err = os.Stat(files[3].Name())
+	assert.Nil(t, err)
+	s3SvcMock.AssertNumberOfCalls(t, "PutObject", 3)
+}
diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go
index 4f8d1b318e28..6cb8b192b0b4 100644
--- a/libbeat/publisher/publish.go
+++ b/libbeat/publisher/publish.go
@@ -21,6 +21,7 @@ import (
 	_ "github.com/elastic/beats/libbeat/outputs/kafka"
 	_ "github.com/elastic/beats/libbeat/outputs/logstash"
 	_ "github.com/elastic/beats/libbeat/outputs/redis"
+	_ "github.com/elastic/beats/libbeat/outputs/s3"
 )
 
 // command line flags
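Reviewer note: because the output talks to S3 only through the narrow S3API interface defined in s3.go, it can be exercised without AWS credentials. Below is a minimal sketch, not part of this patch, of a stub that satisfies the interface for local smoke testing; dryRunS3 and the bucket/key values are illustrative assumptions.

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

// dryRunS3 satisfies the output's S3API interface without calling AWS:
// it logs what would be uploaded and reports success.
type dryRunS3 struct{}

func (d dryRunS3) PutObject(in *s3.PutObjectInput) (*s3.PutObjectOutput, error) {
	fmt.Printf("would upload s3://%s/%s\n", *in.Bucket, *in.Key)
	return &s3.PutObjectOutput{}, nil
}

func main() {
	// Hypothetical bucket/key, mirroring the prefix/appType/timestamp layout
	// that s3Put builds with path.Join.
	_, err := dryRunS3{}.PutObject(&s3.PutObjectInput{
		Bucket: aws.String("testBucket"),
		Key:    aws.String("testPrefix/myApp/1474398354"),
	})
	fmt.Println("err:", err)
}
```

Anything in the patch that accepts an S3API, e.g. newConsumer(cfg, options, dryRunS3{}) or newS3Uploader(cfg, appType, dryRunS3{}), should accept this stub in place of the real client, the same way the tests inject s3Mock.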