use std::sync::Arc;
use async_stream::stream;
use reqwest::Client;
use tokio_stream::Stream;
use crate::auth::Authenticator;
use crate::error::BQError;
use crate::model::get_query_results_parameters::GetQueryResultsParameters;
use crate::model::get_query_results_response::GetQueryResultsResponse;
use crate::model::job::Job;
use crate::model::job_cancel_response::JobCancelResponse;
use crate::model::job_configuration::JobConfiguration;
use crate::model::job_configuration_query::JobConfigurationQuery;
use crate::model::job_list::JobList;
use crate::model::job_list_parameters::JobListParameters;
use crate::model::job_reference::JobReference;
use crate::model::query_request::QueryRequest;
use crate::model::query_response::QueryResponse;
use crate::model::table_row::TableRow;
use crate::{process_response, urlencode, BIG_QUERY_V2_URL};
/// Client for the BigQuery `jobs` REST resource: run queries, insert, list,
/// poll and cancel jobs.
#[derive(Clone)]
pub struct JobApi {
    /// HTTP client used for every request.
    client: Client,
    /// Token source used to authorize each request via `Bearer` auth.
    auth: Arc<dyn Authenticator>,
    /// API endpoint prefix; defaults to `BIG_QUERY_V2_URL`.
    base_url: String,
}
impl JobApi {
pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
Self {
client,
auth,
base_url: BIG_QUERY_V2_URL.to_string(),
}
}
pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
self.base_url = base_url;
self
}
pub async fn query(&self, project_id: &str, query_request: QueryRequest) -> Result<QueryResponse, BQError> {
let req_url = format!(
"{base_url}/projects/{project_id}/queries",
base_url = self.base_url,
project_id = urlencode(project_id)
);
let access_token = self.auth.access_token().await?;
let request = self
.client
.post(req_url.as_str())
.bearer_auth(access_token)
.json(&query_request)
.build()?;
let resp = self.client.execute(request).await?;
let query_response: QueryResponse = process_response(resp).await?;
Ok(query_response)
}
pub fn query_all<'a>(
&'a self,
project_id: &'a str,
query: JobConfigurationQuery,
page_size: Option<i32>,
) -> impl Stream<Item = Result<Vec<TableRow>, BQError>> + 'a {
stream! {
let job = Job {
configuration: Some(JobConfiguration {
dry_run: Some(false),
query: Some(query),
..Default::default()
}),
..Default::default()
};
let job = self.insert(project_id, job).await?;
if let Some(ref job_id) = job.job_reference.and_then(|r| r.job_id) {
let mut page_token: Option<String> = None;
loop {
let qr = self
.get_query_results(
project_id,
job_id,
GetQueryResultsParameters {
page_token: page_token.clone(),
max_results: page_size,
..Default::default()
},
)
.await?;
if !qr.job_complete.unwrap_or(false) {
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
continue;
}
yield Ok(qr.rows.unwrap_or_default());
page_token = match qr.page_token {
None => break,
f => f,
};
}
}
}
}
pub fn query_all_with_location<'a>(
&'a self,
project_id: &'a str,
location: &'a str,
query: JobConfigurationQuery,
page_size: Option<i32>,
) -> impl Stream<Item = Result<Vec<TableRow>, BQError>> + 'a {
stream! {
let job = Job {
configuration: Some(JobConfiguration {
dry_run: Some(false),
query: Some(query),
..Default::default()
}),
job_reference: Some(JobReference {
location: Some(location.to_string()),
project_id: Some(project_id.to_string()),
..Default::default()
}),
..Default::default()
};
let job = self.insert(project_id, job).await?;
if let Some(ref job_id) = job.job_reference.and_then(|r| r.job_id) {
let mut page_token: Option<String> = None;
loop {
let qr = self
.get_query_results(
project_id,
job_id,
GetQueryResultsParameters {
page_token: page_token.clone(),
max_results: page_size,
location: Some(location.to_string()),
..Default::default()
},
)
.await?;
if !qr.job_complete.unwrap_or(false) {
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
continue;
}
yield Ok(qr.rows.unwrap_or_default());
page_token = match qr.page_token {
None => break,
f => f,
};
}
}
}
}
pub fn query_all_with_job_reference<'a>(
&'a self,
project_id: &'a str,
job_reference: JobReference,
query: JobConfigurationQuery,
page_size: Option<i32>,
) -> impl Stream<Item = Result<Vec<TableRow>, BQError>> + 'a {
stream! {
let location = job_reference.location.as_ref().cloned();
let job = Job {
configuration: Some(JobConfiguration {
dry_run: Some(false),
query: Some(query),
..Default::default()
}),
job_reference: Some(job_reference),
..Default::default()
};
let job = self.insert(project_id, job).await?;
if let Some(ref job_id) = job.job_reference.and_then(|r| r.job_id) {
let mut page_token: Option<String> = None;
loop {
let gqrp = GetQueryResultsParameters {
page_token,
max_results: page_size,
location: location.clone(),
..Default::default()
};
let qr = self
.get_query_results(
project_id,
job_id,
gqrp,
)
.await?;
yield Ok(qr.rows.expect("Rows are not present"));
page_token = match qr.page_token {
None => break,
f => f,
};
}
}
}
}
pub async fn insert(&self, project_id: &str, job: Job) -> Result<Job, BQError> {
let req_url = format!(
"{base_url}/projects/{project_id}/jobs",
base_url = self.base_url,
project_id = urlencode(project_id)
);
let access_token = self.auth.access_token().await?;
let request = self
.client
.post(req_url.as_str())
.bearer_auth(access_token)
.json(&job)
.build()?;
let resp = self.client.execute(request).await?;
process_response(resp).await
}
pub async fn list(&self, project_id: &str) -> Result<JobList, BQError> {
let req_url = format!(
"{base_url}/projects/{project_id}/jobs",
base_url = self.base_url,
project_id = urlencode(project_id)
);
let access_token = self.auth.access_token().await?;
let request = self.client.get(req_url.as_str()).bearer_auth(access_token).build()?;
let resp = self.client.execute(request).await?;
process_response(resp).await
}
pub fn get_job_list<'a>(
&'a self,
project_id: &'a str,
parameters: Option<JobListParameters>,
) -> impl Stream<Item = Result<JobList, BQError>> + 'a {
stream! {
let req_url = format!(
"{base_url}/projects/{project_id}/jobs",
base_url = self.base_url,
project_id = urlencode(project_id),
);
let mut params = parameters.unwrap_or_default();
params.page_token = None;
loop {
let mut request_builder = self.client.get(req_url.as_str());
request_builder = request_builder.query(¶ms);
let access_token = self.auth.access_token().await?;
let request = request_builder.bearer_auth(access_token).build()?;
let resp = self.client.execute(request).await?;
let process_resp: Result<JobList, BQError> = process_response(resp).await;
yield match process_resp {
Err(e) => {params.page_token=None; Err(e)},
Ok(job_list) => {params.page_token.clone_from(&job_list.next_page_token); Ok(job_list.clone())}
};
if params.page_token.is_none() {
break;
}
}
}
}
pub async fn get_query_results(
&self,
project_id: &str,
job_id: &str,
parameters: GetQueryResultsParameters,
) -> Result<GetQueryResultsResponse, BQError> {
let req_url = format!(
"{base_url}/projects/{project_id}/queries/{job_id}",
base_url = self.base_url,
project_id = urlencode(project_id),
job_id = urlencode(job_id),
);
let access_token = self.auth.access_token().await?;
let request = self
.client
.get(req_url.as_str())
.query(¶meters)
.bearer_auth(access_token)
.build()?;
let resp = self.client.execute(request).await?;
let get_query_results_response: GetQueryResultsResponse = process_response(resp).await?;
Ok(get_query_results_response)
}
pub async fn get_job(&self, project_id: &str, job_id: &str, location: Option<&str>) -> Result<Job, BQError> {
let req_url = format!(
"{base_url}/projects/{project_id}/jobs/{job_id}",
base_url = self.base_url,
project_id = urlencode(project_id),
job_id = urlencode(job_id),
);
let mut request_builder = self.client.get(req_url.as_str());
if let Some(location) = location {
request_builder = request_builder.query(&[("location", location)]);
}
let access_token = self.auth.access_token().await?;
let request = request_builder.bearer_auth(access_token).build()?;
let resp = self.client.execute(request).await?;
process_response(resp).await
}
pub async fn cancel_job(
&self,
project_id: &str,
job_id: &str,
location: Option<&str>,
) -> Result<JobCancelResponse, BQError> {
let req_url = format!(
"{base_url}/projects/{project_id}/jobs/{job_id}/cancel",
base_url = self.base_url,
project_id = urlencode(project_id),
job_id = urlencode(job_id),
);
let mut request_builder = self.client.post(req_url.as_str());
if let Some(location) = location {
request_builder = request_builder.query(&[("location", location)]);
}
let access_token = self.auth.access_token().await?;
let request = request_builder.bearer_auth(access_token).build()?;
let resp = self.client.execute(request).await?;
process_response(resp).await
}
}
#[cfg(test)]
mod test {
use serde::Serialize;
use tokio_stream::StreamExt;
use crate::error::BQError;
use crate::model::dataset::Dataset;
use crate::model::field_type::serialize_json_as_string;
use crate::model::job_configuration_query::JobConfigurationQuery;
use crate::model::job_reference::JobReference;
use crate::model::query_parameter::QueryParameter;
use crate::model::query_parameter_type::QueryParameterType;
use crate::model::query_parameter_value::QueryParameterValue;
use crate::model::query_request::QueryRequest;
use crate::model::query_response::{QueryResponse, ResultSet};
use crate::model::table::Table;
use crate::model::table_data_insert_all_request::TableDataInsertAllRequest;
use crate::model::table_field_schema::TableFieldSchema;
use crate::model::table_schema::TableSchema;
use crate::{env_vars, Client};
    /// Row payload used by the streaming-insert test; mirrors the table schema
    /// built in `test()` (scalar columns, a nested record, and a JSON column).
    #[derive(Serialize)]
    struct MyRow {
        int_value: i64,
        float_value: f64,
        bool_value: bool,
        string_value: String,
        record_value: FirstRecordLevel,
        // JSON columns are sent over the wire as strings.
        #[serde(serialize_with = "serialize_json_as_string")]
        json_value: serde_json::value::Value,
    }
    /// First nesting level of `MyRow::record_value`.
    #[derive(Serialize)]
    struct FirstRecordLevel {
        int_value: i64,
        string_value: String,
        record_value: SecondRecordLevel,
    }
    /// Innermost (leaf) nesting level of `MyRow::record_value`.
    #[derive(Serialize)]
    struct SecondRecordLevel {
        int_value: i64,
        string_value: String,
    }
    /// End-to-end exercise of the job API against live BigQuery: creates a
    /// dataset and table, streams four rows in, then checks `query`,
    /// `get_job`, `get_query_results` and every `query_all*` variant before
    /// cleaning up.
    ///
    /// Requires the environment variables read by `env_vars()` (project,
    /// dataset, table, service-account key file) and network access.
    #[tokio::test]
    async fn test() -> Result<(), BQError> {
        let (ref project_id, ref dataset_id, ref table_id, ref sa_key) = env_vars();
        // Suffix the dataset so this test does not collide with other tests
        // sharing the same environment.
        let dataset_id = &format!("{dataset_id}_job");
        let client = Client::from_service_account_key_file(sa_key).await?;
        // Start from a clean slate; "does not exist" outcomes are ignored.
        client.table().delete_if_exists(project_id, dataset_id, table_id).await;
        client.dataset().delete_if_exists(project_id, dataset_id, true).await;
        // Create the dataset.
        let created_dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
        assert_eq!(created_dataset.id, Some(format!("{project_id}:{dataset_id}")));
        // Create a table whose schema mirrors `MyRow` (two record levels deep,
        // plus a JSON column).
        let table = Table::new(
            project_id,
            dataset_id,
            table_id,
            TableSchema::new(vec![
                TableFieldSchema::integer("int_value"),
                TableFieldSchema::float("float_value"),
                TableFieldSchema::bool("bool_value"),
                TableFieldSchema::string("string_value"),
                TableFieldSchema::record(
                    "record_value",
                    vec![
                        TableFieldSchema::integer("int_value"),
                        TableFieldSchema::string("string_value"),
                        TableFieldSchema::record(
                            "record_value",
                            vec![
                                TableFieldSchema::integer("int_value"),
                                TableFieldSchema::string("string_value"),
                            ],
                        ),
                    ],
                ),
                TableFieldSchema::json("json_value"),
            ]),
        );
        let created_table = client.table().create(table).await?;
        assert_eq!(created_table.table_reference.table_id, table_id.to_string());
        // Build a streaming-insert request with four rows.
        let mut insert_request = TableDataInsertAllRequest::new();
        insert_request.add_row(
            None,
            MyRow {
                int_value: 1,
                float_value: 1.0,
                bool_value: false,
                string_value: "first".into(),
                record_value: FirstRecordLevel {
                    int_value: 10,
                    string_value: "sub_level_1.1".into(),
                    record_value: SecondRecordLevel {
                        int_value: 20,
                        string_value: "leaf".to_string(),
                    },
                },
                json_value: serde_json::from_str("{\"a\":2,\"b\":\"hello\"}")?,
            },
        )?;
        insert_request.add_row(
            None,
            MyRow {
                int_value: 2,
                float_value: 2.0,
                bool_value: true,
                string_value: "second".into(),
                record_value: FirstRecordLevel {
                    int_value: 11,
                    string_value: "sub_level_1.2".into(),
                    record_value: SecondRecordLevel {
                        int_value: 21,
                        string_value: "leaf".to_string(),
                    },
                },
                json_value: serde_json::from_str("{\"a\":1,\"b\":\"goodbye\",\"c\":3}")?,
            },
        )?;
        insert_request.add_row(
            None,
            MyRow {
                int_value: 3,
                float_value: 3.0,
                bool_value: false,
                string_value: "third".into(),
                record_value: FirstRecordLevel {
                    int_value: 12,
                    string_value: "sub_level_1.3".into(),
                    record_value: SecondRecordLevel {
                        int_value: 22,
                        string_value: "leaf".to_string(),
                    },
                },
                json_value: serde_json::from_str("{\"b\":\"world\",\"c\":2}")?,
            },
        )?;
        insert_request.add_row(
            None,
            MyRow {
                int_value: 4,
                float_value: 4.0,
                bool_value: true,
                string_value: "fourth".into(),
                record_value: FirstRecordLevel {
                    int_value: 13,
                    string_value: "sub_level_1.4".into(),
                    record_value: SecondRecordLevel {
                        int_value: 23,
                        string_value: "leaf".to_string(),
                    },
                },
                json_value: serde_json::from_str("{\"a\":3,\"c\":1}")?,
            },
        )?;
        let n_rows = insert_request.len();
        let result = client
            .tabledata()
            .insert_all(project_id, dataset_id, table_id, insert_request)
            .await;
        assert!(result.is_ok(), "{:?}", result);
        let result = result.unwrap();
        // Per-row insert errors are reported in the body, not as an HTTP error.
        assert!(result.insert_errors.is_none(), "{:?}", result);
        // Synchronous query: count the rows just inserted.
        let query_response = client
            .job()
            .query(
                project_id,
                QueryRequest::new(format!(
                    "SELECT COUNT(*) AS c FROM `{project_id}.{dataset_id}.{table_id}`"
                )),
            )
            .await?;
        let job_id = query_response
            .job_reference
            .as_ref()
            .expect("expected job_reference")
            .job_id
            .clone()
            .expect("expected job_id");
        let mut rs = ResultSet::new_from_query_response(query_response);
        while rs.next_row() {
            assert!(rs.get_i64_by_name("c")?.is_some());
        }
        // The job behind the synchronous query must have finished.
        let job = client.job_api.get_job(project_id, &job_id, None).await?;
        assert_eq!(job.status.unwrap().state.unwrap(), "DONE");
        // Re-fetch the same results via jobs.getQueryResults and compare.
        let query_results = client
            .job()
            .get_query_results(project_id, &job_id, Default::default())
            .await?;
        let mut query_results_rs = ResultSet::new_from_query_response(QueryResponse::from(query_results));
        assert_eq!(query_results_rs.row_count(), rs.row_count());
        while query_results_rs.next_row() {
            // NOTE(review): this asserts on `rs`, not `query_results_rs` —
            // looks like a copy-paste slip; probably intended to check the
            // freshly fetched result set. Confirm before changing.
            assert!(rs.get_i64_by_name("c")?.is_some());
        }
        // query_all: stream every row in pages of 2 and flatten.
        let query_all_results: Result<Vec<_>, _> = client
            .job()
            .query_all(
                project_id,
                JobConfigurationQuery {
                    query: format!("SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"),
                    query_parameters: None,
                    use_legacy_sql: Some(false),
                    ..Default::default()
                },
                Some(2),
            )
            .collect::<Result<Vec<_>, _>>()
            .await
            .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());
        assert!(query_all_results.is_ok());
        assert_eq!(query_all_results.unwrap().len(), n_rows);
        // Same query pinned to an explicit location.
        let location = "us";
        let query_all_results_with_location: Result<Vec<_>, _> = client
            .job()
            .query_all_with_location(
                project_id,
                location,
                JobConfigurationQuery {
                    query: format!("SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"),
                    query_parameters: None,
                    use_legacy_sql: Some(false),
                    ..Default::default()
                },
                Some(2),
            )
            .collect::<Result<Vec<_>, _>>()
            .await
            .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());
        assert!(query_all_results_with_location.is_ok());
        assert_eq!(query_all_results_with_location.unwrap().len(), n_rows);
        // Same query again, this time via a caller-built JobReference.
        let job_reference = JobReference {
            project_id: Some(project_id.to_string()),
            location: Some(location.to_string()),
            ..Default::default()
        };
        let query_all_results_with_job_reference: Result<Vec<_>, _> = client
            .job()
            .query_all_with_job_reference(
                project_id,
                job_reference,
                JobConfigurationQuery {
                    query: format!("SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"),
                    query_parameters: None,
                    use_legacy_sql: Some(false),
                    ..Default::default()
                },
                Some(2),
            )
            .collect::<Result<Vec<_>, _>>()
            .await
            .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());
        assert!(query_all_results_with_job_reference.is_ok());
        assert_eq!(query_all_results_with_job_reference.unwrap().len(), n_rows);
        // Parameterized query: only rows whose json `a` value is >= 2 match
        // (two of the four inserted rows).
        let query_all_results_with_parameter: Result<Vec<_>, _> = client
            .job()
            .query_all(
                project_id,
                JobConfigurationQuery {
                    query: format!("SELECT int_value, json_value.a, json_value.b FROM `{project_id}.{dataset_id}.{table_id}` where CAST(JSON_VALUE(json_value,'$.a') as int) >= @compare"),
                    query_parameters: Some(vec![QueryParameter {
                        name: Some("compare".to_string()),
                        parameter_type: Some(QueryParameterType { array_type: None, struct_types: None, r#type: "INTEGER".to_string() }),
                        parameter_value: Some(QueryParameterValue { array_values: None, struct_values: None, value: Some("2".to_string()) }),
                    }]),
                    use_legacy_sql: Some(false),
                    ..Default::default()
                },
                Some(2),
            )
            .collect::<Result<Vec<_>, _>>()
            .await
            .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());
        assert!(query_all_results_with_parameter.is_ok());
        assert_eq!(query_all_results_with_parameter.unwrap().len(), 2);
        // Tear down everything the test created.
        client.table().delete(project_id, dataset_id, table_id).await?;
        client.dataset().delete(project_id, dataset_id, true).await?;
        Ok(())
    }
}