arti_client/
rpc.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
//! Declare RPC functionality on for the `arti-client` crate.

use derive_deftly::Deftly;
use dyn_clone::DynClone;
use futures::{SinkExt as _, StreamExt as _};
use serde::{Deserialize, Serialize};
use std::{net::IpAddr, sync::Arc};
use tor_proto::stream::DataStream;

use tor_rpcbase as rpc;
use tor_rtcompat::Runtime;

use crate::{StreamPrefs, TorAddr, TorClient};

impl<R: Runtime> TorClient<R> {
    /// Ensure that every RPC method is registered for this instantiation of TorClient.
    ///
    /// We can't use [`rpc::static_rpc_invoke_fn`] for these, since TorClient is
    /// parameterized.
    pub fn rpc_methods() -> Vec<rpc::dispatch::InvokerEnt> {
        rpc::invoker_ent_list![
            get_client_status::<R>,
            watch_client_status::<R>,
            isolated_client::<R>,
            @special client_connect_with_prefs::<R>,
            @special client_resolve_with_prefs::<R>,
            @special client_resolve_ptr_with_prefs::<R>,
        ]
    }
}

/// Return current bootstrap and health information for a client.
#[derive(Deftly, Debug, Serialize, Deserialize)]
#[derive_deftly(rpc::DynMethod)]
#[deftly(rpc(method_name = "arti:get_client_status"))]
struct GetClientStatus {}

impl rpc::RpcMethod for GetClientStatus {
    type Output = ClientStatusInfo;
    type Update = rpc::NoUpdates;
}

/// Run forever, delivering updates about a client's bootstrap and health information.
///
/// (This method can return updates that have no visible changes.)
#[derive(Deftly, Debug, Serialize, Deserialize)]
#[derive_deftly(rpc::DynMethod)]
#[deftly(rpc(method_name = "arti:watch_client_status"))]
struct WatchClientStatus {}

impl rpc::RpcMethod for WatchClientStatus {
    type Output = rpc::Nil; // TODO: Possibly there should be an rpc::Never for methods that don't return.
    type Update = ClientStatusInfo;
}

/// Reported bootstrap and health information for a client.
///
/// Note that all `TorClient`s on a session share the same underlying bootstrap status:
/// if you check the status for one, you don't need to check the others.
#[derive(Serialize, Deserialize)]
struct ClientStatusInfo {
    /// True if the client is ready for traffic.
    ready: bool,
    /// Approximate estimate of how close the client is to being ready for traffic.
    ///
    /// This value is a rough approximation; its exact implementation may change over
    /// arti versions.  It is not guaranteed to be monotonic.
    fraction: f32,
    /// If present, a description of possible problem(s) that may be stopping
    /// the client from using the Tor network.
    blocked: Option<String>,
}

impl From<crate::status::BootstrapStatus> for ClientStatusInfo {
    fn from(s: crate::status::BootstrapStatus) -> Self {
        let ready = s.ready_for_traffic();
        let fraction = s.as_frac();
        let blocked = s.blocked().map(|b| b.to_string());
        Self {
            ready,
            fraction,
            blocked,
        }
    }
}

// NOTE: These functions could be defined as methods on TorClient<R>.
// I'm defining them like this to make it more clear that they are never
// invoked as client.method(), but only via the RPC system.
// We can revisit this later if we want.

// TODO RPC: Once we have one or two more get/watch combinations,
// we should look into some facility for automatically declaring them,
// so that their behavior stays uniform.
//
// See https://gitlab.torproject.org/tpo/core/arti/-/issues/1384#note_3023659

/// Invocable function to run [`GetClientStatus`] on a [`TorClient`].
async fn get_client_status<R: Runtime>(
    client: Arc<TorClient<R>>,
    _method: Box<GetClientStatus>,
    _ctx: Arc<dyn rpc::Context>,
) -> Result<ClientStatusInfo, rpc::RpcError> {
    Ok(client.bootstrap_status().into())
}

/// Invocable function to run [`WatchClientStatus`] on a [`TorClient`].
async fn watch_client_status<R: Runtime>(
    client: Arc<TorClient<R>>,
    _method: Box<WatchClientStatus>,
    _ctx: Arc<dyn rpc::Context>,
    mut updates: rpc::UpdateSink<ClientStatusInfo>,
) -> Result<rpc::Nil, rpc::RpcError> {
    let mut events = client.bootstrap_events();

    // Send the _current_ status, no matter what.
    // (We do this after constructing er)
    updates.send(client.bootstrap_status().into()).await?;

    // Send additional updates whenever the status changes.
    while let Some(status) = events.next().await {
        updates.send(status.into()).await?;
    }

    // This can only happen if the client exits.
    Ok(rpc::NIL)
}

/// Create a new isolated client instance.
///
/// Returned ObjectID is a handle for a new `TorClient`,
/// which is isolated from other `TorClients`:
/// any streams created with the new `TorClient` will not share circuits
/// with streams created with any other `TorClient`.
#[derive(Deftly, Debug, Serialize, Deserialize)]
#[derive_deftly(rpc::DynMethod)]
#[deftly(rpc(method_name = "arti:new_isolated_client"))]
#[non_exhaustive]
pub struct IsolatedClient {}

impl rpc::RpcMethod for IsolatedClient {
    type Output = rpc::SingleIdResponse;
    type Update = rpc::NoUpdates;
}

/// RPC method implementation: return a new isolated client based on a given client.
async fn isolated_client<R: Runtime>(
    client: Arc<TorClient<R>>,
    _method: Box<IsolatedClient>,
    ctx: Arc<dyn rpc::Context>,
) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
    let new_client = Arc::new(client.isolated_client());
    let client_id = ctx.register_owned(new_client);
    Ok(rpc::SingleIdResponse::from(client_id))
}

/// Type-erased error returned by ClientConnectionTarget.
//
// TODO RPC: It would be handy if this implemented HasErrorHint, but HasErrorHint is sealed.
// Perhaps we could go and solve our problem by implementing HasErrorHint on dyn StdError?
pub trait ClientConnectionError:
    std::error::Error + tor_error::HasKind + DynClone + Send + Sync + seal::Sealed
{
}
impl<E> seal::Sealed for E where E: std::error::Error + tor_error::HasKind + DynClone + Send + Sync {}
impl<E> ClientConnectionError for E where
    E: std::error::Error + tor_error::HasKind + DynClone + Send + Sync + seal::Sealed
{
}
impl std::error::Error for Box<dyn ClientConnectionError> {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        self.as_ref().source()
    }
}
impl tor_error::HasKind for Box<dyn ClientConnectionError> {
    fn kind(&self) -> tor_error::ErrorKind {
        self.as_ref().kind()
    }
}
dyn_clone::clone_trait_object!(ClientConnectionError);

/// module to seal the ClientConnectionError trait.
mod seal {
    /// hidden trait to seal the ClientConnectionError trait.
    #[allow(unreachable_pub)]
    pub trait Sealed {}
}

/// Type alias for a Result return by ClientConnectionTarget
pub type ClientConnectionResult<T> = Result<T, Box<dyn ClientConnectionError>>;

/// RPC special method: make a connection to a chosen address and preferences.
///
/// This method has no method name, and is not invoked by an RPC session
/// directly.  Instead, it is invoked in response to a SOCKS request.
/// It receives its target from the SOCKS `DEST` field.
/// The isolation information in its `SrreamPrefs`, if any, is taken from
/// the SOCKS username/password.
/// Other information in the `StreamPrefs` is inferred
/// from the SOCKS port configuration in the usual way.
///
/// When this method returns successfully,
/// the proxy code sends a SOCKS reply indicating success,
/// and links the returned `DataStream` with the application's incoming socket,
/// copying data back and forth.
/// (The `DataStream`` need not actually be connected at this point;
/// an in-progress connection will work fine.
/// Tor calls such streams, which report readiness before receiving a CONNECTED,
/// "optimistic".)
///
/// If instead this method returns an error,
/// the error is either used to generate a SOCKS error code,
///
/// Note 1: in the future, this method will likely be used to integrate RPC data streams
/// with other proxy types other than SOCKS.
/// When this happens, we will specify how those proxy types
/// will provide `target` and `prefs`.
///
/// Note 2: This has to be a special method, because
/// it needs to return a DataStream, which can't be serialized.
///
/// > TODO RPC: The above documentation still isn't quite specific enough,
/// > and a lot of it belongs in socks.rs where it could explain how a SOCKS request
/// > is interpreted and converted into a ConnectWithPrefs call.
/// > See <https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2373#note_3071833>
/// > for discussion.
#[derive(Deftly, Debug)]
#[derive_deftly(rpc::DynMethod)]
#[deftly(rpc(no_method_name))]
#[allow(clippy::exhaustive_structs)]
pub struct ConnectWithPrefs {
    /// The target address
    pub target: TorAddr,
    /// The stream preferences implied by the SOCKS connect request.
    pub prefs: StreamPrefs,
}
impl rpc::Method for ConnectWithPrefs {
    // TODO RPC: I am not sure that this is the error type we truly want.
    type Output = Result<DataStream, Box<dyn ClientConnectionError>>;
    type Update = rpc::NoUpdates;
}

/// RPC special method: lookup an address with a chosen address and preferences.
///
/// This method has no method name, and is not invoked by an RPC connection
/// directly.  Instead, it is invoked in response to a SOCKS request.
//
// TODO RPC: We _could_ give this a method name so that it can be invoked as an RPC method, and
// maybe we should.  First, however, we would need to make `StreamPrefs` an RPC-visible serializable
// type, or replace it with an equivalent.
#[derive(Deftly, Debug)]
#[derive_deftly(rpc::DynMethod)]
#[deftly(rpc(no_method_name))]
#[allow(clippy::exhaustive_structs)]
pub struct ResolveWithPrefs {
    /// The hostname to resolve.
    pub hostname: String,
    /// The stream preferences implied by the SOCKS resolve request.
    pub prefs: StreamPrefs,
}
impl rpc::Method for ResolveWithPrefs {
    // TODO RPC: I am not sure that this is the error type we truly want.
    type Output = Result<Vec<IpAddr>, Box<dyn ClientConnectionError>>;
    type Update = rpc::NoUpdates;
}

/// RPC special method: reverse-lookup an address with a chosen address and preferences.
///
/// This method has no method name, and is not invoked by an RPC connection
/// directly.  Instead, it is invoked in response to a SOCKS request.
//
// TODO RPC: We _could_ give this a method name so that it can be invoked as an RPC method, and
// maybe we should.  First, however, we would need to make `StreamPrefs` an RPC-visible serializable
// type, or replace it with an equivalent.
#[derive(Deftly, Debug)]
#[derive_deftly(rpc::DynMethod)]
#[deftly(rpc(no_method_name))]
#[allow(clippy::exhaustive_structs)]
pub struct ResolvePtrWithPrefs {
    /// The address to resolve.
    pub addr: IpAddr,
    /// The stream preferences implied by the SOCKS resolve request.
    pub prefs: StreamPrefs,
}
impl rpc::Method for ResolvePtrWithPrefs {
    // TODO RPC: I am not sure that this is the error type we truly want.
    type Output = Result<Vec<String>, Box<dyn ClientConnectionError>>;
    type Update = rpc::NoUpdates;
}

/// RPC method implementation: start a connection on a `TorClient`.
async fn client_connect_with_prefs<R: Runtime>(
    client: Arc<TorClient<R>>,
    method: Box<ConnectWithPrefs>,
    _ctx: Arc<dyn rpc::Context>,
) -> Result<DataStream, Box<dyn ClientConnectionError>> {
    TorClient::connect_with_prefs(client.as_ref(), &method.target, &method.prefs)
        .await
        .map_err(|e| Box::new(e) as _)
}

/// RPC method implementation: perform a remote DNS lookup using a `TorClient`.
async fn client_resolve_with_prefs<R: Runtime>(
    client: Arc<TorClient<R>>,
    method: Box<ResolveWithPrefs>,
    _ctx: Arc<dyn rpc::Context>,
) -> Result<Vec<IpAddr>, Box<dyn ClientConnectionError>> {
    TorClient::resolve_with_prefs(client.as_ref(), &method.hostname, &method.prefs)
        .await
        .map_err(|e| Box::new(e) as _)
}

/// RPC method implementation: perform a remote DNS reverse lookup using a `TorClient`.
async fn client_resolve_ptr_with_prefs<R: Runtime>(
    client: Arc<TorClient<R>>,
    method: Box<ResolvePtrWithPrefs>,
    _ctx: Arc<dyn rpc::Context>,
) -> Result<Vec<String>, Box<dyn ClientConnectionError>> {
    TorClient::resolve_ptr_with_prefs(client.as_ref(), method.addr, &method.prefs)
        .await
        .map_err(|e| Box::new(e) as _)
}