mpd_utils/
multi_host_client.rsuse crate::error::{Error, Result};
use crate::persistent_client::PersistentClient;
use mpd_client::client::{CommandError, ConnectionEvent};
use mpd_client::responses::{PlayState, SongInQueue, Status};
use mpd_client::Client;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
pub struct MultiHostClient {
clients: Vec<PersistentClient>,
}
impl MultiHostClient {
pub fn new(hosts: Vec<String>, retry_interval: Duration) -> Self {
let hosts = hosts
.into_iter()
.map(|host| PersistentClient::new(host, retry_interval))
.collect();
Self { clients: hosts }
}
pub fn init(&self) {
for client in &self.clients {
client.init();
}
}
pub async fn wait_for_any_client(&self) -> Arc<Client> {
let waits = self
.clients
.iter()
.map(|client| Box::pin(client.wait_for_client()));
futures::future::select_all(waits).await.0
}
pub async fn wait_for_all_clients(&self) -> Vec<Arc<Client>> {
let waits = self.clients.iter().map(|client| client.wait_for_client());
futures::future::join_all(waits).await
}
async fn get_current_client(
&self,
) -> std::result::Result<Option<&PersistentClient>, CommandError> {
self.wait_for_any_client().await;
let connected_clients = self
.clients
.iter()
.filter(|client| client.is_connected())
.collect::<Vec<_>>();
if connected_clients.is_empty() {
Ok(None)
} else {
let player_states = connected_clients.iter().map(|&client| async move {
client.status().await.map(|status| (client, status.state))
});
let player_states = futures::future::join_all(player_states)
.await
.into_iter()
.collect::<std::result::Result<Vec<_>, _>>();
player_states.map(|player_states| {
player_states
.iter()
.find(|(_, state)| state == &PlayState::Playing)
.or_else(|| {
player_states
.iter()
.find(|(_, state)| state == &PlayState::Paused)
})
.or_else(|| {
player_states
.iter()
.find(|(_, state)| state == &PlayState::Stopped)
})
.map(|(client, _)| *client)
})
}
}
pub async fn with_client<F, Fut, T>(&self, f: F) -> Result<T>
where
F: FnOnce(Arc<Client>) -> Fut,
Fut: Future<Output = T>,
{
let client = self.get_current_client().await;
match client {
Ok(Some(client)) => Ok(client.with_client(f).await),
Ok(None) => Err(Error::NoHostConnectedError),
Err(err) => Err(Error::CommandError(err)),
}
}
pub async fn recv(&mut self) -> std::result::Result<Arc<ConnectionEvent>, RecvError> {
let waits = self
.clients
.iter_mut()
.map(|client| Box::pin(client.recv()));
futures::future::select_all(waits).await.0
}
pub async fn status(&self) -> Result<Status> {
let client = self.get_current_client().await;
match client {
Ok(Some(client)) => client.status().await.map_err(Error::CommandError),
Ok(None) => Err(Error::NoHostConnectedError),
Err(err) => Err(Error::CommandError(err)),
}
}
pub async fn current_song(&self) -> Result<Option<SongInQueue>> {
match self.get_current_client().await {
Ok(Some(client)) => client.current_song().await.map_err(Error::CommandError),
Ok(None) => Err(Error::NoHostConnectedError),
Err(err) => Err(Error::CommandError(err)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test() {
let client = MultiHostClient::new(
vec!["localhost:6600".into_string(), "chloe:6600".into_string()],
Duration::from_secs(5),
);
client.init();
client.wait_for_all_clients().await;
let current_client = client.get_current_client().await;
println!("{current_client:?}");
}
}