use std::sync::Arc;
use log::warn;
use reqwest::Client;
use crate::auth::Authenticator;
use crate::error::BQError;
use crate::model::get_iam_policy_request::GetIamPolicyRequest;
use crate::model::policy::Policy;
use crate::model::set_iam_policy_request::SetIamPolicyRequest;
use crate::model::table::Table;
use crate::model::table_list::TableList;
use crate::model::test_iam_permissions_request::TestIamPermissionsRequest;
use crate::model::test_iam_permissions_response::TestIamPermissionsResponse;
use crate::{process_response, urlencode, BIG_QUERY_V2_URL};
/// Client for the BigQuery table endpoints.
///
/// Every method builds its URL from `base_url`, asks `auth` for an access token and
/// sends the request through `client`.
#[derive(Clone)]
pub struct TableApi {
// HTTP client used to build and execute every request.
client: Client,
// Source of the bearer token attached to each request.
auth: Arc<dyn Authenticator>,
// URL prefix of every endpoint; `new` initialises it from `BIG_QUERY_V2_URL`.
base_url: String,
}
impl TableApi {
/// Builds a `TableApi` that sends requests with `client`, authenticates through
/// `auth`, and targets the default `BIG_QUERY_V2_URL` endpoint.
pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
    let base_url = BIG_QUERY_V2_URL.to_string();
    Self { client, auth, base_url }
}
/// Replaces the base URL used to build every request URL and returns `self` so
/// that further calls can be chained.
pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
self.base_url = base_url;
self
}
/// Creates the given table.
///
/// The project and dataset ids are read from `table.table_reference`, URL-encoded,
/// and used to build the `.../datasets/{dataset_id}/tables` URL. The table itself is
/// sent as the JSON body of a POST request.
///
/// # Errors
///
/// Propagates any error raised while fetching the access token, building or
/// executing the request, and whatever `process_response` reports for the response.
pub async fn create(&self, table: Table) -> Result<Table, BQError> {
    let reference = &table.table_reference;
    let url = format!(
        "{}/projects/{}/datasets/{}/tables",
        self.base_url,
        urlencode(&reference.project_id),
        urlencode(&reference.dataset_id),
    );

    let token = self.auth.access_token().await?;
    let request = self
        .client
        .post(url.as_str())
        .bearer_auth(token)
        .json(&table)
        .build()?;

    process_response(self.client.execute(request).await?).await
}
/// Deletes the table identified by `project_id`, `dataset_id` and `table_id`.
///
/// # Errors
///
/// Returns `BQError::ResponseError` carrying the JSON-decoded error body when the
/// server answers with a non-success status. Errors from fetching the access token,
/// building or executing the request, or decoding that error body are propagated.
pub async fn delete(&self, project_id: &str, dataset_id: &str, table_id: &str) -> Result<(), BQError> {
    let url = format!(
        "{}/projects/{}/datasets/{}/tables/{}",
        self.base_url,
        urlencode(project_id),
        urlencode(dataset_id),
        urlencode(table_id),
    );

    let token = self.auth.access_token().await?;
    let request = self.client.delete(url.as_str()).bearer_auth(token).build()?;
    let response = self.client.execute(request).await?;

    // Anything other than a success status is surfaced with the server's error payload.
    if !response.status().is_success() {
        return Err(BQError::ResponseError {
            error: response.json().await?,
        });
    }
    Ok(())
}
pub async fn delete_if_exists(&self, project_id: &str, dataset_id: &str, table_id: &str) -> bool {
match self.delete(project_id, dataset_id, table_id).await {
Err(BQError::ResponseError { error }) => {
if error.error.code != 404 {
warn!("table.delete_if_exists: unexpected error: {:?}", error);
}
false
}
Err(err) => {
warn!("table.delete_if_exists: unexpected error: {:?}", err);
false
}
Ok(_) => true,
}
}
/// Fetches the table identified by `project_id`, `dataset_id` and `table_id`.
///
/// When `selected_fields` is `Some`, its entries are joined with `,` and sent as the
/// `selectedFields` query parameter; when it is `None` the parameter is omitted.
///
/// # Errors
///
/// Propagates any error raised while fetching the access token, building or
/// executing the request, and whatever `process_response` reports for the response.
pub async fn get(
    &self,
    project_id: &str,
    dataset_id: &str,
    table_id: &str,
    selected_fields: Option<Vec<&str>>,
) -> Result<Table, BQError> {
    let url = format!(
        "{}/projects/{}/datasets/{}/tables/{}",
        self.base_url,
        urlencode(project_id),
        urlencode(dataset_id),
        urlencode(table_id),
    );

    let token = self.auth.access_token().await?;
    let builder = self.client.get(url.as_str()).bearer_auth(token);
    let builder = match selected_fields {
        Some(fields) => builder.query(&[("selectedFields", fields.join(","))]),
        None => builder,
    };

    let request = builder.build()?;
    process_response(self.client.execute(request).await?).await
}
/// Lists the tables of the dataset identified by `project_id` and `dataset_id`.
///
/// `options.max_results` and `options.page_token`, when set, are sent as the
/// `maxResults` and `pageToken` query parameters respectively.
///
/// # Errors
///
/// Propagates any error raised while fetching the access token, building or
/// executing the request, and whatever `process_response` reports for the response.
pub async fn list(&self, project_id: &str, dataset_id: &str, options: ListOptions) -> Result<TableList, BQError> {
    let url = format!(
        "{}/projects/{}/datasets/{}/tables",
        self.base_url,
        urlencode(project_id),
        urlencode(dataset_id),
    );

    let token = self.auth.access_token().await?;
    let builder = self.client.get(url.as_str()).bearer_auth(token);

    // Only options that were explicitly set end up in the query string.
    let builder = match options.max_results {
        Some(limit) => builder.query(&[("maxResults", limit.to_string())]),
        None => builder,
    };
    let builder = match options.page_token {
        Some(token) => builder.query(&[("pageToken", token)]),
        None => builder,
    };

    let request = builder.build()?;
    process_response(self.client.execute(request).await?).await
}
/// Sends `table` as the JSON body of a PATCH request to the table identified by
/// `project_id`, `dataset_id` and `table_id`, and returns the table from the response.
///
/// # Errors
///
/// Propagates any error raised while fetching the access token, building or
/// executing the request, and whatever `process_response` reports for the response.
pub async fn patch(
    &self,
    project_id: &str,
    dataset_id: &str,
    table_id: &str,
    table: Table,
) -> Result<Table, BQError> {
    let url = format!(
        "{}/projects/{}/datasets/{}/tables/{}",
        self.base_url,
        urlencode(project_id),
        urlencode(dataset_id),
        urlencode(table_id),
    );

    let token = self.auth.access_token().await?;
    let request = self
        .client
        .patch(url.as_str())
        .bearer_auth(token)
        .json(&table)
        .build()?;

    process_response(self.client.execute(request).await?).await
}
/// Sends `table` as the JSON body of a PUT request to the table identified by
/// `project_id`, `dataset_id` and `table_id`, and returns the table from the response.
///
/// # Errors
///
/// Propagates any error raised while fetching the access token, building or
/// executing the request, and whatever `process_response` reports for the response.
pub async fn update(
    &self,
    project_id: &str,
    dataset_id: &str,
    table_id: &str,
    table: Table,
) -> Result<Table, BQError> {
    let url = format!(
        "{}/projects/{}/datasets/{}/tables/{}",
        self.base_url,
        urlencode(project_id),
        urlencode(dataset_id),
        urlencode(table_id),
    );

    let token = self.auth.access_token().await?;
    let request = self
        .client
        .put(url.as_str())
        .bearer_auth(token)
        .json(&table)
        .build()?;

    process_response(self.client.execute(request).await?).await
}
/// Requests the IAM policy of `resource`.
///
/// `resource` is URL-encoded and placed in `{base_url}/projects/{resource}/:getIamPolicy`;
/// `get_iam_policy_request` is sent as the JSON body of a POST request.
///
/// NOTE(review): Google's REST reference writes this endpoint as
/// `{resource=projects/*/datasets/*/tables/*}:getIamPolicy`. The URL built here adds
/// a `/` before the `:` and encodes `resource` as a single segment — confirm which
/// form of `resource` callers are expected to pass.
///
/// # Errors
///
/// Propagates any error raised while fetching the access token, building or
/// executing the request, and whatever `process_response` reports for the response.
pub async fn get_iam_policy(
    &self,
    resource: &str,
    get_iam_policy_request: GetIamPolicyRequest,
) -> Result<Policy, BQError> {
    let url = format!(
        "{}/projects/{}/:getIamPolicy",
        self.base_url,
        urlencode(resource),
    );

    let token = self.auth.access_token().await?;
    let request = self
        .client
        .post(url.as_str())
        .bearer_auth(token)
        .json(&get_iam_policy_request)
        .build()?;

    process_response(self.client.execute(request).await?).await
}
/// Sets the IAM policy of `resource` and returns the policy from the response.
///
/// `resource` is URL-encoded and placed in `{base_url}/projects/{resource}/:setIamPolicy`;
/// `set_iam_policy_request` is sent as the JSON body of a POST request.
///
/// NOTE(review): Google's REST reference writes this endpoint as
/// `{resource=projects/*/datasets/*/tables/*}:setIamPolicy`. The URL built here adds
/// a `/` before the `:` and encodes `resource` as a single segment — confirm which
/// form of `resource` callers are expected to pass.
///
/// # Errors
///
/// Propagates any error raised while fetching the access token, building or
/// executing the request, and whatever `process_response` reports for the response.
pub async fn set_iam_policy(
    &self,
    resource: &str,
    set_iam_policy_request: SetIamPolicyRequest,
) -> Result<Policy, BQError> {
    let url = format!(
        "{}/projects/{}/:setIamPolicy",
        self.base_url,
        urlencode(resource),
    );

    let token = self.auth.access_token().await?;
    let request = self
        .client
        .post(url.as_str())
        .bearer_auth(token)
        .json(&set_iam_policy_request)
        .build()?;

    process_response(self.client.execute(request).await?).await
}
/// Asks which of the permissions listed in the request apply to `resource`.
///
/// `resource` is URL-encoded and placed in
/// `{base_url}/projects/{resource}/:testIamPermissions`;
/// `test_iam_permissions_request` is sent as the JSON body of a POST request.
///
/// NOTE(review): Google's REST reference writes this endpoint as
/// `{resource=projects/*/datasets/*/tables/*}:testIamPermissions`. The URL built here
/// adds a `/` before the `:` and encodes `resource` as a single segment — confirm
/// which form of `resource` callers are expected to pass.
///
/// # Errors
///
/// Propagates any error raised while fetching the access token, building or
/// executing the request, and whatever `process_response` reports for the response.
pub async fn test_iam_permissions(
    &self,
    resource: &str,
    test_iam_permissions_request: TestIamPermissionsRequest,
) -> Result<TestIamPermissionsResponse, BQError> {
    let url = format!(
        "{}/projects/{}/:testIamPermissions",
        self.base_url,
        urlencode(resource),
    );

    let token = self.auth.access_token().await?;
    let request = self
        .client
        .post(url.as_str())
        .bearer_auth(token)
        .json(&test_iam_permissions_request)
        .build()?;

    process_response(self.client.execute(request).await?).await
}
}
/// Optional parameters for listing tables.
///
/// Start from `ListOptions::default()` (nothing set) and chain the setters. Options
/// that are left unset are not sent with the request.
///
/// The type is `Clone` so the same options can be reused across several calls
/// (the list call takes them by value), and `Debug`/`PartialEq` so they can be
/// logged and compared.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ListOptions {
    // Sent as the `maxResults` query parameter when set.
    max_results: Option<u64>,
    // Sent as the `pageToken` query parameter when set.
    page_token: Option<String>,
}

impl ListOptions {
    /// Sets the `maxResults` value and returns the updated options.
    pub fn max_results(mut self, value: u64) -> Self {
        self.max_results = Some(value);
        self
    }

    /// Sets the `pageToken` value and returns the updated options.
    pub fn page_token(mut self, value: String) -> Self {
        self.page_token = Some(value);
        self
    }
}
#[cfg(test)]
mod test {
    use crate::error::BQError;
    use crate::model::dataset::Dataset;
    use crate::model::field_type::FieldType;
    use crate::model::table::Table;
    use crate::model::table_field_schema::TableFieldSchema;
    use crate::model::table_schema::TableSchema;
    use crate::table::ListOptions;
    use crate::{env_vars, Client};
    use std::time::{Duration, SystemTime};

    /// Runs create, get, update, patch, list and delete against a dedicated dataset,
    /// using the project, dataset, table and service-account key from `env_vars()`.
    #[tokio::test]
    async fn test() -> Result<(), BQError> {
        let (ref project_id, ref dataset_id, ref table_id, ref sa_key) = env_vars();
        // Use a dataset of its own so this test does not collide with other tests.
        let dataset_id = &format!("{dataset_id}_table");

        let client = Client::from_service_account_key_file(sa_key).await?;

        // Start from a clean slate in case a previous run left the dataset behind.
        client.dataset().delete_if_exists(project_id, dataset_id, true).await;

        // Create dataset
        let created_dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
        assert_eq!(created_dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // Create table
        let table = Table::new(
            project_id,
            dataset_id,
            table_id,
            TableSchema::new(vec![
                TableFieldSchema::new("col1", FieldType::String),
                TableFieldSchema::new("col2", FieldType::Int64),
                TableFieldSchema::new("col3", FieldType::Boolean),
                TableFieldSchema::new("col4", FieldType::Datetime),
            ]),
        );
        let created_table = client
            .table()
            .create(
                table
                    .description("A table used for unit tests")
                    .label("owner", "me")
                    .label("env", "prod")
                    .expiration_time(SystemTime::now() + Duration::from_secs(3600)),
            )
            .await?;
        assert_eq!(created_table.table_reference.table_id, table_id.to_string());

        // Get, update and patch must all return the same table.
        let table = client.table().get(project_id, dataset_id, table_id, None).await?;
        assert_eq!(table.table_reference.table_id, table_id.to_string());

        let table = client.table().update(project_id, dataset_id, table_id, table).await?;
        assert_eq!(table.table_reference.table_id, table_id.to_string());

        let table = client.table().patch(project_id, dataset_id, table_id, table).await?;
        assert_eq!(table.table_reference.table_id, table_id.to_string());

        // List tables: the table created above must be among the results. Matching on
        // the dataset id alone would accept any table of the dataset, so the table id
        // is compared as well.
        let tables = client
            .table()
            .list(project_id, dataset_id, ListOptions::default())
            .await?;
        let created_table_found = tables
            .tables
            .expect("table list response should contain the created table")
            .iter()
            .any(|listed| {
                listed.table_reference.dataset_id == *dataset_id
                    && listed.table_reference.table_id == *table_id
            });
        assert!(created_table_found);

        // Clean up: delete the table, then the dataset.
        client.table().delete(project_id, dataset_id, table_id).await?;
        client.dataset().delete(project_id, dataset_id, true).await?;

        Ok(())
    }
}