moq_transport/message/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
//! Low-level messages sent over the wire, as defined in the specification.
//!
//! TODO Update this
//! All of these messages are sent over a bidirectional QUIC stream.
//! This introduces some head-of-line blocking but preserves ordering.
//! The only exceptions are OBJECT "messages", which are sent over dedicated QUIC streams.
//!
//! Messages sent by the publisher:
//! - [Announce]
//! - [Unannounce]
//! - [SubscribeOk]
//! - [SubscribeError]
//! - [SubscribeDone]
//! - [TrackStatus]
//! - [Object]
//!
//! Messages sent by the subscriber:
//! - [Subscribe]
//! - [SubscribeUpdate]
//! - [Unsubscribe]
//! - [AnnounceOk]
//! - [AnnounceError]
//! - [AnnounceCancel]
//! - [TrackStatusRequest]
//!
//! Example flow:
//! ```text
//!  -> ANNOUNCE        namespace="foo"
//!  <- ANNOUNCE_OK     namespace="foo"
//!  <- SUBSCRIBE       id=0 namespace="foo" name="bar"
//!  -> SUBSCRIBE_OK    id=0
//!  -> OBJECT          id=0 sequence=69 priority=4 expires=30
//!  -> OBJECT          id=0 sequence=70 priority=4 expires=30
//!  -> OBJECT          id=0 sequence=71 priority=4 expires=30
//!  <- UNSUBSCRIBE     id=0
//!  -> SUBSCRIBE_DONE  id=0 code=206 reason="closed by peer"
//! ```
mod announce;
mod announce_cancel;
mod announce_error;
mod announce_ok;
mod filter_type;
mod go_away;
mod group_order;
mod publisher;
mod subscribe;
mod subscribe_done;
mod subscribe_error;
mod subscribe_ok;
mod subscribe_update;
mod subscriber;
mod track_status;
mod track_status_request;
mod unannounce;
mod unsubscribe;

pub use announce::*;
pub use announce_cancel::*;
pub use announce_error::*;
pub use announce_ok::*;
pub use filter_type::*;
pub use go_away::*;
pub use group_order::*;
pub use publisher::*;
pub use subscribe::*;
pub use subscribe_done::*;
pub use subscribe_error::*;
pub use subscribe_ok::*;
pub use subscribe_update::*;
pub use subscriber::*;
pub use track_status::*;
pub use track_status_request::*;
pub use unannounce::*;
pub use unsubscribe::*;

use crate::coding::{Decode, DecodeError, Encode, EncodeError};
use std::fmt;

// Generates the `Message` enum and all of its impls from one table of
// `Variant = wire_id,` entries, so the list is maintained in a single place.
// Every variant wraps the payload type of the same name and defers to that
// type's own `Decode`, `Encode` and `Debug` implementations.
macro_rules! message_types {
	{$($name:ident = $val:expr,)*} => {
		/// Every message type this table knows how to encode and decode.
		#[derive(Clone)]
		pub enum Message {
			$($name($name),)*
		}

		impl Decode for Message {
			/// Reads the leading type id, then decodes the payload of the matching message.
			///
			/// An id that is not in the table yields `DecodeError::InvalidMessage(id)`.
			fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
				let kind = u64::decode(r)?;

				match kind {
					$($val => Ok(Self::$name($name::decode(r)?)),)*
					unknown => Err(DecodeError::InvalidMessage(unknown)),
				}
			}
		}

		impl Encode for Message {
			/// Writes the type id followed by the payload, mirroring what `decode` reads.
			fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
				// The id is the same expression for every variant, so emit it once up front.
				self.id().encode(w)?;

				match self {
					$(Self::$name(inner) => inner.encode(w),)*
				}
			}
		}

		impl Message {
			/// The numeric type id paired with this variant in the table.
			pub fn id(&self) -> u64 {
				match self {
					$(Self::$name(..) => $val,)*
				}
			}

			/// The variant identifier exactly as written in the table.
			pub fn name(&self) -> &'static str {
				match self {
					$(Self::$name(..) => stringify!($name),)*
				}
			}
		}

		// Let each payload type be lifted into `Message` with `.into()` / `Message::from`.
		$(impl From<$name> for Message {
			fn from(payload: $name) -> Self {
				Self::$name(payload)
			}
		})*

		impl fmt::Debug for Message {
			// Print only the wrapped payload; the enum adds no wrapper text of its own.
			fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
				match self {
					$(Self::$name(inner) => fmt::Debug::fmt(inner, f),)*
				}
			}
		}
	}
}

// The table of message types: each entry pairs a payload type with the VarInt
// type id that prefixes it on the wire.
message_types! {
	// NOTE: Object and Setup are in other modules.
	// Object = 0x0
	// ObjectUnbounded = 0x2
	// SetupClient = 0x40
	// SetupServer = 0x41
	// NOTE(review): the 0x2 noted above for ObjectUnbounded is also the id used by
	// SubscribeUpdate below; confirm this note is still current.

	// Entries are listed in ascending id order; the trailing comment names the sender.
	SubscribeUpdate = 0x2, // SUBSCRIBE family, sent by subscriber
	Subscribe = 0x3, // SUBSCRIBE family, sent by subscriber
	SubscribeOk = 0x4, // SUBSCRIBE family, sent by publisher
	SubscribeError = 0x5, // SUBSCRIBE family, sent by publisher
	Announce = 0x6, // ANNOUNCE family, sent by publisher
	AnnounceOk = 0x7, // ANNOUNCE family, sent by subscriber
	AnnounceError = 0x8, // ANNOUNCE family, sent by subscriber
	Unannounce = 0x9, // ANNOUNCE family, sent by publisher
	Unsubscribe = 0xa, // SUBSCRIBE family, sent by subscriber
	SubscribeDone = 0xb, // SUBSCRIBE family, sent by publisher
	AnnounceCancel = 0xc, // ANNOUNCE family, sent by subscriber
	TrackStatusRequest = 0xd, // TRACK_STATUS_REQUEST, sent by subscriber
	TrackStatus = 0xe, // TRACK_STATUS, sent by publisher
	GoAway = 0x10, // Misc
}

/// Track status codes.
///
/// See <https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-track_status>.
///
/// The explicit discriminants match the numeric codes quoted on each variant.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum TrackStatusCode {
	/// 0x00: The track is in progress, and subsequent fields contain the highest group and object ID for that track.
	InProgress = 0x00,
	/// 0x01: The track does not exist. Subsequent fields MUST be zero, and any other value is a malformed message.
	DoesNotExist = 0x01,
	/// 0x02: The track has not yet begun. Subsequent fields MUST be zero. Any other value is a malformed message.
	NotYetBegun = 0x02,
	/// 0x03: The track has finished, so there is no "live edge." Subsequent fields contain the highest Group and object ID known.
	Finished = 0x03,
	/// 0x04: The sender is a relay that cannot obtain the current track status from upstream. Subsequent fields contain the largest group and object ID known.
	Relay = 0x04,
}

impl Decode for TrackStatusCode {
	/// Reads one `u64` and maps it to the matching status code.
	///
	/// Values outside `0x00..=0x04` are rejected with
	/// `DecodeError::InvalidTrackStatusCode`; a failure to read the `u64` is
	/// propagated unchanged.
	fn decode<B: bytes::Buf>(r: &mut B) -> Result<Self, DecodeError> {
		let raw = u64::decode(r)?;

		let code = match raw {
			0x00 => Self::InProgress,
			0x01 => Self::DoesNotExist,
			0x02 => Self::NotYetBegun,
			0x03 => Self::Finished,
			0x04 => Self::Relay,
			_ => return Err(DecodeError::InvalidTrackStatusCode),
		};

		Ok(code)
	}
}

impl Encode for TrackStatusCode {
	/// Writes the status code as a `u64` using that type's `Encode` impl.
	///
	/// The values written are the same ones `decode` accepts (`0x00..=0x04`).
	fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
		// Pick the wire value first so there is a single encode call below.
		let wire: u64 = match self {
			Self::InProgress => 0x00,
			Self::DoesNotExist => 0x01,
			Self::NotYetBegun => 0x02,
			Self::Finished => 0x03,
			Self::Relay => 0x04,
		};

		wire.encode(w)
	}
}