framework_cqrs_lib/cqrs/infra/daos/
mongo_dao.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use async_trait::async_trait;
use futures::TryStreamExt;
use log::{error, info};
use mongodb::{Client, Collection};
use mongodb::bson::doc;
use serde::de::DeserializeOwned;
use serde::Serialize;

use crate::cqrs::core::daos::{DAO, ReadOnlyDAO, WriteOnlyDAO};
use crate::cqrs::core::repositories::query::Query;
use crate::cqrs::models::errors::{Error, ResultErr};

/// MongoDB-backed data access object for documents of type `DBO`.
///
/// All fields are public; the descriptions below reflect how
/// [`MongoDAO::new`] populates them.
pub struct MongoDAO<DBO: Send + Sync> {
    /// Typed handle on the collection this DAO reads from and writes to.
    pub collection: Collection<DBO>,
    /// Connection string the client was created from.
    pub uri: String,
    /// Name of the database that `collection` was obtained from.
    pub db_name: String,
    /// Driver client, kept so the connection can be probed later.
    pub client: Client,
}

impl<DBO> MongoDAO<DBO>
where
    DBO: Send + Sync,
{
    /// Builds a DAO bound to collection `name` of database `dbname`.
    ///
    /// The connection string is read from the `MONGO_URI` environment variable.
    ///
    /// # Panics
    ///
    /// Panics if `MONGO_URI` is unset or is not valid Unicode, or if a client
    /// cannot be created from that URI.
    // TODO: take the client directly as input.
    pub async fn new(dbname: &str, name: &str) -> Self {
        let uri = std::env::var("MONGO_URI")
            .expect("MONGO_URI environment variable must be set to a MongoDB connection string");
        // Borrow the URI here: the owned string is stored on the DAO below,
        // so there is no need to copy it just to create the client.
        let client = Client::with_uri_str(&uri)
            .await
            .expect("failed to create a MongoDB client from MONGO_URI");
        let collection: Collection<DBO> = client.database(dbname).collection(name);
        Self {
            collection,
            uri,
            db_name: dbname.to_string(),
            client,
        }
    }

    /// Returns `true` when a `ping` command against the `admin` database
    /// succeeds, and `false` on any error.
    pub async fn is_connected(&self) -> bool {
        self.client
            .database("admin")
            .run_command(doc! {"ping": 1})
            .await
            .is_ok()
    }
}

/// Marks `MongoDAO` as a `DAO` keyed by `String` identifiers.
///
/// The body is empty: nothing is overridden here, so any behaviour comes from
/// the `DAO` trait itself.
#[async_trait]
impl<DBO: Serialize + DeserializeOwned + Send + Sync> DAO<DBO, String> for MongoDAO<DBO> {}

#[async_trait]
impl<DBO> ReadOnlyDAO<DBO, String> for MongoDAO<DBO>
where
    DBO: DeserializeOwned + Send + Sync,
{
    async fn fetch_one(&self, id: &String) -> ResultErr<Option<DBO>> {
        let filter = doc! {"id": id};
        self.collection
            .find_one(filter)
            .await
            .map_err(|err| Error::Simple(err.to_string()))
    }

    async fn fetch_all(&self, query: Query) -> ResultErr<Vec<DBO>> {
        if !self.is_connected().await {
            error!("la connexion au client mongo est perdu");
        } else {
            info!("la connexion est ok");
        }

        self.find_all(query).await
            .map_err(|err| Error::Simple(err.to_string()))
    }
}

#[async_trait]
impl<DBO> WriteOnlyDAO<DBO, String> for MongoDAO<DBO>
where
    DBO: Serialize + Send + Sync,
{
    async fn insert(&self, entity: &DBO, entity_id: &String) -> ResultErr<String> {
        self.collection
            .insert_one(entity)
            .await
            .map_err(|err| Error::Simple(err.to_string()))
            .map(|_| entity_id.clone())
    }

    async fn update(&self, id: &String, entity: &DBO) -> ResultErr<String> {
        let filter = doc! { "id": id };
        self.collection
            .replace_one(filter, entity)
            .await
            .map(|_| id.clone())
            .map_err(|err| Error::Simple(err.to_string()))
    }
}

impl<DBO> MongoDAO<DBO>
where
    DBO: DeserializeOwned + Send + Sync,
{
    async fn find_all(&self, query: Query) -> Result<Vec<DBO>, mongodb::error::Error> {
        self.collection
            .find(query.into())
            .await?
            .try_collect::<Vec<DBO>>()
            .await
            .map_err(|error| {
                error!("mongodb error: {error:?}");
                error
            })
    }
}