/// Trait for plugins that wish to provide uni- or bi-directional streaming.
///
/// The method signatures below are shown in their desugared, boxed-future form;
/// the example implementation in this file writes them as `async fn` under
/// `#[backend::async_trait]`.
pub trait StreamService {
    /// The type of JSON values returned by this stream service.
    ///
    /// Each `StreamPacket` can carry either a `data::Frame` or some arbitrary JSON;
    /// this associated type lets the JSON value be statically typed. Use `()` when
    /// no JSON variants are returned, or `serde_json::Value` when the structure of
    /// the JSON is not statically known.
    type JsonValue: Serialize;
    /// The type of error that can occur while fetching a stream packet.
    type Error: Error;
    /// The type of stream returned by `run_stream`.
    ///
    /// This will generally be impossible to name directly, so the `BoxRunStream`
    /// type alias will probably be more convenient.
    type Stream: Stream<Item = Result<StreamPacket<Self::JsonValue>, Self::Error>> + Send;

    // Required methods

    /// Handle requests to begin a subscription to a plugin or datasource managed
    /// channel path.
    ///
    /// Called for every subscriber to a stream. Implementations should check the
    /// subscribe permissions of the incoming request, and can choose to return some
    /// initial data to prepopulate the stream. `run_stream` will generally be called
    /// shortly after a response with `SubscribeStreamStatus::Ok` is returned.
    fn subscribe_stream<'life0, 'async_trait>(
        &'life0 self,
        request: SubscribeStreamRequest
    ) -> Pin<Box<dyn Future<Output = Result<SubscribeStreamResponse, Self::Error>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    /// Begin sending stream packets to a client.
    ///
    /// Called only once per channel, shortly after the first successful subscription
    /// to that channel; Grafana then multiplexes the returned stream to any future
    /// subscribers. When no subscribers remain, the stream is terminated (possibly
    /// with a delay of a few seconds) until the next active subscriber appears.
    fn run_stream<'life0, 'async_trait>(
        &'life0 self,
        request: RunStreamRequest
    ) -> Pin<Box<dyn Future<Output = Result<Self::Stream, Self::Error>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    /// Handle requests to publish to a plugin or datasource managed channel path
    /// (currently unimplemented).
    ///
    /// Implementations should check the publish permissions of the incoming request.
    fn publish_stream<'life0, 'async_trait>(
        &'life0 self,
        request: PublishStreamRequest
    ) -> Pin<Box<dyn Future<Output = Result<PublishStreamResponse, Self::Error>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
}
Expand description

Trait for plugins that wish to provide uni- or bi-directional streaming.

Example

use std::{sync::Arc, time::Duration};

use grafana_plugin_sdk::{backend, data, prelude::*};
use thiserror::Error;
use tokio::sync::RwLock;
use tokio_stream::StreamExt;
use tracing::{debug, info};

/// Example plugin type: a stateless unit struct that exists only to carry the
/// `backend::StreamService` implementation below.
struct MyPlugin;

/// Error type used by the example stream service.
///
/// A unit struct with one fixed display message; the `From` impls below map
/// `data::Error` and `backend::ConvertToError` onto it, discarding their details.
#[derive(Debug, Error)]
#[error("Error streaming data")]
struct StreamError;

impl From<data::Error> for StreamError {
    fn from(_other: data::Error) -> StreamError {
        StreamError
    }
}

impl From<backend::ConvertToError> for StreamError {
    fn from(_other: backend::ConvertToError) -> StreamError {
        StreamError
    }
}

#[backend::async_trait]
impl backend::StreamService for MyPlugin {
    /// JSON payload type for this service.
    ///
    /// This example never returns JSON, so `()` is sufficient.
    type JsonValue = ();

    /// Every failure in this example is reported as a `StreamError`.
    type Error = StreamError;

    /// Boxed stream alias, so the concrete stream built in `run_stream`
    /// does not have to be named.
    type Stream = backend::BoxRunStream<Self::Error>;

    /// Decide whether a client may subscribe to the requested path.
    ///
    /// Only the fixed path `"stream"` is accepted (with no initial data);
    /// any other path gets a not-found response. The requested path is
    /// logged at `info` level either way.
    async fn subscribe_stream(
        &self,
        request: backend::SubscribeStreamRequest,
    ) -> Result<backend::SubscribeStreamResponse, Self::Error> {
        let is_known_path = request.path.as_str() == "stream";
        let reply = if is_known_path {
            backend::SubscribeStreamResponse::ok(None)
        } else {
            backend::SubscribeStreamResponse::not_found()
        };
        info!(path = %request.path, "Subscribing to stream");
        Ok(reply)
    }

    /// Produce the stream of packets for a channel.
    ///
    /// A single in-memory `Frame` named `"foo"` with one field `"x"` is built up
    /// front. Each loop iteration overwrites that field with the next window of
    /// `width` consecutive integers, validates the frame, and yields it as a packet;
    /// the window start then advances by `width` so successive frames differ.
    /// The resulting stream is throttled with a one-second interval.
    async fn run_stream(&self, _request: backend::RunStreamRequest) -> Result<Self::Stream, Self::Error> {
        info!("Running stream");
        let width = 3;
        let mut start = 0u32;
        let mut frame = data::Frame::new("foo").with_field((start..start + width).into_field("x"));
        let tick = Duration::from_secs(1);
        let packets = async_stream::try_stream! {
            loop {
                frame.fields_mut()[0].set_values(start..start + width);
                // `?` here ends the stream with a `StreamError` via the `From` impls.
                let packet = backend::StreamPacket::from_frame(frame.check()?)?;
                debug!("Yielding frame from {} to {}", start, start + width);
                yield packet;
                start += width;
            }
        };
        Ok(Box::pin(packets.throttle(tick)))
    }

    /// Handle a request to publish data to a stream.
    ///
    /// Not implemented in this example: after logging, the call hits `todo!()`
    /// and panics. The underlying functionality _should_ work once implemented.
    async fn publish_stream(
        &self,
        _request: backend::PublishStreamRequest,
    ) -> Result<backend::PublishStreamResponse, Self::Error> {
        info!("Publishing to stream");
        todo!()
    }
}

Required Associated Types§

source

type JsonValue: Serialize

The type of JSON values returned by this stream service.

Each StreamPacket can return either a data::Frame or some arbitrary JSON. This associated type allows the JSON value to be statically typed, if desired.

If the implementation does not intend to return JSON variants, this can be set to (). If the structure of the returned JSON is not statically known, this should be set to serde_json::Value.

source

type Error: Error

The type of error that can occur while fetching a stream packet.

source

type Stream: Stream<Item = Result<StreamPacket<Self::JsonValue>, Self::Error>> + Send

The type of stream returned by run_stream.

This will generally be impossible to name directly, so returning the BoxRunStream type alias will probably be more convenient.

Required Methods§

source

fn subscribe_stream<'life0, 'async_trait>( &'life0 self, request: SubscribeStreamRequest ) -> Pin<Box<dyn Future<Output = Result<SubscribeStreamResponse, Self::Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Handle requests to begin a subscription to a plugin or datasource managed channel path.

This function is called for every subscriber to a stream. Implementations should check the subscribe permissions of the incoming request, and can choose to return some initial data to prepopulate the stream.

run_stream will generally be called shortly after returning a response with SubscribeStreamStatus::Ok; this is responsible for streaming any data after the initial_data.

source

fn run_stream<'life0, 'async_trait>( &'life0 self, request: RunStreamRequest ) -> Pin<Box<dyn Future<Output = Result<Self::Stream, Self::Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Begin sending stream packets to a client.

This will only be called once per channel, shortly after the first successful subscription to that channel by the first client (after subscribe_stream returns a response with SubscribeStreamStatus::Ok for a specific Channel). Grafana will then multiplex the returned stream to any future subscribers.

When Grafana detects that there are no longer any subscribers to a channel, the stream will be terminated until the next active subscriber appears. Stream termination may be slightly delayed, generally by a few seconds.

source

fn publish_stream<'life0, 'async_trait>( &'life0 self, request: PublishStreamRequest ) -> Pin<Box<dyn Future<Output = Result<PublishStreamResponse, Self::Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Handle requests to publish to a plugin or datasource managed channel path (currently unimplemented).

Implementations should check the publish permissions of the incoming request.

Implementors§