pub struct EventQueue<State> { /* private fields */ }

An event queue

This is an abstraction for handling event dispatching that gives your event handlers access to some common state through a &mut State reference.

Event queues are created through Connection::new_event_queue().

Upon creation, a Wayland object is assigned to an event queue by passing the associated QueueHandle as an argument to the method creating it. All events received by that object will be processed by that event queue when dispatch_pending() or blocking_dispatch() is invoked.
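The dispatch model described above can be illustrated with a small std-only toy. This is a sketch under stated assumptions, not the crate's implementation: `ToyQueue`, `ToyEvent`, `AppState` and `on_event` are hypothetical names, and a plain function pointer stands in for the Dispatch implementations. It only shows that events are buffered first and that handlers receive `&mut State` when `dispatch_pending` runs.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a Wayland event: the id of the object it targets plus a payload.
struct ToyEvent {
    object_id: u32,
    value: i32,
}

/// Toy event queue: buffers events and hands `&mut State` to a handler on dispatch.
struct ToyQueue<State> {
    pending: VecDeque<ToyEvent>,
    handler: fn(&mut State, &ToyEvent),
}

impl<State> ToyQueue<State> {
    fn new(handler: fn(&mut State, &ToyEvent)) -> Self {
        ToyQueue { pending: VecDeque::new(), handler }
    }

    /// Simulates the socket being read: events are only buffered here.
    fn enqueue(&mut self, event: ToyEvent) {
        self.pending.push_back(event);
    }

    /// Runs the handler for every buffered event, in order, and returns how many ran.
    fn dispatch_pending(&mut self, state: &mut State) -> usize {
        let mut dispatched = 0;
        while let Some(event) = self.pending.pop_front() {
            (self.handler)(state, &event);
            dispatched += 1;
        }
        dispatched
    }
}

/// Hypothetical application state shared by all handlers.
#[derive(Default)]
struct AppState {
    total: i32,
    seen_objects: Vec<u32>,
}

fn on_event(state: &mut AppState, event: &ToyEvent) {
    state.total += event.value;
    state.seen_objects.push(event.object_id);
}

fn demo_dispatch() -> (usize, i32, Vec<u32>, usize) {
    let mut state = AppState::default();
    let mut queue: ToyQueue<AppState> = ToyQueue::new(on_event);
    queue.enqueue(ToyEvent { object_id: 3, value: 10 });
    queue.enqueue(ToyEvent { object_id: 7, value: 5 });
    // Nothing has run yet: handlers only fire during dispatch.
    assert_eq!(state.total, 0);
    let first = queue.dispatch_pending(&mut state);
    let second = queue.dispatch_pending(&mut state);
    (first, state.total, state.seen_objects, second)
}
```

Calling `demo_dispatch()` dispatches both buffered events on the first call and nothing on the second, which mirrors the count that the dispatch methods report.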

Usage

Single queue app

If your app is simple enough that the only source of events to process is the Wayland socket and you only need a single event queue, your main loop can be as simple as this:

use wayland_client::Connection;

let connection = Connection::connect_to_env().unwrap();
let mut event_queue = connection.new_event_queue();

/*
 * Here your initial setup
 */

// And the main loop:
while !state.exit {
    event_queue.blocking_dispatch(&mut state).unwrap();
}

blocking_dispatch() waits (by putting the thread to sleep) until there are events from the server that can be processed. All your actual app logic can be done in the callbacks of the Dispatch implementations and in the main loop after the blocking_dispatch() call.

Multi-thread multi-queue app

If your app is multithreaded and you want to process events in multiple threads, a simple pattern is to have one EventQueue per thread processing Wayland events.

With this pattern, each thread can use EventQueue::blocking_dispatch() on its own event loop, and everything will “Just Work”.
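The shape of the one-queue-per-thread pattern can be sketched with the standard library alone. This is a toy model, not wayland-client code: `run_workers` is a hypothetical helper, an `mpsc` channel stands in for each thread's EventQueue, and the blocking `recv()` plays the role of blocking_dispatch().

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `workers` threads, each owning its own queue (a channel receiver here),
/// and returns the sum each thread computed from the events routed to it.
fn run_workers(workers: usize, events: &[(usize, u32)]) -> Vec<u32> {
    let mut senders = Vec::new();
    let mut handles = Vec::new();

    for _ in 0..workers {
        let (tx, rx) = mpsc::channel::<u32>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            // Per-thread state: no locking needed, only this thread touches it.
            let mut sum = 0u32;
            // `recv()` blocks until an event arrives and fails once the sender is dropped.
            while let Ok(value) = rx.recv() {
                sum += value;
            }
            sum
        }));
    }

    // Route every event to the queue its (toy) object was assigned to.
    for &(queue_index, value) in events {
        senders[queue_index].send(value).expect("worker thread exited early");
    }
    // Dropping the senders closes the channels, so the worker loops end.
    drop(senders);

    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker panicked"))
        .collect()
}
```

Because each thread owns both its queue and its state, no state is shared between threads in this sketch.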

Single-queue guest library

If your code is some library code that will act on a Wayland connection shared by the main program, it is likely you should not trigger socket reads yourself and instead let the main app take care of it. In this case, to ensure your EventQueue still makes progress, you should regularly invoke EventQueue::dispatch_pending() which will process the events that were enqueued in the inner buffer of your EventQueue by the main app reading the socket.
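As a rough illustration of the guest-library arrangement, the toy below separates the party that fills the buffer from the party that drains it. All names (`GuestLibrary`, `SharedBuffer`, `host_reads_socket`) are hypothetical, and a mutex-protected `VecDeque` is only an assumed stand-in for the inner buffer of an EventQueue.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Buffer shared between the host (which reads the socket) and the guest library.
type SharedBuffer = Arc<Mutex<VecDeque<u32>>>;

/// Hypothetical guest-library object: it drains its buffer but never reads the socket.
struct GuestLibrary {
    buffer: SharedBuffer,
    handled: Vec<u32>,
}

impl GuestLibrary {
    fn new(buffer: SharedBuffer) -> Self {
        GuestLibrary { buffer, handled: Vec::new() }
    }

    /// Non-blocking: processes whatever the host has already buffered and returns the count.
    fn dispatch_pending(&mut self) -> usize {
        // Take the events out while holding the lock, then handle them without it.
        let drained: Vec<u32> = self.buffer.lock().unwrap().drain(..).collect();
        let count = drained.len();
        self.handled.extend(drained);
        count
    }
}

/// Stand-in for the host application reading the socket and filling the guest's buffer.
fn host_reads_socket(buffer: &SharedBuffer, incoming: &[u32]) {
    buffer.lock().unwrap().extend(incoming.iter().copied());
}

fn demo_guest() -> (usize, usize, usize, Vec<u32>) {
    let buffer: SharedBuffer = Arc::new(Mutex::new(VecDeque::new()));
    let mut guest = GuestLibrary::new(Arc::clone(&buffer));

    let before = guest.dispatch_pending(); // nothing buffered yet
    host_reads_socket(&buffer, &[1, 2]);
    let after_first_read = guest.dispatch_pending();
    host_reads_socket(&buffer, &[3]);
    let after_second_read = guest.dispatch_pending();
    (before, after_first_read, after_second_read, guest.handled)
}
```

The guest only makes progress when it is called, which is why the regular dispatch_pending() invocation matters in this setup.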

Integrating the event queue with other sources of events

If your program needs to monitor other sources of events alongside the Wayland socket using a monitoring system like epoll, you can integrate the Wayland socket into this system. This is done with the help of the EventQueue::prepare_read() method. Your event loop will be a bit more explicit:


loop {
    // flush the outgoing buffers to ensure that the server does receive the messages
    // you've sent
    event_queue.flush().unwrap();

    // (this step is only relevant if other threads might be reading the socket as well)
    // make sure you don't have any pending events in the event queue that might have been
    // enqueued by other threads reading the socket
    event_queue.dispatch_pending(&mut state).unwrap();

    // This puts in place some internal synchronization to prepare for the fact that
    // you're going to wait for events on the socket and read them, in case other threads
    // are doing the same thing
    let read_guard = event_queue.prepare_read().unwrap();

    /*
     * At this point you can invoke epoll(..) to wait for readiness on the multiple FDs you
     * are working with, and read_guard.connection_fd() will give you the FD to wait on for
     * the Wayland connection
     */

    if wayland_socket_ready {
        // If epoll notified readiness of the Wayland socket, you can now proceed to the read
        read_guard.read().unwrap();
        // And now, you must invoke dispatch_pending() to actually process the events
        event_queue.dispatch_pending(&mut state).unwrap();
    } else {
        // otherwise, some of your other FDs are ready, but you didn't receive Wayland events,
        // you can drop the guard to cancel the read preparation
        std::mem::drop(read_guard);
    }

    /*
     * There you process all relevant events from your other event sources
     */
}
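The read-or-cancel step in the loop above relies on the guard being consumed by a read or released when dropped. The following std-only sketch models just that ownership pattern. `ToyConnection` and `ToyReadGuard` are hypothetical types, and the counters are an assumption made for illustration; they do not describe how the real ReadEventsGuard synchronizes readers.

```rust
use std::cell::Cell;

/// Toy connection tracking how many read preparations are outstanding.
#[derive(Default)]
struct ToyConnection {
    prepared_readers: Cell<u32>,
    reads_done: Cell<u32>,
}

/// Guard returned by `prepare_read`; dropping it without reading cancels the preparation.
struct ToyReadGuard<'a> {
    conn: &'a ToyConnection,
}

impl ToyConnection {
    fn prepare_read(&self) -> ToyReadGuard<'_> {
        self.prepared_readers.set(self.prepared_readers.get() + 1);
        ToyReadGuard { conn: self }
    }
}

impl<'a> ToyReadGuard<'a> {
    /// Consumes the guard and performs the (simulated) socket read.
    /// The guard is dropped when this returns, which releases the preparation.
    fn read(self) -> u32 {
        self.conn.reads_done.set(self.conn.reads_done.get() + 1);
        self.conn.reads_done.get()
    }
}

impl Drop for ToyReadGuard<'_> {
    fn drop(&mut self) {
        self.conn.prepared_readers.set(self.conn.prepared_readers.get() - 1);
    }
}

/// One loop iteration: read if the socket was reported ready, otherwise cancel.
fn iteration(conn: &ToyConnection, wayland_socket_ready: bool) -> bool {
    let guard = conn.prepare_read();
    if wayland_socket_ready {
        guard.read();
        true
    } else {
        drop(guard);
        false
    }
}

fn demo_guard() -> (u32, u32) {
    let conn = ToyConnection::default();
    iteration(&conn, true);
    iteration(&conn, false);
    iteration(&conn, true);
    (conn.reads_done.get(), conn.prepared_readers.get())
}
```

Taking `self` by value in `read` means a guard cannot be read twice, and the `Drop` implementation covers the cancel path without any extra call.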

Implementations

Get a QueueHandle for this event queue

src/globals.rs (line 87)
pub fn registry_queue_init<State>(
    conn: &Connection,
) -> Result<(GlobalList, EventQueue<State>), GlobalError>
where
    State: Dispatch<wl_registry::WlRegistry, GlobalListContents> + 'static,
{
    let event_queue = conn.new_event_queue();
    let display = conn.display();
    let data = Arc::new(RegistryState {
        globals: GlobalListContents { contents: Default::default() },
        handle: event_queue.handle(),
        initial_roundtrip_done: AtomicBool::new(false),
    });
    let registry = display.send_constructor(wl_display::Request::GetRegistry {}, data.clone())?;
    // We don't need to dispatch the event queue as for now nothing will be sent to it
    conn.roundtrip()?;
    data.initial_roundtrip_done.store(true, Ordering::Relaxed);
    Ok((GlobalList { registry }, event_queue))
}

Dispatch pending events

Events are accumulated in the event queue's internal buffer when the Wayland socket is read using the read APIs on Connection, or when reading is done from another thread. This method will dispatch all such pending events by sequentially invoking their associated handlers: the Dispatch implementations on the provided &mut State.

Note: this may block if another thread has frozen the queue.

src/calloop.rs (line 58)
    pub fn insert(self, handle: LoopHandle<D>) -> Result<RegistrationToken, InsertError<Self>>
    where
        D: 'static,
    {
        handle.insert_source(self, |_, queue, data| queue.dispatch_pending(data))
    }
src/event_queue.rs (line 387)
    pub fn blocking_dispatch(&mut self, data: &mut State) -> Result<usize, DispatchError> {
        let dispatched = self.dispatch_pending(data)?;
        if dispatched > 0 {
            return Ok(dispatched);
        }

        self.conn.flush()?;

        let guard = self.conn.prepare_read()?;

        // we need to check the queue again, just in case another thread did a read between
        // dispatch_pending and prepare_read
        if self.handle.inner.lock().unwrap().queue.is_empty() {
            crate::conn::blocking_read(guard)?;
        } else {
            drop(guard);
        }

        self.dispatch_pending(data)
    }

Block waiting for events and dispatch them

This method is similar to dispatch_pending, but if there are no pending events it will also flush the connection and block waiting for the Wayland server to send an event.

A simple app event loop can consist of invoking this method in a loop.

src/event_queue.rs (line 429)
    pub fn roundtrip(&mut self, data: &mut State) -> Result<usize, DispatchError> {
        let done = Arc::new(SyncData::default());

        let display = self.conn.display();
        self.conn
            .send_request(
                &display,
                crate::protocol::wl_display::Request::Sync {},
                Some(done.clone()),
            )
            .map_err(|_| WaylandError::Io(Error::EPIPE.into()))?;

        let mut dispatched = 0;

        while !done.done.load(Ordering::Relaxed) {
            dispatched += self.blocking_dispatch(data)?;
        }

        Ok(dispatched)
    }

Synchronous roundtrip

This function causes a synchronous round trip with the Wayland server: it blocks until all requests in the queue are sent and processed by the server.

It may be useful during the initial setup of your app, or wherever you need to guarantee that all requests made prior to calling it are completed.
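The round-trip idea (send a sync marker, then keep dispatching until its answer arrives) can be shown with a toy client. This is a sketch under assumptions: `ToyClient` and `Request` are hypothetical, and the simulated server answers exactly one request per `blocking_dispatch` call, strictly in order, which is what makes the sync answer imply that earlier requests were handled.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy)]
enum Request {
    Work(u32),
    Sync,
}

/// Toy client whose simulated server answers one request per blocking dispatch.
#[derive(Default)]
struct ToyClient {
    outgoing: VecDeque<Request>,
    results: Vec<u32>,
    sync_done: bool,
}

impl ToyClient {
    fn send(&mut self, request: Request) {
        self.outgoing.push_back(request);
    }

    /// Simulates flushing, waiting for the server, and dispatching what came back.
    /// Returns the number of events dispatched (0 or 1 in this toy).
    fn blocking_dispatch(&mut self) -> usize {
        match self.outgoing.pop_front() {
            Some(Request::Work(n)) => {
                self.results.push(n * 2);
                1
            }
            Some(Request::Sync) => {
                self.sync_done = true;
                1
            }
            None => 0,
        }
    }

    /// Sends a sync marker, then dispatches until the server has answered it.
    fn roundtrip(&mut self) -> usize {
        self.sync_done = false;
        self.send(Request::Sync);
        let mut dispatched = 0;
        while !self.sync_done {
            dispatched += self.blocking_dispatch();
        }
        dispatched
    }
}
```

In this model, two `Work` requests queued before `roundtrip()` are both answered before the call returns, and the returned count includes the sync answer itself.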

Start a synchronized read from the socket

This is needed if you plan to wait on readiness of the Wayland socket using an event loop. See the EventQueue and ReadEventsGuard docs for details. Once the events are received, you’ll then need to dispatch them from the event queue using EventQueue::dispatch_pending().

If you don’t need to manage multiple event sources, see blocking_dispatch() for a simpler mechanism.

This method is identical to Connection::prepare_read().

src/calloop.rs (line 35)
    pub fn new(queue: EventQueue<D>) -> Result<WaylandSource<D>, WaylandError> {
        let guard = queue.prepare_read()?;
        let fd = Generic::new(guard.connection_fd().as_raw_fd(), Interest::READ, Mode::Level);
        drop(guard);

        Ok(WaylandSource { queue, fd, read_guard: None })
    }

    /// Access the underlying event queue
    ///
    /// Note that you should be careful when interacting with it if you invoke methods that
    /// interact with the wayland socket (such as `dispatch()` or `prepare_read()`). These may
    /// interfere with the proper waking up of this event source in the event loop.
    pub fn queue(&mut self) -> &mut EventQueue<D> {
        &mut self.queue
    }

    /// Insert this source into the given event loop.
    ///
    /// This adapter will pass the event loop's shared data as the `D` type for the event loop.
    pub fn insert(self, handle: LoopHandle<D>) -> Result<RegistrationToken, InsertError<Self>>
    where
        D: 'static,
    {
        handle.insert_source(self, |_, queue, data| queue.dispatch_pending(data))
    }
}

impl<D> EventSource for WaylandSource<D> {
    type Event = ();

    /// The underlying event queue.
    ///
    /// You should call [`EventQueue::dispatch_pending`] inside your callback using this queue.
    type Metadata = EventQueue<D>;
    type Ret = Result<usize, DispatchError>;
    type Error = calloop::Error;

    fn process_events<F>(
        &mut self,
        readiness: Readiness,
        token: Token,
        mut callback: F,
    ) -> Result<PostAction, Self::Error>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        let queue = &mut self.queue;
        let read_guard = &mut self.read_guard;

        let action = self.fd.process_events(readiness, token, |_, _| {
            // 1. read events from the socket if any are available
            if let Some(guard) = read_guard.take() {
                // might be None if some other thread read events before us, concurrently
                if let Err(WaylandError::Io(err)) = guard.read() {
                    if err.kind() != io::ErrorKind::WouldBlock {
                        return Err(err);
                    }
                }
            }

            // 2. dispatch any pending events in the queue
            // This is done to ensure we are not waiting for messages that are already in the buffer.
            Self::loop_callback_pending(queue, &mut callback)?;
            *read_guard = Some(Self::prepare_read(queue)?);

            // 3. Once dispatching is finished, flush the responses to the compositor
            if let Err(WaylandError::Io(e)) = queue.flush() {
                if e.kind() != io::ErrorKind::WouldBlock {
                    // in case of error, forward it and fast-exit
                    return Err(e);
                }
                // WouldBlock error means the compositor could not process all our messages
                // quickly. Either it is slowed down or we are a spammer.
                // Should not really happen, if it does we do nothing and will flush again later
            }

            Ok(PostAction::Continue)
        })?;

        Ok(action)
    }

    fn register(
        &mut self,
        poll: &mut Poll,
        token_factory: &mut TokenFactory,
    ) -> calloop::Result<()> {
        self.fd.register(poll, token_factory)
    }

    fn reregister(
        &mut self,
        poll: &mut Poll,
        token_factory: &mut TokenFactory,
    ) -> calloop::Result<()> {
        self.fd.reregister(poll, token_factory)
    }

    fn unregister(&mut self, poll: &mut Poll) -> calloop::Result<()> {
        self.fd.unregister(poll)
    }

    fn pre_run<F>(&mut self, mut callback: F) -> calloop::Result<()>
    where
        F: FnMut((), &mut Self::Metadata) -> Self::Ret,
    {
        debug_assert!(self.read_guard.is_none());

        // flush the display before starting to poll
        if let Err(WaylandError::Io(err)) = self.queue.flush() {
            if err.kind() != io::ErrorKind::WouldBlock {
                // in case of error, don't prepare a read, if the error is persistent, it'll trigger in other
                // wayland methods anyway
                log::error!("Error trying to flush the wayland display: {}", err);
                return Err(err.into());
            }
        }

        // ensure we are not waiting for messages that are already in the buffer.
        Self::loop_callback_pending(&mut self.queue, &mut callback)?;
        self.read_guard = Some(Self::prepare_read(&mut self.queue)?);

        Ok(())
    }

    fn post_run<F>(&mut self, _: F) -> calloop::Result<()>
    where
        F: FnMut((), &mut Self::Metadata) -> Self::Ret,
    {
        // Drop implementation of ReadEventsGuard will do cleanup
        self.read_guard.take();
        Ok(())
    }
}

impl<D> WaylandSource<D> {
    /// Loop over the callback until all pending messages have been dispatched.
    fn loop_callback_pending<F>(queue: &mut EventQueue<D>, callback: &mut F) -> io::Result<()>
    where
        F: FnMut((), &mut EventQueue<D>) -> Result<usize, DispatchError>,
    {
        // Loop on the callback until no pending events are left.
        loop {
            match callback((), queue) {
                // No more pending events.
                Ok(0) => break Ok(()),

                Ok(_) => continue,

                Err(DispatchError::Backend(WaylandError::Io(err))) => {
                    return Err(err);
                }

                Err(DispatchError::Backend(WaylandError::Protocol(err))) => {
                    log::error!("Protocol error received on display: {}", err);

                    break Err(Errno::EPROTO.into());
                }

                Err(DispatchError::BadMessage { interface, sender_id, opcode }) => {
                    log::error!(
                        "Bad message on interface \"{}\": (sender_id: {}, opcode: {})",
                        interface,
                        sender_id,
                        opcode,
                    );

                    break Err(Errno::EPROTO.into());
                }
            }
        }
    }

    fn prepare_read(queue: &mut EventQueue<D>) -> io::Result<ReadEventsGuard> {
        queue.prepare_read().map_err(|err| match err {
            WaylandError::Io(err) => err,

            WaylandError::Protocol(err) => {
                log::error!("Protocol error received on display: {}", err);
                Errno::EPROTO.into()
            }
        })
    }

Flush pending outgoing events to the server

This needs to be done regularly to ensure the server receives all your requests.

This method is identical to Connection::flush().


Attempt to dispatch events from this queue, registering the current task for wakeup if no events are pending.

This method is similar to dispatch_pending; it will not perform reads on the Wayland socket. Reads on the socket by other tasks or threads will cause the current task to wake up if events are pending on this queue.

use futures_channel::mpsc::Receiver;
use futures_util::future::{poll_fn,select};
use futures_util::stream::StreamExt;
use wayland_client::EventQueue;

struct Data;

enum AppEvent {
    SomethingHappened(u32),
}

impl Data {
    fn handle(&mut self, event: AppEvent) {
        // actual event handling goes here
    }
}

// An async task that is spawned on an executor in order to handle events that need access
// to a specific data object.
async fn run(data: &mut Data, mut wl_queue: EventQueue<Data>, mut app_queue: Receiver<AppEvent>)
    -> Result<(), Box<dyn std::error::Error>>
{
    use futures_util::future::Either;
    loop {
        match select(
            poll_fn(|cx| wl_queue.poll_dispatch_pending(cx, data)),
            app_queue.next(),
        ).await {
            Either::Left((res, _)) => match res? {},
            Either::Right((Some(event), _)) => {
                data.handle(event);
            }
            Either::Right((None, _)) => return Ok(()),
        }
    }
}
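To make the wakeup registration more concrete, here is a std-only toy of the polling behaviour described above. It is a sketch, not the crate's code: `ToyAsyncQueue` and `FlagWaker` are hypothetical, and the return type `Poll<Result<Infallible, String>>` is an assumption modelled on the `match res? {}` arm in the example, which suggests the success case never resolves.

```rust
use std::collections::VecDeque;
use std::convert::Infallible;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Toy queue that remembers the waker of the task waiting on it.
#[derive(Default)]
struct ToyAsyncQueue {
    pending: VecDeque<u32>,
    waker: Option<Waker>,
}

impl ToyAsyncQueue {
    /// Dispatches everything buffered, then registers the task's waker and stays pending.
    fn poll_dispatch_pending(
        &mut self,
        cx: &mut Context<'_>,
        total: &mut u32,
    ) -> Poll<Result<Infallible, String>> {
        while let Some(value) = self.pending.pop_front() {
            *total += value;
        }
        self.waker = Some(cx.waker().clone());
        Poll::Pending
    }

    /// Stand-in for another task or thread reading the socket and enqueuing an event.
    fn enqueue(&mut self, value: u32) {
        self.pending.push_back(value);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

/// Waker that just records that it was triggered.
struct FlagWaker(AtomicBool);

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

fn demo_poll() -> (bool, u32, bool, bool, u32) {
    let flag = Arc::new(FlagWaker(AtomicBool::new(false)));
    let waker = Waker::from(Arc::clone(&flag));
    let mut cx = Context::from_waker(&waker);

    let mut queue = ToyAsyncQueue::default();
    let mut total = 0;
    queue.enqueue(1);
    queue.enqueue(2);

    // The first poll dispatches both events and registers the waker.
    let first_pending = queue.poll_dispatch_pending(&mut cx, &mut total).is_pending();
    let after_first = total;
    let woken_before = flag.0.load(Ordering::SeqCst);

    // A later simulated socket read enqueues an event and wakes the task.
    queue.enqueue(10);
    let woken_after = flag.0.load(Ordering::SeqCst);
    let _ = queue.poll_dispatch_pending(&mut cx, &mut total);

    (first_pending, after_first, woken_before, woken_after, total)
}
```

An executor would re-poll the task after the wake, which corresponds to the second `poll_dispatch_pending` call in `demo_poll`.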

Trait Implementations

Formats the value using the given formatter.