x11rb_protocol/connection/
mod.rs

1//! Helper types for implementing an X11 client.
2
3use alloc::collections::VecDeque;
4use alloc::vec::Vec;
5
6use crate::utils::RawFdContainer;
7use crate::{DiscardMode, SequenceNumber};
8
/// A combination of a buffer and a list of file descriptors.
///
/// This is `crate::BufWithFds` with an owned `Vec<u8>` as the buffer. In this module it holds
/// the bytes of a reply or error packet together with the file descriptors that were assigned
/// to it.
pub type BufWithFds = crate::BufWithFds<Vec<u8>>;
11
/// The raw bytes of an X11 event and its sequence number.
///
/// This is `crate::RawEventAndSeqNumber` with an owned `Vec<u8>` as the buffer. It is the item
/// type produced by `Connection::poll_for_event_with_sequence()`.
pub type RawEventAndSeqNumber = crate::RawEventAndSeqNumber<Vec<u8>>;
14
/// Describes what kind of answer, if any, the X11 server sends for a request.
///
/// The value is handed to `Connection::send_request()` so that the connection knows whether a
/// reply is expected and whether file descriptors have to be attached to that reply.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ReplyFdKind {
    /// No reply is generated for the request.
    NoReply,
    /// A reply is generated, and it does *not* carry any file descriptors.
    ReplyWithoutFDs,
    /// A reply is generated, and it *does* carry one or more file descriptors.
    ReplyWithFDs,
}
25
/// Outcome of polling for the answer to a request.
#[derive(Clone, Debug)]
pub enum PollReply {
    /// Nothing can be said yet; the caller should poll again once more data was received.
    TryAgain,
    /// No reply will arrive for the request; there is no need to poll again.
    NoReply,
    /// The raw bytes of the packet that answers the request; there is no need to poll again.
    Reply(Vec<u8>),
}
36
37#[derive(Debug, Clone, Copy, Eq, PartialEq)]
38struct SentRequest {
39    seqno: SequenceNumber,
40    discard_mode: Option<DiscardMode>,
41    has_fds: bool,
42}
43
/// A pure-rust, sans-I/O implementation of the X11 protocol.
///
/// This object is designed to be used in combination with an I/O backend, in
/// order to keep state for the X11 protocol.
///
/// The type never reads or writes anything itself. The backend announces each request through
/// `send_request()` and hands over received data through `enqueue_fds()` and
/// `enqueue_packet()`. Replies, errors and events are then fetched with the `poll_*` methods.
#[derive(Debug)]
pub struct Connection {
    // The sequence number of the last request that was written. send_request() increments this
    // for every request that it accepts.
    last_sequence_written: SequenceNumber,
    // Sorted(!) list with information on requests that were written, but no answer received yet.
    // The order matters: discard_reply() does a binary search on the sequence number and
    // enqueue_packet() drops outdated entries from the front.
    sent_requests: VecDeque<SentRequest>,

    // The sequence number of the next reply that is expected to come in. This is set by
    // send_request() for requests with a reply and raised by extract_sequence_number() when a
    // packet with a higher sequence number is read.
    next_reply_expected: SequenceNumber,

    // The sequence number of the last reply/error/event that was read. This is used to
    // reconstruct full sequence numbers from the 16 bits that each packet contains.
    last_sequence_read: SequenceNumber,
    // Events that were read, but not yet returned to the API user. Errors that nobody waits
    // for are queued here as well.
    pending_events: VecDeque<(SequenceNumber, Vec<u8>)>,
    // Replies that were read, but not yet returned to the API user. Errors for requests whose
    // answer was not discarded are queued here as well.
    pending_replies: VecDeque<(SequenceNumber, BufWithFds)>,

    // FDs that were read, but not yet assigned to any reply. Filled by enqueue_fds() and
    // drained by enqueue_packet().
    pending_fds: VecDeque<RawFdContainer>,
}
68
69impl Default for Connection {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75impl Connection {
76    /// Create a new `Connection`.
77    ///
78    /// It is assumed that the connection was just established. This means that the next request
79    /// that is sent will have sequence number one.
80    pub fn new() -> Self {
81        Connection {
82            last_sequence_written: 0,
83            next_reply_expected: 0,
84            last_sequence_read: 0,
85            sent_requests: VecDeque::new(),
86            pending_events: VecDeque::new(),
87            pending_replies: VecDeque::new(),
88            pending_fds: VecDeque::new(),
89        }
90    }
91
92    /// Send a request to the X11 server.
93    ///
94    /// When this returns `None`, a sync with the server is necessary. Afterwards, the caller
95    /// should try again.
96    pub fn send_request(&mut self, kind: ReplyFdKind) -> Option<SequenceNumber> {
97        let has_response = match kind {
98            ReplyFdKind::NoReply => false,
99            ReplyFdKind::ReplyWithoutFDs => true,
100            ReplyFdKind::ReplyWithFDs => true,
101        };
102        if self.next_reply_expected + SequenceNumber::from(u16::max_value()) - 1
103            <= self.last_sequence_written
104            && !has_response
105        {
106            // The caller need to call send_sync(). Otherwise, we might not be able to reconstruct
107            // full sequence numbers for received packets.
108            return None;
109        }
110
111        self.last_sequence_written += 1;
112        let seqno = self.last_sequence_written;
113
114        if has_response {
115            self.next_reply_expected = self.last_sequence_written;
116        }
117
118        let sent_request = SentRequest {
119            seqno,
120            discard_mode: None,
121            has_fds: kind == ReplyFdKind::ReplyWithFDs,
122        };
123        self.sent_requests.push_back(sent_request);
124
125        Some(seqno)
126    }
127
128    /// Ignore the reply for a request that was previously sent.
129    pub fn discard_reply(&mut self, seqno: SequenceNumber, mode: DiscardMode) {
130        if let Ok(index) = self.sent_requests.binary_search_by_key(&seqno, |r| r.seqno) {
131            self.sent_requests[index].discard_mode = Some(mode);
132        }
133        match mode {
134            DiscardMode::DiscardReplyAndError => self.pending_replies.retain(|r| r.0 != seqno),
135            DiscardMode::DiscardReply => {
136                if let Some(index) = self.pending_replies.iter().position(|r| r.0 == seqno) {
137                    while self
138                        .pending_replies
139                        .get(index)
140                        .filter(|r| r.0 == seqno)
141                        .is_some()
142                    {
143                        if let Some((_, packet)) = self.pending_replies.remove(index) {
144                            if packet.0[0] == 0 {
145                                // This is an error
146                                self.pending_events.push_back((seqno, packet.0));
147                            }
148                        }
149                    }
150                }
151            }
152        }
153    }
154
155    // Extract the sequence number from a packet read from the X11 server. The packet must be a
156    // reply, an event, or an error. All of these have a u16 sequence number in bytes 2 and 3...
157    // except for KeymapNotify events.
158    fn extract_sequence_number(&mut self, buffer: &[u8]) -> Option<SequenceNumber> {
159        use crate::protocol::xproto::KEYMAP_NOTIFY_EVENT;
160        if buffer[0] == KEYMAP_NOTIFY_EVENT {
161            return None;
162        }
163        // We get the u16 from the wire...
164        let number = u16::from_ne_bytes([buffer[2], buffer[3]]);
165
166        // ...and use our state to reconstruct the high bytes
167        let high_bytes = self.last_sequence_read & !SequenceNumber::from(u16::max_value());
168        let mut full_number = SequenceNumber::from(number) | high_bytes;
169        if full_number < self.last_sequence_read {
170            full_number += SequenceNumber::from(u16::max_value()) + 1;
171        }
172
173        // Update our state
174        self.last_sequence_read = full_number;
175        if self.next_reply_expected < full_number {
176            // This is most likely an event/error that allows us to update our sequence number
177            // implicitly. Normally, only requests with a reply update this (in send_request()).
178            self.next_reply_expected = full_number;
179        }
180        Some(full_number)
181    }
182
183    /// Add FDs that were received to the internal state.
184    ///
185    /// This must be called before the corresponding packets are enqueued.
186    pub fn enqueue_fds(&mut self, fds: Vec<RawFdContainer>) {
187        self.pending_fds.extend(fds);
188    }
189
190    /// An X11 packet was received from the connection and is now enqueued into our state.
191    ///
192    /// Any FDs that were received must already be enqueued before this can be called.
193    pub fn enqueue_packet(&mut self, packet: Vec<u8>) {
194        let kind = packet[0];
195
196        // extract_sequence_number() updates our state and is thus important to call even when we
197        // do not need the sequence number
198        let seqno = self
199            .extract_sequence_number(&packet)
200            .unwrap_or(self.last_sequence_read);
201
202        // Remove all entries for older requests
203        while let Some(request) = self.sent_requests.front() {
204            if request.seqno >= seqno {
205                break;
206            }
207            let _ = self.sent_requests.pop_front();
208        }
209        let request = self.sent_requests.front().filter(|r| r.seqno == seqno);
210
211        if kind == 0 {
212            // It is an error. Let's see where we have to send it to.
213            if let Some(request) = request {
214                match request.discard_mode {
215                    Some(DiscardMode::DiscardReplyAndError) => { /* This error should be ignored */
216                    }
217                    Some(DiscardMode::DiscardReply) => {
218                        self.pending_events.push_back((seqno, packet))
219                    }
220                    None => self
221                        .pending_replies
222                        .push_back((seqno, (packet, Vec::new()))),
223                }
224            } else {
225                // Unexpected error, send to main loop
226                self.pending_events.push_back((seqno, packet));
227            }
228        } else if kind == 1 {
229            let fds = if request.filter(|r| r.has_fds).is_some() {
230                // This reply has FDs, the number of FDs is always in the second byte
231                let num_fds = usize::from(packet[1]);
232                // FIXME Turn this into some kind of "permanent error state" (so that
233                // everything fails with said error) instead of using a panic (this panic will
234                // likely poison some Mutex and produce an error state that way).
235                assert!(
236                    num_fds <= self.pending_fds.len(),
237                    "FIXME: The server sent us too few FDs. The connection is now unusable \
238                     since we will never be sure again which FD belongs to which reply."
239                );
240                self.pending_fds.drain(..num_fds).collect()
241            } else {
242                Vec::new()
243            };
244
245            // It is a reply
246            if request.filter(|r| r.discard_mode.is_some()).is_some() {
247                // This reply should be discarded
248            } else {
249                self.pending_replies.push_back((seqno, (packet, fds)));
250            }
251        } else {
252            // It is an event
253            self.pending_events.push_back((seqno, packet));
254        }
255    }
256
257    /// Check if the server already sent an answer to the request with the given sequence number.
258    ///
259    /// This function is meant to be used for requests that have a reply. Such requests always
260    /// cause a reply or an error to be sent.
261    pub fn poll_for_reply_or_error(&mut self, sequence: SequenceNumber) -> Option<BufWithFds> {
262        for (index, (seqno, _packet)) in self.pending_replies.iter().enumerate() {
263            if *seqno == sequence {
264                return Some(self.pending_replies.remove(index).unwrap().1);
265            }
266        }
267        None
268    }
269
270    /// Prepare for calling `poll_check_for_reply_or_error()`.
271    ///
272    /// To check if a request with a reply caused an error, one simply has to wait for the error or
273    /// reply to be received. However, this approach does not work for requests without errors:
274    /// Success is indicated by the absence of an error.
275    ///
276    /// Thus, this function returns true if a sync is necessary to ensure that a reply with a
277    /// higher sequence number will be received. Since the X11 server handles requests in-order,
278    /// if the reply to a later request is received, this means that the earlier request did not
279    /// fail.
280    pub fn prepare_check_for_reply_or_error(&mut self, sequence: SequenceNumber) -> bool {
281        self.next_reply_expected < sequence
282    }
283
284    /// Check if the request with the given sequence number was already handled by the server.
285    ///
286    /// Before calling this function, you must call `prepare_check_for_reply_or_error()` with the
287    /// sequence number.
288    ///
289    /// This function can be used for requests with and without a reply.
290    pub fn poll_check_for_reply_or_error(&mut self, sequence: SequenceNumber) -> PollReply {
291        if let Some(result) = self.poll_for_reply_or_error(sequence) {
292            return PollReply::Reply(result.0);
293        }
294
295        if self.last_sequence_read > sequence {
296            // We can be sure that there will be no reply/error
297            PollReply::NoReply
298        } else {
299            // Hm, we cannot be sure yet. Perhaps there will still be a reply/error
300            PollReply::TryAgain
301        }
302    }
303
304    /// Find the reply for the request with the given sequence number.
305    ///
306    /// If the request caused an error, that error will be handled as an event. This means that a
307    /// latter call to `poll_for_event()` will return it.
308    pub fn poll_for_reply(&mut self, sequence: SequenceNumber) -> PollReply {
309        if let Some(reply) = self.poll_for_reply_or_error(sequence) {
310            if reply.0[0] == 0 {
311                self.pending_events.push_back((sequence, reply.0));
312                PollReply::NoReply
313            } else {
314                PollReply::Reply(reply.0)
315            }
316        } else {
317            PollReply::TryAgain
318        }
319    }
320
321    /// Get a pending event.
322    pub fn poll_for_event_with_sequence(&mut self) -> Option<RawEventAndSeqNumber> {
323        self.pending_events
324            .pop_front()
325            .map(|(seqno, event)| (event, seqno))
326    }
327}
328
329#[cfg(test)]
330mod test {
331    use super::{Connection, ReplyFdKind};
332
333    #[test]
334    fn insert_sync_no_reply() {
335        // The connection must send a sync (GetInputFocus) request every 2^16-1 requests (that do not
336        // have a reply). Thus, this test sends more than that and tests for the sync to appear.
337
338        let mut connection = Connection::new();
339
340        for num in 1..0xffff {
341            let seqno = connection.send_request(ReplyFdKind::NoReply);
342            assert_eq!(Some(num), seqno);
343        }
344        // request 0xffff should be a sync, hence the next one is 0x10000
345        let seqno = connection.send_request(ReplyFdKind::NoReply);
346        assert_eq!(None, seqno);
347
348        let seqno = connection.send_request(ReplyFdKind::ReplyWithoutFDs);
349        assert_eq!(Some(0xffff), seqno);
350
351        let seqno = connection.send_request(ReplyFdKind::NoReply);
352        assert_eq!(Some(0x10000), seqno);
353    }
354
355    #[test]
356    fn insert_no_sync_with_reply() {
357        // Compared to the previous test, this uses ReplyFdKind::ReplyWithoutFDs, so no sync needs to
358        // be inserted.
359
360        let mut connection = Connection::new();
361
362        for num in 1..=0x10001 {
363            let seqno = connection.send_request(ReplyFdKind::ReplyWithoutFDs);
364            assert_eq!(Some(num), seqno);
365        }
366    }
367
368    #[test]
369    fn insert_no_sync_when_already_syncing() {
370        // This test sends enough ReplyFdKind::NoReply requests that a sync becomes necessary on
371        // the next request. Then it sends a ReplyFdKind::ReplyWithoutFDs request so that no sync is
372        // necessary. This is a regression test: Once upon a time, an unnecessary sync was done.
373
374        let mut connection = Connection::new();
375
376        for num in 1..0xffff {
377            let seqno = connection.send_request(ReplyFdKind::NoReply);
378            assert_eq!(Some(num), seqno);
379        }
380
381        let seqno = connection.send_request(ReplyFdKind::ReplyWithoutFDs);
382        assert_eq!(Some(0xffff), seqno);
383    }
384
385    #[test]
386    fn get_sync_replies() {
387        // This sends requests with a reply with seqno 1 and 1+2^16 and then checks that their
388        // replies are correctly mapped to the requests.
389
390        let mut connection = Connection::new();
391
392        let first_reply = 1;
393        let second_reply = 0x10001;
394
395        // First, send all the requests
396
397        // First request is one with a reply
398        let seqno = connection.send_request(ReplyFdKind::ReplyWithoutFDs);
399        assert_eq!(Some(first_reply), seqno);
400
401        // Then, there should be enough requests so that the next request will end up with sequence
402        // number 'second_reply'
403        for num in (first_reply + 1)..(second_reply - 1) {
404            let seqno = connection.send_request(ReplyFdKind::NoReply);
405            assert_eq!(Some(num), seqno);
406        }
407
408        // Send one more request. This needs to be a sync request so that sequence numbers can be
409        // reconstructed correctly. The bug that we testing was that no sync was required, so this
410        // test handles both cases correctly.
411        let requested_extra_sync = connection.send_request(ReplyFdKind::NoReply).is_none();
412        if requested_extra_sync {
413            let _ = connection.send_request(ReplyFdKind::ReplyWithoutFDs);
414        }
415
416        let seqno = connection.send_request(ReplyFdKind::ReplyWithoutFDs);
417        assert_eq!(Some(second_reply), seqno);
418
419        // Prepare a reply packet
420        let mut packet = [0; 32];
421        // It is a reply
422        packet[0] = 1;
423        // Set the sequence number to 1
424        packet[2..4].copy_from_slice(&1u16.to_ne_bytes());
425
426        // Enqueue the first reply.
427        connection.enqueue_packet(packet.to_vec());
428
429        // Send an extra reply if the code wanted one. This extra reply allows to detect that all
430        // replies to the first request were received (remember, there can be multiple replies to a
431        // single request!)
432        if requested_extra_sync {
433            packet[2..4].copy_from_slice(&((second_reply - 1) as u16).to_ne_bytes());
434            connection.enqueue_packet(packet.to_vec());
435        }
436
437        // Set the sequence number for the second reply
438        packet[2..4].copy_from_slice(&(second_reply as u16).to_ne_bytes());
439        connection.enqueue_packet(packet.to_vec());
440
441        // Now check that the sequence number for the last packet was reconstructed correctly.
442        assert!(connection.poll_for_reply_or_error(second_reply).is_some());
443    }
444}