nu_plugin_core/communication_mode/mod.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
use std::ffi::OsStr;
use std::io::{Stdin, Stdout};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
use nu_protocol::ShellError;
#[cfg(feature = "local-socket")]
mod local_socket;
#[cfg(feature = "local-socket")]
use local_socket::*;
/// The type of communication used between the plugin and the engine.
///
/// `Stdio` is required to be supported by all plugins, and is attempted initially. If the
/// `local-socket` feature is enabled and the plugin supports it, `LocalSocket` may be attempted.
///
/// Local socket communication has the benefit of not tying up stdio, so it's more compatible with
/// plugins that want to take user input from the terminal in some way.
#[derive(Debug, Clone)]
pub enum CommunicationMode {
/// Communicate using `stdin` and `stdout`.
Stdio,
/// Communicate using an operating system-specific local socket.
#[cfg(feature = "local-socket")]
LocalSocket(std::ffi::OsString),
}
impl CommunicationMode {
/// Generate a new local socket communication mode based on the given plugin exe path.
#[cfg(feature = "local-socket")]
pub fn local_socket(plugin_exe: &std::path::Path) -> CommunicationMode {
use std::hash::{Hash, Hasher};
use std::time::SystemTime;
// Generate the unique ID based on the plugin path and the current time. The actual
// algorithm here is not very important, we just want this to be relatively unique very
// briefly. Using the default hasher in the stdlib means zero extra dependencies.
let mut hasher = std::collections::hash_map::DefaultHasher::new();
plugin_exe.hash(&mut hasher);
SystemTime::now().hash(&mut hasher);
let unique_id = format!("{:016x}", hasher.finish());
CommunicationMode::LocalSocket(make_local_socket_name(&unique_id))
}
pub fn args(&self) -> Vec<&OsStr> {
match self {
CommunicationMode::Stdio => vec![OsStr::new("--stdio")],
#[cfg(feature = "local-socket")]
CommunicationMode::LocalSocket(path) => {
vec![OsStr::new("--local-socket"), path.as_os_str()]
}
}
}
pub fn setup_command_io(&self, command: &mut Command) {
match self {
CommunicationMode::Stdio => {
// Both stdout and stdin are piped so we can receive information from the plugin
command.stdin(Stdio::piped());
command.stdout(Stdio::piped());
}
#[cfg(feature = "local-socket")]
CommunicationMode::LocalSocket(_) => {
// Stdio can be used by the plugin to talk to the terminal in local socket mode,
// which is the big benefit
command.stdin(Stdio::inherit());
command.stdout(Stdio::inherit());
}
}
}
pub fn serve(&self) -> Result<PreparedServerCommunication, ShellError> {
match self {
// Nothing to set up for stdio - we just take it from the child.
CommunicationMode::Stdio => Ok(PreparedServerCommunication::Stdio),
// For sockets: we need to create the server so that the child won't fail to connect.
#[cfg(feature = "local-socket")]
CommunicationMode::LocalSocket(name) => {
use interprocess::local_socket::ListenerOptions;
let listener = interpret_local_socket_name(name)
.and_then(|name| ListenerOptions::new().name(name).create_sync())
.map_err(|err| ShellError::IOError {
msg: format!("failed to open socket for plugin: {err}"),
})?;
Ok(PreparedServerCommunication::LocalSocket { listener })
}
}
}
pub fn connect_as_client(&self) -> Result<ClientCommunicationIo, ShellError> {
match self {
CommunicationMode::Stdio => Ok(ClientCommunicationIo::Stdio(
std::io::stdin(),
std::io::stdout(),
)),
#[cfg(feature = "local-socket")]
CommunicationMode::LocalSocket(name) => {
// Connect to the specified socket.
let get_socket = || {
use interprocess::local_socket as ls;
use ls::traits::Stream;
interpret_local_socket_name(name)
.and_then(|name| ls::Stream::connect(name))
.map_err(|err| ShellError::IOError {
msg: format!("failed to connect to socket: {err}"),
})
};
// Reverse order from the server: read in, write out
let read_in = get_socket()?;
let write_out = get_socket()?;
Ok(ClientCommunicationIo::LocalSocket { read_in, write_out })
}
}
}
}
/// The result of [`CommunicationMode::serve()`], which acts as an intermediate stage for
/// communication modes that require some kind of socket binding to occur before the client process
/// can be started. Call [`.connect()`](Self::connect) once the client process has been started.
///
/// The socket may be cleaned up on `Drop` if applicable.
pub enum PreparedServerCommunication {
/// Will take stdin and stdout from the process on [`.connect()`](Self::connect).
Stdio,
/// Contains the listener to accept connections on. On Unix, the socket is unlinked on `Drop`.
#[cfg(feature = "local-socket")]
LocalSocket {
listener: interprocess::local_socket::Listener,
},
}
impl PreparedServerCommunication {
pub fn connect(&self, child: &mut Child) -> Result<ServerCommunicationIo, ShellError> {
match self {
PreparedServerCommunication::Stdio => {
let stdin = child
.stdin
.take()
.ok_or_else(|| ShellError::PluginFailedToLoad {
msg: "Plugin missing stdin writer".into(),
})?;
let stdout = child
.stdout
.take()
.ok_or_else(|| ShellError::PluginFailedToLoad {
msg: "Plugin missing stdout writer".into(),
})?;
Ok(ServerCommunicationIo::Stdio(stdin, stdout))
}
#[cfg(feature = "local-socket")]
PreparedServerCommunication::LocalSocket { listener, .. } => {
use interprocess::local_socket::traits::{
Listener, ListenerNonblockingMode, Stream,
};
use std::time::{Duration, Instant};
const RETRY_PERIOD: Duration = Duration::from_millis(1);
const TIMEOUT: Duration = Duration::from_secs(10);
let start = Instant::now();
// Use a loop to try to get two clients from the listener: one for read (the plugin
// output) and one for write (the plugin input)
//
// Be non-blocking on Accept only, so we can timeout.
listener.set_nonblocking(ListenerNonblockingMode::Accept)?;
let mut get_socket = || {
let mut result = None;
while let Ok(None) = child.try_wait() {
match listener.accept() {
Ok(stream) => {
// Success! Ensure the stream is in nonblocking mode though, for
// good measure. Had an issue without this on macOS.
stream.set_nonblocking(false)?;
result = Some(stream);
break;
}
Err(err) => {
if !is_would_block_err(&err) {
// `WouldBlock` is ok, just means it's not ready yet, but some other
// kind of error should be reported
return Err(err.into());
}
}
}
if Instant::now().saturating_duration_since(start) > TIMEOUT {
return Err(ShellError::PluginFailedToLoad {
msg: "Plugin timed out while waiting to connect to socket".into(),
});
} else {
std::thread::sleep(RETRY_PERIOD);
}
}
if let Some(stream) = result {
Ok(stream)
} else {
// The process may have exited
Err(ShellError::PluginFailedToLoad {
msg: "Plugin exited without connecting".into(),
})
}
};
// Input stream always comes before output
let write_in = get_socket()?;
let read_out = get_socket()?;
Ok(ServerCommunicationIo::LocalSocket { read_out, write_in })
}
}
}
}
/// The required streams for communication from the engine side, i.e. the server in socket terms.
pub enum ServerCommunicationIo {
Stdio(ChildStdin, ChildStdout),
#[cfg(feature = "local-socket")]
LocalSocket {
read_out: interprocess::local_socket::Stream,
write_in: interprocess::local_socket::Stream,
},
}
/// The required streams for communication from the plugin side, i.e. the client in socket terms.
pub enum ClientCommunicationIo {
Stdio(Stdin, Stdout),
#[cfg(feature = "local-socket")]
LocalSocket {
read_in: interprocess::local_socket::Stream,
write_out: interprocess::local_socket::Stream,
},
}