// framework_cqrs_lib/cqrs/infra/daos/mongo_dao.rs
use async_trait::async_trait;
use futures::TryStreamExt;
use log::{error, info};
use mongodb::{Client, Collection};
use mongodb::bson::doc;
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::cqrs::core::daos::{DAO, ReadOnlyDAO, WriteOnlyDAO};
use crate::cqrs::core::repositories::query::Query;
use crate::cqrs::models::errors::{Error, ResultErr};
/// MongoDB-backed data access object for documents of type `DBO`.
///
/// Keeps the typed collection handle together with the connection details
/// (URI, database name, client) it was built from.
pub struct MongoDAO<DBO: Send + Sync> {
    /// Typed handle to the collection that stores the `DBO` documents.
    pub collection: Collection<DBO>,
    /// Connection string the client was created with.
    pub uri: String,
    /// Name of the database the collection was obtained from.
    pub db_name: String,
    /// Driver client; also used to ping the server in `is_connected`.
    pub client: Client,
}
impl<DBO> MongoDAO<DBO>
where
    DBO: Send + Sync,
{
    /// Builds a DAO bound to collection `name` of database `dbname`.
    ///
    /// The connection string is read from the `MONGO_URI` environment variable.
    ///
    /// # Panics
    ///
    /// Panics when `MONGO_URI` cannot be read from the environment, or when the
    /// driver fails to build a client from that URI. The signature returns
    /// `Self`, so these failures cannot be reported as a `Result` without
    /// breaking callers; the `expect` messages at least say which step failed.
    pub async fn new(dbname: &str, name: &str) -> Self {
        let uri = std::env::var("MONGO_URI")
            .expect("MONGO_URI environment variable must be set to a MongoDB connection string");
        // `with_uri_str` only needs a borrowed string, so `uri` is not cloned.
        let client: Client = Client::with_uri_str(&uri)
            .await
            .expect("failed to create a MongoDB client from MONGO_URI");
        let collection: Collection<DBO> = client.database(dbname).collection(name);
        Self {
            collection,
            uri,
            db_name: dbname.to_string(),
            client,
        }
    }

    /// Returns `true` when a `ping` command against the `admin` database succeeds.
    ///
    /// Any driver error is reported as `false`; the error itself is discarded.
    pub async fn is_connected(&self) -> bool {
        self.client
            .database("admin")
            .run_command(doc! {"ping": 1})
            .await
            .is_ok()
    }
}
/// Declares [`MongoDAO`] as a `DAO` keyed by `String`.
///
/// The impl body is empty: nothing is defined or overridden here.
#[async_trait]
impl<DBO: Serialize + DeserializeOwned + Send + Sync> DAO<DBO, String> for MongoDAO<DBO> {}
#[async_trait]
impl<DBO> ReadOnlyDAO<DBO, String> for MongoDAO<DBO>
where
DBO: DeserializeOwned + Send + Sync,
{
async fn fetch_one(&self, id: &String) -> ResultErr<Option<DBO>> {
let filter = doc! {"id": id};
self.collection
.find_one(filter)
.await
.map_err(|err| Error::Simple(err.to_string()))
}
async fn fetch_all(&self, query: Query) -> ResultErr<Vec<DBO>> {
if !self.is_connected().await {
error!("la connexion au client mongo est perdu");
} else {
info!("la connexion est ok");
}
self.find_all(query).await
.map_err(|err| Error::Simple(err.to_string()))
}
}
#[async_trait]
impl<DBO> WriteOnlyDAO<DBO, String> for MongoDAO<DBO>
where
DBO: Serialize + Send + Sync,
{
async fn insert(&self, entity: &DBO, entity_id: &String) -> ResultErr<String> {
self.collection
.insert_one(entity)
.await
.map_err(|err| Error::Simple(err.to_string()))
.map(|_| entity_id.clone())
}
async fn update(&self, id: &String, entity: &DBO) -> ResultErr<String> {
let filter = doc! { "id": id };
self.collection
.replace_one(filter, entity)
.await
.map(|_| id.clone())
.map_err(|err| Error::Simple(err.to_string()))
}
}
impl<DBO> MongoDAO<DBO>
where
DBO: DeserializeOwned + Send + Sync,
{
async fn find_all(&self, query: Query) -> Result<Vec<DBO>, mongodb::error::Error> {
self.collection
.find(query.into())
.await?
.try_collect::<Vec<DBO>>()
.await
.map_err(|error| {
error!("mongodb error: {error:?}");
error
})
}
}