Skip to content

Commit

Permalink
ToxicStub allow to unblocking write to Output
Browse files Browse the repository at this point in the history
It could happen when Link has no reciever and there is some packets in buffer.
It produces deadlock.

Dynamic test with latest version of toxiproxy
  • Loading branch information
miry committed Sep 10, 2022
1 parent b6acfcf commit 524143c
Show file tree
Hide file tree
Showing 11 changed files with 358 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
* Show uniq request id in API HTTP response. (#425, @miry)
* Add method to parse `stream.Direction` from string.
Allow to convert `stream.Direction` to string. (#430, @miry)
* Add posibility to write to Output with deadline.
On interrupting badnwidth toxic use non blocking write. (#436, @miry)

# [2.4.0] - 2022-03-07

Expand Down
11 changes: 10 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
OS := $(shell uname -s)
ARCH := $(shell uname -m)
GO_VERSION := $(shell go version | cut -f3 -d" ")
GO_MINOR_VERSION := $(shell echo $(GO_VERSION) | cut -f2 -d.)
GO_PATCH_VERSION := $(shell echo $(GO_VERSION) | cut -f3 -d. | sed "s/^\s*$$/0/")
Expand All @@ -13,8 +14,9 @@ test:
$(MALLOC_ENV) go test -v -race -timeout 1m ./...

.PHONY: test-e2e
test-e2e: build
test-e2e: build container.build
scripts/test-e2e
timeout -v --foreground 20m scripts/test-e2e-hazelcast toxiproxy

.PHONY: test-release
test-release: test bench test-e2e release-dry
Expand Down Expand Up @@ -45,6 +47,13 @@ build: dist clean
go build -ldflags="-s -w" -o ./dist/toxiproxy-server ./cmd/server
go build -ldflags="-s -w" -o ./dist/toxiproxy-cli ./cmd/cli

.PHONY: container.build
container.build:
env GOOS=linux CGO_ENABLED=0 go build -ldflags="-s -w" -o ./dist/toxiproxy-server-linux-$(ARCH) ./cmd/server
env GOOS=linux CGO_ENABLED=0 go build -ldflags="-s -w" -o ./dist/toxiproxy-cli-linux-$(ARCH) ./cmd/cli
docker build -f Dockerfile -t toxiproxy dist
docker run --rm toxiproxy --version

.PHONY: release
release:
goreleaser release --rm-dist
Expand Down
4 changes: 2 additions & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func stopBrowsersMiddleware(next http.Handler) http.Handler {
}

func timeoutMiddleware(next http.Handler) http.Handler {
return http.TimeoutHandler(next, 30*time.Second, "")
return http.TimeoutHandler(next, 25*time.Second, "")
}

type ApiServer struct {
Expand Down Expand Up @@ -121,7 +121,7 @@ func (server *ApiServer) Listen(host string, port string) {
srv := &http.Server{
Handler: r,
Addr: net.JoinHostPort(host, port),
WriteTimeout: 10 * time.Second,
WriteTimeout: 30 * time.Second,
ReadTimeout: 10 * time.Second,
}

Expand Down
49 changes: 36 additions & 13 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net"
"time"

"github.com/rs/zerolog"

Expand Down Expand Up @@ -73,7 +74,10 @@ func (link *ToxicLink) Start(
dest io.WriteCloser,
) {
logger := link.Logger
logger.Debug().Msg("Setup connection")
logger.
Debug().
Str("direction", link.Direction()).
Msg("Setup connection")

labels := []string{
link.Direction(),
Expand Down Expand Up @@ -133,23 +137,33 @@ func (link *ToxicLink) read(
func (link *ToxicLink) write(
metricLabels []string,
name string,
server *ApiServer,
server *ApiServer, // TODO: Replace with AppConfig for Metrics and Logger
dest io.WriteCloser,
) {
logger := link.Logger
logger := link.Logger.
With().
Str("component", "ToxicLink").
Str("method", "write").
Str("link", name).
Str("proxy", link.proxy.Name).
Str("link_addr", fmt.Sprintf("%p", link)).
Logger()

bytes, err := io.Copy(dest, link.output)
if err != nil {
logger.Warn().
Int64("bytes", bytes).
Err(err).
Msg("Destination terminated")
}
if server.Metrics.proxyMetricsEnabled() {
Msg("Could not write to destination")
} else if server.Metrics.proxyMetricsEnabled() {
server.Metrics.ProxyMetrics.SentBytesTotal.
WithLabelValues(metricLabels...).Add(float64(bytes))
}

dest.Close()
logger.Trace().Msgf("Remove link %s from ToxicCollection", name)
link.toxics.RemoveLink(name)
logger.Trace().Msgf("RemoveConnection %s from Proxy %s", name, link.proxy.Name)
link.proxy.RemoveConnection(name)
}

Expand Down Expand Up @@ -211,11 +225,11 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp
}
}

log.Trace().Msg("Interrupt the previous toxic to update its output")
log.Trace().Msg("Interrupting the previous toxic to update its output")
stop := make(chan bool)
go func() {
stop <- link.stubs[toxic_index-1].InterruptToxic()
}()
go func(stub *toxics.ToxicStub, stop chan bool) {
stop <- stub.InterruptToxic()
}(link.stubs[toxic_index-1], stop)

// Unblock the previous toxic if it is trying to flush
// If the previous toxic is closed, continue flusing until we reach the end.
Expand All @@ -231,9 +245,14 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp
if !stopped {
<-stop
}
return
return // TODO: There are some steps after this to clean buffer
}

err := link.stubs[toxic_index].WriteOutput(tmp, 5*time.Second)
if err != nil {
log.Err(err).
Msg("Could not write last packets after interrupt to Output")
}
link.stubs[toxic_index].Output <- tmp
}
}

Expand All @@ -244,7 +263,11 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp
link.stubs[toxic_index].Close()
return
}
link.stubs[toxic_index].Output <- tmp
err := link.stubs[toxic_index].WriteOutput(tmp, 5*time.Second)
if err != nil {
log.Err(err).
Msg("Could not write last packets after interrupt to Output")
}
}

link.stubs[toxic_index-1].Output = link.stubs[toxic_index].Output
Expand Down
62 changes: 62 additions & 0 deletions link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func TestStateCreated(t *testing.T) {
if flag.Lookup("test.v").DefValue == "true" {
log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger()
}

link := NewToxicLink(nil, collection, stream.Downstream, log)
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link
Expand All @@ -261,3 +262,64 @@ func TestStateCreated(t *testing.T) {
t.Fatalf("New toxic did not have state object created.")
}
}

func TestRemoveToxicWithBrokenConnection(t *testing.T) {
ctx := context.Background()

log := zerolog.Nop()
if flag.Lookup("test.v").DefValue == "true" {
log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger()
}
ctx = log.WithContext(ctx)

collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream, log)
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link

toxics := [2]*toxics.ToxicWrapper{
{
Toxic: &toxics.BandwidthToxic{
Rate: 0,
},
Type: "bandwidth",
Direction: stream.Downstream,
Toxicity: 1,
},
{
Toxic: &toxics.BandwidthToxic{
Rate: 0,
},
Type: "bandwidth",
Direction: stream.Upstream,
Toxicity: 1,
},
}

collection.chainAddToxic(toxics[0])
collection.chainAddToxic(toxics[1])

done := make(chan struct{})
defer close(done)

var data uint16 = 42
go func(log zerolog.Logger) {
for {
select {
case <-done:
link.input.Close()
return
case <-time.After(10 * time.Second):
log.Print("Finish load")
return
default:
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, data)
link.input.Write(buf)
}
}
}(log)

collection.chainRemoveToxic(ctx, toxics[0])
collection.chainRemoveToxic(ctx, toxics[1])
}
31 changes: 31 additions & 0 deletions scripts/hazelcast.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>

<hazelcast xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.hazelcast.com/schema/config
http://www.hazelcast.com/schema/config/hazelcast-config-5.1.xsd">

<properties>
<property name="hazelcast.merge.next.run.delay.seconds">15</property>
<property name="hazelcast.merge.first.run.delay.seconds">20</property>
<property name="hazelcast.partition.migration.chunks.enabled">false</property>
<property name="hazelcast.heartbeat.failuredetector.type">deadline</property>
<property name="hazelcast.heartbeat.interval.seconds">3</property>
<property name="hazelcast.max.no.heartbeat.seconds">10</property>
</properties>

<network>
<public-address>member-proxy:${proxyPort}</public-address>
<port auto-increment="false">5701</port>
<join>
<auto-detection enabled="false"/>
<tcp-ip enabled="true">
<member-list>
<member>member-proxy:${proxyPort0}</member>
<member>member-proxy:${proxyPort1}</member>
<member>member-proxy:${proxyPort2}</member>
</member-list>
</tcp-ip>
</join>
</network>
</hazelcast>
5 changes: 4 additions & 1 deletion scripts/test-e2e
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ function cleanup() {
}
trap "cleanup" EXIT SIGINT SIGTERM

echo "= Toxiproxy E2E tests"
echo
echo "== Setup"
echo
echo "=== Starting Web service"

pkill -15 "toxiproxy-server" || true
Expand Down Expand Up @@ -56,7 +60,6 @@ cli toggle shopify_http
echo -e "-----------------\n"

echo "== Benchmarking"

echo
echo "=== Without toxics"

Expand Down
Loading

0 comments on commit 524143c

Please sign in to comment.