-[ ] Make core publish take qos with publish
-[ ] Rust iterator like design
-[ ] Should the APIs take files directly
This doc is a journal. Make a blog out of this
Primarily designed to efficiently perform streaming (unbounded) mqtt publishes and
subscriptions in flaky networks.
But the design choices to take a `Stream` for user requests (publishes, subscriptions etc) and the
eventloop it self being a `Stream` yielding incoming packets to the user makes other use cases easy to implement.
With out boring myself and you with only details, let's try to dive in
with some real world use cases
##### Robustness (reconnections and retransmissions)
------
##### Pausing the network activity to cooperate with other processes which needs bandwidth
------
##### Shutting down the eventloop by saving current state to the disk
------
##### Disk aware request stream
------
##### Bounded requests
------
* Automatic reconnections
* No commands for misc operations like network pause/
Connection and Automatic reconnections
-------
Open question: When should event loop start taking control of reconnections? After initial success or should
we expose options similar to current implementation? what should be the default behavior
```
Reconnect::AfterFirstSuccess
Reconnect::Always
Reconnect::Never
```
Let's do this for starters
```$xslt
// create an eventloop after the initial mqtt connection is successful
// user will decide if this should be retried or not
let eventloop = connect(mqttoptions) -> Result<EventLoop, Error>
// during intermittent reconnetions due to bad network, eventloop will
// behave as per configured reconnection options to the eventloop
let stream = eventloop.assemble(reconnection_options, inputs);
```
Possible reconnection options
```$xslt
Reconnect::Never
Reconnect::Automatic
```
I feel this a good middle ground between manual user control and rumq being very opinionated about
reconnection behaviour.
But why not leave the intermittent reconnection behaviour as well to the user you ask?
Because maintaini
state is a much more complicated business than just retrying the connection. We don't want the user to think
about mqttstate by default. If the user intends for a much more custom behaviour, he/she can use
`Reconnect::Never` and pass the returned `MqttState` to the next connection.
Takes any stream type as input
-------
Allows eventloop to be channel implementation agnostic. This opens up interesting usage patterns. Instead of providing
methods to communicate with the eventloop over a channel, we can directly communicate with th eventloop over the specified stream. Users can customize the stream in any way that fits their usecase. For example we plugin streams which orchestrate data between
disk and memory.
This strategy prevents resource overhead in cases where user uses a different kind of channel than what rumqtt uses. E.g if
rumqtt uses futures channnel and user wants to plugin data coming from different kind of stream, user has to make extra copies
passing data fromn the channel of his choice to futures channel (requires user to run an extra thread which does the handover)
```
// thread 1
user_channel_tx.send(data)
// thread 2
data = user_channel_rx.recv();
rumqtt_channel_tx.send(data);
// thread 3
rumqtt_eventloop.start(rumqtt_channel_rx);
```
vs
```
// thread 1
user_channel_tx.send(data)
// thread 2
rumqtt_eventloop.start(user_channel_rx_stream);
```
Don't spawn any thread from the library
-------
Provide all the eventloop handling necessary for a robust mqtt connection but don't spawn any inner threads. This choice is
left to the user. Usecases might vary from running streams with fixed input to channels that keep producing data forever
```
let stream = vec![publish1, publish2, publish3];
let eventloop = MqttEventLoop::new(options);
eventloop.run(stream);
```
or
```
let (tx, rx) = channel::bound(10);
sleep(1)
});
let eventloop = MqttEventLoop::new(options);
eventloop.run(rx);
```
Support both synchronous and asynchronous use cases
-------
Leverage on the above pattern to support both synchronous and asynchronus publishes with timeouts
```
let publishes = [p1, p2, p3];
eventloop.run_timeout(publishes, 10 * Duration::SECS);
```
Eventloop will wait for all the acks within timeout (where ever necessary) and exits
Command channels to configure eventloop dynamically
-------
reconnections, disconnection, throttle speed, pause/resume (without disconnections)
#### shutdown, disconnection, reconnection, pause/resume semantics
there might be no need to implement separate shutdown to stop the eventloop and return existing state
```
let eventloop = MqttEventloop::new();
})
enum EventloopStatus {
Shutdown(MqttState)
// sends disconnect to the server and waits for server server disconnection
// should't process any new data from the user channel
Disconnect
}
eventloop.run() //return -> Result<MqttSt>
```
Keep additional functionality like gcloud jwt auth out of rumqtt
-------
Prevents (some) conflicts w.r.t different versions of ring. conflicts because of rustls are still possible but atleast
prevents ones w.r.t jsonwebtoken.
Keeps the codebase small which eases some maintainence burden
Keep alive
-------
Keepalives can be a little tricky
client should keep track of keepalive timeouts for 2 different reasons
* when there is no incoming n/w activity
to detect any halfopen connections to the broker, client should send a pingrequest packet
and validate next pingreq with previous pingresp.
if previous ack isn't received, client should consider this as a halfopen connection and
disconnect. takes 2 keepalive times to detect halfopen connection and disconnect
* to prevent broker from disconnecting the client due to no client activity
broker should receive some packet activity from a client or else it'll assume the
connection as halfopen and disconnect.
for this reason, even though the client is receiving some incoming packets(qos0 publishes)
client should timeout when there is no outgoing packet activity and send a ping request
We would require timeouts on network incoming packets as well as network outgoing packets. concurrently
so we need to create 2 streams with timeouts, one on n/w incoming packets and another on n/w outgoing packets and select
```rust
let incoming_stream = stream!(tcp.read_mqtt().timeout(NetworkTimeout))
This captures incoming stream timeout
Caputuring timeout on outgoing packects (replys due to incoming packets + user requests) is tricky because replys
are a sideeffect of processing incoming packets which generate notifications for the user as well.
To have a timeout on a combination of reply and requests, we need to filter out notifications (or put it into other stream)
which beats our design of returning one eventloop stream to the user to handle full mqtt io
let mqtt_stream = stream! {
loop {
select! {
(notification, reply) = incoming_stream.next().handle_incoming_packet(),
(request) = requests.next()
}
yield reply
}
}
```
So there is no way to separate notifications from reply without creating a duplicate stream
Option 2 is to timeout on user requests alone. This will lead to sending unnecessary pingreqests after keep alive time
even when there is outgoing network activity due to network replys. But this will let us have one simple
stream to poll. Returning 2 streams might not be intuitive to the users. main eventloop progress can stop when the user
doensn't poll the notification stream (tx.send will block which eventually blocks all incoming packets). Having a second stream also results in more allocations in the hotpath (which might not be a big deal but not ideal)
Option2 is also considerable less codebase and hence easy maintainence.
Keep alive take 2
Ok, creating independent timeout streams on network and request streams makes keep alive complicated. Both the streams mutating a common timeout is the solution
* When there is an incoming packet which replys the network with some packet, reset the timer and don't generate `PingReq`
* When there are no incoming and outgoing packets, timeout, hence generate `PingReq`. Reset the delay
* In a keep alive window, when there is an incoming packet (which doesn't trigger a reply), mark it
* When there is an outgoing request in this window with incoming already marked. Reset the timer.
* If there is no outgoing request in this window, timeout, hence generate `PingReq`. Reset the delay and markers
Timeout for packets which are not acked
------------------
Implement timeout for unacked packets and drop/republish them. Useful if the
broker is buggy. But might not be too important as unacked messages are
anyway retried during reconnection