use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use async_trait::async_trait;
use stun::agent::*;
use stun::attributes::*;
use stun::fingerprint::*;
use stun::integrity::*;
use stun::message::*;
use stun::textattrs::*;
use tokio::time::{Duration, Instant};
use crate::agent::agent_internal::*;
use crate::candidate::*;
use crate::control::*;
use crate::priority::*;
use crate::use_candidate::*;
#[async_trait]
trait ControllingSelector {
async fn start(&self);
async fn contact_candidates(&self);
async fn ping_candidate(
&self,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
);
async fn handle_success_response(
&self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
remote_addr: SocketAddr,
);
async fn handle_binding_request(
&self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
);
}
#[async_trait]
trait ControlledSelector {
async fn start(&self);
async fn contact_candidates(&self);
async fn ping_candidate(
&self,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
);
async fn handle_success_response(
&self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
remote_addr: SocketAddr,
);
async fn handle_binding_request(
&self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
);
}
impl AgentInternal {
fn is_nominatable(&self, c: &Arc<dyn Candidate + Send + Sync>) -> bool {
let start_time = *self.start_time.lock();
match c.candidate_type() {
CandidateType::Host => {
Instant::now()
.checked_duration_since(start_time)
.unwrap_or_else(|| Duration::from_secs(0))
.as_nanos()
> self.host_acceptance_min_wait.as_nanos()
}
CandidateType::ServerReflexive => {
Instant::now()
.checked_duration_since(start_time)
.unwrap_or_else(|| Duration::from_secs(0))
.as_nanos()
> self.srflx_acceptance_min_wait.as_nanos()
}
CandidateType::PeerReflexive => {
Instant::now()
.checked_duration_since(start_time)
.unwrap_or_else(|| Duration::from_secs(0))
.as_nanos()
> self.prflx_acceptance_min_wait.as_nanos()
}
CandidateType::Relay => {
Instant::now()
.checked_duration_since(start_time)
.unwrap_or_else(|| Duration::from_secs(0))
.as_nanos()
> self.relay_acceptance_min_wait.as_nanos()
}
CandidateType::Unspecified => {
log::error!(
"is_nominatable invalid candidate type {}",
c.candidate_type()
);
false
}
}
}
async fn nominate_pair(&self) {
let result = {
let nominated_pair = self.nominated_pair.lock().await;
if let Some(pair) = &*nominated_pair {
let (msg, result) = {
let ufrag_pwd = self.ufrag_pwd.lock().await;
let username =
ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
let mut msg = Message::new();
let result = msg.build(&[
Box::new(BINDING_REQUEST),
Box::new(TransactionId::new()),
Box::new(Username::new(ATTR_USERNAME, username)),
Box::<UseCandidateAttr>::default(),
Box::new(AttrControlling(self.tie_breaker.load(Ordering::SeqCst))),
Box::new(PriorityAttr(pair.local.priority())),
Box::new(MessageIntegrity::new_short_term_integrity(
ufrag_pwd.remote_pwd.clone(),
)),
Box::new(FINGERPRINT),
]);
(msg, result)
};
if let Err(err) = result {
log::error!("{}", err);
None
} else {
log::trace!(
"ping STUN (nominate candidate pair from {} to {}",
pair.local,
pair.remote
);
let local = pair.local.clone();
let remote = pair.remote.clone();
Some((msg, local, remote))
}
} else {
None
}
};
if let Some((msg, local, remote)) = result {
self.send_binding_request(&msg, &local, &remote).await;
}
}
pub(crate) async fn start(&self) {
if self.is_controlling.load(Ordering::SeqCst) {
ControllingSelector::start(self).await;
} else {
ControlledSelector::start(self).await;
}
}
pub(crate) async fn contact_candidates(&self) {
if self.is_controlling.load(Ordering::SeqCst) {
ControllingSelector::contact_candidates(self).await;
} else {
ControlledSelector::contact_candidates(self).await;
}
}
pub(crate) async fn ping_candidate(
&self,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
if self.is_controlling.load(Ordering::SeqCst) {
ControllingSelector::ping_candidate(self, local, remote).await;
} else {
ControlledSelector::ping_candidate(self, local, remote).await;
}
}
pub(crate) async fn handle_success_response(
&self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
remote_addr: SocketAddr,
) {
if self.is_controlling.load(Ordering::SeqCst) {
ControllingSelector::handle_success_response(self, m, local, remote, remote_addr).await;
} else {
ControlledSelector::handle_success_response(self, m, local, remote, remote_addr).await;
}
}
pub(crate) async fn handle_binding_request(
&self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
if self.is_controlling.load(Ordering::SeqCst) {
ControllingSelector::handle_binding_request(self, m, local, remote).await;
} else {
ControlledSelector::handle_binding_request(self, m, local, remote).await;
}
}
}
#[async_trait]
impl ControllingSelector for AgentInternal {
async fn start(&self) {
{
let mut nominated_pair = self.nominated_pair.lock().await;
*nominated_pair = None;
}
*self.start_time.lock() = Instant::now();
}
async fn contact_candidates(&self) {
if self.lite.load(Ordering::SeqCst) {
log::trace!("now falling back to full agent");
}
let nominated_pair_is_some = {
let nominated_pair = self.nominated_pair.lock().await;
nominated_pair.is_some()
};
if self.agent_conn.get_selected_pair().is_some() {
if self.validate_selected_pair().await {
log::trace!("[{}]: checking keepalive", self.get_name());
self.check_keepalive().await;
}
} else if nominated_pair_is_some {
self.nominate_pair().await;
} else {
let has_nominated_pair =
if let Some(p) = self.agent_conn.get_best_valid_candidate_pair().await {
self.is_nominatable(&p.local) && self.is_nominatable(&p.remote)
} else {
false
};
if has_nominated_pair {
if let Some(p) = self.agent_conn.get_best_valid_candidate_pair().await {
log::trace!(
"Nominatable pair found, nominating ({}, {})",
p.local.to_string(),
p.remote.to_string()
);
p.nominated.store(true, Ordering::SeqCst);
{
let mut nominated_pair = self.nominated_pair.lock().await;
*nominated_pair = Some(p);
}
}
self.nominate_pair().await;
} else {
self.ping_all_candidates().await;
}
}
}
async fn ping_candidate(
&self,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
let (msg, result) = {
let ufrag_pwd = self.ufrag_pwd.lock().await;
let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
let mut msg = Message::new();
let result = msg.build(&[
Box::new(BINDING_REQUEST),
Box::new(TransactionId::new()),
Box::new(Username::new(ATTR_USERNAME, username)),
Box::new(AttrControlling(self.tie_breaker.load(Ordering::SeqCst))),
Box::new(PriorityAttr(local.priority())),
Box::new(MessageIntegrity::new_short_term_integrity(
ufrag_pwd.remote_pwd.clone(),
)),
Box::new(FINGERPRINT),
]);
(msg, result)
};
if let Err(err) = result {
log::error!("{}", err);
} else {
self.send_binding_request(&msg, local, remote).await;
}
}
async fn handle_success_response(
&self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
remote_addr: SocketAddr,
) {
if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id).await {
let transaction_addr = pending_request.destination;
if transaction_addr != remote_addr {
log::debug!("discard message: transaction source and destination does not match expected({}), actual({})", transaction_addr, remote);
return;
}
log::trace!(
"inbound STUN (SuccessResponse) from {} to {}",
remote,
local
);
let selected_pair_is_none = self.agent_conn.get_selected_pair().is_none();
if let Some(p) = self.find_pair(local, remote).await {
p.state
.store(CandidatePairState::Succeeded as u8, Ordering::SeqCst);
log::trace!(
"Found valid candidate pair: {}, p.state: {}, isUseCandidate: {}, {}",
p,
p.state.load(Ordering::SeqCst),
pending_request.is_use_candidate,
selected_pair_is_none
);
if pending_request.is_use_candidate && selected_pair_is_none {
self.set_selected_pair(Some(Arc::clone(&p))).await;
}
} else {
log::error!("Success response from invalid candidate pair");
}
} else {
log::warn!(
"discard message from ({}), unknown TransactionID 0x{:?}",
remote,
m.transaction_id
);
}
}
async fn handle_binding_request(
&self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
self.send_binding_success(m, local, remote).await;
log::trace!("controllingSelector: sendBindingSuccess");
if let Some(p) = self.find_pair(local, remote).await {
let nominated_pair_is_none = {
let nominated_pair = self.nominated_pair.lock().await;
nominated_pair.is_none()
};
log::trace!(
"controllingSelector: after findPair {}, p.state: {}, {}",
p,
p.state.load(Ordering::SeqCst),
nominated_pair_is_none,
);
if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8
&& nominated_pair_is_none
&& self.agent_conn.get_selected_pair().is_none()
{
if let Some(best_pair) = self.agent_conn.get_best_available_candidate_pair().await {
log::trace!(
"controllingSelector: getBestAvailableCandidatePair {}",
best_pair
);
if best_pair == p
&& self.is_nominatable(&p.local)
&& self.is_nominatable(&p.remote)
{
log::trace!("The candidate ({}, {}) is the best candidate available, marking it as nominated",
p.local, p.remote);
{
let mut nominated_pair = self.nominated_pair.lock().await;
*nominated_pair = Some(p);
}
self.nominate_pair().await;
}
} else {
log::trace!("No best pair available");
}
}
} else {
log::trace!("controllingSelector: addPair");
self.add_pair(local.clone(), remote.clone()).await;
}
}
}
#[async_trait]
impl ControlledSelector for AgentInternal {
async fn start(&self) {}
async fn contact_candidates(&self) {
if self.lite.load(Ordering::SeqCst) {
self.validate_selected_pair().await;
} else if self.agent_conn.get_selected_pair().is_some() {
if self.validate_selected_pair().await {
log::trace!("[{}]: checking keepalive", self.get_name());
self.check_keepalive().await;
}
} else {
self.ping_all_candidates().await;
}
}
async fn ping_candidate(
&self,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
let (msg, result) = {
let ufrag_pwd = self.ufrag_pwd.lock().await;
let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
let mut msg = Message::new();
let result = msg.build(&[
Box::new(BINDING_REQUEST),
Box::new(TransactionId::new()),
Box::new(Username::new(ATTR_USERNAME, username)),
Box::new(AttrControlled(self.tie_breaker.load(Ordering::SeqCst))),
Box::new(PriorityAttr(local.priority())),
Box::new(MessageIntegrity::new_short_term_integrity(
ufrag_pwd.remote_pwd.clone(),
)),
Box::new(FINGERPRINT),
]);
(msg, result)
};
if let Err(err) = result {
log::error!("{}", err);
} else {
self.send_binding_request(&msg, local, remote).await;
}
}
async fn handle_success_response(
&self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
remote_addr: SocketAddr,
) {
if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id).await {
let transaction_addr = pending_request.destination;
if transaction_addr != remote_addr {
log::debug!("discard message: transaction source and destination does not match expected({}), actual({})", transaction_addr, remote);
return;
}
log::trace!(
"inbound STUN (SuccessResponse) from {} to {}",
remote,
local
);
if let Some(p) = self.find_pair(local, remote).await {
p.state
.store(CandidatePairState::Succeeded as u8, Ordering::SeqCst);
log::trace!("Found valid candidate pair: {}", p);
} else {
log::error!("Success response from invalid candidate pair");
}
} else {
log::warn!(
"discard message from ({}), unknown TransactionID 0x{:?}",
remote,
m.transaction_id
);
}
}
async fn handle_binding_request(
&self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
if self.find_pair(local, remote).await.is_none() {
self.add_pair(local.clone(), remote.clone()).await;
}
if let Some(p) = self.find_pair(local, remote).await {
let use_candidate = m.contains(ATTR_USE_CANDIDATE);
if use_candidate {
if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8 {
if self.agent_conn.get_selected_pair().is_none() {
self.set_selected_pair(Some(Arc::clone(&p))).await;
}
self.send_binding_success(m, local, remote).await;
} else {
self.ping_candidate(local, remote).await;
}
} else {
self.send_binding_success(m, local, remote).await;
self.ping_candidate(local, remote).await;
}
}
}
}