use crate::AeronErrorType::Unknown;
#[cfg(debug_assertions)]
use std::backtrace::Backtrace;
use std::fmt::{Debug, Formatter};
use std::ops::{Deref, DerefMut};
use std::{any, fmt, ptr};
pub struct ManagedCResource<T> {
resource: *mut T,
cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
cleanup_struct: bool,
borrowed: bool,
}
impl<T> Debug for ManagedCResource<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("ManagedCResource")
.field("resource", &self.resource)
.field("type", &any::type_name::<T>())
.finish()
}
}
impl<T> ManagedCResource<T> {
pub fn new(
init: impl FnOnce(*mut *mut T) -> i32,
cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
cleanup_struct: bool,
) -> Result<Self, AeronCError> {
let mut resource: *mut T = ptr::null_mut();
let result = init(&mut resource);
if result < 0 || resource.is_null() {
return Err(AeronCError::from_code(result));
}
let result = Self {
resource,
cleanup,
cleanup_struct,
borrowed: false,
};
#[cfg(debug_assertions)]
log::debug!("created c resource: {:?}", result);
Ok(result)
}
pub fn new_borrowed(value: *const T) -> Self {
Self {
resource: value as *mut _,
cleanup: None,
cleanup_struct: false,
borrowed: true,
}
}
#[inline(always)]
pub fn get(&self) -> *mut T {
self.resource
}
#[inline(always)]
pub fn get_mut(&self) -> &mut T {
unsafe { &mut *self.resource }
}
pub fn close(&mut self) -> Result<(), AeronCError> {
if let Some(mut cleanup) = self.cleanup.take() {
if !self.resource.is_null() {
let result = cleanup(&mut self.resource);
if result < 0 {
return Err(AeronCError::from_code(result));
}
self.resource = std::ptr::null_mut();
}
}
Ok(())
}
}
impl<T> Drop for ManagedCResource<T> {
fn drop(&mut self) {
if !self.resource.is_null() && !self.borrowed {
let resource = self.resource.clone();
#[cfg(debug_assertions)]
log::debug!("closing c resource: {:?}", self);
let _ = self.close(); if self.cleanup_struct {
#[cfg(debug_assertions)]
log::debug!("closing rust struct resource: {:?}", resource);
unsafe {
let _ = Box::from_raw(resource);
}
}
}
}
}
#[derive(Debug)]
pub enum AeronErrorType {
NullOrNotConnected,
ClientErrorDriverTimeout,
ClientErrorClientTimeout,
ClientErrorConductorServiceTimeout,
ClientErrorBufferFull,
PublicationBackPressured,
PublicationAdminAction,
PublicationClosed,
PublicationMaxPositionExceeded,
PublicationError,
TimedOut,
Unknown(i32),
}
impl From<AeronErrorType> for AeronCError {
fn from(value: AeronErrorType) -> Self {
AeronCError::from_code(value.code())
}
}
impl AeronErrorType {
pub fn code(&self) -> i32 {
match self {
AeronErrorType::NullOrNotConnected => -1,
AeronErrorType::ClientErrorDriverTimeout => -1000,
AeronErrorType::ClientErrorClientTimeout => -1001,
AeronErrorType::ClientErrorConductorServiceTimeout => -1002,
AeronErrorType::ClientErrorBufferFull => -1003,
AeronErrorType::PublicationBackPressured => -2,
AeronErrorType::PublicationAdminAction => -3,
AeronErrorType::PublicationClosed => -4,
AeronErrorType::PublicationMaxPositionExceeded => -5,
AeronErrorType::PublicationError => -6,
AeronErrorType::TimedOut => -234324,
AeronErrorType::Unknown(code) => *code,
}
}
pub fn from_code(code: i32) -> Self {
match code {
-1 => AeronErrorType::NullOrNotConnected,
-1000 => AeronErrorType::ClientErrorDriverTimeout,
-1001 => AeronErrorType::ClientErrorClientTimeout,
-1002 => AeronErrorType::ClientErrorConductorServiceTimeout,
-1003 => AeronErrorType::ClientErrorBufferFull,
-2 => AeronErrorType::PublicationBackPressured,
-3 => AeronErrorType::PublicationAdminAction,
-4 => AeronErrorType::PublicationClosed,
-5 => AeronErrorType::PublicationMaxPositionExceeded,
-6 => AeronErrorType::PublicationError,
-234324 => AeronErrorType::TimedOut,
_ => Unknown(code),
}
}
pub fn to_string(&self) -> &'static str {
match self {
AeronErrorType::NullOrNotConnected => "Null Value or Not Connected",
AeronErrorType::ClientErrorDriverTimeout => "Client Error Driver Timeout",
AeronErrorType::ClientErrorClientTimeout => "Client Error Client Timeout",
AeronErrorType::ClientErrorConductorServiceTimeout => {
"Client Error Conductor Service Timeout"
}
AeronErrorType::ClientErrorBufferFull => "Client Error Buffer Full",
AeronErrorType::PublicationBackPressured => "Publication Back Pressured",
AeronErrorType::PublicationAdminAction => "Publication Admin Action",
AeronErrorType::PublicationClosed => "Publication Closed",
AeronErrorType::PublicationMaxPositionExceeded => "Publication Max Position Exceeded",
AeronErrorType::PublicationError => "Publication Error",
AeronErrorType::TimedOut => "Timed Out",
AeronErrorType::Unknown(_) => "Unknown Error",
}
}
}
#[derive(Eq, PartialEq)]
pub struct AeronCError {
pub code: i32,
}
impl AeronCError {
pub fn from_code(code: i32) -> Self {
#[cfg(debug_assertions)]
{
if code < 0 {
let backtrace = Backtrace::capture();
let backtrace = format!("{:?}", backtrace);
let re =
regex::Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
let mut lines = String::new();
re.captures_iter(&backtrace).for_each(|cap| {
let function = &cap[1];
let mut file = cap[2].to_string();
let line = &cap[3];
if file.starts_with("./") {
file = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), &file[2..]);
} else if file.starts_with("/rustc/") {
file = file.split("/").last().unwrap().to_string();
}
lines.push_str(&format!(" {file}:{line} in {function}\n"));
});
log::error!(
"Aeron C error code: {}, kind: '{:?}'\n{}",
code,
AeronErrorType::from_code(code),
lines
);
}
}
AeronCError { code }
}
pub fn kind(&self) -> AeronErrorType {
AeronErrorType::from_code(self.code)
}
}
impl fmt::Display for AeronCError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Aeron error {}: {:?}", self.code, self.kind())
}
}
impl fmt::Debug for AeronCError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AeronCError")
.field("code", &self.code)
.field("kind", &self.kind())
.finish()
}
}
impl std::error::Error for AeronCError {}
pub struct Handler<T> {
raw_ptr: *mut T,
should_drop: bool,
}
pub struct Handlers;
impl<T> Handler<T> {
pub fn leak(handler: T) -> Self {
let raw_ptr = Box::into_raw(Box::new(handler)) as *mut _;
Self {
raw_ptr,
should_drop: true,
}
}
pub fn is_none(&self) -> bool {
self.raw_ptr.is_null()
}
pub fn as_raw(&self) -> *mut std::os::raw::c_void {
self.raw_ptr as *mut std::os::raw::c_void
}
pub fn release(self) {
if self.should_drop && !self.raw_ptr.is_null() {
unsafe {
let _ = Box::from_raw(self.raw_ptr as *mut Box<T>);
}
}
}
}
impl<T> Deref for Handler<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.raw_ptr as &T }
}
}
impl<T> DerefMut for Handler<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.raw_ptr as &mut T }
}
}
pub fn find_unused_udp_port(start_port: u16) -> Option<u16> {
let end_port = u16::MAX;
for port in start_port..=end_port {
if is_udp_port_available(port) {
return Some(port);
}
}
None
}
pub fn is_udp_port_available(port: u16) -> bool {
std::net::UdpSocket::bind(("127.0.0.1", port)).is_ok()
}
pub struct ChannelUri {}
impl ChannelUri {
pub const AERON_SCHEME: &'static str = "aeron";
pub const SPY_QUALIFIER: &'static str = "aeron-spy";
pub const MAX_URI_LENGTH: usize = 4095;
}
pub const DRIVER_TIMEOUT_MS_DEFAULT: u64 = 10_000;
pub const AERON_DIR_PROP_NAME: &str = "aeron.dir";
pub const AERON_IPC_MEDIA: &str = "aeron:ipc";
pub const AERON_UDP_MEDIA: &str = "aeron:udp";
pub const SPY_PREFIX: &str = "aeron-spy:";
pub const TAG_PREFIX: &str = "tag:";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Media {
Ipc,
Udp,
}
impl Media {
pub fn as_str(&self) -> &'static str {
match self {
Media::Ipc => "ipc",
Media::Udp => "udp",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ControlMode {
Manual,
Dynamic,
Response,
}
impl ControlMode {
pub fn as_str(&self) -> &'static str {
match self {
ControlMode::Manual => "manual",
ControlMode::Dynamic => "dynamic",
ControlMode::Response => "response",
}
}
}