gio/
input_stream.rs

1// Copyright 2013-2017, The Gtk-rs Project Developers.
2// See the COPYRIGHT file at the top-level directory of this distribution.
3// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
4
5use error::to_std_io_result;
6use gio_sys;
7use glib::object::IsA;
8use glib::translate::*;
9use glib::Priority;
10use glib_sys;
11use gobject_sys;
12use std::io;
13use std::mem;
14use std::ptr;
15use Cancellable;
16use Error;
17use InputStream;
18
19#[cfg(feature = "futures")]
20use futures::future;
21
22pub trait InputStreamExtManual: Sized {
23    fn read<B: AsMut<[u8]>>(
24        &self,
25        buffer: B,
26        cancellable: Option<&Cancellable>,
27    ) -> Result<usize, Error>;
28
29    fn read_all<B: AsMut<[u8]>>(
30        &self,
31        buffer: B,
32        cancellable: Option<&Cancellable>,
33    ) -> Result<(usize, Option<Error>), Error>;
34
35    #[cfg(any(feature = "v2_44", feature = "dox"))]
36    fn read_all_async<
37        B: AsMut<[u8]> + Send + 'static,
38        Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
39    >(
40        &self,
41        buffer: B,
42        io_priority: Priority,
43        cancellable: Option<&Cancellable>,
44        callback: Q,
45    );
46
47    fn read_async<
48        B: AsMut<[u8]> + Send + 'static,
49        Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
50    >(
51        &self,
52        buffer: B,
53        io_priority: Priority,
54        cancellable: Option<&Cancellable>,
55        callback: Q,
56    );
57
58    #[cfg(feature = "futures")]
59    #[cfg(any(feature = "v2_44", feature = "dox"))]
60    fn read_all_async_future<'a, B: AsMut<[u8]> + Send + 'static>(
61        &self,
62        buffer: B,
63        io_priority: Priority,
64    ) -> Box<
65        dyn future::Future<Output = Result<(B, usize, Option<Error>), (B, Error)>>
66            + std::marker::Unpin,
67    >;
68
69    #[cfg(feature = "futures")]
70    fn read_async_future<'a, B: AsMut<[u8]> + Send + 'static>(
71        &self,
72        buffer: B,
73        io_priority: Priority,
74    ) -> Box<dyn future::Future<Output = Result<(B, usize), (B, Error)>> + std::marker::Unpin>;
75
76    fn into_read(self) -> InputStreamRead<Self> {
77        InputStreamRead(self)
78    }
79}
80
81impl<O: IsA<InputStream>> InputStreamExtManual for O {
82    fn read<B: AsMut<[u8]>>(
83        &self,
84        mut buffer: B,
85        cancellable: Option<&Cancellable>,
86    ) -> Result<usize, Error> {
87        let cancellable = cancellable.to_glib_none();
88        let buffer = buffer.as_mut();
89        let buffer_ptr = buffer.as_mut_ptr();
90        let count = buffer.len();
91        unsafe {
92            let mut error = ptr::null_mut();
93            let ret = gio_sys::g_input_stream_read(
94                self.as_ref().to_glib_none().0,
95                buffer_ptr,
96                count,
97                cancellable.0,
98                &mut error,
99            );
100            if error.is_null() {
101                Ok(ret as usize)
102            } else {
103                Err(from_glib_full(error))
104            }
105        }
106    }
107
108    fn read_all<B: AsMut<[u8]>>(
109        &self,
110        mut buffer: B,
111        cancellable: Option<&Cancellable>,
112    ) -> Result<(usize, Option<Error>), Error> {
113        let cancellable = cancellable.to_glib_none();
114        let buffer = buffer.as_mut();
115        let buffer_ptr = buffer.as_mut_ptr();
116        let count = buffer.len();
117        unsafe {
118            let mut bytes_read = mem::uninitialized();
119            let mut error = ptr::null_mut();
120            let _ = gio_sys::g_input_stream_read_all(
121                self.as_ref().to_glib_none().0,
122                buffer_ptr,
123                count,
124                &mut bytes_read,
125                cancellable.0,
126                &mut error,
127            );
128
129            if error.is_null() {
130                Ok((bytes_read, None))
131            } else if bytes_read != 0 {
132                Ok((bytes_read, Some(from_glib_full(error))))
133            } else {
134                Err(from_glib_full(error))
135            }
136        }
137    }
138
139    #[cfg(any(feature = "v2_44", feature = "dox"))]
140    fn read_all_async<
141        B: AsMut<[u8]> + Send + 'static,
142        Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
143    >(
144        &self,
145        buffer: B,
146        io_priority: Priority,
147        cancellable: Option<&Cancellable>,
148        callback: Q,
149    ) {
150        let cancellable = cancellable.to_glib_none();
151        let mut user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
152        // Need to do this after boxing as the contents pointer might change by moving into the box
153        let (count, buffer_ptr) = {
154            let buffer = &mut (*user_data).as_mut().unwrap().1;
155            let slice = (*buffer).as_mut();
156            (slice.len(), slice.as_mut_ptr())
157        };
158        unsafe extern "C" fn read_all_async_trampoline<
159            B: AsMut<[u8]> + Send + 'static,
160            Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
161        >(
162            _source_object: *mut gobject_sys::GObject,
163            res: *mut gio_sys::GAsyncResult,
164            user_data: glib_sys::gpointer,
165        ) {
166            let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
167            let (callback, buffer) = user_data.take().unwrap();
168
169            let mut error = ptr::null_mut();
170            let mut bytes_read = mem::uninitialized();
171            let _ = gio_sys::g_input_stream_read_all_finish(
172                _source_object as *mut _,
173                res,
174                &mut bytes_read,
175                &mut error,
176            );
177
178            let result = if error.is_null() {
179                Ok((buffer, bytes_read, None))
180            } else if bytes_read != 0 {
181                Ok((buffer, bytes_read, Some(from_glib_full(error))))
182            } else {
183                Err((buffer, from_glib_full(error)))
184            };
185
186            callback(result);
187        }
188        let callback = read_all_async_trampoline::<B, Q>;
189        unsafe {
190            gio_sys::g_input_stream_read_all_async(
191                self.as_ref().to_glib_none().0,
192                buffer_ptr,
193                count,
194                io_priority.to_glib(),
195                cancellable.0,
196                Some(callback),
197                Box::into_raw(user_data) as *mut _,
198            );
199        }
200    }
201
202    fn read_async<
203        B: AsMut<[u8]> + Send + 'static,
204        Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
205    >(
206        &self,
207        buffer: B,
208        io_priority: Priority,
209        cancellable: Option<&Cancellable>,
210        callback: Q,
211    ) {
212        let cancellable = cancellable.to_glib_none();
213        let mut user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
214        // Need to do this after boxing as the contents pointer might change by moving into the box
215        let (count, buffer_ptr) = {
216            let buffer = &mut (*user_data).as_mut().unwrap().1;
217            let slice = (*buffer).as_mut();
218            (slice.len(), slice.as_mut_ptr())
219        };
220        unsafe extern "C" fn read_async_trampoline<
221            B: AsMut<[u8]> + Send + 'static,
222            Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
223        >(
224            _source_object: *mut gobject_sys::GObject,
225            res: *mut gio_sys::GAsyncResult,
226            user_data: glib_sys::gpointer,
227        ) {
228            let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
229            let (callback, buffer) = user_data.take().unwrap();
230
231            let mut error = ptr::null_mut();
232            let ret =
233                gio_sys::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
234
235            let result = if error.is_null() {
236                Ok((buffer, ret as usize))
237            } else {
238                Err((buffer, from_glib_full(error)))
239            };
240
241            callback(result);
242        }
243        let callback = read_async_trampoline::<B, Q>;
244        unsafe {
245            gio_sys::g_input_stream_read_async(
246                self.as_ref().to_glib_none().0,
247                buffer_ptr,
248                count,
249                io_priority.to_glib(),
250                cancellable.0,
251                Some(callback),
252                Box::into_raw(user_data) as *mut _,
253            );
254        }
255    }
256
257    #[cfg(feature = "futures")]
258    #[cfg(any(feature = "v2_44", feature = "dox"))]
259    fn read_all_async_future<'a, B: AsMut<[u8]> + Send + 'static>(
260        &self,
261        buffer: B,
262        io_priority: Priority,
263    ) -> Box<
264        dyn future::Future<Output = Result<(B, usize, Option<Error>), (B, Error)>>
265            + std::marker::Unpin,
266    > {
267        use GioFuture;
268
269        GioFuture::new(self, move |obj, send| {
270            use fragile::Fragile;
271
272            let cancellable = Cancellable::new();
273            let send = Fragile::new(send);
274            obj.read_all_async(buffer, io_priority, Some(&cancellable), move |res| {
275                let _ = send.into_inner().send(res);
276            });
277
278            cancellable
279        })
280    }
281
282    #[cfg(feature = "futures")]
283    fn read_async_future<'a, B: AsMut<[u8]> + Send + 'static>(
284        &self,
285        buffer: B,
286        io_priority: Priority,
287    ) -> Box<dyn future::Future<Output = Result<(B, usize), (B, Error)>> + std::marker::Unpin> {
288        use GioFuture;
289
290        GioFuture::new(self, move |obj, send| {
291            use fragile::Fragile;
292
293            let cancellable = Cancellable::new();
294            let send = Fragile::new(send);
295            obj.read_async(buffer, io_priority, Some(&cancellable), move |res| {
296                let _ = send.into_inner().send(res);
297            });
298
299            cancellable
300        })
301    }
302}
303
304#[derive(Debug, PartialEq, Eq, Hash, Clone)]
305pub struct InputStreamRead<T: InputStreamExtManual>(T);
306
307impl<T: InputStreamExtManual> InputStreamRead<T> {
308    pub fn into_input_stream(self) -> T {
309        self.0
310    }
311
312    pub fn input_stream(&self) -> &T {
313        &self.0
314    }
315}
316
317impl<T: InputStreamExtManual> io::Read for InputStreamRead<T> {
318    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
319        let gio_result = self.0.read(buf, None);
320        to_std_io_result(gio_result)
321    }
322}
323
#[cfg(test)]
mod tests {
    use glib::*;
    use std::io::Read;
    use test_util::run_async;
    use *;

    // Builds a fresh in-memory stream holding the three bytes 1, 2, 3.
    fn sample_stream() -> MemoryInputStream {
        let payload = Bytes::from_owned(vec![1, 2, 3]);
        MemoryInputStream::new_from_bytes(&payload)
    }

    // Asynchronous read_all returns the buffer, the byte count and no error.
    #[test]
    #[cfg(feature = "v2_44")]
    fn read_all_async() {
        let outcome = run_async(|tx, l| {
            let strm = sample_stream();
            let buf = vec![0; 10];
            strm.read_all_async(buf, PRIORITY_DEFAULT_IDLE, None, move |ret| {
                tx.send(ret).unwrap();
                l.quit();
            });
        });

        let (buf, count, err) = outcome.unwrap();
        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(&buf[..3], &[1u8, 2, 3][..]);
    }

    // Blocking read_all fills the start of the buffer and reports no error.
    #[test]
    fn read_all() {
        let strm = sample_stream();
        let mut buf = vec![0; 10];

        let (count, err) = strm.read_all(&mut buf, None).unwrap();

        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(&buf[..3], &[1u8, 2, 3][..]);
    }

    // Blocking read returns the number of bytes available in the stream.
    #[test]
    fn read() {
        let strm = sample_stream();
        let mut buf = vec![0; 10];

        let count = strm.read(&mut buf, None).unwrap();

        assert_eq!(count, 3);
        assert_eq!(&buf[..3], &[1u8, 2, 3][..]);
    }

    // Asynchronous read hands the buffer back together with the byte count.
    #[test]
    fn read_async() {
        let outcome = run_async(|tx, l| {
            let strm = sample_stream();
            let buf = vec![0; 10];
            strm.read_async(buf, PRIORITY_DEFAULT_IDLE, None, move |ret| {
                tx.send(ret).unwrap();
                l.quit();
            });
        });

        let (buf, count) = outcome.unwrap();
        assert_eq!(count, 3);
        assert_eq!(&buf[..3], &[1u8, 2, 3][..]);
    }

    // read_bytes_async with a larger request yields exactly the stream content.
    #[test]
    fn read_bytes_async() {
        let outcome = run_async(|tx, l| {
            let strm = sample_stream();
            strm.read_bytes_async(10, PRIORITY_DEFAULT_IDLE, ::NONE_CANCELLABLE, move |ret| {
                tx.send(ret).unwrap();
                l.quit();
            });
        });

        let bytes = outcome.unwrap();
        assert_eq!(bytes, vec![1, 2, 3]);
    }

    // skip_async with a larger request skips only what the stream holds.
    #[test]
    fn skip_async() {
        let outcome = run_async(|tx, l| {
            let strm = sample_stream();
            strm.skip_async(10, PRIORITY_DEFAULT_IDLE, ::NONE_CANCELLABLE, move |ret| {
                tx.send(ret).unwrap();
                l.quit();
            });
        });

        let skipped = outcome.unwrap();
        assert_eq!(skipped, 3);
    }

    // The `into_read` adapter is usable through `std::io::Read`.
    #[test]
    fn std_io_read() {
        let mut read = sample_stream().into_read();
        let mut buf = [0u8; 10];

        let count = read.read(&mut buf).unwrap();

        assert_eq!(count, 3);
        assert_eq!(&buf[..3], &[1u8, 2, 3][..]);
    }

    // Wrapping and unwrapping gives back the same stream object.
    #[test]
    fn into_input_stream() {
        let stream = sample_stream();
        let stream_clone = stream.clone();
        let stream = stream.into_read().into_input_stream();

        assert_eq!(stream, stream_clone);
    }
}
457}