1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use rabbitmq_stream_protocol::{
    error::{DecodeError, EncodeError},
    ResponseCode,
};
use thiserror::Error;

/// Errors produced by the client layer.
///
/// Variants marked `#[from]` get a `From` conversion generated by `thiserror`,
/// so the wrapped error types can be propagated with `?`.
#[derive(Error, Debug)]
pub enum ClientError {
    /// An I/O error; its display is forwarded unchanged from `std::io::Error`.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// A protocol encode/decode failure, wrapped as a [`ProtocolError`].
    #[error(transparent)]
    Protocol(#[from] ProtocolError),
    /// A cast failure; the payload is the text shown after `Cast Error: `.
    #[error("Cast Error: {0}")]
    CastError(String),
    /// Any other boxed error; its display is forwarded unchanged.
    #[error(transparent)]
    GenericError(#[from] Box<dyn std::error::Error + Send + Sync>),
    /// Displayed as "Client already closed".
    #[error("Client already closed")]
    AlreadyClosed,
    /// A `rustls` error (via `tokio_rustls`); its display is forwarded unchanged.
    #[error(transparent)]
    Tls(#[from] tokio_rustls::rustls::Error),
    /// A request-level error carrying a [`ResponseCode`], which is rendered with
    /// its `Debug` formatting.
    #[error("Request error: {0:?}")]
    RequestError(ResponseCode),
}

/// Errors reported when a consumer stores an offset.
#[derive(Error, Debug)]
pub enum ConsumerStoreOffsetError {
    /// The offset could not be stored because the consumer has no name.
    #[error("Failed to store offset, missing consumer name")]
    NameMissing,
    /// An underlying [`ClientError`]; its display is forwarded unchanged.
    #[error(transparent)]
    Client(#[from] ClientError),
}

/// Protocol-level encode/decode failures.
///
/// The wrapped errors are rendered with their `Debug` formatting. No `#[from]`
/// is declared here; the conversions into [`ClientError`] are written by hand
/// elsewhere in this file.
#[derive(Error, Debug)]
pub enum ProtocolError {
    /// Wraps an [`EncodeError`].
    #[error("Encode Error {0:?}")]
    Encode(EncodeError),
    /// Wraps a [`DecodeError`].
    #[error("Decode Error {0:?}")]
    Decode(DecodeError),
}

/// Lifts an [`EncodeError`] into [`ClientError::Protocol`] by first wrapping it
/// in [`ProtocolError::Encode`].
impl From<EncodeError> for ClientError {
    fn from(encode_failure: EncodeError) -> Self {
        let wrapped = ProtocolError::Encode(encode_failure);
        Self::Protocol(wrapped)
    }
}

/// Lifts a [`DecodeError`] into [`ClientError::Protocol`] by first wrapping it
/// in [`ProtocolError::Decode`].
impl From<DecodeError> for ClientError {
    fn from(decode_failure: DecodeError) -> Self {
        let wrapped = ProtocolError::Decode(decode_failure);
        // `ClientError::Protocol` is declared with `#[from] ProtocolError`, so this
        // conversion yields exactly `ClientError::Protocol(wrapped)`.
        Self::from(wrapped)
    }
}

/// Errors reported when creating a stream.
#[derive(Error, Debug)]
pub enum StreamCreateError {
    /// Creation of `stream` failed; `status` is the [`ResponseCode`] included
    /// (with `Debug` formatting) in the message.
    #[error("Failed to create stream {stream} status: {status:?}")]
    Create {
        stream: String,
        status: ResponseCode,
    },
    /// An underlying [`ClientError`]; its display is forwarded unchanged.
    #[error(transparent)]
    Client(#[from] ClientError),
}

/// Errors reported when deleting a stream.
#[derive(Error, Debug)]
pub enum StreamDeleteError {
    /// Deletion of `stream` failed; `status` is the [`ResponseCode`] included
    /// (with `Debug` formatting) in the message.
    #[error("Failed to delete stream {stream} status: {status:?}")]
    Delete {
        stream: String,
        status: ResponseCode,
    },
    /// An underlying [`ClientError`]; its display is forwarded unchanged.
    #[error(transparent)]
    Client(#[from] ClientError),
}

/// Errors reported when creating a producer.
#[derive(Error, Debug)]
pub enum ProducerCreateError {
    /// Producer creation for `stream` failed; `status` is the [`ResponseCode`]
    /// included (with `Debug` formatting) in the message.
    #[error("Failed to create producer for stream {stream} status {status:?}")]
    Create {
        stream: String,
        status: ResponseCode,
    },

    /// The named stream does not exist.
    #[error("Stream {stream} does not exist")]
    StreamDoesNotExist { stream: String },

    /// An underlying [`ClientError`]; its display is forwarded unchanged.
    #[error(transparent)]
    Client(#[from] ClientError),
}

/// Errors reported when a producer publishes messages.
#[derive(Error, Debug)]
pub enum ProducerPublishError {
    /// Publishing to `stream` failed with the given `status`.
    ///
    /// `publisher_id` is carried in the variant but is not part of the
    /// display message.
    #[error("Failed to publish message for stream {stream} status {status:?}")]
    Create {
        stream: String,
        publisher_id: u8,
        status: ResponseCode,
    },
    /// Sending a batch of messages to `stream` failed.
    #[error("Failed to send batch messages for stream {stream}")]
    Batch { stream: String },
    /// The producer is closed, so the message could not be published.
    #[error("Failed to publish message, the producer is closed")]
    Closed,
    /// The confirmation channel returned `None` for `stream`.
    #[error("Failed to publish message, confirmation channel returned None for stream {stream}")]
    Confirmation { stream: String },
    /// An underlying [`ClientError`]; its display is forwarded unchanged.
    #[error(transparent)]
    Client(#[from] ClientError),
}
/// Errors reported when closing a producer.
#[derive(Error, Debug)]
pub enum ProducerCloseError {
    /// Closing the producer for `stream` failed; `status` is the
    /// [`ResponseCode`] included (with `Debug` formatting) in the message.
    #[error("Failed to close producer for stream {stream} status {status:?}")]
    Close {
        stream: String,
        status: ResponseCode,
    },
    /// The producer was already closed.
    #[error("Producer already closed")]
    AlreadyClosed,
    /// An underlying [`ClientError`]; its display is forwarded unchanged.
    #[error(transparent)]
    Client(#[from] ClientError),
}

/// Errors reported when creating a consumer.
#[derive(Error, Debug)]
pub enum ConsumerCreateError {
    /// Consumer creation for `stream` failed; `status` is the [`ResponseCode`]
    /// included (with `Debug` formatting) in the message.
    #[error("Failed to create consumer for stream {stream} status {status:?}")]
    Create {
        stream: String,
        status: ResponseCode,
    },

    /// The named stream does not exist.
    #[error("Stream {stream} does not exist")]
    StreamDoesNotExist { stream: String },

    /// An underlying [`ClientError`]; its display is forwarded unchanged.
    #[error(transparent)]
    Client(#[from] ClientError),
}

#[derive(Error, Debug)]
pub enum ConsumerDeliveryError {
    #[error("Failed to create consumer for stream {stream} status {status:?}")]
    Credit {
        stream: String,
        status: ResponseCode,
    },
    #[error(transparent)]
    Client(#[from] ClientError),
}
/// Errors reported when closing a consumer.
#[derive(Error, Debug)]
pub enum ConsumerCloseError {
    /// Closing the consumer for `stream` failed; `status` is the
    /// [`ResponseCode`] included (with `Debug` formatting) in the message.
    #[error("Failed to close consumer for stream {stream} status {status:?}")]
    Close {
        stream: String,
        status: ResponseCode,
    },
    /// The consumer was already closed.
    #[error("Consumer already closed")]
    AlreadyClosed,
    /// An underlying [`ClientError`]; its display is forwarded unchanged.
    #[error(transparent)]
    Client(#[from] ClientError),
}