use crate::error::BQError;
use crate::model::error_proto::ErrorProto;
use crate::model::get_query_results_response::GetQueryResultsResponse;
use crate::model::job_reference::JobReference;
use crate::model::table_row::TableRow;
use crate::model::table_schema::TableSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueryResponse {
#[serde(skip_serializing_if = "Option::is_none")]
pub cache_hit: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub errors: Option<Vec<ErrorProto>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub job_complete: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub job_reference: Option<JobReference>,
#[serde(skip_serializing_if = "Option::is_none")]
pub kind: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub num_dml_affected_rows: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub page_token: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub rows: Option<Vec<TableRow>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub schema: Option<TableSchema>,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_bytes_processed: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_rows: Option<String>,
}
impl From<GetQueryResultsResponse> for QueryResponse {
fn from(resp: GetQueryResultsResponse) -> Self {
Self {
cache_hit: resp.cache_hit,
errors: resp.errors,
job_complete: resp.job_complete,
job_reference: resp.job_reference,
kind: resp.kind,
num_dml_affected_rows: resp.num_dml_affected_rows,
page_token: resp.page_token,
rows: resp.rows,
schema: resp.schema,
total_bytes_processed: resp.total_bytes_processed,
total_rows: resp.total_rows,
}
}
}
#[derive(Debug)]
pub struct ResultSet {
cursor: i64,
row_count: i64,
rows: Vec<TableRow>,
fields: HashMap<String, usize>,
}
impl ResultSet {
pub fn new_from_query_response(query_response: QueryResponse) -> Self {
if query_response.job_complete.unwrap_or(false) && query_response.schema.is_some() {
let row_count = query_response.rows.as_ref().map_or(0, Vec::len) as i64;
let table_schema = query_response.schema.as_ref().expect("Expecting a schema");
let table_fields = table_schema
.fields
.as_ref()
.expect("Expecting a non empty list of fields");
let fields: HashMap<String, usize> = table_fields
.iter()
.enumerate()
.map(|(pos, field)| (field.name.clone(), pos))
.collect();
let rows = query_response.rows.unwrap_or_default();
Self {
cursor: -1,
row_count,
rows,
fields,
}
} else {
Self {
cursor: -1,
row_count: 0,
rows: vec![],
fields: HashMap::new(),
}
}
}
pub fn new_from_get_query_results_response(get_query_results_response: GetQueryResultsResponse) -> Self {
if get_query_results_response.job_complete.unwrap_or(false) && get_query_results_response.schema.is_some() {
let row_count = get_query_results_response.rows.as_ref().map_or(0, Vec::len) as i64;
let table_schema = get_query_results_response.schema.as_ref().expect("Expecting a schema");
let table_fields = table_schema
.fields
.as_ref()
.expect("Expecting a non empty list of fields");
let fields: HashMap<String, usize> = table_fields
.iter()
.enumerate()
.map(|(pos, field)| (field.name.clone(), pos))
.collect();
let rows = get_query_results_response.rows.unwrap_or_default();
Self {
cursor: -1,
row_count,
rows,
fields,
}
} else {
Self {
cursor: -1,
row_count: 0,
rows: vec![],
fields: HashMap::new(),
}
}
}
pub fn next_row(&mut self) -> bool {
if self.cursor == (self.row_count - 1) {
false
} else {
self.cursor += 1;
true
}
}
pub fn row_count(&self) -> usize {
self.row_count as usize
}
pub fn column_names(&self) -> Vec<String> {
self.fields.keys().cloned().collect()
}
pub fn column_index(&self, column_name: &str) -> Option<&usize> {
self.fields.get(column_name)
}
pub fn get_i64(&self, col_index: usize) -> Result<Option<i64>, BQError> {
let json_value = self.get_json_value(col_index)?;
match &json_value {
None => Ok(None),
Some(json_value) => match json_value {
serde_json::Value::Number(value) => Ok(value.as_i64()),
serde_json::Value::String(value) => match (value.parse::<i64>(), value.parse::<f64>()) {
(Ok(v), _) => Ok(Some(v)),
(Err(_), Ok(v)) => Ok(Some(v as i64)),
_ => Err(BQError::InvalidColumnType {
col_index,
col_type: ResultSet::json_type(json_value),
type_requested: "I64".into(),
}),
},
_ => Err(BQError::InvalidColumnType {
col_index,
col_type: ResultSet::json_type(json_value),
type_requested: "I64".into(),
}),
},
}
}
pub fn get_i64_by_name(&self, col_name: &str) -> Result<Option<i64>, BQError> {
let col_index = self.fields.get(col_name);
match col_index {
None => Err(BQError::InvalidColumnName {
col_name: col_name.into(),
}),
Some(col_index) => self.get_i64(*col_index),
}
}
pub fn get_serde<T>(&self, col_index: usize) -> Result<Option<T>, BQError>
where
T: serde::de::DeserializeOwned,
{
let json_value = self.get_json_value(col_index)?;
match json_value {
None => Ok(None),
Some(json_value) => match serde_json::from_value::<T>(json_value.clone()) {
Ok(value) => Ok(Some(value)),
Err(_) => Err(BQError::InvalidColumnType {
col_index,
col_type: ResultSet::json_type(&json_value),
type_requested: std::any::type_name::<T>().to_string(),
}),
},
}
}
pub fn get_serde_by_name<T>(&self, col_name: &str) -> Result<Option<T>, BQError>
where
T: serde::de::DeserializeOwned,
{
let col_index = self.fields.get(col_name);
match col_index {
None => Err(BQError::InvalidColumnName {
col_name: col_name.into(),
}),
Some(col_index) => self.get_serde(*col_index),
}
}
pub fn get_f64(&self, col_index: usize) -> Result<Option<f64>, BQError> {
let json_value = self.get_json_value(col_index)?;
match &json_value {
None => Ok(None),
Some(json_value) => match json_value {
serde_json::Value::Number(value) => Ok(value.as_f64()),
serde_json::Value::String(value) => {
let value: Result<f64, _> = value.parse();
match &value {
Err(_) => Err(BQError::InvalidColumnType {
col_index,
col_type: ResultSet::json_type(json_value),
type_requested: "F64".into(),
}),
Ok(value) => Ok(Some(*value)),
}
}
_ => Err(BQError::InvalidColumnType {
col_index,
col_type: ResultSet::json_type(json_value),
type_requested: "F64".into(),
}),
},
}
}
pub fn get_f64_by_name(&self, col_name: &str) -> Result<Option<f64>, BQError> {
let col_index = self.fields.get(col_name);
match col_index {
None => Err(BQError::InvalidColumnName {
col_name: col_name.into(),
}),
Some(col_index) => self.get_f64(*col_index),
}
}
pub fn get_bool(&self, col_index: usize) -> Result<Option<bool>, BQError> {
let json_value = self.get_json_value(col_index)?;
match &json_value {
None => Ok(None),
Some(json_value) => match json_value {
serde_json::Value::Bool(value) => Ok(Some(*value)),
serde_json::Value::String(value) => {
let value: Result<bool, _> = value.parse();
match &value {
Err(_) => Err(BQError::InvalidColumnType {
col_index,
col_type: ResultSet::json_type(json_value),
type_requested: "Bool".into(),
}),
Ok(value) => Ok(Some(*value)),
}
}
_ => Err(BQError::InvalidColumnType {
col_index,
col_type: ResultSet::json_type(json_value),
type_requested: "Bool".into(),
}),
},
}
}
pub fn get_bool_by_name(&self, col_name: &str) -> Result<Option<bool>, BQError> {
let col_index = self.fields.get(col_name);
match col_index {
None => Err(BQError::InvalidColumnName {
col_name: col_name.into(),
}),
Some(col_index) => self.get_bool(*col_index),
}
}
pub fn get_string(&self, col_index: usize) -> Result<Option<String>, BQError> {
let json_value = self.get_json_value(col_index)?;
match json_value {
None => Ok(None),
Some(json_value) => match json_value {
serde_json::Value::String(value) => Ok(Some(value)),
serde_json::Value::Number(value) => Ok(Some(value.to_string())),
serde_json::Value::Bool(value) => Ok(Some(value.to_string())),
_ => Err(BQError::InvalidColumnType {
col_index,
col_type: ResultSet::json_type(&json_value),
type_requested: "String".into(),
}),
},
}
}
pub fn get_string_by_name(&self, col_name: &str) -> Result<Option<String>, BQError> {
let col_index = self.fields.get(col_name);
match col_index {
None => Err(BQError::InvalidColumnName {
col_name: col_name.into(),
}),
Some(col_index) => self.get_string(*col_index),
}
}
pub fn get_json_value(&self, col_index: usize) -> Result<Option<serde_json::Value>, BQError> {
if self.cursor < 0 || self.cursor == self.row_count {
return Err(BQError::NoDataAvailable);
}
if col_index >= self.fields.len() {
return Err(BQError::InvalidColumnIndex { col_index });
}
Ok(self
.rows
.get(self.cursor as usize)
.and_then(|row| row.columns.as_ref())
.and_then(|cols| cols.get(col_index))
.and_then(|col| col.value.clone()))
}
pub fn get_json_value_by_name(&self, col_name: &str) -> Result<Option<serde_json::Value>, BQError> {
let col_pos = self.fields.get(col_name);
match col_pos {
None => Err(BQError::InvalidColumnName {
col_name: col_name.into(),
}),
Some(col_pos) => self.get_json_value(*col_pos),
}
}
fn json_type(json_value: &serde_json::Value) -> String {
match json_value {
Value::Null => "Null".into(),
Value::Bool(_) => "Bool".into(),
Value::Number(_) => "Number".into(),
Value::String(_) => "String".into(),
Value::Array(_) => "Array".into(),
Value::Object(_) => "Object".into(),
}
}
}