Skip to content

Commit

Permalink
Merge pull request #346 from phanindra-ramesh/bq_job_loc
Browse files Browse the repository at this point in the history
Adding bigquery job location in parameters while getting job details
  • Loading branch information
wangxiaoying authored Sep 7, 2022
2 parents 80e7a0a + d1e6f14 commit 87ffcc3
Showing 1 changed file with 27 additions and 17 deletions.
44 changes: 27 additions & 17 deletions connectorx/src/sources/bigquery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,18 +238,28 @@ impl SourcePartition for BigQuerySourcePartition {
self.project_id.as_str(),
QueryRequest::new(self.query.as_str()),
))?;
let job_info = qry
.query_response()
.job_reference
.as_ref()
.ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters {
format_options: None,
location: job_info.location.clone(),
max_results: None,
page_token: None,
start_index: None,
timeout_ms: None,
};
let rs = self.rt.block_on(
job.get_query_results(
self.project_id.as_str(),
qry.query_response()
.job_reference
.as_ref()
.ok_or_else(|| anyhow!("JobReferencce is none"))?
job_info
.job_id
.as_ref()
.ok_or_else(|| anyhow!("job_id is none"))?
.as_str(),
GetQueryResultsParameters::default(),
params,
),
)?;
BigQuerySourceParser::new(self.rt.clone(), self.client.clone(), rs, &self.schema)
Expand Down Expand Up @@ -327,7 +337,7 @@ macro_rules! impl_produce {
if ridx == (self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?.len()) {
let job = self.client.job();
let job_info = self.response.job_reference.as_ref().ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters { format_options: None, location: None, max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
let params = GetQueryResultsParameters { format_options: None, location: job_info.location.clone(), max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
self.response = self.rt.block_on(
job.get_query_results(
job_info.project_id.as_ref().ok_or_else(|| anyhow!("project_id is none"))?.as_str(),
Expand Down Expand Up @@ -360,7 +370,7 @@ macro_rules! impl_produce {
if ridx == (self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?.len()) {
let job = self.client.job();
let job_info = self.response.job_reference.as_ref().ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters { format_options: None, location: None, max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
let params = GetQueryResultsParameters { format_options: None, location: job_info.location.clone(), max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
self.response = self.rt.block_on(
job.get_query_results(
job_info.project_id.as_ref().ok_or_else(|| anyhow!("project_id is none"))?.as_str(),
Expand Down Expand Up @@ -411,7 +421,7 @@ impl<'r, 'a> Produce<'r, bool> for BigQuerySourceParser {
.ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters {
format_options: None,
location: None,
location: job_info.location.clone(),
max_results: None,
page_token: self.response.page_token.clone(),
start_index: None,
Expand Down Expand Up @@ -485,7 +495,7 @@ impl<'r, 'a> Produce<'r, Option<bool>> for BigQuerySourceParser {
.ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters {
format_options: None,
location: None,
location: job_info.location.clone(),
max_results: None,
page_token: self.response.page_token.clone(),
start_index: None,
Expand Down Expand Up @@ -561,7 +571,7 @@ impl<'r, 'a> Produce<'r, NaiveDate> for BigQuerySourceParser {
.ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters {
format_options: None,
location: None,
location: job_info.location.clone(),
max_results: None,
page_token: self.response.page_token.clone(),
start_index: None,
Expand Down Expand Up @@ -630,7 +640,7 @@ impl<'r, 'a> Produce<'r, Option<NaiveDate>> for BigQuerySourceParser {
.ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters {
format_options: None,
location: None,
location: job_info.location.clone(),
max_results: None,
page_token: self.response.page_token.clone(),
start_index: None,
Expand Down Expand Up @@ -705,7 +715,7 @@ impl<'r, 'a> Produce<'r, NaiveDateTime> for BigQuerySourceParser {
.ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters {
format_options: None,
location: None,
location: job_info.location.clone(),
max_results: None,
page_token: self.response.page_token.clone(),
start_index: None,
Expand Down Expand Up @@ -774,7 +784,7 @@ impl<'r, 'a> Produce<'r, Option<NaiveDateTime>> for BigQuerySourceParser {
.ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters {
format_options: None,
location: None,
location: job_info.location.clone(),
max_results: None,
page_token: self.response.page_token.clone(),
start_index: None,
Expand Down Expand Up @@ -849,7 +859,7 @@ impl<'r, 'a> Produce<'r, NaiveTime> for BigQuerySourceParser {
.ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters {
format_options: None,
location: None,
location: job_info.location.clone(),
max_results: None,
page_token: self.response.page_token.clone(),
start_index: None,
Expand Down Expand Up @@ -918,7 +928,7 @@ impl<'r, 'a> Produce<'r, Option<NaiveTime>> for BigQuerySourceParser {
.ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters {
format_options: None,
location: None,
location: job_info.location.clone(),
max_results: None,
page_token: self.response.page_token.clone(),
start_index: None,
Expand Down Expand Up @@ -993,7 +1003,7 @@ impl<'r, 'a> Produce<'r, DateTime<Utc>> for BigQuerySourceParser {
.ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters {
format_options: None,
location: None,
location: job_info.location.clone(),
max_results: None,
page_token: self.response.page_token.clone(),
start_index: None,
Expand Down Expand Up @@ -1065,7 +1075,7 @@ impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for BigQuerySourceParser {
.ok_or_else(|| anyhow!("job_reference is none"))?;
let params = GetQueryResultsParameters {
format_options: None,
location: None,
location: job_info.location.clone(),
max_results: None,
page_token: self.response.page_token.clone(),
start_index: None,
Expand Down

0 comments on commit 87ffcc3

Please sign in to comment.