use std::fmt;
use std::ops::Add;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use async_trait::async_trait;
use crc::{Crc, CRC_32_ISCSI};
use portable_atomic::{AtomicU16, AtomicU64, AtomicU8};
use tokio::sync::{broadcast, Mutex};
use util::sync::Mutex as SyncMutex;
use super::*;
use crate::candidate::candidate_host::CandidateHostConfig;
use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
use crate::candidate::candidate_relay::CandidateRelayConfig;
use crate::candidate::candidate_server_reflexive::CandidateServerReflexiveConfig;
use crate::error::*;
use crate::util::*;
#[derive(Default)]
pub struct CandidateBaseConfig {
pub candidate_id: String,
pub network: String,
pub address: String,
pub port: u16,
pub component: u16,
pub priority: u32,
pub foundation: String,
pub conn: Option<Arc<dyn util::Conn + Send + Sync>>,
pub initialized_ch: Option<broadcast::Receiver<()>>,
}
pub struct CandidateBase {
pub(crate) id: String,
pub(crate) network_type: AtomicU8,
pub(crate) candidate_type: CandidateType,
pub(crate) component: AtomicU16,
pub(crate) address: String,
pub(crate) port: u16,
pub(crate) related_address: Option<CandidateRelatedAddress>,
pub(crate) tcp_type: TcpType,
pub(crate) resolved_addr: SyncMutex<SocketAddr>,
pub(crate) last_sent: AtomicU64,
pub(crate) last_received: AtomicU64,
pub(crate) conn: Option<Arc<dyn util::Conn + Send + Sync>>,
pub(crate) closed_ch: Arc<Mutex<Option<broadcast::Sender<()>>>>,
pub(crate) foundation_override: String,
pub(crate) priority_override: u32,
pub(crate) network: String,
pub(crate) relay_client: Option<Arc<turn::client::Client>>,
}
impl Default for CandidateBase {
fn default() -> Self {
Self {
id: String::new(),
network_type: AtomicU8::new(0),
candidate_type: CandidateType::default(),
component: AtomicU16::new(0),
address: String::new(),
port: 0,
related_address: None,
tcp_type: TcpType::default(),
resolved_addr: SyncMutex::new(SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0)),
last_sent: AtomicU64::new(0),
last_received: AtomicU64::new(0),
conn: None,
closed_ch: Arc::new(Mutex::new(None)),
foundation_override: String::new(),
priority_override: 0,
network: String::new(),
relay_client: None,
}
}
}
impl fmt::Display for CandidateBase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(related_address) = self.related_address() {
write!(
f,
"{} {} {}:{}{}",
self.network_type(),
self.candidate_type(),
self.address(),
self.port(),
related_address,
)
} else {
write!(
f,
"{} {} {}:{}",
self.network_type(),
self.candidate_type(),
self.address(),
self.port(),
)
}
}
}
#[async_trait]
impl Candidate for CandidateBase {
fn foundation(&self) -> String {
if !self.foundation_override.is_empty() {
return self.foundation_override.clone();
}
let mut buf = vec![];
buf.extend_from_slice(self.candidate_type().to_string().as_bytes());
buf.extend_from_slice(self.address.as_bytes());
buf.extend_from_slice(self.network_type().to_string().as_bytes());
let checksum = Crc::<u32>::new(&CRC_32_ISCSI).checksum(&buf);
format!("{checksum}")
}
fn id(&self) -> String {
self.id.clone()
}
fn component(&self) -> u16 {
self.component.load(Ordering::SeqCst)
}
fn set_component(&self, component: u16) {
self.component.store(component, Ordering::SeqCst);
}
fn last_received(&self) -> SystemTime {
UNIX_EPOCH.add(Duration::from_nanos(
self.last_received.load(Ordering::SeqCst),
))
}
fn last_sent(&self) -> SystemTime {
UNIX_EPOCH.add(Duration::from_nanos(self.last_sent.load(Ordering::SeqCst)))
}
fn network_type(&self) -> NetworkType {
NetworkType::from(self.network_type.load(Ordering::SeqCst))
}
fn address(&self) -> String {
self.address.clone()
}
fn port(&self) -> u16 {
self.port
}
fn priority(&self) -> u32 {
if self.priority_override != 0 {
return self.priority_override;
}
(1 << 24) * u32::from(self.candidate_type().preference())
+ (1 << 8) * u32::from(self.local_preference())
+ (256 - u32::from(self.component()))
}
fn related_address(&self) -> Option<CandidateRelatedAddress> {
self.related_address.as_ref().cloned()
}
fn candidate_type(&self) -> CandidateType {
self.candidate_type
}
fn tcp_type(&self) -> TcpType {
self.tcp_type
}
fn marshal(&self) -> String {
let mut val = format!(
"{} {} {} {} {} {} typ {}",
self.foundation(),
self.component(),
self.network_type().network_short(),
self.priority(),
self.address(),
self.port(),
self.candidate_type()
);
if self.tcp_type != TcpType::Unspecified {
val += format!(" tcptype {}", self.tcp_type()).as_str();
}
if let Some(related_address) = self.related_address() {
val += format!(
" raddr {} rport {}",
related_address.address, related_address.port,
)
.as_str();
}
val
}
fn addr(&self) -> SocketAddr {
*self.resolved_addr.lock()
}
async fn close(&self) -> Result<()> {
{
let mut closed_ch = self.closed_ch.lock().await;
if closed_ch.is_none() {
return Err(Error::ErrClosed);
}
closed_ch.take();
}
if let Some(relay_client) = &self.relay_client {
let _ = relay_client.close().await;
}
if let Some(conn) = &self.conn {
let _ = conn.close().await;
}
Ok(())
}
fn seen(&self, outbound: bool) {
let d = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0));
if outbound {
self.set_last_sent(d);
} else {
self.set_last_received(d);
}
}
async fn write_to(&self, raw: &[u8], dst: &(dyn Candidate + Send + Sync)) -> Result<usize> {
let n = if let Some(conn) = &self.conn {
let addr = dst.addr();
conn.send_to(raw, addr).await?
} else {
0
};
self.seen(true);
Ok(n)
}
fn equal(&self, other: &dyn Candidate) -> bool {
self.network_type() == other.network_type()
&& self.candidate_type() == other.candidate_type()
&& self.address() == other.address()
&& self.port() == other.port()
&& self.tcp_type() == other.tcp_type()
&& self.related_address() == other.related_address()
}
fn set_ip(&self, ip: &IpAddr) -> Result<()> {
let network_type = determine_network_type(&self.network, ip)?;
self.network_type
.store(network_type as u8, Ordering::SeqCst);
let addr = create_addr(network_type, *ip, self.port);
*self.resolved_addr.lock() = addr;
Ok(())
}
fn get_conn(&self) -> Option<&Arc<dyn util::Conn + Send + Sync>> {
self.conn.as_ref()
}
fn get_closed_ch(&self) -> Arc<Mutex<Option<broadcast::Sender<()>>>> {
self.closed_ch.clone()
}
}
impl CandidateBase {
pub fn set_last_received(&self, d: Duration) {
#[allow(clippy::cast_possible_truncation)]
self.last_received
.store(d.as_nanos() as u64, Ordering::SeqCst);
}
pub fn set_last_sent(&self, d: Duration) {
#[allow(clippy::cast_possible_truncation)]
self.last_sent.store(d.as_nanos() as u64, Ordering::SeqCst);
}
pub fn local_preference(&self) -> u16 {
if self.network_type().is_tcp() {
let other_pref: u16 = 8191;
let direction_pref: u16 = match self.candidate_type() {
CandidateType::Host | CandidateType::Relay => match self.tcp_type() {
TcpType::Active => 6,
TcpType::Passive => 4,
TcpType::SimultaneousOpen => 2,
TcpType::Unspecified => 0,
},
CandidateType::PeerReflexive | CandidateType::ServerReflexive => {
match self.tcp_type() {
TcpType::SimultaneousOpen => 6,
TcpType::Active => 4,
TcpType::Passive => 2,
TcpType::Unspecified => 0,
}
}
CandidateType::Unspecified => 0,
};
(1 << 13) * direction_pref + other_pref
} else {
DEFAULT_LOCAL_PREFERENCE
}
}
}
pub fn unmarshal_candidate(raw: &str) -> Result<impl Candidate> {
let split: Vec<&str> = raw.split_whitespace().collect();
if split.len() < 8 {
return Err(Error::Other(format!(
"{:?} ({})",
Error::ErrAttributeTooShortIceCandidate,
split.len()
)));
}
let foundation = split[0].to_owned();
let component: u16 = split[1].parse()?;
let network = split[2].to_owned();
let priority: u32 = split[3].parse()?;
let address = split[4].to_owned();
let port: u16 = split[5].parse()?;
let typ = split[7];
let mut rel_addr = String::new();
let mut rel_port = 0;
let mut tcp_type = TcpType::Unspecified;
if split.len() > 8 {
let split2 = &split[8..];
if split2[0] == "raddr" {
if split2.len() < 4 {
return Err(Error::Other(format!(
"{:?}: incorrect length",
Error::ErrParseRelatedAddr
)));
}
rel_addr = split2[1].to_owned();
rel_port = split2[3].parse()?;
} else if split2[0] == "tcptype" {
if split2.len() < 2 {
return Err(Error::Other(format!(
"{:?}: incorrect length",
Error::ErrParseType
)));
}
tcp_type = TcpType::from(split2[1]);
}
}
match typ {
"host" => {
let config = CandidateHostConfig {
base_config: CandidateBaseConfig {
network,
address,
port,
component,
priority,
foundation,
..CandidateBaseConfig::default()
},
tcp_type,
};
config.new_candidate_host()
}
"srflx" => {
let config = CandidateServerReflexiveConfig {
base_config: CandidateBaseConfig {
network,
address,
port,
component,
priority,
foundation,
..CandidateBaseConfig::default()
},
rel_addr,
rel_port,
};
config.new_candidate_server_reflexive()
}
"prflx" => {
let config = CandidatePeerReflexiveConfig {
base_config: CandidateBaseConfig {
network,
address,
port,
component,
priority,
foundation,
..CandidateBaseConfig::default()
},
rel_addr,
rel_port,
};
config.new_candidate_peer_reflexive()
}
"relay" => {
let config = CandidateRelayConfig {
base_config: CandidateBaseConfig {
network,
address,
port,
component,
priority,
foundation,
..CandidateBaseConfig::default()
},
rel_addr,
rel_port,
..CandidateRelayConfig::default()
};
config.new_candidate_relay()
}
_ => Err(Error::Other(format!(
"{:?} ({})",
Error::ErrUnknownCandidateType,
typ
))),
}
}