webrtc_ice/agent/
mod.rs

1#[cfg(test)]
2mod agent_gather_test;
3#[cfg(test)]
4mod agent_test;
5#[cfg(test)]
6mod agent_transport_test;
7#[cfg(test)]
8pub(crate) mod agent_vnet_test;
9
10pub mod agent_config;
11pub mod agent_gather;
12pub(crate) mod agent_internal;
13pub mod agent_selector;
14pub mod agent_stats;
15pub mod agent_transport;
16
17use std::collections::HashMap;
18use std::future::Future;
19use std::net::{Ipv4Addr, SocketAddr};
20use std::pin::Pin;
21use std::sync::atomic::Ordering;
22use std::sync::Arc;
23use std::time::SystemTime;
24
25use agent_config::*;
26use agent_internal::*;
27use agent_stats::*;
28use mdns::conn::*;
29use portable_atomic::{AtomicU8, AtomicUsize};
30use stun::agent::*;
31use stun::attributes::*;
32use stun::fingerprint::*;
33use stun::integrity::*;
34use stun::message::*;
35use stun::xoraddr::*;
36use tokio::sync::{broadcast, mpsc, Mutex};
37use tokio::time::{Duration, Instant};
38use util::vnet::net::*;
39use util::Buffer;
40
41use crate::agent::agent_gather::GatherCandidatesInternalParams;
42use crate::candidate::*;
43use crate::error::*;
44use crate::external_ip_mapper::*;
45use crate::mdns::*;
46use crate::network_type::*;
47use crate::rand::*;
48use crate::state::*;
49use crate::tcp_type::TcpType;
50use crate::udp_mux::UDPMux;
51use crate::udp_network::UDPNetwork;
52use crate::url::*;
53
54#[derive(Debug, Clone)]
55pub(crate) struct BindingRequest {
56    pub(crate) timestamp: Instant,
57    pub(crate) transaction_id: TransactionId,
58    pub(crate) destination: SocketAddr,
59    pub(crate) is_use_candidate: bool,
60}
61
62impl Default for BindingRequest {
63    fn default() -> Self {
64        Self {
65            timestamp: Instant::now(),
66            transaction_id: TransactionId::default(),
67            destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
68            is_use_candidate: false,
69        }
70    }
71}
72
/// Handler type accepted by [`Agent::on_connection_state_change`].
///
/// The closure receives a [`ConnectionState`] and returns a boxed, pinned future
/// resolving to `()`.
pub type OnConnectionStateChangeHdlrFn = Box<
    dyn (FnMut(ConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
        + Send
        + Sync,
>;
/// Handler type accepted by [`Agent::on_selected_candidate_pair_change`].
///
/// The closure receives references to two candidates and returns a boxed, pinned
/// future resolving to `()`.
// NOTE(review): presumably the arguments are (local, remote) — confirm at the call site.
pub type OnSelectedCandidatePairChangeHdlrFn = Box<
    dyn (FnMut(
            &Arc<dyn Candidate + Send + Sync>,
            &Arc<dyn Candidate + Send + Sync>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
        + Send
        + Sync,
>;
/// Handler type accepted by [`Agent::on_candidate`].
///
/// The closure receives an optional candidate and returns a boxed, pinned future
/// resolving to `()`.
pub type OnCandidateHdlrFn = Box<
    dyn (FnMut(
            Option<Arc<dyn Candidate + Send + Sync>>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
        + Send
        + Sync,
>;
/// Callback stored in `Agent::gather_candidate_cancel`; [`Agent::close`] and
/// [`Agent::gather_candidates`] invoke it, when present, to cancel a gathering routine.
pub type GatherCandidateCancelFn = Box<dyn Fn() + Send + Sync>;
94
/// Receiving halves of the agent's notification channels.
///
/// Returned next to the `AgentInternal` by `AgentInternal::new` and handed, field by
/// field, to `start_on_connection_state_change_routine` in [`Agent::new`].
struct ChanReceivers {
    /// Receives connection state values.
    chan_state_rx: mpsc::Receiver<ConnectionState>,
    /// Receives optional candidates.
    chan_candidate_rx: mpsc::Receiver<Option<Arc<dyn Candidate + Send + Sync>>>,
    /// Receives unit notifications related to the candidate pair.
    chan_candidate_pair_rx: mpsc::Receiver<()>,
}
100
/// Represents the ICE agent.
///
/// Wraps the shared [`AgentInternal`] state together with the settings captured from
/// the `AgentConfig` in [`Agent::new`]; most of those settings are forwarded to the
/// gathering routine by [`Agent::gather_candidates`].
pub struct Agent {
    /// Shared internal state; cloned into the tasks spawned by this type.
    pub(crate) internal: Arc<AgentInternal>,

    /// UDP network setup taken from the config; the `Muxed` variant is consulted on close.
    pub(crate) udp_network: UDPNetwork,
    /// Optional interface filter taken from the config, forwarded to gathering.
    pub(crate) interface_filter: Arc<Option<InterfaceFilterFn>>,
    /// Loopback flag taken from the config, forwarded to gathering.
    pub(crate) include_loopback: bool,
    /// Optional IP filter taken from the config, forwarded to gathering.
    pub(crate) ip_filter: Arc<Option<IpFilterFn>>,
    /// mDNS mode taken from the config.
    pub(crate) mdns_mode: MulticastDnsMode,
    /// mDNS host name: the configured one, or a generated one when the config left it empty.
    pub(crate) mdns_name: String,
    /// mDNS connection; `None` when none was created or opening it failed.
    pub(crate) mdns_conn: Option<Arc<DnsConn>>,
    /// Network layer: the one supplied in the config, otherwise `Net::new(None)`.
    pub(crate) net: Arc<Net>,

    // 1:1 D-NAT IP address mapping
    pub(crate) ext_ip_mapper: Arc<Option<ExternalIpMapper>>,
    /// Current gathering state, stored as the `u8` form of `GatheringState`.
    pub(crate) gathering_state: Arc<AtomicU8>, //GatheringState,
    /// Candidate types to gather: the configured list, or the defaults when it is empty.
    pub(crate) candidate_types: Vec<CandidateType>,
    /// URLs cloned from the config.
    pub(crate) urls: Vec<Url>,
    /// Network types cloned from the config.
    pub(crate) network_types: Vec<NetworkType>,

    /// Callback that cancels a gathering routine; `Agent::new` always leaves it `None`.
    pub(crate) gather_candidate_cancel: Option<GatherCandidateCancelFn>,
}
123
124impl Agent {
125    /// Creates a new Agent.
126    pub async fn new(config: AgentConfig) -> Result<Self> {
127        let mut mdns_name = config.multicast_dns_host_name.clone();
128        if mdns_name.is_empty() {
129            mdns_name = generate_multicast_dns_name();
130        }
131
132        if !mdns_name.ends_with(".local") || mdns_name.split('.').count() != 2 {
133            return Err(Error::ErrInvalidMulticastDnshostName);
134        }
135
136        let mdns_mode = config.multicast_dns_mode;
137
138        let mdns_conn =
139            match create_multicast_dns(mdns_mode, &mdns_name, &config.multicast_dns_dest_addr) {
140                Ok(c) => c,
141                Err(err) => {
142                    // Opportunistic mDNS: If we can't open the connection, that's ok: we
143                    // can continue without it.
144                    log::warn!("Failed to initialize mDNS {}: {}", mdns_name, err);
145                    None
146                }
147            };
148
149        let (mut ai, chan_receivers) = AgentInternal::new(&config);
150        let (chan_state_rx, chan_candidate_rx, chan_candidate_pair_rx) = (
151            chan_receivers.chan_state_rx,
152            chan_receivers.chan_candidate_rx,
153            chan_receivers.chan_candidate_pair_rx,
154        );
155
156        config.init_with_defaults(&mut ai);
157
158        let candidate_types = if config.candidate_types.is_empty() {
159            default_candidate_types()
160        } else {
161            config.candidate_types.clone()
162        };
163
164        if ai.lite.load(Ordering::SeqCst)
165            && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
166        {
167            Self::close_multicast_conn(&mdns_conn).await;
168            return Err(Error::ErrLiteUsingNonHostCandidates);
169        }
170
171        if !config.urls.is_empty()
172            && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
173            && !contains_candidate_type(CandidateType::Relay, &candidate_types)
174        {
175            Self::close_multicast_conn(&mdns_conn).await;
176            return Err(Error::ErrUselessUrlsProvided);
177        }
178
179        let ext_ip_mapper = match config.init_ext_ip_mapping(mdns_mode, &candidate_types) {
180            Ok(ext_ip_mapper) => ext_ip_mapper,
181            Err(err) => {
182                Self::close_multicast_conn(&mdns_conn).await;
183                return Err(err);
184            }
185        };
186
187        let net = if let Some(net) = config.net {
188            if net.is_virtual() {
189                log::warn!("vnet is enabled");
190                if mdns_mode != MulticastDnsMode::Disabled {
191                    log::warn!("vnet does not support mDNS yet");
192                }
193            }
194
195            net
196        } else {
197            Arc::new(Net::new(None))
198        };
199
200        let agent = Self {
201            udp_network: config.udp_network,
202            internal: Arc::new(ai),
203            interface_filter: Arc::clone(&config.interface_filter),
204            include_loopback: config.include_loopback,
205            ip_filter: Arc::clone(&config.ip_filter),
206            mdns_mode,
207            mdns_name,
208            mdns_conn,
209            net,
210            ext_ip_mapper: Arc::new(ext_ip_mapper),
211            gathering_state: Arc::new(AtomicU8::new(0)), //GatheringState::New,
212            candidate_types,
213            urls: config.urls.clone(),
214            network_types: config.network_types.clone(),
215
216            gather_candidate_cancel: None, //TODO: add cancel
217        };
218
219        agent.internal.start_on_connection_state_change_routine(
220            chan_state_rx,
221            chan_candidate_rx,
222            chan_candidate_pair_rx,
223        );
224
225        // Restart is also used to initialize the agent for the first time
226        if let Err(err) = agent.restart(config.local_ufrag, config.local_pwd).await {
227            Self::close_multicast_conn(&agent.mdns_conn).await;
228            let _ = agent.close().await;
229            return Err(err);
230        }
231
232        Ok(agent)
233    }
234
235    pub fn get_bytes_received(&self) -> usize {
236        self.internal.agent_conn.bytes_received()
237    }
238
239    pub fn get_bytes_sent(&self) -> usize {
240        self.internal.agent_conn.bytes_sent()
241    }
242
243    /// Sets a handler that is fired when the connection state changes.
244    pub fn on_connection_state_change(&self, f: OnConnectionStateChangeHdlrFn) {
245        self.internal
246            .on_connection_state_change_hdlr
247            .store(Some(Arc::new(Mutex::new(f))))
248    }
249
250    /// Sets a handler that is fired when the final candidate pair is selected.
251    pub fn on_selected_candidate_pair_change(&self, f: OnSelectedCandidatePairChangeHdlrFn) {
252        self.internal
253            .on_selected_candidate_pair_change_hdlr
254            .store(Some(Arc::new(Mutex::new(f))))
255    }
256
257    /// Sets a handler that is fired when new candidates gathered. When the gathering process
258    /// complete the last candidate is nil.
259    pub fn on_candidate(&self, f: OnCandidateHdlrFn) {
260        self.internal
261            .on_candidate_hdlr
262            .store(Some(Arc::new(Mutex::new(f))));
263    }
264
265    /// Adds a new remote candidate.
266    pub fn add_remote_candidate(&self, c: &Arc<dyn Candidate + Send + Sync>) -> Result<()> {
267        // cannot check for network yet because it might not be applied
268        // when mDNS hostame is used.
269        if c.tcp_type() == TcpType::Active {
270            // TCP Candidates with tcptype active will probe server passive ones, so
271            // no need to do anything with them.
272            log::info!("Ignoring remote candidate with tcpType active: {}", c);
273            return Ok(());
274        }
275
276        // If we have a mDNS Candidate lets fully resolve it before adding it locally
277        if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
278            if self.mdns_mode == MulticastDnsMode::Disabled {
279                log::warn!(
280                    "remote mDNS candidate added, but mDNS is disabled: ({})",
281                    c.address()
282                );
283                return Ok(());
284            }
285
286            if c.candidate_type() != CandidateType::Host {
287                return Err(Error::ErrAddressParseFailed);
288            }
289
290            let ai = Arc::clone(&self.internal);
291            let host_candidate = Arc::clone(c);
292            let mdns_conn = self.mdns_conn.clone();
293            tokio::spawn(async move {
294                if let Some(mdns_conn) = mdns_conn {
295                    if let Ok(candidate) =
296                        Self::resolve_and_add_multicast_candidate(mdns_conn, host_candidate).await
297                    {
298                        ai.add_remote_candidate(&candidate).await;
299                    }
300                }
301            });
302        } else {
303            let ai = Arc::clone(&self.internal);
304            let candidate = Arc::clone(c);
305            tokio::spawn(async move {
306                ai.add_remote_candidate(&candidate).await;
307            });
308        }
309
310        Ok(())
311    }
312
313    /// Returns the local candidates.
314    pub async fn get_local_candidates(&self) -> Result<Vec<Arc<dyn Candidate + Send + Sync>>> {
315        let mut res = vec![];
316
317        {
318            let local_candidates = self.internal.local_candidates.lock().await;
319            for candidates in local_candidates.values() {
320                for candidate in candidates {
321                    res.push(Arc::clone(candidate));
322                }
323            }
324        }
325
326        Ok(res)
327    }
328
329    /// Returns the local user credentials.
330    pub async fn get_local_user_credentials(&self) -> (String, String) {
331        let ufrag_pwd = self.internal.ufrag_pwd.lock().await;
332        (ufrag_pwd.local_ufrag.clone(), ufrag_pwd.local_pwd.clone())
333    }
334
335    /// Returns the remote user credentials.
336    pub async fn get_remote_user_credentials(&self) -> (String, String) {
337        let ufrag_pwd = self.internal.ufrag_pwd.lock().await;
338        (ufrag_pwd.remote_ufrag.clone(), ufrag_pwd.remote_pwd.clone())
339    }
340
341    /// Cleans up the Agent.
342    pub async fn close(&self) -> Result<()> {
343        if let Some(gather_candidate_cancel) = &self.gather_candidate_cancel {
344            gather_candidate_cancel();
345        }
346
347        if let UDPNetwork::Muxed(ref udp_mux) = self.udp_network {
348            let (ufrag, _) = self.get_local_user_credentials().await;
349            udp_mux.remove_conn_by_ufrag(&ufrag).await;
350        }
351
352        Self::close_multicast_conn(&self.mdns_conn).await;
353
354        //FIXME: deadlock here
355        self.internal.close().await
356    }
357
358    /// Returns the selected pair or nil if there is none
359    pub fn get_selected_candidate_pair(&self) -> Option<Arc<CandidatePair>> {
360        self.internal.agent_conn.get_selected_pair()
361    }
362
363    /// Sets the credentials of the remote agent.
364    pub async fn set_remote_credentials(
365        &self,
366        remote_ufrag: String,
367        remote_pwd: String,
368    ) -> Result<()> {
369        self.internal
370            .set_remote_credentials(remote_ufrag, remote_pwd)
371            .await
372    }
373
374    /// Restarts the ICE Agent with the provided ufrag/pwd
375    /// If no ufrag/pwd is provided the Agent will generate one itself.
376    ///
377    /// Restart must only be called when `GatheringState` is `GatheringStateComplete`
378    /// a user must then call `GatherCandidates` explicitly to start generating new ones.
379    pub async fn restart(&self, mut ufrag: String, mut pwd: String) -> Result<()> {
380        if ufrag.is_empty() {
381            ufrag = generate_ufrag();
382        }
383        if pwd.is_empty() {
384            pwd = generate_pwd();
385        }
386
387        if ufrag.len() * 8 < 24 {
388            return Err(Error::ErrLocalUfragInsufficientBits);
389        }
390        if pwd.len() * 8 < 128 {
391            return Err(Error::ErrLocalPwdInsufficientBits);
392        }
393
394        if GatheringState::from(self.gathering_state.load(Ordering::SeqCst))
395            == GatheringState::Gathering
396        {
397            return Err(Error::ErrRestartWhenGathering);
398        }
399        self.gathering_state
400            .store(GatheringState::New as u8, Ordering::SeqCst);
401
402        {
403            let done_tx = self.internal.done_tx.lock().await;
404            if done_tx.is_none() {
405                return Err(Error::ErrClosed);
406            }
407        }
408
409        // Clear all agent needed to take back to fresh state
410        {
411            let mut ufrag_pwd = self.internal.ufrag_pwd.lock().await;
412            ufrag_pwd.local_ufrag = ufrag;
413            ufrag_pwd.local_pwd = pwd;
414            ufrag_pwd.remote_ufrag = String::new();
415            ufrag_pwd.remote_pwd = String::new();
416        }
417        {
418            let mut pending_binding_requests = self.internal.pending_binding_requests.lock().await;
419            *pending_binding_requests = vec![];
420        }
421
422        {
423            let mut checklist = self.internal.agent_conn.checklist.lock().await;
424            *checklist = vec![];
425        }
426
427        self.internal.set_selected_pair(None).await;
428        self.internal.delete_all_candidates().await;
429        self.internal.start().await;
430
431        // Restart is used by NewAgent. Accept/Connect should be used to move to checking
432        // for new Agents
433        if self.internal.connection_state.load(Ordering::SeqCst) != ConnectionState::New as u8 {
434            self.internal
435                .update_connection_state(ConnectionState::Checking)
436                .await;
437        }
438
439        Ok(())
440    }
441
442    /// Initiates the trickle based gathering process.
443    pub fn gather_candidates(&self) -> Result<()> {
444        if self.gathering_state.load(Ordering::SeqCst) != GatheringState::New as u8 {
445            return Err(Error::ErrMultipleGatherAttempted);
446        }
447
448        if self.internal.on_candidate_hdlr.load().is_none() {
449            return Err(Error::ErrNoOnCandidateHandler);
450        }
451
452        if let Some(gather_candidate_cancel) = &self.gather_candidate_cancel {
453            gather_candidate_cancel(); // Cancel previous gathering routine
454        }
455
456        //TODO: a.gatherCandidateCancel = cancel
457
458        let params = GatherCandidatesInternalParams {
459            udp_network: self.udp_network.clone(),
460            candidate_types: self.candidate_types.clone(),
461            urls: self.urls.clone(),
462            network_types: self.network_types.clone(),
463            mdns_mode: self.mdns_mode,
464            mdns_name: self.mdns_name.clone(),
465            net: Arc::clone(&self.net),
466            interface_filter: self.interface_filter.clone(),
467            ip_filter: self.ip_filter.clone(),
468            ext_ip_mapper: Arc::clone(&self.ext_ip_mapper),
469            agent_internal: Arc::clone(&self.internal),
470            gathering_state: Arc::clone(&self.gathering_state),
471            chan_candidate_tx: Arc::clone(&self.internal.chan_candidate_tx),
472            include_loopback: self.include_loopback,
473        };
474        tokio::spawn(async move {
475            Self::gather_candidates_internal(params).await;
476        });
477
478        Ok(())
479    }
480
481    /// Returns a list of candidate pair stats.
482    pub async fn get_candidate_pairs_stats(&self) -> Vec<CandidatePairStats> {
483        self.internal.get_candidate_pairs_stats().await
484    }
485
486    /// Returns a list of local candidates stats.
487    pub async fn get_local_candidates_stats(&self) -> Vec<CandidateStats> {
488        self.internal.get_local_candidates_stats().await
489    }
490
491    /// Returns a list of remote candidates stats.
492    pub async fn get_remote_candidates_stats(&self) -> Vec<CandidateStats> {
493        self.internal.get_remote_candidates_stats().await
494    }
495
496    async fn resolve_and_add_multicast_candidate(
497        mdns_conn: Arc<DnsConn>,
498        c: Arc<dyn Candidate + Send + Sync>,
499    ) -> Result<Arc<dyn Candidate + Send + Sync>> {
500        //TODO: hook up _close_query_signal_tx to Agent or Candidate's Close signal?
501        let (_close_query_signal_tx, close_query_signal_rx) = mpsc::channel(1);
502        let src = match mdns_conn.query(&c.address(), close_query_signal_rx).await {
503            Ok((_, src)) => src,
504            Err(err) => {
505                log::warn!("Failed to discover mDNS candidate {}: {}", c.address(), err);
506                return Err(err.into());
507            }
508        };
509
510        c.set_ip(&src.ip())?;
511
512        Ok(c)
513    }
514
515    async fn close_multicast_conn(mdns_conn: &Option<Arc<DnsConn>>) {
516        if let Some(conn) = mdns_conn {
517            if let Err(err) = conn.close().await {
518                log::warn!("failed to close mDNS Conn: {}", err);
519            }
520        }
521    }
522}