pingora_load_balancing/selection/
consistent.rsuse super::*;
use pingora_core::protocols::l4::socket::SocketAddr;
use pingora_ketama::{Bucket, Continuum};
use std::collections::HashMap;
pub struct KetamaHashing {
ring: Continuum,
backends: HashMap<SocketAddr, Backend>,
}
impl BackendSelection for KetamaHashing {
type Iter = OwnedNodeIterator;
fn build(backends: &BTreeSet<Backend>) -> Self {
let buckets: Vec<_> = backends
.iter()
.filter_map(|b| {
if let SocketAddr::Inet(addr) = b.addr {
Some(Bucket::new(addr, b.weight as u32))
} else {
None
}
})
.collect();
let new_backends = backends
.iter()
.map(|b| (b.addr.clone(), b.clone()))
.collect();
KetamaHashing {
ring: Continuum::new(&buckets),
backends: new_backends,
}
}
fn iter(self: &Arc<Self>, key: &[u8]) -> Self::Iter {
OwnedNodeIterator {
idx: self.ring.node_idx(key),
ring: self.clone(),
}
}
}
pub struct OwnedNodeIterator {
idx: usize,
ring: Arc<KetamaHashing>,
}
impl BackendIter for OwnedNodeIterator {
fn next(&mut self) -> Option<&Backend> {
self.ring.ring.get_addr(&mut self.idx).and_then(|addr| {
let addr = SocketAddr::Inet(*addr);
self.ring.backends.get(&addr)
})
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_ketama() {
let b1 = Backend::new("1.1.1.1:80").unwrap();
let b2 = Backend::new("1.0.0.1:80").unwrap();
let b3 = Backend::new("1.0.0.255:80").unwrap();
let backends = BTreeSet::from_iter([b1.clone(), b2.clone(), b3.clone()]);
let hash = Arc::new(KetamaHashing::build(&backends));
let mut iter = hash.iter(b"test0");
assert_eq!(iter.next(), Some(&b2));
let mut iter = hash.iter(b"test1");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test2");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test3");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test4");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test5");
assert_eq!(iter.next(), Some(&b3));
let mut iter = hash.iter(b"test6");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test7");
assert_eq!(iter.next(), Some(&b3));
let mut iter = hash.iter(b"test8");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test9");
assert_eq!(iter.next(), Some(&b2));
let backends = BTreeSet::from_iter([b1.clone(), b2.clone()]);
let hash = Arc::new(KetamaHashing::build(&backends));
let mut iter = hash.iter(b"test0");
assert_eq!(iter.next(), Some(&b2));
let mut iter = hash.iter(b"test1");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test2");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test3");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test4");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test5");
assert_eq!(iter.next(), Some(&b2)); let mut iter = hash.iter(b"test6");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test7");
assert_eq!(iter.next(), Some(&b1)); let mut iter = hash.iter(b"test8");
assert_eq!(iter.next(), Some(&b1));
let mut iter = hash.iter(b"test9");
assert_eq!(iter.next(), Some(&b2));
}
}