diff --git a/plugins/common/socket/datagram.go b/plugins/common/socket/datagram.go index 0f84b7d66f9e2..d3698be9bf1f4 100644 --- a/plugins/common/socket/datagram.go +++ b/plugins/common/socket/datagram.go @@ -49,7 +49,7 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError) go func() { defer l.wg.Done() - buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet + buf := make([]byte, l.ReadBufferSize) for { n, src, err := l.conn.ReadFrom(buf) receiveTime := time.Now() @@ -88,7 +88,7 @@ func (l *packetListener) listenConnection(onConnection CallbackConnection, onErr defer l.wg.Done() defer l.conn.Close() - buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet + buf := make([]byte, l.ReadBufferSize) for { // Wait for packets and read them n, src, err := l.conn.ReadFrom(buf) @@ -133,7 +133,7 @@ func (l *packetListener) listenConnection(onConnection CallbackConnection, onErr }() } -func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error { +func (l *packetListener) setupUnixgram(u *url.URL, socketMode string, bufferSize int) error { l.path = filepath.FromSlash(u.Path) if runtime.GOOS == "windows" && strings.Contains(l.path, ":") { l.path = strings.TrimPrefix(l.path, `\`) @@ -162,6 +162,12 @@ func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error { } } + if bufferSize > 0 { + l.ReadBufferSize = bufferSize + } else { + l.ReadBufferSize = 64 * 1024 // 64kb - IP packet size + } + return l.setupDecoder() } @@ -198,6 +204,7 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err } } + l.ReadBufferSize = 64 * 1024 // 64kb - IP packet size l.conn = conn return l.setupDecoder() } @@ -208,6 +215,7 @@ func (l *packetListener) setupIP(u *url.URL) error { return fmt.Errorf("listening (ip) failed: %w", err) } + l.ReadBufferSize = 64 * 1024 // 64kb - IP packet size l.conn = conn return l.setupDecoder() } diff --git a/plugins/common/socket/socket.go b/plugins/common/socket/socket.go index ed5bc499cd15e..4f12a9a0fe53f 100644 --- a/plugins/common/socket/socket.go +++ b/plugins/common/socket/socket.go @@ -136,7 +136,7 @@ func (s *Socket) Setup() error { s.listener = l case "unixgram": l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers) - if err := l.setupUnixgram(s.url, s.SocketMode); err != nil { + if err := l.setupUnixgram(s.url, s.SocketMode, int(s.ReadBufferSize)); err != nil { return err } s.listener = l diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index 31585d457d6fd..d1b191c0d6f25 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -197,8 +197,8 @@ func TestSocketListener(t *testing.T) { } } -func TestLargeReadBuffer(t *testing.T) { - // Construct a buffer-size setting of 100KiB +func TestLargeReadBufferTCP(t *testing.T) { + // Construct a buffer-size setting of 1000KiB var bufsize config.Size require.NoError(t, bufsize.UnmarshalText([]byte("1000KiB"))) @@ -262,6 +262,92 @@ func TestLargeReadBuffer(t *testing.T) { testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) } +func TestLargeReadBufferUnixgram(t *testing.T) { + // Construct a buffer-size setting of 100KiB + // Assuming that the testing environment has net.core.wmem_max set to a value greater than 100KiB + if runtime.GOOS == "windows" { + t.Skip("Skipping on Windows, as unixgram sockets are not supported") + } + + if runtime.GOOS == "darwin" { + t.Skip("Skipping on macOS (darwin), as unixgram write buffer size cannot be changed (default 2048 bytes)") + } + + var bufsize config.Size + require.NoError(t, bufsize.UnmarshalText([]byte("100KiB"))) + + // Create a socket + sock, err := os.CreateTemp("", "sock-") + require.NoError(t, err) + defer sock.Close() + defer os.Remove(sock.Name()) + var serverAddr = sock.Name() + + // Setup plugin with a sufficient read buffer + plugin := &SocketListener{ + ServiceAddress: "unixgram" + "://" + serverAddr, + Config: socket.Config{ + ReadBufferSize: bufsize, + }, + Log: &testutil.Logger{}, + } + parser := &value.Parser{ + MetricName: "test", + DataType: "string", + } + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Create a large message with the readbuffer size + message := bytes.Repeat([]byte{'a'}, int(bufsize)) + expected := []telegraf.Metric{ + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": string(message)}, + time.Unix(0, 0), + ), + } + + // Start the plugin + var acc testutil.Accumulator + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + addr := plugin.socket.Address() + + // Setup the client for submitting data + client, err := createClient(plugin.ServiceAddress, addr, nil) + require.NoError(t, err) + defer client.Close() + + // Check the socket write buffer size + unixConn, ok := client.(*net.UnixConn) + require.True(t, ok, "client is not a *net.UnixConn") + if err := unixConn.SetWriteBuffer(len(message)); err != nil { + t.Skipf("Failed to set write buffer size: %v. Skipping test.", err) + } + + // Write the message + _, err = client.Write(message) + require.NoError(t, err) + client.Close() + + getError := func() error { + acc.Lock() + defer acc.Unlock() + return acc.FirstError() + } + + // Test the resulting metrics and compare against expected results + require.Eventuallyf(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "did not receive metrics (%d): %v", acc.NMetrics(), getError()) + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) +} + func TestCases(t *testing.T) { // Get all directories in testdata folders, err := os.ReadDir("testcases")