nu_plugin_engine/
persistent.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
use crate::{
    init::{create_command, make_plugin_interface},
    PluginGc,
};

use super::{PluginInterface, PluginSource};
use nu_plugin_core::CommunicationMode;
use nu_protocol::{
    engine::{EngineState, Stack},
    HandlerGuard, Handlers, PluginGcConfig, PluginIdentity, PluginMetadata, RegisteredPlugin,
    ShellError,
};
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

/// A box that can keep a plugin that was spawned persistent for further uses. The plugin may or
/// may not be currently running. [`.get()`](Self::get) gets the currently running plugin, or
/// spawns it if it's not running.
///
/// The identity is fixed at construction time. Everything that can change afterwards (the running
/// plugin, metadata, preferred communication mode, GC config and signal guard) lives in a single
/// mutex-protected `MutableState`.
#[derive(Debug)]
pub struct PersistentPlugin {
    /// Identity (filename, shell, name) of the plugin
    identity: PluginIdentity,
    /// Mutable state, all guarded by one lock
    mutable: Mutex<MutableState>,
}

/// The mutable state for the persistent plugin. This should all be behind one lock to prevent lock
/// order problems.
#[derive(Debug)]
struct MutableState {
    /// Reference to the plugin if running. `None` when it has not been spawned yet or after it has
    /// been stopped.
    running: Option<RunningPlugin>,
    /// Metadata for the plugin, e.g. version. `None` until set via `set_metadata()`.
    metadata: Option<PluginMetadata>,
    /// Plugin's preferred communication mode (if known). `None` means it has not been determined
    /// yet; `reset()` clears it back to `None`.
    preferred_mode: Option<PreferredCommunicationMode>,
    /// Garbage collector config, cloned into the GC created for each newly spawned plugin
    gc_config: PluginGcConfig,
    /// RAII guard for this plugin's signal handler, stored by `configure_signal_handler()`
    signal_guard: Option<HandlerGuard>,
}

/// The communication mode to use the next time the plugin is spawned. `spawn()` translates this
/// into the corresponding `CommunicationMode`.
#[derive(Debug, Clone, Copy)]
enum PreferredCommunicationMode {
    /// Spawn with `CommunicationMode::Stdio`
    Stdio,
    /// Spawn with `CommunicationMode::local_socket(..)`. Only compiled in when the `local-socket`
    /// feature is enabled.
    #[cfg(feature = "local-socket")]
    LocalSocket,
}

/// A spawned plugin: its interface together with the garbage collector created for it.
#[derive(Debug)]
struct RunningPlugin {
    /// Interface (which can be cloned) to the running plugin
    interface: PluginInterface,
    /// Garbage collector for the plugin. Its tracking is stopped before the plugin is replaced or
    /// dropped.
    gc: PluginGc,
}

impl PersistentPlugin {
    /// Create a new persistent plugin. The plugin will not be spawned immediately.
    pub fn new(identity: PluginIdentity, gc_config: PluginGcConfig) -> PersistentPlugin {
        PersistentPlugin {
            identity,
            mutable: Mutex::new(MutableState {
                running: None,
                metadata: None,
                preferred_mode: None,
                gc_config,
                signal_guard: None,
            }),
        }
    }

    /// Get the plugin interface of the running plugin, or spawn it if it's not currently running.
    ///
    /// Will call `envs` to get environment variables to spawn the plugin if the plugin needs to be
    /// spawned. `envs` is not called when the plugin is already running.
    ///
    /// # Errors
    ///
    /// Returns an error if the state mutex is poisoned, if `envs` fails, or if the plugin could
    /// not be spawned. When spawning fails and stdio is not already the preferred mode, one more
    /// attempt is made with stdio and the error from that attempt is returned. When stdio is
    /// already the preferred mode, the original spawn error is returned as-is.
    pub fn get(
        self: Arc<Self>,
        envs: impl FnOnce() -> Result<HashMap<String, String>, ShellError>,
    ) -> Result<PluginInterface, ShellError> {
        let mut mutable = self.mutable.lock().map_err(|_| ShellError::NushellFailed {
            msg: format!(
                "plugin `{}` mutex poisoned, probably panic during spawn",
                self.identity.name()
            ),
        })?;

        if let Some(ref running) = mutable.running {
            // It exists, so just clone the interface
            Ok(running.interface.clone())
        } else {
            // Try to spawn. On success, `mutable.running` should have been set to the new running
            // plugin by `spawn()` so we just then need to clone the interface from there.
            //
            // We hold the lock the whole time to prevent others from trying to spawn and ending
            // up with duplicate plugins
            //
            // TODO: We should probably store the envs somewhere, in case we have to launch without
            // envs (e.g. from a custom value)
            let envs = envs()?;
            let result = self.clone().spawn(&envs, &mut mutable);

            if matches!(mutable.preferred_mode, Some(PreferredCommunicationMode::Stdio)) {
                // Stdio was already the mode in use, so there is nothing to fall back to. Return
                // the real spawn error here; otherwise it would be discarded and replaced by the
                // misleading "spawn() succeeded but didn't set interface" error below.
                result?;
            } else if result.is_err() {
                // We were using (or trying to upgrade to) an alternate communication mode, or no
                // mode was known yet, so fall back to stdio.
                log::warn!("{}: Trying again with stdio communication because mode {:?} failed with {result:?}",
                    self.identity.name(),
                    mutable.preferred_mode);
                // Reset to stdio and try again, but this time don't catch any error
                mutable.preferred_mode = Some(PreferredCommunicationMode::Stdio);
                self.clone().spawn(&envs, &mut mutable)?;
            }

            Ok(mutable
                .running
                .as_ref()
                .ok_or_else(|| ShellError::NushellFailed {
                    msg: "spawn() succeeded but didn't set interface".into(),
                })?
                .interface
                .clone())
        }
    }

    /// Launch the plugin process and, on success, record it in `mutable.running`.
    ///
    /// Any plugin previously recorded in `mutable.running` is discarded first. The communication
    /// mode is chosen from `mutable.preferred_mode`, defaulting to stdio when none is set. `envs`
    /// is applied to the spawned command.
    fn spawn(
        self: Arc<Self>,
        envs: &HashMap<String, String>,
        mutable: &mut MutableState,
    ) -> Result<(), ShellError> {
        // Start from a clean slate: forget any previous plugin, and make sure its GC no longer
        // tracks anything
        if let Some(previous) = mutable.running.take() {
            previous.gc.stop_tracking();
        }

        let source_file = self.identity.filename();

        // Translate the preferred mode into the actual communication mode
        let mode = match mutable.preferred_mode {
            // With no preference we start with stdio; an upgrade may be attempted further down
            None | Some(PreferredCommunicationMode::Stdio) => CommunicationMode::Stdio,
            // Local socket only if enabled
            #[cfg(feature = "local-socket")]
            Some(PreferredCommunicationMode::LocalSocket) => {
                CommunicationMode::local_socket(source_file)
            }
        };

        let mut plugin_cmd = create_command(source_file, self.identity.shell(), &mode);

        // Pass the current environment variables on to the plugin. `python` based plugins in
        // particular are likely to break inside a virtual Python environment without them.
        plugin_cmd.envs(envs);

        // Captured up front so the error path below can name the program
        let program_name = plugin_cmd.get_program().to_os_string().into_string();

        // Communication has to be prepared before the command is run
        let comm = mode.serve()?;

        // Run the plugin command
        let child = plugin_cmd.spawn().map_err(|err| {
            let msg = match (err.kind(), program_name) {
                // Only a missing program with a printable name gets the PATH hint
                (std::io::ErrorKind::NotFound, Ok(prog_name)) => {
                    format!("Can't find {prog_name}, please make sure that {prog_name} is in PATH.")
                }
                _ => format!("Error spawning child process: {err}"),
            };
            ShellError::PluginFailedToLoad { msg }
        })?;

        // Start the plugin garbage collector
        let gc = PluginGc::new(mutable.gc_config.clone(), &self)?;

        // The pid must be read before `child` is moved into the interface
        let pid = child.id();
        let interface = make_plugin_interface(
            child,
            comm,
            Arc::new(PluginSource::new(self.clone())),
            Some(pid),
            Some(gc.clone()),
        )?;

        // With no preferred mode recorded yet, see whether the plugin advertises support for
        // local sockets. If it does, spawn again using that mode instead.
        #[cfg(feature = "local-socket")]
        if mutable.preferred_mode.is_none()
            && interface
                .protocol_info()?
                .supports_feature(&nu_plugin_protocol::Feature::LocalSocket)
        {
            log::trace!(
                "{}: Attempting to upgrade to local socket mode",
                self.identity.name()
            );
            // The GC created above must not end up stopping the plugin we are about to spawn
            gc.stop_tracking();
            // Record the new mode and start over
            mutable.preferred_mode = Some(PreferredCommunicationMode::LocalSocket);
            return self.spawn(envs, mutable);
        }

        mutable.running = Some(RunningPlugin { interface, gc });
        Ok(())
    }

    /// Forget the running plugin, if any, after telling its GC to stop tracking. When `reset` is
    /// true, the learned `preferred_mode` is cleared as well.
    fn stop_internal(&self, reset: bool) -> Result<(), ShellError> {
        let mut state = self.mutable.lock().map_err(|_| ShellError::NushellFailed {
            msg: format!(
                "plugin `{}` mutable mutex poisoned, probably panic during spawn",
                self.identity.name()
            ),
        })?;

        // Taking the plugin out leaves `running` as `None`. The GC is told to stop tracking first
        // so that it can't accidentally try to stop a future plugin. The process is not killed
        // explicitly; the `RunningPlugin` is simply dropped and it should exit soon after.
        if let Some(running) = state.running.take() {
            running.gc.stop_tracking();
        }

        // A reset additionally forgets learned attributes such as the preferred mode
        if reset {
            state.preferred_mode = None;
        }

        Ok(())
    }
}

impl RegisteredPlugin for PersistentPlugin {
    /// The identity this plugin was created with.
    fn identity(&self) -> &PluginIdentity {
        &self.identity
    }

    /// Whether a plugin is currently recorded as running.
    fn is_running(&self) -> bool {
        // A poisoned lock is reported as "not running". That may not be correct, but it is a
        // failure state that would be noticed at some point anyway.
        match self.mutable.lock() {
            Ok(state) => state.running.is_some(),
            Err(_) => false,
        }
    }

    /// Process ID reported by the running plugin's interface, if any.
    fn pid(&self) -> Option<u32> {
        // As in `is_running()`, a poisoned lock simply yields `None`.
        let state = self.mutable.lock().ok()?;
        let running = state.running.as_ref()?;
        running.interface.pid()
    }

    /// Stop the plugin, keeping learned attributes such as the preferred mode.
    fn stop(&self) -> Result<(), ShellError> {
        self.stop_internal(false)
    }

    /// Stop the plugin and also forget learned attributes such as the preferred mode.
    fn reset(&self) -> Result<(), ShellError> {
        self.stop_internal(true)
    }

    /// A copy of the stored metadata, or `None` if there is none or the lock is poisoned.
    fn metadata(&self) -> Option<PluginMetadata> {
        let state = self.mutable.lock().ok()?;
        state.metadata.clone()
    }

    /// Replace the stored metadata. Does nothing if the lock is poisoned.
    fn set_metadata(&self, metadata: Option<PluginMetadata>) {
        let Ok(mut state) = self.mutable.lock() else {
            return;
        };
        state.metadata = metadata;
    }

    /// Store `gc_config` for future spawns and apply it to the running plugin's GC, if any. Does
    /// nothing if the lock is poisoned.
    fn set_gc_config(&self, gc_config: &PluginGcConfig) {
        let Ok(mut state) = self.mutable.lock() else {
            return;
        };

        // Save the new config for future calls
        state.gc_config = gc_config.clone();

        // Grab a handle to the running GC (if any), then let go of the lock so that we are not
        // holding it while talking to the GC
        let running_gc = state.running.as_ref().map(|running| running.gc.clone());
        drop(state);

        if let Some(gc) = running_gc {
            gc.set_config(gc_config.clone());
            gc.flush();
        }
    }

    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync> {
        self
    }

    /// Register a signal handler that forwards signals to the running plugin, and keep the
    /// returned guard on this plugin.
    fn configure_signal_handler(self: Arc<Self>, handlers: &Handlers) -> Result<(), ShellError> {
        // Only a weak reference is captured by the handler, so that the guard stored on the
        // plugin doesn't form a reference cycle back to the plugin.
        let weak_plugin = Arc::downgrade(&self);

        let guard = handlers.register(Box::new(move |action| {
            // Nothing to do if the plugin is gone or its lock is poisoned
            let Some(plugin) = weak_plugin.upgrade() else {
                return;
            };
            let Ok(state) = plugin.mutable.lock() else {
                return;
            };
            // Forward the signal through the interface, but only if the plugin is running.
            // Failures to deliver are ignored.
            if let Some(running) = state.running.as_ref() {
                let _ = running.interface.signal(action);
            }
        }))?;

        if let Ok(mut state) = self.mutable.lock() {
            state.signal_guard = Some(guard);
        }

        Ok(())
    }
}

/// Anything that can produce a plugin interface.
pub trait GetPlugin: RegisteredPlugin {
    /// Retrieve or spawn a [`PluginInterface`]. The `context` may be used for determining
    /// environment variables to launch the plugin with.
    ///
    /// `context` is optional, so implementations have to cope with `None` as well.
    ///
    /// # Errors
    ///
    /// Returns a [`ShellError`] if the interface could not be obtained.
    fn get_plugin(
        self: Arc<Self>,
        context: Option<(&EngineState, &mut Stack)>,
    ) -> Result<PluginInterface, ShellError>;
}

impl GetPlugin for PersistentPlugin {
    fn get_plugin(
        self: Arc<Self>,
        mut context: Option<(&EngineState, &mut Stack)>,
    ) -> Result<PluginInterface, ShellError> {
        self.get(|| {
            // Get envs from the context if provided.
            let envs = context
                .as_mut()
                .map(|(engine_state, stack)| {
                    // We need the current environment variables for `python` based plugins. Or
                    // we'll likely have a problem when a plugin is implemented in a virtual Python
                    // environment.
                    let stack = &mut stack.start_collect_value();
                    nu_engine::env::env_to_strings(engine_state, stack)
                })
                .transpose()?;

            Ok(envs.unwrap_or_default())
        })
    }
}