Struct wayland_client::EventQueue
pub struct EventQueue<State> { /* private fields */ }
An event queue
This is an abstraction for handling event dispatching that gives your event handlers access to some common state, passed as &mut State.
Event queues are created through Connection::new_event_queue().
Upon creation, a Wayland object is assigned to an event queue by passing the associated QueueHandle as an argument to the method creating it. All events received by that object will be processed by that event queue when dispatch_pending() or blocking_dispatch() is invoked.
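The routing rule above can be pictured with a small, std-only toy model. This is a sketch for illustration, not the crate's implementation: `Router`, `QueueId` and `ObjectId` are hypothetical names, and events are plain strings.

```rust
use std::collections::{HashMap, VecDeque};

/// Toy stand-in for a queue handle: just an index into the router's queues.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct QueueId(usize);

/// Toy stand-in for a protocol object id.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ObjectId(u32);

/// Hypothetical model of a connection that routes incoming events to queues.
#[derive(Default)]
struct Router {
    owner: HashMap<ObjectId, QueueId>,
    queues: Vec<VecDeque<(ObjectId, String)>>,
    next_id: u32,
}

impl Router {
    fn new_queue(&mut self) -> QueueId {
        self.queues.push(VecDeque::new());
        QueueId(self.queues.len() - 1)
    }

    /// The queue is chosen once, when the object is created.
    fn create_object(&mut self, queue: QueueId) -> ObjectId {
        let id = ObjectId(self.next_id);
        self.next_id += 1;
        self.owner.insert(id, queue);
        id
    }

    /// Simulates a socket read: each event is buffered in its object's queue.
    fn receive(&mut self, target: ObjectId, event: &str) {
        if let Some(QueueId(idx)) = self.owner.get(&target).copied() {
            self.queues[idx].push_back((target, event.to_string()));
        }
    }

    /// Drains one queue only and returns the events it would have dispatched.
    fn dispatch_pending(&mut self, queue: QueueId) -> Vec<(ObjectId, String)> {
        self.queues[queue.0].drain(..).collect()
    }
}
```

In this model, dispatching one queue never touches events that belong to objects created with another queue's handle.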
Usage
Single queue app
If your app is simple enough that the only source of events to process is the Wayland socket and you only need a single event queue, your main loop can be as simple as this:
use wayland_client::Connection;
let connection = Connection::connect_to_env().unwrap();
let mut event_queue = connection.new_event_queue();
/*
 * Here your initial setup, including the creation of the `state` value
 */
// And the main loop:
while !state.exit {
    event_queue.blocking_dispatch(&mut state).unwrap();
}
blocking_dispatch() will wait (by putting the thread to sleep) until there are some events from the server that can be processed. All your actual app logic can be done in the callbacks of the Dispatch implementations, and in the main loop after the blocking_dispatch() call.
Multi-thread multi-queue app
If your app is multithreaded and you want to process events in multiple threads, a simple pattern is to have one EventQueue per thread processing Wayland events.
With this pattern, each thread can use EventQueue::blocking_dispatch() on its own event loop, and everything will “Just Work”.
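The shape of this pattern can be sketched with the standard library alone. In the toy below, each `mpsc` receiver is a stand-in for a per-thread event queue and `recv()` plays the role of a blocking dispatch. `run_workers` is a hypothetical helper, and nothing here touches the real Wayland API.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `workers` threads, each owning its own receiver (a stand-in for a
/// per-thread event queue) and its own state, and returns each worker's total.
fn run_workers(workers: usize, events_per_worker: u32) -> Vec<u32> {
    let mut senders = Vec::new();
    let mut handles = Vec::new();
    for _ in 0..workers {
        let (tx, rx) = mpsc::channel::<u32>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            let mut total = 0u32; // per-thread state, no locking needed
            // `recv()` blocks like a blocking dispatch; it errors once the sender is gone.
            while let Ok(value) = rx.recv() {
                total += value;
            }
            total
        }));
    }
    // Stand-in for the server: deliver events to every queue, then disconnect.
    for tx in &senders {
        for value in 1..=events_per_worker {
            tx.send(value).expect("worker is alive");
        }
    }
    drop(senders);
    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .collect()
}
```

Because each thread owns both its queue and its state, no state is shared between the event loops.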
Single-queue guest library
If your code is some library code that will act on a Wayland connection shared by the main program, it is likely you should not trigger socket reads yourself and instead let the main app take care of it. In this case, to ensure your EventQueue still makes progress, you should regularly invoke EventQueue::dispatch_pending(), which will process the events that were enqueued in the inner buffer of your EventQueue by the main app reading the socket.
Integrating the event queue with other sources of events
If your program needs to monitor other sources of events alongside the Wayland socket using a monitoring system like epoll, you can integrate the Wayland socket into this system. This is done with the help of the EventQueue::prepare_read() method. Your event loop will be a bit more explicit:
loop {
    // flush the outgoing buffers to ensure that the server does receive the messages
    // you've sent
    event_queue.flush().unwrap();
    // (this step is only relevant if other threads might be reading the socket as well)
    // make sure you don't have any pending events in the event queue that might have been
    // enqueued by other threads reading the socket
    event_queue.dispatch_pending(&mut state).unwrap();
    // This puts in place some internal synchronization to prepare for the fact that
    // you're going to wait for events on the socket and read them, in case other threads
    // are doing the same thing
    let read_guard = event_queue.prepare_read().unwrap();
    /*
     * At this point you can invoke epoll(..) to wait for readiness on the multiple FDs you
     * are working with, and read_guard.connection_fd() will give you the FD to wait on for
     * the Wayland connection
     */
    if wayland_socket_ready {
        // If epoll notified readiness of the Wayland socket, you can now proceed to the read
        read_guard.read().unwrap();
        // And now, you must invoke dispatch_pending() to actually process the events
        event_queue.dispatch_pending(&mut state).unwrap();
    } else {
        // otherwise, some of your other FDs are ready, but you didn't receive Wayland events,
        // you can drop the guard to cancel the read preparation
        std::mem::drop(read_guard);
    }
    /*
     * There you process all relevant events from your other event sources
     */
}
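The guard in the loop above follows a common RAII shape: preparing a read yields a value that is either consumed by `read()` or dropped to cancel. The following std-only sketch models that shape. `ToyConn` and `ToyReadGuard` are hypothetical types, and the real ReadEventsGuard may differ in its internals.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;

/// Toy connection: `socket` holds unread events, `queue` holds the ones already read.
#[derive(Default)]
struct ToyConn {
    socket: RefCell<VecDeque<u32>>,
    queue: RefCell<VecDeque<u32>>,
    prepared_readers: Cell<usize>,
}

/// Hypothetical guard: exists between "prepare" and either "read" or "cancel".
struct ToyReadGuard<'a> {
    conn: &'a ToyConn,
}

impl ToyConn {
    fn prepare_read(&self) -> ToyReadGuard<'_> {
        self.prepared_readers.set(self.prepared_readers.get() + 1);
        ToyReadGuard { conn: self }
    }
}

impl ToyReadGuard<'_> {
    /// Consumes the guard and moves everything from the socket into the queue.
    fn read(self) -> usize {
        let mut socket = self.conn.socket.borrow_mut();
        let n = socket.len();
        self.conn.queue.borrow_mut().extend(socket.drain(..));
        // `self` is dropped when this function returns, releasing the preparation.
        n
    }
}

impl Drop for ToyReadGuard<'_> {
    /// Dropping without reading cancels the preparation.
    fn drop(&mut self) {
        let count = &self.conn.prepared_readers;
        count.set(count.get() - 1);
    }
}
```

Making `read` take `self` by value means the type system rules out reading twice with the same guard, and the `Drop` implementation covers the cancel path.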
Implementations
impl<State> EventQueue<State>
pub fn handle(&self) -> QueueHandle<State>
Get a QueueHandle for this event queue
Examples found in repository?
pub fn registry_queue_init<State>(
    conn: &Connection,
) -> Result<(GlobalList, EventQueue<State>), GlobalError>
where
    State: Dispatch<wl_registry::WlRegistry, GlobalListContents> + 'static,
{
    let event_queue = conn.new_event_queue();
    let display = conn.display();
    let data = Arc::new(RegistryState {
        globals: GlobalListContents { contents: Default::default() },
        handle: event_queue.handle(),
        initial_roundtrip_done: AtomicBool::new(false),
    });
    let registry = display.send_constructor(wl_display::Request::GetRegistry {}, data.clone())?;
    // We don't need to dispatch the event queue as for now nothing will be sent to it
    conn.roundtrip()?;
    data.initial_roundtrip_done.store(true, Ordering::Relaxed);
    Ok((GlobalList { registry }, event_queue))
}
pub fn dispatch_pending(
&mut self,
data: &mut State
) -> Result<usize, DispatchError>
Dispatch pending events
Events are accumulated in the event queue's internal buffer when the Wayland socket is read using the read APIs on Connection, or when reading is done from another thread. This method will dispatch all such pending events by sequentially invoking their associated handlers: the Dispatch implementations on the provided &mut State.
Note: this may block if another thread has frozen the queue.
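As a rough illustration of these semantics, the toy below drains a buffer in arrival order, applies each event to a mutable state, and reports how many events were handled. `ToyQueue`, `ToyEvent` and `AppState` are hypothetical. The real method dispatches through Dispatch implementations rather than a `match`.

```rust
use std::collections::VecDeque;

/// Hypothetical event type for the sketch.
enum ToyEvent {
    Resize(u32, u32),
    Close,
}

#[derive(Default, Debug, PartialEq)]
struct AppState {
    size: (u32, u32),
    exit: bool,
}

/// Toy queue: a buffer filled by whoever reads the socket.
#[derive(Default)]
struct ToyQueue {
    buffer: VecDeque<ToyEvent>,
}

impl ToyQueue {
    /// Performs no read and waits for nothing new: it only drains what is
    /// already buffered, in arrival order, and returns the number handled.
    fn dispatch_pending(&mut self, state: &mut AppState) -> usize {
        let mut dispatched = 0;
        while let Some(event) = self.buffer.pop_front() {
            match event {
                ToyEvent::Resize(w, h) => state.size = (w, h),
                ToyEvent::Close => state.exit = true,
            }
            dispatched += 1;
        }
        dispatched
    }
}
```

A return value of 0 simply means nothing was buffered, which is why callers such as blocking_dispatch() use it to decide whether to wait for the socket.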
Examples found in repository?
pub fn blocking_dispatch(&mut self, data: &mut State) -> Result<usize, DispatchError> {
    let dispatched = self.dispatch_pending(data)?;
    if dispatched > 0 {
        return Ok(dispatched);
    }
    self.conn.flush()?;
    let guard = self.conn.prepare_read()?;
    // we need to check the queue again, just in case another thread did a read between
    // dispatch_pending and prepare_read
    if self.handle.inner.lock().unwrap().queue.is_empty() {
        crate::conn::blocking_read(guard)?;
    } else {
        drop(guard);
    }
    self.dispatch_pending(data)
}
pub fn blocking_dispatch(
&mut self,
data: &mut State
) -> Result<usize, DispatchError>
Block waiting for events and dispatch them
This method is similar to dispatch_pending, but if there are no pending events it will also flush the connection and block waiting for the Wayland server to send an event.
A simple app event loop can consist of invoking this method in a loop.
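The "dispatch first, block only if nothing was pending" behaviour can be modelled with a mutex and a condition variable. This is a sketch under simplifying assumptions: `Shared`, `demo` and the free functions are hypothetical, a producer thread stands in for the server, and there is no socket or flush step.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Toy shared buffer: a producer thread plays the role of the server.
#[derive(Default)]
struct Shared {
    buffer: Mutex<VecDeque<u32>>,
    readable: Condvar,
}

/// Drains whatever is buffered into `state`; returns the number of events.
fn dispatch_pending(shared: &Shared, state: &mut Vec<u32>) -> usize {
    let mut buffer = shared.buffer.lock().unwrap();
    let n = buffer.len();
    state.extend(buffer.drain(..));
    n
}

/// Dispatch first; only if nothing was pending, sleep until events arrive.
fn blocking_dispatch(shared: &Shared, state: &mut Vec<u32>) -> usize {
    let dispatched = dispatch_pending(shared, state);
    if dispatched > 0 {
        return dispatched;
    }
    // Re-check emptiness under the lock so a concurrent push is not missed.
    let mut buffer = shared.buffer.lock().unwrap();
    while buffer.is_empty() {
        buffer = shared.readable.wait(buffer).unwrap();
    }
    drop(buffer);
    dispatch_pending(shared, state)
}

/// Hypothetical driver: a producer pushes `events` after a short delay.
/// `events` must be non-empty, otherwise the consumer would wait forever.
fn demo(events: Vec<u32>) -> (usize, Vec<u32>) {
    let shared = Arc::new(Shared::default());
    let producer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            shared.buffer.lock().unwrap().extend(events);
            shared.readable.notify_all();
        })
    };
    let mut state = Vec::new();
    let n = blocking_dispatch(&shared, &mut state);
    producer.join().unwrap();
    (n, state)
}
```

The second emptiness check mirrors the re-check in the repository snippet for blocking_dispatch, which guards against another thread having read the socket in the meantime.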
Examples found in repository?
pub fn roundtrip(&mut self, data: &mut State) -> Result<usize, DispatchError> {
    let done = Arc::new(SyncData::default());
    let display = self.conn.display();
    self.conn
        .send_request(
            &display,
            crate::protocol::wl_display::Request::Sync {},
            Some(done.clone()),
        )
        .map_err(|_| WaylandError::Io(Error::EPIPE.into()))?;
    let mut dispatched = 0;
    while !done.done.load(Ordering::Relaxed) {
        dispatched += self.blocking_dispatch(data)?;
    }
    Ok(dispatched)
}
pub fn roundtrip(&mut self, data: &mut State) -> Result<usize, DispatchError>
Synchronous roundtrip
This function performs a synchronous round trip with the Wayland server: it blocks until all requests in the queue have been sent and processed by the server.
It may be useful during the initial setup of your app, or wherever you need to guarantee that all requests made before the call have been completed.
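The reason a sync marker gives this guarantee is ordering: if the server answers requests in the order it receives them, the reply to the marker can only arrive after every earlier request has been handled. The toy below models that with an in-process "server". `ToyLink`, `Request` and `Event` are hypothetical, and the one-request-per-step server is an assumption made to keep the loop visible.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

enum Request {
    Work(u32),
    Sync(Arc<AtomicBool>),
}

enum Event {
    WorkDone(u32),
    SyncDone(Arc<AtomicBool>),
}

/// Toy in-process "server" that answers requests strictly in order,
/// one request per step, to mimic replies trickling in.
#[derive(Default)]
struct ToyLink {
    outgoing: VecDeque<Request>,
    incoming: VecDeque<Event>,
}

impl ToyLink {
    fn server_step(&mut self) {
        match self.outgoing.pop_front() {
            Some(Request::Work(n)) => self.incoming.push_back(Event::WorkDone(n * 2)),
            Some(Request::Sync(flag)) => self.incoming.push_back(Event::SyncDone(flag)),
            None => {}
        }
    }

    /// Stand-in for a blocking dispatch: lets the server advance, then dispatches.
    fn blocking_dispatch(&mut self, results: &mut Vec<u32>) -> usize {
        self.server_step();
        let mut dispatched = 0;
        while let Some(event) = self.incoming.pop_front() {
            match event {
                Event::WorkDone(v) => results.push(v),
                Event::SyncDone(flag) => flag.store(true, Ordering::Relaxed),
            }
            dispatched += 1;
        }
        dispatched
    }

    /// Sends a sync marker, then dispatches until its reply has been seen.
    fn roundtrip(&mut self, results: &mut Vec<u32>) -> usize {
        let done = Arc::new(AtomicBool::new(false));
        self.outgoing.push_back(Request::Sync(Arc::clone(&done)));
        let mut dispatched = 0;
        while !done.load(Ordering::Relaxed) {
            dispatched += self.blocking_dispatch(results);
        }
        dispatched
    }
}
```

The returned count includes the sync reply itself, since it is dispatched like any other event in this model.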
pub fn prepare_read(&self) -> Result<ReadEventsGuard, WaylandError>
Start a synchronized read from the socket
This is needed if you plan to wait on readiness of the Wayland socket using an event loop. See the EventQueue and ReadEventsGuard docs for details. Once the events are received, you’ll then need to dispatch them from the event queue using EventQueue::dispatch_pending().
If you don’t need to manage multiple event sources, see blocking_dispatch() for a simpler mechanism.
This method is identical to Connection::prepare_read().
Examples found in repository?
pub fn new(queue: EventQueue<D>) -> Result<WaylandSource<D>, WaylandError> {
let guard = queue.prepare_read()?;
let fd = Generic::new(guard.connection_fd().as_raw_fd(), Interest::READ, Mode::Level);
drop(guard);
Ok(WaylandSource { queue, fd, read_guard: None })
}
/// Access the underlying event queue
///
/// Note that you should be careful when interacting with it if you invoke methods that
/// interact with the wayland socket (such as `dispatch()` or `prepare_read()`). These may
/// interfere with the proper waking up of this event source in the event loop.
pub fn queue(&mut self) -> &mut EventQueue<D> {
&mut self.queue
}
/// Insert this source into the given event loop.
///
/// This adapter will pass the event loop's shared data as the `D` type for the event loop.
pub fn insert(self, handle: LoopHandle<D>) -> Result<RegistrationToken, InsertError<Self>>
where
D: 'static,
{
handle.insert_source(self, |_, queue, data| queue.dispatch_pending(data))
}
}
impl<D> EventSource for WaylandSource<D> {
type Event = ();
/// The underlying event queue.
///
/// You should call [`EventQueue::dispatch_pending`] inside your callback using this queue.
type Metadata = EventQueue<D>;
type Ret = Result<usize, DispatchError>;
type Error = calloop::Error;
fn process_events<F>(
&mut self,
readiness: Readiness,
token: Token,
mut callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let queue = &mut self.queue;
let read_guard = &mut self.read_guard;
let action = self.fd.process_events(readiness, token, |_, _| {
// 1. read events from the socket if any are available
if let Some(guard) = read_guard.take() {
// might be None if some other thread read events before us, concurrently
if let Err(WaylandError::Io(err)) = guard.read() {
if err.kind() != io::ErrorKind::WouldBlock {
return Err(err);
}
}
}
// 2. dispatch any pending events in the queue
// This is done to ensure we are not waiting for messages that are already in the buffer.
Self::loop_callback_pending(queue, &mut callback)?;
*read_guard = Some(Self::prepare_read(queue)?);
// 3. Once dispatching is finished, flush the responses to the compositor
if let Err(WaylandError::Io(e)) = queue.flush() {
if e.kind() != io::ErrorKind::WouldBlock {
// in case of error, forward it and fast-exit
return Err(e);
}
// WouldBlock error means the compositor could not process all our messages
// quickly. Either it is slowed down or we are a spammer.
// Should not really happen, if it does we do nothing and will flush again later
}
Ok(PostAction::Continue)
})?;
Ok(action)
}
fn register(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> calloop::Result<()> {
self.fd.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> calloop::Result<()> {
self.fd.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut Poll) -> calloop::Result<()> {
self.fd.unregister(poll)
}
fn pre_run<F>(&mut self, mut callback: F) -> calloop::Result<()>
where
F: FnMut((), &mut Self::Metadata) -> Self::Ret,
{
debug_assert!(self.read_guard.is_none());
// flush the display before starting to poll
if let Err(WaylandError::Io(err)) = self.queue.flush() {
if err.kind() != io::ErrorKind::WouldBlock {
// in case of error, don't prepare a read, if the error is persistent, it'll trigger in other
// wayland methods anyway
log::error!("Error trying to flush the wayland display: {}", err);
return Err(err.into());
}
}
// ensure we are not waiting for messages that are already in the buffer.
Self::loop_callback_pending(&mut self.queue, &mut callback)?;
self.read_guard = Some(Self::prepare_read(&mut self.queue)?);
Ok(())
}
fn post_run<F>(&mut self, _: F) -> calloop::Result<()>
where
F: FnMut((), &mut Self::Metadata) -> Self::Ret,
{
// Drop implementation of ReadEventsGuard will do cleanup
self.read_guard.take();
Ok(())
}
}
impl<D> WaylandSource<D> {
/// Loop over the callback until all pending messages have been dispatched.
fn loop_callback_pending<F>(queue: &mut EventQueue<D>, callback: &mut F) -> io::Result<()>
where
F: FnMut((), &mut EventQueue<D>) -> Result<usize, DispatchError>,
{
// Loop on the callback until no pending events are left.
loop {
match callback((), queue) {
// No more pending events.
Ok(0) => break Ok(()),
Ok(_) => continue,
Err(DispatchError::Backend(WaylandError::Io(err))) => {
return Err(err);
}
Err(DispatchError::Backend(WaylandError::Protocol(err))) => {
log::error!("Protocol error received on display: {}", err);
break Err(Errno::EPROTO.into());
}
Err(DispatchError::BadMessage { interface, sender_id, opcode }) => {
log::error!(
"Bad message on interface \"{}\": (sender_id: {}, opcode: {})",
interface,
sender_id,
opcode,
);
break Err(Errno::EPROTO.into());
}
}
}
}
fn prepare_read(queue: &mut EventQueue<D>) -> io::Result<ReadEventsGuard> {
queue.prepare_read().map_err(|err| match err {
WaylandError::Io(err) => err,
WaylandError::Protocol(err) => {
log::error!("Protocol error received on display: {}", err);
Errno::EPROTO.into()
}
})
}
pub fn flush(&self) -> Result<(), WaylandError>
Flush pending outgoing events to the server
This needs to be done regularly to ensure the server receives all your requests.
This method is identical to Connection::flush().
pub fn poll_dispatch_pending(
&mut self,
cx: &mut Context<'_>,
data: &mut State
) -> Poll<Result<Infallible, DispatchError>>
Attempt to dispatch events from this queue, registering the current task for wakeup if no events are pending.
This method is similar to dispatch_pending; it will not perform reads on the Wayland socket. Reads on the socket by other tasks or threads will cause the current task to wake up if events are pending on this queue.
use futures_channel::mpsc::Receiver;
use futures_util::future::{poll_fn,select};
use futures_util::stream::StreamExt;
use wayland_client::EventQueue;
struct Data;
enum AppEvent {
    SomethingHappened(u32),
}
impl Data {
    fn handle(&mut self, event: AppEvent) {
        // actual event handling goes here
    }
}
// An async task that is spawned on an executor in order to handle events that need access
// to a specific data object.
async fn run(data: &mut Data, mut wl_queue: EventQueue<Data>, mut app_queue: Receiver<AppEvent>)
    -> Result<(), Box<dyn std::error::Error>>
{
    use futures_util::future::Either;
    loop {
        match select(
            poll_fn(|cx| wl_queue.poll_dispatch_pending(cx, data)),
            app_queue.next(),
        ).await {
            Either::Left((res, _)) => match res? {},
            Either::Right((Some(event), _)) => {
                data.handle(event);
            }
            Either::Right((None, _)) => return Ok(()),
        }
    }
}
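The wake-up contract can be sketched with `std::task` alone. In the toy below, polling dispatches what is buffered, stores the task's waker, and returns `Pending`. A later push by a "reader" wakes that task. `ToyQueue`, `push_from_reader` and `CountingWaker` are hypothetical names, and the toy returns `Poll<()>` rather than the real `Poll<Result<Infallible, DispatchError>>`, whose `Infallible` success type means it can only complete with an error.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Toy queue that remembers which task to wake when events arrive.
#[derive(Default)]
struct ToyQueue {
    buffer: VecDeque<u32>,
    waker: Option<Waker>,
}

impl ToyQueue {
    /// Dispatches what is buffered, then registers the task and returns `Pending`.
    fn poll_dispatch_pending(&mut self, cx: &mut Context<'_>, sum: &mut u32) -> Poll<()> {
        while let Some(value) = self.buffer.pop_front() {
            *sum += value;
        }
        self.waker = Some(cx.waker().clone());
        Poll::Pending
    }

    /// Stand-in for another task or thread reading the socket.
    fn push_from_reader(&mut self, value: u32) {
        self.buffer.push_back(value);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

/// Hypothetical waker that just counts how often it was woken.
#[derive(Default)]
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

In this model the waker is consumed by the first push, so a second push before the next poll does not wake the task again. The task picks up both events when it is polled.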
Trait Implementations
Auto Trait Implementations
impl<State> RefUnwindSafe for EventQueue<State>
impl<State> Send for EventQueue<State>
impl<State> Sync for EventQueue<State>
impl<State> Unpin for EventQueue<State>
impl<State> UnwindSafe for EventQueue<State>
Blanket Implementations
impl<T> Downcast for T
where
    T: Any,
fn into_any(self: Box<T, Global>) -> Box<dyn Any + 'static, Global>
Convert Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>. Box<dyn Any> can then be further downcast into Box<ConcreteType> where ConcreteType implements Trait.
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any + 'static>
Convert Rc<Trait> (where Trait: Downcast) to Rc<Any>. Rc<Any> can then be further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
fn as_any(&self) -> &(dyn Any + 'static)
Convert &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &Any’s vtable from &Trait’s.
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Convert &mut Trait (where Trait: Downcast) to &mut Any. This is needed since Rust cannot generate &mut Any’s vtable from &mut Trait’s.