use crate::codec::{JsonCodec, JsonRpcCodec};
pub use anyhow::anyhow;
use anyhow::{Context, Result};
use futures::sink::SinkExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
extern crate log;
use log::trace;
use messages::{Configuration, FeatureBits, NotificationTopic};
use options::{OptionType, UntypedConfigOption};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::Mutex;
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tokio_util::codec::FramedWrite;
mod codec;
mod logging;
pub mod messages;
#[macro_use]
extern crate serde_json;
pub mod options;
pub type Error = anyhow::Error;
/// Collects everything a plugin wants to declare (options, RPC methods,
/// hooks, subscriptions, notification topics, feature bits) before the
/// `getmanifest` / `init` handshake is performed by `configure`.
///
/// `S` is the user state handed to callbacks, `I` the input stream and `O`
/// the output stream.
pub struct Builder<S, I, O>
where
I: AsyncRead + Unpin,
O: Send + AsyncWrite + Unpin,
S: Clone + Send,
{
/// Raw input stream; taken out by `configure` and wrapped in a `FramedRead`.
input: Option<I>,
/// Raw output stream; taken out by `configure` and wrapped in a `FramedWrite`.
output: Option<O>,
/// Hook callbacks keyed by hook name.
hooks: HashMap<String, Hook<S>>,
/// Declared options keyed by option name.
options: HashMap<String, UntypedConfigOption>,
/// Option values resolved while handling `init`, keyed by option name.
option_values: HashMap<String, Option<options::Value>>,
/// RPC methods keyed by method name.
rpcmethods: HashMap<String, RpcMethod<S>>,
/// Optional callback stored by `setconfig_callback`.
setconfig_callback: Option<AsyncCallback<S>>,
/// Notification subscriptions keyed by topic.
subscriptions: HashMap<String, Subscription<S>>,
/// Subscription registered under the `"*"` topic, if any.
wildcard_subscription: Option<Subscription<S>>,
/// Notification topics reported in the manifest.
notifications: Vec<NotificationTopic>,
/// Values reported in the manifest's `custommessages` field.
custommessages: Vec<u16>,
/// Feature bits reported in the manifest.
featurebits: FeatureBits,
/// Value reported in the manifest's `dynamic` field.
dynamic: bool,
/// When `true`, `configure` initialises `crate::logging` on the output.
logging: bool,
}
/// A plugin that has answered `getmanifest` and received `init`, but has not
/// yet replied to `init`. It is consumed either by `start` (which replies and
/// spawns the driver loop) or by `disable` (which replies with a disable
/// reason).
pub struct ConfiguredPlugin<S, I, O>
where
S: Clone + Send,
{
/// JSON-RPC id of the pending `init` request, echoed in the reply.
init_id: serde_json::Value,
/// Framed reader over the input stream.
input: FramedRead<I, JsonRpcCodec>,
/// Framed writer over the output stream, shared behind an async mutex.
output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
/// Declared options keyed by option name.
options: HashMap<String, UntypedConfigOption>,
/// Option values resolved while handling `init`.
option_values: HashMap<String, Option<options::Value>>,
/// Configuration received with the `init` call.
configuration: Configuration,
/// Request callbacks keyed by method name; `Builder::configure` also merges
/// the hook callbacks into this map.
rpcmethods: HashMap<String, AsyncCallback<S>>,
/// Optional callback used for the `setconfig` method.
setconfig_callback: Option<AsyncCallback<S>>,
/// Left empty by `Builder::configure`, which stores hooks in `rpcmethods`.
hooks: HashMap<String, AsyncCallback<S>>,
/// Notification callbacks keyed by topic.
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
/// Callback registered under the `"*"` topic, if any.
wildcard_subscription: Option<AsyncNotificationCallback<S>>,
/// Notification topics declared on the builder; not read after configuration.
#[allow(dead_code)] notifications: Vec<NotificationTopic>,
}
/// Owns the registered callbacks and dispatches incoming messages to them
/// from the loop in `run`.
struct PluginDriver<S>
where
S: Send + Clone,
{
/// Handle cloned into every callback invocation.
plugin: Plugin<S>,
/// Request callbacks keyed by method name.
rpcmethods: HashMap<String, AsyncCallback<S>>,
/// Callback used when the request method is `setconfig`.
setconfig_callback: Option<AsyncCallback<S>>,
/// Not consulted by the dispatcher; requests are looked up in `rpcmethods`.
#[allow(dead_code)] hooks: HashMap<String, AsyncCallback<S>>,
/// Notification callbacks keyed by topic.
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
/// Callback invoked for every custom notification, if registered.
wildcard_subscription: Option<AsyncNotificationCallback<S>>,
}
/// Cloneable handle to a running plugin; a clone is passed to every callback.
#[derive(Clone)]
pub struct Plugin<S>
where
S: Clone + Send,
{
/// User-supplied state, exposed through `state()`.
state: S,
/// Declared options keyed by option name.
options: HashMap<String, UntypedConfigOption>,
/// Current option values, shared between clones behind a `std::sync::Mutex`.
option_values: Arc<std::sync::Mutex<HashMap<String, Option<options::Value>>>>,
/// Configuration received with the `init` call.
configuration: Configuration,
/// Broadcast channel used by `join` (receive) and `shutdown` (send); the
/// driver task also sends on it when its loop ends.
wait_handle: tokio::sync::broadcast::Sender<()>,
/// Queue of outgoing JSON messages, written to the output by the driver loop.
sender: tokio::sync::mpsc::Sender<serde_json::Value>,
}
impl<S, I, O> Builder<S, I, O>
where
    O: Send + AsyncWrite + Unpin + 'static,
    S: Clone + Sync + Send + 'static,
    I: AsyncRead + Send + Unpin + 'static,
{
    /// Creates a builder reading from `input` and writing to `output`.
    ///
    /// Nothing is registered yet; logging is enabled and `dynamic` is off.
    pub fn new(input: I, output: O) -> Self {
        Self {
            input: Some(input),
            output: Some(output),
            hooks: HashMap::new(),
            subscriptions: HashMap::new(),
            wildcard_subscription: None,
            options: HashMap::new(),
            option_values: HashMap::new(),
            rpcmethods: HashMap::new(),
            setconfig_callback: None,
            notifications: vec![],
            featurebits: FeatureBits::default(),
            dynamic: false,
            custommessages: vec![],
            logging: true,
        }
    }

    /// Declares a configuration option, keyed by its name. Declaring the same
    /// name twice replaces the earlier declaration.
    pub fn option<'a, V: options::OptionType<'a>>(
        mut self,
        opt: options::ConfigOption<'a, V>,
    ) -> Builder<S, I, O> {
        self.options.insert(opt.name().to_string(), opt.build());
        self
    }

    /// Adds a notification topic to the list reported in the manifest.
    pub fn notification(mut self, notif: messages::NotificationTopic) -> Builder<S, I, O> {
        self.notifications.push(notif);
        self
    }

    /// Registers `callback` for notifications on `topic`.
    ///
    /// The topic `"*"` is stored as the wildcard subscription; any other
    /// topic replaces a previous subscription with the same name.
    pub fn subscribe<C, F>(mut self, topic: &str, callback: C) -> Builder<S, I, O>
    where
        C: Send + Sync + 'static,
        C: Fn(Plugin<S>, Request) -> F + 'static,
        F: Future<Output = Result<(), Error>> + Send + 'static,
    {
        let subscription = Subscription {
            callback: Box::new(move |p, r| Box::pin(callback(p, r))),
        };

        if topic == "*" {
            self.wildcard_subscription = Some(subscription);
        } else {
            self.subscriptions.insert(topic.to_string(), subscription);
        };
        self
    }

    /// Registers `callback` for the hook named `hookname`.
    pub fn hook<C, F>(mut self, hookname: &str, callback: C) -> Self
    where
        C: Send + Sync + 'static,
        C: Fn(Plugin<S>, Request) -> F + 'static,
        F: Future<Output = Response> + Send + 'static,
    {
        self.hooks.insert(
            hookname.to_string(),
            Hook {
                callback: Box::new(move |p, r| Box::pin(callback(p, r))),
            },
        );
        self
    }

    /// Registers an RPC method with the given `name` and `description`; its
    /// usage string is left empty.
    pub fn rpcmethod<C, F>(mut self, name: &str, description: &str, callback: C) -> Builder<S, I, O>
    where
        C: Send + Sync + 'static,
        C: Fn(Plugin<S>, Request) -> F + 'static,
        F: Future<Output = Response> + Send + 'static,
    {
        self.rpcmethods.insert(
            name.to_string(),
            RpcMethod {
                name: name.to_string(),
                description: description.to_string(),
                usage: String::default(),
                callback: Box::new(move |p, r| Box::pin(callback(p, r))),
            },
        );
        self
    }

    /// Registers an RPC method described by an [`RpcMethodBuilder`].
    pub fn rpcmethod_from_builder(mut self, rpc_method: RpcMethodBuilder<S>) -> Builder<S, I, O> {
        self.rpcmethods
            .insert(rpc_method.name.to_string(), rpc_method.build());
        self
    }

    /// Stores the callback that the dispatcher uses for `setconfig` requests.
    pub fn setconfig_callback<C, F>(mut self, setconfig_callback: C) -> Builder<S, I, O>
    where
        C: Send + Sync + 'static,
        C: Fn(Plugin<S>, Request) -> F + 'static,
        F: Future<Output = Response> + Send + 'static,
    {
        self.setconfig_callback = Some(Box::new(move |p, r| Box::pin(setconfig_callback(p, r))));
        self
    }

    /// Sets the manifest's `dynamic` flag to `true`.
    pub fn dynamic(mut self) -> Builder<S, I, O> {
        self.dynamic = true;
        self
    }

    /// Sets the feature bits (as a hex string) for the given `kind`.
    pub fn featurebits(mut self, kind: FeatureBitsKind, hex: String) -> Self {
        match kind {
            FeatureBitsKind::Node => self.featurebits.node = Some(hex),
            FeatureBitsKind::Channel => self.featurebits.channel = Some(hex),
            FeatureBitsKind::Init => self.featurebits.init = Some(hex),
            FeatureBitsKind::Invoice => self.featurebits.invoice = Some(hex),
        }
        self
    }

    /// Chooses whether `configure` initialises the crate's logging.
    pub fn with_logging(mut self, log: bool) -> Builder<S, I, O> {
        self.logging = log;
        self
    }

    /// Replaces the list reported in the manifest's `custommessages` field.
    pub fn custommessages(mut self, custommessages: Vec<u16>) -> Self {
        self.custommessages = custommessages;
        self
    }

    /// Performs the `getmanifest` / `init` handshake without replying to
    /// `init`, so the caller can inspect options before deciding to `start`
    /// or `disable` the returned [`ConfiguredPlugin`].
    ///
    /// Returns `Ok(None)` when the input ends before `init` arrives.
    ///
    /// # Errors
    ///
    /// Fails when logging cannot be initialised, when the first message is
    /// not `getmanifest` (or the input ends before it), when the second
    /// message is not `init`, when writing the manifest fails, or when an
    /// option value supplied with `init` cannot be interpreted.
    pub async fn configure(mut self) -> Result<Option<ConfiguredPlugin<S, I, O>>, anyhow::Error> {
        let mut input = FramedRead::new(self.input.take().unwrap(), JsonRpcCodec::default());

        // The output is shared: the logger (if enabled) and the driver loop
        // both write to it.
        let output = Arc::new(Mutex::new(FramedWrite::new(
            self.output.take().unwrap(),
            JsonCodec::default(),
        )));

        if self.logging {
            crate::logging::init(output.clone()).await?;
            trace!("Plugin logging initialized");
        }

        // First message must be `getmanifest`; answer it right away.
        match input.next().await {
            Some(Ok(messages::JsonRpc::Request(id, messages::Request::Getmanifest(m)))) => {
                output
                    .lock()
                    .await
                    .send(json!({
                        "jsonrpc": "2.0",
                        "result": self.handle_get_manifest(m),
                        "id": id,
                    }))
                    .await?
            }
            Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
            None => {
                return Err(anyhow!(
                    "Lost connection to lightning expecting getmanifest"
                ))
            }
        };

        // Second message must be `init`; its reply is deferred to
        // `ConfiguredPlugin::start` / `ConfiguredPlugin::disable`.
        let (init_id, configuration) = match input.next().await {
            Some(Ok(messages::JsonRpc::Request(id, messages::Request::Init(m)))) => {
                (id, self.handle_init(m)?)
            }
            Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
            None => {
                return Ok(None);
            }
        };

        // Hooks arrive as ordinary requests, so their callbacks share the
        // lookup table with the RPC methods.
        let mut rpcmethods: HashMap<String, AsyncCallback<S>> =
            HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback)));
        rpcmethods.extend(self.hooks.drain().map(|(k, v)| (k, v.callback)));

        let subscriptions =
            HashMap::from_iter(self.subscriptions.drain().map(|(k, v)| (k, v.callback)));
        let all_subscription = self.wildcard_subscription.map(|s| s.callback);

        Ok(Some(ConfiguredPlugin {
            init_id,
            input,
            output,
            rpcmethods,
            setconfig_callback: self.setconfig_callback,
            notifications: self.notifications,
            subscriptions,
            wildcard_subscription: all_subscription,
            options: self.options,
            option_values: self.option_values,
            configuration,
            hooks: HashMap::new(),
        }))
    }

    /// Runs `configure` and, if it yields a plugin, immediately starts it
    /// with `state`. Returns `Ok(None)` when `configure` does.
    pub async fn start(self, state: S) -> Result<Option<Plugin<S>>, anyhow::Error> {
        if let Some(cp) = self.configure().await? {
            Ok(Some(cp.start(state).await?))
        } else {
            Ok(None)
        }
    }

    /// Builds the `getmanifest` response from everything registered so far.
    fn handle_get_manifest(
        &mut self,
        _call: messages::GetManifestCall,
    ) -> messages::GetManifestResponse {
        let rpcmethods: Vec<_> = self
            .rpcmethods
            .values()
            .map(|v| messages::RpcMethod {
                name: v.name.clone(),
                description: v.description.clone(),
                usage: v.usage.clone(),
            })
            .collect();

        // The wildcard subscription is advertised as the topic "*".
        let subscriptions = self
            .subscriptions
            .keys()
            .cloned()
            .chain(self.wildcard_subscription.iter().map(|_| String::from("*")))
            .collect();

        messages::GetManifestResponse {
            options: self.options.values().cloned().collect(),
            subscriptions,
            hooks: self.hooks.keys().cloned().collect(),
            rpcmethods,
            notifications: self.notifications.clone(),
            featurebits: self.featurebits.clone(),
            dynamic: self.dynamic,
            nonnumericids: true,
            custommessages: self.custommessages.clone(),
        }
    }

    /// Resolves a value for every declared option from the `init` call,
    /// falling back to the option's default, and returns the configuration
    /// carried by the call.
    ///
    /// # Errors
    ///
    /// Returns an error (instead of panicking) when a supplied value has an
    /// unsupported JSON type, when an array is empty or mixes element types,
    /// or when a number does not fit in an `i64`.
    fn handle_init(&mut self, call: messages::InitCall) -> Result<Configuration, Error> {
        use options::Value as OValue;
        use serde_json::Value as JValue;

        for (name, option) in self.options.iter() {
            let json_value = call.options.get(name);
            let default_value = option.default();

            let option_value: Option<options::Value> = match (json_value, default_value) {
                (None, None) => None,
                (None, Some(default)) => Some(default.clone()),
                // The element type of an array is decided by its first entry;
                // every other entry must then have that same type.
                (Some(JValue::Array(a)), _) => match a.first() {
                    Some(JValue::String(_)) => Some(OValue::StringArray(
                        a.iter()
                            .map(|x| {
                                x.as_str().map(|s| s.to_string()).ok_or_else(|| {
                                    anyhow!("Non-string element in array for option {}", name)
                                })
                            })
                            .collect::<Result<_, Error>>()?,
                    )),
                    Some(JValue::Number(_)) => Some(OValue::IntegerArray(
                        a.iter()
                            .map(|x| {
                                x.as_i64().ok_or_else(|| {
                                    anyhow!("Non-integer element in array for option {}", name)
                                })
                            })
                            .collect::<Result<_, Error>>()?,
                    )),
                    _ => return Err(anyhow!("Array type not supported for option: {}", name)),
                },
                (Some(JValue::String(s)), _) => Some(OValue::String(s.to_string())),
                (Some(JValue::Number(i)), _) => Some(OValue::Integer(
                    i.as_i64()
                        .ok_or_else(|| anyhow!("Value of option {} is not an i64", name))?,
                )),
                (Some(JValue::Bool(b)), _) => Some(OValue::Boolean(*b)),
                _ => return Err(anyhow!("Type mismatch for option {}", name)),
            };

            self.option_values.insert(name.to_string(), option_value);
        }
        Ok(call.configuration)
    }
}
impl<S> RpcMethodBuilder<S>
where
    S: Send + Clone,
{
    /// Starts describing an RPC method called `name` that is served by
    /// `callback`. Description and usage are unset until provided.
    pub fn new<C, F>(name: &str, callback: C) -> Self
    where
        C: Send + Sync + 'static,
        C: Fn(Plugin<S>, Request) -> F + 'static,
        F: Future<Output = Response> + Send + 'static,
    {
        Self {
            description: None,
            usage: None,
            name: String::from(name),
            // Erase the concrete future type so the callback can be stored.
            callback: Box::new(move |plugin, request| Box::pin(callback(plugin, request))),
        }
    }

    /// Sets the method's description.
    pub fn description(self, description: &str) -> Self {
        Self {
            description: Some(String::from(description)),
            ..self
        }
    }

    /// Sets the method's usage string.
    pub fn usage(self, usage: &str) -> Self {
        Self {
            usage: Some(String::from(usage)),
            ..self
        }
    }

    /// Finalises the method; an unset description or usage becomes an empty
    /// string.
    fn build(self) -> RpcMethod<S> {
        let Self {
            callback,
            name,
            description,
            usage,
        } = self;

        RpcMethod {
            callback,
            name,
            description: description.unwrap_or_default(),
            usage: usage.unwrap_or_default(),
        }
    }
}
/// Payload handed to every callback: the raw JSON value.
type Request = serde_json::Value;
/// Result returned by request-style callbacks (RPC methods, hooks, setconfig).
type Response = Result<serde_json::Value, Error>;
/// Boxed, type-erased callback that produces a `Response` asynchronously.
type AsyncCallback<S> =
Box<dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Response> + Send>> + Send + Sync>;
/// Boxed, type-erased notification callback; it yields no value, only
/// success or an error.
type AsyncNotificationCallback<S> = Box<
dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
+ Send
+ Sync,
>;
/// A registered RPC method: the callback plus the metadata reported in the
/// manifest.
struct RpcMethod<S>
where
S: Clone + Send,
{
/// Handler invoked when the method is called.
callback: AsyncCallback<S>,
/// Description reported in the manifest.
description: String,
/// Method name reported in the manifest.
name: String,
/// Usage string reported in the manifest.
usage: String,
}
/// Step-by-step description of an RPC method, to be passed to
/// `Builder::rpcmethod_from_builder`.
pub struct RpcMethodBuilder<S>
where
S: Clone + Send,
{
/// Handler invoked when the method is called.
callback: AsyncCallback<S>,
/// Method name.
name: String,
/// Optional description; becomes an empty string when unset at build time.
description: Option<String>,
/// Optional usage string; becomes an empty string when unset at build time.
usage: Option<String>,
}
/// A registered notification subscription.
struct Subscription<S>
where
S: Clone + Send,
{
/// Handler invoked for matching notifications.
callback: AsyncNotificationCallback<S>,
}
/// A registered hook.
struct Hook<S>
where
S: Clone + Send,
{
/// Handler invoked when the hook is called.
callback: AsyncCallback<S>,
}
impl<S> Plugin<S>
where
    S: Clone + Send,
{
    /// Returns the current value of the option called `name`.
    ///
    /// # Errors
    ///
    /// Fails when no option with that name has a recorded value slot.
    ///
    /// # Panics
    ///
    /// Panics if the option-value mutex is poisoned.
    pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
        let values = self.option_values.lock().unwrap();
        // `ok_or_else` so the error is only built when the lookup misses.
        values
            .get(name)
            .cloned()
            .ok_or_else(|| anyhow!("No option named {}", name))
    }

    /// Returns the current value of `config_option`, converted by the
    /// option type's `from_value`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Plugin::option_str`].
    pub fn option<'a, OV: OptionType<'a>>(
        &self,
        config_option: &options::ConfigOption<'a, OV>,
    ) -> Result<OV::OutputValue> {
        let value = self.option_str(config_option.name())?;
        Ok(OV::from_value(&value))
    }

    /// Overwrites the value of the option called `name`.
    ///
    /// # Errors
    ///
    /// Fails when no option with that name has a recorded value slot; new
    /// entries are never created here.
    ///
    /// # Panics
    ///
    /// Panics if the option-value mutex is poisoned.
    pub fn set_option_str(&self, name: &str, value: options::Value) -> Result<()> {
        let mut values = self.option_values.lock().unwrap();
        let slot = values
            .get_mut(name)
            .ok_or_else(|| anyhow!("No option named {}", name))?;
        *slot = Some(value);
        Ok(())
    }

    /// Overwrites the value of `config_option`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Plugin::set_option_str`].
    pub fn set_option<'a, OV: OptionType<'a>>(
        &self,
        config_option: &options::ConfigOption<'a, OV>,
        value: options::Value,
    ) -> Result<()> {
        self.set_option_str(config_option.name(), value)?;
        Ok(())
    }
}
impl<S, I, O> ConfiguredPlugin<S, I, O>
where
S: Send + Clone + Sync + 'static,
I: AsyncRead + Send + Unpin + 'static,
O: Send + AsyncWrite + Unpin + 'static,
{
#[allow(unused_mut)]
pub async fn start(mut self, state: S) -> Result<Plugin<S>, anyhow::Error> {
let output = self.output;
let input = self.input;
let (wait_handle, _) = tokio::sync::broadcast::channel(1);
let (sender, receiver) = tokio::sync::mpsc::channel(4);
let plugin = Plugin {
state,
options: self.options,
option_values: Arc::new(std::sync::Mutex::new(self.option_values)),
configuration: self.configuration,
wait_handle,
sender,
};
let driver = PluginDriver {
plugin: plugin.clone(),
rpcmethods: self.rpcmethods,
setconfig_callback: self.setconfig_callback,
hooks: self.hooks,
subscriptions: self.subscriptions,
wildcard_subscription: self.wildcard_subscription,
};
output
.lock()
.await
.send(json!(
{
"jsonrpc": "2.0",
"id": self.init_id,
"result": crate::messages::InitResponse{disable: None}
}
))
.await
.context("sending init response")?;
let joiner = plugin.wait_handle.clone();
tokio::spawn(async move {
if let Err(e) = driver.run(receiver, input, output).await {
log::warn!("Plugin loop returned error {:?}", e);
}
joiner.send(())
});
Ok(plugin)
}
#[allow(unused_mut)]
pub async fn disable(mut self, reason: &str) -> Result<(), anyhow::Error> {
self.output
.lock()
.await
.send(json!(
{
"jsonrpc": "2.0",
"id": self.init_id,
"result": crate::messages::InitResponse{
disable: Some(reason.to_string())
}
}
))
.await
.context("sending init response")?;
Ok(())
}
pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
self.option_values
.get(name)
.ok_or(anyhow!("No option named '{}'", name))
.map(|c| c.clone())
}
pub fn option<'a, OV: OptionType<'a>>(
&self,
config_option: &options::ConfigOption<'a, OV>,
) -> Result<OV::OutputValue> {
let value = self.option_str(config_option.name())?;
Ok(OV::from_value(&value))
}
pub fn configuration(&self) -> Configuration {
self.configuration.clone()
}
}
impl<S> PluginDriver<S>
where
    S: Send + Clone,
{
    /// Main loop: dispatches incoming messages and writes queued outgoing
    /// messages to `output`, whichever is ready first.
    ///
    /// # Errors
    ///
    /// Returns as soon as dispatching fails, the outgoing queue is closed,
    /// or a write to `output` fails.
    async fn run<I, O>(
        self,
        mut receiver: tokio::sync::mpsc::Receiver<serde_json::Value>,
        mut input: FramedRead<I, JsonRpcCodec>,
        output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
    ) -> Result<(), Error>
    where
        I: Send + AsyncReadExt + Unpin,
        O: Send + AsyncWriteExt + Unpin,
    {
        loop {
            tokio::select! {
                dispatched = self.dispatch_one(&mut input, &self.plugin) => {
                    dispatched?;
                },
                outgoing = receiver.recv() => {
                    output.lock().await.send(
                        outgoing.context("internal communication error")?
                    ).await?;
                },
            }
        }
    }

    /// Reads one message from `input` and hands it to the matching callback.
    ///
    /// Callbacks run in their own spawned tasks; request results are queued
    /// on the plugin's sender, notification handler errors are logged.
    ///
    /// # Errors
    ///
    /// Fails when the input ends or yields a read error, when a message
    /// lacks `method` or `params`, or when a request names a method without
    /// a registered handler.
    async fn dispatch_one<I>(
        &self,
        input: &mut FramedRead<I, JsonRpcCodec>,
        plugin: &Plugin<S>,
    ) -> Result<(), Error>
    where
        I: Send + AsyncReadExt + Unpin,
    {
        match input.next().await {
            Some(Ok(msg)) => {
                trace!("Received a message: {:?}", msg);
                match msg {
                    messages::JsonRpc::Request(_id, _p) => {
                        todo!("This is unreachable until we start filling in messages:Request. Until then the custom dispatcher below is used exclusively.");
                    }
                    messages::JsonRpc::Notification(_n) => {
                        todo!("As soon as we define the full structure of the messages::Notification we'll get here. Until then the custom dispatcher below is used.")
                    }
                    messages::JsonRpc::CustomRequest(id, request) => {
                        trace!("Dispatching custom method {:?}", request);
                        let method = request
                            .get("method")
                            .context("Missing 'method' in request")?
                            .as_str()
                            .context("'method' is not a string")?;

                        // `setconfig` has its own dedicated callback slot;
                        // everything else is looked up by name.
                        let callback = match method {
                            name if name.eq("setconfig") => {
                                self.setconfig_callback.as_ref().ok_or_else(|| {
                                    anyhow!("No handler for method '{}' registered", method)
                                })?
                            }
                            _ => self.rpcmethods.get(method).with_context(|| {
                                anyhow!("No handler for method '{}' registered", method)
                            })?,
                        };
                        let params = request
                            .get("params")
                            .context("Missing 'params' field in request")?
                            .clone();

                        let plugin = plugin.clone();
                        let call = callback(plugin.clone(), params);

                        // Run the handler off the dispatch loop and queue its
                        // reply for the writer side of `run`.
                        tokio::spawn(async move {
                            match call.await {
                                Ok(v) => plugin
                                    .sender
                                    .send(json!({
                                        "jsonrpc": "2.0",
                                        "id": id,
                                        "result": v
                                    }))
                                    .await
                                    .context("returning custom response"),
                                Err(e) => plugin
                                    .sender
                                    .send(json!({
                                        "jsonrpc": "2.0",
                                        "id": id,
                                        "error": parse_error(e.to_string()),
                                    }))
                                    .await
                                    .context("returning custom error"),
                            }
                        });
                        Ok(())
                    }
                    messages::JsonRpc::CustomNotification(request) => {
                        trace!("Dispatching custom notification {:?}", request);
                        let method = request
                            .get("method")
                            .context("Missing 'method' in request")?
                            .as_str()
                            .context("'method' is not a string")?;

                        let params = request
                            .get("params")
                            .context("Missing 'params' field in request")?;

                        // A failing handler must not panic its task: log the
                        // error and carry on.
                        if let Some(cb) = &self.wildcard_subscription {
                            let call = cb(plugin.clone(), params.clone());
                            let topic = method.to_string();
                            tokio::spawn(async move {
                                if let Err(e) = call.await {
                                    log::warn!(
                                        "Wildcard notification handler for '{}' returned error {:?}",
                                        topic,
                                        e
                                    );
                                }
                            });
                        }

                        match self.subscriptions.get(method) {
                            Some(cb) => {
                                let call = cb(plugin.clone(), params.clone());
                                let topic = method.to_string();
                                tokio::spawn(async move {
                                    if let Err(e) = call.await {
                                        log::warn!(
                                            "Notification handler for '{}' returned error {:?}",
                                            topic,
                                            e
                                        );
                                    }
                                });
                            }
                            None => {
                                // Only complain when nobody at all handled it.
                                if self.wildcard_subscription.is_none() {
                                    log::warn!(
                                        "No handler for notification '{}' registered",
                                        method
                                    );
                                }
                            }
                        };
                        Ok(())
                    }
                }
            }
            Some(Err(e)) => Err(anyhow!("Error reading command: {}", e)),
            None => Err(anyhow!("Error reading from master")),
        }
    }
}
impl<S> Plugin<S>
where
    S: Clone + Send,
{
    /// Returns copies of all declared options.
    pub fn options(&self) -> Vec<UntypedConfigOption> {
        let mut declared = Vec::new();
        for option in self.options.values() {
            declared.push(option.clone());
        }
        declared
    }

    /// Returns a copy of the configuration received with `init`.
    pub fn configuration(&self) -> Configuration {
        let Self { configuration, .. } = self;
        configuration.clone()
    }

    /// Borrows the user-supplied state.
    pub fn state(&self) -> &S {
        let Self { state, .. } = self;
        state
    }
}
impl<S> Plugin<S>
where
    S: Send + Clone,
{
    /// Queues a JSON-RPC notification with the given `method` and params `v`
    /// on the plugin's outgoing channel.
    ///
    /// # Errors
    ///
    /// Fails when the outgoing channel rejects the message.
    pub async fn send_custom_notification(
        &self,
        method: String,
        v: serde_json::Value,
    ) -> Result<(), Error> {
        self.sender
            .send(json!({
                "jsonrpc": "2.0",
                "method": method,
                "params": v,
            }))
            .await
            .context("sending custom notification")?;
        Ok(())
    }

    /// Waits until a value is sent on the plugin's wait handle, which happens
    /// when the driver loop ends or `shutdown` is called.
    ///
    /// # Errors
    ///
    /// Fails when receiving from the wait handle fails.
    pub async fn join(&self) -> Result<(), Error> {
        self.wait_handle
            .subscribe()
            .recv()
            .await
            .context("error waiting for shutdown")
    }

    /// Sends the shutdown signal on the plugin's wait handle.
    ///
    /// # Errors
    ///
    /// Fails when the signal cannot be sent.
    pub fn shutdown(&self) -> Result<(), Error> {
        // This is the sending side; the "waiting" wording belongs to `join`.
        self.wait_handle
            .send(())
            .context("error sending shutdown signal")?;
        Ok(())
    }
}
/// Selects which feature-bit field `Builder::featurebits` sets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeatureBitsKind {
    /// Sets the `node` feature bits.
    Node,
    /// Sets the `channel` feature bits.
    Channel,
    /// Sets the `invoice` feature bits.
    Invoice,
    /// Sets the `init` feature bits.
    Init,
}
/// Error object placed in the `error` field of a failed request's response.
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
struct RpcError {
/// Optional numeric error code.
pub code: Option<i32>,
/// Error message.
pub message: String,
/// Optional additional JSON payload.
pub data: Option<serde_json::Value>,
}
fn parse_error(error: String) -> RpcError {
match serde_json::from_str::<RpcError>(&error) {
Ok(o) => o,
Err(_) => RpcError {
code: Some(-32700),
message: error,
data: None,
},
}
}
#[cfg(test)]
mod test {
    use super::*;

    /// Builds a plugin over stdin/stdout and creates, but never awaits, the
    /// `start` future, so only construction and type-checking are exercised.
    #[tokio::test]
    async fn init() {
        let _ = Builder::new(tokio::io::stdin(), tokio::io::stdout()).start(());
    }
}