1use std::fmt;
2use std::ops::Add;
3use std::sync::atomic::Ordering;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use async_trait::async_trait;
8use crc::{Crc, CRC_32_ISCSI};
9use portable_atomic::{AtomicU16, AtomicU64, AtomicU8};
10use tokio::sync::{broadcast, Mutex};
11use util::sync::Mutex as SyncMutex;
12
13use super::*;
14use crate::candidate::candidate_host::CandidateHostConfig;
15use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
16use crate::candidate::candidate_relay::CandidateRelayConfig;
17use crate::candidate::candidate_server_reflexive::CandidateServerReflexiveConfig;
18use crate::error::*;
19use crate::util::*;
20
21#[derive(Default)]
22pub struct CandidateBaseConfig {
23 pub candidate_id: String,
24 pub network: String,
25 pub address: String,
26 pub port: u16,
27 pub component: u16,
28 pub priority: u32,
29 pub foundation: String,
30 pub conn: Option<Arc<dyn util::Conn + Send + Sync>>,
31 pub initialized_ch: Option<broadcast::Receiver<()>>,
32}
33
34pub struct CandidateBase {
35 pub(crate) id: String,
36 pub(crate) network_type: AtomicU8,
37 pub(crate) candidate_type: CandidateType,
38
39 pub(crate) component: AtomicU16,
40 pub(crate) address: String,
41 pub(crate) port: u16,
42 pub(crate) related_address: Option<CandidateRelatedAddress>,
43 pub(crate) tcp_type: TcpType,
44
45 pub(crate) resolved_addr: SyncMutex<SocketAddr>,
46
47 pub(crate) last_sent: AtomicU64,
48 pub(crate) last_received: AtomicU64,
49
50 pub(crate) conn: Option<Arc<dyn util::Conn + Send + Sync>>,
51 pub(crate) closed_ch: Arc<Mutex<Option<broadcast::Sender<()>>>>,
52
53 pub(crate) foundation_override: String,
54 pub(crate) priority_override: u32,
55
56 pub(crate) network: String,
58 pub(crate) relay_client: Option<Arc<turn::client::Client>>,
60}
61
62impl Default for CandidateBase {
63 fn default() -> Self {
64 Self {
65 id: String::new(),
66 network_type: AtomicU8::new(0),
67 candidate_type: CandidateType::default(),
68
69 component: AtomicU16::new(0),
70 address: String::new(),
71 port: 0,
72 related_address: None,
73 tcp_type: TcpType::default(),
74
75 resolved_addr: SyncMutex::new(SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0)),
76
77 last_sent: AtomicU64::new(0),
78 last_received: AtomicU64::new(0),
79
80 conn: None,
81 closed_ch: Arc::new(Mutex::new(None)),
82
83 foundation_override: String::new(),
84 priority_override: 0,
85 network: String::new(),
86 relay_client: None,
87 }
88 }
89}
90
91impl fmt::Display for CandidateBase {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 if let Some(related_address) = self.related_address() {
95 write!(
96 f,
97 "{} {} {}:{}{}",
98 self.network_type(),
99 self.candidate_type(),
100 self.address(),
101 self.port(),
102 related_address,
103 )
104 } else {
105 write!(
106 f,
107 "{} {} {}:{}",
108 self.network_type(),
109 self.candidate_type(),
110 self.address(),
111 self.port(),
112 )
113 }
114 }
115}
116
117#[async_trait]
118impl Candidate for CandidateBase {
119 fn foundation(&self) -> String {
120 if !self.foundation_override.is_empty() {
121 return self.foundation_override.clone();
122 }
123
124 let mut buf = vec![];
125 buf.extend_from_slice(self.candidate_type().to_string().as_bytes());
126 buf.extend_from_slice(self.address.as_bytes());
127 buf.extend_from_slice(self.network_type().to_string().as_bytes());
128
129 let checksum = Crc::<u32>::new(&CRC_32_ISCSI).checksum(&buf);
130
131 format!("{checksum}")
132 }
133
134 fn id(&self) -> String {
136 self.id.clone()
137 }
138
139 fn component(&self) -> u16 {
141 self.component.load(Ordering::SeqCst)
142 }
143
144 fn set_component(&self, component: u16) {
145 self.component.store(component, Ordering::SeqCst);
146 }
147
148 fn last_received(&self) -> SystemTime {
150 UNIX_EPOCH.add(Duration::from_nanos(
151 self.last_received.load(Ordering::SeqCst),
152 ))
153 }
154
155 fn last_sent(&self) -> SystemTime {
157 UNIX_EPOCH.add(Duration::from_nanos(self.last_sent.load(Ordering::SeqCst)))
158 }
159
160 fn network_type(&self) -> NetworkType {
162 NetworkType::from(self.network_type.load(Ordering::SeqCst))
163 }
164
165 fn address(&self) -> String {
167 self.address.clone()
168 }
169
170 fn port(&self) -> u16 {
172 self.port
173 }
174
175 fn priority(&self) -> u32 {
177 if self.priority_override != 0 {
178 return self.priority_override;
179 }
180
181 (1 << 24) * u32::from(self.candidate_type().preference())
188 + (1 << 8) * u32::from(self.local_preference())
189 + (256 - u32::from(self.component()))
190 }
191
192 fn related_address(&self) -> Option<CandidateRelatedAddress> {
194 self.related_address.as_ref().cloned()
195 }
196
197 fn candidate_type(&self) -> CandidateType {
199 self.candidate_type
200 }
201
202 fn tcp_type(&self) -> TcpType {
203 self.tcp_type
204 }
205
206 fn marshal(&self) -> String {
208 let mut val = format!(
209 "{} {} {} {} {} {} typ {}",
210 self.foundation(),
211 self.component(),
212 self.network_type().network_short(),
213 self.priority(),
214 self.address(),
215 self.port(),
216 self.candidate_type()
217 );
218
219 if self.tcp_type != TcpType::Unspecified {
220 val += format!(" tcptype {}", self.tcp_type()).as_str();
221 }
222
223 if let Some(related_address) = self.related_address() {
224 val += format!(
225 " raddr {} rport {}",
226 related_address.address, related_address.port,
227 )
228 .as_str();
229 }
230
231 val
232 }
233
234 fn addr(&self) -> SocketAddr {
235 *self.resolved_addr.lock()
236 }
237
238 async fn close(&self) -> Result<()> {
240 {
241 let mut closed_ch = self.closed_ch.lock().await;
242 if closed_ch.is_none() {
243 return Err(Error::ErrClosed);
244 }
245 closed_ch.take();
246 }
247
248 if let Some(relay_client) = &self.relay_client {
249 let _ = relay_client.close().await;
250 }
251
252 if let Some(conn) = &self.conn {
253 let _ = conn.close().await;
254 }
255
256 Ok(())
257 }
258
259 fn seen(&self, outbound: bool) {
260 let d = SystemTime::now()
261 .duration_since(UNIX_EPOCH)
262 .unwrap_or_else(|_| Duration::from_secs(0));
263
264 if outbound {
265 self.set_last_sent(d);
266 } else {
267 self.set_last_received(d);
268 }
269 }
270
271 async fn write_to(&self, raw: &[u8], dst: &(dyn Candidate + Send + Sync)) -> Result<usize> {
272 let n = if let Some(conn) = &self.conn {
273 let addr = dst.addr();
274 conn.send_to(raw, addr).await?
275 } else {
276 0
277 };
278 self.seen(true);
279 Ok(n)
280 }
281
282 fn equal(&self, other: &dyn Candidate) -> bool {
284 self.network_type() == other.network_type()
285 && self.candidate_type() == other.candidate_type()
286 && self.address() == other.address()
287 && self.port() == other.port()
288 && self.tcp_type() == other.tcp_type()
289 && self.related_address() == other.related_address()
290 }
291
292 fn set_ip(&self, ip: &IpAddr) -> Result<()> {
293 let network_type = determine_network_type(&self.network, ip)?;
294
295 self.network_type
296 .store(network_type as u8, Ordering::SeqCst);
297
298 let addr = create_addr(network_type, *ip, self.port);
299 *self.resolved_addr.lock() = addr;
300
301 Ok(())
302 }
303
304 fn get_conn(&self) -> Option<&Arc<dyn util::Conn + Send + Sync>> {
305 self.conn.as_ref()
306 }
307
308 fn get_closed_ch(&self) -> Arc<Mutex<Option<broadcast::Sender<()>>>> {
309 self.closed_ch.clone()
310 }
311}
312
313impl CandidateBase {
314 pub fn set_last_received(&self, d: Duration) {
315 #[allow(clippy::cast_possible_truncation)]
316 self.last_received
317 .store(d.as_nanos() as u64, Ordering::SeqCst);
318 }
319
320 pub fn set_last_sent(&self, d: Duration) {
321 #[allow(clippy::cast_possible_truncation)]
322 self.last_sent.store(d.as_nanos() as u64, Ordering::SeqCst);
323 }
324
325 pub fn local_preference(&self) -> u16 {
327 if self.network_type().is_tcp() {
328 let other_pref: u16 = 8191;
364
365 let direction_pref: u16 = match self.candidate_type() {
366 CandidateType::Host | CandidateType::Relay => match self.tcp_type() {
367 TcpType::Active => 6,
368 TcpType::Passive => 4,
369 TcpType::SimultaneousOpen => 2,
370 TcpType::Unspecified => 0,
371 },
372 CandidateType::PeerReflexive | CandidateType::ServerReflexive => {
373 match self.tcp_type() {
374 TcpType::SimultaneousOpen => 6,
375 TcpType::Active => 4,
376 TcpType::Passive => 2,
377 TcpType::Unspecified => 0,
378 }
379 }
380 CandidateType::Unspecified => 0,
381 };
382
383 (1 << 13) * direction_pref + other_pref
384 } else {
385 DEFAULT_LOCAL_PREFERENCE
386 }
387 }
388}
389
390pub fn unmarshal_candidate(raw: &str) -> Result<impl Candidate> {
392 let split: Vec<&str> = raw.split_whitespace().collect();
393 if split.len() < 8 {
394 return Err(Error::Other(format!(
395 "{:?} ({})",
396 Error::ErrAttributeTooShortIceCandidate,
397 split.len()
398 )));
399 }
400
401 let foundation = split[0].to_owned();
403
404 let component: u16 = split[1].parse()?;
406
407 let network = split[2].to_owned();
409
410 let priority: u32 = split[3].parse()?;
412
413 let address = split[4].to_owned();
415
416 let port: u16 = split[5].parse()?;
418
419 let typ = split[7];
420
421 let mut rel_addr = String::new();
422 let mut rel_port = 0;
423 let mut tcp_type = TcpType::Unspecified;
424
425 if split.len() > 8 {
426 let split2 = &split[8..];
427
428 if split2[0] == "raddr" {
429 if split2.len() < 4 {
430 return Err(Error::Other(format!(
431 "{:?}: incorrect length",
432 Error::ErrParseRelatedAddr
433 )));
434 }
435
436 split2[1].clone_into(&mut rel_addr);
438
439 rel_port = split2[3].parse()?;
441 } else if split2[0] == "tcptype" {
442 if split2.len() < 2 {
443 return Err(Error::Other(format!(
444 "{:?}: incorrect length",
445 Error::ErrParseType
446 )));
447 }
448
449 tcp_type = TcpType::from(split2[1]);
450 }
451 }
452
453 match typ {
454 "host" => {
455 let config = CandidateHostConfig {
456 base_config: CandidateBaseConfig {
457 network,
458 address,
459 port,
460 component,
461 priority,
462 foundation,
463 ..CandidateBaseConfig::default()
464 },
465 tcp_type,
466 };
467 config.new_candidate_host()
468 }
469 "srflx" => {
470 let config = CandidateServerReflexiveConfig {
471 base_config: CandidateBaseConfig {
472 network,
473 address,
474 port,
475 component,
476 priority,
477 foundation,
478 ..CandidateBaseConfig::default()
479 },
480 rel_addr,
481 rel_port,
482 };
483 config.new_candidate_server_reflexive()
484 }
485 "prflx" => {
486 let config = CandidatePeerReflexiveConfig {
487 base_config: CandidateBaseConfig {
488 network,
489 address,
490 port,
491 component,
492 priority,
493 foundation,
494 ..CandidateBaseConfig::default()
495 },
496 rel_addr,
497 rel_port,
498 };
499
500 config.new_candidate_peer_reflexive()
501 }
502 "relay" => {
503 let config = CandidateRelayConfig {
504 base_config: CandidateBaseConfig {
505 network,
506 address,
507 port,
508 component,
509 priority,
510 foundation,
511 ..CandidateBaseConfig::default()
512 },
513 rel_addr,
514 rel_port,
515 ..CandidateRelayConfig::default()
516 };
517 config.new_candidate_relay()
518 }
519 _ => Err(Error::Other(format!(
520 "{:?} ({})",
521 Error::ErrUnknownCandidateType,
522 typ
523 ))),
524 }
525}