moq_transport/message/mod.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
//! Low-level message sent over the wire, as defined in the specification.
//!
//! TODO Update this
//! All of these messages are sent over a bidirectional QUIC stream.
//! This introduces some head-of-line blocking but preserves ordering.
//! The only exceptions are OBJECT "messages", which are sent over dedicated QUIC streams.
//!
//! Messages sent by the publisher:
//! - [Announce]
//! - [Unannounce]
//! - [SubscribeOk]
//! - [SubscribeError]
//! - [SubscribeDone]
//! - [TrackStatus]
//! - OBJECT (defined in another module)
//!
//! Messages sent by the subscriber:
//! - [Subscribe]
//! - [SubscribeUpdate]
//! - [Unsubscribe]
//! - [AnnounceOk]
//! - [AnnounceError]
//! - [AnnounceCancel]
//! - [TrackStatusRequest]
//!
//! Example flow:
//! ```text
//! -> ANNOUNCE namespace="foo"
//! <- ANNOUNCE_OK namespace="foo"
//! <- SUBSCRIBE id=0 namespace="foo" name="bar"
//! -> SUBSCRIBE_OK id=0
//! -> OBJECT id=0 sequence=69 priority=4 expires=30
//! -> OBJECT id=0 sequence=70 priority=4 expires=30
//! -> OBJECT id=0 sequence=71 priority=4 expires=30
//! <- SUBSCRIBE_STOP id=0
//! -> SUBSCRIBE_RESET id=0 code=206 reason="closed by peer"
//! ```
mod announce;
mod announce_cancel;
mod announce_error;
mod announce_ok;
mod filter_type;
mod go_away;
mod group_order;
mod publisher;
mod subscribe;
mod subscribe_done;
mod subscribe_error;
mod subscribe_ok;
mod subscribe_update;
mod subscriber;
mod track_status;
mod track_status_request;
mod unannounce;
mod unsubscribe;
pub use announce::*;
pub use announce_cancel::*;
pub use announce_error::*;
pub use announce_ok::*;
pub use filter_type::*;
pub use go_away::*;
pub use group_order::*;
pub use publisher::*;
pub use subscribe::*;
pub use subscribe_done::*;
pub use subscribe_error::*;
pub use subscribe_ok::*;
pub use subscribe_update::*;
pub use subscriber::*;
pub use track_status::*;
pub use track_status_request::*;
pub use unannounce::*;
pub use unsubscribe::*;
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
use std::fmt;
// Declares every control message exactly once as a `Name = id,` pair.
// From that list the macro generates the `Message` enum together with its
// `Decode`/`Encode` impls, `id()`/`name()` accessors, `From` conversions and `Debug`.
macro_rules! message_types {
    {$($name:ident = $val:expr,)*} => {
        /// All supported message types.
        #[derive(Clone)]
        pub enum Message {
            $($name($name)),*
        }

        impl Decode for Message {
            /// Reads the leading type id, then decodes the body of the matching message.
            ///
            /// An id that is not in the list yields `DecodeError::InvalidMessage`.
            fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
                let kind = u64::decode(r)?;
                match kind {
                    $($val => Ok(Self::$name($name::decode(r)?)),)*
                    _ => Err(DecodeError::InvalidMessage(kind)),
                }
            }
        }

        impl Encode for Message {
            /// Writes the type id followed by the encoded message body.
            fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
                // The id prefix is identical for every variant, so emit it up front.
                self.id().encode(w)?;
                match self {
                    $(Self::$name(msg) => msg.encode(w),)*
                }
            }
        }

        impl Message {
            /// Returns the numeric type id that prefixes this message.
            pub fn id(&self) -> u64 {
                match self {
                    $(Self::$name(_) => $val,)*
                }
            }

            /// Returns the variant name as written in the `message_types!` list.
            pub fn name(&self) -> &'static str {
                match self {
                    $(Self::$name(_) => stringify!($name),)*
                }
            }
        }

        // Allow each concrete message to be converted into the `Message` enum.
        $(impl From<$name> for Message {
            fn from(msg: $name) -> Self {
                Self::$name(msg)
            }
        })*

        impl fmt::Debug for Message {
            // Print the wrapped message directly, without an extra enum wrapper.
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                match self {
                    $(Self::$name(msg) => fmt::Debug::fmt(msg, f),)*
                }
            }
        }
    }
}
// Each message is prefixed with the given VarInt type.
// The entries below are `MessageName = type id` pairs; the macro turns each one
// into a `Message` variant and uses the id when decoding and encoding.
message_types! {
// NOTE: Object and Setup are in other modules.
// Object = 0x0
// ObjectUnbounded = 0x2
// SetupClient = 0x40
// SetupServer = 0x41
// NOTE(review): the `ObjectUnbounded = 0x2` line above uses the same id as
// `SubscribeUpdate` below, so that comment looks stale; confirm against the object module.
// SUBSCRIBE family, sent by subscriber
SubscribeUpdate = 0x2,
Subscribe = 0x3,
Unsubscribe = 0xa,
// SUBSCRIBE family, sent by publisher
SubscribeOk = 0x4,
SubscribeError = 0x5,
SubscribeDone = 0xb,
// ANNOUNCE family, sent by publisher
Announce = 0x6,
Unannounce = 0x9,
// ANNOUNCE family, sent by subscriber
AnnounceOk = 0x7,
AnnounceError = 0x8,
AnnounceCancel = 0xc,
// TRACK_STATUS_REQUEST, sent by subscriber
TrackStatusRequest = 0xd,
// TRACK_STATUS, sent by publisher
TrackStatus = 0xe,
// Misc
GoAway = 0x10,
}
/// Track status codes.
///
/// See <https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-track_status>.
///
/// The explicit discriminants are the numeric values used when the code is
/// decoded from and encoded to the wire.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum TrackStatusCode {
    /// 0x00: The track is in progress, and subsequent fields contain the highest
    /// group and object ID for that track.
    InProgress = 0x00,
    /// 0x01: The track does not exist. Subsequent fields MUST be zero, and any
    /// other value is a malformed message.
    DoesNotExist = 0x01,
    /// 0x02: The track has not yet begun. Subsequent fields MUST be zero. Any
    /// other value is a malformed message.
    NotYetBegun = 0x02,
    /// 0x03: The track has finished, so there is no "live edge." Subsequent
    /// fields contain the highest Group and object ID known.
    Finished = 0x03,
    /// 0x04: The sender is a relay that cannot obtain the current track status
    /// from upstream. Subsequent fields contain the largest group and object ID known.
    Relay = 0x04,
}
impl Decode for TrackStatusCode {
    /// Decodes a `u64` from `r` and maps it onto a status code.
    ///
    /// Values outside `0x00..=0x04` produce `DecodeError::InvalidTrackStatusCode`;
    /// a failure of the underlying `u64` decode is propagated unchanged.
    fn decode<B: bytes::Buf>(r: &mut B) -> Result<Self, DecodeError> {
        let raw = u64::decode(r)?;
        let code = match raw {
            0x00 => Self::InProgress,
            0x01 => Self::DoesNotExist,
            0x02 => Self::NotYetBegun,
            0x03 => Self::Finished,
            0x04 => Self::Relay,
            // Anything else is not a known status code.
            _ => return Err(DecodeError::InvalidTrackStatusCode),
        };
        Ok(code)
    }
}
impl Encode for TrackStatusCode {
    /// Encodes the status code as its numeric value via the `u64` encoder.
    fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
        // Each variant's explicit discriminant (0x00..=0x04) is the value to write,
        // so casting the enum yields the same number for every variant.
        let wire_value = *self as u64;
        wire_value.encode(w)
    }
}