From dd67530af4636aaf5aa98983ff5696aa3910efb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Fri, 20 Sep 2024 22:19:25 +0000 Subject: [PATCH] Adding some documentation --- include/PgSQL_Protocol.h | 662 +++++++++++++++++++++++++++- include/PgSQL_Thread.h | 922 ++++++++++++++++++++++++++++++++++++++- lib/PgSQL_Session.cpp | 841 ----------------------------------- lib/PgSQL_Thread.cpp | 3 - 4 files changed, 1579 insertions(+), 849 deletions(-) diff --git a/include/PgSQL_Protocol.h b/include/PgSQL_Protocol.h index fdc095f52..18aebda45 100644 --- a/include/PgSQL_Protocol.h +++ b/include/PgSQL_Protocol.h @@ -122,6 +122,17 @@ class PG_pkt } PtrSize_t* get_PtrSize(unsigned c = PG_PKT_DEFAULT_SIZE); + + /** + * @brief Moves the current packet data to a PtrSizeArray. + * + * This function adds the current `ptr` and `size` to the provided + * `PtrSizeArray` (`psa`). It then resets the internal buffer (`ptr` and + * `size`) to a new buffer with a capacity of `c` if `c` is not zero. + * + * @param psa The PtrSizeArray where the current packet data will be added. + * @param c The desired capacity of the new internal buffer. + */ void to_PtrSizeArray(PtrSizeArray* psa, unsigned c = PG_PKT_DEFAULT_SIZE); void set_multi_pkt_mode(bool mode) { @@ -130,12 +141,82 @@ class PG_pkt if (mode == false) pkt_offset.clear(); } + /** + * @brief Resizes the internal buffer if needed to accommodate additional data. + * + * If the current size of the internal buffer (`size`) plus the requested length + * (`len`) exceeds the buffer's capacity (`capacity`), this function reallocates + * the buffer to a new size that's the nearest power of 2 greater than or equal + * to `size + len`. + * + * If the buffer already has enough space, this function does nothing. + * + * @param len The number of bytes of additional space required. + * + * @note This function only resizes the buffer if the `ownership` flag is true, + * indicating that the buffer is owned by the `PG_pkt` object. + */ void make_space(unsigned int len); + + /** + * @brief Appends a single character to the internal buffer. + * + * This function ensures there's enough space in the buffer and then appends + * the given character (`val`) to the end of the buffer. + * + * @param val The character to append. + */ void put_char(char val); + + /** + * @brief Appends a 16-bit unsigned integer to the internal buffer. + * + * This function ensures there's enough space in the buffer and then appends + * the given 16-bit unsigned integer (`val`) in big-endian byte order. + * + * @param val The 16-bit unsigned integer to append. + */ void put_uint16(uint16_t val); + + /** + * @brief Appends a 32-bit unsigned integer to the internal buffer. + * + * This function ensures there's enough space in the buffer and then appends + * the given 32-bit unsigned integer (`val`) in big-endian byte order. + * + * @param val The 32-bit unsigned integer to append. + */ void put_uint32(uint32_t val); + + /** + * @brief Appends a 64-bit unsigned integer to the internal buffer. + * + * This function appends the given 64-bit unsigned integer (`val`) to the + * internal buffer in big-endian byte order. + * + * @param val The 64-bit unsigned integer to append. + */ void put_uint64(uint64_t val); + + /** + * @brief Appends a block of bytes to the internal buffer. + * + * This function ensures there's enough space in the buffer and then copies + * `len` bytes from the provided data pointer (`data`) to the end of the buffer. + * + * @param data A pointer to the beginning of the data to append. + * @param len The number of bytes to append. + */ void put_bytes(const void* data, int len); + + /** + * @brief Appends a null-terminated string to the internal buffer. + * + * This function appends the given null-terminated string (`str`) to the + * internal buffer, including the null terminator. + * + * @param str The null-terminated string to append. + */ void put_string(const char* str); void write_generic(int type, const char* pktdesc, ...); @@ -169,7 +250,25 @@ class PG_pkt void write_DataRow(const char* tupdesc, ...); private: + /** + * @brief Initializes a new packet with a specified type. + * + * This function sets the first byte of the packet to the given `type` and + * reserves space for the packet length (which will be filled in later). + * + * @param type The type of the packet (must be a value between 0 and 255). + */ void start_packet(int type); + + /** + * @brief Completes a packet by filling in the length field. + * + * This function calculates the length of the packet (excluding the type + * byte) and writes it to the appropriate position in the packet buffer. + * + * @note If the `multiple_pkt_mode` flag is set to true, the length is + * calculated and written based on the last recorded packet offset. + */ void finish_packet(); char* ptr; @@ -197,17 +296,175 @@ class PgSQL_Query_Result { PgSQL_Query_Result(); ~PgSQL_Query_Result(); + /** + * @brief Initializes the PgSQL_Query_Result object. + * + * This method initializes the `PgSQL_Query_Result` object with the + * provided `PgSQL_Protocol`, `PgSQL_Data_Stream`, and `PgSQL_Connection` + * objects. It also initializes the internal buffer using the + * `buffer_init` method and resets any internal state. + * + * @param _proto A pointer to the `PgSQL_Protocol` object associated with + * this query result. + * @param _myds A pointer to the `PgSQL_Data_Stream` object associated with + * this query result. + * @param _conn A pointer to the `PgSQL_Connection` object associated with + * this query result. + * + * @note This method is typically called when a new query is executed. + */ void init(PgSQL_Protocol* _proto, PgSQL_Data_Stream* _myds, PgSQL_Connection* _conn); + + /** + * @brief Adds a row description to the query result. + * + * This method adds a row description (from a `PGresult` object) to the + * query result. It copies the row description data to the internal buffer + * or to the `PSarrayOUT` if the buffer is full. + * + * @param result A pointer to a `PGresult` object containing the row + * description to add. + * + * @return The number of bytes added to the query result. + * + * @note This method is used to prepare the client for receiving rows + * with the corresponding data types and column names. + */ unsigned int add_row_description(const PGresult* result); + + /** + * @brief Adds a row of data to the query result. + * + * This method adds a row of data (from a `PGresult` object) to the query + * result. It copies the row data to the internal buffer or to the + * `PSarrayOUT` if the buffer is full. + * + * @param result A pointer to a `PGresult` object containing the row data + * to add. + * + * @return The number of bytes added to the query result. + */ unsigned int add_row(const PGresult* result); + + /** + * @brief Adds a row of data to the query result from a PSresult. + * + * This method adds a row of data (from a `PSresult` object) to the query + * result. It copies the row data to the internal buffer or to the + * `PSarrayOUT` if the buffer is full. + * + * @param result A pointer to a `PSresult` object containing the row data + * to add. + * + * @return The number of bytes added to the query result. + */ unsigned int add_row(const PSresult* result); + + /** + * @brief Adds a command completion message to the query result. + * + * This method adds a command completion message (from a `PGresult` + * object) to the query result. It extracts the command tag and affected + * rows count (if requested) and adds them to the internal buffer or the + * `PSarrayOUT` if the buffer is full. + * + * @param result A pointer to a `PGresult` object containing the command + * completion message. + * @param extract_affected_rows A boolean flag indicating whether to + * extract the affected rows count from the + * `PGresult` object. + * + * @return The number of bytes added to the query result. + * + * @note This method is used to signal the completion of a command + * (e.g., INSERT, UPDATE, DELETE) and to send the appropriate + * response to the client. + */ unsigned int add_command_completion(const PGresult* result, bool extract_affected_rows = true); + + /** + * @brief Adds an error message to the query result. + * + * This method adds an error message (from a `PGresult` object) to the + * query result. It copies the error data to the internal buffer or to the + * `PSarrayOUT` if the buffer is full. + * + * @param result A pointer to a `PGresult` object containing the error + * message to add. + * + * @return The number of bytes added to the query result. + * + * @note This method is used to handle errors that occur during query + * execution and to send the error information to the client. + */ unsigned int add_error(const PGresult* result); + + /** + * @brief Adds an empty query response to the query result. + * + * This method adds an empty query response (for example from query + * returning no rows) to the query result. It copies the empty query + * response data to the internal buffer or to the `PSarrayOUT` if the + * buffer is full. + * + * @param result A pointer to a `PGresult` object representing the empty + * response. + * + * @return The number of bytes added to the query result. + * + * @note This method is used to handle cases where a query does not + * return any rows or data, and to send the appropriate response + * to the client. + */ unsigned int add_empty_query_response(const PGresult* result); + + /** + * @brief Adds a ready status message to the query result. + * + * This method adds a ready status message to the query result, indicating + * that the server is ready for a new query. The status reflects the + * transaction state. + * + * @param txn_status The transaction status type, indicating whether a + * transaction is in progress or not. + * + * @return The number of bytes added to the query result. + * + * @note This method is used to signal to the client that the server is + * ready for a new query and that any previous query has completed. + */ unsigned int add_ready_status(PGTransactionStatusType txn_status); + + /** + * @brief Retrieves the query result set and copies it to a PtrSizeArray. + * + * This method retrieves the accumulated query result, including row + * descriptions, rows, errors, etc., and copies it to the provided + * `PtrSizeArray`. It also resets the internal state of the + * `PgSQL_Query_Result` object after the result set is copied. + * + * @param PSarrayFinal The `PtrSizeArray` where the query result will be + * copied. + * + * @return `true` if the result set is complete (i.e., a ready status + * packet has been added), `false` otherwise. + * + * @note This method is typically called when all query results have been + * accumulated and are ready to be sent to the client. + */ bool get_resultset(PtrSizeArray* PSarrayFinal); // this also calls reset - + + /** + * @brief Calculates the current size of the PgSQL_Query_Result object. + * + * This method calculates the total size of the `PgSQL_Query_Result` + * object in bytes, including the size of the object itself, the internal + * buffer, and any packets stored in the `PSarrayOUT`. + * + * @return The current size of the `PgSQL_Query_Result` object in bytes. + */ unsigned long long current_size(); + inline bool is_transfer_started() const { return transfer_started; } inline unsigned long long get_num_rows() const { return num_rows; } inline unsigned long long get_affected_rows() const { return affected_rows; } @@ -216,10 +473,64 @@ class PgSQL_Query_Result { inline uint8_t get_result_packet_type() const { return result_packet_type; } private: + /** + * @brief Initializes the internal buffer for storing query results. + * + * If the `buffer` pointer is null, this function allocates a new buffer + * of size `PGSQL_RESULTSET_BUFLEN` and assigns it to the `buffer` pointer. + * It also resets the `buffer_used` counter to 0, indicating that the + * buffer is currently empty. + * + * @note This method is called by the `init` method to ensure that the + * buffer is properly initialized before any query results are added. + */ void buffer_init(); + inline unsigned int buffer_available_capacity() const { return (PGSQL_RESULTSET_BUFLEN - buffer_used); } + + /** + * @brief Reserves space in the internal buffer and returns a pointer. + * + * This method checks if there is enough space in the internal `buffer` + * to store the requested `size` of data. If there is space, it returns + * a pointer to the available location and updates `buffer_used`. + * Otherwise, it flushes the buffer to `PSarrayOUT`, allocates a new + * buffer, and returns a pointer to the available location. + * + * @param size The number of bytes of space to reserve. + * + * @return A pointer to the reserved space in the buffer, or `NULL` if + * there is not enough space. + * + * @note This method is used to efficiently manage the internal buffer + * and avoid unnecessary memory allocations. + */ unsigned char* buffer_reserve_space(unsigned int size); + + /** + * @brief Flushes the internal buffer to the PSarrayOUT. + * + * This method moves the data currently stored in the internal `buffer` + * to the `PSarrayOUT` (a `PtrSizeArray`). It then resizes the + * `buffer` to the default size `PGSQL_RESULTSET_BUFLEN` and resets + * `buffer_used` to 0. + * + * @note This method is used when the internal `buffer` is full and + * needs to be flushed to release the memory and continue adding + * more data. + */ void buffer_to_PSarrayOut(); + + /** + * @brief Resets the internal state of the PgSQL_Query_Result object. + * + * This method resets the internal state of the `PgSQL_Query_Result` + * object to its initial state, including clearing the result set data, + * resetting counters, and preparing for a new query result. + * + * @note This method is typically called after the query result has been + * sent to the client and the object is ready to handle a new query. + */ void reset(); PtrSizeArray PSarrayOUT; @@ -250,28 +561,375 @@ class PgSQL_Protocol : public MySQL_Protocol { } PgSQL_Data_Stream* get_myds() { return *myds; } + /** + * @brief Generates the initial handshake packet for the PostgreSQL protocol. + * + * This function generates the initial handshake packet that is sent to the + * PostgreSQL client. It includes an authentication request based on the + * configured authentication method (`pgsql_thread___authentication_method`). + * + * @param send A boolean flag indicating whether to send the packet immediately + * or just generate it. + * @param _ptr A pointer to a pointer where the generated packet data will be + * stored (if `send` is false). + * @param len A pointer to an unsigned integer where the length of the + * generated packet will be stored (if `send` is false). + * @param _thread_id A pointer to a 32-bit unsigned integer where the thread ID + * will be stored. + * @param deprecate_eof_active A boolean flag to control deprecation of EOF + * active behavior. + * + * @return `true` if the packet was successfully generated, `false` otherwise. + * + * @note This function updates the authentication method and next packet type + * in the `PgSQL_Data_Stream` object. If `send` is true, it also adds + * the generated packet to the output buffer and updates the data stream + * state. + */ bool generate_pkt_initial_handshake(bool send, void** ptr, unsigned int* len, uint32_t* thread_id, bool deprecate_eof_active) override; + + /** + * @brief Processes a PostgreSQL startup packet. + * + * This function processes a PostgreSQL startup packet received from the + * client. It extracts the connection parameters, checks for SSL requests, + * and validates the user name. + * + * @param pkt A pointer to the beginning of the packet buffer. + * @param len The length of the packet buffer in bytes. + * @param ssl_request A boolean variable that is set to `true` if the client + * requests an SSL connection. + * + * @return `true` if the startup packet was successfully processed, `false` + * otherwise. + * + * @note This function updates the data stream state to `STATE_SERVER_HANDSHAKE` + * after successfully processing the startup packet. It also handles + * SSL requests and generates an error packet if the user name is + * missing. + */ bool process_startup_packet(unsigned char* pkt, unsigned int len, bool& ssl_request); + + /** + * @brief Processes a PostgreSQL handshake response packet. + * + * This function processes a handshake response packet received from the + * PostgreSQL client. It handles authentication based on the selected + * authentication method (e.g., clear text password, SCRAM-SHA-256) and + * updates the session state. + * + * @param pkt A pointer to the beginning of the packet buffer. + * @param len The length of the packet buffer in bytes. + * + * @return The execution state after processing the handshake response + * packet. + * + * @note This function validates the packet type, retrieves user credentials + * from the database, performs authentication, and updates the session + * state. It also handles errors related to authentication and invalid + * packets. + */ EXECUTION_STATE process_handshake_response_packet(unsigned char* pkt, unsigned int len); + + /** + * @brief Sends a welcome message to the PostgreSQL client. + * + * This function sends a welcome message to the PostgreSQL client after a + * successful authentication. The welcome message includes parameter status + * messages and a ready-for-query message. + * + * @note This function updates the output buffer with the welcome message + * data. It also sets the session state to `STATE_CLIENT_AUTH_OK`. + */ void welcome_client(); + /** + * @brief Generates an error packet for the PostgreSQL protocol. + * + * This function generates an error packet that is sent to the PostgreSQL + * client in case of an error. It includes the error severity, code, and + * message. + * + * @param send A boolean flag indicating whether to send the packet + * immediately or just generate it. + * @param ready A boolean flag indicating whether to generate a ready-for-query + * packet after the error. + * @param msg The error message to be included in the packet. + * @param code The error code. + * @param fatal A boolean flag indicating whether the error is fatal. + * @param track A boolean flag to control whether to track the error count. + * @param _ptr A pointer to a `PtrSize_t` structure (if `send` is false) + * where the generated packet data will be stored. + * + * @note This function updates the output buffer with the generated error + * packet. It also updates the data stream state to `STATE_ERR` if + * necessary. + */ void generate_error_packet(bool send, bool ready, const char* msg, PGSQL_ERROR_CODES code, bool fatal, bool track = false, PtrSize_t* _ptr = NULL); + + /** + * @brief Generates an "OK" packet for the PostgreSQL protocol. + * + * This function generates an "OK" packet, which is sent to the PostgreSQL + * client after a successful command execution (e.g., INSERT, UPDATE, DELETE, + * SELECT). It includes a command tag (e.g., "INSERT 0 10" for an INSERT + * command that affected 10 rows) and a ready-for-query message if `ready` + * is true. + * + * @param send A boolean flag indicating whether to send the packet + * immediately or just generate it. + * @param ready A boolean flag indicating whether to generate a ready-for-query + * packet after the "OK" packet. + * @param msg An optional message to be included in the "OK" packet. + * @param rows The number of rows affected by the command (used for + * INSERT, UPDATE, DELETE, and SELECT). + * @param query The original query string that was executed. + * @param _ptr A pointer to a `PtrSize_t` structure (if `send` is false) + * where the generated packet data will be stored. + * + * @return `true` if the packet was successfully generated, `false` otherwise. + * + * @note This function extracts the appropriate command tag based on the + * `query` string and constructs the "OK" packet accordingly. It also + * updates the output buffer with the generated packet. If `ready` is + * true, it also generates and sends a ready-for-query packet. + */ bool generate_ok_packet(bool send, bool ready, const char* msg, int rows, const char* query, PtrSize_t* _ptr = NULL); //bool generate_row_description(bool send, PgSQL_Query_Result* rs, const PG_Fields& fields, unsigned int size); - + + /** + * @brief Copies a row description from a PGresult to a PgSQL_Query_Result. + * + * This function copies the row description from a `PGresult` object (typically + * obtained from libpq) to a `PgSQL_Query_Result` object. The row description + * contains information about the columns returned by a query, such as column + * names, data types, and other metadata. + * + * @param send A boolean flag indicating whether to send the generated packet + * immediately or just generate it. (Currently not supported). + * @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the + * row description will be copied. + * @param result A pointer to the `PGresult` object containing the row + * description to be copied. + * + * @return The number of bytes copied to the `PgSQL_Query_Result` object. + * + * @note This function is used to prepare the client for receiving rows + * with the corresponding data types and column names. + */ unsigned int copy_row_description_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); + + /** + * @brief Copies a row of data from a PGresult to a PgSQL_Query_Result. + * + * This function copies a row of data from a `PGresult` object (typically + * obtained from libpq) to a `PgSQL_Query_Result` object. The row data + * represents a single row from the result set of a query. + * + * @param send A boolean flag indicating whether to send the generated packet + * immediately or just generate it. (Currently not supported). + * @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the + * row data will be copied. + * @param result A pointer to the `PGresult` object containing the row data + * to be copied. + * + * @return The number of bytes copied to the `PgSQL_Query_Result` object. + */ unsigned int copy_row_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); + + /** + * @brief Copies a command completion message from a PGresult to a + * PgSQL_Query_Result. + * + * This function copies a command completion message from a `PGresult` object + * (typically obtained from libpq) to a `PgSQL_Query_Result` object. The + * command completion message indicates that a command (e.g., INSERT, UPDATE, + * DELETE) has finished executing. + * + * @param send A boolean flag indicating whether to send the generated packet + * immediately or just generate it. (Currently not supported). + * @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the + * command completion message will be copied. + * @param result A pointer to the `PGresult` object containing the command + * completion message to be copied. + * @param extract_affected_rows A boolean flag indicating whether to extract + * the affected rows count from the `PGresult` + * object. + * + * @return The number of bytes copied to the `PgSQL_Query_Result` object. + * + * @note This function extracts the command tag and affected rows count (if + * requested) and copies them to the `PgSQL_Query_Result` object. + */ unsigned int copy_command_completion_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result, bool extract_affected_rows); + + /** + * @brief Copies an error message from a PGresult to a PgSQL_Query_Result. + * + * This function copies an error message from a `PGresult` object (typically + * obtained from libpq) to a `PgSQL_Query_Result` object. The error message + * contains information about an error that occurred during query execution. + * + * @param send A boolean flag indicating whether to send the generated packet + * immediately or just generate it. (Currently not supported). + * @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the + * error message will be copied. + * @param result A pointer to the `PGresult` object containing the error + * message to be copied. + * + * @return The number of bytes copied to the `PgSQL_Query_Result` object. + * + * @note This function extracts the various error fields (severity, code, + * message, detail, etc.) from the `PGresult` object and copies them + * to the `PgSQL_Query_Result` object. + */ unsigned int copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); + + /** + * @brief Copies an empty query response from a PGresult to a + * PgSQL_Query_Result. + * + * This function copies an empty query response from a `PGresult` object + * (typically obtained from libpq) to a `PgSQL_Query_Result` object. The + * empty query response indicates that a query did not return any rows. + * + * @param send A boolean flag indicating whether to send the generated packet + * immediately or just generate it. (Currently not supported). + * @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the + * empty query response will be copied. + * @param result A pointer to the `PGresult` object containing the empty query + * response to be copied. + * + * @return The number of bytes copied to the `PgSQL_Query_Result` object. + */ unsigned int copy_empty_query_response_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); + + /** + * @brief Copies a ready status message from a PGresult to a + * PgSQL_Query_Result. + * + * This function copies a ready status message from a `PGresult` object + * (typically obtained from libpq) to a `PgSQL_Query_Result` object. The + * ready status indicates that the server is ready for a new query. + * + * @param send A boolean flag indicating whether to send the generated packet + * immediately or just generate it. (Currently not supported). + * @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the + * ready status message will be copied. + * @param txn_status The transaction status type, indicating whether a + * transaction is in progress or not. + * + * @return The number of bytes copied to the `PgSQL_Query_Result` object. + */ unsigned int copy_ready_status_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, PGTransactionStatusType txn_status); + + /** + * @brief Copies a buffer from a PSresult to a PgSQL_Query_Result. + * + * This function copies a buffer of data from a `PSresult` object to a + * `PgSQL_Query_Result` object. The buffer can contain various types of + * data, including row data or other results. + * + * @param send A boolean flag indicating whether to send the generated packet + * immediately or just generate it. (Currently not supported). + * @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the + * buffer will be copied. + * @param result A pointer to the `PSresult` object containing the buffer to + * be copied. + * + * @return The number of bytes copied to the `PgSQL_Query_Result` object. + */ unsigned int copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result); private: + + /** + * @brief Extracts the header information from a PostgreSQL packet. + * + * This function reads the header information from a received PostgreSQL + * packet and populates the `pgsql_hdr` structure with the packet type and + * length. It handles both the new (v3) and old (v2) packet formats. + * + * @param pkt A pointer to the beginning of the packet buffer. + * @param pkt_len The length of the packet buffer in bytes. + * @param hdr A pointer to a `pgsql_hdr` structure where the extracted header + * information will be stored. + * + * @return `true` if the header was successfully parsed, `false` otherwise. + * + * @note This function performs basic validation on the packet length and + * header fields to ensure that the packet is valid. + */ bool get_header(unsigned char* pkt, unsigned int len, pgsql_hdr* hdr); + + /** + * @brief Loads the connection parameters from a PostgreSQL startup packet. + * + * This function extracts the connection parameters (e.g., user, database, + * client encoding) from a PostgreSQL startup packet and stores them in the + * connection parameters object (`myconn->conn_params`). + * + * @param pkt A pointer to a `pgsql_hdr` structure containing the startup + * packet data. + * @param startup A boolean flag indicating whether this is a startup packet. + * + * @note This function iterates through the key-value pairs in the startup + * packet and stores them in the connection parameters object. + */ void load_conn_parameters(pgsql_hdr* pkt, bool startup); + + /** + * @brief Handles the client's first message in a SCRAM-SHA-256 + * authentication exchange. + * + * This function receives the client's first message during the SCRAM-SHA-256 + * authentication process. It parses the message, generates the server's + * first message, and sends it back to the client. + * + * @param scram_state A pointer to the `ScramState` structure that maintains + * the state of the SCRAM exchange. + * @param user A pointer to the `PgCredentials` structure containing the user + * credentials. + * @param data A pointer to the buffer containing the client's first message. + * @param datalen The length of the client's first message in bytes. + * + * @return `true` if the client's first message was successfully handled, + * `false` otherwise. + * + * @note This function performs the following steps: + * 1. Parses the client's first message to extract the authentication + * mechanism and client nonce. + * 2. Generates the server's first message, which includes the server + * nonce and salt. + * 3. Sends the server's first message to the client. + */ bool scram_handle_client_first(ScramState* scram_state, PgCredentials* user, const unsigned char* data, uint32_t datalen); + + /** + * @brief Handles the client's final message in a SCRAM-SHA-256 + * authentication exchange. + * + * This function receives the client's final message during the SCRAM-SHA-256 + * authentication process. It validates the client's proof, generates the + * server's final message, and sends it back to the client. + * + * @param scram_state A pointer to the `ScramState` structure that maintains + * the state of the SCRAM exchange. + * @param user A pointer to the `PgCredentials` structure containing the user + * credentials. + * @param data A pointer to the buffer containing the client's final message. + * @param datalen The length of the client's final message in bytes. + * + * @return `true` if the client's final message was successfully handled, + * `false` otherwise. + * + * @note This function performs the following steps: + * 1. Parses the client's final message to extract the client proof. + * 2. Verifies the client's proof against the expected value. + * 3. Generates the server's final message. + * 4. Sends the server's final message to the client. + */ bool scram_handle_client_final(ScramState* scram_state, PgCredentials* user, const unsigned char* data, uint32_t datalen); PgSQL_Data_Stream** myds; diff --git a/include/PgSQL_Thread.h b/include/PgSQL_Thread.h index 094e98634..1f86acacb 100644 --- a/include/PgSQL_Thread.h +++ b/include/PgSQL_Thread.h @@ -154,7 +154,23 @@ class __attribute__((aligned(64))) PgSQL_Thread : public Base_Thread //unsigned int find_session_idx_in_mysql_sessions(PgSQL_Session * sess); //bool set_backend_to_be_skipped_if_frontend_is_slow(PgSQL_Data_Stream * myds, unsigned int n); void handle_mirror_queue_mysql_sessions(); + + /** + * @brief Processes kill requests from the thread's kill queues. + * + * @details This function checks the thread's kill queues (`kq.conn_ids` and `kq.query_ids`) + * for any pending kill requests. If there are any requests, it calls `Scan_Sessions_to_Kill_All()` + * to iterate through all session arrays across all threads and identify sessions that match + * the kill requests. The `killed` flag is set to true for matching sessions. After processing + * all kill requests, the kill queues are cleared. + * + * @note This function is called within the `run()` function during a maintenance loop to + * process kill requests for connections and queries. It ensures that sessions matching + * kill requests are terminated. + * + */ void handle_kill_queues(); + //void check_timing_out_session(unsigned int n); //void check_for_invalid_fd(unsigned int n); //void read_one_byte_from_pipe(unsigned int n); @@ -212,34 +228,419 @@ class __attribute__((aligned(64))) PgSQL_Thread : public Base_Thread // if set_parser_algorithm == 2 , a single thr_SetParser is used SetParser* thr_SetParser; + /** + * @brief Default constructor for the PgSQL_Thread class. + * + * @details This constructor initializes various members of the PgSQL_Thread object to their + * default values. It sets up mutexes, initializes status variables, and sets up the thread's + * variables. It also sets the thread's `shutdown` flag to `false`, indicating that the thread + * is not yet in a shutdown state. + * + * @note This constructor is called when a new PgSQL_Thread object is created. + */ PgSQL_Thread(); + + /** + * @brief Destructor for the PgSQL_Thread class. + * + * @details This destructor cleans up the PgSQL_Thread object, releasing resources and + * freeing allocated memory. It deletes session objects, frees cached connections, and + * destroys mutexes. It also ensures that the thread's `shutdown` flag is set to `true` + * to indicate that the thread is no longer active. + * + * @note This destructor is called automatically when the PgSQL_Thread object goes out of + * scope or is explicitly deleted. + */ ~PgSQL_Thread(); -// PgSQL_Session* create_new_session_and_client_data_stream(int _fd); + + // PgSQL_Session* create_new_session_and_client_data_stream(int _fd); + + /** + * @brief Initializes the PgSQL_Thread object. + * + * @return `true` if initialization is successful, `false` otherwise. + * + * @details This function performs the initial setup for the PgSQL_Thread object. It allocates + * memory for various data structures, initializes mutexes, creates a pipe for communication, + * and configures the thread's variables. It also sets up regular expressions for parsing + * certain SQL statements, such as `SET` commands. + * + * @note This function is called once during the thread's lifetime to prepare it for + * handling connections and processing queries. + * + */ bool init(); + + /** + * @brief Retrieves multiple idle connections from the global connection pool. + * + * @param num_idles A reference to an integer variable that will hold the number of idle connections retrieved. + * + * @details This function requests multiple idle connections from the global connection pool (`PgHGM`) + * and stores them in the `my_idle_conns` array. It then creates new sessions for each retrieved + * connection, attaches the connection to the session's backend data stream, and registers the + * session as a connection handler. It also sets the connection's status to `PINGING_SERVER` + * and initiates the pinging process. + * + * @note This function is called within the `run()` function to acquire idle connections + * from the global pool and prepare them for use by the thread. + * + */ void run___get_multiple_idle_connections(int& num_idles); + + /** + * @brief Cleans up the mirror queue to manage concurrency. + * + * @details This function ensures that the number of mirror sessions in the `mirror_queue_mysql_sessions_cache` + * array does not exceed the maximum concurrency limit (`pgsql_thread___mirror_max_concurrency`). + * It removes sessions from the cache if the limit is exceeded. + * + * @note This function is called within the `run()` function during a maintenance loop to + * control the concurrency of mirror sessions. + * + */ void run___cleanup_mirror_queue(); + //void ProcessAllMyDS_BeforePoll(); //void ProcessAllMyDS_AfterPoll(); + + /** + * @brief The main loop for the PgSQL_Thread object. + * + * @details This function implements the main loop for the thread. It handles events, processes + * sessions, manages connections, and performs maintenance tasks. It continuously monitors + * the `shutdown` flag and exits the loop when it is set to true. The loop includes various + * steps such as: + * + * - Acquiring idle connections from the global pool. + * - Processing the mirror queue for completed mirror sessions. + * - Calling `ProcessAllMyDS_BeforePoll()` and `ProcessAllMyDS_AfterPoll()` functions to + * handle data stream events before and after the `poll()` call. + * - Adding and removing listeners to the polling loop. + * - Calling `poll()` to wait for events on sockets. + * - Processing all sessions using the `process_all_sessions()` function. + * - Returning unused connections to the global pool. + * - Refreshing the thread's variables. + * - Handling kill requests for connections or queries. + * + * @note This function is the entry point for the thread's execution. It is responsible for + * managing all aspects of the thread's lifecycle, including handling connections, processing + * queries, and performing maintenance tasks. + * + */ void run(); + + /** + * @brief Adds a new listener socket to the polling loop. + * + * @param sock The file descriptor of the listener socket. + * + * @details This function creates a new `PgSQL_Data_Stream` object for the listener socket, + * sets its type to `MYDS_LISTENER`, and adds it to the `mypolls` array for monitoring. + * + * @note This function is called by the `run()` function when a new listener socket + * is added to the thread's monitoring list. + * + */ void poll_listener_add(int sock); + + /** + * @brief Removes a listener socket from the polling loop. + * + * @param sock The file descriptor of the listener socket to remove. + * + * @details This function finds the listener socket in the `mypolls` array based on its file + * descriptor and removes it from the array. It then deletes the associated `PgSQL_Data_Stream` + * object. + * + * @note This function is called by the `run()` function when a listener socket is + * removed from the thread's monitoring list. + * + */ void poll_listener_del(int sock); + //void register_session(PgSQL_Session*, bool up_start = true); + + /** + * @brief Unregisters a session from the thread's session array. + * + * @param idx The index of the session to unregister. + * + * @details This function removes a session from the `mysql_sessions` array at the specified index. + * It does not delete the session object itself; it is assumed that the caller will handle + * the deletion. + * + * @note This function is called by various parts of the code when a session is no longer + * active and needs to be removed from the thread's session list. + * + */ void unregister_session(int); + + /** + * @brief Returns a pointer to the `pollfd` structure for a specific data stream. + * + * @param i The index of the data stream in the `mypolls` array. + * + * @return A pointer to the `pollfd` structure for the specified data stream. + * + * @details This function provides access to the `pollfd` structure for a particular data + * stream in the `mypolls` array. This structure is used by the `poll()` function to + * monitor events on the associated socket. + * + * @note This function is used internally by the thread to obtain references to the + * `pollfd` structures for data streams when interacting with the `poll()` function. + * + */ struct pollfd* get_pollfd(unsigned int i); + + /** + * @brief Processes data on a specific data stream. + * + * @param myds A pointer to the `PgSQL_Data_Stream` object to process. + * @param n The index of the data stream in the `mypolls` array. + * + * @return `true` if processing is successful, `false` if the session should be removed. + * + * @details This function handles data events on a specific data stream. It checks for events + * such as `POLLIN`, `POLLOUT`, `POLLERR`, and `POLLHUP`. Based on the events, it reads data + * from the network, processes packets, and updates the session's status. It also handles + * timeout events and connection failures. + * + * @note This function is called by the `run()` function after the `poll()` call to process + * events on each data stream. It is responsible for managing data flow and updating + * session states. + * + */ bool process_data_on_data_stream(PgSQL_Data_Stream * myds, unsigned int n); + //void ProcessAllSessions_SortingSessions(); + + /** + * @brief Processes a completed mirror session and manages its resources. + * + * @param n The index of the session in the `mysql_sessions` array. + * @param sess A pointer to the `PgSQL_Session` object representing the completed mirror session. + * + * @details This function handles the completion of a mirror session. It removes the completed + * session from the `mysql_sessions` array and decrements the `n` index to reflect the removal. + * It then checks if the `mirror_queue_mysql_sessions_cache` array is below a certain length + * (determined by `pgsql_thread___mirror_max_concurrency` and a scaling factor). + * If the cache is not full, the session is added to the cache, otherwise, it is deleted. + * + * @note This function is called within the `process_all_sessions()` function when a mirror + * session reaches the `WAITING_CLIENT_DATA` status, indicating completion. + * + */ void ProcessAllSessions_CompletedMirrorSession(unsigned int& n, PgSQL_Session * sess); + + /** + * @brief Performs maintenance tasks on a session during a maintenance loop. + * + * @param sess A pointer to the `PgSQL_Session` object to be maintained. + * @param sess_time The idle time of the session in milliseconds. + * @param total_active_transactions_ A reference to the total number of active transactions across all threads. + * + * @details This function performs various maintenance checks on a session during a maintenance + * loop. It checks for idle transactions, inactive sessions, and expired connections. It also + * handles situations where the server's table version has changed and ensures that sessions + * using offline nodes are terminated. + * + * @note This function is called within the `process_all_sessions()` function during a + * maintenance loop. It is responsible for ensuring that sessions are properly managed and + * that resources are released when necessary. + * + */ void ProcessAllSessions_MaintenanceLoop(PgSQL_Session * sess, unsigned long long sess_time, unsigned int& total_active_transactions_); + + /** + * @brief Processes all active sessions associated with the current thread. + * + * @details This function iterates through all sessions in the `mysql_sessions` array. For each + * session, it performs the following actions: + * + * - Checks for completed mirror sessions and calls `ProcessAllSessions_CompletedMirrorSession()` + * if necessary. + * - If a maintenance loop is active, it calls `ProcessAllSessions_MaintenanceLoop()` to + * perform maintenance tasks on the session. + * - If the session is healthy and needs processing, it calls the session's `handler()` + * function to handle session logic. + * - If the session is unhealthy, it closes the connection and removes the session from the + * `mysql_sessions` array. + * + * @note This function is called within the `run()` function of the `PgSQL_Thread` class. It + * is the core function responsible for managing and processing all active sessions associated + * with the thread. + * + */ void process_all_sessions(); + + /** + * @brief Refreshes the thread's variables from the global variables handler. + * + * @details This function updates the thread's variables with the latest values from the + * global variables handler (`GloPTH`) to ensure consistency. It retrieves all relevant + * variables from the global handler and updates the corresponding variables in the + * thread's local scope. + * + * @note This function is called periodically by `PgSQL_Thread::run()` to ensure that + * the thread's variables are synchronized with the global variables handler. + * + */ void refresh_variables(); + + /** + * @brief Registers a session as a connection handler. + * + * @param _sess A pointer to the `PgSQL_Session` object to register. + * @param _new A boolean flag indicating whether the session is newly created (true) or not (false). + * + * @details This function marks a session as a connection handler, adding it to the + * `mysql_sessions` array. It sets the session's `thread` pointer to the current thread + * and sets the `connections_handler` flag to true. + * + * @note This function is used to track sessions that are responsible for handling + * connections. + * + */ void register_session_connection_handler(PgSQL_Session * _sess, bool _new = false); + + /** + * @brief Unregisters a session as a connection handler. + * + * @param idx The index of the session in the `mysql_sessions` array. + * @param _new A boolean flag indicating whether the session is newly created (true) or not (false). + * + * @details This function removes a session from the `mysql_sessions` array, effectively + * unregistering it as a connection handler. + * + * @note This function is typically called when a session is no longer active or needs to be + * removed from the connection handler list. + * + */ void unregister_session_connection_handler(int idx, bool _new = false); + + /** + * @brief Handles a new connection accepted by a listener. + * + * @param myds A pointer to the `PgSQL_Data_Stream` object representing the new connection. + * @param n The index of the listener in the `mypolls` array. + * + * @details This function handles the acceptance of a new connection from a listener. It + * accepts the connection using `accept()`, performs some sanity checks, and then creates + * a new `PgSQL_Session` object to manage the connection. It configures the session's + * data stream, adds the connection to the `mypolls` array, and sets the connection's + * state to `CONNECTING_CLIENT`. + * + * @note This function is called within the `run()` function of the `PgSQL_Thread` class + * when a new connection is accepted by a listener. It is responsible for initializing + * the session and adding the connection to the polling loop. + * + */ void listener_handle_new_connection(PgSQL_Data_Stream * myds, unsigned int n); + + /** + * @brief Calculates and updates the memory statistics for the current thread. + * + * @details This function iterates through all sessions associated with the current + * thread and gathers memory usage information from each session. It updates + * the `status_variables` structure with the calculated memory statistics, + * including the following: + * + * - `st_var_mysql_backend_buffers_bytes`: Total bytes used for backend + * connection buffers when fast forwarding is enabled. + * - `st_var_mysql_frontend_buffers_bytes`: Total bytes used for frontend + * connection buffers (read/write buffers and other queues). + * - `st_var_mysql_session_internal_bytes`: Total bytes used for internal + * session data structures. + * + * @note This function is called by `SQL3_GlobalStatus()` when the `_memory` + * flag is set to true. + * + */ void Get_Memory_Stats(); + + /** + * @brief Retrieves a local connection from the thread's cached connection pool. + * + * @param _hid The hostgroup ID to search for connections in. + * @param sess The current session requesting the connection. + * @param gtid_uuid The UUID of the GTID to consider (if applicable). + * @param gtid_trxid The transaction ID of the GTID to consider (if applicable). + * @param max_lag_ms The maximum replication lag allowed for the connection (if applicable). + * + * @return A pointer to a `PgSQL_Connection` object if a suitable connection is found, + * `NULL` otherwise. + * + * @details This function attempts to find a suitable connection in the thread's + * cached connection pool (`cached_connections`). It checks for matching hostgroup + * ID, connection options, GTID (if provided), and maximum replication lag (if + * provided). If a matching connection is found, it is removed from the cache and + * returned. + * + * @note This function is used by `PgSQL_Session::handler()` to obtain a + * connection from the local cache before resorting to the global connection pool. + * + */ PgSQL_Connection* get_MyConn_local(unsigned int, PgSQL_Session * sess, char* gtid_uuid, uint64_t gtid_trxid, int max_lag_ms); + + /** + * @brief Adds a connection to the thread's local connection cache. + * + * @param c The `PgSQL_Connection` object to add to the cache. + * + * @details This function checks the status of the connection's parent server + * (`c->parent->status`) and the connection's asynchronous state machine + * (`c->async_state_machine`). If the server is online and the connection is idle, + * the connection is added to the `cached_connections` pool. Otherwise, the + * connection is pushed to the global connection pool using + * `PgHGM->push_MyConn_to_pool()`. + * + * @note This function is used to manage the thread's local connection cache. + * + */ void push_MyConn_local(PgSQL_Connection*); + + /** + * @brief Returns all connections in the thread's local cache to the global pool. + * + * @details This function iterates through the `cached_connections` pool and + * pushes each connection to the global connection pool using + * `PgHGM->push_MyConn_to_pool_array()`. After pushing the connections, the + * local cache is cleared. + * + * @note This function is called periodically by `PgSQL_Thread::run()` to + * ensure that unused connections are returned to the global pool. + * + */ void return_local_connections(); + + /** + * @brief Iterates through a session array to identify and kill sessions. + * + * @param mysess A pointer to the `PtrArray` containing the sessions to scan. + * + * @details This function iterates through the specified session array and checks + * each session against the thread's kill queues (`kq.conn_ids` and + * `kq.query_ids`). If a session matches a kill request, its `killed` flag is set + * to true. The kill queues are then updated to remove the processed kill + * requests. + * + * @note This function is called by `Scan_Sessions_to_Kill_All()` to kill + * sessions based on kill requests. + * + */ void Scan_Sessions_to_Kill(PtrArray * mysess); + + /** + * @brief Scans all session arrays across all threads to identify and kill sessions. + * + * @details This function iterates through all session arrays across different threads, including main worker threads and idle threads. + * It calls `Scan_Sessions_to_Kill()` for each session array to check for kill requests. + * The kill queues (`kq.conn_ids` and `kq.query_ids`) are cleared after processing all kill requests. + * + * @note This function is called by `PgSQL_Threads_Handler::kill_connection_or_query()` to kill sessions based on kill requests. + * + */ void Scan_Sessions_to_Kill_All(); }; @@ -661,51 +1062,566 @@ class PgSQL_Threads_Handler #ifdef IDLE_THREADS proxysql_pgsql_thread_t* pgsql_threads_idles; #endif // IDLE_THREADS + /** + * @brief Returns the current global version number for thread variables. + * + * @return The current global version number. + * + * @details This function retrieves the current global version number for thread variables. + * This number is incremented whenever a thread variable is changed, allowing threads to + * detect changes and refresh their local variables accordingly. + * + * @note This function is used by threads to check for changes in global variables and + * to update their local copies if necessary. + * + */ unsigned int get_global_version(); + + /** + * @brief Acquires a write lock on the thread variables. + * + * @details This function acquires a write lock on the thread variables using a read-write lock. + * This lock prevents other threads from modifying the variables while the lock is held. + * + * @note This function should be called before modifying any thread variables to ensure + * data consistency. + * + */ void wrlock(); + + /** + * @brief Releases a write lock on the thread variables. + * + * @details This function releases the write lock on the thread variables that was previously + * acquired using `wrlock()`. After calling this function, other threads can modify the + * variables. + * + * @note This function should be called after modifying thread variables to release the + * lock and allow other threads to access the variables. + * + */ void wrunlock(); + + /** + * @brief Commits changes to thread variables and increments the global version. + * + * @details This function increments the global version number for thread variables, signaling + * to other threads that changes have been made. It also updates the global variables + * handler (`GloPTH`) with the committed changes. + * + * @note This function should be called after modifying thread variables to ensure that + * other threads are notified of the changes and can update their local copies. + * + */ void commit(); + + /** + * @brief Retrieves the value of a thread variable as a string. + * + * @param name The name of the variable to retrieve. + * + * @return A pointer to a string containing the value of the variable, or `NULL` if + * the variable is not found. + * + * @details This function retrieves the value of a thread variable as a string. It first + * checks for monitor-related variables, then for SSL variables, and finally for default + * variables. If the variable is found, its value is returned as a dynamically allocated + * string. Otherwise, `NULL` is returned. + * + * @note This function is used to access the values of thread variables from other parts + * of the code. + * + */ char* get_variable(char* name); + + /** + * @brief Sets the value of a thread variable. + * + * @param name The name of the variable to set. + * @param value The new value to assign to the variable. + * + * @return `true` if the variable is set successfully, `false` otherwise. + * + * @details This function sets the value of a thread variable. It first checks for monitor, + * SSL, and default variables. If the variable is found, it updates the variable's value + * with the provided string. For integer variables, it performs range validation. For + * boolean variables, it checks for valid "true" or "false" values. For some variables, + * it performs additional input validation. If the variable is not found or the provided + * value is invalid, `false` is returned. + * + * @note This function is used to modify the values of thread variables from other parts + * of the code. + * + */ bool set_variable(char* name, const char* value); + + /** + * @brief Returns a list of all available thread variables. + * + * @return A dynamically allocated array of strings containing the names of all thread + * variables, or `NULL` if there are no variables. + * + * @details This function retrieves a list of all available thread variables. It scans both + * the `pgsql_thread_variables_names` array and the `mysql_tracked_variables` array to + * include both PgSQL-specific and MySQL-related variables. The returned list is dynamically + * allocated and should be freed by the caller. + * + * @note This function is used to obtain a list of available thread variables for + * display or other purposes. + * + */ char** get_variables_list(); + + /** + * @brief Checks if a thread variable exists. + * + * @param name The name of the variable to check. + * + * @return `true` if the variable exists, `false` otherwise. + * + * @details This function checks if a thread variable exists. It scans both the + * `pgsql_thread_variables_names` array and the `mysql_tracked_variables` array to + * determine if the variable is defined. + * + * @note This function is used to check for the existence of thread variables before + * attempting to access or modify them. + * + */ bool has_variable(const char* name); + /** + * @brief Default constructor for the PgSQL_Threads_Handler class. + * + * @details This constructor initializes various members of the PgSQL_Threads_Handler object + * to their default values. It sets up mutexes, initializes variables, and creates a + * `PgSQL_Listeners_Manager` object. It also sets the `bootstrapping_listeners` flag to + * `true` to indicate that the listener bootstrapping process is ongoing. + * + * @note This constructor is called when a new PgSQL_Threads_Handler object is created. + * + */ PgSQL_Threads_Handler(); + + /** + * @brief Destructor for the PgSQL_Threads_Handler class. + * + * @details This destructor cleans up the PgSQL_Threads_Handler object, releasing resources + * and freeing allocated memory. It frees dynamically allocated strings, deletes the + * `PgSQL_Listeners_Manager` object, and destroys mutexes. + * + * @note This destructor is called automatically when the PgSQL_Threads_Handler object + * goes out of scope or is explicitly deleted. + * + */ ~PgSQL_Threads_Handler(); + /** + * @brief Retrieves the value of a thread variable as a string. + * + * @param name The name of the variable to retrieve. + * + * @return A pointer to a string containing the value of the variable, or `NULL` if + * the variable is not found. + * + * @details This function retrieves the value of a thread variable as a string. It checks + * if the variable exists and then returns its value as a dynamically allocated string. + * If the variable is not found, it returns `NULL`. + * + * @note This function is used internally by the `get_variable()` function to retrieve + * the value of a variable as a string. + */ char* get_variable_string(char* name); + + /** + * @brief Retrieves the value of a thread variable as a uint16_t. + * + * @param name The name of the variable to retrieve. + * + * @return The value of the variable as a uint16_t, or 0 if the variable is not found + * or its value is not a valid uint16_t. + * + * @details This function retrieves the value of a thread variable as a uint16_t. It checks + * if the variable exists and then converts its value to a uint16_t. If the variable is + * not found or its value is not a valid uint16_t, it returns 0. + * + * @note This function is used internally by the `get_variable()` function to retrieve + * the value of a variable as a uint16_t. + */ uint16_t get_variable_uint16(char* name); + + /** + * @brief Retrieves the value of a thread variable as an integer. + * + * @param name The name of the variable to retrieve. + * + * @return The value of the variable as an integer, or 0 if the variable is not found + * or its value is not a valid integer. + * + * @details This function retrieves the value of a thread variable as an integer. It checks + * if the variable exists and then converts its value to an integer. If the variable is + * not found or its value is not a valid integer, it returns 0. + * + * @note This function is used internally by the `get_variable()` function to retrieve + * the value of a variable as an integer. + */ int get_variable_int(const char* name); + + /** + * @brief Prints the current version of the PgSQL_Threads_Handler class. + * + * @details This function prints the current version of the PgSQL_Threads_Handler class + * to the standard error stream. + * + * @note This function is used for debugging and informational purposes. + */ void print_version(); + + /** + * @brief Initializes the PgSQL_Threads_Handler object. + * + * @param num The number of threads to create. + * @param stack The stack size for each thread. + * + * @details This function initializes the PgSQL_Threads_Handler object, creating the + * specified number of threads with the given stack size. It also initializes the + * global variables handler (`GloPTH`) and sets up the thread pool. + * + * @note This function is called once during the PgSQL_Threads_Handler object's + * lifetime to prepare it for managing threads. + */ void init(unsigned int num = 0, size_t stack = 0); + + /** + * @brief Creates a new thread. + * + * @param tn The thread number. + * @param start_routine The start routine for the thread. + * @param epoll_thread A boolean flag indicating whether the thread is an epoll thread (true) + * or a worker thread (false). + * + * @return A pointer to the newly created thread object, or `NULL` if the thread creation + * failed. + * + * @details This function creates a new thread with the specified thread number, start routine, + * and thread type. It initializes the thread object, sets up the thread's variables, and + * starts the thread's execution. + * + * @note This function is used to create new threads for the PgSQL_Threads_Handler object. + * + */ proxysql_pgsql_thread_t* create_thread(unsigned int tn, void* (*start_routine) (void*), bool); + + /** + * @brief Shuts down all threads in the thread pool. + * + * @details This function shuts down all threads in the thread pool, gracefully terminating + * their execution. It sets the `shutdown` flag to `true` for each thread, allowing them + * to exit their main loop. It then waits for all threads to terminate and frees any + * associated resources. + * + * @note This function is called when the PgSQL_Threads_Handler object is being shut down + * to gracefully terminate all managed threads. + * + */ void shutdown_threads(); + + /** + * @brief Adds a new listener to the thread pool, based on an interface string. + * + * @param iface The interface string in the format "address:port" or "[ipv6_address]:port". + * + * @return 0 on success, -1 on failure. + * + * @details This function adds a new listener to the thread pool based on the provided + * interface string. It delegates the actual listener creation to the `PgSQL_Listeners_Manager` + * object (`MLM`). If the listener is successfully added, it signals all threads in the pool + * to update their polling lists. + * + * @note This function is used to configure listeners for the PgSQL_Threads_Handler object. + */ int listener_add(const char* iface); + + /** + * @brief Adds a new listener to the thread pool, based on an address and port. + * + * @param address The address of the listener. + * @param port The port of the listener. + * + * @return 0 on success, -1 on failure. + * + * @details This function adds a new listener to the thread pool based on the provided + * address and port. It delegates the actual listener creation to the `PgSQL_Listeners_Manager` + * object (`MLM`). If the listener is successfully added, it signals all threads in the pool + * to update their polling lists. + * + * @note This function is used to configure listeners for the PgSQL_Threads_Handler object. + */ int listener_add(const char* address, int port); + + /** + * @brief Removes a listener from the thread pool, based on an interface string. + * + * @param iface The interface string in the format "address:port" or "[ipv6_address]:port". + * + * @return 0 on success, -1 on failure. + * + * @details This function removes a listener from the thread pool based on the provided + * interface string. It delegates the actual listener removal to the `PgSQL_Listeners_Manager` + * object (`MLM`). If the listener is successfully removed, it signals all threads in the pool + * to update their polling lists. + * + * @note This function is used to remove listeners from the PgSQL_Threads_Handler object. + */ int listener_del(const char* iface); + + /** + * @brief Removes a listener from the thread pool, based on an address and port. + * + * @param address The address of the listener to remove. + * @param port The port of the listener to remove. + * + * @return 0 on success, -1 on failure. + * + * @details This function removes a listener from the thread pool based on the provided + * address and port. It delegates the actual listener removal to the `PgSQL_Listeners_Manager` + * object (`MLM`). If the listener is successfully removed, it signals all threads in the pool + * to update their polling lists. + * + * @note This function is used to remove listeners from the PgSQL_Threads_Handler object. + */ int listener_del(const char* address, int port); + + /** + * @brief Starts all configured listeners in the thread pool. + * + * @details This function starts all listeners that have been configured for the + * PgSQL_Threads_Handler object. It parses the `interfaces` variable, which contains + * a list of interface strings, and calls `listener_add()` to add each listener + * to the pool. After all listeners have been added, it sets the `bootstrapping_listeners` + * flag to `false` to indicate that the listener bootstrapping process is complete. + * + * @note This function is called to initiate the listening process for the + * PgSQL_Threads_Handler object. + */ void start_listeners(); + + /** + * @brief Stops all listeners in the thread pool. + * + * @details This function stops all listeners that have been configured for the + * PgSQL_Threads_Handler object. It parses the `interfaces` variable, which contains + * a list of interface strings, and calls `listener_del()` to remove each listener + * from the pool. + * + * @note This function is called to terminate the listening process for the + * PgSQL_Threads_Handler object. + */ void stop_listeners(); + + /** + * @brief Signals all threads in the thread pool. + * + * @param _c The signal value to send to each thread. + * + * @details This function sends a signal to all threads in the thread pool. It iterates + * through the thread pool and writes the signal value to the pipe associated with each + * thread. + * + * @note This function is used to send signals to threads for various purposes, such as + * notifying them of changes in global variables, requesting a thread to perform a specific + * task, or signaling a shutdown event. + * + */ void signal_all_threads(unsigned char _c = 0); + + /** + * @brief Retrieves a process list for all threads in the thread pool. + * + * @return A `SQLite3_result` object containing the process list, or `NULL` if an error + * occurred. + * + * @details This function retrieves a process list for all threads in the thread pool. It + * iterates through the thread pool and gathers information about each active session. + * The information is then formatted into a `SQLite3_result` object, which can be used + * by the SQLite3 engine to return the process list to the client. + * + * @note This function is used to provide a process list view for the PgSQL_Threads_Handler + * object, allowing administrators to monitor active sessions and their status. + * + */ SQLite3_result* SQL3_Processlist(); + + /** + * @brief Retrieves global status information for the thread pool. + * + * @param _memory A boolean flag indicating whether to include memory statistics in the + * global status information. + * + * @return A `SQLite3_result` object containing the global status information, or `NULL` + * if an error occurred. + * + * @details This function retrieves global status information for the thread pool, including + * metrics such as uptime, active transactions, connections, and queries. If the `_memory` + * flag is set to `true`, it also includes memory statistics for each thread. + * + * @note This function is used to provide a global status view for the PgSQL_Threads_Handler + * object, allowing administrators to monitor the overall health and performance of the + * thread pool. + * + */ SQLite3_result* SQL3_GlobalStatus(bool _memory); + + /** + * @brief Kills a session based on its thread session ID. + * + * @param _thread_session_id The thread session ID of the session to kill. + * + * @return `true` if the session is found and killed, `false` otherwise. + * + * @details This function attempts to find and kill a session based on its thread session ID. + * It iterates through all threads in the thread pool and searches for a session with the + * matching ID. If the session is found, its `killed` flag is set to `true`, indicating that + * the session should be terminated. + * + * @note This function is used to terminate a specific session by its thread session ID. + * + */ bool kill_session(uint32_t _thread_session_id); + + /** + * @brief Retrieves the total length of the mirror queue across all threads. + * + * @return The total length of the mirror queue. + * + * @details This function retrieves the total length of the mirror queue across all threads. + * It iterates through the thread pool and sums the length of the mirror queue for each + * thread. + * + * @note This function is used to monitor the size of the mirror queue, which is used + * to queue mirror sessions for processing. + * + */ unsigned long long get_total_mirror_queue(); + //unsigned long long get_status_variable(enum PgSQL_Thread_status_variable v_idx, p_th_counter::metric m_idx, unsigned long long conv = 0); //unsigned long long get_status_variable(enum PgSQL_Thread_status_variable v_idx, p_th_gauge::metric m_idx, unsigned long long conv = 0); + + /** + * @brief Retrieves the total number of active transactions across all threads. + * + * @return The total number of active transactions. + * + * @details This function retrieves the total number of active transactions across all + * threads in the thread pool. It iterates through the thread pool and sums the number + * of active transactions for each thread. + * + * @note This function is used to monitor the number of active transactions, which is + * a key performance indicator for the PgSQL_Threads_Handler object. + * + */ unsigned int get_active_transations(); + #ifdef IDLE_THREADS + /** + * @brief Retrieves the number of non-idle client connections across all threads. + * + * @return The number of non-idle client connections. + * + * @details This function retrieves the number of non-idle client connections across all + * threads in the thread pool. It iterates through the thread pool and sums the number + * of non-idle client connections for each thread. + * + * @note This function is used to monitor the number of active client connections, which + * is a key performance indicator for the PgSQL_Threads_Handler object. + * + */ unsigned int get_non_idle_client_connections(); #endif // IDLE_THREADS + + /** + * @brief Retrieves the total number of bytes used for backend connection buffers across + * all threads. + * + * @return The total number of bytes used for backend connection buffers. + * + * @details This function retrieves the total number of bytes used for backend connection + * buffers across all threads in the thread pool. It iterates through the thread pool and + * sums the number of bytes used for backend connection buffers for each thread. + * + * @note This function is used to monitor the memory usage of backend connection buffers, + * which is a key performance indicator for the PgSQL_Threads_Handler object. + * + */ unsigned long long get_pgsql_backend_buffers_bytes(); + + /** + * @brief Retrieves the total number of bytes used for frontend connection buffers across + * all threads. + * + * @return The total number of bytes used for frontend connection buffers. + * + * @details This function retrieves the total number of bytes used for frontend connection + * buffers across all threads in the thread pool. It iterates through the thread pool and + * sums the number of bytes used for frontend connection buffers for each thread. + * + * @note This function is used to monitor the memory usage of frontend connection buffers, + * which is a key performance indicator for the PgSQL_Threads_Handler object. + * + */ unsigned long long get_pgsql_frontend_buffers_bytes(); + + /** + * @brief Retrieves the total number of bytes used for internal session data structures + * across all threads. + * + * @return The total number of bytes used for internal session data structures. + * + * @details This function retrieves the total number of bytes used for internal session + * data structures across all threads in the thread pool. It iterates through the thread pool + * and sums the number of bytes used for internal session data structures for each thread. + * + * @note This function is used to monitor the memory usage of internal session data + * structures, which is a key performance indicator for the PgSQL_Threads_Handler object. + * + */ unsigned long long get_pgsql_session_internal_bytes(); + iface_info* MLM_find_iface_from_fd(int fd) { return MLM->find_iface_from_fd(fd); } + + /** + * @brief Calculates and updates the memory statistics for all threads in the pool. + * + * @details This function iterates through all threads in the thread pool and calls + * the `Get_Memory_Stats()` function for each thread to calculate and update its + * memory statistics. + * + * @note This function is used to gather memory statistics for all threads in the + * pool, providing a comprehensive view of memory usage. + * + */ void Get_Memory_Stats(); + + /** + * @brief Sends a kill request to all threads in the pool to either kill a connection + * or a query. + * + * @param _thread_session_id The thread session ID of the connection or query to kill. + * @param query A boolean flag indicating whether to kill a query (true) or a connection + * (false). + * @param username The username associated with the connection or query. + * + * @details This function sends a kill request to all threads in the pool to either kill + * a connection or a query. It adds the kill request to the kill queue (`kq.conn_ids` or + * `kq.query_ids`) for each thread and then signals all threads to process the kill queue. + * + * @note This function is used to terminate a specific connection or query by its thread + * session ID. + * + */ void kill_connection_or_query(uint32_t _thread_session_id, bool query, char* username); }; - - + + #endif /* __CLASS_PGSQL_THREAD_H */ diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index edac2a69d..b0ce1c90e 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -1522,37 +1522,6 @@ void PgSQL_Session::handler_again___new_thread_to_kill_connection() { // true should jump to handler_again #define NEXT_IMMEDIATE_NEW(new_st) do { set_status(new_st); return true; } while (0) -#if 0 -bool PgSQL_Session::handler_again___verify_backend_multi_statement() { - if ((client_myds->myconn->options.client_flag & CLIENT_MULTI_STATEMENTS) != (mybe->server_myds->myconn->options.client_flag & CLIENT_MULTI_STATEMENTS)) { - - if (client_myds->myconn->options.client_flag & CLIENT_MULTI_STATEMENTS) - mybe->server_myds->myconn->options.client_flag |= CLIENT_MULTI_STATEMENTS; - else - mybe->server_myds->myconn->options.client_flag &= ~CLIENT_MULTI_STATEMENTS; - - switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } - NEXT_IMMEDIATE_NEW(SETTING_MULTI_STMT); - } - return false; -} -#endif // 0 - bool PgSQL_Session::handler_again___verify_init_connect() { if (mybe->server_myds->myconn->options.init_connect_sent == false) { // we needs to set it to true @@ -1574,117 +1543,6 @@ bool PgSQL_Session::handler_again___verify_init_connect() { return false; } -#if 0 -bool PgSQL_Session::handler_again___verify_backend_session_track_gtids() { - bool ret = false; - proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session %p , client: %s , backend: %s\n", this, client_myds->myconn->options.session_track_gtids, mybe->server_myds->myconn->options.session_track_gtids); - // we first verify that the backend supports it - // if backend is old (or if it is not pgsql) ignore this setting - if ((mybe->server_myds->myconn->pgsql->server_capabilities & CLIENT_SESSION_TRACKING) == 0) { - // the backend doesn't support CLIENT_SESSION_TRACKING - return ret; // exit immediately - } - uint32_t b_int = mybe->server_myds->myconn->options.session_track_gtids_int; - uint32_t f_int = client_myds->myconn->options.session_track_gtids_int; - - // we need to precompute and hardcode the values for OFF and OWN_GTID - // for performance reason we hardcoded the values - // OFF = 114160514 - if ( - (b_int == 114160514) // OFF - || - (b_int == 0) // not configured yet - ) { - if (strcmp(mysql_thread___default_session_track_gtids, (char*)"OWN_GTID") == 0) { - // backend connection doesn't have session_track_gtids enabled - ret = true; - } - else { - if (f_int != 0 && f_int != 114160514) { - // client wants GTID - ret = true; - } - } - } - - if (ret) { - // we deprecated handler_again___verify_backend__generic_variable - // and moved the logic here - if (mybe->server_myds->myconn->options.session_track_gtids) { // reset current value - free(mybe->server_myds->myconn->options.session_track_gtids); - mybe->server_myds->myconn->options.session_track_gtids = NULL; - } - // because the only two possible values are OWN_GTID and OFF - // and because we don't mind receiving GTIDs , if we reach here - // it means we are setting it to OWN_GTID, either because the client - // wants it, or because it is the default - // therefore we hardcode "OWN_GTID" - mybe->server_myds->myconn->options.session_track_gtids = strdup((char*)"OWN_GTID"); - mybe->server_myds->myconn->options.session_track_gtids_int = - SpookyHash::Hash32((char*)"OWN_GTID", strlen((char*)"OWN_GTID"), 10); - // we now switch status to set session_track_gtids - // Sets the previous status of the PgSQL session according to the current status. - set_previous_status_mode3(); - NEXT_IMMEDIATE_NEW(SETTING_SESSION_TRACK_GTIDS); - } - return ret; -} - -bool PgSQL_Session::handler_again___verify_ldap_user_variable() { - bool ret = false; - if (mybe->server_myds->myconn->options.ldap_user_variable_sent == false) { - ret = true; - } - if (mybe->server_myds->myconn->options.ldap_user_variable_value == NULL) { - ret = true; - } - if (ret == false) { - if (mybe->server_myds->myconn->options.ldap_user_variable_sent) { - if (client_myds && client_myds->myconn) { - if (client_myds->myconn->userinfo) { - if (client_myds->myconn->userinfo->fe_username) { - if (strcmp(mybe->server_myds->myconn->options.ldap_user_variable_value, client_myds->myconn->userinfo->fe_username)) { - ret = true; - free(mybe->server_myds->myconn->options.ldap_user_variable); - mybe->server_myds->myconn->options.ldap_user_variable = NULL; - free(mybe->server_myds->myconn->options.ldap_user_variable_value); - mybe->server_myds->myconn->options.ldap_user_variable_value = NULL; - mybe->server_myds->myconn->options.ldap_user_variable_sent = false; - } - } - } - } - } - } - if (ret) { - // we needs to set it to true - mybe->server_myds->myconn->options.ldap_user_variable_sent = true; - if (mysql_thread___ldap_user_variable) { - // we send ldap user variable query only if set - mybe->server_myds->myconn->options.ldap_user_variable = strdup(mysql_thread___ldap_user_variable); - switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } - NEXT_IMMEDIATE_NEW(SETTING_LDAP_USER_VARIABLE); - } - } - return false; -} -#endif // 0 - bool PgSQL_Session::handler_again___verify_backend_user_db() { PgSQL_Data_Stream* myds = mybe->server_myds; proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session %p , client: %s , backend: %s\n", this, client_myds->myconn->userinfo->username, mybe->server_myds->myconn->userinfo->username); @@ -1784,193 +1642,6 @@ bool PgSQL_Session::handler_again___status_SETTING_INIT_CONNECT(int* _rc) { return ret; } -#if 0 -bool PgSQL_Session::handler_again___status_SETTING_LDAP_USER_VARIABLE(int* _rc) { - bool ret = false; - assert(mybe->server_myds->myconn); - PgSQL_Data_Stream* myds = mybe->server_myds; - PgSQL_Connection* myconn = myds->myconn; - myds->DSS = STATE_MARIADB_QUERY; - enum session_status st = status; - - if ( - (GloMyLdapAuth == NULL) || (use_ldap_auth == false) - || - (client_myds == NULL || client_myds->myconn == NULL || client_myds->myconn->userinfo == NULL) - ) { // nothing to do - myds->revents |= POLLOUT; // we also set again POLLOUT to send a query immediately! - //myds->free_mysql_real_query(); - myds->DSS = STATE_MARIADB_GENERIC; - st = previous_status.top(); - previous_status.pop(); - NEXT_IMMEDIATE_NEW(st); - } - - if (myds->mypolls == NULL) { - thread->mypolls.add(POLLIN | POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); - } - int rc; - if (myconn->async_state_machine == ASYNC_IDLE) { - char* fe = client_myds->myconn->userinfo->fe_username; - char* a = (char*)"SET @%s:='%s'"; - if (fe == NULL) { - fe = (char*)"unknown"; - } - if (myconn->options.ldap_user_variable_value) { - free(myconn->options.ldap_user_variable_value); - } - myconn->options.ldap_user_variable_value = strdup(fe); - char* buf = (char*)malloc(strlen(fe) + strlen(a) + strlen(myconn->options.ldap_user_variable)); - sprintf(buf, a, myconn->options.ldap_user_variable, fe); - rc = myconn->async_send_simple_command(myds->revents, buf, strlen(buf)); - free(buf); - } - else { // if async_state_machine is not ASYNC_IDLE , arguments are ignored - rc = myconn->async_send_simple_command(myds->revents, (char*)"", 0); - } - if (rc == 0) { - myds->revents |= POLLOUT; // we also set again POLLOUT to send a query immediately! - //myds->free_mysql_real_query(); - myds->DSS = STATE_MARIADB_GENERIC; - st = previous_status.top(); - previous_status.pop(); - NEXT_IMMEDIATE_NEW(st); - } - else { - if (rc == -1) { - // the command failed - int myerr = mysql_errno(myconn->pgsql); - PgHGM->p_update_pgsql_error_counter( - p_pgsql_error_type::pgsql, - myconn->parent->myhgc->hid, - myconn->parent->address, - myconn->parent->port, - (myerr ? myerr : ER_PROXYSQL_OFFLINE_SRV) - ); - if (myerr >= 2000 || myerr == 0) { - bool retry_conn = false; - // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "while setting LDAP USER VARIABLE", myconn); - if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { - retry_conn = true; - } - myds->destroy_MySQL_Connection_From_Pool(false); - myds->fd = 0; - if (retry_conn) { - myds->DSS = STATE_NOT_INITIALIZED; - NEXT_IMMEDIATE_NEW(CONNECTING_SERVER); - } - *_rc = -1; // an error happened, we should destroy the Session - return ret; - } - else { - proxy_warning("Error while setting LDAP USER VARIABLE: %s:%d hg %d : %d, %s\n", myconn->parent->address, myconn->parent->port, current_hostgroup, myerr, mysql_error(myconn->pgsql)); - // we won't go back to PROCESSING_QUERY - st = previous_status.top(); - previous_status.pop(); - char sqlstate[10]; - sprintf(sqlstate, "%s", mysql_sqlstate(myconn->pgsql)); - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, mysql_errno(myconn->pgsql), sqlstate, mysql_error(myconn->pgsql)); - myds->destroy_MySQL_Connection_From_Pool(true); - myds->fd = 0; - status = WAITING_CLIENT_DATA; - client_myds->DSS = STATE_SLEEP; - } - } - else { - // rc==1 , nothing to do for now - } - } - return ret; -} - -bool PgSQL_Session::handler_again___status_SETTING_SQL_LOG_BIN(int* _rc) { - bool ret = false; - assert(mybe->server_myds->myconn); - PgSQL_Data_Stream* myds = mybe->server_myds; - PgSQL_Connection* myconn = myds->myconn; - myds->DSS = STATE_MARIADB_QUERY; - enum session_status st = status; - if (myds->mypolls == NULL) { - thread->mypolls.add(POLLIN | POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); - } - char* query = NULL; - unsigned long query_length = 0; - if (myconn->async_state_machine == ASYNC_IDLE) { - char* q = (char*)"SET SQL_LOG_BIN=%s"; - query = (char*)malloc(strlen(q) + 8); - sprintf(query, q, pgsql_variables.client_get_value(this, SQL_SQL_LOG_BIN)); - query_length = strlen(query); - } - int rc = myconn->async_send_simple_command(myds->revents, query, query_length); - if (query) { - free(query); - query = NULL; - } - if (rc == 0) { - if (!strcmp("0", pgsql_variables.client_get_value(this, SQL_SQL_LOG_BIN)) || !strcasecmp("OFF", pgsql_variables.client_get_value(this, SQL_SQL_LOG_BIN))) { - // Pay attention here. STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0 sets sql_log_bin to ZERO: - // - sql_log_bin=0 => true - // - sql_log_bin=1 => false - myconn->set_status(true, STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0); - } - else if (!strcmp("1", pgsql_variables.client_get_value(this, SQL_SQL_LOG_BIN)) || !strcasecmp("ON", pgsql_variables.client_get_value(this, SQL_SQL_LOG_BIN))) { - myconn->set_status(false, STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0); - } - myds->revents |= POLLOUT; // we also set again POLLOUT to send a query immediately! - myds->DSS = STATE_MARIADB_GENERIC; - st = previous_status.top(); - previous_status.pop(); - NEXT_IMMEDIATE_NEW(st); - } - else { - if (rc == -1) { - // the command failed - int myerr = mysql_errno(myconn->pgsql); - PgHGM->p_update_pgsql_error_counter( - p_pgsql_error_type::pgsql, - myconn->parent->myhgc->hid, - myconn->parent->address, - myconn->parent->port, - (myerr ? myerr : ER_PROXYSQL_OFFLINE_SRV) - ); - if (myerr >= 2000 || myerr == 0) { - bool retry_conn = false; - // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "while setting SQL_LOG_BIN", myconn); - if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { - retry_conn = true; - } - myds->destroy_MySQL_Connection_From_Pool(false); - myds->fd = 0; - if (retry_conn) { - myds->DSS = STATE_NOT_INITIALIZED; - NEXT_IMMEDIATE_NEW(CONNECTING_SERVER); - } - *_rc = -1; // an error happened, we should destroy the Session - return ret; - } - else { - proxy_warning("Error while setting SQL_LOG_BIN: %s:%d hg %d : %d, %s\n", myconn->parent->address, myconn->parent->port, current_hostgroup, myerr, mysql_error(myconn->pgsql)); - // we won't go back to PROCESSING_QUERY - st = previous_status.top(); - previous_status.pop(); - char sqlstate[10]; - sprintf(sqlstate, "%s", mysql_sqlstate(myconn->pgsql)); - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, mysql_errno(myconn->pgsql), sqlstate, mysql_error(myconn->pgsql)); - myds->destroy_MySQL_Connection_From_Pool(true); - myds->fd = 0; - RequestEnd(myds); - } - } - else { - // rc==1 , nothing to do for now - } - } - return ret; -} -#endif // 0 - bool PgSQL_Session::handler_again___status_CHANGING_CHARSET(int* _rc) { assert(mybe->server_myds->myconn); PgSQL_Data_Stream* myds = mybe->server_myds; @@ -2252,148 +1923,6 @@ bool PgSQL_Session::handler_again___status_SETTING_GENERIC_VARIABLE(int* _rc, co return ret; } -#if 0 -bool PgSQL_Session::handler_again___status_SETTING_MULTI_STMT(int* _rc) { - assert(mybe->server_myds->myconn); - PgSQL_Data_Stream* myds = mybe->server_myds; - PgSQL_Connection* myconn = myds->myconn; - enum session_status st = status; - bool ret = false; - - if (myds->mypolls == NULL) { - thread->mypolls.add(POLLIN | POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); - } - int rc = myconn->async_set_option(myds->revents, myconn->options.client_flag & CLIENT_MULTI_STATEMENTS); - if (rc == 0) { - myds->DSS = STATE_MARIADB_GENERIC; - st = previous_status.top(); - previous_status.pop(); - NEXT_IMMEDIATE_NEW(st); - } - else { - if (rc == -1) { - // the command failed - int myerr = mysql_errno(myconn->pgsql); - PgHGM->p_update_pgsql_error_counter( - p_pgsql_error_type::pgsql, - myconn->parent->myhgc->hid, - myconn->parent->address, - myconn->parent->port, - (myerr ? myerr : ER_PROXYSQL_OFFLINE_SRV) - ); - if (myerr >= 2000 || myerr == 0) { - bool retry_conn = false; - // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "while setting MYSQL_OPTION_MULTI_STATEMENTS", myconn); - //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { - if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { - retry_conn = true; - } - myds->destroy_MySQL_Connection_From_Pool(false); - myds->fd = 0; - if (retry_conn) { - myds->DSS = STATE_NOT_INITIALIZED; - NEXT_IMMEDIATE_NEW(CONNECTING_SERVER); - } - *_rc = -1; // an error happened, we should destroy the Session - return ret; - } - else { - proxy_warning("Error during MYSQL_OPTION_MULTI_STATEMENTS : %d, %s\n", myerr, mysql_error(myconn->pgsql)); - // we won't go back to PROCESSING_QUERY - st = previous_status.top(); - previous_status.pop(); - char sqlstate[10]; - sprintf(sqlstate, "%s", mysql_sqlstate(myconn->pgsql)); - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, mysql_errno(myconn->pgsql), sqlstate, mysql_error(myconn->pgsql)); - myds->destroy_MySQL_Connection_From_Pool(true); - myds->fd = 0; - RequestEnd(myds); - } - } - else { - // rc==1 , nothing to do for now - } - } - return ret; -} - -bool PgSQL_Session::handler_again___status_SETTING_SESSION_TRACK_GTIDS(int* _rc) { - bool ret = false; - assert(mybe->server_myds->myconn); - ret = handler_again___status_SETTING_GENERIC_VARIABLE(_rc, (char*)"SESSION_TRACK_GTIDS", mybe->server_myds->myconn->options.session_track_gtids, true); - return ret; -} - -bool PgSQL_Session::handler_again___status_CHANGING_SCHEMA(int* _rc) { - bool ret = false; - //fprintf(stderr,"CHANGING_SCHEMA\n"); - assert(mybe->server_myds->myconn); - PgSQL_Data_Stream* myds = mybe->server_myds; - PgSQL_Connection* myconn = myds->myconn; - myds->DSS = STATE_MARIADB_QUERY; - enum session_status st = status; - if (myds->mypolls == NULL) { - thread->mypolls.add(POLLIN | POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); - } - int rc = myconn->async_select_db(myds->revents); - if (rc == 0) { - //__sync_fetch_and_add(&PgHGM->status.backend_init_db, 1); - myds->myconn->userinfo->set(client_myds->myconn->userinfo); - myds->DSS = STATE_MARIADB_GENERIC; - st = previous_status.top(); - previous_status.pop(); - NEXT_IMMEDIATE_NEW(st); - } - else { - if (rc == -1) { - // the command failed - int myerr = mysql_errno(myconn->pgsql); - PgHGM->p_update_pgsql_error_counter( - p_pgsql_error_type::pgsql, - myconn->parent->myhgc->hid, - myconn->parent->address, - myconn->parent->port, - (myerr ? myerr : ER_PROXYSQL_OFFLINE_SRV) - ); - if (myerr >= 2000 || myerr == 0) { - bool retry_conn = false; - // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "during INIT_DB", myconn); - //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { - if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { - retry_conn = true; - } - myds->destroy_MySQL_Connection_From_Pool(false); - myds->fd = 0; - if (retry_conn) { - myds->DSS = STATE_NOT_INITIALIZED; - NEXT_IMMEDIATE_NEW(CONNECTING_SERVER); - } - *_rc = -1; // an error happened, we should destroy the Session - return ret; - } - else { - proxy_warning("Error during INIT_DB: %d, %s\n", myerr, mysql_error(myconn->pgsql)); - // we won't go back to PROCESSING_QUERY - st = previous_status.top(); - previous_status.pop(); - char sqlstate[10]; - sprintf(sqlstate, "%s", mysql_sqlstate(myconn->pgsql)); - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, mysql_errno(myconn->pgsql), sqlstate, mysql_error(myconn->pgsql)); - myds->destroy_MySQL_Connection_From_Pool(true); - myds->fd = 0; - RequestEnd(myds); - } - } - else { - // rc==1 , nothing to do for now - } - } - return false; -} -#endif // 0 - bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) { //fprintf(stderr,"CONNECTING_SERVER\n"); unsigned long long curtime = monotonic_time(); @@ -3493,15 +3022,6 @@ int PgSQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) { proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n"); mybe->server_myds->killed_at = 0; mybe->server_myds->kill_type = 0; -#if 0 - if (GloMyLdapAuth) { - if (session_type == PROXYSQL_SESSION_PGSQL) { - if (mysql_thread___add_ldap_user_comment && strlen(mysql_thread___add_ldap_user_comment)) { - add_ldap_comment_to_pkt(&pkt); - } - } - } -#endif // 0 mybe->server_myds->mysql_real_query.init(&pkt); mybe->server_myds->statuses.questions++; client_myds->setDSS_STATE_QUERY_SENT_NET(); @@ -3713,15 +3233,6 @@ int PgSQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) { proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n"); mybe->server_myds->killed_at = 0; mybe->server_myds->kill_type = 0; -#if 0 - if (GloMyLdapAuth) { - if (session_type == PROXYSQL_SESSION_PGSQL) { - if (mysql_thread___add_ldap_user_comment && strlen(mysql_thread___add_ldap_user_comment)) { - add_ldap_comment_to_pkt(&pkt); - } - } - } -#endif // 0 mybe->server_myds->mysql_real_query.init(&pkt); mybe->server_myds->statuses.questions++; client_myds->setDSS_STATE_QUERY_SENT_NET(); @@ -3731,142 +3242,11 @@ int PgSQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) { } break; case _MYSQL_COM_STMT_PREPARE: -#if 0 - if (GloMyLdapAuth) { - if (session_type == PROXYSQL_SESSION_PGSQL) { - if (mysql_thread___add_ldap_user_comment && strlen(mysql_thread___add_ldap_user_comment)) { - add_ldap_comment_to_pkt(&pkt); - } - } - } -#endif // 0 handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(pkt); break; case _MYSQL_COM_STMT_EXECUTE: handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(pkt); break; -#if 0 - case _MYSQL_COM_STMT_RESET: - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_RESET(pkt); - break; - case _MYSQL_COM_STMT_CLOSE: - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_CLOSE(pkt); - break; - case _MYSQL_COM_STMT_SEND_LONG_DATA: - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_SEND_LONG_DATA(pkt); - break; - case _MYSQL_COM_BINLOG_DUMP: - case _MYSQL_COM_BINLOG_DUMP_GTID: - case _MYSQL_COM_REGISTER_SLAVE: - // In this switch we handle commands that download binlog events from MySQL - // servers. For these commands a lot of the features provided by ProxySQL - // aren't useful, like multiplexing, query parsing, etc. For this reason, - // ProxySQL enables fast_forward when it receives these commands.  - { - // we use a switch to write the command in the info message - std::string q = "Received command "; - switch ((enum_mysql_command)c) { - case _MYSQL_COM_BINLOG_DUMP: - q += "MYSQL_COM_BINLOG_DUMP"; - break; - case _MYSQL_COM_BINLOG_DUMP_GTID: - q += "MYSQL_COM_BINLOG_DUMP_GTID"; - break; - case _MYSQL_COM_REGISTER_SLAVE: - q += "MYSQL_COM_REGISTER_SLAVE"; - break; - default: - assert(0); - break; - }; - // we add the client details in the info message - if (client_myds && client_myds->addr.addr) { - q += " from client " + std::string(client_myds->addr.addr) + ":" + std::to_string(client_myds->addr.port); - } - q += " . Changing session fast_forward to true"; - proxy_info("%s\n", q.c_str()); - } - session_fast_forward = true; - - if (client_myds->PSarrayIN->len) { - proxy_error("UNEXPECTED PACKET FROM CLIENT -- PLEASE REPORT A BUG\n"); - assert(0); - } - client_myds->PSarrayIN->add(pkt.ptr, pkt.size); - - // The following code prepares the session as if it was configured with fast - // forward before receiving the command. This way the state machine will - // handle the command automatically. - current_hostgroup = previous_hostgroup; - mybe = find_or_create_backend(current_hostgroup); // set a backend - mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active - // We reinitialize the 'wait_until' since this session shouldn't wait for processing as - // we are now transitioning to 'FAST_FORWARD'. - mybe->server_myds->wait_until = 0; - if (mybe->server_myds->DSS == STATE_NOT_INITIALIZED) { - // NOTE: This section is entirely borrowed from 'STATE_SLEEP' for 'session_fast_forward'. - // Check comments there for extra information. - // ============================================================================= - if (mybe->server_myds->max_connect_time == 0) { - uint64_t connect_timeout = - pgsql_thread___connect_timeout_server < pgsql_thread___connect_timeout_server_max ? - pgsql_thread___connect_timeout_server_max : pgsql_thread___connect_timeout_server; - mybe->server_myds->max_connect_time = thread->curtime + connect_timeout * 1000; - } - mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure; - CurrentQuery.start_time = thread->curtime; - // ============================================================================= - - // we don't have a connection - previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD - set_status(CONNECTING_SERVER); // now we need a connection - } - else { - // In case of having a connection, we need to make user to reset the state machine - // for current server 'PgSQL_Data_Stream', setting it outside of any state handled - // by 'mariadb' library. Otherwise 'MySQL_Thread' will threat this - // 'PgSQL_Data_Stream' as library handled. - mybe->server_myds->DSS = STATE_READY; - // myds needs to have encrypted value set correctly - { - PgSQL_Data_Stream* myds = mybe->server_myds; - PgSQL_Connection* myconn = myds->myconn; - assert(myconn != NULL); - // PMC-10005 - // if backend connection uses SSL we will set - // encrypted = true and we will start using the SSL structure - // directly from P_MARIADB_TLS structure. - MYSQL* pgsql = myconn->pgsql; - if (pgsql && myconn->ret_mysql) { - if (pgsql->options.use_ssl == 1) { - P_MARIADB_TLS* matls = (P_MARIADB_TLS*)pgsql->net.pvio->ctls; - if (matls != NULL) { - myds->encrypted = true; - myds->ssl = (SSL*)matls->ssl; - myds->rbio_ssl = BIO_new(BIO_s_mem()); - myds->wbio_ssl = BIO_new(BIO_s_mem()); - SSL_set_bio(myds->ssl, myds->rbio_ssl, myds->wbio_ssl); - } - else { - // if pgsql->options.use_ssl == 1 but matls == NULL - // it means that ProxySQL tried to use SSL to connect to the backend - // but the backend didn't support SSL - } - } - } - } - set_status(FAST_FORWARD); // we can set status to FAST_FORWARD - } - - break; - case _MYSQL_COM_QUIT: - proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_QUIT packet\n"); - if (GloPgSQL_Logger) { GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_QUIT, this, NULL); } - l_free(pkt.size, pkt.ptr); - handler_ret = -1; - return handler_ret; - break; -#endif // 0 default: // in this switch we only handle the most common commands. // The not common commands are handled by "default" , that @@ -4482,32 +3862,8 @@ int PgSQL_Session::handler() { if (handler_again___verify_init_connect()) { goto handler_again; } -#if 0 - if (use_ldap_auth) { - if (handler_again___verify_ldap_user_variable()) { - goto handler_again; - } - } - if (handler_again___verify_backend_autocommit()) { - goto handler_again; - } -#endif // 0 if (locked_on_hostgroup == -1 || locked_on_hostgroup_and_all_variables_set == false) { -#if 0 - if (handler_again___verify_backend_multi_statement()) { - goto handler_again; - } - - if (handler_again___verify_backend_session_track_gtids()) { - goto handler_again; - } -#endif // 0 - // Optimize network traffic when we can use 'SET NAMES' - //if (verify_set_names(this)) { - // goto handler_again; - //} - for (auto i = 0; i < SQL_NAME_LAST_LOW_WM; i++) { auto client_hash = client_myds->myconn->var_hash[i]; #ifdef DEBUG @@ -4872,28 +4228,9 @@ bool PgSQL_Session::handler_again___multiple_statuses(int* rc) { case RESETTING_CONNECTION_V2: ret = handler_again___status_RESETTING_CONNECTION(rc); break; -#if 0 - case CHANGING_AUTOCOMMIT: - ret = handler_again___status_CHANGING_AUTOCOMMIT(rc); - break; - case CHANGING_SCHEMA: - ret = handler_again___status_CHANGING_SCHEMA(rc); - break; - case SETTING_LDAP_USER_VARIABLE: - ret = handler_again___status_SETTING_LDAP_USER_VARIABLE(rc); - break; -#endif // 0 case SETTING_INIT_CONNECT: ret = handler_again___status_SETTING_INIT_CONNECT(rc); break; -#if 0 - case SETTING_MULTI_STMT: - ret = handler_again___status_SETTING_MULTI_STMT(rc); - break; - case SETTING_SESSION_TRACK_GTIDS: - ret = handler_again___status_SETTING_SESSION_TRACK_GTIDS(rc); - break; -#endif // 0 case SETTING_SET_NAMES: ret = handler_again___status_CHANGING_CHARSET(rc); break; @@ -4903,74 +4240,6 @@ bool PgSQL_Session::handler_again___multiple_statuses(int* rc) { return ret; } -/* -void PgSQL_Session::handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(PtrSize_t *pkt, bool *wrong_pass) { - // FIXME: no support for SSL yet - if ( - client_myds->myprot.process_pkt_auth_swich_response((unsigned char *)pkt->ptr,pkt->size)==true - ) { - l_free(pkt->size,pkt->ptr); - proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session=%p , DS=%p . Successful connection\n", this, client_myds); - client_myds->myprot.generate_pkt_OK(true,NULL,NULL,2,0,0,0,0,NULL); - GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_CHANGE_USER_OK, this, NULL); - status=WAITING_CLIENT_DATA; - client_myds->DSS=STATE_SLEEP; - } else { - l_free(pkt->size,pkt->ptr); - *wrong_pass=true; - // FIXME: this should become close connection - client_myds->setDSS_STATE_QUERY_SENT_NET(); - char *client_addr=NULL; - if (client_myds->client_addr) { - char buf[512]; - switch (client_myds->client_addr->sa_family) { - case AF_INET: { - struct sockaddr_in *ipv4 = (struct sockaddr_in *)client_myds->client_addr; - if (ipv4->sin_port) { - inet_ntop(client_myds->client_addr->sa_family, &ipv4->sin_addr, buf, INET_ADDRSTRLEN); - client_addr = strdup(buf); - } else { - client_addr = strdup((char *)"localhost"); - } - break; - } - case AF_INET6: { - struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)client_myds->client_addr; - inet_ntop(client_myds->client_addr->sa_family, &ipv6->sin6_addr, buf, INET6_ADDRSTRLEN); - client_addr = strdup(buf); - break; - } - default: - client_addr = strdup((char *)"localhost"); - break; - } - } else { - client_addr = strdup((char *)""); - } - char *_s=(char *)malloc(strlen(client_myds->myconn->userinfo->username)+100+strlen(client_addr)); - sprintf(_s,"ProxySQL Error: Access denied for user '%s'@'%s' (using password: %s)", client_myds->myconn->userinfo->username, client_addr, (client_myds->myconn->userinfo->password ? "YES" : "NO")); - proxy_error("ProxySQL Error: Access denied for user '%s'@'%s' (using password: %s)", client_myds->myconn->userinfo->username, client_addr, (client_myds->myconn->userinfo->password ? "YES" : "NO")); - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,2,1045,(char *)"28000", _s, true); -#ifdef DEBUG - if (client_myds->myconn->userinfo->password) { - char *tmp_pass=strdup(client_myds->myconn->userinfo->password); - int lpass = strlen(tmp_pass); - for (int i=2; imyconn->userinfo->username, client_addr, tmp_pass); - free(tmp_pass); - } else { - proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session=%p , DS=%p . Wrong credentials for frontend: %s:%s . No password. Disconnecting\n", this, client_myds, client_myds->myconn->userinfo->username, client_addr); - } -#endif //DEBUG - GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_CHANGE_USER_ERR, this, NULL); - free(_s); - __sync_fetch_and_add(&PgHGM->status.access_denied_wrong_password, 1); - } -} -*/ - void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t* pkt, bool* wrong_pass) { bool is_encrypted = client_myds->encrypted; bool handshake_response_return = false; @@ -5076,16 +4345,6 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE( l_free(pkt->size, pkt->ptr); //if (client_myds->encrypted==false) { if (client_myds->myconn->userinfo->dbname == NULL) { -#if 0 -#ifdef PROXYSQLCLICKHOUSE - if (session_type == PROXYSQL_SESSION_CLICKHOUSE) { - if (strlen(default_schema) == 0) { - free(default_schema); - default_schema = strdup((char*)"default"); - } - } -#endif /* PROXYSQLCLICKHOUSE */ -#endif client_myds->myconn->userinfo->set_dbname(default_schema); } int free_users = 0; @@ -7378,58 +6637,6 @@ bool PgSQL_Session::handle_command_query_kill(PtrSize_t* pkt) { return false; } -#if 0 -void PgSQL_Session::add_ldap_comment_to_pkt(PtrSize_t* _pkt) { - if (GloMyLdapAuth == NULL) - return; - if (use_ldap_auth == false) - return; - if (client_myds == NULL || client_myds->myconn == NULL || client_myds->myconn->userinfo == NULL) - return; - if (client_myds->myconn->userinfo->fe_username == NULL) - return; - char* fe = client_myds->myconn->userinfo->fe_username; - char* a = (char*)" /* %s=%s */"; - char* b = (char*)malloc(strlen(a) + strlen(fe) + strlen(mysql_thread___add_ldap_user_comment)); - sprintf(b, a, mysql_thread___add_ldap_user_comment, fe); - PtrSize_t _new_pkt; - _new_pkt.ptr = malloc(strlen(b) + _pkt->size); - memcpy(_new_pkt.ptr, _pkt->ptr, 5); - unsigned char* _c = (unsigned char*)_new_pkt.ptr; - _c += 5; - void* idx = memchr((char*)_pkt->ptr + 5, ' ', _pkt->size - 5); - if (idx) { - size_t first_word_len = (char*)idx - (char*)_pkt->ptr - 5; - if (((char*)_pkt->ptr + 5)[0] == '/' && ((char*)_pkt->ptr + 5)[1] == '*') { - void* comment_endpos = memmem(static_cast(_pkt->ptr) + 7, _pkt->size - 7, "*/", strlen("*/")); - - if (comment_endpos == NULL || idx < comment_endpos) { - b[1] = ' '; - b[2] = ' '; - b[strlen(b) - 1] = ' '; - b[strlen(b) - 2] = ' '; - } - } - memcpy(_c, (char*)_pkt->ptr + 5, first_word_len); - _c += first_word_len; - memcpy(_c, b, strlen(b)); - _c += strlen(b); - memcpy(_c, (char*)idx, _pkt->size - 5 - first_word_len); - } - else { - memcpy(_c, (char*)_pkt->ptr + 5, _pkt->size - 5); - _c += _pkt->size - 5; - memcpy(_c, b, strlen(b)); - } - l_free(_pkt->size, _pkt->ptr); - _pkt->size = _pkt->size + strlen(b); - _pkt->ptr = _new_pkt.ptr; - free(b); - CurrentQuery.QueryLength = _pkt->size - 5; - CurrentQuery.QueryPointer = (unsigned char*)_pkt->ptr + 5; -} -#endif // 0 - void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn, bool prepared_stmt_with_no_params) { myds->myconn->reduce_auto_increment_delay_token(); if (locked_on_hostgroup >= 0) { @@ -7582,54 +6789,6 @@ void PgSQL_Session::unable_to_parse_set_statement(bool* lock_hostgroup) { } } -#if 0 -void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_RESET(PtrSize_t& pkt) { - uint32_t stmt_global_id = 0; - memcpy(&stmt_global_id, (char*)pkt.ptr + 5, sizeof(uint32_t)); - SLDH->reset(stmt_global_id); - l_free(pkt.size, pkt.ptr); - client_myds->setDSS_STATE_QUERY_SENT_NET(); - unsigned int nTrx = NumActiveTransactions(); - uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0); - if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT; - client_myds->myprot.generate_pkt_OK(true, NULL, NULL, 1, 0, 0, setStatus, 0, NULL); - client_myds->DSS = STATE_SLEEP; - status = WAITING_CLIENT_DATA; -} - -void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_CLOSE(PtrSize_t& pkt) { - uint32_t client_global_id = 0; - memcpy(&client_global_id, (char*)pkt.ptr + 5, sizeof(uint32_t)); - // FIXME: no input validation - uint64_t stmt_global_id = 0; - stmt_global_id = client_myds->myconn->local_stmts->find_global_stmt_id_from_client(client_global_id); - SLDH->reset(client_global_id); - if (stmt_global_id) { - sess_STMTs_meta->erase(stmt_global_id); - } - client_myds->myconn->local_stmts->client_close(client_global_id); - l_free(pkt.size, pkt.ptr); - // FIXME: this is not complete. Counters should be decreased - thread->status_variables.stvar[st_var_frontend_stmt_close]++; - thread->status_variables.stvar[st_var_queries]++; - client_myds->DSS = STATE_SLEEP; - status = WAITING_CLIENT_DATA; -} - - -void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_SEND_LONG_DATA(PtrSize_t& pkt) { - // FIXME: no input validation - uint32_t stmt_global_id = 0; - memcpy(&stmt_global_id, (char*)pkt.ptr + 5, sizeof(uint32_t)); - uint32_t stmt_param_id = 0; - memcpy(&stmt_param_id, (char*)pkt.ptr + 9, sizeof(uint16_t)); - SLDH->add(stmt_global_id, stmt_param_id, (char*)pkt.ptr + 11, pkt.size - 11); - client_myds->DSS = STATE_SLEEP; - status = WAITING_CLIENT_DATA; - l_free(pkt.size, pkt.ptr); -} -#endif // 0 - void PgSQL_Session::detected_broken_connection(const char* file, unsigned int line, const char* func, const char* action, PgSQL_Connection* myconn, bool verbose) { const char* code = PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION);; diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index 53f1d5a83..ebfeb6ff0 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -5190,9 +5190,6 @@ PgSQL_Connection* PgSQL_Thread::get_MyConn_local(unsigned int _hid, PgSQL_Sessio if (it != parents.end()) { // we didn't exclude this server (yet?) bool gtid_found = false; -#if 0 - gtid_found = PgHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid); -#endif // 0 if (gtid_found) { // this server has the correct GTID c = (PgSQL_Connection*)cached_connections->remove_index_fast(i); return c;