use crate::ops::OpCtx;
use crate::serde::Serialize;
use crate::OpDecl;
use crate::OpId;
use std::cell::Ref;
use std::cell::RefCell;
use std::cell::RefMut;
use std::rc::Rc;
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum OpMetricsEvent {
Dispatched,
Completed,
CompletedAsync,
Error,
ErrorAsync,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum OpMetricsSource {
Slow,
Fast,
Async,
}
pub type OpMetricsFn = Rc<dyn Fn(&OpCtx, OpMetricsEvent, OpMetricsSource)>;
pub type OpMetricsFactoryFn =
Box<dyn Fn(OpId, usize, &OpDecl) -> Option<OpMetricsFn>>;
pub fn merge_op_metrics(
fn1: impl Fn(OpId, usize, &OpDecl) -> Option<OpMetricsFn> + 'static,
fn2: impl Fn(OpId, usize, &OpDecl) -> Option<OpMetricsFn> + 'static,
) -> OpMetricsFactoryFn {
Box::new(move |op, count, decl| {
match (fn1(op, count, decl), fn2(op, count, decl)) {
(None, None) => None,
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(Some(a), Some(b)) => Some(Rc::new(move |ctx, event, source| {
a(ctx, event, source);
b(ctx, event, source);
})),
}
})
}
#[doc(hidden)]
pub fn dispatch_metrics_fast(opctx: &OpCtx, metrics: OpMetricsEvent) {
unsafe {
(opctx.metrics_fn.as_ref().unwrap_unchecked())(
opctx,
metrics,
OpMetricsSource::Fast,
)
}
}
#[doc(hidden)]
pub fn dispatch_metrics_slow(opctx: &OpCtx, metrics: OpMetricsEvent) {
unsafe {
(opctx.metrics_fn.as_ref().unwrap_unchecked())(
opctx,
metrics,
OpMetricsSource::Slow,
)
}
}
#[doc(hidden)]
pub fn dispatch_metrics_async(opctx: &OpCtx, metrics: OpMetricsEvent) {
unsafe {
(opctx.metrics_fn.as_ref().unwrap_unchecked())(
opctx,
metrics,
OpMetricsSource::Async,
)
}
}
#[derive(Clone, Default, Debug, Serialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct OpMetricsSummary {
pub ops_dispatched_sync: u64,
pub ops_dispatched_async: u64,
pub ops_dispatched_fast: u64,
pub ops_completed_async: u64,
}
impl OpMetricsSummary {
pub fn has_outstanding_ops(&self) -> bool {
self.ops_dispatched_async > self.ops_completed_async
}
}
#[derive(Default, Debug)]
pub struct OpMetricsSummaryTracker {
ops: RefCell<Vec<OpMetricsSummary>>,
}
impl OpMetricsSummaryTracker {
pub fn per_op(&self) -> Ref<'_, Vec<OpMetricsSummary>> {
self.ops.borrow()
}
pub fn aggregate(&self) -> OpMetricsSummary {
let mut sum = OpMetricsSummary::default();
for metrics in self.ops.borrow().iter() {
sum.ops_dispatched_sync += metrics.ops_dispatched_sync;
sum.ops_dispatched_fast += metrics.ops_dispatched_fast;
sum.ops_dispatched_async += metrics.ops_dispatched_async;
sum.ops_completed_async += metrics.ops_completed_async;
}
sum
}
#[inline]
fn metrics_mut(&self, id: OpId) -> RefMut<OpMetricsSummary> {
RefMut::map(self.ops.borrow_mut(), |ops| &mut ops[id as usize])
}
fn op_metrics_fn(self: Rc<Self>) -> OpMetricsFn {
Rc::new(move |ctx, event, source| match event {
OpMetricsEvent::Dispatched => {
let mut m = self.metrics_mut(ctx.id);
if source == OpMetricsSource::Fast {
m.ops_dispatched_fast += 1;
}
if ctx.decl.is_async {
m.ops_dispatched_async += 1;
} else {
m.ops_dispatched_sync += 1;
}
}
OpMetricsEvent::Completed
| OpMetricsEvent::Error
| OpMetricsEvent::CompletedAsync
| OpMetricsEvent::ErrorAsync => {
if ctx.decl.is_async {
self.metrics_mut(ctx.id).ops_completed_async += 1;
}
}
})
}
pub fn op_metrics_factory_fn(
self: Rc<Self>,
op_enabled: impl Fn(&OpDecl) -> bool + 'static,
) -> OpMetricsFactoryFn {
Box::new(move |_, total, op| {
let mut ops = self.ops.borrow_mut();
if ops.capacity() == 0 {
ops.reserve_exact(total);
}
ops.push(OpMetricsSummary::default());
if op_enabled(op) {
Some(self.clone().op_metrics_fn())
} else {
None
}
})
}
}