Skip to content

Commit

Permalink
ToxicStub allow to unblocking write to Output
Browse files Browse the repository at this point in the history
It could happen when Link has no reciever and there is some packets in buffer.
It produces deadlock.
  • Loading branch information
miry committed Sep 7, 2022
1 parent dc4b9e4 commit 19263b7
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 16 deletions.
11 changes: 10 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
OS := $(shell uname -s)
ARCH := $(shell uname -m)
GO_VERSION := $(shell go version | cut -f3 -d" ")
GO_MINOR_VERSION := $(shell echo $(GO_VERSION) | cut -f2 -d.)
GO_PATCH_VERSION := $(shell echo $(GO_VERSION) | cut -f3 -d. | sed "s/^\s*$$/0/")
Expand All @@ -13,8 +14,9 @@ test:
$(MALLOC_ENV) go test -v -race -timeout 1m ./...

.PHONY: test-e2e
test-e2e: build
test-e2e: build container.build
scripts/test-e2e
timeout -v --preserve-status --foreground 10m scripts/test-e2e-hazelcast toxiproxy

.PHONY: test-release
test-release: test bench test-e2e release-dry
Expand Down Expand Up @@ -45,6 +47,13 @@ build: dist clean
go build -ldflags="-s -w" -o ./dist/toxiproxy-server ./cmd/server
go build -ldflags="-s -w" -o ./dist/toxiproxy-cli ./cmd/cli

.PHONY: container.build
container.build:
env GOOS=linux CGO_ENABLED=0 go build -ldflags="-s -w" -o ./dist/toxiproxy-server-linux-$(ARCH) ./cmd/server
env GOOS=linux CGO_ENABLED=0 go build -ldflags="-s -w" -o ./dist/toxiproxy-cli-linux-$(ARCH) ./cmd/cli
docker build -f Dockerfile -t toxiproxy dist
docker run --rm toxiproxy --version

.PHONY: release
release:
goreleaser release --rm-dist
Expand Down
48 changes: 36 additions & 12 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net"
"time"

"github.com/rs/zerolog"

Expand Down Expand Up @@ -73,7 +74,10 @@ func (link *ToxicLink) Start(
dest io.WriteCloser,
) {
logger := link.Logger
logger.Debug().Msg("Setup connection")
logger.
Debug().
Str("direction", link.Direction()).
Msg("Setup connection")

labels := []string{
link.Direction(),
Expand Down Expand Up @@ -133,23 +137,34 @@ func (link *ToxicLink) read(
func (link *ToxicLink) write(
metricLabels []string,
name string,
server *ApiServer,
server *ApiServer, // TODO: Replace with AppConfig for Metrics and Logger
dest io.WriteCloser,
) {
logger := link.Logger
logger := link.Logger.
With().
Str("component", "ToxicLink").
Str("method", "write").
Str("link", name).
Str("proxy", link.proxy.Name).
Str("link_addr", fmt.Sprintf("%p", link)).
Logger()

bytes, err := io.Copy(dest, link.output)
if err != nil {
logger.Warn().
Int64("bytes", bytes).
Err(err).
Msg("Destination terminated")
}
if server.Metrics.proxyMetricsEnabled() {
Msg("Could not write to destination")
logger.Trace().Msgf("link.output size is still have something!: %+v", link.output)
} else if server.Metrics.proxyMetricsEnabled() {
server.Metrics.ProxyMetrics.SentBytesTotal.
WithLabelValues(metricLabels...).Add(float64(bytes))
}

dest.Close()
logger.Trace().Msgf("Remove link %s from ToxicCollection", name)
link.toxics.RemoveLink(name)
logger.Trace().Msgf("RemoveConnection %s from Proxy %s", name, link.proxy.Name)
link.proxy.RemoveConnection(name)
}

Expand Down Expand Up @@ -213,9 +228,9 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp

log.Trace().Msg("Interrupt the previous toxic to update its output")
stop := make(chan bool)
go func() {
stop <- link.stubs[toxic_index-1].InterruptToxic()
}()
go func(stub *toxics.ToxicStub, stop chan bool) {
stop <- stub.InterruptToxic()
}(link.stubs[toxic_index-1], stop)

// Unblock the previous toxic if it is trying to flush
// If the previous toxic is closed, continue flusing until we reach the end.
Expand All @@ -231,9 +246,14 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp
if !stopped {
<-stop
}
return
return // TODO: There are some steps after this to clean buffer
}

err := link.stubs[toxic_index].WriteOutput(tmp, 5*time.Second)
if err != nil {
log.Err(err).
Msg("Could not write last packets after interrupt to Output")
}
link.stubs[toxic_index].Output <- tmp
}
}

Expand All @@ -244,7 +264,11 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp
link.stubs[toxic_index].Close()
return
}
link.stubs[toxic_index].Output <- tmp
err := link.stubs[toxic_index].WriteOutput(tmp, 5*time.Second)
if err != nil {
log.Err(err).
Msg("Could not write last packets after interrupt to Output")
}
}

link.stubs[toxic_index-1].Output = link.stubs[toxic_index].Output
Expand Down
62 changes: 62 additions & 0 deletions link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func TestStateCreated(t *testing.T) {
if flag.Lookup("test.v").DefValue == "true" {
log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger()
}

link := NewToxicLink(nil, collection, stream.Downstream, log)
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link
Expand All @@ -261,3 +262,64 @@ func TestStateCreated(t *testing.T) {
t.Fatalf("New toxic did not have state object created.")
}
}

func TestRemoveToxicWithBrokenConnection(t *testing.T) {
ctx := context.Background()

log := zerolog.Nop()
if flag.Lookup("test.v").DefValue == "true" {
log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger()
}
ctx = log.WithContext(ctx)

collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream, log)
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link

toxics := [2]*toxics.ToxicWrapper{
{
Toxic: &toxics.BandwidthToxic{
Rate: 0,
},
Type: "bandwidth",
Direction: stream.Downstream,
Toxicity: 1,
},
{
Toxic: &toxics.BandwidthToxic{
Rate: 0,
},
Type: "bandwidth",
Direction: stream.Upstream,
Toxicity: 1,
},
}

collection.chainAddToxic(toxics[0])
collection.chainAddToxic(toxics[1])

done := make(chan struct{})
defer close(done)

var data uint16 = 42
go func(log zerolog.Logger) {
for {
select {
case <-done:
link.input.Close()
return
case <-time.After(10 * time.Second):
log.Print("Finish load")
return
default:
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, data)
link.input.Write(buf)
}
}
}(log)

collection.chainRemoveToxic(ctx, toxics[0])
collection.chainRemoveToxic(ctx, toxics[1])
}
31 changes: 31 additions & 0 deletions scripts/hazelcast.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>

<hazelcast xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.hazelcast.com/schema/config
http://www.hazelcast.com/schema/config/hazelcast-config-5.1.xsd">

<properties>
<property name="hazelcast.merge.next.run.delay.seconds">15</property>
<property name="hazelcast.merge.first.run.delay.seconds">20</property>
<property name="hazelcast.partition.migration.chunks.enabled">false</property>
<property name="hazelcast.heartbeat.failuredetector.type">deadline</property>
<property name="hazelcast.heartbeat.interval.seconds">3</property>
<property name="hazelcast.max.no.heartbeat.seconds">10</property>
</properties>

<network>
<public-address>member-proxy:${proxyPort}</public-address>
<port auto-increment="false">5701</port>
<join>
<auto-detection enabled="false"/>
<tcp-ip enabled="true">
<member-list>
<member>member-proxy:${proxyPort0}</member>
<member>member-proxy:${proxyPort1}</member>
<member>member-proxy:${proxyPort2}</member>
</member-list>
</tcp-ip>
</join>
</network>
</hazelcast>
5 changes: 4 additions & 1 deletion scripts/test-e2e
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ function cleanup() {
}
trap "cleanup" EXIT SIGINT SIGTERM

echo "= Toxiproxy E2E tests"
echo
echo "== Setup"
echo
echo "=== Starting Web service"

pkill -15 "toxiproxy-server" || true
Expand Down Expand Up @@ -56,7 +60,6 @@ cli toggle shopify_http
echo -e "-----------------\n"

echo "== Benchmarking"

echo
echo "=== Without toxics"

Expand Down
137 changes: 137 additions & 0 deletions scripts/test-e2e-hazelcast
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#!/bin/bash

# Usage:
# test-e2e-hazelcast [docker image name for toxiproxy]

set -ueo pipefail

cd "$(dirname "$0")"

toxiproxy="../dist/toxiproxy-cli"
state="started"

wait_for_url() {
curl -s --retry-connrefused --retry 5 --retry-delay 2 --retry-max-time 30 \
--max-time 1 -L -I -X GET "${1}"
}

# Stop all background jobs on exit
function cleanup() {
echo -e "\n\n== Teardown: state=${state}"
if [[ $state != "success" ]]; then
docker kill -s SIGQUIT member-proxy
docker logs -t member-proxy
fi
docker stop member-proxy member0 member1 member2 || true
docker network rm toxiproxy-e2e || true
}
trap "cleanup" EXIT SIGINT SIGTERM

IMAGE_HAZELCAST="hazelcast/hazelcast:5.1.2-slim"
IMAGE_TOXIPROXY="${1:-ghcr.io/shopify/toxiproxy:2.4.0}"
TOXIPROXY_BASE_URL="http://localhost:8474"

echo "= Toxiproxy E2E tests with Hazelcast cluster"
echo
echo "== Setup"
echo
echo "=== Starting Toxiproxy"

docker rm -f member-proxy member0 member1 member2 &>/dev/null
docker network rm toxiproxy-e2e &>/dev/null || true

docker network create --subnet 172.18.5.0/24 toxiproxy-e2e

docker run --rm -t "${IMAGE_TOXIPROXY}" --version
docker run -d \
--name member-proxy \
--network toxiproxy-e2e \
--ip 172.18.5.2 \
-p 8474:8474 \
-e LOG_LEVEL=trace \
"$IMAGE_TOXIPROXY"

echo "=== Wait Toxiproxy API is available"
wait_for_url "${TOXIPROXY_BASE_URL}/version"

echo "=== Prepare proxies for Hazelcast cluster"
for i in {0..2}; do
echo "> Create proxy for member${i} on port 600${i}"
# curl --data "{\"name\": \"member${i}\", \"upstream\": \"member${i}:5701\", \"listen\": \"0.0.0.0:600${i}\"}" "${TOXIPROXY_BASE_URL}/proxies"
$toxiproxy create -l "0.0.0.0:600${i}" -u "member${i}:5701" "member${i}"
echo
done

echo
echo "=== Strating Hazelcast containers"
for i in {0..2}; do
echo "> Start Hazelcast on host member${i}"
docker run -d --rm \
--name "member${i}" \
--network toxiproxy-e2e \
--ip "172.18.5.1${i}" \
--volume "${PWD}/hazelcast.xml:/opt/hazelcast/config/hazelcast-docker.xml" \
--env HZ_PHONE_HOME_ENABLED=false \
--env JAVA_OPTS="-DproxyPort=600${i} -DproxyPort0=6000 -DproxyPort1=6001 -DproxyPort2=6002" \
"$IMAGE_HAZELCAST"
done

echo "> Wait for cluster join (30s)..."
sleep 30

echo "> Output of member0"
docker logs -t -n 10 member0

echo
echo "=== Initialize toxics for cluster"
for i in {0..2}; do
echo "> Adding toxics to member${i} proxy"
# curl --data "{\"name\": \"member${i}_downstream\", \"stream\": \"downstream\", \"toxicity\": 1.0, \"type\": \"bandwidth\", \"attributes\": { \"rate\": 0 }}" "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics"
$toxiproxy toxic add --type=bandwidth \
--downstream \
--toxicName="member${i}_downstream" \
--attribute="rate=0" \
--toxicity=1 \
"member${i}"
# curl --data "{\"name\": \"member${i}_upstream\", \"stream\": \"upstream\", \"toxicity\": 1.0, \"type\": \"bandwidth\", \"attributes\": { \"rate\": 0 }}" "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics"
$toxiproxy toxic add --type=bandwidth \
--upstream \
--toxicName="member${i}_upstream" \
--attribute="rate=0" \
--toxicity=1 \
"member${i}"
echo
$toxiproxy inspect "member${i}"
echo
done

echo "=== Wait for a the Hazelcast cluster split-brain (60s)..."
sleep 60

echo "=== Validate output of Toxiproxy and single member"
docker logs -t -n 10 member0
docker logs -t -n 10 member-proxy

echo "=== Removing toxics from proxies"
for i in {0..2}; do
echo "[$(date)] > Remove downstream bandwith Toxic for member${i} proxy"
# curl -X DELETE "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics/member${i}_downstream"
$toxiproxy toxic delete --toxicName="member${i}_downstream" "member${i}"
echo "[$(date)] > Remove ustream bandwith Toxic for member${i} proxy"
# curl -X DELETE "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics/member${i}_upstream"
$toxiproxy toxic delete --toxicName="member${i}_upstream" "member${i}"
done

echo "=== Validate output of Toxiproxy and single member after removing toxics"
docker logs -t -n 10 member0
docker logs -t -n 10 member-proxy

$toxiproxy list
$toxiproxy inspect member0
$toxiproxy inspect member1
$toxiproxy inspect member2

echo -e "=================\n"

echo "Succcess!"
state="success"
Loading

0 comments on commit 19263b7

Please sign in to comment.