#![warn(missing_docs)]
#[macro_use]
extern crate slog;
extern crate crossbeam_channel;
extern crate take_mut;
extern crate thread_local;
use crossbeam_channel::Sender;
use slog::{BorrowedKV, Level, Record, RecordStatic, SingleKV, KV};
use slog::{Key, OwnedKVList, Serializer};
use slog::Drain;
use std::fmt;
use std::sync;
use std::{io, thread};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use take_mut::take;
use std::panic::{catch_unwind, AssertUnwindSafe};
/// Serializer that copies every emitted key-value pair into an owned,
/// `Send`-able `KV` so the pairs can be moved to another thread.
struct ToSendSerializer {
/// Everything emitted so far, boxed as a single `KV`; starts out as `()`.
kv: Box<dyn KV + Send>,
}
impl ToSendSerializer {
    /// Creates a serializer that holds no pairs yet; the chain starts as the
    /// unit value `()`.
    fn new() -> Self {
        let empty: Box<dyn KV + Send> = Box::new(());
        Self { kv: empty }
    }

    /// Consumes the serializer and returns everything collected so far.
    fn finish(self) -> Box<dyn KV + Send> {
        let Self { kv } = self;
        kv
    }
}
impl Serializer for ToSendSerializer {
fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_unit(&mut self, key: Key) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, ()))));
Ok(())
}
fn emit_none(&mut self, key: Key) -> slog::Result {
let val: Option<()> = None;
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
#[cfg(integer128)]
fn emit_u128(&mut self, key: Key, val: u128) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
#[cfg(integer128)]
fn emit_i128(&mut self, key: Key, val: i128) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
let val = val.to_owned();
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_arguments(
&mut self,
key: Key,
val: &fmt::Arguments,
) -> slog::Result {
let val = fmt::format(*val);
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
#[cfg(feature = "nested-values")]
fn emit_serde(
&mut self,
key: Key,
value: &slog::SerdeValue,
) -> slog::Result {
let val = value.to_sendable();
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
}
/// Errors returned by the asynchronous drains defined in this file.
#[derive(Debug)]
pub enum AsyncError {
/// A non-blocking `try_send` on the channel failed, so the message was
/// not enqueued.
Full,
/// Unrecoverable failure (for example the logger thread terminated or a
/// lock was poisoned), carrying the underlying error.
Fatal(Box<dyn std::error::Error>),
}
impl<T> From<crossbeam_channel::TrySendError<T>> for AsyncError {
fn from(_: crossbeam_channel::TrySendError<T>) -> AsyncError {
AsyncError::Full
}
}
/// Converts a failed blocking send into [`AsyncError::Fatal`] wrapping a
/// `BrokenPipe` I/O error; the unsent message is discarded.
impl<T> From<crossbeam_channel::SendError<T>> for AsyncError {
    fn from(_: crossbeam_channel::SendError<T>) -> AsyncError {
        let cause = io::Error::new(
            io::ErrorKind::BrokenPipe,
            "The logger thread terminated",
        );
        AsyncError::Fatal(Box::new(cause))
    }
}
/// Converts a poisoned-lock error into [`AsyncError::Fatal`], keeping only
/// the textual description of the poison error.
impl<T> From<std::sync::PoisonError<T>> for AsyncError {
    fn from(err: std::sync::PoisonError<T>) -> AsyncError {
        let description = err.to_string();
        let cause = io::Error::new(io::ErrorKind::BrokenPipe, description);
        AsyncError::Fatal(Box::new(cause))
    }
}
/// `Result` alias whose error type is [`AsyncError`].
pub type AsyncResult<T> = std::result::Result<T, AsyncError>;
/// Builder for [`AsyncCore`]: collects the channel and thread settings and
/// the drain that the worker thread will log to.
pub struct AsyncCoreBuilder<D>
where
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
/// Capacity passed to `crossbeam_channel::bounded` when the worker starts.
chan_size: usize,
/// When `true` the resulting `AsyncCore` uses a blocking `send`,
/// otherwise `try_send`.
blocking: bool,
/// Drain moved into the worker thread; every received record is logged
/// to it.
drain: D,
/// Optional name given to the worker thread.
thread_name: Option<String>,
}
impl<D> AsyncCoreBuilder<D>
where
    D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
    /// Creates a builder with the defaults: a channel of 128 slots,
    /// non-blocking sends and an unnamed worker thread.
    fn new(drain: D) -> Self {
        AsyncCoreBuilder {
            chan_size: 128,
            blocking: false,
            drain,
            thread_name: None,
        }
    }

    /// Sets the name of the worker thread.
    ///
    /// # Panics
    ///
    /// Panics if `name` contains a NUL (`'\0'`) character.
    pub fn thread_name(mut self, name: String) -> Self {
        assert!(!name.contains('\0'), "Name with '\\0' in it passed");
        self.thread_name = Some(name);
        self
    }

    /// Sets the capacity of the bounded channel feeding the worker thread.
    pub fn chan_size(mut self, s: usize) -> Self {
        self.chan_size = s;
        self
    }

    /// Chooses between blocking sends (`true`) and non-blocking sends
    /// (`false`) when a record is handed to the channel.
    pub fn blocking(mut self, blocking: bool) -> Self {
        self.blocking = blocking;
        self
    }

    /// Starts the worker thread and returns its join handle together with
    /// the sending half of the channel.
    ///
    /// The worker logs each received record to the drain and stops when it
    /// receives `AsyncMsg::Finish`, when receiving fails, or when logging
    /// returns an error. A panic inside the loop is caught and reported on
    /// stderr instead of unwinding out of the thread.
    ///
    /// # Panics
    ///
    /// Panics if the operating system fails to spawn the thread.
    fn spawn_thread(self) -> (thread::JoinHandle<()>, Sender<AsyncMsg>) {
        let (tx, rx) = crossbeam_channel::bounded(self.chan_size);
        let mut builder = thread::Builder::new();
        if let Some(thread_name) = self.thread_name {
            builder = builder.name(thread_name);
        }
        let drain = self.drain;
        let join = builder
            .spawn(move || {
                // `D` is not required to be unwind safe, so the reference is
                // wrapped to let it cross the `catch_unwind` boundary.
                let drain = AssertUnwindSafe(&drain);
                if let Err(panic_cause) = catch_unwind(move || loop {
                    match rx.recv() {
                        Ok(AsyncMsg::Record(r)) => {
                            if r.log_to(&*drain).is_err() {
                                eprintln!("slog-async failed while writing");
                                return;
                            }
                        }
                        Ok(AsyncMsg::Finish) => return,
                        Err(recv_error) => {
                            eprintln!("slog-async failed while receiving: {recv_error}");
                            return;
                        }
                    }
                }) {
                    eprintln!("slog-async failed with panic: {panic_cause:?}")
                }
            })
            .expect("slog-async: failed to spawn the worker thread");
        (join, tx)
    }

    /// Builds the [`AsyncCore`]; equivalent to [`Self::build_no_guard`].
    pub fn build(self) -> AsyncCore {
        self.build_no_guard()
    }

    /// Builds an [`AsyncCore`] that keeps the worker's join handle itself,
    /// so the worker is shut down when the core is dropped.
    pub fn build_no_guard(self) -> AsyncCore {
        let blocking = self.blocking;
        let (join, tx) = self.spawn_thread();
        AsyncCore {
            ref_sender: tx,
            tl_sender: thread_local::ThreadLocal::new(),
            join: Mutex::new(Some(join)),
            blocking,
        }
    }

    /// Builds an [`AsyncCore`] together with an [`AsyncGuard`].
    ///
    /// The guard, not the core, holds the worker's join handle; the core's
    /// own handle slot is left empty.
    pub fn build_with_guard(self) -> (AsyncCore, AsyncGuard) {
        let blocking = self.blocking;
        let (join, tx) = self.spawn_thread();
        (
            AsyncCore {
                ref_sender: tx.clone(),
                tl_sender: thread_local::ThreadLocal::new(),
                join: Mutex::new(None),
                blocking,
            },
            AsyncGuard {
                join: Some(join),
                tx,
            },
        )
    }
}
/// Guard returned by the `build_with_guard` methods.
///
/// When dropped it sends `AsyncMsg::Finish` to the worker thread and joins
/// it, unless the drop happens on the worker thread itself.
pub struct AsyncGuard {
/// Join handle of the worker thread; taken out when the guard is dropped.
join: Option<thread::JoinHandle<()>>,
/// Sender used to deliver the `Finish` message.
tx: Sender<AsyncMsg>,
}
impl Drop for AsyncGuard {
    /// Asks the worker thread to finish and waits for it to exit.
    ///
    /// Failures to send or to join are ignored on purpose: nothing useful
    /// can be done with them inside `drop`.
    fn drop(&mut self) {
        let _ = self.tx.send(AsyncMsg::Finish);

        let worker = self.join.take().unwrap();
        // Skip the join when running on the worker thread itself
        // (presumably so the thread never waits on its own termination).
        let on_worker_thread = worker.thread().id() == thread::current().id();
        if !on_worker_thread {
            let _ = worker.join();
        }
    }
}
/// Drain that turns each record into an owned [`AsyncRecord`] and sends it
/// over a channel to a worker thread.
pub struct AsyncCore {
/// Original sender; cloned once for every thread that logs.
ref_sender: Sender<AsyncMsg>,
/// Per-thread clones of `ref_sender`.
tl_sender: thread_local::ThreadLocal<Sender<AsyncMsg>>,
/// Join handle of the worker thread, or `None` when an `AsyncGuard`
/// holds it instead.
join: Mutex<Option<thread::JoinHandle<()>>>,
/// `true` selects blocking `send`, `false` selects `try_send`.
blocking: bool,
}
impl AsyncCore {
    /// Builds an `AsyncCore` around `drain` using the builder's default
    /// settings.
    pub fn new<D>(drain: D) -> Self
    where
        D: slog::Drain<Err = slog::Never, Ok = ()>
            + Send
            + std::panic::RefUnwindSafe
            + 'static,
    {
        let builder = AsyncCoreBuilder::new(drain);
        builder.build()
    }

    /// Returns a builder for customising the core before it is built.
    pub fn custom<D>(drain: D) -> AsyncCoreBuilder<D>
    where
        D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
    {
        AsyncCoreBuilder::new(drain)
    }

    /// Returns the calling thread's own clone of the channel sender,
    /// creating it from `ref_sender` on first use.
    fn get_sender(
        &self,
    ) -> Result<
        &crossbeam_channel::Sender<AsyncMsg>,
        std::sync::PoisonError<
            sync::MutexGuard<crossbeam_channel::Sender<AsyncMsg>>,
        >,
    > {
        self.tl_sender.get_or_try(|| {
            let fresh = self.ref_sender.clone();
            Ok(fresh)
        })
    }

    /// Hands one record to the worker thread.
    ///
    /// Uses a blocking `send` when `blocking` is set and `try_send`
    /// otherwise; channel errors are converted into [`AsyncError`].
    fn send(&self, r: AsyncRecord) -> AsyncResult<()> {
        let sender = self.get_sender()?;
        let msg = AsyncMsg::Record(r);
        match self.blocking {
            true => sender.send(msg)?,
            false => sender.try_send(msg)?,
        }
        Ok(())
    }
}
impl Drain for AsyncCore {
    type Ok = ();
    type Err = AsyncError;

    /// Copies the record and logger values into an owned [`AsyncRecord`]
    /// and sends it to the worker thread.
    fn log(
        &self,
        record: &Record,
        logger_values: &OwnedKVList,
    ) -> AsyncResult<()> {
        let owned = AsyncRecord::from(record, logger_values);
        self.send(owned)
    }
}
/// Owned copy of a `Record` plus its logger values, so it can be sent to
/// another thread and logged there.
pub struct AsyncRecord {
/// The formatted log message.
msg: String,
/// Level of the original record.
level: Level,
/// Copy of the original record's location.
location: Box<slog::RecordLocation>,
/// Copy of the original record's tag.
tag: String,
/// Clone of the logger's key-value list.
logger_values: OwnedKVList,
/// Owned copy of the record's own key-value pairs.
kv: Box<dyn KV + Send>,
}
impl AsyncRecord {
    /// Serializes `record` and `logger_values` into an owned `AsyncRecord`.
    ///
    /// The message is formatted immediately and the record's key-value
    /// pairs are copied through `ToSendSerializer`.
    pub fn from(record: &Record, logger_values: &OwnedKVList) -> Self {
        let mut collector = ToSendSerializer::new();
        record
            .kv()
            .serialize(record, &mut collector)
            .expect("`ToSendSerializer` can't fail");
        let kv = collector.finish();

        AsyncRecord {
            msg: fmt::format(*record.msg()),
            level: record.level(),
            location: Box::new(*record.location()),
            tag: String::from(record.tag()),
            logger_values: logger_values.clone(),
            kv,
        }
    }

    /// Rebuilds a borrowed `Record` from the stored data and passes it,
    /// together with the stored logger values, to `f`.
    fn with_record<R>(
        &self,
        f: impl FnOnce(&Record, &OwnedKVList) -> R,
    ) -> R {
        let rs = RecordStatic {
            location: &*self.location,
            level: self.level,
            tag: &self.tag,
        };
        // The `Record` borrows temporaries, so it is built inside the call
        // expression that uses it.
        f(
            &Record::new(
                &rs,
                &format_args!("{}", self.msg),
                BorrowedKV(&self.kv),
            ),
            &self.logger_values,
        )
    }

    /// Writes the stored record to `drain`, consuming `self`.
    pub fn log_to<D: Drain>(self, drain: &D) -> Result<D::Ok, D::Err> {
        self.with_record(|record, logger_values| {
            drain.log(record, logger_values)
        })
    }

    /// Calls `f` with a `Record` rebuilt from the stored data and with the
    /// stored logger values.
    pub fn as_record_values(&self, f: impl FnMut(&Record, &OwnedKVList)) {
        self.with_record(f)
    }
}
/// Messages sent over the channel to the worker thread.
enum AsyncMsg {
/// A record that the worker should log to its drain.
Record(AsyncRecord),
/// Tells the worker to leave its receive loop.
Finish,
}
impl Drop for AsyncCore {
    /// Shuts the worker thread down when this core holds its join handle.
    ///
    /// Every failure (poisoned lock, failed send, failed join) is ignored.
    /// When the handle slot is empty nothing is sent or joined.
    fn drop(&mut self) {
        let worker = match self.join.lock() {
            Ok(mut slot) => slot.take(),
            Err(_) => return,
        };

        if let Some(worker) = worker {
            match self.get_sender() {
                Ok(sender) => {
                    let _ = sender.send(AsyncMsg::Finish);
                }
                Err(_) => return,
            }
            // Skip the join when running on the worker thread itself
            // (presumably so the thread never waits on its own
            // termination).
            if worker.thread().id() != thread::current().id() {
                let _ = worker.join();
            }
        }
    }
}
/// What `Async` does when a message cannot be enqueued because the channel
/// is full.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum OverflowStrategy {
/// Send without blocking; count the messages that could not be enqueued
/// and report that count in a later log message.
DropAndReport,
/// Send without blocking; do not count messages that could not be
/// enqueued.
Drop,
/// Use a blocking send.
Block,
/// Placeholder variant; passing it to `AsyncBuilder::overflow_strategy`
/// panics.
#[doc(hidden)]
DoNotMatchAgainstThisAndReadTheDocs,
}
/// Builder for [`Async`]; wraps an [`AsyncCoreBuilder`] and adds the
/// dropped-message reporting setting.
pub struct AsyncBuilder<D>
where
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
/// Builder for the underlying `AsyncCore`.
core: AsyncCoreBuilder<D>,
/// Whether messages rejected with `AsyncError::Full` are counted.
inc_dropped: bool,
}
impl<D> AsyncBuilder<D>
where
    D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
    /// Creates a builder with the core defaults and dropped-message
    /// counting enabled.
    fn new(drain: D) -> AsyncBuilder<D> {
        let core = AsyncCoreBuilder::new(drain);
        AsyncBuilder {
            core,
            inc_dropped: true,
        }
    }

    /// Sets the capacity of the channel feeding the worker thread.
    pub fn chan_size(mut self, s: usize) -> Self {
        self.core = self.core.chan_size(s);
        self
    }

    /// Selects what happens when a message cannot be enqueued.
    ///
    /// # Panics
    ///
    /// Panics when given the hidden placeholder variant.
    pub fn overflow_strategy(
        self,
        overflow_strategy: OverflowStrategy,
    ) -> Self {
        // Each strategy maps to (blocking send?, count dropped messages?).
        let (block, inc_dropped) = match overflow_strategy {
            OverflowStrategy::DropAndReport => (false, true),
            OverflowStrategy::Drop => (false, false),
            OverflowStrategy::Block => (true, false),
            OverflowStrategy::DoNotMatchAgainstThisAndReadTheDocs => {
                panic!("Invalid variant")
            }
        };
        let AsyncBuilder { core, .. } = self;
        AsyncBuilder {
            core: core.blocking(block),
            inc_dropped,
        }
    }

    /// Sets the name of the worker thread.
    pub fn thread_name(mut self, name: String) -> Self {
        self.core = self.core.thread_name(name);
        self
    }

    /// Builds the [`Async`] drain; same as [`Self::build_no_guard`].
    pub fn build(self) -> Async {
        self.build_no_guard()
    }

    /// Builds an [`Async`] drain whose core holds the worker's join handle.
    pub fn build_no_guard(self) -> Async {
        let AsyncBuilder { core, inc_dropped } = self;
        Async {
            core: core.build_no_guard(),
            dropped: AtomicUsize::new(0),
            inc_dropped,
        }
    }

    /// Builds an [`Async`] drain together with the [`AsyncGuard`] that
    /// holds the worker's join handle.
    pub fn build_with_guard(self) -> (Async, AsyncGuard) {
        let AsyncBuilder { core, inc_dropped } = self;
        let (core, guard) = core.build_with_guard();
        let drain = Async {
            core,
            dropped: AtomicUsize::new(0),
            inc_dropped,
        };
        (drain, guard)
    }
}
/// Asynchronous drain built on [`AsyncCore`] that can additionally count
/// messages rejected by a full channel and report that count later.
pub struct Async {
/// The core that sends records to the worker thread.
core: AsyncCore,
/// Number of messages counted as dropped and not yet reported.
dropped: AtomicUsize,
/// Whether a message rejected with `AsyncError::Full` increments
/// `dropped`.
inc_dropped: bool,
}
impl Async {
    /// Builds an `Async` drain around `drain` with the builder's defaults.
    pub fn default<D>(drain: D) -> Self
    where
        D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
    {
        let builder = AsyncBuilder::new(drain);
        builder.build()
    }

    /// Returns a builder for customising the drain before it is built.
    pub fn new<D>(drain: D) -> AsyncBuilder<D>
    where
        D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
    {
        AsyncBuilder::new(drain)
    }

    /// Reports the number of dropped messages, if there are any.
    ///
    /// The counter is reset to zero and, when it was non-zero, an
    /// error-level record carrying the count is sent through the core. If
    /// that record is itself rejected with `AsyncError::Full`, the count
    /// plus one is added back to the counter (the extra one presumably
    /// accounts for the rejected report) and `Ok(())` is returned. Any
    /// other error is passed on to the caller.
    fn push_dropped(&self, logger_values: &OwnedKVList) -> AsyncResult<()> {
        let dropped = self.dropped.swap(0, Ordering::Relaxed);
        if dropped == 0 {
            return Ok(());
        }

        // The record borrows temporaries, so it has to be created inside
        // the statement that uses it.
        let outcome = self.core.log(
            &record!(
                slog::Level::Error,
                "slog-async",
                &format_args!(
                    "slog-async: logger dropped messages due to channel overflow"
                ),
                b!("count" => dropped)
            ),
            logger_values,
        );

        match outcome {
            Err(AsyncError::Full) => {
                self.dropped.fetch_add(dropped + 1, Ordering::Relaxed);
                Ok(())
            }
            other => other,
        }
    }
}
impl Drain for Async {
    type Ok = ();
    type Err = AsyncError;

    /// Reports any pending dropped-message count, then logs `record`
    /// through the core.
    ///
    /// A record rejected with `AsyncError::Full` is not an error for the
    /// caller: it is counted when `inc_dropped` is set and ignored
    /// otherwise. Every other error is returned.
    fn log(
        &self,
        record: &Record,
        logger_values: &OwnedKVList,
    ) -> AsyncResult<()> {
        self.push_dropped(logger_values)?;

        match self.core.log(record, logger_values) {
            Err(AsyncError::Full) => {
                if self.inc_dropped {
                    self.dropped.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            other => other,
        }
    }
}
impl Drop for Async {
fn drop(&mut self) {
let _ = self.push_dropped(&o!().into());
}
}
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::mpsc;

    /// Logs two messages through an `Async` drain and checks that the mock
    /// drain receives both, in order, with logger and record values.
    #[test]
    fn integration_test() {
        let (mock_drain, mock_drain_rx) = MockDrain::new();
        let async_drain = AsyncBuilder::new(mock_drain).build();
        let logger =
            slog::Logger::root(async_drain.fuse(), o!("field1" => "value1"));

        info!(logger, "Message 1"; "field2" => "value2");
        warn!(logger, "Message 2"; "field3" => "value3");

        let expected = [
            r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#,
            r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#,
        ];
        for line in expected.iter() {
            assert_eq!(mock_drain_rx.recv().unwrap(), *line);
        }
    }

    /// Drain that renders each record as one line and sends that line over
    /// an `mpsc` channel.
    #[derive(Debug)]
    struct MockDrain {
        tx: mpsc::Sender<String>,
    }

    impl MockDrain {
        /// Returns the drain and the receiver on which its lines arrive.
        fn new() -> (Self, mpsc::Receiver<String>) {
            let (tx, rx) = mpsc::channel();
            let drain = Self { tx };
            (drain, rx)
        }
    }

    impl slog::Drain for MockDrain {
        type Ok = ();
        type Err = slog::Never;

        fn log(
            &self,
            record: &Record,
            logger_kv: &OwnedKVList,
        ) -> Result<Self::Ok, Self::Err> {
            // Logger values are collected first, then the record's own.
            let mut collected = MockSerializer::default();
            logger_kv.serialize(record, &mut collected).unwrap();
            record.kv().serialize(record, &mut collected).unwrap();

            let entry = format!(
                "{} {}: {:?}",
                record.level().as_short_str(),
                record.msg(),
                collected.kvs
            );
            self.tx.send(entry).unwrap();
            Ok(())
        }
    }

    /// Serializer that records every pair as `(key, value)` strings.
    #[derive(Default)]
    struct MockSerializer {
        kvs: Vec<(String, String)>,
    }

    impl slog::Serializer for MockSerializer {
        fn emit_arguments(
            &mut self,
            key: Key,
            val: &fmt::Arguments,
        ) -> Result<(), slog::Error> {
            let pair = (key.to_string(), val.to_string());
            self.kvs.push(pair);
            Ok(())
        }
    }
}