use crate::agent::agent_internal::*;
use crate::candidate::*;
use crate::control::*;
use crate::priority::*;
use crate::use_candidate::*;
use stun::{agent::*, attributes::*, fingerprint::*, integrity::*, message::*, textattrs::*};
use async_trait::async_trait;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::time::Instant;
#[async_trait]
trait ControllingSelector {
fn start(&mut self);
async fn contact_candidates(&mut self);
async fn ping_candidate(
&mut self,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
);
async fn handle_success_response(
&mut self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
remote_addr: SocketAddr,
);
async fn handle_binding_request(
&mut self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
);
}
#[async_trait]
trait ControlledSelector {
fn start(&mut self);
async fn contact_candidates(&mut self);
async fn ping_candidate(
&mut self,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
);
async fn handle_success_response(
&mut self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
remote_addr: SocketAddr,
);
async fn handle_binding_request(
&mut self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
);
}
impl AgentInternal {
async fn is_nominatable(&self, c: &Arc<dyn Candidate + Send + Sync>) -> bool {
match c.candidate_type() {
CandidateType::Host => {
Instant::now().duration_since(self.start_time).as_nanos()
> self.host_acceptance_min_wait.as_nanos()
}
CandidateType::ServerReflexive => {
Instant::now().duration_since(self.start_time).as_nanos()
> self.srflx_acceptance_min_wait.as_nanos()
}
CandidateType::PeerReflexive => {
Instant::now().duration_since(self.start_time).as_nanos()
> self.prflx_acceptance_min_wait.as_nanos()
}
CandidateType::Relay => {
Instant::now().duration_since(self.start_time).as_nanos()
> self.relay_acceptance_min_wait.as_nanos()
}
_ => {
log::error!(
"is_nominatable invalid candidate type {}",
c.candidate_type()
);
false
}
}
}
async fn nominate_pair(&mut self) {
if let Some(pair) = &self.nominated_pair {
let (msg, result) = {
let username = self.remote_ufrag.clone() + ":" + self.local_ufrag.as_str();
let mut msg = Message::new();
let result = msg.build(&[
Box::new(BINDING_REQUEST),
Box::new(TransactionId::default()),
Box::new(Username::new(ATTR_USERNAME, username)),
Box::new(UseCandidateAttr::default()),
Box::new(AttrControlling(self.tie_breaker)),
Box::new(PriorityAttr(pair.local.priority())),
Box::new(MessageIntegrity::new_short_term_integrity(
self.remote_pwd.clone(),
)),
Box::new(FINGERPRINT),
]);
(msg, result)
};
if let Err(err) = result {
log::error!("{}", err);
} else {
log::trace!(
"ping STUN (nominate candidate pair from {} to {}",
pair.local,
pair.remote
);
let local = pair.local.clone();
let remote = pair.remote.clone();
self.send_binding_request(&msg, &local, &remote).await;
}
}
}
pub(crate) fn start(&mut self) {
if self.is_controlling {
ControllingSelector::start(self);
} else {
ControlledSelector::start(self);
}
}
pub(crate) async fn contact_candidates(&mut self) {
if self.is_controlling {
ControllingSelector::contact_candidates(self).await;
} else {
ControlledSelector::contact_candidates(self).await;
}
}
pub(crate) async fn ping_candidate(
&mut self,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
if self.is_controlling {
ControllingSelector::ping_candidate(self, local, remote).await;
} else {
ControlledSelector::ping_candidate(self, local, remote).await;
}
}
pub(crate) async fn handle_success_response(
&mut self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
remote_addr: SocketAddr,
) {
if self.is_controlling {
ControllingSelector::handle_success_response(self, m, local, remote, remote_addr).await;
} else {
ControlledSelector::handle_success_response(self, m, local, remote, remote_addr).await;
}
}
pub(crate) async fn handle_binding_request(
&mut self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
if self.is_controlling {
ControllingSelector::handle_binding_request(self, m, local, remote).await;
} else {
ControlledSelector::handle_binding_request(self, m, local, remote).await;
}
}
}
#[async_trait]
impl ControllingSelector for AgentInternal {
fn start(&mut self) {
self.start_time = Instant::now();
self.nominated_pair = None;
}
async fn contact_candidates(&mut self) {
if self.lite {
log::trace!("now falling back to full agent");
}
if self.get_selected_pair().is_some() {
if self.validate_selected_pair().await {
log::trace!("checking keepalive");
self.check_keepalive().await;
}
} else if self.nominated_pair.is_some() {
self.nominate_pair().await;
} else {
let has_nominated_pair = if let Some(p) = self.get_best_available_candidate_pair() {
self.is_nominatable(&p.local).await && self.is_nominatable(&p.remote).await
} else {
false
};
if has_nominated_pair {
if let Some(p) = self.get_best_available_candidate_pair_mut() {
log::trace!(
"Nominatable pair found, nominating ({}, {})",
p.local.to_string(),
p.remote.to_string()
);
p.nominated = true;
self.nominated_pair = Some(p.clone());
}
self.nominate_pair().await;
} else {
self.ping_all_candidates().await;
}
}
}
async fn ping_candidate(
&mut self,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
let (msg, result) = {
let username = self.remote_ufrag.clone() + ":" + self.local_ufrag.as_str();
let mut msg = Message::new();
let result = msg.build(&[
Box::new(BINDING_REQUEST),
Box::new(TransactionId::default()),
Box::new(Username::new(ATTR_USERNAME, username)),
Box::new(AttrControlling(self.tie_breaker)),
Box::new(PriorityAttr(local.priority())),
Box::new(MessageIntegrity::new_short_term_integrity(
self.remote_pwd.clone(),
)),
Box::new(FINGERPRINT),
]);
(msg, result)
};
if let Err(err) = result {
log::error!("{}", err);
} else {
self.send_binding_request(&msg, local, remote).await;
}
}
async fn handle_success_response(
&mut self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
remote_addr: SocketAddr,
) {
if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id) {
let transaction_addr = pending_request.destination;
if transaction_addr != remote_addr {
log::debug!("discard message: transaction source and destination does not match expected({}), actual({})", transaction_addr, remote);
return;
}
log::trace!(
"inbound STUN (SuccessResponse) from {} to {}",
remote,
local
);
let selected_pair_is_none = self.get_selected_pair().is_none();
if let Some(p) = self.find_pair(local, remote) {
let mut p = p.clone();
p.state = CandidatePairState::Succeeded;
log::trace!("Found valid candidate pair: {}", p);
if pending_request.is_use_candidate && selected_pair_is_none {
self.set_selected_pair(Some(p.clone())).await;
}
} else {
log::error!("Success response from invalid candidate pair");
}
} else {
log::warn!(
"discard message from ({}), unknown TransactionID 0x{:?}",
remote,
m.transaction_id
);
}
}
async fn handle_binding_request(
&mut self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
self.send_binding_request(m, local, remote).await;
if let Some(p) = self.find_pair(local, remote) {
if p.state == CandidatePairState::Succeeded
&& self.nominated_pair.is_none()
&& self.get_selected_pair().is_none()
{
if let Some(best_pair) = self.get_best_available_candidate_pair() {
if best_pair == p
&& self.is_nominatable(&p.local).await
&& self.is_nominatable(&p.remote).await
{
log::trace!("The candidate ({}, {}) is the best candidate available, marking it as nominated",
p.local, p.remote);
self.nominated_pair = Some(p.clone());
self.nominate_pair().await;
}
} else {
log::trace!("No best pair available");
}
}
} else {
self.add_pair(local.clone(), remote.clone());
}
}
}
#[async_trait]
impl ControlledSelector for AgentInternal {
fn start(&mut self) {}
async fn contact_candidates(&mut self) {
if self.lite {
self.validate_selected_pair().await;
} else if self.get_selected_pair().is_some() {
if self.validate_selected_pair().await {
log::trace!("checking keepalive");
self.check_keepalive().await;
}
} else {
self.ping_all_candidates().await;
}
}
async fn ping_candidate(
&mut self,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
let (msg, result) = {
let username = self.remote_ufrag.clone() + ":" + self.local_ufrag.as_str();
let mut msg = Message::new();
let result = msg.build(&[
Box::new(BINDING_REQUEST),
Box::new(TransactionId::default()),
Box::new(Username::new(ATTR_USERNAME, username)),
Box::new(AttrControlled(self.tie_breaker)),
Box::new(PriorityAttr(local.priority())),
Box::new(MessageIntegrity::new_short_term_integrity(
self.remote_pwd.clone(),
)),
Box::new(FINGERPRINT),
]);
(msg, result)
};
if let Err(err) = result {
log::error!("{}", err);
} else {
self.send_binding_request(&msg, local, remote).await;
}
}
async fn handle_success_response(
&mut self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
remote_addr: SocketAddr,
) {
if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id) {
let transaction_addr = pending_request.destination;
if transaction_addr != remote_addr {
log::debug!("discard message: transaction source and destination does not match expected({}), actual({})", transaction_addr, remote);
return;
}
log::trace!(
"inbound STUN (SuccessResponse) from {} to {}",
remote,
local
);
if let Some(p) = self.get_pair_mut(local, remote) {
p.state = CandidatePairState::Succeeded;
log::trace!("Found valid candidate pair: {}", p);
} else {
log::error!("Success response from invalid candidate pair");
}
} else {
log::warn!(
"discard message from ({}), unknown TransactionID 0x{:?}",
remote,
m.transaction_id
);
}
}
async fn handle_binding_request(
&mut self,
m: &Message,
local: &Arc<dyn Candidate + Send + Sync>,
remote: &Arc<dyn Candidate + Send + Sync>,
) {
if self.find_pair(local, remote).is_none() {
self.add_pair(local.clone(), remote.clone());
}
if let Some(p) = self.find_pair(local, remote) {
let use_candidate = m.contains(ATTR_USE_CANDIDATE);
if use_candidate {
if p.state == CandidatePairState::Succeeded {
if self.get_selected_pair().is_none() {
let pair = p.clone();
self.set_selected_pair(Some(pair)).await;
}
self.send_binding_success(m, local, remote).await;
} else {
self.ping_candidate(local, remote).await;
}
} else {
self.send_binding_success(m, local, remote).await;
self.ping_candidate(local, remote).await;
}
}
}
}