ic_web3_rs/transports/
batch.rs1use crate::{
4 error::{self, Error},
5 rpc, BatchTransport, RequestId, Transport,
6};
7use futures::{
8 channel::oneshot,
9 task::{Context, Poll},
10 Future, FutureExt,
11};
12use ic_cdk::api::management_canister::http_request::TransformContext;
13use parking_lot::Mutex;
14use std::{collections::BTreeMap, pin::Pin, sync::Arc};
15
16use super::ic_http_client::CallOptions;
17
18type Pending = oneshot::Sender<error::Result<rpc::Value>>;
19type PendingRequests = Arc<Mutex<BTreeMap<RequestId, Pending>>>;
20
21#[derive(Debug, Clone)]
23pub struct Batch<T> {
24 transport: T,
25 pending: PendingRequests,
26 batch: Arc<Mutex<Vec<(RequestId, rpc::Call)>>>,
27}
28
29impl<T> Batch<T>
30where
31 T: BatchTransport,
32{
33 pub fn new(transport: T) -> Self {
35 Batch {
36 transport,
37 pending: Default::default(),
38 batch: Default::default(),
39 }
40 }
41
42 pub fn submit_batch(&self) -> impl Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> {
44 let batch = std::mem::take(&mut *self.batch.lock());
45 let ids = batch.iter().map(|&(id, _)| id).collect::<Vec<_>>();
46
47 let batch = self.transport.send_batch(batch);
48 let pending = self.pending.clone();
49
50 async move {
51 let res = batch.await;
52 let mut pending = pending.lock();
53 for (idx, request_id) in ids.into_iter().enumerate() {
54 if let Some(rx) = pending.remove(&request_id) {
55 let _ = match res {
57 Ok(ref results) if results.len() > idx => rx.send(results[idx].clone()),
58 Err(ref err) => rx.send(Err(err.clone())),
59 _ => rx.send(Err(Error::Internal)),
60 };
61 }
62 }
63 res
64 }
65 }
66}
67
68impl<T> Transport for Batch<T>
69where
70 T: BatchTransport,
71{
72 type Out = SingleResult;
73
74 fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
75 self.transport.prepare(method, params)
76 }
77
78 fn send(&self, id: RequestId, request: rpc::Call, options: CallOptions) -> Self::Out {
79 let (tx, rx) = oneshot::channel();
80 self.pending.lock().insert(id, tx);
81 self.batch.lock().push((id, request));
82
83 SingleResult(rx)
84 }
85
86 fn set_max_response_bytes(&mut self, v: u64) {
87 self.transport.set_max_response_bytes(v);
88 }
89}
90
91pub struct SingleResult(oneshot::Receiver<error::Result<rpc::Value>>);
94
95impl Future for SingleResult {
96 type Output = error::Result<rpc::Value>;
97
98 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
99 Poll::Ready(ready!(self.0.poll_unpin(ctx)).map_err(|_| Error::Internal)?)
100 }
101}