surrealcs_kernel/messages/client/
message.rsuse std::fmt::Debug;
use nanoservices_utils::errors::{NanoServiceError, NanoServiceErrorStatus};
use tokio::sync::mpsc;
use crate::messages::server::interface::{ServerMessage, ServerTransactionMessage};
use crate::messages::server::wrapper::WrappedServerMessage;
#[derive(Debug, Clone)]
pub enum TransactionMessage {
TransactionOperation(WrappedServerMessage),
Register(mpsc::Sender<TransactionMessage>),
Ping((usize, String)),
Deregister(usize),
CloseConnection,
Registered(usize),
Unregistered,
Error(NanoServiceError),
}
impl TransactionMessage {
pub fn extract_transaction_operation(
self,
) -> Result<(usize, ServerTransactionMessage), NanoServiceError> {
match self {
TransactionMessage::TransactionOperation(op) => {
let transaction = match op.message {
ServerMessage::Error(trans) => return Err(trans),
ServerMessage::SendOperation(trans) => trans,
ServerMessage::BeginTransaction(trans) => trans,
ServerMessage::CommitTransaction => ServerTransactionMessage::Commit,
ServerMessage::RollbackTransaction => ServerTransactionMessage::Rollback,
_ => {
return Err(NanoServiceError::new(
format!("ServerMessage not SendOperation: {:?}", op.message),
NanoServiceErrorStatus::Unknown,
))
}
};
let server_id = match op.server_id {
Some(id) => id,
None => {
return Err(NanoServiceError::new(
"Server id not found".to_string(),
NanoServiceErrorStatus::Unknown,
))
}
};
Ok((server_id, transaction))
}
TransactionMessage::Error(e) => {
tracing::error!("message error: {:?}", e);
Err(e)
}
_ => Err(NanoServiceError::new(
"TransactionMessage not TransactionOperation".to_string(),
NanoServiceErrorStatus::Unknown,
)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::messages::server::kv_operations::MessagePut;
static CONNECTION_ID: &str = "1-1234567890";
static _TRANSACTION_ID: &str = "1-1-1234567890";
#[test]
fn test_extract_transaction_operation() {
let transaction = ServerTransactionMessage::Put(MessagePut {
key: b"key".to_vec(),
value: b"value".to_vec(),
version: None,
});
let mut wrapped = WrappedServerMessage::new(
1,
ServerMessage::SendOperation(transaction),
CONNECTION_ID.into(),
);
wrapped.server_id = Some(1);
let message = TransactionMessage::TransactionOperation(wrapped);
let result = message.extract_transaction_operation().unwrap();
assert_eq!(result.0, 1);
let extracted_transaction = match result.1 {
ServerTransactionMessage::Put(transaction) => transaction,
_ => panic!("Transaction not Put"),
};
assert_eq!(extracted_transaction.key, b"key".to_vec());
assert_eq!(extracted_transaction.value, b"value".to_vec());
}
#[test]
fn test_extract_transaction_operation_error_wrong_transaction_message_type() {
let message = TransactionMessage::Register(mpsc::channel(1).0);
let result = message.extract_transaction_operation();
assert!(result.is_err());
assert_eq!("TransactionMessage not TransactionOperation", result.err().unwrap().message);
}
#[test]
fn test_extract_transaction_no_server_id() {
let transaction = ServerTransactionMessage::Put(MessagePut {
key: b"key".to_vec(),
value: b"value".to_vec(),
version: None,
});
let wrapped = WrappedServerMessage::new(
1,
ServerMessage::SendOperation(transaction),
CONNECTION_ID.into(),
);
let message = TransactionMessage::TransactionOperation(wrapped);
let result = message.extract_transaction_operation();
assert!(result.is_err());
assert_eq!("Server id not found", result.err().unwrap().message);
}
}