use crate::{
error::*, ActiveModelTrait, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, Insert,
IntoActiveModel, Iterable, PrimaryKeyToColumn, PrimaryKeyTrait, SelectModel, SelectorRaw,
TryFromU64, TryInsert,
};
use sea_query::{FromValueTuple, Iden, InsertStatement, Query, ValueTuple};
use std::{future::Future, marker::PhantomData};
/// Executor for a prepared INSERT statement targeting the entity of active model `A`.
///
/// It bundles the statement with the primary key value, when that value is already
/// known before the statement runs.
#[derive(Debug)]
pub struct Inserter<A>
where
A: ActiveModelTrait,
{
// Primary key supplied up front; when `Some`, it is reported back as the
// last insert id instead of being read from the database response.
primary_key: Option<ValueTuple>,
// The INSERT statement to build for the connection's backend and execute.
query: InsertStatement,
// Zero-sized marker tying this executor to the active model type `A`.
model: PhantomData<A>,
}
/// Outcome of a successful INSERT for the entity of active model `A`.
#[derive(Debug)]
pub struct InsertResult<A>
where
A: ActiveModelTrait,
{
/// Primary key value of the inserted row, typed as the entity's primary key
/// `ValueType`. Depending on the execution path it is the key supplied by the
/// caller, the key columns read back from the returned row, or the insert id
/// reported by the driver.
pub last_insert_id: <<<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey as PrimaryKeyTrait>::ValueType,
}
/// Outcome of a [`TryInsert`] execution, where "nothing was inserted" is a regular
/// result rather than an error.
#[derive(Debug)]
pub enum TryInsertResult<T> {
/// The insert had no columns, so no statement was sent to the database.
Empty,
/// The statement ran but the database reported that no record was inserted.
Conflicted,
/// The insert succeeded; `T` is the payload of the chosen execution method.
Inserted(T),
}
impl<A> TryInsert<A>
where
    A: ActiveModelTrait,
{
    /// Executes the insert and reports the primary key of the inserted row.
    ///
    /// # Returns
    ///
    /// * `TryInsertResult::Empty` when the insert has no columns (nothing is executed).
    /// * `TryInsertResult::Conflicted` when the database reports that no record was
    ///   inserted (`DbErr::RecordNotInserted`).
    /// * `TryInsertResult::Inserted` wrapping the [`InsertResult`] otherwise.
    ///
    /// # Errors
    ///
    /// Any other [`DbErr`] raised while executing the statement is passed through.
    pub async fn exec<'a, C>(self, db: &'a C) -> Result<TryInsertResult<InsertResult<A>>, DbErr>
    where
        C: ConnectionTrait,
        A: 'a,
    {
        if self.insert_struct.columns.is_empty() {
            return Ok(TryInsertResult::Empty);
        }
        let outcome = self.insert_struct.exec(db).await;
        match outcome {
            Ok(inserted) => Ok(TryInsertResult::Inserted(inserted)),
            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
            Err(err) => Err(err),
        }
    }

    /// Executes the insert and reports the number of affected rows.
    ///
    /// # Returns
    ///
    /// * `TryInsertResult::Empty` when the insert has no columns (nothing is executed).
    /// * `TryInsertResult::Conflicted` when the statement ran but affected zero rows.
    /// * `TryInsertResult::Inserted(n)` with the affected-row count `n > 0` otherwise.
    ///
    /// # Errors
    ///
    /// Any [`DbErr`] other than `RecordNotInserted` is passed through.
    pub async fn exec_without_returning<'a, C>(
        self,
        db: &'a C,
    ) -> Result<TryInsertResult<u64>, DbErr>
    where
        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
        C: ConnectionTrait,
        A: 'a,
    {
        if self.insert_struct.columns.is_empty() {
            return Ok(TryInsertResult::Empty);
        }
        let outcome = self.insert_struct.exec_without_returning(db).await;
        match outcome {
            // The underlying executor only reports the affected-row count and never
            // raises `RecordNotInserted`, so a count of zero is the sole signal that
            // nothing was written. Mapping it here keeps this method consistent with
            // `exec`, which also treats zero affected rows as a conflict.
            Ok(0) => Ok(TryInsertResult::Conflicted),
            Ok(rows_affected) => Ok(TryInsertResult::Inserted(rows_affected)),
            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
            Err(err) => Err(err),
        }
    }

    /// Executes the insert and reports the inserted model.
    ///
    /// # Returns
    ///
    /// * `TryInsertResult::Empty` when the insert has no columns (nothing is executed).
    /// * `TryInsertResult::Conflicted` when the database reports that no record was
    ///   inserted (`DbErr::RecordNotInserted`).
    /// * `TryInsertResult::Inserted` wrapping the model otherwise.
    ///
    /// # Errors
    ///
    /// Any other [`DbErr`] is passed through.
    ///
    /// NOTE(review): on connections where `support_returning()` is true, the
    /// returning executor turns an empty result into `DbErr::RecordNotFound`, which
    /// is not mapped to `Conflicted` here. Confirm whether that is intended.
    pub async fn exec_with_returning<'a, C>(
        self,
        db: &'a C,
    ) -> Result<TryInsertResult<<A::Entity as EntityTrait>::Model>, DbErr>
    where
        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
        C: ConnectionTrait,
        A: 'a,
    {
        if self.insert_struct.columns.is_empty() {
            return Ok(TryInsertResult::Empty);
        }
        let outcome = self.insert_struct.exec_with_returning(db).await;
        match outcome {
            Ok(model) => Ok(TryInsertResult::Inserted(model)),
            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
            Err(err) => Err(err),
        }
    }
}
impl<A> Insert<A>
where
    A: ActiveModelTrait,
{
    /// Runs the INSERT and resolves to an [`InsertResult`] holding the primary key
    /// of the inserted row.
    ///
    /// If the connection reports `support_returning()`, a returning clause listing
    /// every primary-key column is attached to the statement before it is handed
    /// to the [`Inserter`].
    // NOTE(review): `statement` is always mutated below, so this allow looks like a
    // leftover; kept to leave lint behaviour untouched.
    #[allow(unused_mut)]
    pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + '_
    where
        C: ConnectionTrait,
        A: 'a,
    {
        let mut statement = self.query;
        if db.support_returning() {
            let backend = db.get_database_backend();
            let key_exprs = <A::Entity as EntityTrait>::PrimaryKey::iter().map(|key| {
                let column = key.into_column();
                column.select_as(column.into_returning_expr(backend))
            });
            statement.returning(Query::returning().exprs(key_exprs));
        }
        Inserter::<A>::new(self.primary_key, statement).exec(db)
    }

    /// Runs the INSERT and resolves to the number of rows the database reports as
    /// affected. No returning clause is added.
    pub fn exec_without_returning<'a, C>(
        self,
        db: &'a C,
    ) -> impl Future<Output = Result<u64, DbErr>> + '_
    where
        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
        C: ConnectionTrait,
        A: 'a,
    {
        let inserter = Inserter::<A>::new(self.primary_key, self.query);
        inserter.exec_without_returning(db)
    }

    /// Runs the INSERT and resolves to the inserted model.
    pub fn exec_with_returning<'a, C>(
        self,
        db: &'a C,
    ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + '_
    where
        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
        C: ConnectionTrait,
        A: 'a,
    {
        let inserter = Inserter::<A>::new(self.primary_key, self.query);
        inserter.exec_with_returning(db)
    }
}
impl<A> Inserter<A>
where
A: ActiveModelTrait,
{
pub fn new(primary_key: Option<ValueTuple>, query: InsertStatement) -> Self {
Self {
primary_key,
query,
model: PhantomData,
}
}
pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + '_
where
C: ConnectionTrait,
A: 'a,
{
exec_insert(self.primary_key, self.query, db)
}
pub fn exec_without_returning<'a, C>(
self,
db: &'a C,
) -> impl Future<Output = Result<u64, DbErr>> + '_
where
C: ConnectionTrait,
A: 'a,
{
exec_insert_without_returning(self.query, db)
}
pub fn exec_with_returning<'a, C>(
self,
db: &'a C,
) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + '_
where
<A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
C: ConnectionTrait,
A: 'a,
{
exec_insert_with_returning::<A, _>(self.primary_key, self.query, db)
}
}
/// Builds and runs `statement`, then works out the primary key of the inserted row.
///
/// The key is obtained in one of three ways:
/// 1. `primary_key` is `Some`: the statement is executed and the supplied value is
///    converted into the key type.
/// 2. No key supplied and the connection supports returning: the statement is run
///    as a query and the key columns are read from the last returned row.
/// 3. Otherwise: the statement is executed and the driver-reported last insert id
///    is converted into the key type.
///
/// # Errors
///
/// * `DbErr::RecordNotInserted` when no row was affected or returned, or when the
///   MySQL backend reports a last insert id of `0`.
/// * `DbErr::UnpackInsertId` when the key cannot be decoded into the key type.
/// * Any error raised by the connection while executing the statement.
async fn exec_insert<A, C>(
    primary_key: Option<ValueTuple>,
    statement: InsertStatement,
    db: &C,
) -> Result<InsertResult<A>, DbErr>
where
    C: ConnectionTrait,
    A: ActiveModelTrait,
{
    type PrimaryKey<A> = <<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey;
    type ValueTypeOf<A> = <PrimaryKey<A> as PrimaryKeyTrait>::ValueType;

    let backend = db.get_database_backend();
    let built = backend.build(&statement);
    let can_return = db.support_returning();

    let last_insert_id: ValueTypeOf<A> = if let Some(key_values) = primary_key {
        // The caller already knows the key; only confirm that a row was written.
        let outcome = db.execute(built).await?;
        if outcome.rows_affected() == 0 {
            return Err(DbErr::RecordNotInserted);
        }
        FromValueTuple::from_value_tuple(key_values)
    } else if can_return {
        // Read the key columns from the last row handed back by the database.
        let mut returned = db.query_all(built).await?;
        let last_row = returned.pop().ok_or(DbErr::RecordNotInserted)?;
        let key_columns = PrimaryKey::<A>::iter()
            .map(|col| col.to_string())
            .collect::<Vec<_>>();
        last_row
            .try_get_many("", key_columns.as_ref())
            .map_err(|_| DbErr::UnpackInsertId)?
    } else {
        // Fall back to the insert id reported by the driver.
        let outcome = db.execute(built).await?;
        if outcome.rows_affected() == 0 {
            return Err(DbErr::RecordNotInserted);
        }
        let reported_id = outcome.last_insert_id();
        if backend == DbBackend::MySql && reported_id == 0 {
            return Err(DbErr::RecordNotInserted);
        }
        ValueTypeOf::<A>::try_from_u64(reported_id).map_err(|_| DbErr::UnpackInsertId)?
    };

    Ok(InsertResult { last_insert_id })
}
/// Builds `insert_statement` for the connection's backend, executes it, and yields
/// the number of rows the database reports as affected.
///
/// # Errors
///
/// Propagates any [`DbErr`] raised by the connection while executing the statement.
async fn exec_insert_without_returning<C>(
    insert_statement: InsertStatement,
    db: &C,
) -> Result<u64, DbErr>
where
    C: ConnectionTrait,
{
    let built = db.get_database_backend().build(&insert_statement);
    db.execute(built)
        .await
        .map(|outcome| outcome.rows_affected())
}
/// Runs the INSERT and yields the inserted row as a model.
///
/// When the connection supports returning, every entity column is added to a
/// returning clause and the model is decoded from the statement's result.
/// Otherwise the row is inserted through [`exec_insert`] and then fetched again by
/// its primary key.
///
/// # Errors
///
/// * `DbErr::RecordNotFound` when no model could be obtained after the insert.
/// * Any error raised by [`exec_insert`] or by the connection.
async fn exec_insert_with_returning<A, C>(
    primary_key: Option<ValueTuple>,
    mut insert_statement: InsertStatement,
    db: &C,
) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
where
    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
    C: ConnectionTrait,
    A: ActiveModelTrait,
{
    let backend = db.get_database_backend();
    let found = if db.support_returning() {
        // Ask the database to hand back every column of the inserted row.
        insert_statement.returning(
            Query::returning().exprs(
                <A::Entity as EntityTrait>::Column::iter()
                    .map(|col| col.select_as(col.into_returning_expr(backend))),
            ),
        );
        let built = backend.build(&insert_statement);
        SelectorRaw::<SelectModel<<A::Entity as EntityTrait>::Model>>::from_statement(built)
            .one(db)
            .await?
    } else {
        // No returning support: insert first, then look the row up by its key.
        let inserted = exec_insert::<A, _>(primary_key, insert_statement, db).await?;
        <A::Entity as EntityTrait>::find_by_id(inserted.last_insert_id)
            .one(db)
            .await?
    };
    found.ok_or_else(|| DbErr::RecordNotFound("Failed to find inserted item".to_owned()))
}