mod stream;
use core::fmt;
use crate::{prompt::Data, traits::ExecutorError};
use thiserror;
use tokio::sync::mpsc;
pub use stream::{OutputStream, StreamSegment};
pub use tokio_stream::{Stream, StreamExt};
pub enum Output {
Immediate(Immediate),
Stream(OutputStream),
}
#[derive(Debug, thiserror::Error)]
#[error("Trying to return a stream on an Immediate output")]
pub struct NotAStreamError;
impl Output {
pub async fn to_immediate(self) -> Result<Immediate, ExecutorError> {
match self {
Output::Immediate(x) => Ok(x),
Output::Stream(x) => Ok(Immediate(x.into_data().await?)),
}
}
pub async fn as_stream(self) -> Result<OutputStream, NotAStreamError> {
match self {
Output::Immediate(_) => Err(NotAStreamError),
Output::Stream(x) => Ok(x),
}
}
pub fn new_stream() -> (mpsc::UnboundedSender<StreamSegment>, Self) {
let (sender, stream) = OutputStream::new();
(sender, Output::Stream(stream))
}
pub fn from_stream<S>(stream: S) -> Self
where
S: Stream<Item = StreamSegment> + Send + 'static,
{
Output::Stream(OutputStream::from_stream(stream))
}
pub fn new_immediate(data: Data<String>) -> Self {
Output::Immediate(Immediate(data))
}
}
impl fmt::Display for Output {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Output::Immediate(Immediate(data)) => data.fmt(f),
Output::Stream(_) => write!(f, "<OutputStream>"),
}
}
}
pub struct Immediate(Data<String>);
impl Immediate {
pub fn get_content(&self) -> &Data<String> {
&self.0
}
pub fn as_content(self) -> Data<String> {
self.0
}
pub fn primary_textual_output(&self) -> Option<String> {
self.get_content().extract_last_body().cloned()
}
}
impl fmt::Display for Immediate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}