nu_plugin_core/communication_mode/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
use std::ffi::OsStr;
use std::io::{Stdin, Stdout};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};

use nu_protocol::ShellError;

#[cfg(feature = "local-socket")]
mod local_socket;

#[cfg(feature = "local-socket")]
use local_socket::*;

/// The type of communication used between the plugin and the engine.
///
/// `Stdio` is required to be supported by all plugins, and is attempted initially. If the
/// `local-socket` feature is enabled and the plugin supports it, `LocalSocket` may be attempted.
///
/// Local socket communication has the benefit of not tying up stdio, so it's more compatible with
/// plugins that want to take user input from the terminal in some way.
#[derive(Debug, Clone)]
pub enum CommunicationMode {
    /// Communicate using `stdin` and `stdout`.
    Stdio,
    /// Communicate using an operating system-specific local socket.
    #[cfg(feature = "local-socket")]
    LocalSocket(std::ffi::OsString),
}

impl CommunicationMode {
    /// Generate a new local socket communication mode based on the given plugin exe path.
    #[cfg(feature = "local-socket")]
    pub fn local_socket(plugin_exe: &std::path::Path) -> CommunicationMode {
        use std::hash::{Hash, Hasher};
        use std::time::SystemTime;

        // Generate the unique ID based on the plugin path and the current time. The actual
        // algorithm here is not very important, we just want this to be relatively unique very
        // briefly. Using the default hasher in the stdlib means zero extra dependencies.
        let mut hasher = std::collections::hash_map::DefaultHasher::new();

        plugin_exe.hash(&mut hasher);
        SystemTime::now().hash(&mut hasher);

        let unique_id = format!("{:016x}", hasher.finish());

        CommunicationMode::LocalSocket(make_local_socket_name(&unique_id))
    }

    pub fn args(&self) -> Vec<&OsStr> {
        match self {
            CommunicationMode::Stdio => vec![OsStr::new("--stdio")],
            #[cfg(feature = "local-socket")]
            CommunicationMode::LocalSocket(path) => {
                vec![OsStr::new("--local-socket"), path.as_os_str()]
            }
        }
    }

    pub fn setup_command_io(&self, command: &mut Command) {
        match self {
            CommunicationMode::Stdio => {
                // Both stdout and stdin are piped so we can receive information from the plugin
                command.stdin(Stdio::piped());
                command.stdout(Stdio::piped());
            }
            #[cfg(feature = "local-socket")]
            CommunicationMode::LocalSocket(_) => {
                // Stdio can be used by the plugin to talk to the terminal in local socket mode,
                // which is the big benefit
                command.stdin(Stdio::inherit());
                command.stdout(Stdio::inherit());
            }
        }
    }

    pub fn serve(&self) -> Result<PreparedServerCommunication, ShellError> {
        match self {
            // Nothing to set up for stdio - we just take it from the child.
            CommunicationMode::Stdio => Ok(PreparedServerCommunication::Stdio),
            // For sockets: we need to create the server so that the child won't fail to connect.
            #[cfg(feature = "local-socket")]
            CommunicationMode::LocalSocket(name) => {
                use interprocess::local_socket::ListenerOptions;

                let listener = interpret_local_socket_name(name)
                    .and_then(|name| ListenerOptions::new().name(name).create_sync())
                    .map_err(|err| ShellError::IOError {
                        msg: format!("failed to open socket for plugin: {err}"),
                    })?;
                Ok(PreparedServerCommunication::LocalSocket { listener })
            }
        }
    }

    pub fn connect_as_client(&self) -> Result<ClientCommunicationIo, ShellError> {
        match self {
            CommunicationMode::Stdio => Ok(ClientCommunicationIo::Stdio(
                std::io::stdin(),
                std::io::stdout(),
            )),
            #[cfg(feature = "local-socket")]
            CommunicationMode::LocalSocket(name) => {
                // Connect to the specified socket.
                let get_socket = || {
                    use interprocess::local_socket as ls;
                    use ls::traits::Stream;

                    interpret_local_socket_name(name)
                        .and_then(|name| ls::Stream::connect(name))
                        .map_err(|err| ShellError::IOError {
                            msg: format!("failed to connect to socket: {err}"),
                        })
                };
                // Reverse order from the server: read in, write out
                let read_in = get_socket()?;
                let write_out = get_socket()?;
                Ok(ClientCommunicationIo::LocalSocket { read_in, write_out })
            }
        }
    }
}

/// The result of [`CommunicationMode::serve()`], which acts as an intermediate stage for
/// communication modes that require some kind of socket binding to occur before the client process
/// can be started. Call [`.connect()`](Self::connect) once the client process has been started.
///
/// The socket may be cleaned up on `Drop` if applicable.
pub enum PreparedServerCommunication {
    /// Will take stdin and stdout from the process on [`.connect()`](Self::connect).
    Stdio,
    /// Contains the listener to accept connections on. On Unix, the socket is unlinked on `Drop`.
    #[cfg(feature = "local-socket")]
    LocalSocket {
        listener: interprocess::local_socket::Listener,
    },
}

impl PreparedServerCommunication {
    pub fn connect(&self, child: &mut Child) -> Result<ServerCommunicationIo, ShellError> {
        match self {
            PreparedServerCommunication::Stdio => {
                let stdin = child
                    .stdin
                    .take()
                    .ok_or_else(|| ShellError::PluginFailedToLoad {
                        msg: "Plugin missing stdin writer".into(),
                    })?;

                let stdout = child
                    .stdout
                    .take()
                    .ok_or_else(|| ShellError::PluginFailedToLoad {
                        msg: "Plugin missing stdout writer".into(),
                    })?;

                Ok(ServerCommunicationIo::Stdio(stdin, stdout))
            }
            #[cfg(feature = "local-socket")]
            PreparedServerCommunication::LocalSocket { listener, .. } => {
                use interprocess::local_socket::traits::{
                    Listener, ListenerNonblockingMode, Stream,
                };
                use std::time::{Duration, Instant};

                const RETRY_PERIOD: Duration = Duration::from_millis(1);
                const TIMEOUT: Duration = Duration::from_secs(10);

                let start = Instant::now();

                // Use a loop to try to get two clients from the listener: one for read (the plugin
                // output) and one for write (the plugin input)
                //
                // Be non-blocking on Accept only, so we can timeout.
                listener.set_nonblocking(ListenerNonblockingMode::Accept)?;
                let mut get_socket = || {
                    let mut result = None;
                    while let Ok(None) = child.try_wait() {
                        match listener.accept() {
                            Ok(stream) => {
                                // Success! Ensure the stream is in nonblocking mode though, for
                                // good measure. Had an issue without this on macOS.
                                stream.set_nonblocking(false)?;
                                result = Some(stream);
                                break;
                            }
                            Err(err) => {
                                if !is_would_block_err(&err) {
                                    // `WouldBlock` is ok, just means it's not ready yet, but some other
                                    // kind of error should be reported
                                    return Err(err.into());
                                }
                            }
                        }
                        if Instant::now().saturating_duration_since(start) > TIMEOUT {
                            return Err(ShellError::PluginFailedToLoad {
                                msg: "Plugin timed out while waiting to connect to socket".into(),
                            });
                        } else {
                            std::thread::sleep(RETRY_PERIOD);
                        }
                    }
                    if let Some(stream) = result {
                        Ok(stream)
                    } else {
                        // The process may have exited
                        Err(ShellError::PluginFailedToLoad {
                            msg: "Plugin exited without connecting".into(),
                        })
                    }
                };
                // Input stream always comes before output
                let write_in = get_socket()?;
                let read_out = get_socket()?;
                Ok(ServerCommunicationIo::LocalSocket { read_out, write_in })
            }
        }
    }
}

/// The required streams for communication from the engine side, i.e. the server in socket terms.
pub enum ServerCommunicationIo {
    Stdio(ChildStdin, ChildStdout),
    #[cfg(feature = "local-socket")]
    LocalSocket {
        read_out: interprocess::local_socket::Stream,
        write_in: interprocess::local_socket::Stream,
    },
}

/// The required streams for communication from the plugin side, i.e. the client in socket terms.
pub enum ClientCommunicationIo {
    Stdio(Stdin, Stdout),
    #[cfg(feature = "local-socket")]
    LocalSocket {
        read_in: interprocess::local_socket::Stream,
        write_out: interprocess::local_socket::Stream,
    },
}