Skip to content

Commit

Permalink
Add ability to specify bytes options as strings with units (KB, MiB, …
Browse files Browse the repository at this point in the history
…...) (influxdata#4852)
  • Loading branch information
Samuel-BF authored and Jean-Louis Dupond committed Apr 22, 2019
1 parent 89ede5b commit 7e80d92
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 61 deletions.
28 changes: 28 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"syscall"
"time"
"unicode"

"github.com/alecthomas/units"
)

const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
Expand All @@ -36,6 +38,11 @@ type Duration struct {
Duration time.Duration
}

// Size just wraps an int64
type Size struct {
Size int64
}

// SetVersion sets the telegraf agent version
func SetVersion(v string) error {
if version != "" {
Expand Down Expand Up @@ -85,6 +92,27 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
return nil
}

func (s *Size) UnmarshalTOML(b []byte) error {
var err error
b = bytes.Trim(b, `'`)

val, err := strconv.ParseInt(string(b), 10, 64)
if err == nil {
s.Size = val
return nil
}
uq, err := strconv.Unquote(string(b))
if err != nil {
return err
}
val, err = units.ParseStrictBytes(uq)
if err != nil {
return err
}
s.Size = val
return nil
}

// ReadLines reads contents from a file and splits them by new lines.
// A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
func ReadLines(filename string) ([]string, error) {
Expand Down
23 changes: 23 additions & 0 deletions internal/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,29 @@ func TestDuration(t *testing.T) {
assert.Equal(t, time.Second, d.Duration)
}

func TestSize(t *testing.T) {
var s Size

s.UnmarshalTOML([]byte(`"1B"`))
assert.Equal(t, int64(1), s.Size)

s = Size{}
s.UnmarshalTOML([]byte(`1`))
assert.Equal(t, int64(1), s.Size)

s = Size{}
s.UnmarshalTOML([]byte(`'1'`))
assert.Equal(t, int64(1), s.Size)

s = Size{}
s.UnmarshalTOML([]byte(`"1GB"`))
assert.Equal(t, int64(1000*1000*1000), s.Size)

s = Size{}
s.UnmarshalTOML([]byte(`"12GiB"`))
assert.Equal(t, int64(12*1024*1024*1024), s.Size)
}

func TestCompressWithGzip(t *testing.T) {
testData := "the quick brown fox jumps over the lazy dog"
inputBuffer := bytes.NewBuffer([]byte(testData))
Expand Down
7 changes: 4 additions & 3 deletions plugins/inputs/filecount/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ Counts files in directories that match certain criteria.
## Only count regular files. Defaults to true.
regular_only = true

## Only count files that are at least this size in bytes. If size is
## Only count files that are at least this size. If size is
## a negative number, only count files that are smaller than the
## absolute value of size. Defaults to 0.
size = 0
## absolute value of size. Acceptable units are B, KiB, MiB, KB, ...
## Without quotes and units, interpreted as size in bytes.
size = "0B"

## Only count files that have not been touched for at least this
## duration. If mtime is negative, only count files that have been
Expand Down
19 changes: 10 additions & 9 deletions plugins/inputs/filecount/filecount.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ const sampleConfig = `
## Only count regular files. Defaults to true.
regular_only = true
## Only count files that are at least this size in bytes. If size is
## Only count files that are at least this size. If size is
## a negative number, only count files that are smaller than the
## absolute value of size. Defaults to 0.
size = 0
## absolute value of size. Acceptable units are B, KiB, MiB, KB, ...
## Without quotes and units, interpreted as size in bytes.
size = "0B"
## Only count files that have not been touched for at least this
## duration. If mtime is negative, only count files that have been
Expand All @@ -51,7 +52,7 @@ type FileCount struct {
Name string
Recursive bool
RegularOnly bool
Size int64
Size internal.Size
MTime internal.Duration `toml:"mtime"`
fileFilters []fileFilterFunc
}
Expand Down Expand Up @@ -99,18 +100,18 @@ func (fc *FileCount) regularOnlyFilter() fileFilterFunc {
}

func (fc *FileCount) sizeFilter() fileFilterFunc {
if fc.Size == 0 {
if fc.Size.Size == 0 {
return nil
}

return func(f os.FileInfo) (bool, error) {
if !f.Mode().IsRegular() {
return false, nil
}
if fc.Size < 0 {
return f.Size() < -fc.Size, nil
if fc.Size.Size < 0 {
return f.Size() < -fc.Size.Size, nil
}
return f.Size() >= fc.Size, nil
return f.Size() >= fc.Size.Size, nil
}
}

Expand Down Expand Up @@ -257,7 +258,7 @@ func NewFileCount() *FileCount {
Name: "*",
Recursive: true,
RegularOnly: true,
Size: 0,
Size: internal.Size{Size: 0},
MTime: internal.Duration{Duration: 0},
fileFilters: nil,
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/inputs/filecount/filecount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ func TestRegularOnlyFilter(t *testing.T) {

func TestSizeFilter(t *testing.T) {
fc := getNoFilterFileCount("testdata")
fc.Size = -100
fc.Size = internal.Size{Size: -100}
matches := []string{"foo", "bar", "baz", "subdir/quux", "subdir/quuz"}

acc := testutil.Accumulator{}
acc.GatherError(fc.Gather)

require.True(t, assertFileCount(&acc, "testdata", len(matches)))

fc.Size = 100
fc.Size = internal.Size{Size: 100}
matches = []string{"qux"}

acc = testutil.Accumulator{}
Expand Down Expand Up @@ -119,7 +119,7 @@ func getNoFilterFileCount(dir string) FileCount {
Name: "*",
Recursive: true,
RegularOnly: false,
Size: 0,
Size: internal.Size{Size: 0},
MTime: internal.Duration{Duration: 0},
fileFilters: nil,
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/inputs/http_listener_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ This is a sample configuration for the plugin.
# write_timeout = "10s"

## Maximum allowed http request body size in bytes.
## 0 means to use the default of 536,870,912 bytes (500 mebibytes)
# max_body_size = 0
## 0 means to use the default of 524,288,000 bytes (500 mebibytes)
# max_body_size = "500MB"

## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
Expand Down
14 changes: 7 additions & 7 deletions plugins/inputs/http_listener_v2/http_listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type HTTPListenerV2 struct {
Methods []string
ReadTimeout internal.Duration
WriteTimeout internal.Duration
MaxBodySize int64
MaxBodySize internal.Size
Port int

tlsint.ServerConfig
Expand Down Expand Up @@ -65,8 +65,8 @@ const sampleConfig = `
# write_timeout = "10s"
## Maximum allowed http request body size in bytes.
## 0 means to use the default of 536,870,912 bytes (500 mebibytes)
# max_body_size = 0
## 0 means to use the default of 524,288,00 bytes (500 mebibytes)
# max_body_size = "500MB"
## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
Expand Down Expand Up @@ -106,8 +106,8 @@ func (h *HTTPListenerV2) SetParser(parser parsers.Parser) {

// Start starts the http listener service.
func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {
if h.MaxBodySize == 0 {
h.MaxBodySize = defaultMaxBodySize
if h.MaxBodySize.Size == 0 {
h.MaxBodySize.Size = defaultMaxBodySize
}

if h.ReadTimeout.Duration < time.Second {
Expand Down Expand Up @@ -173,7 +173,7 @@ func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) {

func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) {
// Check that the content length is not too large for us to handle.
if req.ContentLength > h.MaxBodySize {
if req.ContentLength > h.MaxBodySize.Size {
tooLarge(res)
return
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request)
defer body.Close()
}

body = http.MaxBytesReader(res, body, h.MaxBodySize)
body = http.MaxBytesReader(res, body, h.MaxBodySize.Size)
bytes, err := ioutil.ReadAll(body)
if err != nil {
tooLarge(res)
Expand Down
7 changes: 4 additions & 3 deletions plugins/inputs/http_listener_v2/http_listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"

Expand Down Expand Up @@ -51,7 +52,7 @@ func newTestHTTPListenerV2() *HTTPListenerV2 {
Methods: []string{"POST"},
Parser: parser,
TimeFunc: time.Now,
MaxBodySize: 70000,
MaxBodySize: internal.Size{Size: 70000},
}
return listener
}
Expand Down Expand Up @@ -234,7 +235,7 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) {
Path: "/write",
Methods: []string{"POST"},
Parser: parser,
MaxBodySize: int64(len(hugeMetric)),
MaxBodySize: internal.Size{Size: int64(len(hugeMetric))},
TimeFunc: time.Now,
}

Expand All @@ -256,7 +257,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
Path: "/write",
Methods: []string{"POST"},
Parser: parser,
MaxBodySize: 4096,
MaxBodySize: internal.Size{Size: 4096},
TimeFunc: time.Now,
}

Expand Down
24 changes: 12 additions & 12 deletions plugins/inputs/influxdb_listener/http_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type HTTPListener struct {
ServiceAddress string
ReadTimeout internal.Duration
WriteTimeout internal.Duration
MaxBodySize int64
MaxLineSize int
MaxBodySize internal.Size
MaxLineSize internal.Size
Port int

tlsint.ServerConfig
Expand Down Expand Up @@ -84,12 +84,12 @@ const sampleConfig = `
write_timeout = "10s"
## Maximum allowed http request body size in bytes.
## 0 means to use the default of 536,870,912 bytes (500 mebibytes)
max_body_size = 0
## 0 means to use the default of 524,288,000 bytes (500 mebibytes)
max_body_size = "500MiB"
## Maximum line size allowed to be sent in bytes.
## 0 means to use the default of 65536 bytes (64 kibibytes)
max_line_size = 0
max_line_size = "64KiB"
## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
Expand Down Expand Up @@ -139,11 +139,11 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error {
h.BuffersCreated = selfstat.Register("http_listener", "buffers_created", tags)
h.AuthFailures = selfstat.Register("http_listener", "auth_failures", tags)

if h.MaxBodySize == 0 {
h.MaxBodySize = DEFAULT_MAX_BODY_SIZE
if h.MaxBodySize.Size == 0 {
h.MaxBodySize.Size = DEFAULT_MAX_BODY_SIZE
}
if h.MaxLineSize == 0 {
h.MaxLineSize = DEFAULT_MAX_LINE_SIZE
if h.MaxLineSize.Size == 0 {
h.MaxLineSize.Size = DEFAULT_MAX_LINE_SIZE
}

if h.ReadTimeout.Duration < time.Second {
Expand All @@ -154,7 +154,7 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error {
}

h.acc = acc
h.pool = NewPool(200, h.MaxLineSize)
h.pool = NewPool(200, int(h.MaxLineSize.Size))

tlsConf, err := h.ServerConfig.TLSConfig()
if err != nil {
Expand Down Expand Up @@ -241,7 +241,7 @@ func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {

func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
// Check that the content length is not too large for us to handle.
if req.ContentLength > h.MaxBodySize {
if req.ContentLength > h.MaxBodySize.Size {
tooLarge(res)
return
}
Expand All @@ -261,7 +261,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
return
}
}
body = http.MaxBytesReader(res, body, h.MaxBodySize)
body = http.MaxBytesReader(res, body, h.MaxBodySize.Size)

var return400 bool
var hangingBytes bool
Expand Down
9 changes: 5 additions & 4 deletions plugins/inputs/influxdb_listener/http_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -217,7 +218,7 @@ func TestWriteHTTPNoNewline(t *testing.T) {
func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) {
listener := &HTTPListener{
ServiceAddress: "localhost:0",
MaxLineSize: 128 * 1000,
MaxLineSize: internal.Size{Size: 128 * 1000},
TimeFunc: time.Now,
}

Expand All @@ -235,7 +236,7 @@ func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) {
func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
listener := &HTTPListener{
ServiceAddress: "localhost:0",
MaxBodySize: 4096,
MaxBodySize: internal.Size{Size: 4096},
TimeFunc: time.Now,
}

Expand All @@ -252,7 +253,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) {
listener := &HTTPListener{
ServiceAddress: "localhost:0",
MaxLineSize: 70,
MaxLineSize: internal.Size{Size: 70},
TimeFunc: time.Now,
}

Expand All @@ -279,7 +280,7 @@ func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) {
func TestWriteHTTPLargeLinesSkipped(t *testing.T) {
listener := &HTTPListener{
ServiceAddress: "localhost:0",
MaxLineSize: 100,
MaxLineSize: internal.Size{Size: 100},
TimeFunc: time.Now,
}

Expand Down
4 changes: 2 additions & 2 deletions plugins/inputs/socket_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ This is a sample configuration for the plugin.
## Enables client authentication if set.
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]

## Maximum socket buffer size in bytes.
## Maximum socket buffer size (in bytes when no unit specified).
## For stream sockets, once the buffer fills up, the sender will start backing up.
## For datagram sockets, once the buffer fills up, metrics will start dropping.
## Defaults to the OS default.
# read_buffer_size = 65535
# read_buffer_size = "64KiB"

## Period between keep alive probes.
## Only applies to TCP sockets.
Expand Down
Loading

0 comments on commit 7e80d92

Please sign in to comment.