postcard_rpc/
test_utils.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
//! Test utilities for doctests and integration tests

use core::{fmt::Display, future::Future};

use crate::header::{VarHeader, VarKey, VarSeq, VarSeqKind};
use crate::host_client::util::Stopper;
use crate::{
    host_client::{HostClient, RpcFrame, WireRx, WireSpawn, WireTx},
    Endpoint, Topic,
};
use postcard_schema::Schema;
use serde::{de::DeserializeOwned, Serialize};
use tokio::{
    select,
    sync::mpsc::{channel, Receiver, Sender},
};

/// Receive half of the in-process test wire.
///
/// Implements [`WireRx`] by pulling raw frames off the server-to-client channel.
pub struct LocalRx {
    /// Extra stopper used to simulate an I/O failure on this wire.
    fake_error: Stopper,
    /// Raw frames sent by the fake server towards the client.
    from_server: Receiver<Vec<u8>>,
}
/// Transmit half of the in-process test wire.
///
/// Implements [`WireTx`] by pushing raw frames onto the client-to-server channel.
pub struct LocalTx {
    /// Extra stopper used to simulate an I/O failure on this wire.
    fake_error: Stopper,
    /// Raw frames sent by the client towards the fake server.
    to_server: Sender<Vec<u8>>,
}
/// Spawn helper type.
///
/// Its [`WireSpawn`] implementation hands futures to `tokio::task::spawn`.
pub struct LocalSpawn;
/// Fake server side of the in-process test wire.
///
/// Holds the server ends of both channels plus a handle to the stopper that
/// is used to simulate an I/O failure on the client's wire.
pub struct LocalFakeServer {
    /// Extra stopper used to simulate an I/O failure; see `cause_fatal_error`.
    fake_error: Stopper,
    /// from client to server
    pub from_client: Receiver<Vec<u8>>,
    /// from server to client
    pub to_client: Sender<Vec<u8>>,
}

impl LocalFakeServer {
    /// receive a frame
    pub async fn recv_from_client(&mut self) -> Result<RpcFrame, LocalError> {
        let msg = self.from_client.recv().await.ok_or(LocalError::TxClosed)?;
        let Some((hdr, body)) = VarHeader::take_from_slice(&msg) else {
            return Err(LocalError::BadFrame);
        };
        Ok(RpcFrame {
            header: hdr,
            body: body.to_vec(),
        })
    }

    /// Reply
    pub async fn reply<E: Endpoint>(
        &mut self,
        seq_no: u32,
        data: &E::Response,
    ) -> Result<(), LocalError>
    where
        E::Response: Serialize,
    {
        let frame = RpcFrame {
            header: VarHeader {
                key: VarKey::Key8(E::RESP_KEY),
                seq_no: VarSeq::Seq4(seq_no),
            },
            body: postcard::to_stdvec(data).unwrap(),
        };
        self.to_client
            .send(frame.to_bytes())
            .await
            .map_err(|_| LocalError::RxClosed)
    }

    /// Publish
    pub async fn publish<T: Topic>(
        &mut self,
        seq_no: u32,
        data: &T::Message,
    ) -> Result<(), LocalError>
    where
        T::Message: Serialize,
    {
        let frame = RpcFrame {
            header: VarHeader {
                key: VarKey::Key8(T::TOPIC_KEY),
                seq_no: VarSeq::Seq4(seq_no),
            },
            body: postcard::to_stdvec(data).unwrap(),
        };
        self.to_client
            .send(frame.to_bytes())
            .await
            .map_err(|_| LocalError::RxClosed)
    }

    /// oops
    pub fn cause_fatal_error(&self) {
        self.fake_error.stop();
    }
}

/// Errors reported by the local test wire and the fake server.
///
/// Every variant is a plain unit value, so the type is cheap to copy and can
/// be compared for equality in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalError {
    /// The server-to-client channel is closed.
    RxClosed,
    /// The client-to-server channel is closed.
    TxClosed,
    /// A received message did not contain a usable frame header.
    BadFrame,
    /// The simulated fatal I/O error was triggered.
    FatalError,
}

impl Display for LocalError {
    /// Formats the error exactly like its `Debug` representation, i.e. as
    /// the bare variant name.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        core::fmt::Debug::fmt(self, f)
    }
}

impl std::error::Error for LocalError {}

impl WireRx for LocalRx {
    type Error = LocalError;

    /// Receive the next raw frame sent by the fake server.
    ///
    /// # Errors
    ///
    /// * [`LocalError::FatalError`] if the fake-error stopper has been
    ///   tripped, either before waiting or while waiting.
    /// * [`LocalError::RxClosed`] if the server-to-client channel yields no
    ///   more messages.
    #[allow(clippy::manual_async_fn)]
    fn receive(&mut self) -> impl Future<Output = Result<Vec<u8>, Self::Error>> + Send {
        async {
            // The HostClient machinery normally watches its own stopper; this
            // additional one exists only so tests can simulate an I/O failure.
            let next_frame = self.from_server.recv();
            let simulated_failure = self.fake_error.wait_stopped();

            // Check for an already-occurred error up front, so that a pending
            // frame cannot win the select below by accident.
            if self.fake_error.is_stopped() {
                return Err(LocalError::FatalError);
            }

            select! {
                maybe_frame = next_frame => maybe_frame.ok_or(LocalError::RxClosed),
                _stopped = simulated_failure => Err(LocalError::FatalError),
            }
        }
    }
}

impl WireTx for LocalTx {
    type Error = LocalError;

    /// Send one raw frame to the fake server.
    ///
    /// # Errors
    ///
    /// * [`LocalError::FatalError`] if the fake-error stopper has been
    ///   tripped, either before waiting or while waiting.
    /// * [`LocalError::TxClosed`] if the client-to-server channel rejects
    ///   the frame.
    #[allow(clippy::manual_async_fn)]
    fn send(&mut self, data: Vec<u8>) -> impl Future<Output = Result<(), Self::Error>> + Send {
        async {
            // The HostClient machinery normally watches its own stopper; this
            // additional one exists only so tests can simulate an I/O failure.
            let deliver = self.to_server.send(data);
            let simulated_failure = self.fake_error.wait_stopped();

            // Check for an already-occurred error up front, so that the send
            // cannot win the select below by accident.
            if self.fake_error.is_stopped() {
                return Err(LocalError::FatalError);
            }

            select! {
                outcome = deliver => outcome.map_err(|_| LocalError::TxClosed),
                _stopped = simulated_failure => Err(LocalError::FatalError),
            }
        }
    }
}

impl WireSpawn for LocalSpawn {
    /// Run `fut` as a tokio task; the join handle is dropped, so the task is
    /// not awaited by the caller.
    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let _join_handle = tokio::task::spawn(fut);
    }
}

/// Build a fake server and a [`HostClient`] that talk to each other directly.
///
/// Handy for tests and for demonstrating server/client behavior when no
/// external device is available.
///
/// `bound` is used as the capacity of both in-process channels and is also
/// forwarded to the client constructor, together with `err_uri_path`.
pub fn local_setup<E>(bound: usize, err_uri_path: &str) -> (LocalFakeServer, HostClient<E>)
where
    E: Schema + DeserializeOwned,
{
    // One bounded channel per direction.
    let (to_server, from_client) = channel(bound);
    let (to_client, from_server) = channel(bound);

    // NOTE: the regular HostClient machinery has its own Stopper for
    // signalling errors. This is an EXTRA stopper used to simulate the error
    // happening in the first place, e.g. a USB device disconnecting or a
    // serial port being closed.
    let fake_error = Stopper::new();

    let wire_tx = LocalTx {
        to_server,
        fake_error: fake_error.clone(),
    };
    let wire_rx = LocalRx {
        from_server,
        fake_error: fake_error.clone(),
    };

    let client = HostClient::<E>::new_with_wire(
        wire_tx,
        wire_rx,
        LocalSpawn,
        VarSeqKind::Seq2,
        err_uri_path,
        bound,
    );

    let server = LocalFakeServer {
        from_client,
        to_client,
        fake_error: fake_error.clone(),
    };

    (server, client)
}