Trait grafana_plugin_sdk::backend::StreamService
source · pub trait StreamService {
/// The type of JSON values returned by this stream service.
type JsonValue: Serialize;
/// The error type used in the `Result` of every method below and of each stream item.
type Error: Error;
/// The type of stream returned by `run_stream`; each item is a `Result` wrapping a
/// `StreamPacket` of `Self::JsonValue`.
type Stream: Stream<Item = Result<StreamPacket<Self::JsonValue>, Self::Error>> + Send;
// Required methods
// NOTE(review): the `'life0` / `'async_trait` lifetimes and the boxed `Future` return types
// look like the expansion of an `async_trait` attribute (the example on this page implements
// these methods as `async fn` under `#[backend::async_trait]`) — confirm against the crate source.
/// Handle requests to begin a subscription to a plugin or datasource managed channel path.
fn subscribe_stream<'life0, 'async_trait>(
&'life0 self,
request: SubscribeStreamRequest
) -> Pin<Box<dyn Future<Output = Result<SubscribeStreamResponse, Self::Error>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
/// Begin sending stream packets to a client; resolves to the `Self::Stream` to be consumed.
fn run_stream<'life0, 'async_trait>(
&'life0 self,
request: RunStreamRequest
) -> Pin<Box<dyn Future<Output = Result<Self::Stream, Self::Error>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
/// Handle requests to publish to a plugin or datasource managed channel path.
fn publish_stream<'life0, 'async_trait>(
&'life0 self,
request: PublishStreamRequest
) -> Pin<Box<dyn Future<Output = Result<PublishStreamResponse, Self::Error>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
}
Expand description
Trait for plugins that wish to provide uni- or bi-directional streaming.
Example
use std::{sync::Arc, time::Duration};
use grafana_plugin_sdk::{backend, data, prelude::*};
use thiserror::Error;
use tokio::sync::RwLock;
use tokio_stream::StreamExt;
use tracing::{debug, info};
/// Field-less plugin type that this example implements `backend::StreamService` for.
struct MyPlugin;
/// The example's only error type: a payload-free unit struct whose message is the
/// fixed text given in the `#[error(...)]` attribute.
#[derive(Debug, Error)]
#[error("Error streaming data")]
struct StreamError;
impl From<data::Error> for StreamError {
fn from(_other: data::Error) -> StreamError {
StreamError
}
}
impl From<backend::ConvertToError> for StreamError {
fn from(_other: backend::ConvertToError) -> StreamError {
StreamError
}
}
#[backend::async_trait]
impl backend::StreamService for MyPlugin {
    /// JSON payload type for `initial_data`.
    ///
    /// This example never returns JSON, so the unit type is enough.
    type JsonValue = ();

    /// Error type shared by every method of this service and by the stream items.
    type Error = StreamError;

    /// Boxed stream alias, so the concrete stream type does not have to be spelled out.
    type Stream = backend::BoxRunStream<Self::Error>;

    /// Answer a subscription request.
    ///
    /// Only the fixed path `"stream"` is accepted; any other path receives a
    /// `not_found` response. The requested path is logged in both cases.
    async fn subscribe_stream(
        &self,
        request: backend::SubscribeStreamRequest,
    ) -> Result<backend::SubscribeStreamResponse, Self::Error> {
        let path_is_known = request.path.as_str() == "stream";
        let response = if path_is_known {
            backend::SubscribeStreamResponse::ok(None)
        } else {
            backend::SubscribeStreamResponse::not_found()
        };
        info!(path = %request.path, "Subscribing to stream");
        Ok(response)
    }

    /// Start producing packets for a stream.
    ///
    /// A single in-memory `Frame` is built up front. Each loop iteration then
    /// overwrites its first field with the next range of `width` integers,
    /// yields the checked frame as a packet, and advances the range start so
    /// that consecutive frames differ. The resulting stream is throttled with
    /// a one-second duration before being boxed and returned.
    async fn run_stream(
        &self,
        _request: backend::RunStreamRequest,
    ) -> Result<Self::Stream, Self::Error> {
        info!("Running stream");
        let width = 3;
        let mut start = 0u32;
        let mut frame =
            data::Frame::new("foo").with_field((start..start + width).into_field("x"));
        let throttled = async_stream::try_stream! {
            loop {
                frame.fields_mut()[0].set_values(start..start + width);
                let packet = backend::StreamPacket::from_frame(frame.check()?)?;
                debug!("Yielding frame from {} to {}", start, start + width);
                yield packet;
                start += width;
            }
        }
        .throttle(Duration::from_secs(1));
        Ok(Box::pin(throttled))
    }

    /// Answer a request to publish data to a stream.
    ///
    /// Deliberately left as a `todo!()` in this example: the call is logged and
    /// then panics instead of returning a response.
    async fn publish_stream(
        &self,
        _request: backend::PublishStreamRequest,
    ) -> Result<backend::PublishStreamResponse, Self::Error> {
        info!("Publishing to stream");
        todo!()
    }
}
Required Associated Types§
sourcetype JsonValue: Serialize
type JsonValue: Serialize
The type of JSON values returned by this stream service.
Each `StreamPacket` can return either a `data::Frame` or some arbitrary JSON. This
associated type allows the JSON value to be statically typed, if desired.
If the implementation does not intend to return JSON variants, this
can be set to `()`. If the structure of the returned JSON is not statically known, this
should be set to `serde_json::Value`.
sourcetype Stream: Stream<Item = Result<StreamPacket<Self::JsonValue>, Self::Error>> + Send
type Stream: Stream<Item = Result<StreamPacket<Self::JsonValue>, Self::Error>> + Send
The type of stream returned by `run_stream`.
This will generally be impossible to name directly, so returning the
`BoxRunStream` type alias will probably be more convenient.
Required Methods§
sourcefn subscribe_stream<'life0, 'async_trait>(
&'life0 self,
request: SubscribeStreamRequest
) -> Pin<Box<dyn Future<Output = Result<SubscribeStreamResponse, Self::Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn subscribe_stream<'life0, 'async_trait>(
&'life0 self,
request: SubscribeStreamRequest
) -> Pin<Box<dyn Future<Output = Result<SubscribeStreamResponse, Self::Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Handle requests to begin a subscription to a plugin or datasource managed channel path.
This function is called for every subscriber to a stream. Implementations should check the subscribe permissions of the incoming request, and can choose to return some initial data to prepopulate the stream.
`run_stream` will generally be called shortly after returning a response with
`SubscribeStreamStatus::Ok`; this is responsible for streaming any data after
the `initial_data`.
sourcefn run_stream<'life0, 'async_trait>(
&'life0 self,
request: RunStreamRequest
) -> Pin<Box<dyn Future<Output = Result<Self::Stream, Self::Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn run_stream<'life0, 'async_trait>(
&'life0 self,
request: RunStreamRequest
) -> Pin<Box<dyn Future<Output = Result<Self::Stream, Self::Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Begin sending stream packets to a client.
This will only be called once per channel, shortly after the first successful subscription
to that channel by the first client (after `subscribe_stream` returns a response with
`SubscribeStreamStatus::Ok` for a specific `Channel`).
Grafana will then multiplex the returned stream to any future subscribers.
When Grafana detects that there are no longer any subscribers to a channel, the stream will be terminated until the next active subscriber appears. Stream termination may be slightly delayed, generally by a few seconds.
sourcefn publish_stream<'life0, 'async_trait>(
&'life0 self,
request: PublishStreamRequest
) -> Pin<Box<dyn Future<Output = Result<PublishStreamResponse, Self::Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn publish_stream<'life0, 'async_trait>(
&'life0 self,
request: PublishStreamRequest
) -> Pin<Box<dyn Future<Output = Result<PublishStreamResponse, Self::Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Handle requests to publish to a plugin or datasource managed channel path (currently unimplemented).
Implementations should check the publish permissions of the incoming request.