diff --git a/conn_batch.go b/conn_batch.go index a729b2d238..d65300621c 100644 --- a/conn_batch.go +++ b/conn_batch.go @@ -23,6 +23,7 @@ import ( "os" "regexp" "slices" + "syscall" "time" "github.com/pkg/errors" @@ -286,6 +287,10 @@ func (b *batch) Flush() error { } if b.block.Rows() != 0 { if err := b.conn.sendData(b.block, ""); err != nil { + // broken pipe/conn reset aren't generally recoverable on retry + if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) { + b.release(err) + } return err } if b.closeOnFlush { diff --git a/tests/issues/1421_test.go b/tests/issues/1421_test.go new file mode 100644 index 0000000000..904ee4fde7 --- /dev/null +++ b/tests/issues/1421_test.go @@ -0,0 +1,67 @@ +package issues + +import ( + "context" + "errors" + "os" + "syscall" + "testing" + + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/ClickHouse/clickhouse-go/v2/tests" + "github.com/docker/docker/api/types/container" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" +) + +//goland:noinspection ALL +const insertQry = "INSERT INTO test (foo, foo2)" + +func Test1421BatchFlushBrokenConn(t *testing.T) { + // create a dedicated test environment for this test + // note: test environment management is a bit messy, consider refactoring + env, err := tests.CreateClickHouseTestEnvironment(t.Name()) + tests.SetTestEnvironment(t.Name(), env) + require.NoError(t, tests.CreateDatabase(t.Name())) + + require.NoError(t, err) + require.NotNil(t, env) + ctx := context.Background() + client, err := testcontainers.NewDockerClientWithOpts(ctx) + require.NoError(t, err) + chClient, err := tests.TestClientWithDefaultSettings(env) + + err = chClient.Exec(ctx, "CREATE TABLE test (foo String, foo2 String) ENGINE = MergeTree ORDER BY (foo)") + require.NoError(t, err) + batch, err := chClient.PrepareBatch(ctx, insertQry, driver.WithCloseOnFlush()) + require.NoError(t, err) + err = batch.Append("bar", "bar") + require.NoError(t, err) + err = batch.Flush() + require.NoError(t, err) + err = batch.Append("bar2", "bar2") + require.NoError(t, err) + err = batch.Flush() + require.NoError(t, err) + + err = batch.Append(RandAsciiString(200000000), RandAsciiString(20000000)) + + require.NoError(t, err) + ch := make(chan struct{}) + go func() { + err = batch.Flush() + close(ch) + }() + //timeout := 0 + err2 := client.ContainerKill(ctx, env.ContainerID, "KILL") + <-ch + require.NoError(t, err2) + require.True(t, errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET)) + err = client.ContainerStart(ctx, env.ContainerID, container.StartOptions{}) + require.NoError(t, err) + err = batch.Flush() + // retry after server is up should have either no error, or a reconnect error (for example because the mapped port + // changed on container startup) + require.True(t, err == nil || errors.Is(err, syscall.ECONNREFUSED) || os.IsTimeout(err), err) + +} diff --git a/tests/utils.go b/tests/utils.go index b33aa2c918..2eda3fa62f 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -61,6 +61,7 @@ func GetClickHouseTestVersion() string { } type ClickHouseTestEnvironment struct { + ContainerID string Port int HttpPort int SslPort int @@ -203,11 +204,12 @@ func CreateClickHouseTestEnvironment(testSet string) (ClickHouseTestEnvironment, hps, _ := clickhouseContainer.MappedPort(ctx, "8443") ip, _ := clickhouseContainer.ContainerIP(ctx) testEnv := ClickHouseTestEnvironment{ - Port: p.Int(), - HttpPort: hp.Int(), - SslPort: sslPort.Int(), - HttpsPort: hps.Int(), - Host: "127.0.0.1", + ContainerID: clickhouseContainer.GetContainerID(), + Port: p.Int(), + HttpPort: hp.Int(), + SslPort: sslPort.Int(), + HttpsPort: hps.Int(), + Host: "127.0.0.1", // we set this explicitly - note its also set in the /etc/clickhouse-server/users.d/admin.xml Username: "default", Password: "ClickHouse",