Skip to content

Commit

Permalink
reset toxic
Browse files Browse the repository at this point in the history
  • Loading branch information
chaosbox authored and rmani committed Feb 24, 2019
1 parent 07d7b63 commit 1f539fb
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 2 deletions.
23 changes: 21 additions & 2 deletions link.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package toxiproxy

import (
"io"

"github.com/Shopify/toxiproxy/stream"
"github.com/Shopify/toxiproxy/toxics"
"github.com/sirupsen/logrus"
"io"
"net"
)

// ToxicLinks are single direction pipelines that connects an input and output via
Expand Down Expand Up @@ -54,6 +54,7 @@ func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Di

// Start the link with the specified toxics
func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) {

go func() {
bytes, err := io.Copy(link.input, source)
if err != nil {
Expand All @@ -69,6 +70,24 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser)
if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
link.stubs[i].State = stateful.NewState()
}
if _, ok := toxic.Toxic.(*toxics.ResetToxic); ok {
if err := source.(*net.TCPConn).SetLinger(0); err != nil {
logrus.WithFields(logrus.Fields{
"name": link.proxy.Name,
"toxic": toxic.Type,
"err": err,
}).Error("source: Unable to setLinger(ms)")

}
if err := dest.(*net.TCPConn).SetLinger(0); err != nil {
logrus.WithFields(logrus.Fields{
"name": link.proxy.Name,
"toxic": toxic.Type,
"err": err,
}).Error("dest: Unable to setLinger(ms)")

}
}

go link.stubs[i].Run(toxic)
}
Expand Down
37 changes: 37 additions & 0 deletions toxics/reset_peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package toxics

import (
"time"
)

/*
The ResetToxic sends closes the connection abruptly after a timeout (in ms). The behaviour of Close is set to discard any unsent/unacknowledged data by setting SetLinger to 0,
~= sets TCP RST flag and resets the connection.
If the timeout is set to 0, then the connection will be reset immediately.
Drop data since it will initiate a graceful close by sending the FIN/ACK. (io.EOF)
*/

type ResetToxic struct {
// Timeout in milliseconds
Timeout int64 `json:"timeout"`
}

func (t *ResetToxic) Pipe(stub *ToxicStub) {
timeout := time.Duration(t.Timeout) * time.Millisecond

for {
select {
case <-stub.Interrupt:
return
case <-stub.Input:
<-time.After(timeout)
stub.Close()
return
}
}
}

func init() {
Register("reset_peer", new(ResetToxic))
}
64 changes: 64 additions & 0 deletions toxics/reset_peer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package toxics_test

import (
"github.com/Shopify/toxiproxy"
"github.com/Shopify/toxiproxy/toxics"
"io"
"net"
"os"
"syscall"
"testing"
"time"
)

const msg = "reset toxic payload\n"

func TestResetToxicNoTimeout(t *testing.T) {
WithEchoProxy(t, func(conn net.Conn, response chan []byte, proxy *toxiproxy.Proxy) {
addToxicAndWritePayload(t, conn, proxy, toxics.ResetToxic{}, "upstream")
checkConnectionState(t, conn)
})
}

func TestResetToxicWithTimeout(t *testing.T) {
WithEchoProxy(t, func(conn net.Conn, response chan []byte, proxy *toxiproxy.Proxy) {
resetToxic := toxics.ResetToxic{Timeout: 100}
addToxicAndWritePayload(t, conn, proxy, resetToxic, "upstream")
start := time.Now()
checkConnectionState(t, conn)
AssertDeltaTime(t, "Reset after timeout", time.Since(start), time.Duration(resetToxic.Timeout)*time.Millisecond, time.Duration(resetToxic.Timeout+10)*time.Millisecond)
})
}

func TestResetToxicWithTimeoutDownstream(t *testing.T) {
WithEchoProxy(t, func(conn net.Conn, response chan []byte, proxy *toxiproxy.Proxy) {
resetToxic := toxics.ResetToxic{Timeout: 100}
addToxicAndWritePayload(t, conn, proxy, resetToxic, "downstream")
start := time.Now()
checkConnectionState(t, conn)
AssertDeltaTime(t, "Reset after timeout", time.Since(start), time.Duration(resetToxic.Timeout)*time.Millisecond, time.Duration(resetToxic.Timeout+10)*time.Millisecond)
})
}

func addToxicAndWritePayload(t *testing.T, conn net.Conn, proxy *toxiproxy.Proxy, resetToxic toxics.ResetToxic, stream string) {
if _, err := proxy.Toxics.AddToxicJson(ToxicToJson(t, "resetconn", "reset_peer", stream, &resetToxic)); err != nil {
t.Error("AddToxicJson returned error:", err)
}
if _, err := conn.Write([]byte(msg)); err != nil {
t.Error("Failed writing TCP payload", err)
}
}

func checkConnectionState(t *testing.T, conn net.Conn) {
tmp := make([]byte, 10)
_, err := conn.Read(tmp)
opErr, _ := err.(*net.OpError)
syscallErr, _ := opErr.Err.(*os.SyscallError)
if !(syscallErr.Err == syscall.ECONNRESET) {
t.Error("Expected: upstream - connection reset by peer. Got:", err)
}
_, err = conn.Read(tmp)
if err != io.EOF {
t.Error("expected EOF from closed connection")
}
}

0 comments on commit 1f539fb

Please sign in to comment.