1#[cfg(test)]
2mod router_test;
3
4use std::collections::HashMap;
5use std::future::Future;
6use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
7use std::ops::{Add, Sub};
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::atomic::Ordering;
11use std::sync::{Arc, Weak};
12use std::time::SystemTime;
13
14use async_trait::async_trait;
15use ipnet::*;
16use portable_atomic::AtomicU64;
17use tokio::sync::{mpsc, Mutex};
18use tokio::time::Duration;
19
20use crate::error::*;
21use crate::vnet::chunk::*;
22use crate::vnet::chunk_queue::*;
23use crate::vnet::interface::*;
24use crate::vnet::nat::*;
25use crate::vnet::net::*;
26use crate::vnet::resolver::*;
27
/// Queue capacity handed to `ChunkQueue::new` when `RouterConfig::queue_size`
/// is 0 (see `Router::new`).
// NOTE(review): a capacity of 0 presumably means "unbounded" for `ChunkQueue`;
// confirm against the `chunk_queue` module.
const DEFAULT_ROUTER_QUEUE_SIZE: usize = 0;

lazy_static! {
    // Process-wide counter, starting at 0, from which `assign_router_name`
    // derives the numeric suffix of auto-generated router names.
    pub static ref ROUTER_ID_CTR: AtomicU64 = AtomicU64::new(0);
}
33
34fn assign_router_name() -> String {
36 let n = ROUTER_ID_CTR.fetch_add(1, Ordering::SeqCst);
37 format!("router{n}")
38}
39
/// Settings consumed by `Router::new`.
#[derive(Default)]
pub struct RouterConfig {
    /// Router name. When empty, `Router::new` generates one via
    /// `assign_router_name`.
    pub name: String,
    /// Subnet of the router as a string that `Router::new` parses into an
    /// `IpNet`; parsing failure makes `Router::new` return an error.
    pub cidr: String,
    /// Static IP addresses for this router. Each entry is either `<ip>` or
    /// `<ip>/<local-ip>`; a local IP must lie inside `cidr`. Entries whose
    /// first part does not parse as an IP address are skipped by
    /// `Router::new`.
    pub static_ips: Vec<String>,
    /// Deprecated single static IP; `Router::new` logs a warning when it is
    /// non-empty. Use `static_ips` instead.
    pub static_ip: String,
    /// Capacity passed to the router's chunk queue; 0 selects
    /// `DEFAULT_ROUTER_QUEUE_SIZE`.
    pub queue_size: usize,
    /// NAT behavior. When `None`, the router's `set_router` implementation
    /// fills in a default before building the address translator.
    pub nat_type: Option<NatType>,
    /// Minimum time a chunk stays in the queue before it is routed
    /// (see `Router::process_chunks`).
    pub min_delay: Duration,
    /// Exclusive upper bound of the random sleep inserted at the start of
    /// every processing pass; a zero duration disables the jitter.
    pub max_jitter: Duration,
}
63
/// Interface a network endpoint must offer so that a `Router` can attach it
/// (see `RouterInternal::add_nic`) and deliver chunks to it.
#[async_trait]
pub trait Nic {
    /// Returns the interface named `ifc_name`, or `None` if there is none.
    async fn get_interface(&self, ifc_name: &str) -> Option<Interface>;
    /// Adds `addrs` to the interface named `ifc_name`.
    async fn add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()>;
    /// Receives a chunk that a router resolved to this NIC.
    async fn on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>);
    /// Returns the static IP addresses of this NIC; when the list is empty the
    /// router assigns an address itself (see `RouterInternal::add_nic`).
    async fn get_static_ips(&self) -> Vec<IpAddr>;
    /// Tells this NIC which router it is attached to.
    async fn set_router(&self, r: Arc<Mutex<Router>>) -> Result<()>;
}
73
/// Predicate applied to every chunk popped from a router's queue. Filters run
/// in registration order and the first one returning `false` makes the router
/// discard the chunk (see `Router::process_chunks`).
pub type ChunkFilterFn = Box<dyn (Fn(&(dyn Chunk + Send + Sync)) -> bool) + Send + Sync>;
77
/// Router state kept behind its own mutex so that the routing task spawned by
/// `Router::start` can use it without locking the `Router` itself.
#[derive(Default)]
pub struct RouterInternal {
    /// NAT behavior; defaulted by the router's `set_router` when `None`.
    pub(crate) nat_type: Option<NatType>,
    /// Subnet served by the router. Despite the name it is a generic `IpNet`;
    /// `assign_ip_address` handles both IPv4 and IPv6 values.
    pub(crate) ipv4net: IpNet,
    /// Weak handle to the parent router, recorded by `set_router`.
    pub(crate) parent: Option<Weak<Mutex<Router>>>,
    /// Address translator applied to chunks exchanged with the parent.
    pub(crate) nat: NetworkAddressTranslator,
    /// Attached NICs keyed by the string form of each of their IP addresses.
    pub(crate) nics: HashMap<String, Weak<Mutex<dyn Nic + Send + Sync>>>,
    /// Filters consulted for every routed chunk, in insertion order.
    pub(crate) chunk_filters: Vec<ChunkFilterFn>,
    /// Last host id handed out by `assign_ip_address`.
    pub(crate) last_id: u8,
}
88
/// A virtual router: it queues chunks, delays them according to
/// `min_delay`/`max_jitter`, and forwards them to attached NICs or to its
/// parent router.
#[derive(Default)]
pub struct Router {
    /// Name used as a prefix in log messages.
    name: String,
    /// Subnet parsed from `RouterConfig::cidr`.
    ipv4net: IpNet,
    /// Minimum time a chunk stays queued before it is routed.
    min_delay: Duration,
    /// Exclusive upper bound of the random per-pass delay.
    max_jitter: Duration,
    /// Chunks waiting to be routed; shared with the routing task.
    queue: Arc<ChunkQueue>,
    /// Interfaces of the router; `Router::new` creates `lo0` and `eth0`.
    interfaces: Vec<Interface>,
    /// Static IP addresses taken from the configuration.
    static_ips: Vec<IpAddr>,
    /// Local IP associated with a static IP, keyed by the static IP's string form.
    static_local_ips: HashMap<String, IpAddr>,
    /// Child routers added through `add_router`; started and stopped with this router.
    children: Vec<Arc<Mutex<Router>>>,
    /// `Some` while the router is running; dropping the sender tells the
    /// routing task to stop.
    done: Option<mpsc::Sender<()>>,
    /// Name resolver; a child router's resolver is chained to its parent's in
    /// `set_router`.
    pub(crate) resolver: Arc<Mutex<Resolver>>,
    /// Used by `push` to wake the routing task when a chunk was queued.
    push_ch: Option<mpsc::Sender<()>>,
    /// State shared with the routing task.
    router_internal: Arc<Mutex<RouterInternal>>,
}
106
107#[async_trait]
108impl Nic for Router {
109 async fn get_interface(&self, ifc_name: &str) -> Option<Interface> {
110 for ifc in &self.interfaces {
111 if ifc.name == ifc_name {
112 return Some(ifc.clone());
113 }
114 }
115 None
116 }
117
118 async fn add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()> {
119 for ifc in &mut self.interfaces {
120 if ifc.name == ifc_name {
121 for addr in addrs {
122 ifc.add_addr(*addr);
123 }
124 return Ok(());
125 }
126 }
127
128 Err(Error::ErrNotFound)
129 }
130
131 async fn on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>) {
132 let from_parent: Box<dyn Chunk + Send + Sync> = {
133 let router_internal = self.router_internal.lock().await;
134 match router_internal.nat.translate_inbound(&*c).await {
135 Ok(from) => {
136 if let Some(from) = from {
137 from
138 } else {
139 return;
140 }
141 }
142 Err(err) => {
143 log::warn!("[{}] {}", self.name, err);
144 return;
145 }
146 }
147 };
148
149 self.push(from_parent).await;
150 }
151
152 async fn get_static_ips(&self) -> Vec<IpAddr> {
153 self.static_ips.clone()
154 }
155
156 async fn set_router(&self, parent: Arc<Mutex<Router>>) -> Result<()> {
158 {
159 let mut router_internal = self.router_internal.lock().await;
160 router_internal.parent = Some(Arc::downgrade(&parent));
161 }
162
163 let parent_resolver = {
164 let p = parent.lock().await;
165 Arc::clone(&p.resolver)
166 };
167 {
168 let mut resolver = self.resolver.lock().await;
169 resolver.set_parent(Arc::downgrade(&parent_resolver));
170 }
171
172 let mut mapped_ips = vec![];
173 let mut local_ips = vec![];
174
175 if let Some(ifc) = self.get_interface("eth0").await {
178 for ifc_addr in ifc.addrs() {
179 let ip = ifc_addr.addr();
180 mapped_ips.push(ip);
181
182 if let Some(loc_ip) = self.static_local_ips.get(&ip.to_string()) {
183 local_ips.push(*loc_ip);
184 }
185 }
186 } else {
187 return Err(Error::ErrNoIpaddrEth0);
188 }
189
190 {
192 let mut router_internal = self.router_internal.lock().await;
193 if router_internal.nat_type.is_none() {
194 router_internal.nat_type = Some(NatType {
195 mapping_behavior: EndpointDependencyType::EndpointIndependent,
196 filtering_behavior: EndpointDependencyType::EndpointAddrPortDependent,
197 hair_pining: false,
198 port_preservation: false,
199 mapping_life_time: Duration::from_secs(30),
200 ..Default::default()
201 });
202 }
203
204 router_internal.nat = NetworkAddressTranslator::new(NatConfig {
205 name: self.name.clone(),
206 nat_type: router_internal.nat_type.unwrap(),
207 mapped_ips,
208 local_ips,
209 })?;
210 }
211
212 Ok(())
213 }
214}
215
216impl Router {
217 pub fn new(config: RouterConfig) -> Result<Self> {
218 let ipv4net: IpNet = config.cidr.parse()?;
219
220 let queue_size = if config.queue_size > 0 {
221 config.queue_size
222 } else {
223 DEFAULT_ROUTER_QUEUE_SIZE
224 };
225
226 let mut lo0 = Interface::new(LO0_STR.to_owned(), vec![]);
228 if let Ok(ipnet) = Interface::convert(
229 SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
230 Some(SocketAddr::new(Ipv4Addr::new(255, 0, 0, 0).into(), 0)),
231 ) {
232 lo0.add_addr(ipnet);
233 }
234
235 let eth0 = Interface::new("eth0".to_owned(), vec![]);
237
238 let resolver = Arc::new(Mutex::new(Resolver::new()));
240
241 let name = if config.name.is_empty() {
242 assign_router_name()
243 } else {
244 config.name.clone()
245 };
246
247 let mut static_ips = vec![];
248 let mut static_local_ips = HashMap::new();
249 for ip_str in &config.static_ips {
250 let ip_pair: Vec<&str> = ip_str.split('/').collect();
251 if let Ok(ip) = IpAddr::from_str(ip_pair[0]) {
252 if ip_pair.len() > 1 {
253 let loc_ip = IpAddr::from_str(ip_pair[1])?;
254 if !ipv4net.contains(&loc_ip) {
255 return Err(Error::ErrLocalIpBeyondStaticIpsSubset);
256 }
257 static_local_ips.insert(ip.to_string(), loc_ip);
258 }
259 static_ips.push(ip);
260 }
261 }
262 if !config.static_ip.is_empty() {
263 log::warn!("static_ip is deprecated. Use static_ips instead");
264 if let Ok(ip) = IpAddr::from_str(&config.static_ip) {
265 static_ips.push(ip);
266 }
267 }
268
269 let n_static_local = static_local_ips.len();
270 if n_static_local > 0 && n_static_local != static_ips.len() {
271 return Err(Error::ErrLocalIpNoStaticsIpsAssociated);
272 }
273
274 let router_internal = RouterInternal {
275 nat_type: config.nat_type,
276 ipv4net,
277 nics: HashMap::new(),
278 ..Default::default()
279 };
280
281 Ok(Router {
282 name,
283 ipv4net,
284 interfaces: vec![lo0, eth0],
285 static_ips,
286 static_local_ips,
287 resolver,
288 router_internal: Arc::new(Mutex::new(router_internal)),
289 queue: Arc::new(ChunkQueue::new(queue_size)),
290 min_delay: config.min_delay,
291 max_jitter: config.max_jitter,
292 ..Default::default()
293 })
294 }
295
296 pub(crate) fn get_interfaces(&self) -> &[Interface] {
298 &self.interfaces
299 }
300
301 pub fn start(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>> {
303 if self.done.is_some() {
304 return Box::pin(async move { Err(Error::ErrRouterAlreadyStarted) });
305 }
306
307 let (done_tx, mut done_rx) = mpsc::channel(1);
308 let (push_ch_tx, mut push_ch_rx) = mpsc::channel(1);
309 self.done = Some(done_tx);
310 self.push_ch = Some(push_ch_tx);
311
312 let router_internal = Arc::clone(&self.router_internal);
313 let queue = Arc::clone(&self.queue);
314 let max_jitter = self.max_jitter;
315 let min_delay = self.min_delay;
316 let name = self.name.clone();
317 let ipv4net = self.ipv4net;
318
319 tokio::spawn(async move {
320 while let Ok(d) = Router::process_chunks(
321 &name,
322 ipv4net,
323 max_jitter,
324 min_delay,
325 &queue,
326 &router_internal,
327 )
328 .await
329 {
330 if d == Duration::from_secs(0) {
331 tokio::select! {
332 _ = push_ch_rx.recv() =>{},
333 _ = done_rx.recv() => break,
334 }
335 } else {
336 let t = tokio::time::sleep(d);
337 tokio::pin!(t);
338
339 tokio::select! {
340 _ = t.as_mut() => {},
341 _ = done_rx.recv() => break,
342 }
343 }
344 }
345 });
346
347 let children = self.children.clone();
348 Box::pin(async move { Router::start_children(children).await })
349 }
350
351 pub fn stop(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>> {
353 if self.done.is_none() {
354 return Box::pin(async move { Err(Error::ErrRouterAlreadyStopped) });
355 }
356 self.push_ch.take();
357 self.done.take();
358
359 let children = self.children.clone();
360 Box::pin(async move { Router::stop_children(children).await })
361 }
362
363 async fn start_children(children: Vec<Arc<Mutex<Router>>>) -> Result<()> {
364 for child in children {
365 let mut c = child.lock().await;
366 c.start().await?;
367 }
368
369 Ok(())
370 }
371
372 async fn stop_children(children: Vec<Arc<Mutex<Router>>>) -> Result<()> {
373 for child in children {
374 let mut c = child.lock().await;
375 c.stop().await?;
376 }
377
378 Ok(())
379 }
380
381 pub async fn add_router(&mut self, child: Arc<Mutex<Router>>) -> Result<()> {
384 let nic = Arc::clone(&child) as Arc<Mutex<dyn Nic + Send + Sync>>;
387 self.children.push(child);
388 self.add_net(nic).await
389 }
390
391 pub async fn add_net(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()> {
394 let mut router_internal = self.router_internal.lock().await;
395 router_internal.add_nic(nic).await
396 }
397
398 pub async fn add_host(&mut self, host_name: String, ip_addr: String) -> Result<()> {
400 let mut resolver = self.resolver.lock().await;
401 resolver.add_host(host_name, ip_addr)
402 }
403
404 pub async fn add_chunk_filter(&self, filter: ChunkFilterFn) {
408 let mut router_internal = self.router_internal.lock().await;
409 router_internal.chunk_filters.push(filter);
410 }
411
412 pub(crate) async fn push(&self, mut c: Box<dyn Chunk + Send + Sync>) {
413 log::debug!("[{}] route {}", self.name, c);
414 if self.done.is_some() {
415 c.set_timestamp();
416
417 if self.queue.push(c).await {
418 if let Some(push_ch) = &self.push_ch {
419 let _ = push_ch.try_send(());
420 }
421 } else {
422 log::warn!("[{}] queue was full. dropped a chunk", self.name);
423 }
424 } else {
425 log::warn!("router is done");
426 }
427 }
428
429 async fn process_chunks(
430 name: &str,
431 ipv4net: IpNet,
432 max_jitter: Duration,
433 min_delay: Duration,
434 queue: &Arc<ChunkQueue>,
435 router_internal: &Arc<Mutex<RouterInternal>>,
436 ) -> Result<Duration> {
437 let mj = max_jitter.as_nanos() as u64;
439 if mj > 0 {
440 let jitter = Duration::from_nanos(rand::random::<u64>() % mj);
441 tokio::time::sleep(jitter).await;
442 }
443
444 let entered_at = SystemTime::now();
454 let cut_off = entered_at.sub(min_delay);
455
456 let mut d;
458
459 loop {
460 d = Duration::from_secs(0);
461
462 if let Some(c) = queue.peek().await {
463 if c.get_timestamp().duration_since(cut_off).is_ok() {
465 let next_expire = c.get_timestamp().add(min_delay);
468 if let Ok(diff) = next_expire.duration_since(entered_at) {
469 d = diff;
470 break;
471 }
472 }
473 } else {
474 break; }
476
477 if let Some(c) = queue.pop().await {
478 let ri = router_internal.lock().await;
479 let mut blocked = false;
480 for filter in &ri.chunk_filters {
481 if !filter(&*c) {
482 blocked = true;
483 break;
484 }
485 }
486 if blocked {
487 continue; }
489
490 let dst_ip = c.get_destination_ip();
491
492 if ipv4net.contains(&dst_ip) {
494 if let Some(nic) = ri.nics.get(&dst_ip.to_string()).and_then(|p| p.upgrade()) {
496 let ni = nic.lock().await;
499 ni.on_inbound_chunk(c).await;
500 } else {
501 log::debug!("[{}] {} unreachable", name, c);
503 }
504 } else {
505 if let Some(parent) = &ri.parent.clone().and_then(|p| p.upgrade()) {
508 if let Some(to_parent) = ri.nat.translate_outbound(&*c).await? {
510 let p = parent.lock().await;
512 p.push(to_parent).await;
513 }
514 } else {
515 log::debug!("[{}] no route found for {}", name, c);
517 }
518 }
519 } else {
520 break; }
522 }
523
524 Ok(d)
525 }
526}
527
528impl RouterInternal {
529 pub(crate) async fn add_nic(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()> {
531 let mut ips = {
532 let ni = nic.lock().await;
533 ni.get_static_ips().await
534 };
535
536 if ips.is_empty() {
537 let ip = self.assign_ip_address()?;
539 log::debug!("assign_ip_address: {}", ip);
540 ips.push(ip);
541 }
542
543 let mut ipnets = vec![];
544 for ip in &ips {
545 if !self.ipv4net.contains(ip) {
546 return Err(Error::ErrStaticIpIsBeyondSubnet);
547 }
548 self.nics.insert(ip.to_string(), Arc::downgrade(&nic));
549 ipnets.push(IpNet::from_str(&format!(
550 "{}/{}",
551 ip,
552 self.ipv4net.prefix_len()
553 ))?);
554 }
555
556 {
557 let mut ni = nic.lock().await;
558 let _ = ni.add_addrs_to_interface("eth0", &ipnets).await;
559 }
560
561 Ok(())
562 }
563
564 fn assign_ip_address(&mut self) -> Result<IpAddr> {
566 if self.last_id == 0xfe {
569 return Err(Error::ErrAddressSpaceExhausted);
570 }
571
572 self.last_id += 1;
573 match self.ipv4net.addr() {
574 IpAddr::V4(ipv4) => {
575 let mut ip = ipv4.octets();
576 ip[3] = self.last_id;
577 Ok(IpAddr::V4(Ipv4Addr::from(ip)))
578 }
579 IpAddr::V6(ipv6) => {
580 let mut ip = ipv6.octets();
581 ip[15] += self.last_id;
582 Ok(IpAddr::V6(Ipv6Addr::from(ip)))
583 }
584 }
585 }
586}