From 4d00ff87572bcb42877acdfd4cf5909938c4b740 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 8 Jul 2015 19:07:50 +0500 Subject: [PATCH] Change internal message format --- input_tcp.go | 32 ++++++++++++++------------------ input_tcp_test.go | 9 +++++---- output_tcp.go | 10 ++++++---- output_tcp_test.go | 20 +++++++++----------- 4 files changed, 34 insertions(+), 37 deletions(-) diff --git a/input_tcp.go b/input_tcp.go index 7eeb4888..2e26c62f 100644 --- a/input_tcp.go +++ b/input_tcp.go @@ -2,9 +2,11 @@ package main import ( "bufio" - "io" "log" "net" + "encoding/hex" + "fmt" + "os" ) // Can be tested using nc tool: @@ -59,24 +61,18 @@ func (i *TCPInput) handleConnection(conn net.Conn) { defer conn.Close() reader := bufio.NewReader(conn) + scanner := bufio.NewScanner(reader) + + for scanner.Scan() { + encodedPayload := scanner.Bytes() + // Hex encoding always 2x number of bytes + decoded := make([]byte, len(encodedPayload)/2) + hex.Decode(decoded, encodedPayload) + i.data <- decoded + } - for { - buf, err := reader.ReadBytes('¶') - if err == io.EOF { - return - } else if err != nil { - log.Println("Unexpected error in input tcp connection", err) - return - } - buf_len := len(buf) - if buf_len > 0 { - new_buf_len := len(buf) - 2 - if new_buf_len > 0 { - new_buf := make([]byte, new_buf_len) - copy(new_buf, buf[:new_buf_len]) - i.data <- new_buf - } - } + if err := scanner.Err(); err != nil { + fmt.Fprintln(os.Stderr, "Unexpected error in input tcp connection:", err) } } diff --git a/input_tcp_test.go b/input_tcp_test.go index fb8a2d13..0f76e4c1 100644 --- a/input_tcp_test.go +++ b/input_tcp_test.go @@ -6,6 +6,7 @@ import ( "net" "sync" "testing" + "encoding/hex" ) func TestTCPInput(t *testing.T) { @@ -38,10 +39,10 @@ func TestTCPInput(t *testing.T) { for i := 0; i < 100; i++ { wg.Add(1) - new_buf := make([]byte, len(msg)+2) - msg = append(msg, []byte("¶")...) - copy(new_buf, msg) - conn.Write(new_buf) + + encoded := make([]byte, len(msg)*2 + 1) + hex.Encode(encoded, msg) + conn.Write(append(encoded, '\n')) } wg.Wait() diff --git a/output_tcp.go b/output_tcp.go index 53eaa7fe..fb060256 100644 --- a/output_tcp.go +++ b/output_tcp.go @@ -6,6 +6,7 @@ import ( "log" "net" "time" + "encoding/hex" ) type TCPOutput struct { @@ -51,10 +52,11 @@ func (o *TCPOutput) worker() { } func (o *TCPOutput) Write(data []byte) (n int, err error) { - new_buf := make([]byte, len(data)+2) - data = append(data, []byte("¶")...) - copy(new_buf, data) - o.buf <- new_buf + // Hex encoding always 2x number of bytes + encoded := make([]byte, len(data)*2 + 1) + hex.Encode(encoded, data) + o.buf <- append(encoded, '\n') + if Settings.outputTCPStats { o.bufStats.Write(len(o.buf)) } diff --git a/output_tcp_test.go b/output_tcp_test.go index aaf0cf6f..e42f14a2 100644 --- a/output_tcp_test.go +++ b/output_tcp_test.go @@ -7,6 +7,7 @@ import ( "net" "sync" "testing" + "encoding/hex" ) func TestTCPOutput(t *testing.T) { @@ -48,17 +49,14 @@ func startTCP(cb func([]byte)) net.Listener { go func() { reader := bufio.NewReader(conn) - for { - buf, err := reader.ReadBytes('¶') - new_buf_len := len(buf) - 2 - new_buf := make([]byte, new_buf_len) - copy(new_buf, buf[:new_buf_len]) - if err != nil { - if err != io.EOF { - log.Printf("error: %s\n", err) - } - } - cb(new_buf) + scanner := bufio.NewScanner(reader) + + for scanner.Scan() { + encodedPayload := scanner.Bytes() + // Hex encoding always 2x number of bytes + decoded := make([]byte, len(encodedPayload)/2) + hex.Decode(decoded, encodedPayload) + cb(decoded) } }() }