arti_client/status.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
//! Code to collect and publish information about a client's bootstrapping
//! status.
use std::{borrow::Cow, fmt, fmt::Display, time::SystemTime};
use educe::Educe;
use futures::{Stream, StreamExt};
use tor_basic_utils::skip_fmt;
use tor_chanmgr::{ConnBlockage, ConnStatus, ConnStatusEvents};
use tor_circmgr::{ClockSkewEvents, SkewEstimate};
use tor_dirmgr::{DirBlockage, DirBootstrapStatus};
use tracing::debug;
/// Information about how ready a [`crate::TorClient`] is to handle requests.
///
/// Note that this status does not change monotonically: a `TorClient` can
/// become more _or less_ bootstrapped over time. (For example, a client can
/// become less bootstrapped if it loses its internet connectivity, or if its
/// directory information expires before it's able to replace it.)
//
// # Note
//
// We need to keep this type fairly small, since it will get cloned whenever
// it's observed on a stream. If it grows large, we can add an Arc<> around
// its data.
#[derive(Debug, Clone, Default)]
pub struct BootstrapStatus {
/// Status for our connection to the tor network
conn_status: ConnStatus,
/// Status for our directory information.
dir_status: DirBootstrapStatus,
/// Current estimate of our clock skew.
skew: Option<SkewEstimate>,
}
impl BootstrapStatus {
/// Return a rough fraction (from 0.0 to 1.0) representing how far along
/// the client's bootstrapping efforts are.
///
/// 0 is defined as "just started"; 1 is defined as "ready to use."
pub fn as_frac(&self) -> f32 {
// Coefficients chosen arbitrarily.
self.conn_status.frac() * 0.15 + self.dir_status.frac_at(SystemTime::now()) * 0.85
}
/// Return true if the status indicates that the client is ready for
/// traffic.
///
/// For the purposes of this function, the client is "ready for traffic" if,
/// as far as we know, we can start acting on a new client request immediately.
pub fn ready_for_traffic(&self) -> bool {
let now = SystemTime::now();
self.conn_status.usable() && self.dir_status.usable_at(now)
}
/// If the client is unable to make forward progress for some reason, return
/// that reason.
///
/// (Returns None if the client doesn't seem to be stuck.)
///
/// # Caveats
///
/// This function provides a "best effort" diagnostic: there
/// will always be some blockage types that it can't diagnose
/// correctly. It may declare that Arti is stuck for reasons that
/// are incorrect; or it may declare that the client is not stuck
/// when in fact no progress is being made.
///
/// Therefore, the caller should always use a certain amount of
/// modesty when reporting these values to the user. For example,
/// it's probably better to say "Arti says it's stuck because it
/// can't make connections to the internet" rather than "You are
/// not on the internet."
pub fn blocked(&self) -> Option<Blockage> {
if let Some(b) = self.conn_status.blockage() {
let message = b.to_string().into();
let kind = b.into();
if matches!(kind, BlockageKind::ClockSkewed) && self.skew_is_noteworthy() {
Some(Blockage {
kind,
message: format!("Clock is {}", self.skew.as_ref().expect("logic error"))
.into(),
})
} else {
Some(Blockage { kind, message })
}
} else if let Some(b) = self.dir_status.blockage(SystemTime::now()) {
let message = b.to_string().into();
let kind = b.into();
Some(Blockage { kind, message })
} else {
None
}
}
/// Adjust this status based on new connection-status information.
fn apply_conn_status(&mut self, status: ConnStatus) {
self.conn_status = status;
}
/// Adjust this status based on new directory-status information.
fn apply_dir_status(&mut self, status: DirBootstrapStatus) {
self.dir_status = status;
}
/// Adjust this status based on new estimated clock skew information.
fn apply_skew_estimate(&mut self, status: Option<SkewEstimate>) {
self.skew = status;
}
/// Return true if our current clock skew estimate is considered noteworthy.
fn skew_is_noteworthy(&self) -> bool {
matches!(&self.skew, Some(s) if s.noteworthy())
}
}
/// A reason why a client believes it is stuck.
#[derive(Clone, Debug, derive_more::Display)]
#[display("{} ({})", kind, message)]
pub struct Blockage {
/// Why do we think we're blocked?
kind: BlockageKind,
/// A human-readable message about the blockage.
message: Cow<'static, str>,
}
impl Blockage {
/// Get a programmatic indication of the kind of blockage this is.
pub fn kind(&self) -> BlockageKind {
self.kind.clone()
}
/// Get a human-readable message about the blockage.
pub fn message(&self) -> impl Display + '_ {
&self.message
}
}
/// A specific type of blockage that a client believes it is experiencing.
///
/// Used to distinguish among instances of [`Blockage`].
#[derive(Clone, Debug, derive_more::Display)]
#[non_exhaustive]
pub enum BlockageKind {
/// There is some kind of problem with connecting to the network.
#[display("We seem to be offline")]
Offline,
/// We can connect, but our connections seem to be filtered.
#[display("Our internet connection seems filtered")]
Filtering,
/// We have some other kind of problem connecting to Tor
#[display("Can't reach the Tor network")]
CantReachTor,
/// We believe our clock is set incorrectly, and that's preventing us from
/// successfully with relays and/or from finding a directory that we trust.
#[display("Clock is skewed.")]
ClockSkewed,
/// We've encountered some kind of problem downloading directory
/// information, and it doesn't seem to be caused by any particular
/// connection problem.
#[display("Can't bootstrap a Tor directory.")]
CantBootstrap,
}
impl From<ConnBlockage> for BlockageKind {
fn from(b: ConnBlockage) -> BlockageKind {
match b {
ConnBlockage::NoTcp => BlockageKind::Offline,
ConnBlockage::NoHandshake => BlockageKind::Filtering,
ConnBlockage::CertsExpired => BlockageKind::ClockSkewed,
_ => BlockageKind::CantReachTor,
}
}
}
impl From<DirBlockage> for BlockageKind {
fn from(_: DirBlockage) -> Self {
BlockageKind::CantBootstrap
}
}
impl fmt::Display for BootstrapStatus {
/// Format this [`BootstrapStatus`].
///
/// Note that the string returned by this function is designed for human
/// readability, not for machine parsing. Other code *should not* depend
/// on particular elements of this string.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let percent = (self.as_frac() * 100.0).round() as u32;
if let Some(problem) = self.blocked() {
write!(f, "Stuck at {}%: {}", percent, problem)?;
} else {
write!(
f,
"{}%: {}; {}",
percent, &self.conn_status, &self.dir_status
)?;
}
if let Some(skew) = &self.skew {
if skew.noteworthy() {
write!(f, ". Clock is {}", skew)?;
}
}
Ok(())
}
}
/// Task that runs forever, updating a client's status via the provided
/// `sender`.
///
/// TODO(nickm): Eventually this will use real stream of events to see when we
/// are bootstrapped or not. For now, it just says that we're not-ready until
/// the given Receiver fires.
///
/// TODO(nickm): This should eventually close the stream when the client is
/// dropped.
pub(crate) async fn report_status(
mut sender: postage::watch::Sender<BootstrapStatus>,
conn_status: ConnStatusEvents,
dir_status: impl Stream<Item = DirBootstrapStatus> + Send + Unpin,
skew_status: ClockSkewEvents,
) {
/// Internal enumeration to combine incoming status changes.
#[allow(clippy::large_enum_variant)]
enum Event {
/// A connection status change
Conn(ConnStatus),
/// A directory status change
Dir(DirBootstrapStatus),
/// A clock skew change
Skew(Option<SkewEstimate>),
}
let mut stream = futures::stream::select_all(vec![
conn_status.map(Event::Conn).boxed(),
dir_status.map(Event::Dir).boxed(),
skew_status.map(Event::Skew).boxed(),
]);
while let Some(event) = stream.next().await {
let mut b = sender.borrow_mut();
match event {
Event::Conn(e) => b.apply_conn_status(e),
Event::Dir(e) => b.apply_dir_status(e),
Event::Skew(e) => b.apply_skew_estimate(e),
}
debug!("{}", *b);
}
}
/// A [`Stream`] of [`BootstrapStatus`] events.
///
/// This stream isn't guaranteed to receive every change in bootstrap status; if
/// changes happen more frequently than the receiver can observe, some of them
/// will be dropped.
//
// Note: We use a wrapper type around watch::Receiver here, in order to hide its
// implementation type. We do that because we might want to change the type in
// the future, and because some of the functionality exposed by Receiver (like
// `borrow()` and the postage::Stream trait) are extraneous to the API we want.
#[derive(Clone, Educe)]
#[educe(Debug)]
pub struct BootstrapEvents {
/// The receiver that implements this stream.
#[educe(Debug(method = "skip_fmt"))]
pub(crate) inner: postage::watch::Receiver<BootstrapStatus>,
}
impl Stream for BootstrapEvents {
type Item = BootstrapStatus;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}