lsp_server/lib.rs
1//! A language server scaffold, exposing a synchronous crossbeam-channel based API.
2//! This crate handles protocol handshaking and parsing messages, while you
3//! control the message dispatch loop yourself.
4//!
5//! Run with `RUST_LOG=lsp_server=debug` to see all the messages.
6
7#![warn(rust_2018_idioms, unused_lifetimes)]
8#![allow(clippy::print_stdout, clippy::disallowed_types)]
9
10mod error;
11mod msg;
12mod req_queue;
13mod socket;
14mod stdio;
15
16use std::{
17 io,
18 net::{TcpListener, TcpStream, ToSocketAddrs},
19};
20
21use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender};
22
23pub use crate::{
24 error::{ExtractError, ProtocolError},
25 msg::{ErrorCode, Message, Notification, Request, RequestId, Response, ResponseError},
26 req_queue::{Incoming, Outgoing, ReqQueue},
27 stdio::IoThreads,
28};
29
30/// Connection is just a pair of channels of LSP messages.
31pub struct Connection {
32 pub sender: Sender<Message>,
33 pub receiver: Receiver<Message>,
34}
35
36impl Connection {
37 /// Create connection over standard in/standard out.
38 ///
39 /// Use this to create a real language server.
40 pub fn stdio() -> (Connection, IoThreads) {
41 let (sender, receiver, io_threads) = stdio::stdio_transport();
42 (Connection { sender, receiver }, io_threads)
43 }
44
45 /// Open a connection over tcp.
46 /// This call blocks until a connection is established.
47 ///
48 /// Use this to create a real language server.
49 pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<(Connection, IoThreads)> {
50 let stream = TcpStream::connect(addr)?;
51 let (sender, receiver, io_threads) = socket::socket_transport(stream);
52 Ok((Connection { sender, receiver }, io_threads))
53 }
54
55 /// Listen for a connection over tcp.
56 /// This call blocks until a connection is established.
57 ///
58 /// Use this to create a real language server.
59 pub fn listen<A: ToSocketAddrs>(addr: A) -> io::Result<(Connection, IoThreads)> {
60 let listener = TcpListener::bind(addr)?;
61 let (stream, _) = listener.accept()?;
62 let (sender, receiver, io_threads) = socket::socket_transport(stream);
63 Ok((Connection { sender, receiver }, io_threads))
64 }
65
66 /// Creates a pair of connected connections.
67 ///
68 /// Use this for testing.
69 pub fn memory() -> (Connection, Connection) {
70 let (s1, r1) = crossbeam_channel::unbounded();
71 let (s2, r2) = crossbeam_channel::unbounded();
72 (Connection { sender: s1, receiver: r2 }, Connection { sender: s2, receiver: r1 })
73 }
74
75 /// Starts the initialization process by waiting for an initialize
76 /// request from the client. Use this for more advanced customization than
77 /// `initialize` can provide.
78 ///
79 /// Returns the request id and serialized `InitializeParams` from the client.
80 ///
81 /// # Example
82 ///
83 /// ```no_run
84 /// use std::error::Error;
85 /// use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities};
86 ///
87 /// use lsp_server::{Connection, Message, Request, RequestId, Response};
88 ///
89 /// fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
90 /// // Create the transport. Includes the stdio (stdin and stdout) versions but this could
91 /// // also be implemented to use sockets or HTTP.
92 /// let (connection, io_threads) = Connection::stdio();
93 ///
94 /// // Run the server
95 /// let (id, params) = connection.initialize_start()?;
96 ///
97 /// let init_params: InitializeParams = serde_json::from_value(params).unwrap();
98 /// let client_capabilities: ClientCapabilities = init_params.capabilities;
99 /// let server_capabilities = ServerCapabilities::default();
100 ///
101 /// let initialize_data = serde_json::json!({
102 /// "capabilities": server_capabilities,
103 /// "serverInfo": {
104 /// "name": "lsp-server-test",
105 /// "version": "0.1"
106 /// }
107 /// });
108 ///
109 /// connection.initialize_finish(id, initialize_data)?;
110 ///
111 /// // ... Run main loop ...
112 ///
113 /// Ok(())
114 /// }
115 /// ```
116 pub fn initialize_start(&self) -> Result<(RequestId, serde_json::Value), ProtocolError> {
117 self.initialize_start_while(|| true)
118 }
119
120 /// Starts the initialization process by waiting for an initialize as described in
121 /// [`Self::initialize_start`] as long as `running` returns
122 /// `true` while the return value can be changed through a sig handler such as `CTRL + C`.
123 ///
124 /// # Example
125 ///
126 /// ```rust
127 /// use std::sync::atomic::{AtomicBool, Ordering};
128 /// use std::sync::Arc;
129 /// # use std::error::Error;
130 /// # use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities};
131 /// # use lsp_server::{Connection, Message, Request, RequestId, Response};
132 /// # fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
133 /// let running = Arc::new(AtomicBool::new(true));
134 /// # running.store(true, Ordering::SeqCst);
135 /// let r = running.clone();
136 ///
137 /// ctrlc::set_handler(move || {
138 /// r.store(false, Ordering::SeqCst);
139 /// }).expect("Error setting Ctrl-C handler");
140 ///
141 /// let (connection, io_threads) = Connection::stdio();
142 ///
143 /// let res = connection.initialize_start_while(|| running.load(Ordering::SeqCst));
144 /// # assert!(res.is_err());
145 ///
146 /// # Ok(())
147 /// # }
148 /// ```
149 pub fn initialize_start_while<C>(
150 &self,
151 running: C,
152 ) -> Result<(RequestId, serde_json::Value), ProtocolError>
153 where
154 C: Fn() -> bool,
155 {
156 while running() {
157 let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) {
158 Ok(msg) => msg,
159 Err(RecvTimeoutError::Timeout) => {
160 continue;
161 }
162 Err(RecvTimeoutError::Disconnected) => return Err(ProtocolError::disconnected()),
163 };
164
165 match msg {
166 Message::Request(req) if req.is_initialize() => return Ok((req.id, req.params)),
167 // Respond to non-initialize requests with ServerNotInitialized
168 Message::Request(req) => {
169 let resp = Response::new_err(
170 req.id.clone(),
171 ErrorCode::ServerNotInitialized as i32,
172 format!("expected initialize request, got {req:?}"),
173 );
174 self.sender.send(resp.into()).unwrap();
175 continue;
176 }
177 Message::Notification(n) if !n.is_exit() => {
178 continue;
179 }
180 msg => {
181 return Err(ProtocolError::new(format!(
182 "expected initialize request, got {msg:?}"
183 )));
184 }
185 };
186 }
187
188 Err(ProtocolError::new(String::from(
189 "Initialization has been aborted during initialization",
190 )))
191 }
192
193 /// Finishes the initialization process by sending an `InitializeResult` to the client
194 pub fn initialize_finish(
195 &self,
196 initialize_id: RequestId,
197 initialize_result: serde_json::Value,
198 ) -> Result<(), ProtocolError> {
199 let resp = Response::new_ok(initialize_id, initialize_result);
200 self.sender.send(resp.into()).unwrap();
201 match &self.receiver.recv() {
202 Ok(Message::Notification(n)) if n.is_initialized() => Ok(()),
203 Ok(msg) => Err(ProtocolError::new(format!(
204 r#"expected initialized notification, got: {msg:?}"#
205 ))),
206 Err(RecvError) => Err(ProtocolError::disconnected()),
207 }
208 }
209
210 /// Finishes the initialization process as described in [`Self::initialize_finish`] as
211 /// long as `running` returns `true` while the return value can be changed through a sig
212 /// handler such as `CTRL + C`.
213 pub fn initialize_finish_while<C>(
214 &self,
215 initialize_id: RequestId,
216 initialize_result: serde_json::Value,
217 running: C,
218 ) -> Result<(), ProtocolError>
219 where
220 C: Fn() -> bool,
221 {
222 let resp = Response::new_ok(initialize_id, initialize_result);
223 self.sender.send(resp.into()).unwrap();
224
225 while running() {
226 let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) {
227 Ok(msg) => msg,
228 Err(RecvTimeoutError::Timeout) => {
229 continue;
230 }
231 Err(RecvTimeoutError::Disconnected) => {
232 return Err(ProtocolError::disconnected());
233 }
234 };
235
236 match msg {
237 Message::Notification(n) if n.is_initialized() => {
238 return Ok(());
239 }
240 msg => {
241 return Err(ProtocolError::new(format!(
242 r#"expected initialized notification, got: {msg:?}"#
243 )));
244 }
245 }
246 }
247
248 Err(ProtocolError::new(String::from(
249 "Initialization has been aborted during initialization",
250 )))
251 }
252
253 /// Initialize the connection. Sends the server capabilities
254 /// to the client and returns the serialized client capabilities
255 /// on success. If more fine-grained initialization is required use
256 /// `initialize_start`/`initialize_finish`.
257 ///
258 /// # Example
259 ///
260 /// ```no_run
261 /// use std::error::Error;
262 /// use lsp_types::ServerCapabilities;
263 ///
264 /// use lsp_server::{Connection, Message, Request, RequestId, Response};
265 ///
266 /// fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
267 /// // Create the transport. Includes the stdio (stdin and stdout) versions but this could
268 /// // also be implemented to use sockets or HTTP.
269 /// let (connection, io_threads) = Connection::stdio();
270 ///
271 /// // Run the server
272 /// let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap();
273 /// let initialization_params = connection.initialize(server_capabilities)?;
274 ///
275 /// // ... Run main loop ...
276 ///
277 /// Ok(())
278 /// }
279 /// ```
280 pub fn initialize(
281 &self,
282 server_capabilities: serde_json::Value,
283 ) -> Result<serde_json::Value, ProtocolError> {
284 let (id, params) = self.initialize_start()?;
285
286 let initialize_data = serde_json::json!({
287 "capabilities": server_capabilities,
288 });
289
290 self.initialize_finish(id, initialize_data)?;
291
292 Ok(params)
293 }
294
295 /// Initialize the connection as described in [`Self::initialize`] as long as `running` returns
296 /// `true` while the return value can be changed through a sig handler such as `CTRL + C`.
297 ///
298 /// # Example
299 ///
300 /// ```rust
301 /// use std::sync::atomic::{AtomicBool, Ordering};
302 /// use std::sync::Arc;
303 /// # use std::error::Error;
304 /// # use lsp_types::ServerCapabilities;
305 /// # use lsp_server::{Connection, Message, Request, RequestId, Response};
306 ///
307 /// # fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
308 /// let running = Arc::new(AtomicBool::new(true));
309 /// # running.store(true, Ordering::SeqCst);
310 /// let r = running.clone();
311 ///
312 /// ctrlc::set_handler(move || {
313 /// r.store(false, Ordering::SeqCst);
314 /// }).expect("Error setting Ctrl-C handler");
315 ///
316 /// let (connection, io_threads) = Connection::stdio();
317 ///
318 /// let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap();
319 /// let initialization_params = connection.initialize_while(
320 /// server_capabilities,
321 /// || running.load(Ordering::SeqCst)
322 /// );
323 ///
324 /// # assert!(initialization_params.is_err());
325 /// # Ok(())
326 /// # }
327 /// ```
328 pub fn initialize_while<C>(
329 &self,
330 server_capabilities: serde_json::Value,
331 running: C,
332 ) -> Result<serde_json::Value, ProtocolError>
333 where
334 C: Fn() -> bool,
335 {
336 let (id, params) = self.initialize_start_while(&running)?;
337
338 let initialize_data = serde_json::json!({
339 "capabilities": server_capabilities,
340 });
341
342 self.initialize_finish_while(id, initialize_data, running)?;
343
344 Ok(params)
345 }
346
347 /// If `req` is `Shutdown`, respond to it and return `true`, otherwise return `false`
348 pub fn handle_shutdown(&self, req: &Request) -> Result<bool, ProtocolError> {
349 if !req.is_shutdown() {
350 return Ok(false);
351 }
352 let resp = Response::new_ok(req.id.clone(), ());
353 let _ = self.sender.send(resp.into());
354 match &self.receiver.recv_timeout(std::time::Duration::from_secs(30)) {
355 Ok(Message::Notification(n)) if n.is_exit() => (),
356 Ok(msg) => {
357 return Err(ProtocolError::new(format!(
358 "unexpected message during shutdown: {msg:?}"
359 )))
360 }
361 Err(RecvTimeoutError::Timeout) => {
362 return Err(ProtocolError::new(
363 "timed out waiting for exit notification".to_owned(),
364 ))
365 }
366 Err(RecvTimeoutError::Disconnected) => {
367 return Err(ProtocolError::new(
368 "channel disconnected waiting for exit notification".to_owned(),
369 ))
370 }
371 }
372 Ok(true)
373 }
374}
375
#[cfg(test)]
mod tests {
    use crossbeam_channel::unbounded;
    use lsp_types::notification::{Exit, Initialized, Notification};
    use lsp_types::request::{Initialize, Request};
    use lsp_types::{InitializeParams, InitializedParams};
    use serde_json::to_value;

    use crate::{Connection, Message, ProtocolError, RequestId};

    /// Input messages fed to the connection and the expected outcome of
    /// `Connection::initialize_start`.
    struct TestCase {
        test_messages: Vec<Message>,
        expected_resp: Result<(RequestId, serde_json::Value), ProtocolError>,
    }

    /// Queues every message of `test_case` on the connection's receiving side, runs
    /// `initialize_start` and checks both its result and that nothing was written back.
    fn initialize_start_test(test_case: TestCase) {
        let (reader_sender, reader_receiver) = unbounded::<Message>();
        let (writer_sender, writer_receiver) = unbounded::<Message>();
        let conn = Connection { sender: writer_sender, receiver: reader_receiver };

        for msg in test_case.test_messages {
            assert!(reader_sender.send(msg).is_ok());
        }

        let resp = conn.initialize_start();
        assert_eq!(test_case.expected_resp, resp);

        // `initialize_start` has already returned, so anything it sent would be sitting
        // in the queue by now; a non-blocking check avoids stalling each test for a
        // full second waiting on a timeout.
        assert!(writer_receiver.try_recv().is_err());
    }

    /// A notification other than exit received before the initialize request is skipped.
    #[test]
    fn not_exit_notification() {
        let notification = crate::Notification {
            method: Initialized::METHOD.to_owned(),
            params: to_value(InitializedParams {}).unwrap(),
        };

        let params_as_value = to_value(InitializeParams::default()).unwrap();
        let req_id = RequestId::from(234);
        let request = crate::Request {
            id: req_id.clone(),
            method: Initialize::METHOD.to_owned(),
            params: params_as_value.clone(),
        };

        initialize_start_test(TestCase {
            test_messages: vec![notification.into(), request.into()],
            expected_resp: Ok((req_id, params_as_value)),
        });
    }

    /// An exit notification received before the initialize request is a protocol error.
    #[test]
    fn exit_notification() {
        let notification =
            crate::Notification { method: Exit::METHOD.to_owned(), params: to_value(()).unwrap() };
        let notification_msg = Message::from(notification);

        initialize_start_test(TestCase {
            test_messages: vec![notification_msg.clone()],
            expected_resp: Err(ProtocolError::new(format!(
                "expected initialize request, got {notification_msg:?}"
            ))),
        });
    }
}
440}