diff --git a/pkg/station/lib/proxies.go b/pkg/station/lib/proxies.go index 9aee3f9b..fba7171a 100644 --- a/pkg/station/lib/proxies.go +++ b/pkg/station/lib/proxies.go @@ -154,8 +154,26 @@ func halfPipe(src net.Conn, dst net.Conn, buf := make([]byte, 32*1024) for { nr, er := src.Read(buf) + if er != nil { + if nr > len(buf) { + log.Errorf("unexpected read len error - up:%t (%dB): %s", isUpload, nr, er) + } + if e := generalizeErr(er); e != nil { + if isUpload { + stats.ClientConnErr = e.Error() + } else { + stats.CovertConnErr = e.Error() + } + } + break + } if nr > 0 { - nw, ew := dst.Write(buf[0:nr]) + if nr > len(buf) && er == nil { + log.Errorf("unexpected read len error - up:%t (%dB)", isUpload, nr) + } + + toWrite := int(math.Min(float64(len(buf)), float64(nr))) + nw, ew := dst.Write(buf[:toWrite]) // Update stats: stats.addBytes(int64(nw), isUpload) @@ -181,16 +199,6 @@ func halfPipe(src net.Conn, dst net.Conn, } } - if er != nil { - if e := generalizeErr(er); e != nil { - if isUpload { - stats.ClientConnErr = e.Error() - } else { - stats.CovertConnErr = e.Error() - } - } - break - } // refresh stall timeout - set both because it only happens on write so if connection is // sending traffic unidirectionally we prevent the receiving side from timing out. diff --git a/pkg/station/lib/proxies_test.go b/pkg/station/lib/proxies_test.go index 805d9ed3..f9003ee8 100644 --- a/pkg/station/lib/proxies_test.go +++ b/pkg/station/lib/proxies_test.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "io" + mrand "math/rand" "net" "os" "sync" @@ -21,8 +22,6 @@ import ( "github.com/refraction-networking/conjure/pkg/station/log" ) -var errNotExist = errors.New("not implemented") - // under construction - not finalized or definitive // TODO: flesh out this test, or disable it. The go routines have a race condition that can result // in the test being useless. @@ -79,14 +78,14 @@ type mockConn struct { func (m *mockConn) Read(b []byte) (n int, err error) { if m.read == nil { - return 0, errNotExist + return 0, nil } return m.read(b) } func (m *mockConn) Write(b []byte) (n int, err error) { if m.write == nil { - return 0, errNotExist + return 0, nil } return m.write(b) } @@ -94,7 +93,7 @@ func (m *mockConn) Write(b []byte) (n int, err error) { // Close closes the connection. func (m *mockConn) Close() error { if m.close == nil { - return errNotExist + return nil } return m.close() } @@ -267,3 +266,79 @@ func TestHalfpipeDeadlineActual(t *testing.T) { covertCovert.Close() wg.Wait() } + +// Test large writes and what happens when short write error is hit +func TestHalfpipeLargeWrite(t *testing.T) { + + inbuf := make([]byte, 32805) + + n, err := mrand.Read(inbuf) + require.Nil(t, err) + require.Equal(t, len(inbuf), n) + + clientClient, clientStation := net.Pipe() + stationCovert, covertCovert := net.Pipe() + + logger := log.New(os.Stdout, "", 0) + logger.SetLevel(log.TraceLevel) + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + b := make([]byte, 1024) + _, _ = io.CopyBuffer(io.Discard, covertCovert, b) + }() + + go halfPipe(clientStation, stationCovert, &wg, logger, "Up "+"XXXXXX", &tunnelStats{proxyStats: getProxyStats()}) + go halfPipe(stationCovert, clientStation, &wg, logger, "Down "+"XXXXXX", &tunnelStats{proxyStats: getProxyStats()}) + + nw, err := clientClient.Write(inbuf) + require.Nil(t, err) + + require.Equal(t, len(inbuf), nw) + + clientClient.Close() + covertCovert.Close() + wg.Wait() +} + +func TestHalfpipeUnreliableReader(t *testing.T) { + + inbuf := make([]byte, 32805) + + n, err := mrand.Read(inbuf) + require.Nil(t, err) + require.Equal(t, len(inbuf), n) + + r := 0 + + clientConn := &mockConn{ + read: func(b []byte) (n int, err error) { + // It cant possibly send all of the data in one write so the read will return a bad + // length here. + copy(b, inbuf) + // swap this to 0 to see the case where the read len is incorrect, but no error occurs + if r != 0 { + return len(inbuf), nil + } else { + return len(inbuf), errors.New("bad length") + } + }, + write: func(b []byte) (n int, err error) { + return len(b), nil + }, + close: func() error { + return nil + }, + } + stationConn := &mockConn{} + + logger := log.New(os.Stdout, "", 0) + logger.SetLevel(log.TraceLevel) + wg := new(sync.WaitGroup) + wg.Add(1) + + go halfPipe(clientConn, stationConn, wg, logger, "Up "+"XXXXXX", &tunnelStats{proxyStats: getProxyStats()}) + + wg.Wait() +} diff --git a/scripts/install_pfring.sh b/scripts/install_pfring.sh index a877c2bc..71dc2929 100755 --- a/scripts/install_pfring.sh +++ b/scripts/install_pfring.sh @@ -33,7 +33,7 @@ else make install mkdir -p /local/include/linux/ cp linux/pf_ring.h /usr/local/include/linux/ - cd .. - make + cd ../userland + make all make install fi