1use error::to_std_io_result;
6use gio_sys;
7use glib::object::IsA;
8use glib::translate::*;
9use glib::Priority;
10use glib_sys;
11use gobject_sys;
12use std::io;
13use std::mem;
14use std::ptr;
15use Cancellable;
16use Error;
17use InputStream;
18
19#[cfg(feature = "futures")]
20use futures::future;
21
22pub trait InputStreamExtManual: Sized {
23 fn read<B: AsMut<[u8]>>(
24 &self,
25 buffer: B,
26 cancellable: Option<&Cancellable>,
27 ) -> Result<usize, Error>;
28
29 fn read_all<B: AsMut<[u8]>>(
30 &self,
31 buffer: B,
32 cancellable: Option<&Cancellable>,
33 ) -> Result<(usize, Option<Error>), Error>;
34
35 #[cfg(any(feature = "v2_44", feature = "dox"))]
36 fn read_all_async<
37 B: AsMut<[u8]> + Send + 'static,
38 Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
39 >(
40 &self,
41 buffer: B,
42 io_priority: Priority,
43 cancellable: Option<&Cancellable>,
44 callback: Q,
45 );
46
47 fn read_async<
48 B: AsMut<[u8]> + Send + 'static,
49 Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
50 >(
51 &self,
52 buffer: B,
53 io_priority: Priority,
54 cancellable: Option<&Cancellable>,
55 callback: Q,
56 );
57
58 #[cfg(feature = "futures")]
59 #[cfg(any(feature = "v2_44", feature = "dox"))]
60 fn read_all_async_future<'a, B: AsMut<[u8]> + Send + 'static>(
61 &self,
62 buffer: B,
63 io_priority: Priority,
64 ) -> Box<
65 dyn future::Future<Output = Result<(B, usize, Option<Error>), (B, Error)>>
66 + std::marker::Unpin,
67 >;
68
69 #[cfg(feature = "futures")]
70 fn read_async_future<'a, B: AsMut<[u8]> + Send + 'static>(
71 &self,
72 buffer: B,
73 io_priority: Priority,
74 ) -> Box<dyn future::Future<Output = Result<(B, usize), (B, Error)>> + std::marker::Unpin>;
75
76 fn into_read(self) -> InputStreamRead<Self> {
77 InputStreamRead(self)
78 }
79}
80
81impl<O: IsA<InputStream>> InputStreamExtManual for O {
82 fn read<B: AsMut<[u8]>>(
83 &self,
84 mut buffer: B,
85 cancellable: Option<&Cancellable>,
86 ) -> Result<usize, Error> {
87 let cancellable = cancellable.to_glib_none();
88 let buffer = buffer.as_mut();
89 let buffer_ptr = buffer.as_mut_ptr();
90 let count = buffer.len();
91 unsafe {
92 let mut error = ptr::null_mut();
93 let ret = gio_sys::g_input_stream_read(
94 self.as_ref().to_glib_none().0,
95 buffer_ptr,
96 count,
97 cancellable.0,
98 &mut error,
99 );
100 if error.is_null() {
101 Ok(ret as usize)
102 } else {
103 Err(from_glib_full(error))
104 }
105 }
106 }
107
108 fn read_all<B: AsMut<[u8]>>(
109 &self,
110 mut buffer: B,
111 cancellable: Option<&Cancellable>,
112 ) -> Result<(usize, Option<Error>), Error> {
113 let cancellable = cancellable.to_glib_none();
114 let buffer = buffer.as_mut();
115 let buffer_ptr = buffer.as_mut_ptr();
116 let count = buffer.len();
117 unsafe {
118 let mut bytes_read = mem::uninitialized();
119 let mut error = ptr::null_mut();
120 let _ = gio_sys::g_input_stream_read_all(
121 self.as_ref().to_glib_none().0,
122 buffer_ptr,
123 count,
124 &mut bytes_read,
125 cancellable.0,
126 &mut error,
127 );
128
129 if error.is_null() {
130 Ok((bytes_read, None))
131 } else if bytes_read != 0 {
132 Ok((bytes_read, Some(from_glib_full(error))))
133 } else {
134 Err(from_glib_full(error))
135 }
136 }
137 }
138
139 #[cfg(any(feature = "v2_44", feature = "dox"))]
140 fn read_all_async<
141 B: AsMut<[u8]> + Send + 'static,
142 Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
143 >(
144 &self,
145 buffer: B,
146 io_priority: Priority,
147 cancellable: Option<&Cancellable>,
148 callback: Q,
149 ) {
150 let cancellable = cancellable.to_glib_none();
151 let mut user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
152 let (count, buffer_ptr) = {
154 let buffer = &mut (*user_data).as_mut().unwrap().1;
155 let slice = (*buffer).as_mut();
156 (slice.len(), slice.as_mut_ptr())
157 };
158 unsafe extern "C" fn read_all_async_trampoline<
159 B: AsMut<[u8]> + Send + 'static,
160 Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
161 >(
162 _source_object: *mut gobject_sys::GObject,
163 res: *mut gio_sys::GAsyncResult,
164 user_data: glib_sys::gpointer,
165 ) {
166 let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
167 let (callback, buffer) = user_data.take().unwrap();
168
169 let mut error = ptr::null_mut();
170 let mut bytes_read = mem::uninitialized();
171 let _ = gio_sys::g_input_stream_read_all_finish(
172 _source_object as *mut _,
173 res,
174 &mut bytes_read,
175 &mut error,
176 );
177
178 let result = if error.is_null() {
179 Ok((buffer, bytes_read, None))
180 } else if bytes_read != 0 {
181 Ok((buffer, bytes_read, Some(from_glib_full(error))))
182 } else {
183 Err((buffer, from_glib_full(error)))
184 };
185
186 callback(result);
187 }
188 let callback = read_all_async_trampoline::<B, Q>;
189 unsafe {
190 gio_sys::g_input_stream_read_all_async(
191 self.as_ref().to_glib_none().0,
192 buffer_ptr,
193 count,
194 io_priority.to_glib(),
195 cancellable.0,
196 Some(callback),
197 Box::into_raw(user_data) as *mut _,
198 );
199 }
200 }
201
202 fn read_async<
203 B: AsMut<[u8]> + Send + 'static,
204 Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
205 >(
206 &self,
207 buffer: B,
208 io_priority: Priority,
209 cancellable: Option<&Cancellable>,
210 callback: Q,
211 ) {
212 let cancellable = cancellable.to_glib_none();
213 let mut user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
214 let (count, buffer_ptr) = {
216 let buffer = &mut (*user_data).as_mut().unwrap().1;
217 let slice = (*buffer).as_mut();
218 (slice.len(), slice.as_mut_ptr())
219 };
220 unsafe extern "C" fn read_async_trampoline<
221 B: AsMut<[u8]> + Send + 'static,
222 Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
223 >(
224 _source_object: *mut gobject_sys::GObject,
225 res: *mut gio_sys::GAsyncResult,
226 user_data: glib_sys::gpointer,
227 ) {
228 let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
229 let (callback, buffer) = user_data.take().unwrap();
230
231 let mut error = ptr::null_mut();
232 let ret =
233 gio_sys::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
234
235 let result = if error.is_null() {
236 Ok((buffer, ret as usize))
237 } else {
238 Err((buffer, from_glib_full(error)))
239 };
240
241 callback(result);
242 }
243 let callback = read_async_trampoline::<B, Q>;
244 unsafe {
245 gio_sys::g_input_stream_read_async(
246 self.as_ref().to_glib_none().0,
247 buffer_ptr,
248 count,
249 io_priority.to_glib(),
250 cancellable.0,
251 Some(callback),
252 Box::into_raw(user_data) as *mut _,
253 );
254 }
255 }
256
257 #[cfg(feature = "futures")]
258 #[cfg(any(feature = "v2_44", feature = "dox"))]
259 fn read_all_async_future<'a, B: AsMut<[u8]> + Send + 'static>(
260 &self,
261 buffer: B,
262 io_priority: Priority,
263 ) -> Box<
264 dyn future::Future<Output = Result<(B, usize, Option<Error>), (B, Error)>>
265 + std::marker::Unpin,
266 > {
267 use GioFuture;
268
269 GioFuture::new(self, move |obj, send| {
270 use fragile::Fragile;
271
272 let cancellable = Cancellable::new();
273 let send = Fragile::new(send);
274 obj.read_all_async(buffer, io_priority, Some(&cancellable), move |res| {
275 let _ = send.into_inner().send(res);
276 });
277
278 cancellable
279 })
280 }
281
282 #[cfg(feature = "futures")]
283 fn read_async_future<'a, B: AsMut<[u8]> + Send + 'static>(
284 &self,
285 buffer: B,
286 io_priority: Priority,
287 ) -> Box<dyn future::Future<Output = Result<(B, usize), (B, Error)>> + std::marker::Unpin> {
288 use GioFuture;
289
290 GioFuture::new(self, move |obj, send| {
291 use fragile::Fragile;
292
293 let cancellable = Cancellable::new();
294 let send = Fragile::new(send);
295 obj.read_async(buffer, io_priority, Some(&cancellable), move |res| {
296 let _ = send.into_inner().send(res);
297 });
298
299 cancellable
300 })
301 }
302}
303
304#[derive(Debug, PartialEq, Eq, Hash, Clone)]
305pub struct InputStreamRead<T: InputStreamExtManual>(T);
306
307impl<T: InputStreamExtManual> InputStreamRead<T> {
308 pub fn into_input_stream(self) -> T {
309 self.0
310 }
311
312 pub fn input_stream(&self) -> &T {
313 &self.0
314 }
315}
316
317impl<T: InputStreamExtManual> io::Read for InputStreamRead<T> {
318 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
319 let gio_result = self.0.read(buf, None);
320 to_std_io_result(gio_result)
321 }
322}
323
#[cfg(test)]
mod tests {
    use glib::*;
    use std::io::Read;
    use test_util::run_async;
    use *;

    /// Builds a memory-backed input stream containing the bytes `[1, 2, 3]`.
    fn three_byte_stream() -> MemoryInputStream {
        let payload = Bytes::from_owned(vec![1, 2, 3]);
        MemoryInputStream::new_from_bytes(&payload)
    }

    /// Checks that `buf` begins with the bytes served by `three_byte_stream`.
    fn assert_payload_prefix(buf: &[u8]) {
        assert_eq!(buf[0], 1);
        assert_eq!(buf[1], 2);
        assert_eq!(buf[2], 3);
    }

    // Async "read all" into an owned Vec: expects 3 bytes and no trailing error.
    #[test]
    #[cfg(feature = "v2_44")]
    fn read_all_async() {
        let outcome = run_async(|tx, l| {
            let strm = three_byte_stream();

            let buf = vec![0; 10];
            strm.read_all_async(buf, PRIORITY_DEFAULT_IDLE, None, move |ret| {
                tx.send(ret).unwrap();
                l.quit();
            });
        });

        let (buf, count, err) = outcome.unwrap();
        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_payload_prefix(&buf);
    }

    // Blocking "read all" into a borrowed Vec.
    #[test]
    fn read_all() {
        let strm = three_byte_stream();
        let mut buf = vec![0; 10];

        let (count, err) = strm.read_all(&mut buf, None).unwrap();

        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_payload_prefix(&buf);
    }

    // Single blocking read into a borrowed Vec.
    #[test]
    fn read() {
        let strm = three_byte_stream();
        let mut buf = vec![0; 10];

        let outcome = strm.read(&mut buf, None);

        assert_eq!(outcome.unwrap(), 3);
        assert_payload_prefix(&buf);
    }

    // Single async read; the buffer must come back through the callback.
    #[test]
    fn read_async() {
        let outcome = run_async(|tx, l| {
            let strm = three_byte_stream();

            let buf = vec![0; 10];
            strm.read_async(buf, PRIORITY_DEFAULT_IDLE, None, move |ret| {
                tx.send(ret).unwrap();
                l.quit();
            });
        });

        let (buf, count) = outcome.unwrap();
        assert_eq!(count, 3);
        assert_payload_prefix(&buf);
    }

    // Async read that yields `Bytes` rather than filling a caller buffer.
    #[test]
    fn read_bytes_async() {
        let outcome = run_async(|tx, l| {
            let strm = three_byte_stream();

            strm.read_bytes_async(10, PRIORITY_DEFAULT_IDLE, ::NONE_CANCELLABLE, move |ret| {
                tx.send(ret).unwrap();
                l.quit();
            });
        });

        let bytes = outcome.unwrap();
        assert_eq!(bytes, vec![1, 2, 3]);
    }

    // Asking to skip 10 bytes of a 3-byte stream reports 3 skipped.
    #[test]
    fn skip_async() {
        let outcome = run_async(|tx, l| {
            let strm = three_byte_stream();

            strm.skip_async(10, PRIORITY_DEFAULT_IDLE, ::NONE_CANCELLABLE, move |ret| {
                tx.send(ret).unwrap();
                l.quit();
            });
        });

        let skipped = outcome.unwrap();
        assert_eq!(skipped, 3);
    }

    // The `std::io::Read` adapter forwards to the stream's blocking read.
    #[test]
    fn std_io_read() {
        let mut reader = three_byte_stream().into_read();
        let mut buf = [0u8; 10];

        let outcome = reader.read(&mut buf);

        assert_eq!(outcome.unwrap(), 3);
        assert_payload_prefix(&buf);
    }

    // Round-tripping through the adapter returns the very same stream object.
    #[test]
    fn into_input_stream() {
        let original = three_byte_stream();
        let original_clone = original.clone();
        let round_tripped = original.into_read().into_input_stream();

        assert_eq!(round_tripped, original_clone);
    }
}