sqlx_core/pool/
executor.rs

1use either::Either;
2use futures_core::future::BoxFuture;
3use futures_core::stream::BoxStream;
4use futures_util::TryStreamExt;
5
6use crate::database::Database;
7use crate::describe::Describe;
8use crate::error::Error;
9use crate::executor::{Execute, Executor};
10use crate::pool::Pool;
11
12impl<'p, DB: Database> Executor<'p> for &'_ Pool<DB>
13where
14    for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
15{
16    type Database = DB;
17
18    fn fetch_many<'e, 'q: 'e, E>(
19        self,
20        query: E,
21    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
22    where
23        E: 'q + Execute<'q, Self::Database>,
24    {
25        let pool = self.clone();
26
27        Box::pin(try_stream! {
28            let mut conn = pool.acquire().await?;
29            let mut s = conn.fetch_many(query);
30
31            while let Some(v) = s.try_next().await? {
32                r#yield!(v);
33            }
34
35            Ok(())
36        })
37    }
38
39    fn fetch_optional<'e, 'q: 'e, E>(
40        self,
41        query: E,
42    ) -> BoxFuture<'e, Result<Option<DB::Row>, Error>>
43    where
44        E: 'q + Execute<'q, Self::Database>,
45    {
46        let pool = self.clone();
47
48        Box::pin(async move { pool.acquire().await?.fetch_optional(query).await })
49    }
50
51    fn prepare_with<'e, 'q: 'e>(
52        self,
53        sql: &'q str,
54        parameters: &'e [<Self::Database as Database>::TypeInfo],
55    ) -> BoxFuture<'e, Result<<Self::Database as Database>::Statement<'q>, Error>> {
56        let pool = self.clone();
57
58        Box::pin(async move { pool.acquire().await?.prepare_with(sql, parameters).await })
59    }
60
61    #[doc(hidden)]
62    fn describe<'e, 'q: 'e>(
63        self,
64        sql: &'q str,
65    ) -> BoxFuture<'e, Result<Describe<Self::Database>, Error>> {
66        let pool = self.clone();
67
68        Box::pin(async move { pool.acquire().await?.describe(sql).await })
69    }
70}
71
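// Disabled: a generic `Executor` impl for `&mut PoolConnection<DB>` causes an
// overflow when the compiler evaluates `&mut DB::Connection: Executor`.
// The impl is kept below, commented out, for reference.
//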
75// impl<'c, DB: Database> crate::executor::Executor<'c> for &'c mut crate::pool::PoolConnection<DB>
76// where
77//     &'c mut DB::Connection: Executor<'c, Database = DB>,
78// {
79//     type Database = DB;
80//
81//
82//
83//     #[inline]
84//     fn fetch_many<'e, 'q: 'e, E: 'q>(
85//         self,
86//         query: E,
87//     ) -> futures_core::stream::BoxStream<
88//         'e,
89//         Result<
90//             either::Either<<DB as crate::database::Database>::QueryResult, DB::Row>,
91//             crate::error::Error,
92//         >,
93//     >
94//     where
95//         'c: 'e,
96//         E: crate::executor::Execute<'q, DB>,
97//     {
98//         (**self).fetch_many(query)
99//     }
100//
101//     #[inline]
102//     fn fetch_optional<'e, 'q: 'e, E: 'q>(
103//         self,
104//         query: E,
105//     ) -> futures_core::future::BoxFuture<'e, Result<Option<DB::Row>, crate::error::Error>>
106//     where
107//         'c: 'e,
108//         E: crate::executor::Execute<'q, DB>,
109//     {
110//         (**self).fetch_optional(query)
111//     }
112//
113//     #[inline]
114//     fn prepare_with<'e, 'q: 'e>(
115//         self,
116//         sql: &'q str,
117//         parameters: &'e [<DB as crate::database::Database>::TypeInfo],
118//     ) -> futures_core::future::BoxFuture<
119//         'e,
120//         Result<<DB as crate::database::Database>::Statement<'q>, crate::error::Error>,
121//     >
122//     where
123//         'c: 'e,
124//     {
125//         (**self).prepare_with(sql, parameters)
126//     }
127//
128//     #[doc(hidden)]
129//     #[inline]
130//     fn describe<'e, 'q: 'e>(
131//         self,
132//         sql: &'q str,
133//     ) -> futures_core::future::BoxFuture<
134//         'e,
135//         Result<crate::describe::Describe<DB>, crate::error::Error>,
136//     >
137//     where
138//         'c: 'e,
139//     {
140//         (**self).describe(sql)
141//     }
142// }