hyper/ffi/body.rs
1use std::ffi::{c_int, c_void};
2use std::mem::ManuallyDrop;
3use std::ptr;
4use std::task::{Context, Poll};
5
6use http_body_util::BodyExt as _;
7
8use super::task::{hyper_context, hyper_task, hyper_task_return_type, AsTaskType};
9use super::{UserDataPointer, HYPER_ITER_CONTINUE};
10use crate::body::{Bytes, Frame, Incoming as IncomingBody};
11use crate::ffi::size_t;
12
13/// A streaming HTTP body.
14///
15/// This is used both for sending requests (with `hyper_request_set_body`) and
16/// for receiving responses (with `hyper_response_body`).
17///
18/// For outgoing request bodies, call `hyper_body_set_data_func` to provide the
19/// data.
20///
21/// For incoming response bodies, call `hyper_body_data` to get a task that will
22/// yield a chunk of data each time it is polled. That task must be then be
23/// added to the executor with `hyper_executor_push`.
24///
25/// Methods:
26///
27/// - hyper_body_new: Create a new “empty” body.
28/// - hyper_body_set_userdata: Set userdata on this body, which will be passed to callback functions.
29/// - hyper_body_set_data_func: Set the data callback for this body.
30/// - hyper_body_data: Creates a task that will poll a response body for the next buffer of data.
31/// - hyper_body_foreach: Creates a task to execute the callback with each body chunk received.
32/// - hyper_body_free: Free a body.
33pub struct hyper_body(pub(super) IncomingBody);
34
35/// A buffer of bytes that is sent or received on a `hyper_body`.
36///
37/// Obtain one of these in the callback of `hyper_body_foreach` or by receiving
38/// a task of type `HYPER_TASK_BUF` from `hyper_executor_poll` (after calling
39/// `hyper_body_data` and pushing the resulting task).
40///
41/// Methods:
42///
43/// - hyper_buf_bytes: Get a pointer to the bytes in this buffer.
44/// - hyper_buf_copy: Create a new hyper_buf * by copying the provided bytes.
45/// - hyper_buf_free: Free this buffer.
46/// - hyper_buf_len: Get the length of the bytes this buffer contains.
47pub struct hyper_buf(pub(crate) Bytes);
48
49pub(crate) struct UserBody {
50 data_func: hyper_body_data_callback,
51 userdata: *mut c_void,
52}
53
54// ===== Body =====
55
56type hyper_body_foreach_callback = extern "C" fn(*mut c_void, *const hyper_buf) -> c_int;
57
58type hyper_body_data_callback =
59 extern "C" fn(*mut c_void, *mut hyper_context<'_>, *mut *mut hyper_buf) -> c_int;
60
61ffi_fn! {
62 /// Creates a new "empty" body.
63 ///
64 /// If not configured, this body acts as an empty payload.
65 ///
66 /// To avoid a memory leak, the body must eventually be consumed by
67 /// `hyper_body_free`, `hyper_body_foreach`, or `hyper_request_set_body`.
68 fn hyper_body_new() -> *mut hyper_body {
69 Box::into_raw(Box::new(hyper_body(IncomingBody::ffi())))
70 } ?= ptr::null_mut()
71}
72
73ffi_fn! {
74 /// Free a body.
75 ///
76 /// This should only be used if the request isn't consumed by
77 /// `hyper_body_foreach` or `hyper_request_set_body`.
78 fn hyper_body_free(body: *mut hyper_body) {
79 drop(non_null!(Box::from_raw(body) ?= ()));
80 }
81}
82
83ffi_fn! {
84 /// Creates a task that will poll a response body for the next buffer of data.
85 ///
86 /// The task may have different types depending on the outcome:
87 ///
88 /// - `HYPER_TASK_BUF`: Success, and more data was received.
89 /// - `HYPER_TASK_ERROR`: An error retrieving the data.
90 /// - `HYPER_TASK_EMPTY`: The body has finished streaming data.
91 ///
92 /// When the application receives the task from `hyper_executor_poll`,
93 /// if the task type is `HYPER_TASK_BUF`, it should cast the task to
94 /// `hyper_buf *` and consume all the bytes in the buffer. Then
95 /// the application should call `hyper_body_data` again for the same
96 /// `hyper_body *`, to create a task for the next buffer of data.
97 /// Repeat until the polled task type is `HYPER_TASK_ERROR` or
98 /// `HYPER_TASK_EMPTY`.
99 ///
100 /// To avoid a memory leak, the task must eventually be consumed by
101 /// `hyper_task_free`, or taken ownership of by `hyper_executor_push`
102 /// without subsequently being given back by `hyper_executor_poll`.
103 ///
104 /// This does not consume the `hyper_body *`, so it may be used again.
105 /// However, the `hyper_body *` MUST NOT be used or freed until the
106 /// related task is returned from `hyper_executor_poll`.
107 ///
108 /// For a more convenient method, see also `hyper_body_foreach`.
109 fn hyper_body_data(body: *mut hyper_body) -> *mut hyper_task {
110 // This doesn't take ownership of the Body, so don't allow destructor
111 let mut body = ManuallyDrop::new(non_null!(Box::from_raw(body) ?= ptr::null_mut()));
112
113 Box::into_raw(hyper_task::boxed(async move {
114 loop {
115 match body.0.frame().await {
116 Some(Ok(frame)) => {
117 if let Ok(data) = frame.into_data() {
118 return Ok(Some(hyper_buf(data)));
119 } else {
120 continue;
121 }
122 },
123 Some(Err(e)) => return Err(e),
124 None => return Ok(None),
125 }
126 }
127 }))
128 } ?= ptr::null_mut()
129}
130
131ffi_fn! {
132 /// Creates a task to execute the callback with each body chunk received.
133 ///
134 /// To avoid a memory leak, the task must eventually be consumed by
135 /// `hyper_task_free`, or taken ownership of by `hyper_executor_push`
136 /// without subsequently being given back by `hyper_executor_poll`.
137 ///
138 /// The `hyper_buf` pointer is only a borrowed reference. It cannot live outside
139 /// the execution of the callback. You must make a copy of the bytes to retain them.
140 ///
141 /// The callback should return `HYPER_ITER_CONTINUE` to continue iterating
142 /// chunks as they are received, or `HYPER_ITER_BREAK` to cancel. Each
143 /// invocation of the callback must consume all the bytes it is provided.
144 /// There is no mechanism to signal to Hyper that only a subset of bytes were
145 /// consumed.
146 ///
147 /// This will consume the `hyper_body *`, you shouldn't use it anymore or free it.
148 fn hyper_body_foreach(body: *mut hyper_body, func: hyper_body_foreach_callback, userdata: *mut c_void) -> *mut hyper_task {
149 let mut body = non_null!(Box::from_raw(body) ?= ptr::null_mut());
150 let userdata = UserDataPointer(userdata);
151
152 Box::into_raw(hyper_task::boxed(async move {
153 let _ = &userdata;
154 while let Some(item) = body.0.frame().await {
155 let frame = item?;
156 if let Ok(chunk) = frame.into_data() {
157 if HYPER_ITER_CONTINUE != func(userdata.0, &hyper_buf(chunk)) {
158 return Err(crate::Error::new_user_aborted_by_callback());
159 }
160 }
161 }
162 Ok(())
163 }))
164 } ?= ptr::null_mut()
165}
166
167ffi_fn! {
168 /// Set userdata on this body, which will be passed to callback functions.
169 fn hyper_body_set_userdata(body: *mut hyper_body, userdata: *mut c_void) {
170 let b = non_null!(&mut *body ?= ());
171 b.0.as_ffi_mut().userdata = userdata;
172 }
173}
174
175ffi_fn! {
176 /// Set the outgoing data callback for this body.
177 ///
178 /// The callback is called each time hyper needs to send more data for the
179 /// body. It is passed the value from `hyper_body_set_userdata`.
180 ///
181 /// If there is data available, the `hyper_buf **` argument should be set
182 /// to a `hyper_buf *` containing the data, and `HYPER_POLL_READY` should
183 /// be returned.
184 ///
185 /// Returning `HYPER_POLL_READY` while the `hyper_buf **` argument points
186 /// to `NULL` will indicate the body has completed all data.
187 ///
188 /// If there is more data to send, but it isn't yet available, a
189 /// `hyper_waker` should be saved from the `hyper_context *` argument, and
190 /// `HYPER_POLL_PENDING` should be returned. You must wake the saved waker
191 /// to signal the task when data is available.
192 ///
193 /// If some error has occurred, you can return `HYPER_POLL_ERROR` to abort
194 /// the body.
195 fn hyper_body_set_data_func(body: *mut hyper_body, func: hyper_body_data_callback) {
196 let b = non_null!{ &mut *body ?= () };
197 b.0.as_ffi_mut().data_func = func;
198 }
199}
200
201// ===== impl UserBody =====
202
203impl UserBody {
204 pub(crate) fn new() -> UserBody {
205 UserBody {
206 data_func: data_noop,
207 userdata: std::ptr::null_mut(),
208 }
209 }
210
211 pub(crate) fn poll_data(
212 &mut self,
213 cx: &mut Context<'_>,
214 ) -> Poll<Option<crate::Result<Frame<Bytes>>>> {
215 let mut out = std::ptr::null_mut();
216 match (self.data_func)(self.userdata, hyper_context::wrap(cx), &mut out) {
217 super::task::HYPER_POLL_READY => {
218 if out.is_null() {
219 Poll::Ready(None)
220 } else {
221 let buf = unsafe { Box::from_raw(out) };
222 Poll::Ready(Some(Ok(Frame::data(buf.0))))
223 }
224 }
225 super::task::HYPER_POLL_PENDING => Poll::Pending,
226 super::task::HYPER_POLL_ERROR => {
227 Poll::Ready(Some(Err(crate::Error::new_body_write_aborted())))
228 }
229 unexpected => Poll::Ready(Some(Err(crate::Error::new_body_write(format!(
230 "unexpected hyper_body_data_func return code {}",
231 unexpected
232 ))))),
233 }
234 }
235}
236
237/// cbindgen:ignore
238extern "C" fn data_noop(
239 _userdata: *mut c_void,
240 _: *mut hyper_context<'_>,
241 _: *mut *mut hyper_buf,
242) -> c_int {
243 super::task::HYPER_POLL_READY
244}
245
246unsafe impl Send for UserBody {}
247unsafe impl Sync for UserBody {}
248
249// ===== Bytes =====
250
251ffi_fn! {
252 /// Create a new `hyper_buf *` by copying the provided bytes.
253 ///
254 /// This makes an owned copy of the bytes, so the `buf` argument can be
255 /// freed (with `hyper_buf_free`) or changed afterwards.
256 ///
257 /// To avoid a memory leak, the copy must eventually be consumed by
258 /// `hyper_buf_free`.
259 ///
260 /// This returns `NULL` if allocating a new buffer fails.
261 fn hyper_buf_copy(buf: *const u8, len: size_t) -> *mut hyper_buf {
262 let slice = unsafe {
263 std::slice::from_raw_parts(buf, len)
264 };
265 Box::into_raw(Box::new(hyper_buf(Bytes::copy_from_slice(slice))))
266 } ?= ptr::null_mut()
267}
268
269ffi_fn! {
270 /// Get a pointer to the bytes in this buffer.
271 ///
272 /// This should be used in conjunction with `hyper_buf_len` to get the length
273 /// of the bytes data.
274 ///
275 /// This pointer is borrowed data, and not valid once the `hyper_buf` is
276 /// consumed/freed.
277 fn hyper_buf_bytes(buf: *const hyper_buf) -> *const u8 {
278 unsafe { (*buf).0.as_ptr() }
279 } ?= ptr::null()
280}
281
282ffi_fn! {
283 /// Get the length of the bytes this buffer contains.
284 fn hyper_buf_len(buf: *const hyper_buf) -> size_t {
285 unsafe { (*buf).0.len() }
286 }
287}
288
289ffi_fn! {
290 /// Free this buffer.
291 ///
292 /// This should be used for any buffer once it is no longer needed.
293 fn hyper_buf_free(buf: *mut hyper_buf) {
294 drop(unsafe { Box::from_raw(buf) });
295 }
296}
297
298unsafe impl AsTaskType for hyper_buf {
299 fn as_task_type(&self) -> hyper_task_return_type {
300 hyper_task_return_type::HYPER_TASK_BUF
301 }
302}