webrtc_util/vnet/
router.rs

1#[cfg(test)]
2mod router_test;
3
4use std::collections::HashMap;
5use std::future::Future;
6use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
7use std::ops::{Add, Sub};
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::atomic::Ordering;
11use std::sync::{Arc, Weak};
12use std::time::SystemTime;
13
14use async_trait::async_trait;
15use ipnet::*;
16use portable_atomic::AtomicU64;
17use tokio::sync::{mpsc, Mutex};
18use tokio::time::Duration;
19
20use crate::error::*;
21use crate::vnet::chunk::*;
22use crate::vnet::chunk_queue::*;
23use crate::vnet::interface::*;
24use crate::vnet::nat::*;
25use crate::vnet::net::*;
26use crate::vnet::resolver::*;
27
28const DEFAULT_ROUTER_QUEUE_SIZE: usize = 0; // unlimited
29
30lazy_static! {
31    pub static ref ROUTER_ID_CTR: AtomicU64 = AtomicU64::new(0);
32}
33
34// Generate a unique router name
35fn assign_router_name() -> String {
36    let n = ROUTER_ID_CTR.fetch_add(1, Ordering::SeqCst);
37    format!("router{n}")
38}
39
40// RouterConfig ...
41#[derive(Default)]
42pub struct RouterConfig {
43    // name of router. If not specified, a unique name will be assigned.
44    pub name: String,
45    // cidr notation, like "192.0.2.0/24"
46    pub cidr: String,
47    // static_ips is an array of static IP addresses to be assigned for this router.
48    // If no static IP address is given, the router will automatically assign
49    // an IP address.
50    // This will be ignored if this router is the root.
51    pub static_ips: Vec<String>,
52    // static_ip is deprecated. Use static_ips.
53    pub static_ip: String,
54    // Internal queue size
55    pub queue_size: usize,
56    // Effective only when this router has a parent router
57    pub nat_type: Option<NatType>,
58    // Minimum Delay
59    pub min_delay: Duration,
60    // Max Jitter
61    pub max_jitter: Duration,
62}
63
64// NIC is a network interface controller that interfaces Router
65#[async_trait]
66pub trait Nic {
67    async fn get_interface(&self, ifc_name: &str) -> Option<Interface>;
68    async fn add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()>;
69    async fn on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>);
70    async fn get_static_ips(&self) -> Vec<IpAddr>;
71    async fn set_router(&self, r: Arc<Mutex<Router>>) -> Result<()>;
72}
73
74// ChunkFilter is a handler users can add to filter chunks.
75// If the filter returns false, the packet will be dropped.
76pub type ChunkFilterFn = Box<dyn (Fn(&(dyn Chunk + Send + Sync)) -> bool) + Send + Sync>;
77
78#[derive(Default)]
79pub struct RouterInternal {
80    pub(crate) nat_type: Option<NatType>,           // read-only
81    pub(crate) ipv4net: IpNet,                      // read-only
82    pub(crate) parent: Option<Weak<Mutex<Router>>>, // read-only
83    pub(crate) nat: NetworkAddressTranslator,       // read-only
84    pub(crate) nics: HashMap<String, Weak<Mutex<dyn Nic + Send + Sync>>>, // read-only
85    pub(crate) chunk_filters: Vec<ChunkFilterFn>,   // requires mutex [x]
86    pub(crate) last_id: u8, // requires mutex [x], used to assign the last digit of IPv4 address
87}
88
89// Router ...
90#[derive(Default)]
91pub struct Router {
92    name: String,                              // read-only
93    ipv4net: IpNet,                            // read-only
94    min_delay: Duration,                       // requires mutex [x]
95    max_jitter: Duration,                      // requires mutex [x]
96    queue: Arc<ChunkQueue>,                    // read-only
97    interfaces: Vec<Interface>,                // read-only
98    static_ips: Vec<IpAddr>,                   // read-only
99    static_local_ips: HashMap<String, IpAddr>, // read-only,
100    children: Vec<Arc<Mutex<Router>>>,         // read-only
101    done: Option<mpsc::Sender<()>>,            // requires mutex [x]
102    pub(crate) resolver: Arc<Mutex<Resolver>>, // read-only
103    push_ch: Option<mpsc::Sender<()>>,         // writer requires mutex
104    router_internal: Arc<Mutex<RouterInternal>>,
105}
106
107#[async_trait]
108impl Nic for Router {
109    async fn get_interface(&self, ifc_name: &str) -> Option<Interface> {
110        for ifc in &self.interfaces {
111            if ifc.name == ifc_name {
112                return Some(ifc.clone());
113            }
114        }
115        None
116    }
117
118    async fn add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()> {
119        for ifc in &mut self.interfaces {
120            if ifc.name == ifc_name {
121                for addr in addrs {
122                    ifc.add_addr(*addr);
123                }
124                return Ok(());
125            }
126        }
127
128        Err(Error::ErrNotFound)
129    }
130
131    async fn on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>) {
132        let from_parent: Box<dyn Chunk + Send + Sync> = {
133            let router_internal = self.router_internal.lock().await;
134            match router_internal.nat.translate_inbound(&*c).await {
135                Ok(from) => {
136                    if let Some(from) = from {
137                        from
138                    } else {
139                        return;
140                    }
141                }
142                Err(err) => {
143                    log::warn!("[{}] {}", self.name, err);
144                    return;
145                }
146            }
147        };
148
149        self.push(from_parent).await;
150    }
151
152    async fn get_static_ips(&self) -> Vec<IpAddr> {
153        self.static_ips.clone()
154    }
155
156    // caller must hold the mutex
157    async fn set_router(&self, parent: Arc<Mutex<Router>>) -> Result<()> {
158        {
159            let mut router_internal = self.router_internal.lock().await;
160            router_internal.parent = Some(Arc::downgrade(&parent));
161        }
162
163        let parent_resolver = {
164            let p = parent.lock().await;
165            Arc::clone(&p.resolver)
166        };
167        {
168            let mut resolver = self.resolver.lock().await;
169            resolver.set_parent(Arc::downgrade(&parent_resolver));
170        }
171
172        let mut mapped_ips = vec![];
173        let mut local_ips = vec![];
174
175        // when this method is called, one or more IP address has already been assigned by
176        // the parent router.
177        if let Some(ifc) = self.get_interface("eth0").await {
178            for ifc_addr in ifc.addrs() {
179                let ip = ifc_addr.addr();
180                mapped_ips.push(ip);
181
182                if let Some(loc_ip) = self.static_local_ips.get(&ip.to_string()) {
183                    local_ips.push(*loc_ip);
184                }
185            }
186        } else {
187            return Err(Error::ErrNoIpaddrEth0);
188        }
189
190        // Set up NAT here
191        {
192            let mut router_internal = self.router_internal.lock().await;
193            if router_internal.nat_type.is_none() {
194                router_internal.nat_type = Some(NatType {
195                    mapping_behavior: EndpointDependencyType::EndpointIndependent,
196                    filtering_behavior: EndpointDependencyType::EndpointAddrPortDependent,
197                    hair_pining: false,
198                    port_preservation: false,
199                    mapping_life_time: Duration::from_secs(30),
200                    ..Default::default()
201                });
202            }
203
204            router_internal.nat = NetworkAddressTranslator::new(NatConfig {
205                name: self.name.clone(),
206                nat_type: router_internal.nat_type.unwrap(),
207                mapped_ips,
208                local_ips,
209            })?;
210        }
211
212        Ok(())
213    }
214}
215
216impl Router {
217    pub fn new(config: RouterConfig) -> Result<Self> {
218        let ipv4net: IpNet = config.cidr.parse()?;
219
220        let queue_size = if config.queue_size > 0 {
221            config.queue_size
222        } else {
223            DEFAULT_ROUTER_QUEUE_SIZE
224        };
225
226        // set up network interface, lo0
227        let mut lo0 = Interface::new(LO0_STR.to_owned(), vec![]);
228        if let Ok(ipnet) = Interface::convert(
229            SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
230            Some(SocketAddr::new(Ipv4Addr::new(255, 0, 0, 0).into(), 0)),
231        ) {
232            lo0.add_addr(ipnet);
233        }
234
235        // set up network interface, eth0
236        let eth0 = Interface::new("eth0".to_owned(), vec![]);
237
238        // local host name resolver
239        let resolver = Arc::new(Mutex::new(Resolver::new()));
240
241        let name = if config.name.is_empty() {
242            assign_router_name()
243        } else {
244            config.name.clone()
245        };
246
247        let mut static_ips = vec![];
248        let mut static_local_ips = HashMap::new();
249        for ip_str in &config.static_ips {
250            let ip_pair: Vec<&str> = ip_str.split('/').collect();
251            if let Ok(ip) = IpAddr::from_str(ip_pair[0]) {
252                if ip_pair.len() > 1 {
253                    let loc_ip = IpAddr::from_str(ip_pair[1])?;
254                    if !ipv4net.contains(&loc_ip) {
255                        return Err(Error::ErrLocalIpBeyondStaticIpsSubset);
256                    }
257                    static_local_ips.insert(ip.to_string(), loc_ip);
258                }
259                static_ips.push(ip);
260            }
261        }
262        if !config.static_ip.is_empty() {
263            log::warn!("static_ip is deprecated. Use static_ips instead");
264            if let Ok(ip) = IpAddr::from_str(&config.static_ip) {
265                static_ips.push(ip);
266            }
267        }
268
269        let n_static_local = static_local_ips.len();
270        if n_static_local > 0 && n_static_local != static_ips.len() {
271            return Err(Error::ErrLocalIpNoStaticsIpsAssociated);
272        }
273
274        let router_internal = RouterInternal {
275            nat_type: config.nat_type,
276            ipv4net,
277            nics: HashMap::new(),
278            ..Default::default()
279        };
280
281        Ok(Router {
282            name,
283            ipv4net,
284            interfaces: vec![lo0, eth0],
285            static_ips,
286            static_local_ips,
287            resolver,
288            router_internal: Arc::new(Mutex::new(router_internal)),
289            queue: Arc::new(ChunkQueue::new(queue_size)),
290            min_delay: config.min_delay,
291            max_jitter: config.max_jitter,
292            ..Default::default()
293        })
294    }
295
296    // caller must hold the mutex
297    pub(crate) fn get_interfaces(&self) -> &[Interface] {
298        &self.interfaces
299    }
300
301    // Start ...
302    pub fn start(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>> {
303        if self.done.is_some() {
304            return Box::pin(async move { Err(Error::ErrRouterAlreadyStarted) });
305        }
306
307        let (done_tx, mut done_rx) = mpsc::channel(1);
308        let (push_ch_tx, mut push_ch_rx) = mpsc::channel(1);
309        self.done = Some(done_tx);
310        self.push_ch = Some(push_ch_tx);
311
312        let router_internal = Arc::clone(&self.router_internal);
313        let queue = Arc::clone(&self.queue);
314        let max_jitter = self.max_jitter;
315        let min_delay = self.min_delay;
316        let name = self.name.clone();
317        let ipv4net = self.ipv4net;
318
319        tokio::spawn(async move {
320            while let Ok(d) = Router::process_chunks(
321                &name,
322                ipv4net,
323                max_jitter,
324                min_delay,
325                &queue,
326                &router_internal,
327            )
328            .await
329            {
330                if d == Duration::from_secs(0) {
331                    tokio::select! {
332                     _ = push_ch_rx.recv() =>{},
333                     _ = done_rx.recv() => break,
334                    }
335                } else {
336                    let t = tokio::time::sleep(d);
337                    tokio::pin!(t);
338
339                    tokio::select! {
340                    _ = t.as_mut() => {},
341                    _ = done_rx.recv() => break,
342                    }
343                }
344            }
345        });
346
347        let children = self.children.clone();
348        Box::pin(async move { Router::start_children(children).await })
349    }
350
351    // Stop ...
352    pub fn stop(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>> {
353        if self.done.is_none() {
354            return Box::pin(async move { Err(Error::ErrRouterAlreadyStopped) });
355        }
356        self.push_ch.take();
357        self.done.take();
358
359        let children = self.children.clone();
360        Box::pin(async move { Router::stop_children(children).await })
361    }
362
363    async fn start_children(children: Vec<Arc<Mutex<Router>>>) -> Result<()> {
364        for child in children {
365            let mut c = child.lock().await;
366            c.start().await?;
367        }
368
369        Ok(())
370    }
371
372    async fn stop_children(children: Vec<Arc<Mutex<Router>>>) -> Result<()> {
373        for child in children {
374            let mut c = child.lock().await;
375            c.stop().await?;
376        }
377
378        Ok(())
379    }
380
381    // AddRouter adds a chile Router.
382    // after parent.add_router(child), also call child.set_router(parent) to set child's parent router
383    pub async fn add_router(&mut self, child: Arc<Mutex<Router>>) -> Result<()> {
384        // Router is a NIC. Add it as a NIC so that packets are routed to this child
385        // router.
386        let nic = Arc::clone(&child) as Arc<Mutex<dyn Nic + Send + Sync>>;
387        self.children.push(child);
388        self.add_net(nic).await
389    }
390
391    // AddNet ...
392    // after router.add_net(nic), also call nic.set_router(router) to set nic's router
393    pub async fn add_net(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()> {
394        let mut router_internal = self.router_internal.lock().await;
395        router_internal.add_nic(nic).await
396    }
397
398    // AddHost adds a mapping of hostname and an IP address to the local resolver.
399    pub async fn add_host(&mut self, host_name: String, ip_addr: String) -> Result<()> {
400        let mut resolver = self.resolver.lock().await;
401        resolver.add_host(host_name, ip_addr)
402    }
403
404    // AddChunkFilter adds a filter for chunks traversing this router.
405    // You may add more than one filter. The filters are called in the order of this method call.
406    // If a chunk is dropped by a filter, subsequent filter will not receive the chunk.
407    pub async fn add_chunk_filter(&self, filter: ChunkFilterFn) {
408        let mut router_internal = self.router_internal.lock().await;
409        router_internal.chunk_filters.push(filter);
410    }
411
412    pub(crate) async fn push(&self, mut c: Box<dyn Chunk + Send + Sync>) {
413        log::debug!("[{}] route {}", self.name, c);
414        if self.done.is_some() {
415            c.set_timestamp();
416
417            if self.queue.push(c).await {
418                if let Some(push_ch) = &self.push_ch {
419                    let _ = push_ch.try_send(());
420                }
421            } else {
422                log::warn!("[{}] queue was full. dropped a chunk", self.name);
423            }
424        } else {
425            log::warn!("router is done");
426        }
427    }
428
429    async fn process_chunks(
430        name: &str,
431        ipv4net: IpNet,
432        max_jitter: Duration,
433        min_delay: Duration,
434        queue: &Arc<ChunkQueue>,
435        router_internal: &Arc<Mutex<RouterInternal>>,
436    ) -> Result<Duration> {
437        // Introduce jitter by delaying the processing of chunks.
438        let mj = max_jitter.as_nanos() as u64;
439        if mj > 0 {
440            let jitter = Duration::from_nanos(rand::random::<u64>() % mj);
441            tokio::time::sleep(jitter).await;
442        }
443
444        //      cut_off
445        //         v min delay
446        //         |<--->|
447        //  +------------:--
448        //  |OOOOOOXXXXX :   --> time
449        //  +------------:--
450        //  |<--->|     now
451        //    due
452
453        let entered_at = SystemTime::now();
454        let cut_off = entered_at.sub(min_delay);
455
456        // the next sleep duration
457        let mut d;
458
459        loop {
460            d = Duration::from_secs(0);
461
462            if let Some(c) = queue.peek().await {
463                // check timestamp to find if the chunk is due
464                if c.get_timestamp().duration_since(cut_off).is_ok() {
465                    // There is one or more chunk in the queue but none of them are due.
466                    // Calculate the next sleep duration here.
467                    let next_expire = c.get_timestamp().add(min_delay);
468                    if let Ok(diff) = next_expire.duration_since(entered_at) {
469                        d = diff;
470                        break;
471                    }
472                }
473            } else {
474                break; // no more chunk in the queue
475            }
476
477            if let Some(c) = queue.pop().await {
478                let ri = router_internal.lock().await;
479                let mut blocked = false;
480                for filter in &ri.chunk_filters {
481                    if !filter(&*c) {
482                        blocked = true;
483                        break;
484                    }
485                }
486                if blocked {
487                    continue; // discard
488                }
489
490                let dst_ip = c.get_destination_ip();
491
492                // check if the destination is in our subnet
493                if ipv4net.contains(&dst_ip) {
494                    // search for the destination NIC
495                    if let Some(nic) = ri.nics.get(&dst_ip.to_string()).and_then(|p| p.upgrade()) {
496                        // found the NIC, forward the chunk to the NIC.
497                        // call to NIC must unlock mutex
498                        let ni = nic.lock().await;
499                        ni.on_inbound_chunk(c).await;
500                    } else {
501                        // NIC not found. drop it.
502                        log::debug!("[{}] {} unreachable", name, c);
503                    }
504                } else {
505                    // the destination is outside of this subnet
506                    // is this WAN?
507                    if let Some(parent) = &ri.parent.clone().and_then(|p| p.upgrade()) {
508                        // Pass it to the parent via NAT
509                        if let Some(to_parent) = ri.nat.translate_outbound(&*c).await? {
510                            // call to parent router mutex unlock mutex
511                            let p = parent.lock().await;
512                            p.push(to_parent).await;
513                        }
514                    } else {
515                        // this WAN. No route for this chunk
516                        log::debug!("[{}] no route found for {}", name, c);
517                    }
518                }
519            } else {
520                break; // no more chunk in the queue
521            }
522        }
523
524        Ok(d)
525    }
526}
527
528impl RouterInternal {
529    // caller must hold the mutex
530    pub(crate) async fn add_nic(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()> {
531        let mut ips = {
532            let ni = nic.lock().await;
533            ni.get_static_ips().await
534        };
535
536        if ips.is_empty() {
537            // assign an IP address
538            let ip = self.assign_ip_address()?;
539            log::debug!("assign_ip_address: {}", ip);
540            ips.push(ip);
541        }
542
543        let mut ipnets = vec![];
544        for ip in &ips {
545            if !self.ipv4net.contains(ip) {
546                return Err(Error::ErrStaticIpIsBeyondSubnet);
547            }
548            self.nics.insert(ip.to_string(), Arc::downgrade(&nic));
549            ipnets.push(IpNet::from_str(&format!(
550                "{}/{}",
551                ip,
552                self.ipv4net.prefix_len()
553            ))?);
554        }
555
556        {
557            let mut ni = nic.lock().await;
558            let _ = ni.add_addrs_to_interface("eth0", &ipnets).await;
559        }
560
561        Ok(())
562    }
563
564    // caller should hold the mutex
565    fn assign_ip_address(&mut self) -> Result<IpAddr> {
566        // See: https://stackoverflow.com/questions/14915188/ip-address-ending-with-zero
567
568        if self.last_id == 0xfe {
569            return Err(Error::ErrAddressSpaceExhausted);
570        }
571
572        self.last_id += 1;
573        match self.ipv4net.addr() {
574            IpAddr::V4(ipv4) => {
575                let mut ip = ipv4.octets();
576                ip[3] = self.last_id;
577                Ok(IpAddr::V4(Ipv4Addr::from(ip)))
578            }
579            IpAddr::V6(ipv6) => {
580                let mut ip = ipv6.octets();
581                ip[15] += self.last_id;
582                Ok(IpAddr::V6(Ipv6Addr::from(ip)))
583            }
584        }
585    }
586}