use std::sync::Arc;

#[cfg(feature = "gzip")]
use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE};
use reqwest::Client;
use serde::{Deserialize, Serialize};

use crate::auth::Authenticator;
use crate::error::BQError;
use crate::model::data_format_options::DataFormatOptions;
use crate::model::table_data_insert_all_request::TableDataInsertAllRequest;
#[cfg(feature = "gzip")]
use crate::model::table_data_insert_all_request::TableDataInsertAllRequestGzipped;
use crate::model::table_data_insert_all_response::TableDataInsertAllResponse;
use crate::model::table_data_list_response::TableDataListResponse;
use crate::{process_response, urlencode, BIG_QUERY_V2_URL};
#[derive(Clone)]
pub struct TableDataApi {
client: Client,
auth: Arc<dyn Authenticator>,
base_url: String,
}
impl TableDataApi {
pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
Self {
client,
auth,
base_url: BIG_QUERY_V2_URL.to_string(),
}
}
pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
self.base_url = base_url;
self
}
pub async fn insert_all(
&self,
project_id: &str,
dataset_id: &str,
table_id: &str,
insert_request: TableDataInsertAllRequest,
) -> Result<TableDataInsertAllResponse, BQError> {
let req_url = format!(
"{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/insertAll",
base_url = self.base_url,
project_id = urlencode(project_id),
dataset_id = urlencode(dataset_id),
table_id = urlencode(table_id)
);
let access_token = self.auth.access_token().await?;
#[cfg(feature = "gzip")]
let request = {
let insert_request_gzipped = TableDataInsertAllRequestGzipped::try_from(insert_request)?;
self.client
.post(&req_url)
.header(CONTENT_ENCODING, "gzip")
.header(CONTENT_TYPE, "application/octet-stream")
.bearer_auth(access_token)
.body(insert_request_gzipped.data)
.build()?
};
#[cfg(not(feature = "gzip"))]
let request = self
.client
.post(&req_url)
.bearer_auth(access_token)
.json(&insert_request)
.build()?;
let resp = self.client.execute(request).await?;
process_response(resp).await
}
#[cfg(feature = "gzip")]
pub async fn insert_all_gzipped(
&self,
project_id: &str,
dataset_id: &str,
table_id: &str,
insert_request_gzipped: TableDataInsertAllRequestGzipped,
) -> Result<TableDataInsertAllResponse, BQError> {
let req_url = format!(
"{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/insertAll",
base_url = self.base_url,
project_id = urlencode(project_id),
dataset_id = urlencode(dataset_id),
table_id = urlencode(table_id)
);
let access_token = self.auth.access_token().await?;
let request = self
.client
.post(&req_url)
.header(CONTENT_ENCODING, "gzip")
.header(CONTENT_TYPE, "application/octet-stream")
.bearer_auth(access_token)
.body(insert_request_gzipped.data)
.build()?;
let resp = self.client.execute(request).await?;
process_response(resp).await
}
pub async fn list(
&self,
project_id: &str,
dataset_id: &str,
table_id: &str,
parameters: ListQueryParameters,
) -> Result<TableDataListResponse, BQError> {
let req_url = format!(
"{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/data",
base_url = self.base_url,
project_id = urlencode(project_id),
dataset_id = urlencode(dataset_id),
table_id = urlencode(table_id)
);
let access_token = self.auth.access_token().await?;
let request = self
.client
.get(req_url.as_str())
.bearer_auth(access_token)
.query(¶meters)
.build()?;
let resp = self.client.execute(request).await?;
process_response(resp).await
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListQueryParameters {
#[serde(skip_serializing_if = "Option::is_none")]
pub start_index: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub max_results: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub page_token: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub selected_fields: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub format_options: Option<DataFormatOptions>,
}
#[cfg(test)]
mod test {
use crate::error::BQError;
use crate::model::dataset::Dataset;
use crate::model::field_type::FieldType;
use crate::model::table::Table;
use crate::model::table_data_insert_all_request::TableDataInsertAllRequest;
#[cfg(feature = "gzip")]
use crate::model::table_data_insert_all_request::TableDataInsertAllRequestGzipped;
use crate::model::table_field_schema::TableFieldSchema;
use crate::model::table_schema::TableSchema;
use crate::{env_vars, Client};
#[derive(Serialize)]
struct Row {
col1: String,
col2: i64,
col3: bool,
}
#[tokio::test]
async fn test() -> Result<(), BQError> {
let (ref project_id, ref dataset_id, ref table_id, ref sa_key) = env_vars();
let dataset_id = &format!("{dataset_id}_tabledata");
let client = Client::from_service_account_key_file(sa_key).await?;
client.table().delete_if_exists(project_id, dataset_id, table_id).await;
client.dataset().delete_if_exists(project_id, dataset_id, true).await;
let dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
let table = dataset
.create_table(
&client,
Table::from_dataset(
&dataset,
table_id,
TableSchema::new(vec![
TableFieldSchema::new("col1", FieldType::String),
TableFieldSchema::new("col2", FieldType::Int64),
TableFieldSchema::new("col3", FieldType::Boolean),
]),
),
)
.await?;
let mut insert_request = TableDataInsertAllRequest::new();
insert_request.add_row(
None,
Row {
col1: "val1".into(),
col2: 2,
col3: false,
},
)?;
let result = client
.tabledata()
.insert_all(project_id, dataset_id, table_id, insert_request)
.await;
assert!(result.is_ok(), "Error: {:?}", result);
#[cfg(feature = "gzip")]
{
let mut insert_request = TableDataInsertAllRequest::new();
insert_request.add_row(
None,
Row {
col1: "val2".into(),
col2: 3,
col3: true,
},
)?;
let insert_request_gzipped =
TableDataInsertAllRequestGzipped::try_from(insert_request).expect("Failed to gzip insert request");
let result_gzipped = client
.tabledata()
.insert_all_gzipped(project_id, dataset_id, table_id, insert_request_gzipped)
.await;
assert!(result_gzipped.is_ok(), "Error: {:?}", result_gzipped);
}
table.delete(&client).await?;
dataset.delete(&client, true).await?;
Ok(())
}
}