lsp_server/
lib.rs

1//! A language server scaffold, exposing a synchronous crossbeam-channel based API.
2//! This crate handles protocol handshaking and parsing messages, while you
3//! control the message dispatch loop yourself.
4//!
5//! Run with `RUST_LOG=lsp_server=debug` to see all the messages.
6
7#![warn(rust_2018_idioms, unused_lifetimes)]
8#![allow(clippy::print_stdout, clippy::disallowed_types)]
9
10mod error;
11mod msg;
12mod req_queue;
13mod socket;
14mod stdio;
15
16use std::{
17    io,
18    net::{TcpListener, TcpStream, ToSocketAddrs},
19};
20
21use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender};
22
23pub use crate::{
24    error::{ExtractError, ProtocolError},
25    msg::{ErrorCode, Message, Notification, Request, RequestId, Response, ResponseError},
26    req_queue::{Incoming, Outgoing, ReqQueue},
27    stdio::IoThreads,
28};
29
30/// Connection is just a pair of channels of LSP messages.
31pub struct Connection {
32    pub sender: Sender<Message>,
33    pub receiver: Receiver<Message>,
34}
35
36impl Connection {
37    /// Create connection over standard in/standard out.
38    ///
39    /// Use this to create a real language server.
40    pub fn stdio() -> (Connection, IoThreads) {
41        let (sender, receiver, io_threads) = stdio::stdio_transport();
42        (Connection { sender, receiver }, io_threads)
43    }
44
45    /// Open a connection over tcp.
46    /// This call blocks until a connection is established.
47    ///
48    /// Use this to create a real language server.
49    pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<(Connection, IoThreads)> {
50        let stream = TcpStream::connect(addr)?;
51        let (sender, receiver, io_threads) = socket::socket_transport(stream);
52        Ok((Connection { sender, receiver }, io_threads))
53    }
54
55    /// Listen for a connection over tcp.
56    /// This call blocks until a connection is established.
57    ///
58    /// Use this to create a real language server.
59    pub fn listen<A: ToSocketAddrs>(addr: A) -> io::Result<(Connection, IoThreads)> {
60        let listener = TcpListener::bind(addr)?;
61        let (stream, _) = listener.accept()?;
62        let (sender, receiver, io_threads) = socket::socket_transport(stream);
63        Ok((Connection { sender, receiver }, io_threads))
64    }
65
66    /// Creates a pair of connected connections.
67    ///
68    /// Use this for testing.
69    pub fn memory() -> (Connection, Connection) {
70        let (s1, r1) = crossbeam_channel::unbounded();
71        let (s2, r2) = crossbeam_channel::unbounded();
72        (Connection { sender: s1, receiver: r2 }, Connection { sender: s2, receiver: r1 })
73    }
74
75    /// Starts the initialization process by waiting for an initialize
76    /// request from the client. Use this for more advanced customization than
77    /// `initialize` can provide.
78    ///
79    /// Returns the request id and serialized `InitializeParams` from the client.
80    ///
81    /// # Example
82    ///
83    /// ```no_run
84    /// use std::error::Error;
85    /// use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities};
86    ///
87    /// use lsp_server::{Connection, Message, Request, RequestId, Response};
88    ///
89    /// fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
90    ///    // Create the transport. Includes the stdio (stdin and stdout) versions but this could
91    ///    // also be implemented to use sockets or HTTP.
92    ///    let (connection, io_threads) = Connection::stdio();
93    ///
94    ///    // Run the server
95    ///    let (id, params) = connection.initialize_start()?;
96    ///
97    ///    let init_params: InitializeParams = serde_json::from_value(params).unwrap();
98    ///    let client_capabilities: ClientCapabilities = init_params.capabilities;
99    ///    let server_capabilities = ServerCapabilities::default();
100    ///
101    ///    let initialize_data = serde_json::json!({
102    ///        "capabilities": server_capabilities,
103    ///        "serverInfo": {
104    ///            "name": "lsp-server-test",
105    ///            "version": "0.1"
106    ///        }
107    ///    });
108    ///
109    ///    connection.initialize_finish(id, initialize_data)?;
110    ///
111    ///    // ... Run main loop ...
112    ///
113    ///    Ok(())
114    /// }
115    /// ```
116    pub fn initialize_start(&self) -> Result<(RequestId, serde_json::Value), ProtocolError> {
117        self.initialize_start_while(|| true)
118    }
119
120    /// Starts the initialization process by waiting for an initialize as described in
121    /// [`Self::initialize_start`] as long as `running` returns
122    /// `true` while the return value can be changed through a sig handler such as `CTRL + C`.
123    ///
124    /// # Example
125    ///
126    /// ```rust
127    /// use std::sync::atomic::{AtomicBool, Ordering};
128    /// use std::sync::Arc;
129    /// # use std::error::Error;
130    /// # use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities};
131    /// # use lsp_server::{Connection, Message, Request, RequestId, Response};
132    /// # fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
133    /// let running = Arc::new(AtomicBool::new(true));
134    /// # running.store(true, Ordering::SeqCst);
135    /// let r = running.clone();
136    ///
137    /// ctrlc::set_handler(move || {
138    ///     r.store(false, Ordering::SeqCst);
139    /// }).expect("Error setting Ctrl-C handler");
140    ///
141    /// let (connection, io_threads) = Connection::stdio();
142    ///
143    /// let res = connection.initialize_start_while(|| running.load(Ordering::SeqCst));
144    /// # assert!(res.is_err());
145    ///
146    /// # Ok(())
147    /// # }
148    /// ```
149    pub fn initialize_start_while<C>(
150        &self,
151        running: C,
152    ) -> Result<(RequestId, serde_json::Value), ProtocolError>
153    where
154        C: Fn() -> bool,
155    {
156        while running() {
157            let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) {
158                Ok(msg) => msg,
159                Err(RecvTimeoutError::Timeout) => {
160                    continue;
161                }
162                Err(RecvTimeoutError::Disconnected) => return Err(ProtocolError::disconnected()),
163            };
164
165            match msg {
166                Message::Request(req) if req.is_initialize() => return Ok((req.id, req.params)),
167                // Respond to non-initialize requests with ServerNotInitialized
168                Message::Request(req) => {
169                    let resp = Response::new_err(
170                        req.id.clone(),
171                        ErrorCode::ServerNotInitialized as i32,
172                        format!("expected initialize request, got {req:?}"),
173                    );
174                    self.sender.send(resp.into()).unwrap();
175                    continue;
176                }
177                Message::Notification(n) if !n.is_exit() => {
178                    continue;
179                }
180                msg => {
181                    return Err(ProtocolError::new(format!(
182                        "expected initialize request, got {msg:?}"
183                    )));
184                }
185            };
186        }
187
188        Err(ProtocolError::new(String::from(
189            "Initialization has been aborted during initialization",
190        )))
191    }
192
193    /// Finishes the initialization process by sending an `InitializeResult` to the client
194    pub fn initialize_finish(
195        &self,
196        initialize_id: RequestId,
197        initialize_result: serde_json::Value,
198    ) -> Result<(), ProtocolError> {
199        let resp = Response::new_ok(initialize_id, initialize_result);
200        self.sender.send(resp.into()).unwrap();
201        match &self.receiver.recv() {
202            Ok(Message::Notification(n)) if n.is_initialized() => Ok(()),
203            Ok(msg) => Err(ProtocolError::new(format!(
204                r#"expected initialized notification, got: {msg:?}"#
205            ))),
206            Err(RecvError) => Err(ProtocolError::disconnected()),
207        }
208    }
209
210    /// Finishes the initialization process as described in [`Self::initialize_finish`] as
211    /// long as `running` returns `true` while the return value can be changed through a sig
212    /// handler such as `CTRL + C`.
213    pub fn initialize_finish_while<C>(
214        &self,
215        initialize_id: RequestId,
216        initialize_result: serde_json::Value,
217        running: C,
218    ) -> Result<(), ProtocolError>
219    where
220        C: Fn() -> bool,
221    {
222        let resp = Response::new_ok(initialize_id, initialize_result);
223        self.sender.send(resp.into()).unwrap();
224
225        while running() {
226            let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) {
227                Ok(msg) => msg,
228                Err(RecvTimeoutError::Timeout) => {
229                    continue;
230                }
231                Err(RecvTimeoutError::Disconnected) => {
232                    return Err(ProtocolError::disconnected());
233                }
234            };
235
236            match msg {
237                Message::Notification(n) if n.is_initialized() => {
238                    return Ok(());
239                }
240                msg => {
241                    return Err(ProtocolError::new(format!(
242                        r#"expected initialized notification, got: {msg:?}"#
243                    )));
244                }
245            }
246        }
247
248        Err(ProtocolError::new(String::from(
249            "Initialization has been aborted during initialization",
250        )))
251    }
252
253    /// Initialize the connection. Sends the server capabilities
254    /// to the client and returns the serialized client capabilities
255    /// on success. If more fine-grained initialization is required use
256    /// `initialize_start`/`initialize_finish`.
257    ///
258    /// # Example
259    ///
260    /// ```no_run
261    /// use std::error::Error;
262    /// use lsp_types::ServerCapabilities;
263    ///
264    /// use lsp_server::{Connection, Message, Request, RequestId, Response};
265    ///
266    /// fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
267    ///    // Create the transport. Includes the stdio (stdin and stdout) versions but this could
268    ///    // also be implemented to use sockets or HTTP.
269    ///    let (connection, io_threads) = Connection::stdio();
270    ///
271    ///    // Run the server
272    ///    let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap();
273    ///    let initialization_params = connection.initialize(server_capabilities)?;
274    ///
275    ///    // ... Run main loop ...
276    ///
277    ///    Ok(())
278    /// }
279    /// ```
280    pub fn initialize(
281        &self,
282        server_capabilities: serde_json::Value,
283    ) -> Result<serde_json::Value, ProtocolError> {
284        let (id, params) = self.initialize_start()?;
285
286        let initialize_data = serde_json::json!({
287            "capabilities": server_capabilities,
288        });
289
290        self.initialize_finish(id, initialize_data)?;
291
292        Ok(params)
293    }
294
295    /// Initialize the connection as described in [`Self::initialize`] as long as `running` returns
296    /// `true` while the return value can be changed through a sig handler such as `CTRL + C`.
297    ///
298    /// # Example
299    ///
300    /// ```rust
301    /// use std::sync::atomic::{AtomicBool, Ordering};
302    /// use std::sync::Arc;
303    /// # use std::error::Error;
304    /// # use lsp_types::ServerCapabilities;
305    /// # use lsp_server::{Connection, Message, Request, RequestId, Response};
306    ///
307    /// # fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
308    /// let running = Arc::new(AtomicBool::new(true));
309    /// # running.store(true, Ordering::SeqCst);
310    /// let r = running.clone();
311    ///
312    /// ctrlc::set_handler(move || {
313    ///     r.store(false, Ordering::SeqCst);
314    /// }).expect("Error setting Ctrl-C handler");
315    ///
316    /// let (connection, io_threads) = Connection::stdio();
317    ///
318    /// let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap();
319    /// let initialization_params = connection.initialize_while(
320    ///     server_capabilities,
321    ///     || running.load(Ordering::SeqCst)
322    /// );
323    ///
324    /// # assert!(initialization_params.is_err());
325    /// # Ok(())
326    /// # }
327    /// ```
328    pub fn initialize_while<C>(
329        &self,
330        server_capabilities: serde_json::Value,
331        running: C,
332    ) -> Result<serde_json::Value, ProtocolError>
333    where
334        C: Fn() -> bool,
335    {
336        let (id, params) = self.initialize_start_while(&running)?;
337
338        let initialize_data = serde_json::json!({
339            "capabilities": server_capabilities,
340        });
341
342        self.initialize_finish_while(id, initialize_data, running)?;
343
344        Ok(params)
345    }
346
347    /// If `req` is `Shutdown`, respond to it and return `true`, otherwise return `false`
348    pub fn handle_shutdown(&self, req: &Request) -> Result<bool, ProtocolError> {
349        if !req.is_shutdown() {
350            return Ok(false);
351        }
352        let resp = Response::new_ok(req.id.clone(), ());
353        let _ = self.sender.send(resp.into());
354        match &self.receiver.recv_timeout(std::time::Duration::from_secs(30)) {
355            Ok(Message::Notification(n)) if n.is_exit() => (),
356            Ok(msg) => {
357                return Err(ProtocolError::new(format!(
358                    "unexpected message during shutdown: {msg:?}"
359                )))
360            }
361            Err(RecvTimeoutError::Timeout) => {
362                return Err(ProtocolError::new(
363                    "timed out waiting for exit notification".to_owned(),
364                ))
365            }
366            Err(RecvTimeoutError::Disconnected) => {
367                return Err(ProtocolError::new(
368                    "channel disconnected waiting for exit notification".to_owned(),
369                ))
370            }
371        }
372        Ok(true)
373    }
374}
375
376#[cfg(test)]
377mod tests {
378    use crossbeam_channel::unbounded;
379    use lsp_types::notification::{Exit, Initialized, Notification};
380    use lsp_types::request::{Initialize, Request};
381    use lsp_types::{InitializeParams, InitializedParams};
382    use serde_json::to_value;
383
384    use crate::{Connection, Message, ProtocolError, RequestId};
385
386    struct TestCase {
387        test_messages: Vec<Message>,
388        expected_resp: Result<(RequestId, serde_json::Value), ProtocolError>,
389    }
390
391    fn initialize_start_test(test_case: TestCase) {
392        let (reader_sender, reader_receiver) = unbounded::<Message>();
393        let (writer_sender, writer_receiver) = unbounded::<Message>();
394        let conn = Connection { sender: writer_sender, receiver: reader_receiver };
395
396        for msg in test_case.test_messages {
397            assert!(reader_sender.send(msg).is_ok());
398        }
399
400        let resp = conn.initialize_start();
401        assert_eq!(test_case.expected_resp, resp);
402
403        assert!(writer_receiver.recv_timeout(std::time::Duration::from_secs(1)).is_err());
404    }
405
406    #[test]
407    fn not_exit_notification() {
408        let notification = crate::Notification {
409            method: Initialized::METHOD.to_owned(),
410            params: to_value(InitializedParams {}).unwrap(),
411        };
412
413        let params_as_value = to_value(InitializeParams::default()).unwrap();
414        let req_id = RequestId::from(234);
415        let request = crate::Request {
416            id: req_id.clone(),
417            method: Initialize::METHOD.to_owned(),
418            params: params_as_value.clone(),
419        };
420
421        initialize_start_test(TestCase {
422            test_messages: vec![notification.into(), request.into()],
423            expected_resp: Ok((req_id, params_as_value)),
424        });
425    }
426
427    #[test]
428    fn exit_notification() {
429        let notification =
430            crate::Notification { method: Exit::METHOD.to_owned(), params: to_value(()).unwrap() };
431        let notification_msg = Message::from(notification);
432
433        initialize_start_test(TestCase {
434            test_messages: vec![notification_msg.clone()],
435            expected_resp: Err(ProtocolError::new(format!(
436                "expected initialize request, got {notification_msg:?}"
437            ))),
438        });
439    }
440}