1use crate::errors::*;
2use crate::{ClientResult, SlackClientHttpConnector, SlackClientSession};
3
4use futures::future::BoxFuture;
5use futures::stream::BoxStream;
6use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
7use std::marker::PhantomData;
8
9pub trait SlackApiResponseScroller<SCHC>
10where
11 SCHC: SlackClientHttpConnector + Send + Sync,
12{
13 type ResponseType;
14 type CursorType;
15 type ResponseItemType;
16
17 fn has_next(&self) -> bool;
18
19 fn next_mut<'a, 's>(
20 &'a mut self,
21 session: &'a SlackClientSession<'s, SCHC>,
22 ) -> BoxFuture<'a, ClientResult<Self::ResponseType>>;
23
24 fn to_stream<'a, 's>(
25 &'a self,
26 session: &'a SlackClientSession<'s, SCHC>,
27 ) -> BoxStream<'a, ClientResult<Self::ResponseType>>;
28
29 fn to_items_stream<'a, 's>(
30 &'a self,
31 session: &'a SlackClientSession<'s, SCHC>,
32 ) -> BoxStream<'a, ClientResult<Vec<Self::ResponseItemType>>>;
33}
34
35pub trait SlackApiScrollableRequest<SCHC>
36where
37 SCHC: SlackClientHttpConnector + Send + Sync + Clone + 'static,
38{
39 type ResponseType;
40 type CursorType;
41 type ResponseItemType;
42
43 fn scroller<'a, 'b>(
44 &'a self,
45 ) -> Box<
46 dyn SlackApiResponseScroller<
47 SCHC,
48 ResponseType = Self::ResponseType,
49 CursorType = Self::CursorType,
50 ResponseItemType = Self::ResponseItemType,
51 >
52 + 'b
53 + Send
54 + Sync,
55 >
56 where
57 Self: Send + Clone + Sync + 'b,
58 Self::ResponseType: Send
59 + Clone
60 + Sync
61 + SlackApiScrollableResponse<
62 CursorType = Self::CursorType,
63 ResponseItemType = Self::ResponseItemType,
64 > + 'b,
65 Self::CursorType: Send + Clone + Sync + 'b,
66 Self::ResponseItemType: Send + Clone + Sync + 'b,
67 {
68 Box::new(SlackApiResponseScrollerState::new(self))
69 }
70
71 fn with_new_cursor(&self, new_cursor: Option<&Self::CursorType>) -> Self;
72
73 fn scroll<'a, 's>(
74 &'a self,
75 session: &'a SlackClientSession<'s, SCHC>,
76 ) -> BoxFuture<'a, ClientResult<Self::ResponseType>>;
77}
78
79pub trait SlackApiScrollableResponse {
80 type CursorType;
81 type ResponseItemType;
82
83 fn next_cursor(&self) -> Option<&Self::CursorType>;
84 fn scrollable_items<'a>(&'a self) -> Box<dyn Iterator<Item = &'a Self::ResponseItemType> + 'a>;
85}
86
87#[derive(Debug, Clone)]
88pub struct SlackApiResponseScrollerState<RQ, RS, CT, RIT, SCHC>
89where
90 RQ: SlackApiScrollableRequest<SCHC, ResponseType = RS, CursorType = CT, ResponseItemType = RIT>
91 + Send
92 + Sync
93 + Clone,
94 RS: SlackApiScrollableResponse<CursorType = CT, ResponseItemType = RIT> + Send + Sync + Clone,
95 CT: Send + Sync + Clone,
96 RIT: Send + Sync + Clone,
97 SCHC: SlackClientHttpConnector + Send + Sync + Clone + 'static,
98{
99 pub request: RQ,
100 pub last_response: Option<RS>,
101 pub last_cursor: Option<CT>,
102 phantom: PhantomData<SCHC>,
103}
104
105impl<RQ, RS, CT, RIT, SCHC> SlackApiResponseScrollerState<RQ, RS, CT, RIT, SCHC>
106where
107 RQ: SlackApiScrollableRequest<SCHC, ResponseType = RS, CursorType = CT, ResponseItemType = RIT>
108 + Send
109 + Sync
110 + Clone,
111 RS: SlackApiScrollableResponse<CursorType = CT, ResponseItemType = RIT> + Send + Sync + Clone,
112 CT: Send + Sync + Clone,
113 RIT: Send + Sync + Clone,
114 SCHC: SlackClientHttpConnector + Send + Sync + Clone + 'static,
115{
116 pub fn new(request: &RQ) -> Self {
117 Self {
118 request: request.clone(),
119 last_cursor: None,
120 last_response: None,
121 phantom: PhantomData,
122 }
123 }
124}
125
126impl<RQ, RS, CT, RIT, SCHC> SlackApiResponseScroller<SCHC>
127 for SlackApiResponseScrollerState<RQ, RS, CT, RIT, SCHC>
128where
129 RQ: SlackApiScrollableRequest<SCHC, ResponseType = RS, CursorType = CT, ResponseItemType = RIT>
130 + Send
131 + Sync
132 + Clone,
133 RS: SlackApiScrollableResponse<CursorType = CT, ResponseItemType = RIT> + Send + Sync + Clone,
134 CT: Send + Sync + Clone,
135 RIT: Send + Sync + Clone,
136 SCHC: SlackClientHttpConnector + Send + Sync + Clone,
137{
138 type ResponseType = RS;
139 type CursorType = CT;
140 type ResponseItemType = RIT;
141
142 fn has_next(&self) -> bool {
143 self.last_response.is_none() || (self.last_response.is_some() && self.last_cursor.is_some())
144 }
145
146 fn next_mut<'a, 's>(
147 &'a mut self,
148 session: &'a SlackClientSession<'s, SCHC>,
149 ) -> BoxFuture<'a, ClientResult<Self::ResponseType>> {
150 let cursor = &self.last_cursor;
151
152 if !&self.has_next() {
153 async {
154 Err(SlackClientError::EndOfStream(
155 SlackClientEndOfStreamError::new(),
156 ))
157 }
158 .boxed()
159 } else {
160 let updated_request = self.request.with_new_cursor(cursor.as_ref());
161
162 async move {
163 updated_request
164 .scroll(session)
165 .map_ok(|res| {
166 self.last_response = Some(res.clone());
167 self.last_cursor = res.next_cursor().cloned();
168 res
169 })
170 .await
171 }
172 .boxed()
173 }
174 }
175
176 fn to_stream<'a, 's>(
177 &'a self,
178 session: &'a SlackClientSession<'s, SCHC>,
179 ) -> BoxStream<'a, ClientResult<Self::ResponseType>> {
180 let init_state = self.clone();
181 let stream = futures_util::stream::unfold(init_state, move |mut state| async move {
182 if state.has_next() {
183 let res = state.next_mut(session).await;
184 Some((res, state))
185 } else {
186 None
187 }
188 });
189
190 stream.boxed()
191 }
192
193 fn to_items_stream<'a, 's>(
194 &'a self,
195 session: &'a SlackClientSession<'s, SCHC>,
196 ) -> BoxStream<'a, ClientResult<Vec<Self::ResponseItemType>>> {
197 self.to_stream(session)
198 .map_ok(|rs| {
199 rs.scrollable_items()
200 .cloned()
201 .collect::<Vec<Self::ResponseItemType>>()
202 })
203 .boxed()
204 }
205}