Skip to content

Commit

Permalink
Merge pull request #164 from buger/change-tcp-output-format
Browse files Browse the repository at this point in the history
Change internal message format
  • Loading branch information
buger committed Jul 8, 2015
2 parents 8f73233 + 4d00ff8 commit 18c0473
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 37 deletions.
32 changes: 14 additions & 18 deletions input_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package main

import (
"bufio"
"io"
"log"
"net"
"encoding/hex"
"fmt"
"os"
)

// Can be tested using nc tool:
Expand Down Expand Up @@ -59,24 +61,18 @@ func (i *TCPInput) handleConnection(conn net.Conn) {
defer conn.Close()

reader := bufio.NewReader(conn)
scanner := bufio.NewScanner(reader)

for scanner.Scan() {
encodedPayload := scanner.Bytes()
// Hex encoding always 2x number of bytes
decoded := make([]byte, len(encodedPayload)/2)
hex.Decode(decoded, encodedPayload)
i.data <- decoded
}

for {
buf, err := reader.ReadBytes('¶')
if err == io.EOF {
return
} else if err != nil {
log.Println("Unexpected error in input tcp connection", err)
return
}
buf_len := len(buf)
if buf_len > 0 {
new_buf_len := len(buf) - 2
if new_buf_len > 0 {
new_buf := make([]byte, new_buf_len)
copy(new_buf, buf[:new_buf_len])
i.data <- new_buf
}
}
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "Unexpected error in input tcp connection:", err)
}
}

Expand Down
9 changes: 5 additions & 4 deletions input_tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"sync"
"testing"
"encoding/hex"
)

func TestTCPInput(t *testing.T) {
Expand Down Expand Up @@ -38,10 +39,10 @@ func TestTCPInput(t *testing.T) {

for i := 0; i < 100; i++ {
wg.Add(1)
new_buf := make([]byte, len(msg)+2)
msg = append(msg, []byte("¶")...)
copy(new_buf, msg)
conn.Write(new_buf)

encoded := make([]byte, len(msg)*2 + 1)
hex.Encode(encoded, msg)
conn.Write(append(encoded, '\n'))
}

wg.Wait()
Expand Down
10 changes: 6 additions & 4 deletions output_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net"
"time"
"encoding/hex"
)

type TCPOutput struct {
Expand Down Expand Up @@ -51,10 +52,11 @@ func (o *TCPOutput) worker() {
}

func (o *TCPOutput) Write(data []byte) (n int, err error) {
new_buf := make([]byte, len(data)+2)
data = append(data, []byte("¶")...)
copy(new_buf, data)
o.buf <- new_buf
// Hex encoding always 2x number of bytes
encoded := make([]byte, len(data)*2 + 1)
hex.Encode(encoded, data)
o.buf <- append(encoded, '\n')

if Settings.outputTCPStats {
o.bufStats.Write(len(o.buf))
}
Expand Down
20 changes: 9 additions & 11 deletions output_tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"sync"
"testing"
"encoding/hex"
)

func TestTCPOutput(t *testing.T) {
Expand Down Expand Up @@ -48,17 +49,14 @@ func startTCP(cb func([]byte)) net.Listener {

go func() {
reader := bufio.NewReader(conn)
for {
buf, err := reader.ReadBytes('¶')
new_buf_len := len(buf) - 2
new_buf := make([]byte, new_buf_len)
copy(new_buf, buf[:new_buf_len])
if err != nil {
if err != io.EOF {
log.Printf("error: %s\n", err)
}
}
cb(new_buf)
scanner := bufio.NewScanner(reader)

for scanner.Scan() {
encodedPayload := scanner.Bytes()
// Hex encoding always 2x number of bytes
decoded := make([]byte, len(encodedPayload)/2)
hex.Decode(decoded, encodedPayload)
cb(decoded)
}
}()
}
Expand Down

0 comments on commit 18c0473

Please sign in to comment.