// zino_orm/transaction.rs

use super::{
    executor::Executor, mutation::MutationExt, query::QueryExt, schema::Schema, DatabaseDriver,
    EncodeColumn,
};
use std::fmt::Display;
use zino_core::{
    error::Error,
    extension::JsonValueExt,
    model::{Mutation, Query},
    BoxFuture, Map,
};

#[cfg(feature = "orm-sqlx")]
use sqlx::Acquire;

/// An in-progress database transaction.
///
/// The trait is generic over the primary key type `K` of the implementing model and
/// over `Tx`, the transaction handle that is lent to the closure passed to
/// [`transaction`](Transaction::transaction). All other methods are convenience
/// wrappers that run a fixed sequence of statements and return the total number of
/// affected rows.
///
/// # Examples
///
/// ```rust,ignore
/// use crate::model::{Account, AccountColumn, Order, Stock, StockColumn};
/// use zino_orm::{MutationBuilder, QueryBuilder, Schema, Transaction};
///
/// let user_id = "0193d8e6-2970-7b52-bc06-80a981212aa9";
/// let product_id = "0193c06d-bee6-7070-a5e7-9659161bddb5";
///
/// let order = Order::from_customer(user_id, product_id);
/// let quantity = order.quantity();
/// let total_price = order.total_price();
/// let order_ctx = order.prepare_insert()?;
///
/// let stock_query = QueryBuilder::<Stock>::new()
///     .and_eq(StockColumn::ProductId, product_id)
///     .and_ge(StockColumn::Quantity, quantity)
///     .build();
/// let mut stock_mutation = MutationBuilder::<Stock>::new()
///     .inc(StockColumn::Quantity, -quantity)
///     .build();
/// let stock_ctx = Stock::prepare_update_one(&stock_query, &mut stock_mutation).await?;
///
/// let account_query = QueryBuilder::<Account>::new()
///     .and_eq(AccountColumn::UserId, user_id)
///     .and_ge(AccountColumn::Balance, total_price)
///     .build();
/// let mut account_mutation = MutationBuilder::<Account>::new()
///     .inc(AccountColumn::Balance, -total_price)
///     .build();
/// let account_ctx = Account::prepare_update_one(&account_query, &mut account_mutation).await?;
///
/// Order::transaction(move |tx| Box::pin(async move {
///      let connection = tx.acquire().await?;
///      connection.execute(order_ctx.query()).await?;
///      connection.execute(stock_ctx.query()).await?;
///      connection.execute(account_ctx.query()).await?;
///      Ok(())
/// })).await?;
/// ```
pub trait Transaction<K, Tx>: Schema<PrimaryKey = K>
where
    K: Default + Display + PartialEq,
{
    /// Executes the specific operations inside of a transaction.
    /// If the operations return an error, the transaction will be rolled back;
    /// if not, the transaction will be committed.
    ///
    /// The closure `tx` receives a mutable borrow of the transaction handle and
    /// returns a boxed future; the value it resolves to is returned to the caller
    /// once the commit has succeeded.
    async fn transaction<F, T>(tx: F) -> Result<T, Error>
    where
        F: for<'t> FnOnce(&'t mut Tx) -> BoxFuture<'t, Result<T, Error>>;

    /// Executes the queries sequentially inside of a transaction.
    /// If it returns an error, the transaction will be rolled back;
    /// if not, the transaction will be committed.
    ///
    /// Every entry of `queries` is prepared with the same optional `params` map.
    /// The returned value is the sum of the rows affected by all queries.
    async fn transactional_execute(queries: &[&str], params: Option<&Map>) -> Result<u64, Error>;

    /// Inserts the model and its associations inside of a transaction.
    ///
    /// `self` is the main model and `models` are the associated models of type `M`.
    /// The returned value is the total number of rows affected by the inserts.
    async fn transactional_insert<M: Schema>(self, models: Vec<M>) -> Result<u64, Error>;

    /// Updates the models inside of a transaction.
    ///
    /// `queries.0` and `mutations.0` are applied to `Self`, while `queries.1` and
    /// `mutations.1` are applied to the model `M`. The returned value is the total
    /// number of rows affected by both updates.
    async fn transactional_update<M: Schema>(
        queries: (&Query, &Query),
        mutations: (&mut Mutation, &mut Mutation),
    ) -> Result<u64, Error>;

    /// Deletes the models inside of a transaction.
    ///
    /// `queries.0` selects the rows of `Self` and `queries.1` selects the rows of
    /// the model `M`. The returned value is the total number of rows affected by
    /// both deletions.
    async fn transactional_delete<M: Schema>(queries: (&Query, &Query)) -> Result<u64, Error>;
}

#[cfg(feature = "orm-sqlx")]
impl<'c, M, K> Transaction<K, sqlx::Transaction<'c, DatabaseDriver>> for M
where
    M: Schema<PrimaryKey = K>,
    K: Default + Display + PartialEq,
{
    /// Runs the closure `tx` against a transaction begun on the pool obtained from
    /// `Self::acquire_writer()`.
    ///
    /// The transaction is committed only after the closure's future resolves to `Ok`;
    /// if it resolves to `Err`, the error is propagated and `commit` is never reached.
    async fn transaction<F, T>(tx: F) -> Result<T, Error>
    where
        F: for<'t> FnOnce(
            &'t mut sqlx::Transaction<'c, DatabaseDriver>,
        ) -> BoxFuture<'t, Result<T, Error>>,
    {
        let mut txn = Self::acquire_writer().await?.pool().begin().await?;

        // Lend the transaction handle to the caller's operations.
        let output = tx(&mut txn).await?;

        txn.commit().await?;
        Ok(output)
    }

    /// Runs every statement in `queries`, in order, inside a single transaction.
    ///
    /// Each statement is prepared with `params`, passed through the `before_scan` /
    /// `after_scan` hooks of `Self`, and executed with its arguments rendered as
    /// unquoted strings. Returns the sum of the rows affected by all statements.
    async fn transactional_execute(queries: &[&str], params: Option<&Map>) -> Result<u64, Error> {
        let mut txn = Self::acquire_writer().await?.pool().begin().await?;
        let conn = txn.acquire().await?;

        let mut affected_total = 0;
        for statement in queries.iter() {
            let (sql, values) = Query::prepare_query(statement, params);
            let mut ctx = Self::before_scan(&sql).await?;
            ctx.set_query(sql);

            let mut arguments: Vec<_> = values
                .iter()
                .map(|value| value.to_string_unquoted())
                .collect();
            let rows_affected = conn
                .execute_with(ctx.query(), &arguments)
                .await?
                .rows_affected();
            affected_total += rows_affected;

            // Record the arguments and the outcome on the context before the hook runs.
            ctx.append_arguments(&mut arguments);
            ctx.set_query_result(rows_affected, true);
            Self::after_scan(&ctx).await?;
        }

        txn.commit().await?;
        Ok(affected_total)
    }

    /// Inserts the model together with its associated models of type `S` inside
    /// one transaction.
    ///
    /// The model row is written first by a single-row `INSERT` that leaves out
    /// auto-increment columns. The associations are then written by one multi-row
    /// `INSERT`. When `associations` is empty that second statement is skipped,
    /// because an `INSERT ... VALUES ;` without any row is not valid SQL and would
    /// fail the whole transaction.
    ///
    /// Returns the total number of rows affected by the executed statements.
    ///
    /// # Errors
    ///
    /// Returns an error if beginning the transaction, any model hook, any statement
    /// or the commit fails. In every such case the function returns before `commit`
    /// has completed successfully.
    async fn transactional_insert<S: Schema>(mut self, associations: Vec<S>) -> Result<u64, Error> {
        let mut transaction = Self::acquire_writer().await?.pool().begin().await?;
        let connection = transaction.acquire().await?;

        // Inserts the model
        let model_data = self.before_insert().await?;
        let map = self.into_map();
        let columns = Self::columns();

        // Field names and encoded values are collected in lockstep so that both lists
        // skip the same auto-increment columns.
        let mut fields = Vec::with_capacity(columns.len());
        let values = columns
            .iter()
            .filter_map(|col| {
                if col.auto_increment() {
                    None
                } else {
                    let name = col.name();
                    fields.push(name);
                    Some(col.encode_value(map.get(name)))
                }
            })
            .collect::<Vec<_>>()
            .join(", ");
        let fields = fields.join(", ");
        let table_name = Query::table_name_escaped::<Self>();
        let sql = format!("INSERT INTO {table_name} ({fields}) VALUES ({values});");
        let mut ctx = Self::before_scan(&sql).await?;
        ctx.set_query(sql);

        let mut total_rows = 0;
        let query_result = connection.execute(ctx.query()).await?;
        let (last_insert_id, rows_affected) = Query::parse_query_result(query_result);
        let success = rows_affected == 1;
        if let Some(last_insert_id) = last_insert_id {
            ctx.set_last_insert_id(last_insert_id);
        }
        total_rows += rows_affected;
        ctx.set_query_result(rows_affected, success);
        Self::after_scan(&ctx).await?;
        Self::after_insert(&ctx, model_data).await?;

        // Inserts associations. Nothing is sent when the list is empty: the statement
        // would otherwise end in `VALUES ;`, which the database rejects.
        if !associations.is_empty() {
            let columns = S::columns();
            let mut values = Vec::with_capacity(associations.len());
            for mut association in associations {
                let _association_data = association.before_insert().await?;
                let map = association.into_map();
                let entries = columns
                    .iter()
                    .map(|col| col.encode_value(map.get(col.name())))
                    .collect::<Vec<_>>()
                    .join(", ");
                values.push(format!("({entries})"));
            }

            let table_name = Query::table_name_escaped::<S>();
            let fields = S::fields().join(", ");
            let values = values.join(", ");
            let sql = format!("INSERT INTO {table_name} ({fields}) VALUES {values};");
            let mut ctx = S::before_scan(&sql).await?;
            ctx.set_query(sql);

            let rows_affected = connection.execute(ctx.query()).await?.rows_affected();
            total_rows += rows_affected;
            ctx.set_query_result(rows_affected, true);
            S::after_scan(&ctx).await?;
        }

        // Commits the transaction
        transaction.commit().await?;
        Ok(total_rows)
    }

    /// Updates rows of `Self` and rows of `S` inside one transaction.
    ///
    /// The first query/mutation pair targets `Self`, the second pair targets `S`.
    /// Each update runs the `before_mutation`, `before_scan`, `after_scan` and
    /// `after_mutation` hooks of its model. Returns the number of rows affected by
    /// both `UPDATE` statements combined.
    async fn transactional_update<S: Schema>(
        queries: (&Query, &Query),
        mutations: (&mut Mutation, &mut Mutation),
    ) -> Result<u64, Error> {
        let mut txn = Self::acquire_writer().await?.pool().begin().await?;
        let conn = txn.acquire().await?;

        let (model_query, assoc_query) = queries;
        let (model_mutation, assoc_mutation) = mutations;

        // Updates the rows of `Self`
        Self::before_mutation(model_query, model_mutation).await?;
        let sql = format!(
            "UPDATE {table_name} SET {updates} {filters};",
            table_name = model_query.format_table_name::<Self>(),
            filters = model_query.format_filters::<Self>(),
            updates = model_mutation.format_updates::<Self>(),
        );
        let mut ctx = Self::before_scan(&sql).await?;
        ctx.set_query(sql);

        let model_rows = conn.execute(ctx.query()).await?.rows_affected();
        ctx.set_query_result(model_rows, true);
        Self::after_scan(&ctx).await?;
        Self::after_mutation(&ctx).await?;

        // Updates the rows of `S`
        S::before_mutation(assoc_query, assoc_mutation).await?;
        let sql = format!(
            "UPDATE {table_name} SET {updates} {filters};",
            table_name = assoc_query.format_table_name::<S>(),
            filters = assoc_query.format_filters::<S>(),
            updates = assoc_mutation.format_updates::<S>(),
        );
        let mut ctx = S::before_scan(&sql).await?;
        ctx.set_query(sql);

        let assoc_rows = conn.execute(ctx.query()).await?.rows_affected();
        ctx.set_query_result(assoc_rows, true);
        S::after_scan(&ctx).await?;
        S::after_mutation(&ctx).await?;

        // Both statements succeeded, so the transaction can be committed.
        txn.commit().await?;
        Ok(model_rows + assoc_rows)
    }

    /// Deletes rows of `Self` and rows of `S` inside one transaction.
    ///
    /// The first query selects the rows of `Self`, the second query selects the rows
    /// of `S`. Each deletion runs the `before_query`, `before_scan`, `after_scan` and
    /// `after_query` hooks of its model. Returns the number of rows affected by both
    /// `DELETE` statements combined.
    async fn transactional_delete<S: Schema>(queries: (&Query, &Query)) -> Result<u64, Error> {
        let mut txn = Self::acquire_writer().await?.pool().begin().await?;
        let conn = txn.acquire().await?;

        let (model_query, assoc_query) = queries;

        // Deletes the rows of `Self`
        Self::before_query(model_query).await?;
        let sql = format!(
            "DELETE FROM {table_name} {filters};",
            table_name = model_query.format_table_name::<Self>(),
            filters = model_query.format_filters::<Self>(),
        );
        let mut ctx = Self::before_scan(&sql).await?;
        ctx.set_query(sql);

        let model_rows = conn.execute(ctx.query()).await?.rows_affected();
        ctx.set_query_result(model_rows, true);
        Self::after_scan(&ctx).await?;
        Self::after_query(&ctx).await?;

        // Deletes the rows of `S`
        S::before_query(assoc_query).await?;
        let sql = format!(
            "DELETE FROM {table_name} {filters};",
            table_name = assoc_query.format_table_name::<S>(),
            filters = assoc_query.format_filters::<S>(),
        );
        let mut ctx = S::before_scan(&sql).await?;
        ctx.set_query(sql);

        let assoc_rows = conn.execute(ctx.query()).await?.rows_affected();
        ctx.set_query_result(assoc_rows, true);
        S::after_scan(&ctx).await?;
        S::after_query(&ctx).await?;

        // Both statements succeeded, so the transaction can be committed.
        txn.commit().await?;
        Ok(model_rows + assoc_rows)
    }
}