
Error: socket hang up #294

Closed
uginroot opened this issue Aug 5, 2024 · 15 comments · Fixed by #296
Labels
bug Something isn't working

Comments

@uginroot

uginroot commented Aug 5, 2024

Describe the bug

A similar problem was partially solved in version 0.3.0: #150
Error: socket hang up

This problem only appears at very high insertion rates in dozens of tables.

I am reading data from Kafka in 10 threads, and addToInsertQueue is called for different tables depending on the message received from Kafka.

If I add await sleep(50) before calling addToInsertQueue, the problem appears much less often, about once every 2-3 hours. If there is only one table, it appears about once a week.

Specifically, I use a machine with 2 CPUs and 4 GB of memory. One CPU is at 100%, and memory usage is 1.7 GB.

Steps to reproduce

  1. Create many tables (100)
  2. Insert batches of rows into many tables
  3. Wait 5-10 minutes

Expected behaviour

No uncaughtException messages

Code example

import { ClickHouseLogLevel, createClient, NodeClickHouseClient } from "@clickhouse/client";
import Stream from "stream";

const connection: NodeClickHouseClient = createClient({
    url: `http://remote-host:8123`,
    password: "password",
    username: "default",
    database: "default",
    keep_alive: {
        enabled: true,
        idle_socket_ttl: 1000,
    },
    log: {
        level: ClickHouseLogLevel.TRACE,
    },
});

type ConnBaseResult = ReturnType<ReturnType<typeof createClient>["insert"]>;

type BatchContext = {
    timeStart: number;
    insertStream: Stream.Readable;
    insertPromise: ConnBaseResult;
    timeoutId?: NodeJS.Timeout;
    rowsCount: number;
};

export abstract class ClickhouseAbstractBatchInsertRepository<T extends Record<string, any>> {
    private batchContext: BatchContext | undefined = undefined;
    
    protected readonly insertQueueCapacity = 10_000;

    protected readonly autoCommitInsertQueueInterval = 9_000;

    public constructor(private readonly clickhouseConnection: NodeClickHouseClient) {
        clickhouseConnection;
    }

    public async addToInsertQueue(row: T) {
        if (this.batchContext === undefined) {
            const insertStream = new Stream.Readable({
                objectMode: true, // required for JSON* family formats
                read() {
                    //
                },
            });
            this.batchContext = {
                timeStart: Date.now(),
                insertStream: insertStream,
                insertPromise: this.clickhouseConnection.insert({
                    table: this.getTableName(),
                    values: insertStream,
                    format: "JSONEachRow",
                }),
                timeoutId: setTimeout(() => {
                    void this.commit();
                }, this.autoCommitInsertQueueInterval),
                rowsCount: 0,
            };
        }
        this.batchContext.insertStream.push(row);
        this.batchContext.rowsCount++;
        void this._commit();
    }

    public async commit(): Promise<void> {
        await this._commit(true);
    }

    private async _commit(force = false): Promise<void> {
        const batchContext = this.batchContext;
        if (batchContext === undefined) {
            return;
        }

        if (!force && batchContext.rowsCount < this.insertQueueCapacity) {
            return;
        }

        this.batchContext = undefined;
        clearTimeout(batchContext.timeoutId);

        const exec = async (batchContext: BatchContext) => {
            if (batchContext.rowsCount === 0) {
                return;
            }

            // Execute the request that inserts all buffered rows in JSON format
            const startStart = Date.now();
            batchContext.insertStream.push(null);
            await batchContext.insertPromise;
            batchContext.insertStream.destroy();
        };

        await exec(batchContext);
    }
    
    protected abstract getTableName(): string;
}


// 1. Create many child classes for each table (extended ClickhouseAbstractBatchInsertRepository)
// 2. Call addToInsertQueue for each row

Error log

Insert: HTTP request error.
Error: socket hang up
    at connResetException (node:internal/errors:717:14)
    at Socket.socketOnEnd (node:_http_client:526:23)
    at Socket.emit (node:events:525:35)
    at Socket.emit (node:domain:489:12)
    at endReadableNT (node:internal/streams/readable:1359:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)


Configuration

Environment

  • Client version: 1.4.0
  • Language version: 5.5.4
  • OS: node:20.16.0-alpine3.20

ClickHouse server

  • ClickHouse Server version: 23.8.2
@uginroot uginroot added the bug Something isn't working label Aug 5, 2024
@uginroot
Author

uginroot commented Aug 5, 2024

I think this is the cause; see this article on macrotasks and microtasks in the Node.js event loop: https://medium.com/dkatalis/eventloop-in-nodejs-macrotasks-and-microtasks-164417e619b9

@slvrtrn
Contributor

slvrtrn commented Aug 5, 2024

Yes, considering your logs and your scenario, it indeed looks like when the event loop is blocked for too long, the timer that handles idle socket removal does not work as expected. See the idle socket handling in the client:

// Avoiding the built-in socket.timeout() method usage here,
// as we don't want to clash with the actual request timeout.
const idleTimeoutHandle = setTimeout(() => {
  this.logger.trace({
    message: `Removing socket ${socketId} after ${this.idleSocketTTL} ms of idle`,
  })
  this.knownSockets.delete(socket)
  socket.destroy()
}, this.idleSocketTTL).unref()

This also explains why await sleep(50) helps (await sleep(0) would probably have worked, too).
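The effect described above can be reproduced in isolation. The following is a minimal sketch, not code from the client: it schedules a timer (standing in for the idle-socket timer) and then blocks the event loop with synchronous work. The helper name measureTimerLag and the 50/200 ms values are hypothetical. With these values the timer cannot fire before the busy-wait ends, so it reports a lag of at least 150 ms.

```typescript
// Hypothetical helper: schedules a timer, then blocks the event loop with
// synchronous work, and reports how many ms later than requested the timer fired.
function measureTimerLag(timeoutMs: number, blockMs: number): Promise<number> {
  return new Promise((resolve) => {
    const scheduledAt = Date.now();
    setTimeout(() => {
      // Lateness relative to the requested timeout.
      resolve(Date.now() - scheduledAt - timeoutMs);
    }, timeoutMs);

    // Busy-wait: while this loop runs, no timer callback can execute.
    const blockUntil = Date.now() + blockMs;
    while (Date.now() < blockUntil) {
      // spin
    }
  });
}

measureTimerLag(50, 200).then((lag) => {
  console.log(`timer fired ${lag} ms late`);
});
```

In the OP's setup the same thing can happen to the 1000 ms idle_socket_ttl timer: if it fires late, a keep-alive socket that should already have been removed may be handed to the next insert, possibly after the server has already closed it.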

@uginroot
Author

uginroot commented Aug 5, 2024

@slvrtrn What helped me was adding this code before the request:

// TODO: make macrotask
await new Promise((resolve) => setTimeout(resolve, 0));
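Why a zero-delay setTimeout helps where a plain await would not: setTimeout(resolve, 0) schedules a macrotask, so the code after it resumes only once the event loop has gone through its timers phase, which gives already-due timers (such as the idle-socket timer) a chance to run first. A resolved promise only queues a microtask, which runs before the event loop gets there. The sketch below, with hypothetical names (timerRunsDuring, microtaskYield, macrotaskYield), shows the difference.

```typescript
// Hypothetical helper: reports whether a timer that is already due gets a
// chance to run while we await the given yield function.
async function timerRunsDuring(yieldFn: () => Promise<unknown>): Promise<boolean> {
  let timerFired = false;
  setTimeout(() => {
    timerFired = true;
  }, 0);

  // Busy-wait a few ms so the timer above is definitely due.
  const until = Date.now() + 5;
  while (Date.now() < until) {
    // spin
  }

  await yieldFn();
  return timerFired;
}

// Microtask: resumes before the event loop reaches the timers phase.
const microtaskYield = () => Promise.resolve();
// Macrotask: resumes from a timer callback, after earlier due timers have run.
const macrotaskYield = () => new Promise<void>((resolve) => setTimeout(resolve, 0));

(async () => {
  console.log("microtask yield:", await timerRunsDuring(microtaskYield)); // false
  console.log("macrotask yield:", await timerRunsDuring(macrotaskYield)); // true
})();
```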

@slvrtrn
Contributor

slvrtrn commented Aug 5, 2024

Side note: instead of batching on the client side, you could also try asynchronous inserts (https://clickhouse.com/docs/en/optimize/asynchronous-inserts), even without waiting for an ack.

There are examples in the repo as well: https://github.com/ClickHouse/clickhouse-js/blob/main/examples/async_insert_without_waiting.ts

> What helped me was adding this code before the request:

This checks out. It probably makes sense to add it at the start of the NodeBaseConnection#request method.

@uginroot
Author

uginroot commented Aug 5, 2024

I tried this approach, but I think it is wrong for my case: it sent a request to the server on every addToInsertQueue call, and the insertion speed on one worker dropped from 800 to 300 per second. The ClickHouse server load also went up from 7% to 30% (8 CPUs).

Maybe I did something wrong.

I can't accumulate records, so with this approach I have to call insert for each event.

@slvrtrn
Contributor

slvrtrn commented Aug 5, 2024

> Maybe I did something wrong.

IIRC, Kafka consumers poll multiple messages at a time (kafkajs supports this, too), depending on the batch size in bytes, of course.

So maybe with async_insert the interface could be

public async addToInsertQueue(rows: Array<T>) {
  //
}

instead.
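A minimal sketch of that idea, assuming the rows for one table arrive together (for example, from one Kafka batch): the whole batch is sent in a single insert with the ClickHouse async_insert setting enabled and wait_for_async_insert disabled, so the server buffers and flushes the data itself. To keep the sketch self-contained and testable without a server, the client is typed by a hypothetical minimal InsertClient interface that mirrors the shape of the client's insert call; insertBatch is also a hypothetical name.

```typescript
// Hypothetical minimal interface mirroring the insert() call of the
// ClickHouse Node.js client, so the sketch can be exercised with a stub.
interface InsertClient {
  insert(params: {
    table: string;
    values: ReadonlyArray<Record<string, unknown>>;
    format: "JSONEachRow";
    clickhouse_settings?: Record<string, string | number>;
  }): Promise<unknown>;
}

// Hypothetical helper: one request per batch instead of one request per row.
async function insertBatch<T extends Record<string, unknown>>(
  client: InsertClient,
  table: string,
  rows: ReadonlyArray<T>,
): Promise<void> {
  if (rows.length === 0) {
    return; // nothing to send
  }
  await client.insert({
    table,
    values: rows,
    format: "JSONEachRow",
    clickhouse_settings: {
      // Let the server buffer small inserts and flush them itself.
      async_insert: 1,
      // Acknowledge without waiting for the buffered data to be flushed.
      wait_for_async_insert: 0,
    },
  });
}
```

With wait_for_async_insert set to 0, the server acknowledges the insert before the data is flushed, so errors that happen during the flush are not reported back to the client; setting it to 1 trades some latency for that guarantee.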

Anyway, would you like to open a PR adding a zero timeout to the request method, if it is confirmed to resolve the issue? Otherwise, I can add it later.

@uginroot
Author

uginroot commented Aug 5, 2024

It took me a few days to figure out the cause of this error. For now, applying my variant locally is enough for me, but I hope it will be fixed later.

By the way, I found out that calling sleep(0) before insert helps only partially: it crashes about once every 30 minutes instead of once every 5-10 minutes.

Only a sleep(0) before each addToInsertQueue call prevents it. Otherwise, judging by the logs, the timeout is still not respected; the discrepancy is no longer 2000 ms, as in the logs above, but within 100 ms.

@slvrtrn
Contributor

slvrtrn commented Aug 7, 2024

The fix is included in 1.4.1.

@uginroot
Author

uginroot commented Aug 8, 2024

Thank you! Checked, no more errors like that.

@uginroot
Author

uginroot commented Aug 13, 2024

@slvrtrn As it turns out, the error still occurs, but now about 1-2 times a day rather than a few minutes after startup.

I dug into the source code of the http.ClientRequest and http.Agent implementations, and I think it is related to the callbacks for the socket event (http.ClientRequest) and the destroy event (http.Agent) being invoked through process.nextTick after the emit call.

I think you should consider getting rid of setTimeout in the request method completely or, as a simpler solution, adding a mutex:

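class Mutex {
  constructor() {
    this._lastPromise = Promise.resolve();
  }

  // Waits until the previous holder releases the lock, then returns
  // a function that releases it for the next caller.
  async lock() {
    let resolveUnlock;
    const unlockPromise = new Promise(resolve => {
      resolveUnlock = resolve;
    });

    // The next caller waits for our unlockPromise; we wait only for the
    // previous holder (awaiting our own unlockPromise here would deadlock).
    const previousPromise = this._lastPromise;
    this._lastPromise = previousPromise.then(() => unlockPromise);

    await previousPromise;
    return resolveUnlock;
  }
}

const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));

class NodeBaseConnection {
  // ...
  mutex = new Mutex();

  async request(params) {
    const unlock = await this.mutex.lock();
    await sleep(0); // macrotask: give due idle-socket timers a chance to run

    return new Promise((resolve, reject) => {
      const request = this.createClientRequest(params);
      // ...
      request.on('socket', (socket) => {
        // ...
        unlock(); // socket assigned; let the next request proceed
      });
      // ... (unlock() must also be called on error paths, or the queue stalls)
    });
  }
}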

@slvrtrn
Contributor

slvrtrn commented Aug 14, 2024

@uginroot, aside from that, could you try adding backpressure handling to the stream that is passed to the insert method?

I checked the code from the OP again; it could be that

this.batchContext.insertStream.push(row);
this.batchContext.rowsCount++;

is called too often, and the event loop is overloaded with the various events it emits. In that case, push will return false (and you should wait for drain before pushing more values).

Also, is there a good reason why this promise is dangling?

void this._commit();

@uginroot
Author

uginroot commented Aug 14, 2024

I added this code; I'll see whether it works:

import { once } from "events";
// ...
while (!this.batchContext.insertStream.push(row)) {
    await once(this.batchContext.insertStream, "drain");
}

Regarding void this._commit();: I simplified the code a bit to make it shorter, but the essence remains the same.

Also, I've already checked and made sure that this is not the cause.

The problem is that the onSocket call happens before the cleanup runs, but the cleanup still runs afterwards, and then I try to write to a closed socket.

I tried to come up with a nice and simple solution without setTimeout, but the only working solutions store the sockets in an array (like here), delete them before calling this.createClientRequest(params), and add a Mutex.


By the way, if you add your own http.Agent, the problem doesn't appear anymore:

http_agent: new http.Agent({
    keepAlive: true,
    keepAliveMsecs: 1000,
    timeout: 1000,
    maxSockets: 20,
}),

@slvrtrn
Contributor

slvrtrn commented Aug 14, 2024

> while (!this.batchContext.insertStream.push(row)) {
>     await once(this.batchContext.insertStream, "drain");
> }

Shouldn't it be

if (!this.batchContext.insertStream.push(row)) {
  await once(this.batchContext.insertStream, "drain")
}

instead?

Because, according to the docs:

> Returns: true if additional chunks of data may continue to be pushed; false otherwise.

In any case, as I mentioned earlier, you could also try writing the entire batch received from your message broker (see EachBatch) instead of wrapping essentially every single row in a promise with addToInsertQueue, which seems wasteful (i.e., addToInsertQueue(row: T) could become addToInsertQueue(rows: T[])). Alternatively, you could try async inserts (again, not with single rows but with the entire batch from that EachBatch method, as sending rows one by one has too much overhead).
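A note on the backpressure snippets above: in Node.js, 'drain' is an event of Writable streams. A Readable, like the insertStream in the OP, does not emit it, so await once(readable, "drain") waits for an event that never comes. For a Readable, push() returning false means the producer should pause until the stream's read() implementation is called again. The following is a minimal sketch of that, assuming an object-mode Readable as in the OP; PushableStream and its methods are hypothetical names.

```typescript
import { Readable } from "node:stream";

// Hypothetical wrapper: an object-mode Readable whose producer can await capacity.
class PushableStream<T> {
  readonly stream: Readable;
  private waiting: Array<() => void> = [];

  constructor(highWaterMark = 1000) {
    this.stream = new Readable({
      objectMode: true,
      highWaterMark,
      // Node calls read() when the consumer wants more data: wake paused producers.
      read: () => {
        const waiting = this.waiting;
        this.waiting = [];
        for (const resume of waiting) resume();
      },
    });
  }

  // Resolves immediately while the buffer has room; otherwise waits for read().
  async push(row: T): Promise<void> {
    if (!this.stream.push(row)) {
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    }
  }

  // Signals end-of-data to the consumer (e.g. the insert request).
  end(): void {
    this.stream.push(null);
  }
}
```

In the OP's class, batch.stream would be passed as values to insert, each row would go through await batch.push(row), and commit would call batch.end() before awaiting the insert promise. A production version should also settle pending pushes when the stream is destroyed (for example, when the insert request fails), otherwise the producer waits forever.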

@uginroot
Author

uginroot commented Aug 14, 2024

I guess we have to use eachBatch to ensure that all events are recorded.

@slvrtrn
Contributor

slvrtrn commented Aug 14, 2024

> if you add your own http.Agent, the problem doesn't appear anymore

You mentioned that the issue now happens 1-2 times a day. Maybe it hasn't been triggered yet when using a "custom" agent? Also, this is almost identical to how the internal HTTP agent is instantiated:

https://github.com/ClickHouse/clickhouse-js/blob/main/packages/client-node/src/connection/node_http_connection.ts#L11-L14

> keepAliveMsecs: 1000,

This is the default value.

> timeout: 1000,

IIRC, this essentially calls socket.setTimeout, and by default the client already sets this at the socket level (here), regulated via the request_timeout client option.
