DoPut FlightSQL handler inadvertently consumes schema at start of Request<Streaming<FlightData>> #4658

Closed
suremarc opened this issue Aug 7, 2023 · 0 comments · Fixed by #4797

suremarc commented Aug 7, 2023

Describe the bug
When sending a DoPutPreparedStatementQuery request (binding parameters to a prepared statement through the DoPut Flight RPC), the client may receive an error like this:

rpc error: code = Internal desc = Received RecordBatch prior to Schema

This happens because the FlightService implementation for FlightSQL servers consumes the first FlightData message by calling .message() on the Request<Streaming<FlightData>> handle:

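```rust
async fn do_put(
    &self,
    mut request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
    // Pulls the first FlightData message off the stream; the handler that
    // is dispatched to afterwards never sees this message.
    let cmd = request.get_mut().message().await?.unwrap();
    // ... decode the command in cmd.flight_descriptor and dispatch ...
}
```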

It does this to inspect the FlightDescriptor, whose command determines which DoPut handler of the FlightSqlService to dispatch to. If the schema was encoded in the same first message as the FlightDescriptor, calling .message() consumes both, so a FlightSqlService handler such as do_put_prepared_statement_query can no longer read the schema from the Request<Streaming<FlightData>>. The handler's decoder then sees a RecordBatch message first and fails with the error above.
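The failure can be modeled without any Arrow dependencies. The sketch below is a hypothetical, std-only simulation: Msg and Payload stand in for FlightData, decode plays the role of FlightRecordBatchStream (it rejects a record batch that arrives before any schema), and dispatch_consuming mimics reading the first message to choose a handler. None of these names are arrow-flight APIs.

```rust
// Hypothetical stand-ins for FlightData; not arrow-flight types.
#[derive(Debug, Clone, PartialEq)]
enum Payload {
    Schema,
    RecordBatch(u32),
}

#[derive(Debug, Clone, PartialEq)]
struct Msg {
    descriptor: Option<&'static str>, // command, set only on the first message
    payload: Payload,
}

/// Toy decoder: requires a Schema message before any RecordBatch.
fn decode<I: Iterator<Item = Msg>>(stream: I) -> Result<Vec<u32>, String> {
    let mut have_schema = false;
    let mut batches = Vec::new();
    for msg in stream {
        match msg.payload {
            Payload::Schema => have_schema = true,
            Payload::RecordBatch(id) if have_schema => batches.push(id),
            Payload::RecordBatch(_) => {
                return Err("Received RecordBatch prior to Schema".to_string())
            }
        }
    }
    Ok(batches)
}

/// Buggy dispatch: reads the first message to find the command, then hands
/// only the remaining stream to the handler, so the schema is lost.
fn dispatch_consuming<I: Iterator<Item = Msg>>(mut stream: I) -> Result<Vec<u32>, String> {
    let first = stream.next().ok_or("empty stream")?;
    match first.descriptor {
        Some("CommandPreparedStatementQuery") => decode(stream),
        other => Err(format!("unsupported command: {other:?}")),
    }
}

/// What the client sends: descriptor and schema share the first message.
fn client_stream() -> Vec<Msg> {
    vec![
        Msg { descriptor: Some("CommandPreparedStatementQuery"), payload: Payload::Schema },
        Msg { descriptor: None, payload: Payload::RecordBatch(1) },
    ]
}

fn main() {
    let result = dispatch_consuming(client_stream().into_iter());
    println!("{result:?}");
}
```

Running it prints Err("Received RecordBatch prior to Schema"): the only schema message was the one consumed for dispatch, while decoding the full stream directly would succeed.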

To Reproduce
Implement a FlightSqlService that handles any kind of DoPut request and decodes the flight stream, such as the following:

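```rust
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::sql::server::FlightSqlService;
use arrow_flight::sql::CommandPreparedStatementQuery;
use arrow_flight::FlightData;
use futures::TryStreamExt;
use tonic::{Request, Response, Status, Streaming};

#[tonic::async_trait]
impl FlightSqlService for FlightSqlServer {
    // ... other FlightSqlService items elided ...

    async fn do_put_prepared_statement_query(
        &self,
        query: CommandPreparedStatementQuery,
        request: Request<Streaming<FlightData>>,
    ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
        info!("do_put_prepared_statement_query");
        // Decode the bound parameters. This fails with "Received RecordBatch
        // prior to Schema" because the schema message was already consumed.
        let parameters = FlightRecordBatchStream::new_from_flight_data(
            request.into_inner().map_err(|e| e.into()),
        )
        .try_collect::<Vec<_>>()
        .await?;

        todo!()
    }
}
```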

Then execute a raw do_put request on the FlightSqlServiceClient, sending a single batch as follows:

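```rust
use anyhow::Context;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::sql::{CommandPreparedStatementQuery, ProstMessageExt};
use arrow_flight::FlightDescriptor;
use futures::{stream, StreamExt, TryStreamExt};
use prost::Message;

// `handle` is the prepared statement handle, `batch` holds the parameter
// values, and `client` is a FlightSqlServiceClient.
let cmd = CommandPreparedStatementQuery {
    prepared_statement_handle: handle.clone(),
};

// The encoder attaches the FlightDescriptor to its first FlightData
// message, which is also the message carrying the schema.
let flight_stream = FlightDataEncoderBuilder::new()
    .with_flight_descriptor(Some(FlightDescriptor::new_cmd(
        cmd.as_any().encode_to_vec(),
    )))
    .with_schema(batch.schema())
    .build(stream::once(async { Ok(batch) }));
client
    .do_put(flight_stream.map(Result::unwrap))
    .await
    .context("submit DoPut request")?
    .try_collect::<Vec<_>>()
    .await
    .context("read DoPut response")?;
```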

Expected behavior
The server should correctly handle DoPut streams sent by a compliant FlightSQL client, including streams whose first message carries both the FlightDescriptor and the schema.
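A server can meet this expectation by inspecting the first message without removing it from the stream. The following std-only sketch models that idea with Iterator::peekable. Msg, Payload, decode and dispatch_peeking are hypothetical stand-ins for FlightData, the record-batch decoder and the FlightSQL dispatch, and this shows one possible approach rather than necessarily the change made in #4797.

```rust
use std::iter::Peekable;

// Hypothetical stand-ins for FlightData; not arrow-flight types.
#[derive(Debug, Clone, PartialEq)]
enum Payload {
    Schema,
    RecordBatch(u32),
}

#[derive(Debug, Clone, PartialEq)]
struct Msg {
    descriptor: Option<&'static str>, // command, set only on the first message
    payload: Payload,
}

/// Toy decoder: requires a Schema message before any RecordBatch.
fn decode<I: Iterator<Item = Msg>>(stream: I) -> Result<Vec<u32>, String> {
    let mut have_schema = false;
    let mut batches = Vec::new();
    for msg in stream {
        match msg.payload {
            Payload::Schema => have_schema = true,
            Payload::RecordBatch(id) if have_schema => batches.push(id),
            Payload::RecordBatch(_) => {
                return Err("Received RecordBatch prior to Schema".to_string())
            }
        }
    }
    Ok(batches)
}

/// Fixed dispatch: read the first message's descriptor via `peek`, which
/// leaves that message at the head of the stream for the handler.
fn dispatch_peeking<I: Iterator<Item = Msg>>(mut stream: Peekable<I>) -> Result<Vec<u32>, String> {
    // Copy the descriptor out so the borrow from `peek` ends here.
    let command = stream.peek().ok_or("empty stream")?.descriptor;
    match command {
        Some("CommandPreparedStatementQuery") => decode(stream),
        other => Err(format!("unsupported command: {other:?}")),
    }
}

fn main() {
    let sent = vec![
        Msg { descriptor: Some("CommandPreparedStatementQuery"), payload: Payload::Schema },
        Msg { descriptor: None, payload: Payload::RecordBatch(1) },
    ];
    let result = dispatch_peeking(sent.into_iter().peekable());
    println!("{result:?}");
}
```

Here the handler receives the stream with the schema message still at its head, so decoding succeeds and the program prints Ok([1]). In async server code the same idea could be expressed with futures::StreamExt::peekable, or by re-prepending the consumed message to the rest of the stream with stream::once(...).chain(...).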

Additional context
This bug was uncovered while implementing and testing prepared statements for a Rust FlightSQL server. The same error occurs when submitting queries to that server from both the Rust and Go FlightSQL clients.
