use std::{
collections::HashMap,
env,
future::Future,
io::BufReader,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use ntex::{
io::IoBoxed,
service::{chain_factory, fn_factory_with_config, fn_service},
util::{ByteString, Bytes, Ready},
ServiceFactory,
};
use ntex_mqtt::{v3, v5, MqttError, MqttServer};
use ntex_tls::rustls::TlsAcceptor;
use rustls::{
pki_types::{CertificateDer, PrivateKeyDer},
server::WebPkiClientVerifier,
RootCertStore, ServerConfig,
};
use secrecy::ExposeSecret;
use serde_json::Value;
use tokio::task::JoinHandle;
use zenoh::{
bytes::{Encoding, ZBytes},
internal::{
plugins::{RunningPluginTrait, ZenohPlugin},
runtime::Runtime,
zerror,
},
key_expr::keyexpr,
query::Query,
try_init_log_from_env, Error as ZError, Result as ZResult, Session, Wait,
};
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
pub mod config;
mod mqtt_helpers;
mod mqtt_session_state;
use config::{AuthConfig, Config, TLSConfig};
use mqtt_session_state::MqttSessionState;
// Sizing knobs and the fallback Tokio runtime used by `spawn_runtime`.
//
// The two counters start at the defaults from the `config` module and are overwritten
// in `MqttPlugin::start` before the first task is spawned. `TOKIO_RUNTIME` reads them
// when it is first dereferenced, so later stores have no effect on an already-built runtime.
lazy_static::lazy_static! {
// Number of worker threads requested for `TOKIO_RUNTIME`.
static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
// Upper bound on blocking threads requested for `TOKIO_RUNTIME`.
static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
// Multi-threaded runtime used when no ambient Tokio runtime is available.
// Panics on first use if the runtime cannot be built.
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
.max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
.enable_all()
.build()
.expect("Unable to create runtime");
}
/// Spawns `task` on a Tokio runtime and returns its join handle.
///
/// If the caller is already running inside a Tokio runtime, the task is spawned
/// there; otherwise it is spawned on the plugin's own lazily-built `TOKIO_RUNTIME`.
#[inline(always)]
pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    // Prefer the ambient runtime when one is available on this thread.
    if let Ok(current) = tokio::runtime::Handle::try_current() {
        return current.spawn(task);
    }
    // No ambient runtime: fall back to the plugin-global one.
    TOKIO_RUNTIME.spawn(task)
}
// Key-expression fragments used to build and match the plugin's admin-space keys
// (see `run` and `treat_admin_query`).
//
// NOTE(review): each `from_str_unchecked` skips validation of the literal. These are
// presumably sound because every literal is a single plain chunk; confirm against the
// safety contract of `keyexpr::from_str_unchecked`.
lazy_static::lazy_static! {
// Leading chunk of the admin space.
static ref KE_PREFIX_ADMIN_SPACE: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@") };
// Chunk identifying this plugin below the session's zid.
static ref KE_PREFIX_MQTT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("mqtt") };
// Sub-key under which the plugin's long version string is served.
static ref ADMIN_SPACE_KE_VERSION: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("version") };
// Sub-key under which the plugin's configuration is served as JSON.
static ref ADMIN_SPACE_KE_CONFIG: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("config") };
}
// Only emitted when the crate is built with the `dynamic_plugin` feature.
#[cfg(feature = "dynamic_plugin")]
zenoh_plugin_trait::declare_plugin!(MqttPlugin);
/// The MQTT plugin type. It carries no state of its own; the work is started by
/// `Plugin::start`, and the same unit struct is returned as the running instance.
pub struct MqttPlugin;
/// MQTT user name, stored as raw bytes (key of the authentication dictionary).
type User = Vec<u8>;
/// MQTT password, stored as raw bytes (value of the authentication dictionary).
type Password = Vec<u8>;
// Marker implementation; no methods are overridden here.
impl ZenohPlugin for MqttPlugin {}
impl Plugin for MqttPlugin {
type StartArgs = Runtime;
type Instance = zenoh::internal::plugins::RunningPlugin;
const DEFAULT_NAME: &'static str = "mqtt";
const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
const PLUGIN_VERSION: &'static str = plugin_version!();
fn start(
name: &str,
runtime: &Self::StartArgs,
) -> ZResult<zenoh::internal::plugins::RunningPlugin> {
try_init_log_from_env();
let runtime_conf = runtime.config().lock();
let plugin_conf = runtime_conf
.plugin(name)
.ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
let config: Config = serde_json::from_value(plugin_conf.clone())
.map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
let tls_config = match config.tls.as_ref() {
Some(tls) => Some(create_tls_config(tls)?),
None => None,
};
let auth_dictionary = match config.auth.as_ref() {
Some(auth) => Some(create_auth_dictionary(auth)?),
None => None,
};
WORK_THREAD_NUM.store(config.work_thread_num, Ordering::SeqCst);
MAX_BLOCK_THREAD_NUM.store(config.max_block_thread_num, Ordering::SeqCst);
spawn_runtime(run(runtime.clone(), config, tls_config, auth_dictionary));
Ok(Box::new(MqttPlugin))
}
}
// Both traits are implemented without overriding any method, so the plugin
// relies entirely on whatever default behavior the traits provide.
impl PluginControl for MqttPlugin {}
impl RunningPluginTrait for MqttPlugin {}
async fn run(
runtime: Runtime,
config: Config,
tls_config: Option<Arc<ServerConfig>>,
auth_dictionary: Option<HashMap<User, Password>>,
) {
try_init_log_from_env();
tracing::debug!("MQTT plugin {}", MqttPlugin::PLUGIN_LONG_VERSION);
tracing::info!("MQTT plugin {:?}", config);
let zsession = match zenoh::session::init(runtime)
.aggregated_subscribers(config.generalise_subs.clone())
.aggregated_publishers(config.generalise_pubs.clone())
.await
{
Ok(session) => Arc::new(session),
Err(e) => {
tracing::error!("Unable to init zenoh session for MQTT plugin : {:?}", e);
return;
}
};
let admin_keyexpr_prefix =
*KE_PREFIX_ADMIN_SPACE / &zsession.zid().into_keyexpr() / *KE_PREFIX_MQTT;
let admin_keyexpr_expr = (&admin_keyexpr_prefix) / unsafe { keyexpr::from_str_unchecked("**") };
tracing::debug!("Declare admin space on {}", admin_keyexpr_expr);
let config2 = config.clone();
let _admin_queryable = zsession
.declare_queryable(admin_keyexpr_expr)
.callback(move |query| treat_admin_query(query, &admin_keyexpr_prefix, &config2))
.await
.expect("Failed to create AdminSpace queryable");
if auth_dictionary.is_some() && tls_config.is_none() {
tracing::warn!(
"Warning: MQTT client username/password authentication enabled without TLS!"
);
}
let config = Arc::new(config);
let auth_dictionary = Arc::new(auth_dictionary);
tokio::task::spawn_blocking(|| {
let session = zsession.clone();
let rt = tokio::runtime::Handle::try_current()
.expect("Unable to get the current runtime, which should not happen.");
if let Err(e) = rt.block_on(ntex::rt::System::new(MqttPlugin::DEFAULT_NAME).run_local(
async move {
let server = match tls_config {
Some(tls) => ntex::server::Server::build().bind(
"mqtt",
config.port.clone(),
move |_| {
chain_factory(TlsAcceptor::new(tls.clone()))
.map_err(|err| MqttError::Service(MqttPluginError::from(err)))
.and_then(create_mqtt_server(
zsession.clone(),
config.clone(),
auth_dictionary.clone(),
))
},
)?,
None => ntex::server::Server::build().bind(
"mqtt",
config.port.clone(),
move |_| {
create_mqtt_server(
zsession.clone(),
config.clone(),
auth_dictionary.clone(),
)
},
)?,
};
server.workers(1).disable_signals().run().await
},
)) {
tracing::error!("Unable to start MQTT server: {:?}", e);
}
drop(session);
});
}
/// Builds the rustls server configuration described by `config`.
///
/// The private key and the certificate chain are mandatory and must each be given
/// exactly once, either as a file path or as a base64 string. The root CA is
/// optional (at most one of its two forms); when present, clients must present a
/// certificate that verifies against it, otherwise no client authentication is done.
///
/// # Errors
///
/// Fails when a mandatory item is missing, when both forms of an item are given,
/// when a file cannot be read, when base64 or PEM decoding fails, or when rustls
/// rejects the resulting key / certificate / verifier.
fn create_tls_config(config: &TLSConfig) -> ZResult<Arc<ServerConfig>> {
    // --- Server private key -------------------------------------------------
    let private_key_pem = match (
        config.server_private_key.as_ref(),
        config.server_private_key_base64.as_ref(),
    ) {
        (None, None) => {
            return Err(zerror!(
                "Either 'server_private_key' or 'server_private_key_base64' must be present!"
            )
            .into());
        }
        (Some(path), None) => {
            std::fs::read(path).map_err(|e| zerror!("Invalid private key file: {e:?}"))?
        }
        (None, Some(encoded)) => base64_decode(encoded.expose_secret())?,
        (Some(_), Some(_)) => {
            return Err(zerror!(
                "Only one of 'server_private_key' and 'server_private_key_base64' can be present!"
            )
            .into());
        }
    };
    let private_key = load_private_key(private_key_pem)?;

    // --- Server certificate chain --------------------------------------------
    let certificate_pem = match (
        config.server_certificate.as_ref(),
        config.server_certificate_base64.as_ref(),
    ) {
        (None, None) => {
            return Err(zerror!(
                "Either 'server_certificate' or 'server_certificate_base64' must be present!"
            )
            .into());
        }
        (Some(path), None) => {
            std::fs::read(path).map_err(|e| zerror!("Invalid certificate file: {e:?}"))?
        }
        (None, Some(encoded)) => base64_decode(encoded.expose_secret())?,
        (Some(_), Some(_)) => {
            return Err(zerror!(
                "Only one of 'server_certificate' and 'server_certificate_base64' can be present!"
            )
            .into());
        }
    };
    let certificate_chain = load_certs(certificate_pem)?;

    // --- Optional root CA (enables client certificate verification) ----------
    let root_ca_pem = match (
        config.root_ca_certificate.as_ref(),
        config.root_ca_certificate_base64.as_ref(),
    ) {
        (None, None) => None,
        (Some(path), None) => Some(
            std::fs::read(path).map_err(|e| zerror!("Invalid root certificate file: {e:?}"))?,
        ),
        (None, Some(encoded)) => Some(base64_decode(encoded.expose_secret())?),
        (Some(_), Some(_)) => {
            return Err(zerror!("Only one of 'root_ca_certificate' and 'root_ca_certificate_base64' can be present!").into());
        }
    };

    let server_config = if let Some(pem) = root_ca_pem {
        let trust_anchors = load_trust_anchors(pem)?;
        let client_verifier = WebPkiClientVerifier::builder(Arc::new(trust_anchors)).build()?;
        ServerConfig::builder()
            .with_client_cert_verifier(client_verifier)
            .with_single_cert(certificate_chain, private_key)?
    } else {
        ServerConfig::builder()
            .with_no_client_auth()
            .with_single_cert(certificate_chain, private_key)?
    };
    Ok(Arc::new(server_config))
}
/// Decodes `data` with the standard base64 engine of the `base64` crate.
///
/// # Errors
///
/// Returns an error describing the decoding failure when `data` is not valid
/// for that engine.
pub fn base64_decode(data: &str) -> ZResult<Vec<u8>> {
    use base64::{engine::general_purpose, Engine};

    match general_purpose::STANDARD.decode(data) {
        Ok(decoded) => Ok(decoded),
        Err(e) => Err(zerror!("Unable to perform base64 decoding: {e:?}").into()),
    }
}
/// Extracts the first PKCS#1, PKCS#8 or SEC1 private key found in PEM `bytes`.
///
/// Other PEM items are skipped.
///
/// # Errors
///
/// Fails if a PEM item cannot be parsed before a key is found, or if the input
/// ends without any key of a supported kind.
fn load_private_key(bytes: Vec<u8>) -> ZResult<PrivateKeyDer<'static>> {
    let mut reader = BufReader::new(bytes.as_slice());
    // `read_one` yields `None` once the input is exhausted.
    while let Some(item) = rustls_pemfile::read_one(&mut reader)
        .map_err(|e| zerror!("Cannot parse private key: {e:?}"))?
    {
        match item {
            rustls_pemfile::Item::Pkcs1Key(key) => return Ok(key.into()),
            rustls_pemfile::Item::Pkcs8Key(key) => return Ok(key.into()),
            rustls_pemfile::Item::Sec1Key(key) => return Ok(key.into()),
            // Not a private key: keep scanning.
            _ => {}
        }
    }
    Err(zerror!("No supported private keys found").into())
}
fn load_certs(bytes: Vec<u8>) -> ZResult<Vec<CertificateDer<'static>>> {
let mut reader = BufReader::new(bytes.as_slice());
let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut reader)
.collect::<Result<_, _>>()
.map_err(|err| zerror!("Error processing client certificate: {err}."))?;
match certs.is_empty() {
true => Err(zerror!("No certificates found").into()),
false => Ok(certs),
}
}
/// Builds a root certificate store from the PEM certificates in `bytes`.
///
/// # Errors
///
/// Fails if the certificates cannot be loaded (see `load_certs`) or if the store
/// rejects one of them.
fn load_trust_anchors(bytes: Vec<u8>) -> ZResult<RootCertStore> {
    let mut store = RootCertStore::empty();
    let anchors = load_certs(bytes)?;
    for anchor in anchors.into_iter() {
        // Stop at the first certificate the store refuses.
        store.add(anchor)?;
    }
    Ok(store)
}
/// Loads the user/password dictionary from `config.dictionary_file`.
///
/// Each line of the file must have the form `user:password`; the first `:` is the
/// separator, so a password may itself contain `:`. If a user appears more than
/// once, the last entry wins.
///
/// # Errors
///
/// Fails if the file cannot be read, if a line has no `:`, or if either side of
/// the separator is empty.
fn create_auth_dictionary(config: &AuthConfig) -> ZResult<HashMap<User, Password>> {
    let content = std::fs::read_to_string(config.dictionary_file.as_str())
        .map_err(|e| zerror!("Invalid user/password dictionary file: {}", e))?;

    let mut dictionary: HashMap<User, Password> = HashMap::new();
    for entry in content.lines() {
        let (user, password) = entry
            .split_once(':')
            .ok_or_else(|| zerror!("Invalid user/password dictionary file: invalid format"))?;
        if user.is_empty() {
            return Err(zerror!("Invalid user/password dictionary file: empty user").into());
        }
        if password.is_empty() {
            return Err(zerror!("Invalid user/password dictionary file: empty password").into());
        }
        dictionary.insert(user.as_bytes().to_vec(), password.as_bytes().to_vec());
    }
    Ok(dictionary)
}
/// Checks MQTT credentials against the optional user/password dictionary.
///
/// * `dictionary` — `None` disables authentication: every client is accepted.
/// * `usr` / `pwd` — credentials taken from the MQTT CONNECT packet.
///
/// Returns `Ok(())` when the client may connect, otherwise `Err` with a
/// human-readable reason (the visible callers only log it).
///
/// NOTE(review): the password comparison is a plain equality check, not a
/// constant-time one.
fn is_authorized(
    dictionary: Option<&HashMap<User, Password>>,
    usr: Option<&ByteString>,
    pwd: Option<&Bytes>,
) -> Result<(), String> {
    let dictionary = match dictionary {
        Some(dictionary) => dictionary,
        // Authentication not configured.
        None => return Ok(()),
    };

    match (usr, pwd) {
        // Look up by borrowed bytes: no temporary `Vec` is allocated for the key.
        (Some(usr), Some(pwd)) => match dictionary.get(usr.as_str().as_bytes()) {
            Some(expected_pwd) if pwd == expected_pwd => Ok(()),
            Some(_) => Err(format!("Incorrect password for user {usr:?}")),
            None => Err(format!("Unknown user {usr:?}")),
        },
        (Some(usr), None) => Err(format!("Missing password for user {usr:?}")),
        (None, Some(_)) => Err(("Missing user name").to_string()),
        (None, None) => Err(("Missing user credentials").to_string()),
    }
}
/// Assembles the MQTT server that handles both protocol versions (v3 and v5).
///
/// For each version, three services are wired: the handshake (which authenticates
/// the client and creates its `MqttSessionState`), the publish handler, and the
/// control handler. `session`, `config` and `auth_dictionary` are shared with every
/// handshake through cheap `Arc` clones.
fn create_mqtt_server(
    session: Arc<Session>,
    config: Arc<Config>,
    auth_dictionary: Arc<Option<HashMap<User, Password>>>,
) -> MqttServer<
    impl ServiceFactory<IoBoxed, (), Response = (), Error = MqttError<MqttPluginError>, InitError = ()>,
    impl ServiceFactory<IoBoxed, (), Response = (), Error = MqttError<MqttPluginError>, InitError = ()>,
    MqttPluginError,
    (),
> {
    // MQTT v3 pipeline.
    let v3_server = {
        let shared_session = session.clone();
        let shared_config = config.clone();
        let shared_auth = auth_dictionary.clone();
        v3::MqttServer::new(fn_factory_with_config(move |_| {
            let zs = shared_session.clone();
            let config = shared_config.clone();
            let auth_dictionary = shared_auth.clone();
            Ready::Ok::<_, ()>(fn_service(move |h| {
                handshake_v3(h, zs.clone(), config.clone(), auth_dictionary.clone())
            }))
        }))
        .publish(fn_factory_with_config(
            |mqtt_session: v3::Session<MqttSessionState>| {
                Ready::Ok::<_, MqttPluginError>(fn_service(move |req| {
                    publish_v3(mqtt_session.clone(), req)
                }))
            },
        ))
        .control(fn_factory_with_config(
            |mqtt_session: v3::Session<MqttSessionState>| {
                Ready::Ok::<_, MqttPluginError>(fn_service(move |req| {
                    control_v3(mqtt_session.clone(), req)
                }))
            },
        ))
    };

    // MQTT v5 pipeline; takes over the remaining handles.
    let v5_server = {
        let shared_session = session;
        let shared_config = config;
        let shared_auth = auth_dictionary;
        v5::MqttServer::new(fn_factory_with_config(move |_| {
            let zs = shared_session.clone();
            let config = shared_config.clone();
            let auth_dictionary = shared_auth.clone();
            Ready::Ok::<_, ()>(fn_service(move |h| {
                handshake_v5(h, zs.clone(), config.clone(), auth_dictionary.clone())
            }))
        }))
        .publish(fn_factory_with_config(
            |mqtt_session: v5::Session<MqttSessionState>| {
                Ready::Ok::<_, MqttPluginError>(fn_service(move |req| {
                    publish_v5(mqtt_session.clone(), req)
                }))
            },
        ))
        .control(fn_factory_with_config(
            |mqtt_session: v5::Session<MqttSessionState>| {
                Ready::Ok::<_, MqttPluginError>(fn_service(move |req| {
                    control_v5(mqtt_session.clone(), req)
                }))
            },
        ))
    };

    MqttServer::new().v3(v3_server).v5(v5_server)
}
fn treat_admin_query(query: Query, admin_keyexpr_prefix: &keyexpr, config: &Config) {
let selector = query.selector();
tracing::debug!("Query on admin space: {:?}", selector);
let sub_kes = selector.key_expr().strip_prefix(admin_keyexpr_prefix);
if sub_kes.is_empty() {
tracing::error!("Received query for admin space: '{}' - but it's not prefixed by admin_keyexpr_prefix='{}'", selector, admin_keyexpr_prefix);
return;
}
let mut kvs: Vec<(&keyexpr, Value)> = Vec::with_capacity(sub_kes.len());
for sub_ke in sub_kes {
if sub_ke.intersects(&ADMIN_SPACE_KE_VERSION) {
kvs.push((
&ADMIN_SPACE_KE_VERSION,
Value::String(MqttPlugin::PLUGIN_LONG_VERSION.to_string()),
));
}
if sub_ke.intersects(&ADMIN_SPACE_KE_CONFIG) {
kvs.push((
&ADMIN_SPACE_KE_CONFIG,
serde_json::to_value(config).unwrap(),
));
}
}
for (ke, v) in kvs.drain(..) {
let admin_keyexpr = admin_keyexpr_prefix / ke;
match serde_json::to_vec(&v) {
Ok(bytes) => {
if let Err(e) = query
.reply(admin_keyexpr, ZBytes::from(bytes))
.encoding(Encoding::APPLICATION_JSON)
.wait()
{
tracing::warn!("Error replying to admin query {:?}: {}", query, e);
}
}
Err(err) => {
tracing::error!("Could not Serialize serde_json::Value to ZBytes {}", err)
}
}
}
}
/// Error type used by the plugin's MQTT services.
///
/// It wraps any boxed error; `From` conversions in this file cover zenoh errors,
/// I/O errors, rustls errors and plain `String` messages.
#[derive(Debug)]
struct MqttPluginError {
/// The underlying error.
err: Box<dyn std::error::Error + Send + Sync + 'static>,
}
impl From<ZError> for MqttPluginError {
fn from(e: ZError) -> Self {
MqttPluginError { err: e }
}
}
impl From<std::io::Error> for MqttPluginError {
fn from(e: std::io::Error) -> Self {
MqttPluginError { err: e.into() }
}
}
impl From<rustls::Error> for MqttPluginError {
fn from(e: rustls::Error) -> Self {
MqttPluginError { err: e.into() }
}
}
impl From<String> for MqttPluginError {
fn from(e: String) -> Self {
MqttPluginError { err: e.into() }
}
}
impl std::convert::TryFrom<MqttPluginError> for v5::PublishAck {
type Error = MqttPluginError;
fn try_from(err: MqttPluginError) -> Result<Self, Self::Error> {
Err(err)
}
}
/// Handles the CONNECT handshake of an MQTT v3 client.
///
/// The credentials from the CONNECT packet are checked with `is_authorized`.
/// On success a new `MqttSessionState` is created for the client and the
/// handshake is acknowledged with `ack(session, false)`; on failure the reason
/// is logged and the connection is answered with "not authorized".
///
/// This function never returns `Err`: a rejected client is still an `Ok` ack.
async fn handshake_v3(
    handshake: v3::Handshake,
    zsession: Arc<Session>,
    config: Arc<Config>,
    auth_dictionary: Arc<Option<HashMap<User, Password>>>,
) -> Result<v3::HandshakeAck<MqttSessionState>, MqttPluginError> {
    let client_id = handshake.packet().client_id.to_string();

    if let Err(err) = is_authorized(
        (*auth_dictionary).as_ref(),
        handshake.packet().username.as_ref(),
        handshake.packet().password.as_ref(),
    ) {
        tracing::warn!(
            "MQTT client {} connect using v3 rejected: {}",
            client_id,
            err
        );
        return Ok(handshake.not_authorized());
    }

    tracing::info!("MQTT client {} connects using v3", client_id);
    let session = MqttSessionState::new(client_id, zsession, config, handshake.sink().into());
    Ok(handshake.ack(session, false))
}
/// Handles a PUBLISH from an MQTT v3 client by passing its topic and payload to
/// the session state's `route_mqtt_to_zenoh`.
///
/// # Errors
///
/// Any error from the routing call is converted into a `MqttPluginError`.
async fn publish_v3(
    session: v3::Session<MqttSessionState>,
    publish: v3::Publish,
) -> Result<(), MqttPluginError> {
    let routed = session
        .state()
        .route_mqtt_to_zenoh(publish.topic(), publish.payload())
        .await;
    match routed {
        Ok(done) => Ok(done),
        Err(e) => Err(MqttPluginError::from(e)),
    }
}
async fn control_v3(
session: v3::Session<MqttSessionState>,
control: v3::Control<MqttPluginError>,
) -> Result<v3::ControlAck, MqttPluginError> {
tracing::trace!(
"MQTT client {} sent control: {:?}",
session.client_id,
control
);
match control {
v3::Control::Ping(ref msg) => Ok(msg.ack()),
v3::Control::Disconnect(msg) => {
tracing::debug!("MQTT client {} disconnected", session.client_id);
session.sink().close();
Ok(msg.ack())
}
v3::Control::Subscribe(mut msg) => {
for mut s in msg.iter_mut() {
let topic = s.topic().as_str();
tracing::debug!(
"MQTT client {} subscribes to '{}'",
session.client_id,
topic
);
match session.state().map_mqtt_subscription(topic).await {
Ok(()) => s.confirm(v3::QoS::AtMostOnce),
Err(e) => {
tracing::error!("Subscription to '{}' failed: {}", topic, e);
s.fail()
}
}
}
Ok(msg.ack())
}
v3::Control::Unsubscribe(msg) => {
for topic in msg.iter() {
tracing::debug!(
"MQTT client {} unsubscribes from '{}'",
session.client_id,
topic.as_str()
);
}
Ok(msg.ack())
}
v3::Control::WrBackpressure(msg) => {
tracing::debug!(
"MQTT client {} WrBackpressure received: {}",
session.client_id,
msg.enabled()
);
Ok(msg.ack())
}
v3::Control::Closed(msg) => {
tracing::debug!("MQTT client {} closed connection", session.client_id);
session.sink().force_close();
Ok(msg.ack())
}
v3::Control::Error(msg) => {
tracing::warn!(
"MQTT client {} Error received: {}",
session.client_id,
msg.get_ref().err
);
Ok(msg.ack())
}
v3::Control::ProtocolError(ref msg) => {
tracing::warn!(
"MQTT client {}: ProtocolError received: {} => disconnect it",
session.client_id,
msg.get_ref()
);
Ok(control.disconnect())
}
v3::Control::PeerGone(msg) => {
tracing::debug!(
"MQTT client {}: PeerGone => close connection",
session.client_id
);
session.sink().close();
Ok(msg.ack())
}
}
}
/// Handles the CONNECT handshake of an MQTT v5 client.
///
/// The credentials from the CONNECT packet are checked with `is_authorized`.
/// On success a new `MqttSessionState` is created for the client and the
/// handshake is acknowledged; on failure the reason is logged and the handshake
/// fails with the `NotAuthorized` reason code.
///
/// This function never returns `Err`: a rejected client is still an `Ok` ack.
async fn handshake_v5(
    handshake: v5::Handshake,
    zsession: Arc<Session>,
    config: Arc<Config>,
    auth_dictionary: Arc<Option<HashMap<User, Password>>>,
) -> Result<v5::HandshakeAck<MqttSessionState>, MqttPluginError> {
    let client_id = handshake.packet().client_id.to_string();

    if let Err(err) = is_authorized(
        (*auth_dictionary).as_ref(),
        handshake.packet().username.as_ref(),
        handshake.packet().password.as_ref(),
    ) {
        tracing::warn!(
            "MQTT client {} connect using v5 rejected: {}",
            client_id,
            err
        );
        return Ok(handshake.failed(ntex_mqtt::v5::codec::ConnectAckReason::NotAuthorized));
    }

    tracing::info!("MQTT client {} connects using v5", client_id);
    let session = MqttSessionState::new(client_id, zsession, config, handshake.sink().into());
    Ok(handshake.ack(session))
}
/// Handles a PUBLISH from an MQTT v5 client by passing its topic and payload to
/// the session state's `route_mqtt_to_zenoh`, then acknowledging the publication.
///
/// # Errors
///
/// Any error from the routing call is converted into a `MqttPluginError`; in that
/// case no acknowledgement is produced.
async fn publish_v5(
    session: v5::Session<MqttSessionState>,
    publish: v5::Publish,
) -> Result<v5::PublishAck, MqttPluginError> {
    let routed = session
        .state()
        .route_mqtt_to_zenoh(publish.topic(), publish.payload())
        .await;
    match routed {
        Ok(()) => Ok(publish.ack()),
        Err(e) => Err(MqttPluginError::from(e)),
    }
}
async fn control_v5(
session: v5::Session<MqttSessionState>,
control: v5::Control<MqttPluginError>,
) -> Result<v5::ControlAck, MqttPluginError> {
tracing::trace!(
"MQTT client {} sent control: {:?}",
session.client_id,
control
);
use v5::codec::{Disconnect, DisconnectReasonCode};
match control {
v5::Control::Auth(_) => {
tracing::debug!(
"MQTT client {} wants to authenticate... not yet supported!",
session.client_id
);
Ok(control.disconnect_with(Disconnect::new(
DisconnectReasonCode::ImplementationSpecificError,
)))
}
v5::Control::Ping(msg) => Ok(msg.ack()),
v5::Control::Disconnect(msg) => {
tracing::debug!("MQTT client {} disconnected", session.client_id);
session.sink().close();
Ok(msg.ack())
}
v5::Control::Subscribe(mut msg) => {
for mut s in msg.iter_mut() {
let topic = s.topic().as_str();
tracing::debug!(
"MQTT client {} subscribes to '{}'",
session.client_id,
topic
);
match session.state().map_mqtt_subscription(topic).await {
Ok(()) => s.confirm(v5::QoS::AtMostOnce),
Err(e) => {
tracing::error!("Subscription to '{}' failed: {}", topic, e);
s.fail(v5::codec::SubscribeAckReason::ImplementationSpecificError)
}
}
}
Ok(msg.ack())
}
v5::Control::Unsubscribe(msg) => {
for topic in msg.iter() {
tracing::debug!(
"MQTT client {} unsubscribes from '{}'",
session.client_id,
topic.as_str()
);
}
Ok(msg.ack())
}
v5::Control::WrBackpressure(msg) => {
tracing::debug!(
"MQTT client {} WrBackpressure received: {}",
session.client_id,
msg.enabled()
);
Ok(msg.ack())
}
v5::Control::Closed(msg) => {
tracing::debug!("MQTT client {} closed connection", session.client_id);
session.sink().close();
Ok(msg.ack())
}
v5::Control::Error(msg) => {
tracing::warn!(
"MQTT client {} Error received: {}",
session.client_id,
msg.get_ref().err
);
Ok(msg.ack(DisconnectReasonCode::UnspecifiedError))
}
v5::Control::ProtocolError(msg) => {
tracing::warn!(
"MQTT client {}: ProtocolError received: {}",
session.client_id,
msg.get_ref()
);
session.sink().close();
Ok(msg.reason_code(DisconnectReasonCode::ProtocolError).ack())
}
v5::Control::PeerGone(msg) => {
tracing::debug!(
"MQTT client {}: PeerGone => close connection",
session.client_id
);
session.sink().close();
Ok(msg.ack())
}
}
}