cln_plugin/
lib.rs

1use crate::codec::{JsonCodec, JsonRpcCodec};
2pub use anyhow::anyhow;
3use anyhow::{Context, Result};
4use futures::sink::SinkExt;
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6extern crate log;
7use log::trace;
8use messages::{Configuration, FeatureBits, NotificationTopic};
9use options::{OptionType, UntypedConfigOption};
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use tokio::io::{AsyncRead, AsyncWrite};
15use tokio::sync::Mutex;
16use tokio_stream::StreamExt;
17use tokio_util::codec::FramedRead;
18use tokio_util::codec::FramedWrite;
19
20mod codec;
21mod logging;
22pub mod messages;
23
24#[macro_use]
25extern crate serde_json;
26
27pub mod options;
28
/// Need to tell us about something that went wrong? Use this error
/// type to do that. Use this alias to be safe from future changes in
/// our internal error handling, since we'll implement any necessary
/// conversions for you :-)
///
/// Currently this is an alias for [`anyhow::Error`].
pub type Error = anyhow::Error;
34
/// Builder for a new plugin.
///
/// Collects everything that is reported to the daemon in the
/// `getmanifest` response (options, RPC methods, hooks, subscriptions,
/// notification topics, feature bits, ...) together with the callbacks
/// that handle them. `S` is the user state handed to the callbacks,
/// `I` and `O` are the streams used to talk to the daemon.
pub struct Builder<S, I, O>
where
    I: AsyncRead + Unpin,
    O: Send + AsyncWrite + Unpin,
    S: Clone + Send,
{
    // Streams used to talk to the daemon. Wrapped in `Option` so that
    // `configure` can `take()` them out of the builder.
    input: Option<I>,
    output: Option<O>,

    // Hook callbacks, keyed by hook name.
    hooks: HashMap<String, Hook<S>>,
    // Registered options, keyed by option name.
    options: HashMap<String, UntypedConfigOption>,
    // Option values, keyed by option name; filled in by `handle_init`.
    option_values: HashMap<String, Option<options::Value>>,
    // Custom RPC methods, keyed by method name.
    rpcmethods: HashMap<String, RpcMethod<S>>,
    // Optional callback invoked for the `setconfig` method.
    setconfig_callback: Option<AsyncCallback<S>>,
    // Notification subscriptions, keyed by topic.
    subscriptions: HashMap<String, Subscription<S>>,
    // Contains a Subscription if the user subscribed to "*"
    wildcard_subscription: Option<Subscription<S>>,
    // Notification topics reported in the `getmanifest` response.
    notifications: Vec<NotificationTopic>,
    // Custom message types reported in the `getmanifest` response.
    custommessages: Vec<u16>,
    // Feature bits reported in the `getmanifest` response.
    featurebits: FeatureBits,
    // Value of the "dynamic" field of the `getmanifest` response.
    dynamic: bool,
    // Do we want the plugin framework to automatically register a logging handler?
    logging: bool,
}
60
/// A plugin that has registered with the lightning daemon, and gotten
/// its options filled, however has not yet acknowledged the `init`
/// message. This is a mid-state allowing a plugin to disable itself,
/// based on the options.
pub struct ConfiguredPlugin<S, I, O>
where
    S: Clone + Send,
{
    // JSON-RPC id of the still unanswered `init` request; echoed back
    // by `start` and `disable`.
    init_id: serde_json::Value,
    // Framed reader for messages coming from the daemon.
    input: FramedRead<I, JsonRpcCodec>,
    // Framed writer towards the daemon, shared behind a mutex.
    output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
    // Registered options, keyed by option name.
    options: HashMap<String, UntypedConfigOption>,
    // Option values resolved while handling `init`, keyed by option name.
    option_values: HashMap<String, Option<options::Value>>,
    // "configuration" field of the `init` message.
    configuration: Configuration,
    // Callbacks keyed by method name. `configure` also merges the hook
    // callbacks into this map.
    rpcmethods: HashMap<String, AsyncCallback<S>>,
    // Optional callback invoked for the `setconfig` method.
    setconfig_callback: Option<AsyncCallback<S>>,
    // `configure` currently always passes an empty map here, since the
    // hook callbacks live in `rpcmethods`.
    hooks: HashMap<String, AsyncCallback<S>>,
    // Notification callbacks, keyed by topic.
    subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
    // Callback of the "*" subscription, if any.
    wildcard_subscription: Option<AsyncNotificationCallback<S>>,
    #[allow(dead_code)] // unsure why rust thinks this field isn't used
    notifications: Vec<NotificationTopic>,
}
83
/// The [PluginDriver] is used to run the IO loop, reading messages
/// from the Lightning daemon, dispatching calls and notifications to
/// the plugin, and returning responses to the daemon. We also use
/// it to handle spontaneous messages like Notifications and logging
/// events.
struct PluginDriver<S>
where
    S: Send + Clone,
{
    // Handle that is cloned and passed to every callback.
    plugin: Plugin<S>,
    // Callbacks keyed by method name.
    rpcmethods: HashMap<String, AsyncCallback<S>>,
    // Optional callback invoked for the `setconfig` method.
    setconfig_callback: Option<AsyncCallback<S>>,

    #[allow(dead_code)] // Unused until we fill in the Hook structs.
    hooks: HashMap<String, AsyncCallback<S>>,
    // Notification callbacks, keyed by topic.
    subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
    // Callback of the "*" subscription, if any.
    wildcard_subscription: Option<AsyncNotificationCallback<S>>,
}
102
/// Handle to a running plugin. A clone of it is passed to every
/// callback, giving access to the user state, the options and the
/// configuration, and allowing messages to be queued for the daemon.
#[derive(Clone)]
pub struct Plugin<S>
where
    S: Clone + Send,
{
    /// The state gets cloned for each request
    state: S,
    /// "options" field of "init" message sent by cln
    options: HashMap<String, UntypedConfigOption>,
    /// Current option values, keyed by option name. Shared between all
    /// clones of the plugin handle.
    option_values: Arc<std::sync::Mutex<HashMap<String, Option<options::Value>>>>,
    /// "configuration" field of "init" message sent by cln
    configuration: Configuration,
    /// A signal that allows us to wait on the plugin's shutdown.
    wait_handle: tokio::sync::broadcast::Sender<()>,

    /// Queue of JSON messages that the driver loop writes to the daemon.
    sender: tokio::sync::mpsc::Sender<serde_json::Value>,
}
120
121impl<S, I, O> Builder<S, I, O>
122where
123    O: Send + AsyncWrite + Unpin + 'static,
124    S: Clone + Sync + Send + 'static,
125    I: AsyncRead + Send + Unpin + 'static,
126{
127    pub fn new(input: I, output: O) -> Self {
128        Self {
129            input: Some(input),
130            output: Some(output),
131            hooks: HashMap::new(),
132            subscriptions: HashMap::new(),
133            wildcard_subscription: None,
134            options: HashMap::new(),
135            // Should not be configured by user.
136            // This values are set when parsing the init-call
137            option_values: HashMap::new(),
138            rpcmethods: HashMap::new(),
139            setconfig_callback: None,
140            notifications: vec![],
141            featurebits: FeatureBits::default(),
142            dynamic: false,
143            custommessages: vec![],
144            logging: true,
145        }
146    }
147
148    pub fn option<'a, V: options::OptionType<'a>>(
149        mut self,
150        opt: options::ConfigOption<'a, V>,
151    ) -> Builder<S, I, O> {
152        self.options.insert(opt.name().to_string(), opt.build());
153        self
154    }
155
156    pub fn notification(mut self, notif: messages::NotificationTopic) -> Builder<S, I, O> {
157        self.notifications.push(notif);
158        self
159    }
160
161    /// Subscribe to notifications for the given `topic`. The handler
162    /// is an async function that takes a `Plugin<S>` and the
163    /// notification as a `serde_json::Value` as inputs. Since
164    /// notifications do not expect a result the handler should only
165    /// report errors while processing. Any error reported while
166    /// processing the notification will be logged in the cln logs.
167    ///
168    /// ```
169    /// use cln_plugin::{options, Builder, Error, Plugin};
170    ///
171    /// async fn connect_handler(_p: Plugin<()>, v: serde_json::Value) -> Result<(), Error> {
172    ///     println!("Got a connect notification: {}", v);
173    ///     Ok(())
174    /// }
175    ///
176    /// let b = Builder::new(tokio::io::stdin(), tokio::io::stdout())
177    ///     .subscribe("connect", connect_handler);
178    /// ```
179    pub fn subscribe<C, F>(mut self, topic: &str, callback: C) -> Builder<S, I, O>
180    where
181        C: Send + Sync + 'static,
182        C: Fn(Plugin<S>, Request) -> F + 'static,
183        F: Future<Output = Result<(), Error>> + Send + 'static,
184    {
185        let subscription = Subscription {
186            callback: Box::new(move |p, r| Box::pin(callback(p, r))),
187        };
188
189        if topic == "*" {
190            self.wildcard_subscription = Some(subscription);
191        } else {
192            self.subscriptions.insert(topic.to_string(), subscription);
193        };
194        self
195    }
196
197    /// Add a subscription to a given `hookname`
198    pub fn hook<C, F>(mut self, hookname: &str, callback: C) -> Self
199    where
200        C: Send + Sync + 'static,
201        C: Fn(Plugin<S>, Request) -> F + 'static,
202        F: Future<Output = Response> + Send + 'static,
203    {
204        self.hooks.insert(
205            hookname.to_string(),
206            Hook {
207                callback: Box::new(move |p, r| Box::pin(callback(p, r))),
208            },
209        );
210        self
211    }
212
213    /// Register a custom RPC method for the RPC passthrough from the
214    /// main daemon
215    pub fn rpcmethod<C, F>(mut self, name: &str, description: &str, callback: C) -> Builder<S, I, O>
216    where
217        C: Send + Sync + 'static,
218        C: Fn(Plugin<S>, Request) -> F + 'static,
219        F: Future<Output = Response> + Send + 'static,
220    {
221        self.rpcmethods.insert(
222            name.to_string(),
223            RpcMethod {
224                name: name.to_string(),
225                description: description.to_string(),
226                usage: String::default(),
227                callback: Box::new(move |p, r| Box::pin(callback(p, r))),
228            },
229        );
230        self
231    }
232
233    pub fn rpcmethod_from_builder(mut self, rpc_method: RpcMethodBuilder<S>) -> Builder<S, I, O> {
234        self.rpcmethods
235            .insert(rpc_method.name.to_string(), rpc_method.build());
236        self
237    }
238
239    /// Register a callback for setconfig to accept changes for dynamic options
240    pub fn setconfig_callback<C, F>(mut self, setconfig_callback: C) -> Builder<S, I, O>
241    where
242        C: Send + Sync + 'static,
243        C: Fn(Plugin<S>, Request) -> F + 'static,
244        F: Future<Output = Response> + Send + 'static,
245    {
246        self.setconfig_callback = Some(Box::new(move |p, r| Box::pin(setconfig_callback(p, r))));
247        self
248    }
249
250    /// Send true value for "dynamic" field in "getmanifest" response
251    pub fn dynamic(mut self) -> Builder<S, I, O> {
252        self.dynamic = true;
253        self
254    }
255
256    /// Sets the "featurebits" in the "getmanifest" response
257    pub fn featurebits(mut self, kind: FeatureBitsKind, hex: String) -> Self {
258        match kind {
259            FeatureBitsKind::Node => self.featurebits.node = Some(hex),
260            FeatureBitsKind::Channel => self.featurebits.channel = Some(hex),
261            FeatureBitsKind::Init => self.featurebits.init = Some(hex),
262            FeatureBitsKind::Invoice => self.featurebits.invoice = Some(hex),
263        }
264        self
265    }
266
267    /// Should the plugin automatically register a logging handler? If
268    /// not you may need to register a logging handler yourself. Be
269    /// careful not to print raw lines to `stdout` if you do, since
270    /// that'll interfere with the plugin communication. See the CLN
271    /// documentation on logging to see what logging events should
272    /// look like.
273    pub fn with_logging(mut self, log: bool) -> Builder<S, I, O> {
274        self.logging = log;
275        self
276    }
277
278    /// Tells lightningd explicitly to allow custommmessages of the provided
279    /// type
280    pub fn custommessages(mut self, custommessages: Vec<u16>) -> Self {
281        self.custommessages = custommessages;
282        self
283    }
284
285    /// Communicate with `lightningd` to tell it about our options,
286    /// RPC methods and subscribe to hooks, and then process the
287    /// initialization, configuring the plugin.
288    ///
289    /// Returns `None` if we were invoked with `--help` and thus
290    /// should exit after this handshake
291    pub async fn configure(mut self) -> Result<Option<ConfiguredPlugin<S, I, O>>, anyhow::Error> {
292        let mut input = FramedRead::new(self.input.take().unwrap(), JsonRpcCodec::default());
293
294        // Sadly we need to wrap the output in a mutex in order to
295        // enable early logging, i.e., logging that is done before the
296        // PluginDriver is processing events during the
297        // handshake. Otherwise we could just write the log events to
298        // the event queue and have the PluginDriver be the sole owner
299        // of `Stdout`.
300        let output = Arc::new(Mutex::new(FramedWrite::new(
301            self.output.take().unwrap(),
302            JsonCodec::default(),
303        )));
304
305        // Now configure the logging, so any `log` call is wrapped
306        // in a JSON-RPC notification and sent to Core Lightning
307        if self.logging {
308            crate::logging::init(output.clone()).await?;
309            trace!("Plugin logging initialized");
310        }
311
312        // Read the `getmanifest` message:
313        match input.next().await {
314            Some(Ok(messages::JsonRpc::Request(id, messages::Request::Getmanifest(m)))) => {
315                output
316                    .lock()
317                    .await
318                    .send(json!({
319                        "jsonrpc": "2.0",
320                        "result": self.handle_get_manifest(m),
321                        "id": id,
322                    }))
323                    .await?
324            }
325            Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
326            None => {
327                return Err(anyhow!(
328                    "Lost connection to lightning expecting getmanifest"
329                ))
330            }
331        };
332        let (init_id, configuration) = match input.next().await {
333            Some(Ok(messages::JsonRpc::Request(id, messages::Request::Init(m)))) => {
334                (id, self.handle_init(m)?)
335            }
336
337            Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
338            None => {
339                // If we are being called with --help we will get
340                // disconnected here. That's expected, so don't
341                // complain about it.
342                return Ok(None);
343            }
344        };
345
346        // TODO Split the two hashmaps once we fill in the hook
347        // payload structs in messages.rs
348        let mut rpcmethods: HashMap<String, AsyncCallback<S>> =
349            HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback)));
350        rpcmethods.extend(self.hooks.drain().map(|(k, v)| (k, v.callback)));
351
352        let subscriptions =
353            HashMap::from_iter(self.subscriptions.drain().map(|(k, v)| (k, v.callback)));
354        let all_subscription = self.wildcard_subscription.map(|s| s.callback);
355
356        // Leave the `init` reply pending, so we can disable based on
357        // the options if required.
358        Ok(Some(ConfiguredPlugin {
359            // The JSON-RPC `id` field so we can reply correctly.
360            init_id,
361            input,
362            output,
363            rpcmethods,
364            setconfig_callback: self.setconfig_callback,
365            notifications: self.notifications,
366            subscriptions,
367            wildcard_subscription: all_subscription,
368            options: self.options,
369            option_values: self.option_values,
370            configuration,
371            hooks: HashMap::new(),
372        }))
373    }
374
375    /// Build and start the plugin loop. This performs the handshake
376    /// and spawns a new task that accepts incoming messages from
377    /// Core Lightning and dispatches them to the handlers. It only
378    /// returns after completing the handshake to ensure that the
379    /// configuration and initialization was successfull.
380    ///
381    /// If `lightningd` was called with `--help` we won't get a
382    /// `Plugin` instance and return `None` instead. This signals that
383    /// we should exit, and not continue running. `start()` returns in
384    /// order to allow user code to perform cleanup if necessary.
385    pub async fn start(self, state: S) -> Result<Option<Plugin<S>>, anyhow::Error> {
386        if let Some(cp) = self.configure().await? {
387            Ok(Some(cp.start(state).await?))
388        } else {
389            Ok(None)
390        }
391    }
392
393    fn handle_get_manifest(
394        &mut self,
395        _call: messages::GetManifestCall,
396    ) -> messages::GetManifestResponse {
397        let rpcmethods: Vec<_> = self
398            .rpcmethods
399            .values()
400            .map(|v| messages::RpcMethod {
401                name: v.name.clone(),
402                description: v.description.clone(),
403                usage: v.usage.clone(),
404            })
405            .collect();
406
407        let subscriptions = self
408            .subscriptions
409            .keys()
410            .map(|s| s.clone())
411            .chain(self.wildcard_subscription.iter().map(|_| String::from("*")))
412            .collect();
413
414        messages::GetManifestResponse {
415            options: self.options.values().cloned().collect(),
416            subscriptions,
417            hooks: self.hooks.keys().map(|s| s.clone()).collect(),
418            rpcmethods,
419            notifications: self.notifications.clone(),
420            featurebits: self.featurebits.clone(),
421            dynamic: self.dynamic,
422            nonnumericids: true,
423            custommessages: self.custommessages.clone(),
424        }
425    }
426
427    fn handle_init(&mut self, call: messages::InitCall) -> Result<Configuration, Error> {
428        use options::Value as OValue;
429        use serde_json::Value as JValue;
430
431        // Match up the ConfigOptions and fill in their values if we
432        // have a matching entry.
433        for (name, option) in self.options.iter() {
434            let json_value = call.options.get(name);
435            let default_value = option.default();
436
437            let option_value: Option<options::Value> = match (json_value, default_value) {
438                (None, None) => None,
439                (None, Some(default)) => Some(default.clone()),
440                (Some(JValue::Array(a)), _) => match a.first() {
441                    Some(JValue::String(_)) => Some(OValue::StringArray(
442                        a.iter().map(|x| x.as_str().unwrap().to_string()).collect(),
443                    )),
444                    Some(JValue::Number(_)) => Some(OValue::IntegerArray(
445                        a.iter().map(|x| x.as_i64().unwrap()).collect(),
446                    )),
447                    _ => panic!("Array type not supported for option: {}", name),
448                },
449                (Some(JValue::String(s)), _) => Some(OValue::String(s.to_string())),
450                (Some(JValue::Number(i)), _) => Some(OValue::Integer(i.as_i64().unwrap())),
451                (Some(JValue::Bool(b)), _) => Some(OValue::Boolean(*b)),
452                _ => panic!("Type mismatch for option {}", name),
453            };
454
455            self.option_values.insert(name.to_string(), option_value);
456        }
457        Ok(call.configuration)
458    }
459}
460
461impl<S> RpcMethodBuilder<S>
462where
463    S: Send + Clone,
464{
465    pub fn new<C, F>(name: &str, callback: C) -> Self
466    where
467        C: Send + Sync + 'static,
468        C: Fn(Plugin<S>, Request) -> F + 'static,
469        F: Future<Output = Response> + Send + 'static,
470    {
471        Self {
472            name: name.to_string(),
473            callback: Box::new(move |p, r| Box::pin(callback(p, r))),
474            usage: None,
475            description: None,
476        }
477    }
478
479    pub fn description(mut self, description: &str) -> Self {
480        self.description = Some(description.to_string());
481        self
482    }
483
484    pub fn usage(mut self, usage: &str) -> Self {
485        self.usage = Some(usage.to_string());
486        self
487    }
488
489    fn build(self) -> RpcMethod<S> {
490        RpcMethod {
491            callback: self.callback,
492            name: self.name,
493            description: self.description.unwrap_or_default(),
494            usage: self.usage.unwrap_or_default(),
495        }
496    }
497}
498
// Just some type aliases so we don't get confused in a lisp-like sea
// of parentheses.
/// Payload handed to a callback, as raw JSON.
type Request = serde_json::Value;
/// Outcome of an RPC method or hook callback: the JSON result or an error.
type Response = Result<serde_json::Value, Error>;
/// Boxed async callback for RPC methods, hooks and `setconfig`. It
/// receives a plugin handle and the request and returns a boxed future
/// resolving to a [`Response`].
type AsyncCallback<S> =
    Box<dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Response> + Send>> + Send + Sync>;
/// Boxed async callback for notifications. Like [`AsyncCallback`], but
/// the future only reports success or an error.
type AsyncNotificationCallback<S> = Box<
    dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
        + Send
        + Sync,
>;
510
/// A struct collecting the metadata required to register a custom
/// rpcmethod with the main daemon upon init. It'll get deconstructed
/// into just the callback after the init.
struct RpcMethod<S>
where
    S: Clone + Send,
{
    // Handler invoked when the method is called.
    callback: AsyncCallback<S>,
    // Description reported in the `getmanifest` response.
    description: String,
    // Name of the method, reported in the `getmanifest` response.
    name: String,
    // Usage string reported in the `getmanifest` response.
    usage: String,
}
523
/// Builder for a custom RPC method with optional description and
/// usage string. Pass the finished builder to
/// [`Builder::rpcmethod_from_builder`].
pub struct RpcMethodBuilder<S>
where
    S: Clone + Send,
{
    // Handler invoked when the method is called.
    callback: AsyncCallback<S>,
    // Name of the method.
    name: String,
    // Optional description; `build` falls back to the empty string.
    description: Option<String>,
    // Optional usage string; `build` falls back to the empty string.
    usage: Option<String>,
}
533
/// A registered notification subscription; wraps the handler that is
/// invoked for matching notifications.
struct Subscription<S>
where
    S: Clone + Send,
{
    callback: AsyncNotificationCallback<S>,
}
540
/// A registered hook; wraps the handler that is invoked when the hook
/// is called.
struct Hook<S>
where
    S: Clone + Send,
{
    callback: AsyncCallback<S>,
}
547
548impl<S> Plugin<S>
549where
550    S: Clone + Send,
551{
552    pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
553        self.option_values
554            .lock()
555            .unwrap()
556            .get(name)
557            .ok_or(anyhow!("No option named {}", name))
558            .cloned()
559    }
560
561    pub fn option<'a, OV: OptionType<'a>>(
562        &self,
563        config_option: &options::ConfigOption<'a, OV>,
564    ) -> Result<OV::OutputValue> {
565        let value = self.option_str(config_option.name())?;
566        Ok(OV::from_value(&value))
567    }
568
569    pub fn set_option_str(&self, name: &str, value: options::Value) -> Result<()> {
570        *self
571            .option_values
572            .lock()
573            .unwrap()
574            .get_mut(name)
575            .ok_or(anyhow!("No option named {}", name))? = Some(value);
576        Ok(())
577    }
578
579    pub fn set_option<'a, OV: OptionType<'a>>(
580        &self,
581        config_option: &options::ConfigOption<'a, OV>,
582        value: options::Value,
583    ) -> Result<()> {
584        self.set_option_str(config_option.name(), value)?;
585        Ok(())
586    }
587}
588
589impl<S, I, O> ConfiguredPlugin<S, I, O>
590where
591    S: Send + Clone + Sync + 'static,
592    I: AsyncRead + Send + Unpin + 'static,
593    O: Send + AsyncWrite + Unpin + 'static,
594{
595    #[allow(unused_mut)]
596    pub async fn start(mut self, state: S) -> Result<Plugin<S>, anyhow::Error> {
597        let output = self.output;
598        let input = self.input;
599        let (wait_handle, _) = tokio::sync::broadcast::channel(1);
600
601        // An MPSC pair used by anything that needs to send messages
602        // to the main daemon.
603        let (sender, receiver) = tokio::sync::mpsc::channel(4);
604
605        let plugin = Plugin {
606            state,
607            options: self.options,
608            option_values: Arc::new(std::sync::Mutex::new(self.option_values)),
609            configuration: self.configuration,
610            wait_handle,
611            sender,
612        };
613
614        let driver = PluginDriver {
615            plugin: plugin.clone(),
616            rpcmethods: self.rpcmethods,
617            setconfig_callback: self.setconfig_callback,
618            hooks: self.hooks,
619            subscriptions: self.subscriptions,
620            wildcard_subscription: self.wildcard_subscription,
621        };
622
623        output
624            .lock()
625            .await
626            .send(json!(
627                {
628                    "jsonrpc": "2.0",
629                    "id": self.init_id,
630            "result": crate::messages::InitResponse{disable: None}
631                }
632            ))
633            .await
634            .context("sending init response")?;
635
636        let joiner = plugin.wait_handle.clone();
637        // Start the PluginDriver to handle plugin IO
638        tokio::spawn(async move {
639            if let Err(e) = driver.run(receiver, input, output).await {
640                log::warn!("Plugin loop returned error {:?}", e);
641            }
642
643            // Now that we have left the reader loop its time to
644            // notify any waiting tasks. This most likely will cause
645            // the main task to exit and the plugin to terminate.
646            joiner.send(())
647        });
648        Ok(plugin)
649    }
650
651    /// Abort the plugin startup. Communicate that we're about to exit
652    /// voluntarily, and this is not an error.
653    #[allow(unused_mut)]
654    pub async fn disable(mut self, reason: &str) -> Result<(), anyhow::Error> {
655        self.output
656            .lock()
657            .await
658            .send(json!(
659                {
660                    "jsonrpc": "2.0",
661                    "id": self.init_id,
662            "result": crate::messages::InitResponse{
663            disable: Some(reason.to_string())
664            }
665                }
666            ))
667            .await
668            .context("sending init response")?;
669        Ok(())
670    }
671
672    pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
673        self.option_values
674            .get(name)
675            .ok_or(anyhow!("No option named '{}'", name))
676            .map(|c| c.clone())
677    }
678
679    pub fn option<'a, OV: OptionType<'a>>(
680        &self,
681        config_option: &options::ConfigOption<'a, OV>,
682    ) -> Result<OV::OutputValue> {
683        let value = self.option_str(config_option.name())?;
684        Ok(OV::from_value(&value))
685    }
686
687    /// return the cln configuration send to the
688    /// plugin after the initialization.
689    pub fn configuration(&self) -> Configuration {
690        self.configuration.clone()
691    }
692}
693
694impl<S> PluginDriver<S>
695where
696    S: Send + Clone,
697{
698    /// Run the plugin until we get a shutdown command.
699    async fn run<I, O>(
700        self,
701        mut receiver: tokio::sync::mpsc::Receiver<serde_json::Value>,
702        mut input: FramedRead<I, JsonRpcCodec>,
703        output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
704    ) -> Result<(), Error>
705    where
706        I: Send + AsyncReadExt + Unpin,
707        O: Send + AsyncWriteExt + Unpin,
708    {
709        loop {
710            // If we encounter any error reading or writing from/to
711            // the master we hand them up, so we can return control to
712            // the user-code, which may require some cleanups or
713            // similar.
714            tokio::select! {
715                    e = self.dispatch_one(&mut input, &self.plugin) => {
716                        if let Err(e) = e {
717                return Err(e)
718                        }
719            },
720            v = receiver.recv() => {
721                        output.lock().await.send(
722                v.context("internal communication error")?
723                        ).await?;
724            },
725                }
726        }
727    }
728
729    /// Dispatch one server-side event and then return. Just so we
730    /// have a nicer looking `select` statement in `run` :-)
731    async fn dispatch_one<I>(
732        &self,
733        input: &mut FramedRead<I, JsonRpcCodec>,
734        plugin: &Plugin<S>,
735    ) -> Result<(), Error>
736    where
737        I: Send + AsyncReadExt + Unpin,
738    {
739        match input.next().await {
740            Some(Ok(msg)) => {
741                trace!("Received a message: {:?}", msg);
742                match msg {
743                    messages::JsonRpc::Request(_id, _p) => {
744                        todo!("This is unreachable until we start filling in messages:Request. Until then the custom dispatcher below is used exclusively.");
745                    }
746                    messages::JsonRpc::Notification(_n) => {
747                        todo!("As soon as we define the full structure of the messages::Notification we'll get here. Until then the custom dispatcher below is used.")
748                    }
749                    messages::JsonRpc::CustomRequest(id, request) => {
750                        trace!("Dispatching custom method {:?}", request);
751                        let method = request
752                            .get("method")
753                            .context("Missing 'method' in request")?
754                            .as_str()
755                            .context("'method' is not a string")?;
756                        let callback = match method {
757                            name if name.eq("setconfig") => {
758                                self.setconfig_callback.as_ref().ok_or_else(|| {
759                                    anyhow!("No handler for method '{}' registered", method)
760                                })?
761                            }
762                            _ => self.rpcmethods.get(method).with_context(|| {
763                                anyhow!("No handler for method '{}' registered", method)
764                            })?,
765                        };
766                        let params = request
767                            .get("params")
768                            .context("Missing 'params' field in request")?
769                            .clone();
770
771                        let plugin = plugin.clone();
772                        let call = callback(plugin.clone(), params);
773
774                        tokio::spawn(async move {
775                            match call.await {
776                                Ok(v) => plugin
777                                    .sender
778                                    .send(json!({
779                                    "jsonrpc": "2.0",
780                                    "id": id,
781                                    "result": v
782                                    }))
783                                    .await
784                                    .context("returning custom response"),
785                                Err(e) => plugin
786                                    .sender
787                                    .send(json!({
788                                    "jsonrpc": "2.0",
789                                    "id": id,
790                                    "error": parse_error(e.to_string()),
791                                    }))
792                                    .await
793                                    .context("returning custom error"),
794                            }
795                        });
796                        Ok(())
797                    }
798                    messages::JsonRpc::CustomNotification(request) => {
799                        // This code handles notifications
800                        trace!("Dispatching custom notification {:?}", request);
801                        let method = request
802                            .get("method")
803                            .context("Missing 'method' in request")?
804                            .as_str()
805                            .context("'method' is not a string")?;
806
807                        let params = request
808                            .get("params")
809                            .context("Missing 'params' field in request")?;
810
811                        // Send to notification to the wildcard
812                        // subscription "*" it it exists
813                        match &self.wildcard_subscription {
814                            Some(cb) => {
815                                let call = cb(plugin.clone(), params.clone());
816                                tokio::spawn(async move { call.await.unwrap() });
817                            }
818                            None => {}
819                        };
820
821                        // Find the appropriate callback and process it
822                        // We'll log a warning if no handler is defined
823                        match self.subscriptions.get(method) {
824                            Some(cb) => {
825                                let call = cb(plugin.clone(), params.clone());
826                                tokio::spawn(async move { call.await.unwrap() });
827                            }
828                            None => {
829                                if self.wildcard_subscription.is_none() {
830                                    log::warn!(
831                                        "No handler for notification '{}' registered",
832                                        method
833                                    );
834                                }
835                            }
836                        };
837                        Ok(())
838                    }
839                }
840            }
841            Some(Err(e)) => Err(anyhow!("Error reading command: {}", e)),
842            None => Err(anyhow!("Error reading from master")),
843        }
844    }
845}
846
847impl<S> Plugin<S>
848where
849    S: Clone + Send,
850{
851    pub fn options(&self) -> Vec<UntypedConfigOption> {
852        self.options.values().cloned().collect()
853    }
854    pub fn configuration(&self) -> Configuration {
855        self.configuration.clone()
856    }
857    pub fn state(&self) -> &S {
858        &self.state
859    }
860}
861
862impl<S> Plugin<S>
863where
864    S: Send + Clone,
865{
866    pub async fn send_custom_notification(
867        &self,
868        method: String,
869        v: serde_json::Value,
870    ) -> Result<(), Error> {
871        self.sender
872            .send(json!({
873                "jsonrpc": "2.0",
874                "method": method,
875                "params": v,
876            }))
877            .await
878            .context("sending custom notification")?;
879        Ok(())
880    }
881
882    /// Wait for plugin shutdown
883    pub async fn join(&self) -> Result<(), Error> {
884        self.wait_handle
885            .subscribe()
886            .recv()
887            .await
888            .context("error waiting for shutdown")
889    }
890
891    /// Request plugin shutdown
892    pub fn shutdown(&self) -> Result<(), Error> {
893        self.wait_handle
894            .send(())
895            .context("error waiting for shutdown")?;
896        Ok(())
897    }
898}
899
/// Selects which feature bit field of the `getmanifest` response
/// [`Builder::featurebits`] sets.
pub enum FeatureBitsKind {
    Node,
    Channel,
    Invoice,
    Init,
}
906
/// Error object placed in the "error" field of a JSON-RPC response.
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
struct RpcError {
    // Error code, if any.
    pub code: Option<i32>,
    // Error message.
    pub message: String,
    // Optional additional JSON payload.
    pub data: Option<serde_json::Value>,
}
913fn parse_error(error: String) -> RpcError {
914    match serde_json::from_str::<RpcError>(&error) {
915        Ok(o) => o,
916        Err(_) => RpcError {
917            code: Some(-32700),
918            message: error,
919            data: None,
920        },
921    }
922}
923
#[cfg(test)]
mod test {
    use super::*;

    // Smoke test: a `Builder` over stdin/stdout can be constructed and
    // `start` can be called with `()` as the state.
    // NOTE(review): the future returned by `start` is dropped without
    // being awaited, so no handshake is actually performed here.
    #[tokio::test]
    async fn init() {
        let state = ();
        let builder = Builder::new(tokio::io::stdin(), tokio::io::stdout());
        let _ = builder.start(state);
    }
}