pub use http::Extensions;
use pingora_error::{Error, ErrorType::*, OrErr, Result};
use pingora_http::{HMap, ResponseHeader};
use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime};
use crate::key::HashBinary;
/// Crate-internal name for the newest internal metadata layout
/// (`internal_meta::InternalMetaLatest`).
pub(crate) type InternalMeta = internal_meta::InternalMetaLatest;
mod internal_meta {
use super::*;
/// The layout that every decoded record is converted into; currently `InternalMetaV2`.
pub(crate) type InternalMetaLatest = InternalMetaV2;
/// Oldest metadata layout: four fields and no `version` marker.
///
/// NOTE(review): the module-level `deserialize` recognizes this layout by an encoded
/// array length of 4, so the number (and presumably the order) of the fields below is
/// part of the stored format — confirm before changing anything here.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct InternalMetaV0 {
/// Point in time up to which the entry counts as fresh.
pub(crate) fresh_until: SystemTime,
/// Creation time of the entry; also becomes `updated` when upgrading to V2.
pub(crate) created: SystemTime,
/// Stale-while-revalidate allowance, in seconds.
pub(crate) stale_while_revalidate_sec: u32,
/// Stale-if-error allowance, in seconds.
pub(crate) stale_if_error_sec: u32,
}
impl InternalMetaV0 {
    /// Encode this record with `rmp_serde`.
    ///
    /// Nothing outside the unit tests calls this, hence the `dead_code` allowance.
    #[allow(dead_code)]
    fn serialize(&self) -> Result<Vec<u8>> {
        let encoded = rmp_serde::encode::to_vec(self);
        encoded.or_err(InternalError, "failed to encode cache meta")
    }

    /// Decode a V0 record from `buf`.
    ///
    /// # Errors
    /// Returns an `InternalError` when `buf` cannot be decoded as this layout.
    fn deserialize(buf: &[u8]) -> Result<Self> {
        let decoded = rmp_serde::decode::from_slice(buf);
        decoded.or_err(InternalError, "failed to decode cache meta v0")
    }
}
/// Second metadata layout: the V0 fields preceded by an explicit `version` marker.
///
/// NOTE(review): the module-level `deserialize` reads the version immediately after the
/// array length, so `version` has to stay the first field — confirm before reordering.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct InternalMetaV1 {
/// Layout version; `serialize` asserts that it equals 1.
pub(crate) version: u8,
/// Point in time up to which the entry counts as fresh.
pub(crate) fresh_until: SystemTime,
/// Creation time of the entry; also becomes `updated` when upgrading to V2.
pub(crate) created: SystemTime,
/// Stale-while-revalidate allowance, in seconds.
pub(crate) stale_while_revalidate_sec: u32,
/// Stale-if-error allowance, in seconds.
pub(crate) stale_if_error_sec: u32,
}
impl InternalMetaV1 {
    /// Version number stored in every V1 record.
    // Only referenced by `serialize` and the unit tests.
    #[allow(dead_code)]
    pub const VERSION: u8 = 1;

    /// Encode this record with `rmp_serde`.
    ///
    /// # Panics
    /// Panics if `self.version` is not [`Self::VERSION`].
    // Only the unit tests still write V1 records.
    #[allow(dead_code)]
    pub fn serialize(&self) -> Result<Vec<u8>> {
        assert_eq!(self.version, Self::VERSION);
        let encoded = rmp_serde::encode::to_vec(self);
        encoded.or_err(InternalError, "failed to encode cache meta")
    }

    /// Decode a V1 record from `buf`.
    ///
    /// # Errors
    /// Returns an `InternalError` when `buf` cannot be decoded as this layout.
    fn deserialize(buf: &[u8]) -> Result<Self> {
        let decoded = rmp_serde::decode::from_slice(buf);
        decoded.or_err(InternalError, "failed to decode cache meta v1")
    }
}
/// Current metadata layout: V1 plus an `updated` timestamp and an optional `variance`.
///
/// NOTE(review): the module-level `deserialize` reads the version immediately after the
/// array length, so `version` has to stay the first field — confirm before reordering.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct InternalMetaV2 {
/// Layout version; `serialize` asserts that it equals `InternalMetaV2::VERSION`.
pub(crate) version: u8,
/// Point in time up to which the entry counts as fresh.
pub(crate) fresh_until: SystemTime,
/// Creation time of the entry.
pub(crate) created: SystemTime,
/// Last-update time; set to `created` for new entries and for records upgraded from V0/V1.
pub(crate) updated: SystemTime,
/// Stale-while-revalidate allowance, in seconds.
pub(crate) stale_while_revalidate_sec: u32,
/// Stale-if-error allowance, in seconds.
pub(crate) stale_if_error_sec: u32,
/// Optional variance hash. Left out of the encoding when `None`, and defaulted to
/// `None` when missing from the input (see the two serde attributes).
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) variance: Option<HashBinary>,
}
impl Default for InternalMetaV2 {
fn default() -> Self {
let epoch = SystemTime::UNIX_EPOCH;
InternalMetaV2 {
version: InternalMetaV2::VERSION,
fresh_until: epoch,
created: epoch,
updated: epoch,
stale_while_revalidate_sec: 0,
stale_if_error_sec: 0,
variance: None,
}
}
}
impl InternalMetaV2 {
    /// Version number stored in every V2 record.
    pub const VERSION: u8 = 2;

    /// Encode this record with `rmp_serde`.
    ///
    /// # Panics
    /// Panics if `self.version` is not [`Self::VERSION`].
    pub fn serialize(&self) -> Result<Vec<u8>> {
        assert_eq!(self.version, Self::VERSION);
        let encoded = rmp_serde::encode::to_vec(self);
        encoded.or_err(InternalError, "failed to encode cache meta")
    }

    /// Decode a V2 record from `buf`.
    ///
    /// # Errors
    /// Returns an `InternalError` when `buf` cannot be decoded as this layout.
    fn deserialize(buf: &[u8]) -> Result<Self> {
        let decoded = rmp_serde::decode::from_slice(buf);
        decoded.or_err(InternalError, "failed to decode cache meta v2")
    }
}
impl From<InternalMetaV0> for InternalMetaV2 {
fn from(v0: InternalMetaV0) -> Self {
InternalMetaV2 {
version: InternalMetaV2::VERSION,
fresh_until: v0.fresh_until,
created: v0.created,
updated: v0.created,
stale_while_revalidate_sec: v0.stale_while_revalidate_sec,
stale_if_error_sec: v0.stale_if_error_sec,
..Default::default()
}
}
}
impl From<InternalMetaV1> for InternalMetaV2 {
fn from(v1: InternalMetaV1) -> Self {
InternalMetaV2 {
version: InternalMetaV2::VERSION,
fresh_until: v1.fresh_until,
created: v1.created,
updated: v1.created,
stale_while_revalidate_sec: v1.stale_while_revalidate_sec,
stale_if_error_sec: v1.stale_if_error_sec,
..Default::default()
}
}
}
pub(crate) fn deserialize(buf: &[u8]) -> Result<InternalMetaLatest> {
const MIN_SIZE: usize = 10; if buf.len() < MIN_SIZE {
return Error::e_explain(
InternalError,
format!("Buf too short ({}) to be InternalMeta", buf.len()),
);
}
let preread_buf = &mut &buf[..MIN_SIZE];
match rmp::decode::read_array_len(preread_buf)
.or_err(InternalError, "failed to decode cache meta array size")?
{
4 => Ok(InternalMetaV0::deserialize(buf)?.into()),
_ => {
let version = rmp::decode::read_pfix(preread_buf)
.or_err(InternalError, "failed to decode meta version")?;
match version {
1 => Ok(InternalMetaV1::deserialize(buf)?.into()),
2 => InternalMetaV2::deserialize(buf),
_ => Error::e_explain(
InternalError,
format!("Unknown InternalMeta version {version}"),
),
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A V0 record stamped with the current time and no stale allowances.
    fn v0_now() -> InternalMetaV0 {
        InternalMetaV0 {
            fresh_until: SystemTime::now(),
            created: SystemTime::now(),
            stale_while_revalidate_sec: 0,
            stale_if_error_sec: 0,
        }
    }

    /// A V1 record stamped with the current time and no stale allowances.
    fn v1_now() -> InternalMetaV1 {
        InternalMetaV1 {
            version: InternalMetaV1::VERSION,
            fresh_until: SystemTime::now(),
            created: SystemTime::now(),
            stale_while_revalidate_sec: 0,
            stale_if_error_sec: 0,
        }
    }

    // V0 survives an encode/decode round trip through its own codec.
    #[test]
    fn test_internal_meta_serde_v0() {
        let original = v0_now();
        let encoded = original.serialize().unwrap();
        let decoded = InternalMetaV0::deserialize(&encoded).unwrap();
        assert_eq!(original.fresh_until, decoded.fresh_until);
    }

    // V1 survives an encode/decode round trip through its own codec.
    #[test]
    fn test_internal_meta_serde_v1() {
        let original = v1_now();
        let encoded = original.serialize().unwrap();
        let decoded = InternalMetaV1::deserialize(&encoded).unwrap();
        assert_eq!(original.fresh_until, decoded.fresh_until);
    }

    // V2 survives an encode/decode round trip through its own codec.
    #[test]
    fn test_internal_meta_serde_v2() {
        let original = InternalMetaV2::default();
        let encoded = original.serialize().unwrap();
        let decoded = InternalMetaV2::deserialize(&encoded).unwrap();
        assert_eq!(decoded.version, 2);
        assert_eq!(original.fresh_until, decoded.fresh_until);
        assert_eq!(original.created, decoded.created);
        assert_eq!(original.updated, decoded.updated);
    }

    // Older layouts go through the version-sniffing entry point and come out as V2.
    #[test]
    fn test_internal_meta_serde_across_versions() {
        {
            let old = v0_now();
            let encoded = old.serialize().unwrap();
            let upgraded = deserialize(&encoded).unwrap();
            assert_eq!(upgraded.version, 2);
            assert_eq!(old.fresh_until, upgraded.fresh_until);
        }
        {
            let old = v1_now();
            let encoded = old.serialize().unwrap();
            let upgraded = deserialize(&encoded).unwrap();
            assert_eq!(upgraded.version, 2);
            assert_eq!(old.fresh_until, upgraded.fresh_until);
            // upgrading fills `updated` from `created`
            assert_eq!(upgraded.created, upgraded.updated);
        }
    }

    // A V2 record without the optional trailing field must be interchangeable with the
    // full V2 struct in both directions.
    #[test]
    fn test_internal_meta_serde_v2_extend_fields() {
        // Mirror of `InternalMetaV2` without `variance`.
        #[derive(Deserialize, Serialize)]
        pub(crate) struct InternalMetaV2Base {
            pub(crate) version: u8,
            pub(crate) fresh_until: SystemTime,
            pub(crate) created: SystemTime,
            pub(crate) updated: SystemTime,
            pub(crate) stale_while_revalidate_sec: u32,
            pub(crate) stale_if_error_sec: u32,
        }

        impl InternalMetaV2Base {
            pub const VERSION: u8 = 2;

            pub fn serialize(&self) -> Result<Vec<u8>> {
                assert!(self.version >= Self::VERSION);
                let encoded = rmp_serde::encode::to_vec(self);
                encoded.or_err(InternalError, "failed to encode cache meta")
            }

            fn deserialize(buf: &[u8]) -> Result<Self> {
                let decoded = rmp_serde::decode::from_slice(buf);
                decoded.or_err(InternalError, "failed to decode cache meta v2")
            }
        }

        // full struct -> base struct
        let full = InternalMetaV2::default();
        let encoded = full.serialize().unwrap();
        let as_base = InternalMetaV2Base::deserialize(&encoded).unwrap();
        assert_eq!(as_base.version, 2);
        assert_eq!(full.fresh_until, as_base.fresh_until);
        assert_eq!(full.created, as_base.created);
        assert_eq!(full.updated, as_base.updated);

        // base struct -> full struct
        let now = SystemTime::now();
        let base = InternalMetaV2Base {
            version: InternalMetaV2::VERSION,
            fresh_until: now,
            created: now,
            updated: now,
            stale_while_revalidate_sec: 0,
            stale_if_error_sec: 0,
        };
        let encoded = base.serialize().unwrap();
        let as_full = InternalMetaV2::deserialize(&encoded).unwrap();
        assert_eq!(as_full.version, 2);
        assert_eq!(base.fresh_until, as_full.fresh_until);
        assert_eq!(base.created, as_full.created);
        assert_eq!(base.updated, as_full.updated);
    }
}
}
/// The data behind a [`CacheMeta`], kept in one struct so that `CacheMeta` can hold it
/// in a single `Box`.
#[derive(Debug)]
pub(crate) struct CacheMetaInner {
/// Freshness, timestamps, stale allowances and variance.
pub(crate) internal: InternalMeta,
/// The response header stored with the entry.
pub(crate) header: ResponseHeader,
/// Typed extension map. `CacheMeta::serialize` does not write it out and
/// `CacheMeta::deserialize` always starts with an empty one.
pub extensions: Extensions,
}
/// Metadata of a cached asset: freshness information, the response header and an
/// extension map, stored behind a single `Box`.
#[derive(Debug)]
pub struct CacheMeta(pub(crate) Box<CacheMetaInner>);
impl CacheMeta {
pub fn new(
fresh_until: SystemTime,
created: SystemTime,
stale_while_revalidate_sec: u32,
stale_if_error_sec: u32,
header: ResponseHeader,
) -> CacheMeta {
CacheMeta(Box::new(CacheMetaInner {
internal: InternalMeta {
version: InternalMeta::VERSION,
fresh_until,
created,
updated: created, stale_while_revalidate_sec,
stale_if_error_sec,
..Default::default()
},
header,
extensions: Extensions::new(),
}))
}
pub fn created(&self) -> SystemTime {
self.0.internal.created
}
pub fn updated(&self) -> SystemTime {
self.0.internal.updated
}
pub fn is_fresh(&self, time: SystemTime) -> bool {
self.0.internal.fresh_until >= time
}
pub fn fresh_sec(&self) -> u64 {
self.0
.internal
.fresh_until
.duration_since(self.0.internal.updated)
.map_or(0, |duration| duration.as_secs())
}
pub fn fresh_until(&self) -> SystemTime {
self.0.internal.fresh_until
}
pub fn age(&self) -> Duration {
SystemTime::now()
.duration_since(self.updated())
.unwrap_or_default()
}
pub fn stale_while_revalidate_sec(&self) -> u32 {
self.0.internal.stale_while_revalidate_sec
}
pub fn stale_if_error_sec(&self) -> u32 {
self.0.internal.stale_if_error_sec
}
pub fn serve_stale_while_revalidate(&self, time: SystemTime) -> bool {
self.can_serve_stale(self.0.internal.stale_while_revalidate_sec, time)
}
pub fn serve_stale_if_error(&self, time: SystemTime) -> bool {
self.can_serve_stale(self.0.internal.stale_if_error_sec, time)
}
pub fn disable_serve_stale(&mut self) {
self.0.internal.stale_if_error_sec = 0;
self.0.internal.stale_while_revalidate_sec = 0;
}
pub fn variance(&self) -> Option<HashBinary> {
self.0.internal.variance
}
pub fn set_variance_key(&mut self, variance_key: HashBinary) {
self.0.internal.variance = Some(variance_key);
}
pub fn set_variance(&mut self, variance: HashBinary) {
self.0.internal.variance = Some(variance)
}
pub fn remove_variance(&mut self) {
self.0.internal.variance = None
}
pub fn response_header(&self) -> &ResponseHeader {
&self.0.header
}
pub fn response_header_mut(&mut self) -> &mut ResponseHeader {
&mut self.0.header
}
pub fn extensions(&self) -> &Extensions {
&self.0.extensions
}
pub fn extensions_mut(&mut self) -> &mut Extensions {
&mut self.0.extensions
}
pub fn response_header_copy(&self) -> ResponseHeader {
self.0.header.clone()
}
pub fn headers(&self) -> &HMap {
&self.0.header.headers
}
fn can_serve_stale(&self, serve_stale_sec: u32, time: SystemTime) -> bool {
if serve_stale_sec == 0 {
return false;
}
if let Some(stale_until) = self
.0
.internal
.fresh_until
.checked_add(Duration::from_secs(serve_stale_sec.into()))
{
stale_until >= time
} else {
true
}
}
pub fn serialize(&self) -> Result<(Vec<u8>, Vec<u8>)> {
let internal = self.0.internal.serialize()?;
let header = header_serialize(&self.0.header)?;
Ok((internal, header))
}
pub fn deserialize(internal: &[u8], header: &[u8]) -> Result<Self> {
let internal = internal_meta::deserialize(internal)?;
let header = header_deserialize(header)?;
Ok(CacheMeta(Box::new(CacheMetaInner {
internal,
header,
extensions: Extensions::new(),
})))
}
}
use http::StatusCode;
/// Function mapping a response status code to a freshness TTL in seconds, or `None`.
pub type FreshSecByStatusFn = fn(StatusCode) -> Option<u32>;
/// Default cache settings: a per-status freshness lookup plus the two stale allowances.
pub struct CacheMetaDefaults {
// Lookup used by `fresh_sec`.
fresh_sec_fn: FreshSecByStatusFn,
// Default stale-while-revalidate allowance, in seconds.
stale_while_revalidate_sec: u32,
// Default stale-if-error allowance, in seconds.
stale_if_error_sec: u32,
}
impl CacheMetaDefaults {
    /// Bundle the freshness lookup and the two stale allowances (both in seconds).
    ///
    /// `const` so that defaults can be declared as constants or statics.
    pub const fn new(
        fresh_sec_fn: FreshSecByStatusFn,
        stale_while_revalidate_sec: u32,
        stale_if_error_sec: u32,
    ) -> Self {
        Self {
            fresh_sec_fn,
            stale_while_revalidate_sec,
            stale_if_error_sec,
        }
    }

    /// Look up the default freshness TTL for `resp_status`.
    ///
    /// A `304 Not Modified` is looked up as `200 OK`, so both always share one TTL.
    pub fn fresh_sec(&self, resp_status: StatusCode) -> Option<u32> {
        let lookup_status = if resp_status == StatusCode::NOT_MODIFIED {
            StatusCode::OK
        } else {
            resp_status
        };
        (self.fresh_sec_fn)(lookup_status)
    }

    /// The default stale-while-revalidate allowance, in seconds.
    pub fn serve_stale_while_revalidate_sec(&self) -> u32 {
        let Self {
            stale_while_revalidate_sec,
            ..
        } = *self;
        stale_while_revalidate_sec
    }

    /// The default stale-if-error allowance, in seconds.
    pub fn serve_stale_if_error_sec(&self) -> u32 {
        let Self {
            stale_if_error_sec, ..
        } = *self;
        stale_if_error_sec
    }
}
use log::warn;
use once_cell::sync::{Lazy, OnceCell};
use pingora_header_serde::HeaderSerde;
use std::fs::File;
use std::io::Read;
/// Path of the header compression dictionary file, settable once.
///
/// `HEADER_SERDE` reads this cell when it is first initialized; if the cell is unset at
/// that moment, the serializer is built without a dictionary.
pub(crate) static COMPRESSION_DICT_PATH: OnceCell<String> = OnceCell::new();
/// Read the whole dictionary file at `path`.
///
/// Best effort: a failure to open or read the file is logged as a warning and reported
/// as `None` instead of an error.
// Takes `&String` because the function is passed directly to `Option<&String>::and_then`.
fn load_file(path: &String) -> Option<Vec<u8>> {
    let mut file = match File::open(path) {
        Ok(handle) => handle,
        Err(e) => {
            warn!(
                "failed to open header compress dictionary file at {}, {:?}",
                path, e
            );
            return None;
        }
    };

    let mut dict = Vec::new();
    if let Err(e) = file.read_to_end(&mut dict) {
        warn!(
            "failed to read header compress dictionary file at {}, {:?}",
            path, e
        );
        return None;
    }
    Some(dict)
}
static HEADER_SERDE: Lazy<HeaderSerde> = Lazy::new(|| {
let dict_path_opt = COMPRESSION_DICT_PATH.get();
if dict_path_opt.is_none() {
warn!("COMPRESSION_DICT_PATH is not set");
}
let result = dict_path_opt.and_then(load_file);
if result.is_none() {
warn!("HeaderSerde not loaded from file");
}
HeaderSerde::new(result)
});
/// Serialize `header` with the shared `HEADER_SERDE` instance.
///
/// The first call to touch `HEADER_SERDE` triggers its one-time initialization.
pub(crate) fn header_serialize(header: &ResponseHeader) -> Result<Vec<u8>> {
HEADER_SERDE.serialize(header)
}
/// Deserialize a response header from `buf` with the shared `HEADER_SERDE` instance.
///
/// Accepts anything that can be viewed as a byte slice.
pub(crate) fn header_deserialize<T: AsRef<[u8]>>(buf: T) -> Result<ResponseHeader> {
    let bytes: &[u8] = buf.as_ref();
    HEADER_SERDE.deserialize(bytes)
}