slack_morphism/
scroller.rs

1use crate::errors::*;
2use crate::{ClientResult, SlackClientHttpConnector, SlackClientSession};
3
4use futures::future::BoxFuture;
5use futures::stream::BoxStream;
6use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
7use std::marker::PhantomData;
8
9pub trait SlackApiResponseScroller<SCHC>
10where
11    SCHC: SlackClientHttpConnector + Send + Sync,
12{
13    type ResponseType;
14    type CursorType;
15    type ResponseItemType;
16
17    fn has_next(&self) -> bool;
18
19    fn next_mut<'a, 's>(
20        &'a mut self,
21        session: &'a SlackClientSession<'s, SCHC>,
22    ) -> BoxFuture<'a, ClientResult<Self::ResponseType>>;
23
24    fn to_stream<'a, 's>(
25        &'a self,
26        session: &'a SlackClientSession<'s, SCHC>,
27    ) -> BoxStream<'a, ClientResult<Self::ResponseType>>;
28
29    fn to_items_stream<'a, 's>(
30        &'a self,
31        session: &'a SlackClientSession<'s, SCHC>,
32    ) -> BoxStream<'a, ClientResult<Vec<Self::ResponseItemType>>>;
33}
34
35pub trait SlackApiScrollableRequest<SCHC>
36where
37    SCHC: SlackClientHttpConnector + Send + Sync + Clone + 'static,
38{
39    type ResponseType;
40    type CursorType;
41    type ResponseItemType;
42
43    fn scroller<'a, 'b>(
44        &'a self,
45    ) -> Box<
46        dyn SlackApiResponseScroller<
47                SCHC,
48                ResponseType = Self::ResponseType,
49                CursorType = Self::CursorType,
50                ResponseItemType = Self::ResponseItemType,
51            >
52            + 'b
53            + Send
54            + Sync,
55    >
56    where
57        Self: Send + Clone + Sync + 'b,
58        Self::ResponseType: Send
59            + Clone
60            + Sync
61            + SlackApiScrollableResponse<
62                CursorType = Self::CursorType,
63                ResponseItemType = Self::ResponseItemType,
64            > + 'b,
65        Self::CursorType: Send + Clone + Sync + 'b,
66        Self::ResponseItemType: Send + Clone + Sync + 'b,
67    {
68        Box::new(SlackApiResponseScrollerState::new(self))
69    }
70
71    fn with_new_cursor(&self, new_cursor: Option<&Self::CursorType>) -> Self;
72
73    fn scroll<'a, 's>(
74        &'a self,
75        session: &'a SlackClientSession<'s, SCHC>,
76    ) -> BoxFuture<'a, ClientResult<Self::ResponseType>>;
77}
78
/// A response that represents one page of a cursor-based listing.
pub trait SlackApiScrollableResponse {
    /// Cursor type pointing at the following page.
    type CursorType;
    /// Type of the items contained in the page.
    type ResponseItemType;

    /// Returns the cursor for the following page, or `None` when the response carries none.
    fn next_cursor(&self) -> Option<&Self::CursorType>;

    /// Returns an iterator over the items of this page, borrowed from the response.
    fn scrollable_items(&self) -> Box<dyn Iterator<Item = &Self::ResponseItemType> + '_>;
}
86
87#[derive(Debug, Clone)]
88pub struct SlackApiResponseScrollerState<RQ, RS, CT, RIT, SCHC>
89where
90    RQ: SlackApiScrollableRequest<SCHC, ResponseType = RS, CursorType = CT, ResponseItemType = RIT>
91        + Send
92        + Sync
93        + Clone,
94    RS: SlackApiScrollableResponse<CursorType = CT, ResponseItemType = RIT> + Send + Sync + Clone,
95    CT: Send + Sync + Clone,
96    RIT: Send + Sync + Clone,
97    SCHC: SlackClientHttpConnector + Send + Sync + Clone + 'static,
98{
99    pub request: RQ,
100    pub last_response: Option<RS>,
101    pub last_cursor: Option<CT>,
102    phantom: PhantomData<SCHC>,
103}
104
105impl<RQ, RS, CT, RIT, SCHC> SlackApiResponseScrollerState<RQ, RS, CT, RIT, SCHC>
106where
107    RQ: SlackApiScrollableRequest<SCHC, ResponseType = RS, CursorType = CT, ResponseItemType = RIT>
108        + Send
109        + Sync
110        + Clone,
111    RS: SlackApiScrollableResponse<CursorType = CT, ResponseItemType = RIT> + Send + Sync + Clone,
112    CT: Send + Sync + Clone,
113    RIT: Send + Sync + Clone,
114    SCHC: SlackClientHttpConnector + Send + Sync + Clone + 'static,
115{
116    pub fn new(request: &RQ) -> Self {
117        Self {
118            request: request.clone(),
119            last_cursor: None,
120            last_response: None,
121            phantom: PhantomData,
122        }
123    }
124}
125
126impl<RQ, RS, CT, RIT, SCHC> SlackApiResponseScroller<SCHC>
127    for SlackApiResponseScrollerState<RQ, RS, CT, RIT, SCHC>
128where
129    RQ: SlackApiScrollableRequest<SCHC, ResponseType = RS, CursorType = CT, ResponseItemType = RIT>
130        + Send
131        + Sync
132        + Clone,
133    RS: SlackApiScrollableResponse<CursorType = CT, ResponseItemType = RIT> + Send + Sync + Clone,
134    CT: Send + Sync + Clone,
135    RIT: Send + Sync + Clone,
136    SCHC: SlackClientHttpConnector + Send + Sync + Clone,
137{
138    type ResponseType = RS;
139    type CursorType = CT;
140    type ResponseItemType = RIT;
141
142    fn has_next(&self) -> bool {
143        self.last_response.is_none() || (self.last_response.is_some() && self.last_cursor.is_some())
144    }
145
146    fn next_mut<'a, 's>(
147        &'a mut self,
148        session: &'a SlackClientSession<'s, SCHC>,
149    ) -> BoxFuture<'a, ClientResult<Self::ResponseType>> {
150        let cursor = &self.last_cursor;
151
152        if !&self.has_next() {
153            async {
154                Err(SlackClientError::EndOfStream(
155                    SlackClientEndOfStreamError::new(),
156                ))
157            }
158            .boxed()
159        } else {
160            let updated_request = self.request.with_new_cursor(cursor.as_ref());
161
162            async move {
163                updated_request
164                    .scroll(session)
165                    .map_ok(|res| {
166                        self.last_response = Some(res.clone());
167                        self.last_cursor = res.next_cursor().cloned();
168                        res
169                    })
170                    .await
171            }
172            .boxed()
173        }
174    }
175
176    fn to_stream<'a, 's>(
177        &'a self,
178        session: &'a SlackClientSession<'s, SCHC>,
179    ) -> BoxStream<'a, ClientResult<Self::ResponseType>> {
180        let init_state = self.clone();
181        let stream = futures_util::stream::unfold(init_state, move |mut state| async move {
182            if state.has_next() {
183                let res = state.next_mut(session).await;
184                Some((res, state))
185            } else {
186                None
187            }
188        });
189
190        stream.boxed()
191    }
192
193    fn to_items_stream<'a, 's>(
194        &'a self,
195        session: &'a SlackClientSession<'s, SCHC>,
196    ) -> BoxStream<'a, ClientResult<Vec<Self::ResponseItemType>>> {
197        self.to_stream(session)
198            .map_ok(|rs| {
199                rs.scrollable_items()
200                    .cloned()
201                    .collect::<Vec<Self::ResponseItemType>>()
202            })
203            .boxed()
204    }
205}