use std::{borrow::Cow, fmt, fmt::Debug, sync::Arc};
use futures_util::{
future::BoxFuture, stream::BoxStream, Future, FutureExt, Stream, StreamExt, TryStreamExt,
};
use indexmap::IndexMap;
use crate::{
dynamic::{
resolve::resolve, FieldValue, InputValue, ObjectAccessor, ResolverContext, Schema,
SchemaError, TypeRef,
},
extensions::ResolveInfo,
parser::types::Selection,
registry::{Deprecation, MetaField, MetaType, Registry},
subscription::BoxFieldStream,
ContextSelectionSet, Data, Name, QueryPathNode, QueryPathSegment, Response, Result,
ServerResult, Value,
};
type BoxResolveFut<'a> = BoxFuture<'a, Result<BoxStream<'a, Result<FieldValue<'a>>>>>;
pub struct SubscriptionFieldFuture<'a>(pub(crate) BoxResolveFut<'a>);
impl<'a> SubscriptionFieldFuture<'a> {
pub fn new<Fut, S, T>(future: Fut) -> Self
where
Fut: Future<Output = Result<S>> + Send + 'a,
S: Stream<Item = Result<T>> + Send + 'a,
T: Into<FieldValue<'a>> + Send + 'a,
{
Self(
async move {
let res = future.await?.map_ok(Into::into);
Ok(res.boxed())
}
.boxed(),
)
}
}
type BoxResolverFn =
Arc<(dyn for<'a> Fn(ResolverContext<'a>) -> SubscriptionFieldFuture<'a> + Send + Sync)>;
pub struct SubscriptionField {
pub(crate) name: String,
pub(crate) description: Option<String>,
pub(crate) arguments: IndexMap<String, InputValue>,
pub(crate) ty: TypeRef,
pub(crate) resolver_fn: BoxResolverFn,
pub(crate) deprecation: Deprecation,
}
impl SubscriptionField {
pub fn new<N, T, F>(name: N, ty: T, resolver_fn: F) -> Self
where
N: Into<String>,
T: Into<TypeRef>,
F: for<'a> Fn(ResolverContext<'a>) -> SubscriptionFieldFuture<'a> + Send + Sync + 'static,
{
Self {
name: name.into(),
description: None,
arguments: Default::default(),
ty: ty.into(),
resolver_fn: Arc::new(resolver_fn),
deprecation: Deprecation::NoDeprecated,
}
}
impl_set_description!();
impl_set_deprecation!();
#[inline]
pub fn argument(mut self, input_value: InputValue) -> Self {
self.arguments.insert(input_value.name.clone(), input_value);
self
}
}
impl Debug for SubscriptionField {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Field")
.field("name", &self.name)
.field("description", &self.description)
.field("arguments", &self.arguments)
.field("ty", &self.ty)
.field("deprecation", &self.deprecation)
.finish()
}
}
#[derive(Debug)]
pub struct Subscription {
pub(crate) name: String,
pub(crate) description: Option<String>,
pub(crate) fields: IndexMap<String, SubscriptionField>,
}
impl Subscription {
#[inline]
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
description: None,
fields: Default::default(),
}
}
impl_set_description!();
#[inline]
pub fn field(mut self, field: SubscriptionField) -> Self {
assert!(
!self.fields.contains_key(&field.name),
"Field `{}` already exists",
field.name
);
self.fields.insert(field.name.clone(), field);
self
}
#[inline]
pub fn type_name(&self) -> &str {
&self.name
}
pub(crate) fn register(&self, registry: &mut Registry) -> Result<(), SchemaError> {
let mut fields = IndexMap::new();
for field in self.fields.values() {
let mut args = IndexMap::new();
for argument in field.arguments.values() {
args.insert(argument.name.clone(), argument.to_meta_input_value());
}
fields.insert(
field.name.clone(),
MetaField {
name: field.name.clone(),
description: field.description.clone(),
args,
ty: field.ty.to_string(),
deprecation: field.deprecation.clone(),
cache_control: Default::default(),
external: false,
requires: None,
provides: None,
visible: None,
shareable: false,
inaccessible: false,
tags: vec![],
override_from: None,
compute_complexity: None,
directive_invocations: vec![],
},
);
}
registry.types.insert(
self.name.clone(),
MetaType::Object {
name: self.name.clone(),
description: self.description.clone(),
fields,
cache_control: Default::default(),
extends: false,
shareable: false,
resolvable: true,
keys: None,
visible: None,
inaccessible: false,
interface_object: false,
tags: vec![],
is_subscription: true,
rust_typename: None,
directive_invocations: vec![],
},
);
Ok(())
}
pub(crate) fn collect_streams<'a>(
&self,
schema: &Schema,
ctx: &ContextSelectionSet<'a>,
streams: &mut Vec<BoxFieldStream<'a>>,
root_value: &'a FieldValue<'static>,
) {
for selection in &ctx.item.node.items {
if let Selection::Field(field) = &selection.node {
if let Some(field_def) = self.fields.get(field.node.name.node.as_str()) {
let schema = schema.clone();
let field_type = field_def.ty.clone();
let resolver_fn = field_def.resolver_fn.clone();
let ctx = ctx.clone();
streams.push(
async_stream::try_stream! {
let ctx_field = ctx.with_field(field);
let field_name = ctx_field.item.node.response_key().node.clone();
let arguments = ObjectAccessor(Cow::Owned(
field
.node
.arguments
.iter()
.map(|(name, value)| {
ctx_field
.resolve_input_value(value.clone())
.map(|value| (name.node.clone(), value))
})
.collect::<ServerResult<IndexMap<Name, Value>>>()?,
));
let mut stream = resolver_fn(ResolverContext {
ctx: &ctx_field,
args: arguments,
parent_value: root_value,
})
.0
.await
.map_err(|err| ctx_field.set_error_path(err.into_server_error(ctx_field.item.pos)))?;
while let Some(value) = stream.next().await.transpose().map_err(|err| ctx_field.set_error_path(err.into_server_error(ctx_field.item.pos)))? {
let f = |execute_data: Option<Data>| {
let schema = schema.clone();
let field_name = field_name.clone();
let field_type = field_type.clone();
let ctx_field = ctx_field.clone();
async move {
let mut ctx_field = ctx_field.clone();
ctx_field.execute_data = execute_data.as_ref();
let ri = ResolveInfo {
path_node: &QueryPathNode {
parent: None,
segment: QueryPathSegment::Name(&field_name),
},
parent_type: schema.0.env.registry.subscription_type.as_ref().unwrap(),
return_type: &field_type.to_string(),
name: field.node.name.node.as_str(),
alias: field.node.alias.as_ref().map(|alias| alias.node.as_str()),
is_for_introspection: false,
field: &field.node,
};
let resolve_fut = resolve(&schema, &ctx_field, &field_type, Some(&value));
futures_util::pin_mut!(resolve_fut);
let value = ctx_field.query_env.extensions.resolve(ri, &mut resolve_fut).await;
match value {
Ok(value) => {
let mut map = IndexMap::new();
map.insert(field_name.clone(), value.unwrap_or_default());
Response::new(Value::Object(map))
},
Err(err) => Response::from_errors(vec![err]),
}
}
};
let resp = ctx_field.query_env.extensions.execute(ctx_field.query_env.operation_name.as_deref(), f).await;
let is_err = !resp.errors.is_empty();
yield resp;
if is_err {
break;
}
}
}.map(|res| {
res.unwrap_or_else(|err| Response::from_errors(vec![err]))
})
.boxed(),
);
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use futures_util::StreamExt;
use crate::{dynamic::*, value, Value};
#[tokio::test]
async fn subscription() {
struct MyObjData {
value: i32,
}
let my_obj = Object::new("MyObject").field(Field::new(
"value",
TypeRef::named_nn(TypeRef::INT),
|ctx| {
FieldFuture::new(async {
Ok(Some(Value::from(
ctx.parent_value.try_downcast_ref::<MyObjData>()?.value,
)))
})
},
));
let query = Object::new("Query").field(Field::new(
"value",
TypeRef::named_nn(TypeRef::INT),
|_| FieldFuture::new(async { Ok(FieldValue::none()) }),
));
let subscription = Subscription::new("Subscription").field(SubscriptionField::new(
"obj",
TypeRef::named_nn(my_obj.type_name()),
|_| {
SubscriptionFieldFuture::new(async {
Ok(async_stream::try_stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
yield FieldValue::owned_any(MyObjData { value: i });
}
})
})
},
));
let schema = Schema::build(query.type_name(), None, Some(subscription.type_name()))
.register(my_obj)
.register(query)
.register(subscription)
.finish()
.unwrap();
let mut stream = schema.execute_stream("subscription { obj { value } }");
for i in 0..10 {
assert_eq!(
stream.next().await.unwrap().into_result().unwrap().data,
value!({
"obj": { "value": i }
})
);
}
}
#[tokio::test]
async fn borrow_context() {
struct State {
value: i32,
}
let query =
Object::new("Query").field(Field::new("value", TypeRef::named(TypeRef::INT), |_| {
FieldFuture::new(async { Ok(FieldValue::NONE) })
}));
let subscription = Subscription::new("Subscription").field(SubscriptionField::new(
"values",
TypeRef::named_nn(TypeRef::INT),
|ctx| {
SubscriptionFieldFuture::new(async move {
Ok(async_stream::try_stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
yield FieldValue::value(ctx.data_unchecked::<State>().value + i);
}
})
})
},
));
let schema = Schema::build("Query", None, Some(subscription.type_name()))
.register(query)
.register(subscription)
.data(State { value: 123 })
.finish()
.unwrap();
let mut stream = schema.execute_stream("subscription { values }");
for i in 0..10 {
assert_eq!(
stream.next().await.unwrap().into_result().unwrap().data,
value!({ "values": i + 123 })
);
}
}
}