Skip to content

Commit

Permalink
Retry on broken pipe in batch (#1423)
Browse files Browse the repository at this point in the history
* Release connections on error during Flush()

* Add test to illustrate broken batch flushes

* Use a dedicated test environment for broken connection test recover #1421

---------

Co-authored-by: Robert Gettys <rgettys@tesla.com>
  • Loading branch information
jkaflik and rgettys-tesla authored Oct 16, 2024
1 parent 269b0f3 commit 02efdee
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 5 deletions.
5 changes: 5 additions & 0 deletions conn_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"regexp"
"slices"
"syscall"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -286,6 +287,10 @@ func (b *batch) Flush() error {
}
if b.block.Rows() != 0 {
if err := b.conn.sendData(b.block, ""); err != nil {
// broken pipe/conn reset aren't generally recoverable on retry
if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
b.release(err)
}
return err
}
if b.closeOnFlush {
Expand Down
67 changes: 67 additions & 0 deletions tests/issues/1421_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package issues

import (
"context"
"errors"
"os"
"syscall"
"testing"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/docker/docker/api/types/container"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
)

//goland:noinspection ALL
const insertQry = "INSERT INTO test (foo, foo2)"

func Test1421BatchFlushBrokenConn(t *testing.T) {
// create a dedicated test environment for this test
// note: test environment management is a bit messy, consider refactoring
env, err := tests.CreateClickHouseTestEnvironment(t.Name())
tests.SetTestEnvironment(t.Name(), env)
require.NoError(t, tests.CreateDatabase(t.Name()))

require.NoError(t, err)
require.NotNil(t, env)
ctx := context.Background()
client, err := testcontainers.NewDockerClientWithOpts(ctx)
require.NoError(t, err)
chClient, err := tests.TestClientWithDefaultSettings(env)

err = chClient.Exec(ctx, "CREATE TABLE test (foo String, foo2 String) ENGINE = MergeTree ORDER BY (foo)")
require.NoError(t, err)
batch, err := chClient.PrepareBatch(ctx, insertQry, driver.WithCloseOnFlush())
require.NoError(t, err)
err = batch.Append("bar", "bar")
require.NoError(t, err)
err = batch.Flush()
require.NoError(t, err)
err = batch.Append("bar2", "bar2")
require.NoError(t, err)
err = batch.Flush()
require.NoError(t, err)

err = batch.Append(RandAsciiString(200000000), RandAsciiString(20000000))

require.NoError(t, err)
ch := make(chan struct{})
go func() {
err = batch.Flush()
close(ch)
}()
//timeout := 0
err2 := client.ContainerKill(ctx, env.ContainerID, "KILL")
<-ch
require.NoError(t, err2)
require.True(t, errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET))
err = client.ContainerStart(ctx, env.ContainerID, container.StartOptions{})
require.NoError(t, err)
err = batch.Flush()
// retry after server is up should have either no error, or a reconnect error (for example because the mapped port
// changed on container startup)
require.True(t, err == nil || errors.Is(err, syscall.ECONNREFUSED) || os.IsTimeout(err), err)

}
12 changes: 7 additions & 5 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func GetClickHouseTestVersion() string {
}

type ClickHouseTestEnvironment struct {
ContainerID string
Port int
HttpPort int
SslPort int
Expand Down Expand Up @@ -203,11 +204,12 @@ func CreateClickHouseTestEnvironment(testSet string) (ClickHouseTestEnvironment,
hps, _ := clickhouseContainer.MappedPort(ctx, "8443")
ip, _ := clickhouseContainer.ContainerIP(ctx)
testEnv := ClickHouseTestEnvironment{
Port: p.Int(),
HttpPort: hp.Int(),
SslPort: sslPort.Int(),
HttpsPort: hps.Int(),
Host: "127.0.0.1",
ContainerID: clickhouseContainer.GetContainerID(),
Port: p.Int(),
HttpPort: hp.Int(),
SslPort: sslPort.Int(),
HttpsPort: hps.Int(),
Host: "127.0.0.1",
// we set this explicitly - note its also set in the /etc/clickhouse-server/users.d/admin.xml
Username: "default",
Password: "ClickHouse",
Expand Down

0 comments on commit 02efdee

Please sign in to comment.