pingora_load_balancing/selection/mod.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Backend selection interfaces and algorithms
pub mod algorithms;
pub mod consistent;
pub mod weighted;
use super::Backend;
use std::collections::{BTreeSet, HashSet};
use std::sync::Arc;
use weighted::Weighted;
/// [BackendSelection] is the interface to implement backend selection mechanisms.
pub trait BackendSelection {
/// The [BackendIter] returned from iter() below.
type Iter;
/// The function to create a [BackendSelection] implementation.
fn build(backends: &BTreeSet<Backend>) -> Self;
/// Select backends for a given key.
///
/// An [BackendIter] should be returned. The first item in the iter is the first
/// choice backend. The user should continue to iterate over it if the first backend
/// cannot be used due to its health or other reasons.
fn iter(self: &Arc<Self>, key: &[u8]) -> Self::Iter
where
Self::Iter: BackendIter;
}
/// An iterator to find the suitable backend
///
/// Similar to [Iterator] but allow self referencing.
pub trait BackendIter {
/// Return `Some(&Backend)` when there are more backends left to choose from.
fn next(&mut self) -> Option<&Backend>;
}
/// [SelectionAlgorithm] is the interface to implement selection algorithms.
///
/// All [std::hash::Hasher] + [Default] can be used directly as a selection algorithm.
pub trait SelectionAlgorithm {
/// Create a new implementation
fn new() -> Self;
/// Return the next index of backend. The caller should perform modulo to get
/// the valid index of the backend.
fn next(&self, key: &[u8]) -> u64;
}
/// [FNV](https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function) hashing
/// on weighted backends
pub type FNVHash = Weighted<fnv::FnvHasher>;
/// Alias of [`FNVHash`] for backwards compatibility until the next breaking change
#[doc(hidden)]
pub type FVNHash = Weighted<fnv::FnvHasher>;
/// Random selection on weighted backends
pub type Random = Weighted<algorithms::Random>;
/// Round robin selection on weighted backends
pub type RoundRobin = Weighted<algorithms::RoundRobin>;
/// Consistent Ketama hashing on weighted backends
pub type Consistent = consistent::KetamaHashing;
// TODO: least conn
/// An iterator which wraps another iterator and yields unique items. It optionally takes a max
/// number of iterations if the wrapped iterator never returns.
pub struct UniqueIterator<I>
where
I: BackendIter,
{
iter: I,
seen: HashSet<u64>,
max_iterations: usize,
steps: usize,
}
impl<I> UniqueIterator<I>
where
I: BackendIter,
{
/// Wrap a new iterator and specify the maximum number of times we want to iterate.
pub fn new(iter: I, max_iterations: usize) -> Self {
Self {
iter,
max_iterations,
seen: HashSet::new(),
steps: 0,
}
}
pub fn get_next(&mut self) -> Option<Backend> {
while let Some(item) = self.iter.next() {
if self.steps >= self.max_iterations {
return None;
}
self.steps += 1;
let hash_key = item.hash_key();
if !self.seen.contains(&hash_key) {
self.seen.insert(hash_key);
return Some(item.clone());
}
}
None
}
}
#[cfg(test)]
mod tests {
use super::*;
struct TestIter {
seq: Vec<Backend>,
idx: usize,
}
impl TestIter {
fn new(input: &[&Backend]) -> Self {
Self {
seq: input.iter().cloned().cloned().collect(),
idx: 0,
}
}
}
impl BackendIter for TestIter {
fn next(&mut self) -> Option<&Backend> {
let idx = self.idx;
self.idx += 1;
self.seq.get(idx)
}
}
#[test]
fn unique_iter_max_iterations_is_correct() {
let b1 = Backend::new("1.1.1.1:80").unwrap();
let b2 = Backend::new("1.0.0.1:80").unwrap();
let b3 = Backend::new("1.0.0.255:80").unwrap();
let items = [&b1, &b2, &b3];
let mut all = UniqueIterator::new(TestIter::new(&items), 3);
assert_eq!(all.get_next(), Some(b1.clone()));
assert_eq!(all.get_next(), Some(b2.clone()));
assert_eq!(all.get_next(), Some(b3.clone()));
assert_eq!(all.get_next(), None);
let mut stop = UniqueIterator::new(TestIter::new(&items), 1);
assert_eq!(stop.get_next(), Some(b1));
assert_eq!(stop.get_next(), None);
}
#[test]
fn unique_iter_duplicate_items_are_filtered() {
let b1 = Backend::new("1.1.1.1:80").unwrap();
let b2 = Backend::new("1.0.0.1:80").unwrap();
let b3 = Backend::new("1.0.0.255:80").unwrap();
let items = [&b1, &b1, &b2, &b2, &b2, &b3];
let mut uniq = UniqueIterator::new(TestIter::new(&items), 10);
assert_eq!(uniq.get_next(), Some(b1));
assert_eq!(uniq.get_next(), Some(b2));
assert_eq!(uniq.get_next(), Some(b3));
}
}