1#[cfg(test)]
2mod agent_gather_test;
3#[cfg(test)]
4mod agent_test;
5#[cfg(test)]
6mod agent_transport_test;
7#[cfg(test)]
8pub(crate) mod agent_vnet_test;
9
10pub mod agent_config;
11pub mod agent_gather;
12pub(crate) mod agent_internal;
13pub mod agent_selector;
14pub mod agent_stats;
15pub mod agent_transport;
16
17use std::collections::HashMap;
18use std::future::Future;
19use std::net::{Ipv4Addr, SocketAddr};
20use std::pin::Pin;
21use std::sync::atomic::Ordering;
22use std::sync::Arc;
23use std::time::SystemTime;
24
25use agent_config::*;
26use agent_internal::*;
27use agent_stats::*;
28use mdns::conn::*;
29use portable_atomic::{AtomicU8, AtomicUsize};
30use stun::agent::*;
31use stun::attributes::*;
32use stun::fingerprint::*;
33use stun::integrity::*;
34use stun::message::*;
35use stun::xoraddr::*;
36use tokio::sync::{broadcast, mpsc, Mutex};
37use tokio::time::{Duration, Instant};
38use util::vnet::net::*;
39use util::Buffer;
40
41use crate::agent::agent_gather::GatherCandidatesInternalParams;
42use crate::candidate::*;
43use crate::error::*;
44use crate::external_ip_mapper::*;
45use crate::mdns::*;
46use crate::network_type::*;
47use crate::rand::*;
48use crate::state::*;
49use crate::tcp_type::TcpType;
50use crate::udp_mux::UDPMux;
51use crate::udp_network::UDPNetwork;
52use crate::url::*;
53
54#[derive(Debug, Clone)]
55pub(crate) struct BindingRequest {
56 pub(crate) timestamp: Instant,
57 pub(crate) transaction_id: TransactionId,
58 pub(crate) destination: SocketAddr,
59 pub(crate) is_use_candidate: bool,
60}
61
62impl Default for BindingRequest {
63 fn default() -> Self {
64 Self {
65 timestamp: Instant::now(),
66 transaction_id: TransactionId::default(),
67 destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
68 is_use_candidate: false,
69 }
70 }
71}
72
73pub type OnConnectionStateChangeHdlrFn = Box<
74 dyn (FnMut(ConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
75 + Send
76 + Sync,
77>;
78pub type OnSelectedCandidatePairChangeHdlrFn = Box<
79 dyn (FnMut(
80 &Arc<dyn Candidate + Send + Sync>,
81 &Arc<dyn Candidate + Send + Sync>,
82 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
83 + Send
84 + Sync,
85>;
86pub type OnCandidateHdlrFn = Box<
87 dyn (FnMut(
88 Option<Arc<dyn Candidate + Send + Sync>>,
89 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
90 + Send
91 + Sync,
92>;
/// Boxed, argument-less callback stored in `Agent::gather_candidate_cancel`;
/// the agent invokes it from `close` and at the start of `gather_candidates`.
pub type GatherCandidateCancelFn = Box<dyn Fn() + Send + Sync>;
94
95struct ChanReceivers {
96 chan_state_rx: mpsc::Receiver<ConnectionState>,
97 chan_candidate_rx: mpsc::Receiver<Option<Arc<dyn Candidate + Send + Sync>>>,
98 chan_candidate_pair_rx: mpsc::Receiver<()>,
99}
100
101pub struct Agent {
103 pub(crate) internal: Arc<AgentInternal>,
104
105 pub(crate) udp_network: UDPNetwork,
106 pub(crate) interface_filter: Arc<Option<InterfaceFilterFn>>,
107 pub(crate) include_loopback: bool,
108 pub(crate) ip_filter: Arc<Option<IpFilterFn>>,
109 pub(crate) mdns_mode: MulticastDnsMode,
110 pub(crate) mdns_name: String,
111 pub(crate) mdns_conn: Option<Arc<DnsConn>>,
112 pub(crate) net: Arc<Net>,
113
114 pub(crate) ext_ip_mapper: Arc<Option<ExternalIpMapper>>,
116 pub(crate) gathering_state: Arc<AtomicU8>, pub(crate) candidate_types: Vec<CandidateType>,
118 pub(crate) urls: Vec<Url>,
119 pub(crate) network_types: Vec<NetworkType>,
120
121 pub(crate) gather_candidate_cancel: Option<GatherCandidateCancelFn>,
122}
123
124impl Agent {
125 pub async fn new(config: AgentConfig) -> Result<Self> {
127 let mut mdns_name = config.multicast_dns_host_name.clone();
128 if mdns_name.is_empty() {
129 mdns_name = generate_multicast_dns_name();
130 }
131
132 if !mdns_name.ends_with(".local") || mdns_name.split('.').count() != 2 {
133 return Err(Error::ErrInvalidMulticastDnshostName);
134 }
135
136 let mdns_mode = config.multicast_dns_mode;
137
138 let mdns_conn =
139 match create_multicast_dns(mdns_mode, &mdns_name, &config.multicast_dns_dest_addr) {
140 Ok(c) => c,
141 Err(err) => {
142 log::warn!("Failed to initialize mDNS {}: {}", mdns_name, err);
145 None
146 }
147 };
148
149 let (mut ai, chan_receivers) = AgentInternal::new(&config);
150 let (chan_state_rx, chan_candidate_rx, chan_candidate_pair_rx) = (
151 chan_receivers.chan_state_rx,
152 chan_receivers.chan_candidate_rx,
153 chan_receivers.chan_candidate_pair_rx,
154 );
155
156 config.init_with_defaults(&mut ai);
157
158 let candidate_types = if config.candidate_types.is_empty() {
159 default_candidate_types()
160 } else {
161 config.candidate_types.clone()
162 };
163
164 if ai.lite.load(Ordering::SeqCst)
165 && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
166 {
167 Self::close_multicast_conn(&mdns_conn).await;
168 return Err(Error::ErrLiteUsingNonHostCandidates);
169 }
170
171 if !config.urls.is_empty()
172 && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
173 && !contains_candidate_type(CandidateType::Relay, &candidate_types)
174 {
175 Self::close_multicast_conn(&mdns_conn).await;
176 return Err(Error::ErrUselessUrlsProvided);
177 }
178
179 let ext_ip_mapper = match config.init_ext_ip_mapping(mdns_mode, &candidate_types) {
180 Ok(ext_ip_mapper) => ext_ip_mapper,
181 Err(err) => {
182 Self::close_multicast_conn(&mdns_conn).await;
183 return Err(err);
184 }
185 };
186
187 let net = if let Some(net) = config.net {
188 if net.is_virtual() {
189 log::warn!("vnet is enabled");
190 if mdns_mode != MulticastDnsMode::Disabled {
191 log::warn!("vnet does not support mDNS yet");
192 }
193 }
194
195 net
196 } else {
197 Arc::new(Net::new(None))
198 };
199
200 let agent = Self {
201 udp_network: config.udp_network,
202 internal: Arc::new(ai),
203 interface_filter: Arc::clone(&config.interface_filter),
204 include_loopback: config.include_loopback,
205 ip_filter: Arc::clone(&config.ip_filter),
206 mdns_mode,
207 mdns_name,
208 mdns_conn,
209 net,
210 ext_ip_mapper: Arc::new(ext_ip_mapper),
211 gathering_state: Arc::new(AtomicU8::new(0)), candidate_types,
213 urls: config.urls.clone(),
214 network_types: config.network_types.clone(),
215
216 gather_candidate_cancel: None, };
218
219 agent.internal.start_on_connection_state_change_routine(
220 chan_state_rx,
221 chan_candidate_rx,
222 chan_candidate_pair_rx,
223 );
224
225 if let Err(err) = agent.restart(config.local_ufrag, config.local_pwd).await {
227 Self::close_multicast_conn(&agent.mdns_conn).await;
228 let _ = agent.close().await;
229 return Err(err);
230 }
231
232 Ok(agent)
233 }
234
235 pub fn get_bytes_received(&self) -> usize {
236 self.internal.agent_conn.bytes_received()
237 }
238
239 pub fn get_bytes_sent(&self) -> usize {
240 self.internal.agent_conn.bytes_sent()
241 }
242
243 pub fn on_connection_state_change(&self, f: OnConnectionStateChangeHdlrFn) {
245 self.internal
246 .on_connection_state_change_hdlr
247 .store(Some(Arc::new(Mutex::new(f))))
248 }
249
250 pub fn on_selected_candidate_pair_change(&self, f: OnSelectedCandidatePairChangeHdlrFn) {
252 self.internal
253 .on_selected_candidate_pair_change_hdlr
254 .store(Some(Arc::new(Mutex::new(f))))
255 }
256
257 pub fn on_candidate(&self, f: OnCandidateHdlrFn) {
260 self.internal
261 .on_candidate_hdlr
262 .store(Some(Arc::new(Mutex::new(f))));
263 }
264
265 pub fn add_remote_candidate(&self, c: &Arc<dyn Candidate + Send + Sync>) -> Result<()> {
267 if c.tcp_type() == TcpType::Active {
270 log::info!("Ignoring remote candidate with tcpType active: {}", c);
273 return Ok(());
274 }
275
276 if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
278 if self.mdns_mode == MulticastDnsMode::Disabled {
279 log::warn!(
280 "remote mDNS candidate added, but mDNS is disabled: ({})",
281 c.address()
282 );
283 return Ok(());
284 }
285
286 if c.candidate_type() != CandidateType::Host {
287 return Err(Error::ErrAddressParseFailed);
288 }
289
290 let ai = Arc::clone(&self.internal);
291 let host_candidate = Arc::clone(c);
292 let mdns_conn = self.mdns_conn.clone();
293 tokio::spawn(async move {
294 if let Some(mdns_conn) = mdns_conn {
295 if let Ok(candidate) =
296 Self::resolve_and_add_multicast_candidate(mdns_conn, host_candidate).await
297 {
298 ai.add_remote_candidate(&candidate).await;
299 }
300 }
301 });
302 } else {
303 let ai = Arc::clone(&self.internal);
304 let candidate = Arc::clone(c);
305 tokio::spawn(async move {
306 ai.add_remote_candidate(&candidate).await;
307 });
308 }
309
310 Ok(())
311 }
312
313 pub async fn get_local_candidates(&self) -> Result<Vec<Arc<dyn Candidate + Send + Sync>>> {
315 let mut res = vec![];
316
317 {
318 let local_candidates = self.internal.local_candidates.lock().await;
319 for candidates in local_candidates.values() {
320 for candidate in candidates {
321 res.push(Arc::clone(candidate));
322 }
323 }
324 }
325
326 Ok(res)
327 }
328
329 pub async fn get_local_user_credentials(&self) -> (String, String) {
331 let ufrag_pwd = self.internal.ufrag_pwd.lock().await;
332 (ufrag_pwd.local_ufrag.clone(), ufrag_pwd.local_pwd.clone())
333 }
334
335 pub async fn get_remote_user_credentials(&self) -> (String, String) {
337 let ufrag_pwd = self.internal.ufrag_pwd.lock().await;
338 (ufrag_pwd.remote_ufrag.clone(), ufrag_pwd.remote_pwd.clone())
339 }
340
341 pub async fn close(&self) -> Result<()> {
343 if let Some(gather_candidate_cancel) = &self.gather_candidate_cancel {
344 gather_candidate_cancel();
345 }
346
347 if let UDPNetwork::Muxed(ref udp_mux) = self.udp_network {
348 let (ufrag, _) = self.get_local_user_credentials().await;
349 udp_mux.remove_conn_by_ufrag(&ufrag).await;
350 }
351
352 Self::close_multicast_conn(&self.mdns_conn).await;
353
354 self.internal.close().await
356 }
357
358 pub fn get_selected_candidate_pair(&self) -> Option<Arc<CandidatePair>> {
360 self.internal.agent_conn.get_selected_pair()
361 }
362
363 pub async fn set_remote_credentials(
365 &self,
366 remote_ufrag: String,
367 remote_pwd: String,
368 ) -> Result<()> {
369 self.internal
370 .set_remote_credentials(remote_ufrag, remote_pwd)
371 .await
372 }
373
374 pub async fn restart(&self, mut ufrag: String, mut pwd: String) -> Result<()> {
380 if ufrag.is_empty() {
381 ufrag = generate_ufrag();
382 }
383 if pwd.is_empty() {
384 pwd = generate_pwd();
385 }
386
387 if ufrag.len() * 8 < 24 {
388 return Err(Error::ErrLocalUfragInsufficientBits);
389 }
390 if pwd.len() * 8 < 128 {
391 return Err(Error::ErrLocalPwdInsufficientBits);
392 }
393
394 if GatheringState::from(self.gathering_state.load(Ordering::SeqCst))
395 == GatheringState::Gathering
396 {
397 return Err(Error::ErrRestartWhenGathering);
398 }
399 self.gathering_state
400 .store(GatheringState::New as u8, Ordering::SeqCst);
401
402 {
403 let done_tx = self.internal.done_tx.lock().await;
404 if done_tx.is_none() {
405 return Err(Error::ErrClosed);
406 }
407 }
408
409 {
411 let mut ufrag_pwd = self.internal.ufrag_pwd.lock().await;
412 ufrag_pwd.local_ufrag = ufrag;
413 ufrag_pwd.local_pwd = pwd;
414 ufrag_pwd.remote_ufrag = String::new();
415 ufrag_pwd.remote_pwd = String::new();
416 }
417 {
418 let mut pending_binding_requests = self.internal.pending_binding_requests.lock().await;
419 *pending_binding_requests = vec![];
420 }
421
422 {
423 let mut checklist = self.internal.agent_conn.checklist.lock().await;
424 *checklist = vec![];
425 }
426
427 self.internal.set_selected_pair(None).await;
428 self.internal.delete_all_candidates().await;
429 self.internal.start().await;
430
431 if self.internal.connection_state.load(Ordering::SeqCst) != ConnectionState::New as u8 {
434 self.internal
435 .update_connection_state(ConnectionState::Checking)
436 .await;
437 }
438
439 Ok(())
440 }
441
442 pub fn gather_candidates(&self) -> Result<()> {
444 if self.gathering_state.load(Ordering::SeqCst) != GatheringState::New as u8 {
445 return Err(Error::ErrMultipleGatherAttempted);
446 }
447
448 if self.internal.on_candidate_hdlr.load().is_none() {
449 return Err(Error::ErrNoOnCandidateHandler);
450 }
451
452 if let Some(gather_candidate_cancel) = &self.gather_candidate_cancel {
453 gather_candidate_cancel(); }
455
456 let params = GatherCandidatesInternalParams {
459 udp_network: self.udp_network.clone(),
460 candidate_types: self.candidate_types.clone(),
461 urls: self.urls.clone(),
462 network_types: self.network_types.clone(),
463 mdns_mode: self.mdns_mode,
464 mdns_name: self.mdns_name.clone(),
465 net: Arc::clone(&self.net),
466 interface_filter: self.interface_filter.clone(),
467 ip_filter: self.ip_filter.clone(),
468 ext_ip_mapper: Arc::clone(&self.ext_ip_mapper),
469 agent_internal: Arc::clone(&self.internal),
470 gathering_state: Arc::clone(&self.gathering_state),
471 chan_candidate_tx: Arc::clone(&self.internal.chan_candidate_tx),
472 include_loopback: self.include_loopback,
473 };
474 tokio::spawn(async move {
475 Self::gather_candidates_internal(params).await;
476 });
477
478 Ok(())
479 }
480
481 pub async fn get_candidate_pairs_stats(&self) -> Vec<CandidatePairStats> {
483 self.internal.get_candidate_pairs_stats().await
484 }
485
486 pub async fn get_local_candidates_stats(&self) -> Vec<CandidateStats> {
488 self.internal.get_local_candidates_stats().await
489 }
490
491 pub async fn get_remote_candidates_stats(&self) -> Vec<CandidateStats> {
493 self.internal.get_remote_candidates_stats().await
494 }
495
496 async fn resolve_and_add_multicast_candidate(
497 mdns_conn: Arc<DnsConn>,
498 c: Arc<dyn Candidate + Send + Sync>,
499 ) -> Result<Arc<dyn Candidate + Send + Sync>> {
500 let (_close_query_signal_tx, close_query_signal_rx) = mpsc::channel(1);
502 let src = match mdns_conn.query(&c.address(), close_query_signal_rx).await {
503 Ok((_, src)) => src,
504 Err(err) => {
505 log::warn!("Failed to discover mDNS candidate {}: {}", c.address(), err);
506 return Err(err.into());
507 }
508 };
509
510 c.set_ip(&src.ip())?;
511
512 Ok(c)
513 }
514
515 async fn close_multicast_conn(mdns_conn: &Option<Arc<DnsConn>>) {
516 if let Some(conn) = mdns_conn {
517 if let Err(err) = conn.close().await {
518 log::warn!("failed to close mDNS Conn: {}", err);
519 }
520 }
521 }
522}