nu_plugin_engine/
init.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
use std::{
    io::{BufReader, BufWriter},
    path::Path,
    process::Child,
    sync::{Arc, Mutex},
};

#[cfg(unix)]
use std::os::unix::process::CommandExt;
#[cfg(windows)]
use std::os::windows::process::CommandExt;

use nu_plugin_core::{
    CommunicationMode, EncodingType, InterfaceManager, PreparedServerCommunication,
    ServerCommunicationIo,
};
use nu_protocol::{
    engine::StateWorkingSet, report_shell_error, PluginIdentity, PluginRegistryFile,
    PluginRegistryItem, PluginRegistryItemData, RegisteredPlugin, ShellError, Span,
};

use crate::{
    PersistentPlugin, PluginDeclaration, PluginGc, PluginInterface, PluginInterfaceManager,
    PluginSource,
};

/// Capacity, in bytes, of the buffered reader and writer that
/// [`make_plugin_interface_with_streams()`] wraps around a plugin's streams.
///
/// This should be larger than the largest commonly sent message to avoid excessive fragmentation.
///
/// The buffers coming from byte streams are typically each 8192 bytes, so double that.
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16384;

/// Build the [`std::process::Command`] used to launch a plugin in the given `mode`.
///
/// The command is only configured here; it is not spawned. Once the caller has spawned it, the
/// resulting child can be passed to [`make_plugin_interface()`] to get a [`PluginInterface`].
///
/// - `path` is the plugin file to run.
/// - `shell`, if provided, is the program used to run `path`. When it is `None`, an interpreter
///   is picked from the file extension of `path` where one is known (`sh` on non-Unix only, `nu`,
///   `py`, `rb`, `jar`); otherwise `path` is executed directly.
/// - `mode` supplies the extra command-line arguments and the I/O setup for the process.
pub fn create_command(
    path: &Path,
    mut shell: Option<&Path>,
    mode: &CommunicationMode,
) -> std::process::Command {
    log::trace!("Starting plugin: {path:?}, shell = {shell:?}, mode = {mode:?}");

    // Arguments that must go between the interpreter and the plugin path
    let mut shell_args = vec![];

    if shell.is_none() {
        // We only have to do this for things that are not executable by Rust's Command API on
        // Windows. They do handle bat/cmd files for us, helpfully.
        //
        // Also include anything that wouldn't be executable with a shebang, like JAR files.
        shell = match path.extension().and_then(|e| e.to_str()) {
            Some("sh") => {
                if cfg!(unix) {
                    // We don't want to override what might be in the shebang if this is Unix, since
                    // some scripts will have a shebang specifying bash even if they're .sh
                    None
                } else {
                    Some(Path::new("sh"))
                }
            }
            Some("nu") => {
                shell_args.push("--stdin");
                Some(Path::new("nu"))
            }
            Some("py") => Some(Path::new("python")),
            Some("rb") => Some(Path::new("ruby")),
            Some("jar") => {
                shell_args.push("-jar");
                Some(Path::new("java"))
            }
            _ => None,
        };
    }

    let mut process = if let Some(shell) = shell {
        // Run as: <shell> <shell_args...> <path>
        let mut process = std::process::Command::new(shell);
        process.args(shell_args);
        process.arg(path);

        process
    } else {
        std::process::Command::new(path)
    };

    process.args(mode.args());

    // Setup I/O according to the communication mode
    mode.setup_command_io(&mut process);

    // The plugin should be run in a new process group to prevent Ctrl-C from stopping it
    #[cfg(unix)]
    process.process_group(0);
    #[cfg(windows)]
    process.creation_flags(windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP.0);

    // In order to make bugs with improper use of filesystem without getting the engine current
    // directory more obvious, the plugin always starts in the directory of its executable.
    //
    // `Path::parent()` returns `Some("")` for a bare relative file name (e.g. `plugin`), and an
    // empty path is not a valid directory to change into, so in that case the working directory
    // is left untouched.
    if let Some(dirname) = path.parent().filter(|dir| !dir.as_os_str().is_empty()) {
        process.current_dir(dirname);
    }

    process
}

/// Build a [`PluginInterface`] on top of an already-spawned plugin process.
///
/// `comm` describes the communication type the process was spawned with. Connecting it to
/// `child` yields the pair of streams to talk over, which for the stdio case are the child's own
/// stdin and stdout. `source`, `pid` and `gc` are forwarded unchanged to
/// [`make_plugin_interface_with_streams()`].
///
/// # Errors
///
/// Returns an error if `comm` fails to connect, or if building the interface from the streams
/// fails.
pub fn make_plugin_interface(
    mut child: Child,
    comm: PreparedServerCommunication,
    source: Arc<PluginSource>,
    pid: Option<u32>,
    gc: Option<PluginGc>,
) -> Result<PluginInterface, ShellError> {
    let io = comm.connect(&mut child)?;

    // Cleanup to run after the reader ends: wait for the child process to exit. The outcome of
    // the wait is deliberately discarded.
    let wait_for_child = move || {
        let _ = child.wait();
    };

    // Whichever transport is in use, the plugin's output is what we read and its input is what
    // we write.
    match io {
        ServerCommunicationIo::Stdio(stdin, stdout) => {
            make_plugin_interface_with_streams(stdout, stdin, wait_for_child, source, pid, gc)
        }
        #[cfg(feature = "local-socket")]
        ServerCommunicationIo::LocalSocket { read_out, write_in } => {
            make_plugin_interface_with_streams(read_out, write_in, wait_for_child, source, pid, gc)
        }
    }
}

/// Assemble a [`PluginInterface`] from raw streams and supporting pieces.
///
/// - `reader` is the stream the plugin's messages arrive on; `writer` is the stream messages to
///   the plugin are written to.
/// - `after_close` is called to clean up after the `reader` ends.
/// - `source` is required so that custom values produced by the plugin can spawn it.
/// - `pid` may be provided for process management (e.g. `EnterForeground`).
/// - `gc` may be provided for communication with the plugin's GC (e.g. `SetGcDisabled`).
///
/// # Errors
///
/// Returns an error if the encoding cannot be read from `reader`, if `hello()` on the new
/// interface fails, or if the reader thread cannot be spawned.
pub fn make_plugin_interface_with_streams(
    mut reader: impl std::io::Read + Send + 'static,
    writer: impl std::io::Write + Send + 'static,
    after_close: impl FnOnce() + Send + 'static,
    source: Arc<PluginSource>,
    pid: Option<u32>,
    gc: Option<PluginGc>,
) -> Result<PluginInterface, ShellError> {
    // The encoding is read from the raw stream, before any buffering is put in front of it.
    let encoding = get_plugin_encoding(&mut reader)?;

    let buffered_reader = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, reader);
    let buffered_writer = BufWriter::with_capacity(OUTPUT_BUFFER_SIZE, writer);

    let mut manager = PluginInterfaceManager::new(
        source.clone(),
        pid,
        (Mutex::new(buffered_writer), encoding),
    );
    manager.set_garbage_collector(gc);

    let interface = manager.get_interface();
    interface.hello()?;

    let thread_name = format!("plugin interface reader ({})", source.identity.name());

    // Reading happens on its own thread: messages from the plugin (including stream messages
    // like `Drop`) can arrive at any time, so we must be able to read while we are writing.
    std::thread::Builder::new()
        .name(thread_name)
        .spawn(move || {
            if let Err(err) = manager.consume_all((buffered_reader, encoding)) {
                log::warn!("Error in PluginInterfaceManager: {err}");
            }
            // The read loop is over. Drop the manager first so everyone disconnects, and only
            // then run the caller's cleanup.
            drop(manager);
            after_close();
        })
        .map_err(|err| ShellError::PluginFailedToLoad {
            msg: format!("Failed to spawn thread for plugin: {err}"),
        })?;

    Ok(interface)
}

/// Determine the plugin's encoding from a freshly opened stream.
///
/// The plugin is expected to send a 1-byte length and either `json` or `msgpack`, so this reads
/// that and determines the right length.
pub fn get_plugin_encoding(
    child_stdout: &mut impl std::io::Read,
) -> Result<EncodingType, ShellError> {
    let mut length_buf = [0u8; 1];
    child_stdout
        .read_exact(&mut length_buf)
        .map_err(|e| ShellError::PluginFailedToLoad {
            msg: format!("unable to get encoding from plugin: {e}"),
        })?;

    let mut buf = vec![0u8; length_buf[0] as usize];
    child_stdout
        .read_exact(&mut buf)
        .map_err(|e| ShellError::PluginFailedToLoad {
            msg: format!("unable to get encoding from plugin: {e}"),
        })?;

    EncodingType::try_from_bytes(&buf).ok_or_else(|| {
        let encoding_for_debug = String::from_utf8_lossy(&buf);
        ShellError::PluginFailedToLoad {
            msg: format!("get unsupported plugin encoding: {encoding_for_debug}"),
        }
    })
}

/// Load every plugin listed in `plugin_registry_file` into the engine state.
///
/// Each entry is loaded independently with [`load_plugin_registry_item()`]. A failing entry is
/// reported through `report_shell_error` and does not stop the remaining entries from loading.
/// `span` is passed through to each load.
pub fn load_plugin_file(
    working_set: &mut StateWorkingSet,
    plugin_registry_file: &PluginRegistryFile,
    span: Option<Span>,
) {
    for entry in plugin_registry_file.plugins.iter() {
        match load_plugin_registry_item(working_set, entry, span) {
            Ok(_) => {}
            // Errors are only reported, never propagated.
            Err(err) => report_shell_error(working_set.permanent_state, &err),
        }
    }
}

/// Load a single plugin registry entry into the engine state.
///
/// For an entry with valid data, the plugin is added to (or found in) `working_set`, reset, given
/// the metadata from the file, and one declaration is added per command signature. The resulting
/// plugin is returned.
///
/// # Errors
///
/// - [`ShellError::GenericError`] if a [`PluginIdentity`] cannot be built from the entry's
///   filename and shell.
/// - [`ShellError::PluginRegistryDataInvalid`] if the entry's data is marked invalid.
/// - Any error from adding the plugin to the working set or from resetting it.
pub fn load_plugin_registry_item(
    working_set: &mut StateWorkingSet,
    plugin: &PluginRegistryItem,
    span: Option<Span>,
) -> Result<Arc<PersistentPlugin>, ShellError> {
    let identity = match PluginIdentity::new(plugin.filename.clone(), plugin.shell.clone()) {
        Ok(identity) => identity,
        Err(_) => {
            return Err(ShellError::GenericError {
                error: "Invalid plugin filename in plugin registry file".into(),
                msg: "loaded from here".into(),
                span,
                help: Some(format!(
                    "the filename for `{}` is not a valid nushell plugin: {}",
                    plugin.name,
                    plugin.filename.display()
                )),
                inner: vec![],
            });
        }
    };

    // Entries without usable data cannot be loaded.
    let (metadata, commands) = match &plugin.data {
        PluginRegistryItemData::Valid { metadata, commands } => (metadata, commands),
        PluginRegistryItemData::Invalid => {
            return Err(ShellError::PluginRegistryDataInvalid {
                plugin_name: identity.name().to_owned(),
                span,
                add_command: identity.add_command(),
            });
        }
    };

    let persistent = add_plugin_to_working_set(working_set, &identity)?;

    // New signatures are about to be loaded. Reset the plugin so that a running instance, which
    // might not match them, does not stay around.
    persistent.reset()?;

    // Take the metadata from the file
    persistent.set_metadata(Some(metadata.clone()));

    // One declaration per command signature
    for signature in commands {
        let decl = PluginDeclaration::new(persistent.clone(), signature.clone());
        working_set.add_decl(Box::new(decl));
    }

    Ok(persistent)
}

/// Look up the [`PersistentPlugin`] for `identity` in `working_set`, creating it if it is not
/// there yet.
///
/// The garbage collection config for the plugin's name is read from the working set's config and
/// applied to the plugin whether it was found or newly created.
///
/// # Errors
///
/// Returns [`ShellError::NushellFailed`] if the registered plugin is not a [`PersistentPlugin`].
pub fn add_plugin_to_working_set(
    working_set: &mut StateWorkingSet,
    identity: &PluginIdentity,
) -> Result<Arc<PersistentPlugin>, ShellError> {
    // Garbage collection config that applies to this plugin
    let plugin_name = identity.name();
    let gc_settings = working_set.get_config().plugin_gc.get(plugin_name).clone();

    // Get the existing plugin, or register a new one built with that config
    let registered = working_set.find_or_create_plugin(identity, || {
        Arc::new(PersistentPlugin::new(identity.clone(), gc_settings.clone()))
    });

    // Applied unconditionally, so a plugin that already existed also gets the config
    registered.set_gc_config(&gc_settings);

    // The trait object only exists so that nu-protocol can contain plugins without knowing
    // anything about their implementation. In practice only `PersistentPlugin` is used, so this
    // downcast is generally expected to succeed.
    let as_any = registered.as_any();
    as_any.downcast().map_err(|_| ShellError::NushellFailed {
        msg: "encountered unexpected RegisteredPlugin type".into(),
    })
}