diff --git a/client/websocket.go b/client/websocket.go index 58ecbf45dd1..8f283072ecc 100644 --- a/client/websocket.go +++ b/client/websocket.go @@ -109,32 +109,36 @@ func (p *Client) WebsocketWithPayload(query string, initPayload map[string]inter return c.Close() }, Next: func(response interface{}) error { - var op operationMessage - err := c.ReadJSON(&op) - if err != nil { - return err - } - if op.Type != dataMsg { - if op.Type == errorMsg { - return fmt.Errorf(string(op.Payload)) - } else { - return fmt.Errorf("expected data message, got %#v", op) + for { + var op operationMessage + err := c.ReadJSON(&op) + if err != nil { + return err + } + if op.Type != dataMsg { + if op.Type == connectionKaMsg { + continue + } else if op.Type == errorMsg { + return fmt.Errorf(string(op.Payload)) + } else { + return fmt.Errorf("expected data message, got %#v", op) + } } - } - var respDataRaw Response - err = json.Unmarshal(op.Payload, &respDataRaw) - if err != nil { - return fmt.Errorf("decode: %w", err) - } + var respDataRaw Response + err = json.Unmarshal(op.Payload, &respDataRaw) + if err != nil { + return fmt.Errorf("decode: %w", err) + } - // we want to unpack even if there is an error, so we can see partial responses - unpackErr := unpack(respDataRaw.Data, response) + // we want to unpack even if there is an error, so we can see partial responses + unpackErr := unpack(respDataRaw.Data, response) - if respDataRaw.Errors != nil { - return RawJsonError{respDataRaw.Errors} + if respDataRaw.Errors != nil { + return RawJsonError{respDataRaw.Errors} + } + return unpackErr } - return unpackErr }, } }