use rama_core::error::{BoxError, ErrorContext, OpaqueError};
use rama_net::{asn::Asn, transport::TransportContext};
use rama_utils::str::NonEmptyString;
use serde::{Deserialize, Serialize};
use std::{fmt, future::Future};
#[cfg(feature = "live-update")]
mod update;
#[cfg(feature = "live-update")]
#[doc(inline)]
pub use update::{proxy_db_updater, LiveUpdateProxyDB, LiveUpdateProxyDBSetter};
mod internal;
#[doc(inline)]
pub use internal::Proxy;
#[cfg(feature = "csv")]
mod csv;
#[cfg(feature = "csv")]
#[doc(inline)]
pub use csv::{ProxyCsvRowReader, ProxyCsvRowReaderError, ProxyCsvRowReaderErrorKind};
pub(super) mod layer;
mod str;
#[doc(inline)]
pub use str::StringFilter;
/// Identifier of a proxy, wrapping a [`NonEmptyString`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProxyID(NonEmptyString);

impl ProxyID {
    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl From<NonEmptyString> for ProxyID {
    /// Wraps an already non-empty string as a [`ProxyID`].
    fn from(value: NonEmptyString) -> Self {
        Self(value)
    }
}

impl AsRef<str> for ProxyID {
    fn as_ref(&self) -> &str {
        <NonEmptyString as AsRef<str>>::as_ref(&self.0)
    }
}

impl fmt::Display for ProxyID {
    /// Formats the identifier exactly like the wrapped [`NonEmptyString`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
/// Criteria used to select a [`Proxy`] from a [`ProxyDB`].
///
/// Every field is optional: a field left as `None` adds no constraint for
/// that property. The multi-valued fields (`Vec<...>`) collect repeated keys
/// when deserialized from a query string (e.g. `country=us&country=be`).
/// The in-memory DB treats several values of one field as alternatives
/// (a proxy in pool `a` OR pool `c` matches `pool=a&pool=c`); other
/// [`ProxyDB`] implementations presumably do the same — confirm per impl.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ProxyFilter {
/// Select one specific proxy by its identifier.
pub id: Option<NonEmptyString>,
/// Pool identifier(s); also accepted under the key `pool` when deserializing.
#[serde(alias = "pool")]
pub pool_id: Option<Vec<StringFilter>>,
/// Continent(s) the proxy must be located in.
pub continent: Option<Vec<StringFilter>>,
/// Country/countries the proxy must be located in.
pub country: Option<Vec<StringFilter>>,
/// State(s) the proxy must be located in.
pub state: Option<Vec<StringFilter>>,
/// City/cities the proxy must be located in.
pub city: Option<Vec<StringFilter>>,
/// Require (`Some(true)`) or exclude (`Some(false)`) datacenter proxies.
pub datacenter: Option<bool>,
/// Require (`Some(true)`) or exclude (`Some(false)`) residential proxies.
pub residential: Option<bool>,
/// Require (`Some(true)`) or exclude (`Some(false)`) mobile proxies.
pub mobile: Option<bool>,
/// Carrier(s) the proxy must use.
pub carrier: Option<Vec<StringFilter>>,
/// Autonomous system number(s) the proxy must belong to.
pub asn: Option<Vec<Asn>>,
}
/// A source of [`Proxy`] values that can be queried with a [`ProxyFilter`].
pub trait ProxyDB: Send + Sync + 'static {
/// Error type returned by the query methods.
type Error: Send + Sync + 'static;
/// Selects a proxy that matches `filter` for the transport context `ctx`
/// and that is additionally accepted by `predicate`.
fn get_proxy_if(
&self,
ctx: TransportContext,
filter: ProxyFilter,
predicate: impl ProxyQueryPredicate,
) -> impl Future<Output = Result<Proxy, Self::Error>> + Send + '_;
/// Selects a proxy that matches `filter` for the transport context `ctx`.
///
/// The default implementation forwards to [`ProxyDB::get_proxy_if`] with
/// the always-accepting predicate `true`.
fn get_proxy(
&self,
ctx: TransportContext,
filter: ProxyFilter,
) -> impl Future<Output = Result<Proxy, Self::Error>> + Send + '_ {
self.get_proxy_if(ctx, filter, true)
}
}
impl ProxyDB for () {
type Error = OpaqueError;
#[inline]
async fn get_proxy_if(
&self,
_ctx: TransportContext,
_filter: ProxyFilter,
_predicate: impl ProxyQueryPredicate,
) -> Result<Proxy, Self::Error> {
Err(OpaqueError::from_display(
"()::get_proxy_if: no ProxyDB defined",
))
}
#[inline]
async fn get_proxy(
&self,
_ctx: TransportContext,
_filter: ProxyFilter,
) -> Result<Proxy, Self::Error> {
Err(OpaqueError::from_display(
"()::get_proxy: no ProxyDB defined",
))
}
}
/// An optional [`ProxyDB`]: `Some(db)` forwards every query to `db`
/// (wrapping its error in an [`OpaqueError`] with context), while `None`
/// fails every query.
impl<T> ProxyDB for Option<T>
where
    T: ProxyDB<Error: Into<BoxError>>,
{
    type Error = OpaqueError;

    #[inline]
    async fn get_proxy_if(
        &self,
        ctx: TransportContext,
        filter: ProxyFilter,
        predicate: impl ProxyQueryPredicate,
    ) -> Result<Proxy, Self::Error> {
        let Some(inner) = self else {
            return Err(OpaqueError::from_display(
                "None::get_proxy_if: no ProxyDB defined",
            ));
        };
        inner
            .get_proxy_if(ctx, filter, predicate)
            .await
            .map_err(|err| OpaqueError::from_boxed(err.into()))
            .context("Some::get_proxy_if")
    }

    #[inline]
    async fn get_proxy(
        &self,
        ctx: TransportContext,
        filter: ProxyFilter,
    ) -> Result<Proxy, Self::Error> {
        let Some(inner) = self else {
            return Err(OpaqueError::from_display(
                "None::get_proxy: no ProxyDB defined",
            ));
        };
        inner
            .get_proxy(ctx, filter)
            .await
            .map_err(|err| OpaqueError::from_boxed(err.into()))
            .context("Some::get_proxy")
    }
}
/// A shared [`ProxyDB`]: queries are delegated to the pointed-to database,
/// keeping its error type unchanged.
impl<T> ProxyDB for std::sync::Arc<T>
where
    T: ProxyDB,
{
    type Error = T::Error;

    #[inline]
    fn get_proxy_if(
        &self,
        ctx: TransportContext,
        filter: ProxyFilter,
        predicate: impl ProxyQueryPredicate,
    ) -> impl Future<Output = Result<Proxy, Self::Error>> + Send + '_ {
        // `&Arc<T>` deref-coerces to the `&T` receiver expected here
        T::get_proxy_if(self, ctx, filter, predicate)
    }

    #[inline]
    fn get_proxy(
        &self,
        ctx: TransportContext,
        filter: ProxyFilter,
    ) -> impl Future<Output = Result<Proxy, Self::Error>> + Send + '_ {
        T::get_proxy(self, ctx, filter)
    }
}
// Implements `ProxyDB` for one of the `rama_core::combinators` either-style
// enums (`$id`), whose variants are named after their type parameters
// (`$id::$param(inner)`). Every variant type must be a `ProxyDB` whose error
// converts into `BoxError`; each query is forwarded to the active variant and
// its error is converted with `Into::into` into a `BoxError`.
macro_rules! impl_proxydb_either {
($id:ident, $($param:ident),+ $(,)?) => {
impl<$($param),+> ProxyDB for rama_core::combinators::$id<$($param),+>
where
$(
$param: ProxyDB<Error: Into<BoxError>>,
)+
{
// variant errors differ in type, so they are unified as `BoxError`
type Error = BoxError;
#[inline]
async fn get_proxy_if(
&self,
ctx: TransportContext,
filter: ProxyFilter,
predicate: impl ProxyQueryPredicate,
) -> Result<Proxy, Self::Error> {
match self {
$(
rama_core::combinators::$id::$param(s) => s.get_proxy_if(ctx, filter, predicate).await.map_err(Into::into),
)+
}
}
// overridden so the active variant's own `get_proxy` is used
#[inline]
async fn get_proxy(
&self,
ctx: TransportContext,
filter: ProxyFilter,
) -> Result<Proxy, Self::Error> {
match self {
$(
rama_core::combinators::$id::$param(s) => s.get_proxy(ctx, filter).await.map_err(Into::into),
)+
}
}
}
};
}
// `impl_either!` invokes the macro above once per either-type it knows about
// (presumably `Either`, `Either3`, ... — the list lives in rama_core).
rama_core::combinators::impl_either!(impl_proxydb_either);
/// Extra, caller-supplied check applied to candidate proxies on top of a
/// [`ProxyFilter`].
pub trait ProxyQueryPredicate: Clone + Send + Sync + 'static {
    /// Returns `true` when `proxy` is acceptable.
    fn execute(&self, proxy: &Proxy) -> bool;
}

/// A constant predicate: `true` accepts every proxy, `false` rejects all.
impl ProxyQueryPredicate for bool {
    fn execute(&self, _proxy: &Proxy) -> bool {
        *self
    }
}

/// Any cloneable, thread-safe closure over `&Proxy` is a predicate.
impl<F> ProxyQueryPredicate for F
where
    F: Fn(&Proxy) -> bool + Clone + Send + Sync + 'static,
{
    fn execute(&self, proxy: &Proxy) -> bool {
        self(proxy)
    }
}
impl ProxyDB for Proxy {
type Error = rama_core::error::OpaqueError;
async fn get_proxy_if(
&self,
ctx: TransportContext,
filter: ProxyFilter,
predicate: impl ProxyQueryPredicate,
) -> Result<Proxy, Self::Error> {
(self.is_match(&ctx, &filter) && predicate.execute(self))
.then(|| self.clone())
.ok_or_else(|| rama_core::error::OpaqueError::from_display("hardcoded proxy no match"))
}
}
#[cfg(feature = "memory-db")]
mod memdb {
use super::*;
use crate::proxydb::internal::ProxyDBErrorKind;
use rama_net::transport::TransportProtocol;
#[derive(Debug)]
pub struct MemoryProxyDB {
data: internal::ProxyDB,
}
impl MemoryProxyDB {
pub fn try_from_rows(proxies: Vec<Proxy>) -> Result<Self, MemoryProxyDBInsertError> {
Ok(MemoryProxyDB {
data: internal::ProxyDB::from_rows(proxies).map_err(|err| match err.kind() {
ProxyDBErrorKind::DuplicateKey => {
MemoryProxyDBInsertError::duplicate_key(err.into_input())
}
ProxyDBErrorKind::InvalidRow => {
MemoryProxyDBInsertError::invalid_proxy(err.into_input())
}
})?,
})
}
pub fn try_from_iter<I>(proxies: I) -> Result<Self, MemoryProxyDBInsertError>
where
I: IntoIterator<Item = Proxy>,
{
Ok(MemoryProxyDB {
data: internal::ProxyDB::from_iter(proxies).map_err(|err| match err.kind() {
ProxyDBErrorKind::DuplicateKey => {
MemoryProxyDBInsertError::duplicate_key(err.into_input())
}
ProxyDBErrorKind::InvalidRow => {
MemoryProxyDBInsertError::invalid_proxy(err.into_input())
}
})?,
})
}
pub fn len(&self) -> usize {
self.data.len()
}
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
fn query_from_filter(
&self,
ctx: TransportContext,
filter: ProxyFilter,
) -> internal::ProxyDBQuery {
let mut query = self.data.query();
for pool_id in filter.pool_id.into_iter().flatten() {
query.pool_id(pool_id);
}
for continent in filter.continent.into_iter().flatten() {
query.continent(continent);
}
for country in filter.country.into_iter().flatten() {
query.country(country);
}
for state in filter.state.into_iter().flatten() {
query.state(state);
}
for city in filter.city.into_iter().flatten() {
query.city(city);
}
for carrier in filter.carrier.into_iter().flatten() {
query.carrier(carrier);
}
for asn in filter.asn.into_iter().flatten() {
query.asn(asn);
}
if let Some(value) = filter.datacenter {
query.datacenter(value);
}
if let Some(value) = filter.residential {
query.residential(value);
}
if let Some(value) = filter.mobile {
query.mobile(value);
}
match ctx.protocol {
TransportProtocol::Tcp => {
query.tcp(true);
}
TransportProtocol::Udp => {
query.udp(true).socks5(true);
}
}
query
}
}
/// Queries the in-memory database.
///
/// * With `filter.id` set, the proxy is looked up directly by id: an unknown
///   id yields `NotFound`, a known proxy that fails the filter, the transport
///   context or the predicate yields `Mismatch`.
/// * Otherwise the filter and context are turned into an index query, the
///   predicate is applied to the result set and one of the remaining proxies
///   is returned (`NotFound` when none remain).
impl ProxyDB for MemoryProxyDB {
    type Error = MemoryProxyDBQueryError;

    async fn get_proxy_if(
        &self,
        ctx: TransportContext,
        filter: ProxyFilter,
        predicate: impl ProxyQueryPredicate,
    ) -> Result<Proxy, Self::Error> {
        if let Some(id) = &filter.id {
            let proxy = self
                .data
                .get_by_id(id)
                .ok_or_else(MemoryProxyDBQueryError::not_found)?;
            return if proxy.is_match(&ctx, &filter) && predicate.execute(proxy) {
                Ok(proxy.clone())
            } else {
                Err(MemoryProxyDBQueryError::mismatch())
            };
        }

        // the id borrow above has ended, so the filter can be moved into the
        // query builder instead of being cloned (it owns several Vecs)
        let query = self.query_from_filter(ctx, filter);
        query
            .execute()
            .and_then(|result| result.filter(|proxy| predicate.execute(proxy)))
            .map(|result| result.any().clone())
            .ok_or_else(MemoryProxyDBQueryError::not_found)
    }
}
/// Error returned when building a [`MemoryProxyDB`] fails.
///
/// It keeps the proxies handed back by the internal index so the caller can
/// inspect or reuse them.
#[derive(Debug)]
pub struct MemoryProxyDBInsertError {
    kind: MemoryProxyDBInsertErrorKind,
    proxies: Vec<Proxy>,
}

/// Reason a [`MemoryProxyDBInsertError`] was raised.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MemoryProxyDBInsertErrorKind {
    /// Two proxies share the same key.
    DuplicateKey,
    /// A proxy was rejected as invalid.
    InvalidProxy,
}

impl MemoryProxyDBInsertError {
    /// Shared constructor for the kind-specific helpers below.
    fn with_kind(kind: MemoryProxyDBInsertErrorKind, proxies: Vec<Proxy>) -> Self {
        Self { kind, proxies }
    }

    fn duplicate_key(proxies: Vec<Proxy>) -> Self {
        Self::with_kind(MemoryProxyDBInsertErrorKind::DuplicateKey, proxies)
    }

    fn invalid_proxy(proxies: Vec<Proxy>) -> Self {
        Self::with_kind(MemoryProxyDBInsertErrorKind::InvalidProxy, proxies)
    }

    /// Why the insert failed.
    pub fn kind(&self) -> MemoryProxyDBInsertErrorKind {
        self.kind
    }

    /// Borrows the proxies attached to this error.
    pub fn proxies(&self) -> &[Proxy] {
        &self.proxies
    }

    /// Consumes the error, returning the attached proxies.
    pub fn into_proxies(self) -> Vec<Proxy> {
        self.proxies
    }
}

impl std::fmt::Display for MemoryProxyDBInsertError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self.kind {
            MemoryProxyDBInsertErrorKind::DuplicateKey => {
                "A proxy with the same key already exists in the database"
            }
            MemoryProxyDBInsertErrorKind::InvalidProxy => {
                "A proxy in the list is invalid for some reason"
            }
        };
        f.write_str(description)
    }
}

impl std::error::Error for MemoryProxyDBInsertError {}
/// Error returned by [`MemoryProxyDB`] queries.
#[derive(Debug)]
pub struct MemoryProxyDBQueryError {
    kind: MemoryProxyDBQueryErrorKind,
}

/// Reason a [`MemoryProxyDBQueryError`] was raised.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MemoryProxyDBQueryErrorKind {
    /// No proxy satisfied the query.
    NotFound,
    /// A proxy was found but did not satisfy the filter/requirements.
    Mismatch,
}

impl MemoryProxyDBQueryError {
    /// Creates an error of kind [`MemoryProxyDBQueryErrorKind::NotFound`].
    pub fn not_found() -> Self {
        Self {
            kind: MemoryProxyDBQueryErrorKind::NotFound,
        }
    }

    /// Creates an error of kind [`MemoryProxyDBQueryErrorKind::Mismatch`].
    pub fn mismatch() -> Self {
        Self {
            kind: MemoryProxyDBQueryErrorKind::Mismatch,
        }
    }

    /// Why the query failed.
    pub fn kind(&self) -> MemoryProxyDBQueryErrorKind {
        self.kind
    }
}

impl std::fmt::Display for MemoryProxyDBQueryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self.kind {
            MemoryProxyDBQueryErrorKind::NotFound => "No proxy match could be found",
            MemoryProxyDBQueryErrorKind::Mismatch => {
                "Proxy config did not match the given filters/requirements"
            }
        };
        f.write_str(description)
    }
}

impl std::error::Error for MemoryProxyDBQueryError {}
#[cfg(test)]
mod tests {
use super::*;
use itertools::Itertools;
use rama_net::{address::ProxyAddress, Protocol};
use rama_utils::str::NonEmptyString;
use std::str::FromStr;
/// CSV fixture holding the proxy rows shared by the tests below.
const RAW_CSV_DATA: &str = include_str!("./test_proxydb_rows.csv");

/// Loads every row of [`RAW_CSV_DATA`] into a fresh [`MemoryProxyDB`],
/// panicking on any parse or insert failure.
async fn memproxydb() -> MemoryProxyDB {
    let mut csv = ProxyCsvRowReader::raw(RAW_CSV_DATA);
    let mut proxies = Vec::new();
    while let Some(row) = csv.next().await.unwrap() {
        proxies.push(row);
    }
    MemoryProxyDB::try_from_rows(proxies).unwrap()
}
#[tokio::test]
async fn test_load_memproxydb_from_rows() {
    // the CSV fixture contains 64 proxy rows
    assert_eq!(memproxydb().await.len(), 64);
}
/// HTTPS-over-TCP transport context targeting `localhost:8443`.
fn h2_transport_context() -> TransportContext {
    TransportContext {
        authority: "localhost:8443".try_into().unwrap(),
        app_protocol: Some(Protocol::HTTPS),
        http_version: None,
        protocol: TransportProtocol::Tcp,
    }
}
#[tokio::test]
async fn test_memproxydb_get_proxy_by_id_found() {
    const ID: &str = "3031533634";
    let db = memproxydb().await;

    let filter = ProxyFilter {
        id: Some(NonEmptyString::from_static(ID)),
        ..Default::default()
    };
    let proxy = db.get_proxy(h2_transport_context(), filter).await.unwrap();

    assert_eq!(proxy.id, ID);
}
#[tokio::test]
async fn test_memproxydb_get_proxy_by_id_found_correct_filters() {
    const ID: &str = "3031533634";
    let db = memproxydb().await;

    // every constraint below agrees with the stored proxy, so the id lookup succeeds
    let filter = ProxyFilter {
        id: Some(NonEmptyString::from_static(ID)),
        pool_id: Some(vec![StringFilter::new("poolF")]),
        country: Some(vec![StringFilter::new("JP")]),
        city: Some(vec![StringFilter::new("Yokohama")]),
        datacenter: Some(true),
        residential: Some(false),
        mobile: Some(true),
        carrier: Some(vec![StringFilter::new("Verizon")]),
        ..Default::default()
    };
    let proxy = db.get_proxy(h2_transport_context(), filter).await.unwrap();

    assert_eq!(proxy.id, ID);
}
#[tokio::test]
async fn test_memproxydb_get_proxy_by_id_not_found() {
    let db = memproxydb().await;

    let filter = ProxyFilter {
        id: Some(NonEmptyString::from_static("notfound")),
        ..Default::default()
    };
    let kind = db
        .get_proxy(h2_transport_context(), filter)
        .await
        .unwrap_err()
        .kind();

    assert_eq!(kind, MemoryProxyDBQueryErrorKind::NotFound);
}
#[tokio::test]
async fn test_memproxydb_get_proxy_by_id_mismatch_filter() {
let db = memproxydb().await;
let ctx = h2_transport_context();
let filters = [
ProxyFilter {
id: Some(NonEmptyString::from_static("3031533634")),
pool_id: Some(vec![StringFilter::new("poolB")]),
..Default::default()
},
ProxyFilter {
id: Some(NonEmptyString::from_static("3031533634")),
country: Some(vec![StringFilter::new("US")]),
..Default::default()
},
ProxyFilter {
id: Some(NonEmptyString::from_static("3031533634")),
city: Some(vec![StringFilter::new("New York")]),
..Default::default()
},
ProxyFilter {
id: Some(NonEmptyString::from_static("3031533634")),
continent: Some(vec![StringFilter::new("americas")]),
..Default::default()
},
ProxyFilter {
id: Some(NonEmptyString::from_static("3732488183")),
state: Some(vec![StringFilter::new("Texas")]),
..Default::default()
},
ProxyFilter {
id: Some(NonEmptyString::from_static("3031533634")),
datacenter: Some(false),
..Default::default()
},
ProxyFilter {
id: Some(NonEmptyString::from_static("3031533634")),
residential: Some(true),
..Default::default()
},
ProxyFilter {
id: Some(NonEmptyString::from_static("3031533634")),
mobile: Some(false),
..Default::default()
},
ProxyFilter {
id: Some(NonEmptyString::from_static("3031533634")),
carrier: Some(vec![StringFilter::new("AT&T")]),
..Default::default()
},
ProxyFilter {
id: Some(NonEmptyString::from_static("292096733")),
asn: Some(vec![Asn::from_static(1)]),
..Default::default()
},
];
for filter in filters.iter() {
let err = db.get_proxy(ctx.clone(), filter.clone()).await.unwrap_err();
assert_eq!(err.kind(), MemoryProxyDBQueryErrorKind::Mismatch);
}
}
/// HTTPS-over-UDP transport context targeting `localhost:8443`.
fn h3_transport_context() -> TransportContext {
    TransportContext {
        authority: "localhost:8443".try_into().unwrap(),
        app_protocol: Some(Protocol::HTTPS),
        http_version: None,
        protocol: TransportProtocol::Udp,
    }
}
#[tokio::test]
async fn test_memproxydb_get_proxy_by_id_mismatch_req_context() {
    let db = memproxydb().await;

    // the proxy exists, but the UDP context it is requested for does not match it
    let filter = ProxyFilter {
        id: Some(NonEmptyString::from_static("3031533634")),
        ..Default::default()
    };
    let kind = db
        .get_proxy(h3_transport_context(), filter)
        .await
        .unwrap_err()
        .kind();

    assert_eq!(kind, MemoryProxyDBQueryErrorKind::Mismatch);
}
#[tokio::test]
async fn test_memorydb_get_h3_capable_proxies() {
    let db = memproxydb().await;
    let ctx = h3_transport_context();
    let filter = ProxyFilter::default();

    // random sampling; the ordered set both de-duplicates and sorts the ids
    let mut seen = std::collections::BTreeSet::new();
    for _ in 0..5000 {
        let proxy = db.get_proxy(ctx.clone(), filter.clone()).await.unwrap();
        assert!(proxy.udp);
        assert!(proxy.socks5);
        seen.insert(proxy.id);
    }

    assert_eq!(seen.len(), 40);
    assert_eq!(
        seen.iter().join(","),
        r##"1125300915,1259341971,1316455915,153202126,1571861931,1684342915,1742367441,1844412609,1916851007,20647117,2107229589,2261612122,2497865606,2521901221,2560727338,2593294918,2596743625,2745456299,2880295577,2909724448,2950022859,2951529660,3187902553,3269411602,3269465574,3269921904,3481200027,3498810974,362091157,3679054656,3732488183,3836943127,39048766,3951672504,3976711563,4187178960,56402588,724884866,738626121,906390012"##
    );
}
#[tokio::test]
async fn test_memorydb_get_h2_capable_proxies() {
    let db = memproxydb().await;
    let ctx = h2_transport_context();
    let filter = ProxyFilter::default();

    // random sampling; the ordered set both de-duplicates and sorts the ids
    let mut seen = std::collections::BTreeSet::new();
    for _ in 0..5000 {
        let proxy = db.get_proxy(ctx.clone(), filter.clone()).await.unwrap();
        assert!(proxy.tcp);
        seen.insert(proxy.id);
    }

    assert_eq!(seen.len(), 50);
    assert_eq!(
        seen.iter().join(","),
        r#"1125300915,1259341971,1264821985,129108927,1316455915,1425588737,1571861931,1810781137,1836040682,1844412609,1885107293,2021561518,2079461709,2107229589,2141152822,2438596154,2497865606,2521901221,2551759475,2560727338,2593294918,2798907087,2854473221,2880295577,2909724448,2912880381,292096733,2951529660,3031533634,3187902553,3269411602,3269465574,339020035,3481200027,3498810974,3503691556,362091157,3679054656,371209663,3861736957,39048766,3976711563,4062553709,49590203,56402588,724884866,738626121,767809962,846528631,906390012"#,
    );
}
#[tokio::test]
async fn test_memorydb_get_any_country_proxies() {
    let db = memproxydb().await;
    let ctx = h2_transport_context();
    let filter = ProxyFilter {
        country: Some(vec!["BE".into()]),
        ..Default::default()
    };

    let mut seen = std::collections::BTreeSet::new();
    for _ in 0..5000 {
        let proxy = db.get_proxy(ctx.clone(), filter.clone()).await.unwrap();
        seen.insert(proxy.id);
    }

    assert_eq!(seen.len(), 5);
    assert_eq!(
        seen.iter().join(","),
        r#"2141152822,2593294918,2912880381,371209663,767809962"#,
    );
}
#[tokio::test]
async fn test_memorydb_get_illinois_proxies() {
    let db = memproxydb().await;
    let ctx = h2_transport_context();
    let filter = ProxyFilter {
        state: Some(vec!["illinois".into()]),
        ..Default::default()
    };

    let mut seen = std::collections::BTreeSet::new();
    for _ in 0..5000 {
        let proxy = db.get_proxy(ctx.clone(), filter.clone()).await.unwrap();
        seen.insert(proxy.id);
    }

    assert_eq!(seen.len(), 9);
    assert_eq!(
        seen.iter().join(","),
        r#"2141152822,2521901221,2560727338,2593294918,2912880381,292096733,371209663,39048766,767809962"#,
    );
}
#[tokio::test]
async fn test_memorydb_get_asn_proxies() {
    let db = memproxydb().await;
    let ctx = h2_transport_context();
    let filter = ProxyFilter {
        asn: Some(vec![Asn::from_static(42)]),
        ..Default::default()
    };

    let mut seen = std::collections::BTreeSet::new();
    for _ in 0..5000 {
        let proxy = db.get_proxy(ctx.clone(), filter.clone()).await.unwrap();
        seen.insert(proxy.id);
    }

    assert_eq!(seen.len(), 4);
    assert_eq!(
        seen.iter().join(","),
        r#"2141152822,2912880381,292096733,3481200027"#,
    );
}
#[tokio::test]
async fn test_memorydb_get_h3_capable_mobile_residential_be_asterix_proxies() {
    const EXPECTED_ID: &str = "2593294918";
    let db = memproxydb().await;
    let ctx = h3_transport_context();
    let filter = ProxyFilter {
        country: Some(vec!["BE".into()]),
        mobile: Some(true),
        residential: Some(true),
        ..Default::default()
    };

    // a single fixture proxy satisfies this filter, so every draw returns it
    for _ in 0..50 {
        let proxy = db.get_proxy(ctx.clone(), filter.clone()).await.unwrap();
        assert_eq!(proxy.id, EXPECTED_ID);
    }
}
#[tokio::test]
async fn test_memorydb_get_blocked_proxies() {
    let db = memproxydb().await;
    let ctx = h2_transport_context();
    let filter = ProxyFilter::default();

    // every TCP-capable proxy of the fixture
    let mut blocked_ids: Vec<&'static str> = vec![
        "1125300915",
        "1259341971",
        "1264821985",
        "129108927",
        "1316455915",
        "1425588737",
        "1571861931",
        "1810781137",
        "1836040682",
        "1844412609",
        "1885107293",
        "2021561518",
        "2079461709",
        "2107229589",
        "2141152822",
        "2438596154",
        "2497865606",
        "2521901221",
        "2551759475",
        "2560727338",
        "2593294918",
        "2798907087",
        "2854473221",
        "2880295577",
        "2909724448",
        "2912880381",
        "292096733",
        "2951529660",
        "3031533634",
        "3187902553",
        "3269411602",
        "3269465574",
        "339020035",
        "3481200027",
        "3498810974",
        "3503691556",
        "362091157",
        "3679054656",
        "371209663",
        "3861736957",
        "39048766",
        "3976711563",
        "4062553709",
        "49590203",
        "56402588",
        "724884866",
        "738626121",
        "767809962",
        "846528631",
        "906390012",
    ];

    // with every candidate blocked, the predicate rejects all of them
    let all_blocked = blocked_ids.clone();
    let err = db
        .get_proxy_if(ctx.clone(), filter.clone(), move |proxy: &Proxy| {
            !all_blocked.contains(&proxy.id.as_str())
        })
        .await
        .unwrap_err();
    assert_eq!(MemoryProxyDBQueryErrorKind::NotFound, err.kind());

    // unblocking exactly one id makes that proxy the only possible answer
    let unblocked_id = blocked_ids.pop().unwrap();
    let proxy = db
        .get_proxy_if(ctx, filter, move |proxy: &Proxy| {
            !blocked_ids.contains(&proxy.id.as_str())
        })
        .await
        .unwrap();
    assert_eq!(proxy.id, unblocked_id);
}
#[tokio::test]
async fn test_db_proxy_filter_any_use_filter_property() {
    // a single proxy whose string properties all hold the "*" value
    let db = MemoryProxyDB::try_from_iter([Proxy {
        id: NonEmptyString::from_static("1"),
        address: ProxyAddress::from_str("example.com").unwrap(),
        tcp: true,
        udp: true,
        http: true,
        https: true,
        socks5: true,
        socks5h: true,
        datacenter: true,
        residential: true,
        mobile: true,
        pool_id: Some("*".into()),
        continent: Some("*".into()),
        country: Some("*".into()),
        state: Some("*".into()),
        city: Some("*".into()),
        carrier: Some("*".into()),
        asn: Some(Asn::unspecified()),
    }])
    .unwrap();
    let ctx = h2_transport_context();

    let filters = [
        ProxyFilter {
            id: Some(NonEmptyString::from_static("1")),
            ..Default::default()
        },
        ProxyFilter {
            pool_id: Some(vec![StringFilter::new("*")]),
            ..Default::default()
        },
        ProxyFilter {
            pool_id: Some(vec![StringFilter::new("hq")]),
            ..Default::default()
        },
        ProxyFilter {
            country: Some(vec![StringFilter::new("*")]),
            ..Default::default()
        },
        ProxyFilter {
            country: Some(vec![StringFilter::new("US")]),
            ..Default::default()
        },
        ProxyFilter {
            city: Some(vec![StringFilter::new("*")]),
            ..Default::default()
        },
        ProxyFilter {
            city: Some(vec![StringFilter::new("NY")]),
            ..Default::default()
        },
        ProxyFilter {
            carrier: Some(vec![StringFilter::new("*")]),
            ..Default::default()
        },
        ProxyFilter {
            carrier: Some(vec![StringFilter::new("Telenet")]),
            ..Default::default()
        },
        ProxyFilter {
            pool_id: Some(vec![StringFilter::new("hq")]),
            country: Some(vec![StringFilter::new("US")]),
            city: Some(vec![StringFilter::new("NY")]),
            carrier: Some(vec![StringFilter::new("AT&T")]),
            ..Default::default()
        },
    ];

    for filter in filters {
        let proxy = db.get_proxy(ctx.clone(), filter.clone()).await.unwrap();

        // each constraint that was set must be reflected by the returned proxy
        if let Some(id) = &filter.id {
            assert_eq!(proxy.id, *id);
        }
        if let Some(pool_ids) = &filter.pool_id {
            assert!(pool_ids.contains(proxy.pool_id.as_ref().unwrap()));
        }
        if let Some(countries) = &filter.country {
            assert!(countries.contains(proxy.country.as_ref().unwrap()));
        }
        if let Some(cities) = &filter.city {
            assert!(cities.contains(proxy.city.as_ref().unwrap()));
        }
        if let Some(carriers) = &filter.carrier {
            assert!(carriers.contains(proxy.carrier.as_ref().unwrap()));
        }
    }
}
#[tokio::test]
async fn test_db_proxy_filter_any_only_matches_any_value() {
    // a single, fully concrete proxy: none of its properties is "*"
    let db = MemoryProxyDB::try_from_iter([Proxy {
        id: NonEmptyString::from_static("1"),
        address: ProxyAddress::from_str("example.com").unwrap(),
        tcp: true,
        udp: true,
        http: true,
        https: true,
        socks5: true,
        socks5h: true,
        datacenter: true,
        residential: true,
        mobile: true,
        pool_id: Some("hq".into()),
        continent: Some("americas".into()),
        country: Some("US".into()),
        state: Some("NY".into()),
        city: Some("NY".into()),
        carrier: Some("AT&T".into()),
        asn: Some(Asn::from_static(7018)),
    }])
    .unwrap();
    let ctx = h2_transport_context();

    let any_str = || Some(vec![StringFilter::new("*")]);
    let any_asn = || Some(vec![Asn::unspecified()]);

    let filters = [
        ProxyFilter {
            pool_id: any_str(),
            ..Default::default()
        },
        ProxyFilter {
            continent: any_str(),
            ..Default::default()
        },
        ProxyFilter {
            country: any_str(),
            ..Default::default()
        },
        ProxyFilter {
            state: any_str(),
            ..Default::default()
        },
        ProxyFilter {
            city: any_str(),
            ..Default::default()
        },
        ProxyFilter {
            carrier: any_str(),
            ..Default::default()
        },
        ProxyFilter {
            asn: any_asn(),
            ..Default::default()
        },
        ProxyFilter {
            pool_id: any_str(),
            continent: any_str(),
            country: any_str(),
            state: any_str(),
            city: any_str(),
            carrier: any_str(),
            asn: any_asn(),
            ..Default::default()
        },
    ];

    for filter in filters {
        let err = match db.get_proxy(ctx.clone(), filter.clone()).await {
            Err(err) => err,
            Ok(proxy) => panic!(
                "expected error for filter {:?}, not found proxy: {:?}",
                filter, proxy
            ),
        };
        assert_eq!(
            MemoryProxyDBQueryErrorKind::NotFound,
            err.kind(),
            "filter: {:?}",
            filter
        );
    }
}
#[tokio::test]
async fn test_search_proxy_for_any_of_given_pools() {
    // identical proxies that only differ in id and pool
    let make_proxy = |id: &'static str, pool: &'static str| Proxy {
        id: NonEmptyString::from_static(id),
        address: ProxyAddress::from_str("example.com").unwrap(),
        tcp: true,
        udp: true,
        http: true,
        https: true,
        socks5: true,
        socks5h: true,
        datacenter: true,
        residential: true,
        mobile: true,
        pool_id: Some(pool.into()),
        continent: Some("americas".into()),
        country: Some("US".into()),
        state: Some("NY".into()),
        city: Some("NY".into()),
        carrier: Some("AT&T".into()),
        asn: Some(Asn::from_static(7018)),
    };
    let db = MemoryProxyDB::try_from_iter([
        make_proxy("1", "a"),
        make_proxy("2", "b"),
        make_proxy("3", "b"),
        make_proxy("4", "c"),
    ])
    .unwrap();
    let ctx = h2_transport_context();

    // pools "a" OR "c": only proxies 1 and 4 qualify, and both should show up
    let filter = ProxyFilter {
        pool_id: Some(vec![StringFilter::new("a"), StringFilter::new("c")]),
        ..Default::default()
    };

    let (mut seen_1, mut seen_4) = (false, false);
    for _ in 0..100 {
        let proxy = db.get_proxy(ctx.clone(), filter.clone()).await.unwrap();
        match proxy.id.as_str() {
            "1" => seen_1 = true,
            "4" => seen_4 = true,
            _ => panic!("unexpected pool id"),
        }
    }
    assert!(seen_1);
    assert!(seen_4);
}
/// Query-string (HTML form) deserialization of [`ProxyFilter`]: repeated
/// keys accumulate, `pool` is an alias of `pool_id`, and values are
/// percent-decoded.
///
/// Purely synchronous, so it runs as a plain `#[test]` instead of spinning up
/// a tokio runtime for nothing.
#[test]
fn test_deserialize_url_proxy_filter() {
    for (input, expected_output) in [
        (
            "id=1",
            ProxyFilter {
                id: Some(NonEmptyString::from_static("1")),
                ..Default::default()
            },
        ),
        (
            "pool=hq&country=us",
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("hq")]),
                country: Some(vec![StringFilter::new("us")]),
                ..Default::default()
            },
        ),
        (
            "pool=hq&country=us&country=be",
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("hq")]),
                country: Some(vec![StringFilter::new("us"), StringFilter::new("be")]),
                ..Default::default()
            },
        ),
        (
            "pool=a&country=uk&pool=b",
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("a"), StringFilter::new("b")]),
                country: Some(vec![StringFilter::new("uk")]),
                ..Default::default()
            },
        ),
        (
            "continent=europe&continent=asia",
            ProxyFilter {
                continent: Some(vec![
                    StringFilter::new("europe"),
                    StringFilter::new("asia"),
                ]),
                ..Default::default()
            },
        ),
        (
            "continent=americas&country=us&state=NY&city=buffalo&carrier=AT%26T&asn=7018",
            ProxyFilter {
                continent: Some(vec![StringFilter::new("americas")]),
                country: Some(vec![StringFilter::new("us")]),
                state: Some(vec![StringFilter::new("ny")]),
                city: Some(vec![StringFilter::new("buffalo")]),
                carrier: Some(vec![StringFilter::new("at&t")]),
                asn: Some(vec![Asn::from_static(7018)]),
                ..Default::default()
            },
        ),
        (
            "asn=1&asn=2",
            ProxyFilter {
                asn: Some(vec![Asn::from_static(1), Asn::from_static(2)]),
                ..Default::default()
            },
        ),
    ] {
        let filter: ProxyFilter = serde_html_form::from_str(input).unwrap();
        // report which query string failed, not just the two filter values
        assert_eq!(filter, expected_output, "input: {input}");
    }
}
}
}
#[cfg(feature = "memory-db")]
pub use memdb::{
MemoryProxyDB, MemoryProxyDBInsertError, MemoryProxyDBInsertErrorKind, MemoryProxyDBQueryError,
MemoryProxyDBQueryErrorKind,
};