webrtc_ice/candidate/
candidate_base.rs

1use std::fmt;
2use std::ops::Add;
3use std::sync::atomic::Ordering;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use async_trait::async_trait;
8use crc::{Crc, CRC_32_ISCSI};
9use portable_atomic::{AtomicU16, AtomicU64, AtomicU8};
10use tokio::sync::{broadcast, Mutex};
11use util::sync::Mutex as SyncMutex;
12
13use super::*;
14use crate::candidate::candidate_host::CandidateHostConfig;
15use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
16use crate::candidate::candidate_relay::CandidateRelayConfig;
17use crate::candidate::candidate_server_reflexive::CandidateServerReflexiveConfig;
18use crate::error::*;
19use crate::util::*;
20
21#[derive(Default)]
22pub struct CandidateBaseConfig {
23    pub candidate_id: String,
24    pub network: String,
25    pub address: String,
26    pub port: u16,
27    pub component: u16,
28    pub priority: u32,
29    pub foundation: String,
30    pub conn: Option<Arc<dyn util::Conn + Send + Sync>>,
31    pub initialized_ch: Option<broadcast::Receiver<()>>,
32}
33
34pub struct CandidateBase {
35    pub(crate) id: String,
36    pub(crate) network_type: AtomicU8,
37    pub(crate) candidate_type: CandidateType,
38
39    pub(crate) component: AtomicU16,
40    pub(crate) address: String,
41    pub(crate) port: u16,
42    pub(crate) related_address: Option<CandidateRelatedAddress>,
43    pub(crate) tcp_type: TcpType,
44
45    pub(crate) resolved_addr: SyncMutex<SocketAddr>,
46
47    pub(crate) last_sent: AtomicU64,
48    pub(crate) last_received: AtomicU64,
49
50    pub(crate) conn: Option<Arc<dyn util::Conn + Send + Sync>>,
51    pub(crate) closed_ch: Arc<Mutex<Option<broadcast::Sender<()>>>>,
52
53    pub(crate) foundation_override: String,
54    pub(crate) priority_override: u32,
55
56    //CandidateHost
57    pub(crate) network: String,
58    //CandidateRelay
59    pub(crate) relay_client: Option<Arc<turn::client::Client>>,
60}
61
62impl Default for CandidateBase {
63    fn default() -> Self {
64        Self {
65            id: String::new(),
66            network_type: AtomicU8::new(0),
67            candidate_type: CandidateType::default(),
68
69            component: AtomicU16::new(0),
70            address: String::new(),
71            port: 0,
72            related_address: None,
73            tcp_type: TcpType::default(),
74
75            resolved_addr: SyncMutex::new(SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0)),
76
77            last_sent: AtomicU64::new(0),
78            last_received: AtomicU64::new(0),
79
80            conn: None,
81            closed_ch: Arc::new(Mutex::new(None)),
82
83            foundation_override: String::new(),
84            priority_override: 0,
85            network: String::new(),
86            relay_client: None,
87        }
88    }
89}
90
91// String makes the candidateBase printable
92impl fmt::Display for CandidateBase {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        if let Some(related_address) = self.related_address() {
95            write!(
96                f,
97                "{} {} {}:{}{}",
98                self.network_type(),
99                self.candidate_type(),
100                self.address(),
101                self.port(),
102                related_address,
103            )
104        } else {
105            write!(
106                f,
107                "{} {} {}:{}",
108                self.network_type(),
109                self.candidate_type(),
110                self.address(),
111                self.port(),
112            )
113        }
114    }
115}
116
117#[async_trait]
118impl Candidate for CandidateBase {
119    fn foundation(&self) -> String {
120        if !self.foundation_override.is_empty() {
121            return self.foundation_override.clone();
122        }
123
124        let mut buf = vec![];
125        buf.extend_from_slice(self.candidate_type().to_string().as_bytes());
126        buf.extend_from_slice(self.address.as_bytes());
127        buf.extend_from_slice(self.network_type().to_string().as_bytes());
128
129        let checksum = Crc::<u32>::new(&CRC_32_ISCSI).checksum(&buf);
130
131        format!("{checksum}")
132    }
133
134    /// Returns Candidate ID.
135    fn id(&self) -> String {
136        self.id.clone()
137    }
138
139    /// Returns candidate component.
140    fn component(&self) -> u16 {
141        self.component.load(Ordering::SeqCst)
142    }
143
144    fn set_component(&self, component: u16) {
145        self.component.store(component, Ordering::SeqCst);
146    }
147
148    /// Returns a time indicating the last time this candidate was received.
149    fn last_received(&self) -> SystemTime {
150        UNIX_EPOCH.add(Duration::from_nanos(
151            self.last_received.load(Ordering::SeqCst),
152        ))
153    }
154
155    /// Returns a time indicating the last time this candidate was sent.
156    fn last_sent(&self) -> SystemTime {
157        UNIX_EPOCH.add(Duration::from_nanos(self.last_sent.load(Ordering::SeqCst)))
158    }
159
160    /// Returns candidate NetworkType.
161    fn network_type(&self) -> NetworkType {
162        NetworkType::from(self.network_type.load(Ordering::SeqCst))
163    }
164
165    /// Returns Candidate Address.
166    fn address(&self) -> String {
167        self.address.clone()
168    }
169
170    /// Returns Candidate Port.
171    fn port(&self) -> u16 {
172        self.port
173    }
174
175    /// Computes the priority for this ICE Candidate.
176    fn priority(&self) -> u32 {
177        if self.priority_override != 0 {
178            return self.priority_override;
179        }
180
181        // The local preference MUST be an integer from 0 (lowest preference) to
182        // 65535 (highest preference) inclusive.  When there is only a single IP
183        // address, this value SHOULD be set to 65535.  If there are multiple
184        // candidates for a particular component for a particular data stream
185        // that have the same type, the local preference MUST be unique for each
186        // one.
187        (1 << 24) * u32::from(self.candidate_type().preference())
188            + (1 << 8) * u32::from(self.local_preference())
189            + (256 - u32::from(self.component()))
190    }
191
192    /// Returns `Option<CandidateRelatedAddress>`.
193    fn related_address(&self) -> Option<CandidateRelatedAddress> {
194        self.related_address.as_ref().cloned()
195    }
196
197    /// Returns candidate type.
198    fn candidate_type(&self) -> CandidateType {
199        self.candidate_type
200    }
201
202    fn tcp_type(&self) -> TcpType {
203        self.tcp_type
204    }
205
206    /// Returns the string representation of the ICECandidate.
207    fn marshal(&self) -> String {
208        let mut val = format!(
209            "{} {} {} {} {} {} typ {}",
210            self.foundation(),
211            self.component(),
212            self.network_type().network_short(),
213            self.priority(),
214            self.address(),
215            self.port(),
216            self.candidate_type()
217        );
218
219        if self.tcp_type != TcpType::Unspecified {
220            val += format!(" tcptype {}", self.tcp_type()).as_str();
221        }
222
223        if let Some(related_address) = self.related_address() {
224            val += format!(
225                " raddr {} rport {}",
226                related_address.address, related_address.port,
227            )
228            .as_str();
229        }
230
231        val
232    }
233
234    fn addr(&self) -> SocketAddr {
235        *self.resolved_addr.lock()
236    }
237
238    /// Stops the recvLoop.
239    async fn close(&self) -> Result<()> {
240        {
241            let mut closed_ch = self.closed_ch.lock().await;
242            if closed_ch.is_none() {
243                return Err(Error::ErrClosed);
244            }
245            closed_ch.take();
246        }
247
248        if let Some(relay_client) = &self.relay_client {
249            let _ = relay_client.close().await;
250        }
251
252        if let Some(conn) = &self.conn {
253            let _ = conn.close().await;
254        }
255
256        Ok(())
257    }
258
259    fn seen(&self, outbound: bool) {
260        let d = SystemTime::now()
261            .duration_since(UNIX_EPOCH)
262            .unwrap_or_else(|_| Duration::from_secs(0));
263
264        if outbound {
265            self.set_last_sent(d);
266        } else {
267            self.set_last_received(d);
268        }
269    }
270
271    async fn write_to(&self, raw: &[u8], dst: &(dyn Candidate + Send + Sync)) -> Result<usize> {
272        let n = if let Some(conn) = &self.conn {
273            let addr = dst.addr();
274            conn.send_to(raw, addr).await?
275        } else {
276            0
277        };
278        self.seen(true);
279        Ok(n)
280    }
281
282    /// Used to compare two candidateBases.
283    fn equal(&self, other: &dyn Candidate) -> bool {
284        self.network_type() == other.network_type()
285            && self.candidate_type() == other.candidate_type()
286            && self.address() == other.address()
287            && self.port() == other.port()
288            && self.tcp_type() == other.tcp_type()
289            && self.related_address() == other.related_address()
290    }
291
292    fn set_ip(&self, ip: &IpAddr) -> Result<()> {
293        let network_type = determine_network_type(&self.network, ip)?;
294
295        self.network_type
296            .store(network_type as u8, Ordering::SeqCst);
297
298        let addr = create_addr(network_type, *ip, self.port);
299        *self.resolved_addr.lock() = addr;
300
301        Ok(())
302    }
303
304    fn get_conn(&self) -> Option<&Arc<dyn util::Conn + Send + Sync>> {
305        self.conn.as_ref()
306    }
307
308    fn get_closed_ch(&self) -> Arc<Mutex<Option<broadcast::Sender<()>>>> {
309        self.closed_ch.clone()
310    }
311}
312
313impl CandidateBase {
314    pub fn set_last_received(&self, d: Duration) {
315        #[allow(clippy::cast_possible_truncation)]
316        self.last_received
317            .store(d.as_nanos() as u64, Ordering::SeqCst);
318    }
319
320    pub fn set_last_sent(&self, d: Duration) {
321        #[allow(clippy::cast_possible_truncation)]
322        self.last_sent.store(d.as_nanos() as u64, Ordering::SeqCst);
323    }
324
325    /// Returns the local preference for this candidate.
326    pub fn local_preference(&self) -> u16 {
327        if self.network_type().is_tcp() {
328            // RFC 6544, section 4.2
329            //
330            // In Section 4.1.2.1 of [RFC5245], a recommended formula for UDP ICE
331            // candidate prioritization is defined.  For TCP candidates, the same
332            // formula and candidate type preferences SHOULD be used, and the
333            // RECOMMENDED type preferences for the new candidate types defined in
334            // this document (see Section 5) are 105 for NAT-assisted candidates and
335            // 75 for UDP-tunneled candidates.
336            //
337            // (...)
338            //
339            // With TCP candidates, the local preference part of the recommended
340            // priority formula is updated to also include the directionality
341            // (active, passive, or simultaneous-open) of the TCP connection.  The
342            // RECOMMENDED local preference is then defined as:
343            //
344            //     local preference = (2^13) * direction-pref + other-pref
345            //
346            // The direction-pref MUST be between 0 and 7 (both inclusive), with 7
347            // being the most preferred.  The other-pref MUST be between 0 and 8191
348            // (both inclusive), with 8191 being the most preferred.  It is
349            // RECOMMENDED that the host, UDP-tunneled, and relayed TCP candidates
350            // have the direction-pref assigned as follows: 6 for active, 4 for
351            // passive, and 2 for S-O.  For the NAT-assisted and server reflexive
352            // candidates, the RECOMMENDED values are: 6 for S-O, 4 for active, and
353            // 2 for passive.
354            //
355            // (...)
356            //
357            // If any two candidates have the same type-preference and direction-
358            // pref, they MUST have a unique other-pref.  With this specification,
359            // this usually only happens with multi-homed hosts, in which case
360            // other-pref is the preference for the particular IP address from which
361            // the candidate was obtained.  When there is only a single IP address,
362            // this value SHOULD be set to the maximum allowed value (8191).
363            let other_pref: u16 = 8191;
364
365            let direction_pref: u16 = match self.candidate_type() {
366                CandidateType::Host | CandidateType::Relay => match self.tcp_type() {
367                    TcpType::Active => 6,
368                    TcpType::Passive => 4,
369                    TcpType::SimultaneousOpen => 2,
370                    TcpType::Unspecified => 0,
371                },
372                CandidateType::PeerReflexive | CandidateType::ServerReflexive => {
373                    match self.tcp_type() {
374                        TcpType::SimultaneousOpen => 6,
375                        TcpType::Active => 4,
376                        TcpType::Passive => 2,
377                        TcpType::Unspecified => 0,
378                    }
379                }
380                CandidateType::Unspecified => 0,
381            };
382
383            (1 << 13) * direction_pref + other_pref
384        } else {
385            DEFAULT_LOCAL_PREFERENCE
386        }
387    }
388}
389
390/// Creates a Candidate from its string representation.
391pub fn unmarshal_candidate(raw: &str) -> Result<impl Candidate> {
392    let split: Vec<&str> = raw.split_whitespace().collect();
393    if split.len() < 8 {
394        return Err(Error::Other(format!(
395            "{:?} ({})",
396            Error::ErrAttributeTooShortIceCandidate,
397            split.len()
398        )));
399    }
400
401    // Foundation
402    let foundation = split[0].to_owned();
403
404    // Component
405    let component: u16 = split[1].parse()?;
406
407    // Network
408    let network = split[2].to_owned();
409
410    // Priority
411    let priority: u32 = split[3].parse()?;
412
413    // Address
414    let address = split[4].to_owned();
415
416    // Port
417    let port: u16 = split[5].parse()?;
418
419    let typ = split[7];
420
421    let mut rel_addr = String::new();
422    let mut rel_port = 0;
423    let mut tcp_type = TcpType::Unspecified;
424
425    if split.len() > 8 {
426        let split2 = &split[8..];
427
428        if split2[0] == "raddr" {
429            if split2.len() < 4 {
430                return Err(Error::Other(format!(
431                    "{:?}: incorrect length",
432                    Error::ErrParseRelatedAddr
433                )));
434            }
435
436            // RelatedAddress
437            split2[1].clone_into(&mut rel_addr);
438
439            // RelatedPort
440            rel_port = split2[3].parse()?;
441        } else if split2[0] == "tcptype" {
442            if split2.len() < 2 {
443                return Err(Error::Other(format!(
444                    "{:?}: incorrect length",
445                    Error::ErrParseType
446                )));
447            }
448
449            tcp_type = TcpType::from(split2[1]);
450        }
451    }
452
453    match typ {
454        "host" => {
455            let config = CandidateHostConfig {
456                base_config: CandidateBaseConfig {
457                    network,
458                    address,
459                    port,
460                    component,
461                    priority,
462                    foundation,
463                    ..CandidateBaseConfig::default()
464                },
465                tcp_type,
466            };
467            config.new_candidate_host()
468        }
469        "srflx" => {
470            let config = CandidateServerReflexiveConfig {
471                base_config: CandidateBaseConfig {
472                    network,
473                    address,
474                    port,
475                    component,
476                    priority,
477                    foundation,
478                    ..CandidateBaseConfig::default()
479                },
480                rel_addr,
481                rel_port,
482            };
483            config.new_candidate_server_reflexive()
484        }
485        "prflx" => {
486            let config = CandidatePeerReflexiveConfig {
487                base_config: CandidateBaseConfig {
488                    network,
489                    address,
490                    port,
491                    component,
492                    priority,
493                    foundation,
494                    ..CandidateBaseConfig::default()
495                },
496                rel_addr,
497                rel_port,
498            };
499
500            config.new_candidate_peer_reflexive()
501        }
502        "relay" => {
503            let config = CandidateRelayConfig {
504                base_config: CandidateBaseConfig {
505                    network,
506                    address,
507                    port,
508                    component,
509                    priority,
510                    foundation,
511                    ..CandidateBaseConfig::default()
512                },
513                rel_addr,
514                rel_port,
515                ..CandidateRelayConfig::default()
516            };
517            config.new_candidate_relay()
518        }
519        _ => Err(Error::Other(format!(
520            "{:?} ({})",
521            Error::ErrUnknownCandidateType,
522            typ
523        ))),
524    }
525}