gio/
output_stream.rs

1// Copyright 2013-2017, The Gtk-rs Project Developers.
2// See the COPYRIGHT file at the top-level directory of this distribution.
3// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
4
5use error::to_std_io_result;
6use gio_sys;
7use glib::object::IsA;
8use glib::translate::*;
9use glib::Priority;
10use glib_sys;
11use gobject_sys;
12use std::io;
13use std::mem;
14use std::ptr;
15use Cancellable;
16use Error;
17use OutputStream;
18use OutputStreamExt;
19
20#[cfg(feature = "futures")]
21use futures::future;
22
23pub trait OutputStreamExtManual: Sized + OutputStreamExt {
24    fn write_async<
25        B: AsRef<[u8]> + Send + 'static,
26        Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
27    >(
28        &self,
29        buffer: B,
30        io_priority: Priority,
31        cancellable: Option<&Cancellable>,
32        callback: Q,
33    );
34
35    fn write_all(
36        &self,
37        buffer: &[u8],
38        cancellable: Option<&Cancellable>,
39    ) -> Result<(usize, Option<Error>), Error>;
40
41    #[cfg(any(feature = "v2_44", feature = "dox"))]
42    fn write_all_async<
43        B: AsRef<[u8]> + Send + 'static,
44        Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
45    >(
46        &self,
47        buffer: B,
48        io_priority: Priority,
49        cancellable: Option<&Cancellable>,
50        callback: Q,
51    );
52
53    #[cfg(feature = "futures")]
54    fn write_async_future<'a, B: AsRef<[u8]> + Send + 'static>(
55        &self,
56        buffer: B,
57        io_priority: Priority,
58    ) -> Box<dyn future::Future<Output = Result<(B, usize), (B, Error)>> + std::marker::Unpin>;
59
60    #[cfg(feature = "futures")]
61    #[cfg(any(feature = "v2_44", feature = "dox"))]
62    fn write_all_async_future<'a, B: AsRef<[u8]> + Send + 'static>(
63        &self,
64        buffer: B,
65        io_priority: Priority,
66    ) -> Box<
67        dyn future::Future<Output = Result<(B, usize, Option<Error>), (B, Error)>>
68            + std::marker::Unpin,
69    >;
70
71    fn into_write(self) -> OutputStreamWrite<Self> {
72        OutputStreamWrite(self)
73    }
74}
75
76impl<O: IsA<OutputStream>> OutputStreamExtManual for O {
77    fn write_async<
78        B: AsRef<[u8]> + Send + 'static,
79        Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
80    >(
81        &self,
82        buffer: B,
83        io_priority: Priority,
84        cancellable: Option<&Cancellable>,
85        callback: Q,
86    ) {
87        let cancellable = cancellable.to_glib_none();
88        let user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
89        // Need to do this after boxing as the contents pointer might change by moving into the box
90        let (count, buffer_ptr) = {
91            let buffer = &(*user_data).as_ref().unwrap().1;
92            let slice = buffer.as_ref();
93            (slice.len(), slice.as_ptr())
94        };
95        unsafe extern "C" fn write_async_trampoline<
96            B: AsRef<[u8]> + Send + 'static,
97            Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
98        >(
99            _source_object: *mut gobject_sys::GObject,
100            res: *mut gio_sys::GAsyncResult,
101            user_data: glib_sys::gpointer,
102        ) {
103            let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
104            let (callback, buffer) = user_data.take().unwrap();
105
106            let mut error = ptr::null_mut();
107            let ret =
108                gio_sys::g_output_stream_write_finish(_source_object as *mut _, res, &mut error);
109            let result = if error.is_null() {
110                Ok((buffer, ret as usize))
111            } else {
112                Err((buffer, from_glib_full(error)))
113            };
114            callback(result);
115        }
116        let callback = write_async_trampoline::<B, Q>;
117        unsafe {
118            gio_sys::g_output_stream_write_async(
119                self.as_ref().to_glib_none().0,
120                mut_override(buffer_ptr),
121                count,
122                io_priority.to_glib(),
123                cancellable.0,
124                Some(callback),
125                Box::into_raw(user_data) as *mut _,
126            );
127        }
128    }
129
130    fn write_all(
131        &self,
132        buffer: &[u8],
133        cancellable: Option<&Cancellable>,
134    ) -> Result<(usize, Option<Error>), Error> {
135        let cancellable = cancellable.to_glib_none();
136        let count = buffer.len() as usize;
137        unsafe {
138            let mut bytes_written = mem::uninitialized();
139            let mut error = ptr::null_mut();
140            let _ = gio_sys::g_output_stream_write_all(
141                self.as_ref().to_glib_none().0,
142                buffer.to_glib_none().0,
143                count,
144                &mut bytes_written,
145                cancellable.0,
146                &mut error,
147            );
148
149            if error.is_null() {
150                Ok((bytes_written, None))
151            } else if bytes_written != 0 {
152                Ok((bytes_written, Some(from_glib_full(error))))
153            } else {
154                Err(from_glib_full(error))
155            }
156        }
157    }
158
159    #[cfg(any(feature = "v2_44", feature = "dox"))]
160    fn write_all_async<
161        'a,
162        B: AsRef<[u8]> + Send + 'static,
163        Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
164    >(
165        &self,
166        buffer: B,
167        io_priority: Priority,
168        cancellable: Option<&Cancellable>,
169        callback: Q,
170    ) {
171        let cancellable = cancellable.to_glib_none();
172        let user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
173        // Need to do this after boxing as the contents pointer might change by moving into the box
174        let (count, buffer_ptr) = {
175            let buffer = &(*user_data).as_ref().unwrap().1;
176            let slice = buffer.as_ref();
177            (slice.len(), slice.as_ptr())
178        };
179        unsafe extern "C" fn write_all_async_trampoline<
180            B: AsRef<[u8]> + Send + 'static,
181            Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
182        >(
183            _source_object: *mut gobject_sys::GObject,
184            res: *mut gio_sys::GAsyncResult,
185            user_data: glib_sys::gpointer,
186        ) {
187            let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
188            let (callback, buffer) = user_data.take().unwrap();
189
190            let mut error = ptr::null_mut();
191            let mut bytes_written = mem::uninitialized();
192            let _ = gio_sys::g_output_stream_write_all_finish(
193                _source_object as *mut _,
194                res,
195                &mut bytes_written,
196                &mut error,
197            );
198            let result = if error.is_null() {
199                Ok((buffer, bytes_written, None))
200            } else if bytes_written != 0 {
201                Ok((buffer, bytes_written, from_glib_full(error)))
202            } else {
203                Err((buffer, from_glib_full(error)))
204            };
205            callback(result);
206        }
207        let callback = write_all_async_trampoline::<B, Q>;
208        unsafe {
209            gio_sys::g_output_stream_write_all_async(
210                self.as_ref().to_glib_none().0,
211                mut_override(buffer_ptr),
212                count,
213                io_priority.to_glib(),
214                cancellable.0,
215                Some(callback),
216                Box::into_raw(user_data) as *mut _,
217            );
218        }
219    }
220
221    #[cfg(feature = "futures")]
222    fn write_async_future<'a, B: AsRef<[u8]> + Send + 'static>(
223        &self,
224        buffer: B,
225        io_priority: Priority,
226    ) -> Box<dyn future::Future<Output = Result<(B, usize), (B, Error)>> + std::marker::Unpin> {
227        use GioFuture;
228
229        GioFuture::new(self, move |obj, send| {
230            use fragile::Fragile;
231
232            let cancellable = Cancellable::new();
233            let send = Fragile::new(send);
234            obj.write_async(buffer, io_priority, Some(&cancellable), move |res| {
235                let _ = send.into_inner().send(res);
236            });
237
238            cancellable
239        })
240    }
241
242    #[cfg(feature = "futures")]
243    #[cfg(any(feature = "v2_44", feature = "dox"))]
244    fn write_all_async_future<'a, B: AsRef<[u8]> + Send + 'static>(
245        &self,
246        buffer: B,
247        io_priority: Priority,
248    ) -> Box<
249        dyn future::Future<Output = Result<(B, usize, Option<Error>), (B, Error)>>
250            + std::marker::Unpin,
251    > {
252        use GioFuture;
253
254        GioFuture::new(self, move |obj, send| {
255            use fragile::Fragile;
256
257            let cancellable = Cancellable::new();
258            let send = Fragile::new(send);
259            obj.write_all_async(buffer, io_priority, Some(&cancellable), move |res| {
260                let _ = send.into_inner().send(res);
261            });
262
263            cancellable
264        })
265    }
266}
267
268#[derive(Debug, PartialEq, Eq, Hash, Clone)]
269pub struct OutputStreamWrite<T: OutputStreamExt>(T);
270
271impl<T: OutputStreamExt> OutputStreamWrite<T> {
272    pub fn into_output_stream(self) -> T {
273        self.0
274    }
275
276    pub fn output_stream(&self) -> &T {
277        &self.0
278    }
279}
280
281impl<T: OutputStreamExt> io::Write for OutputStreamWrite<T> {
282    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
283        let result = self
284            .0
285            .write(buf, ::NONE_CANCELLABLE)
286            .map(|size| size as usize);
287        to_std_io_result(result)
288    }
289
290    fn flush(&mut self) -> io::Result<()> {
291        let gio_result = self.0.flush(::NONE_CANCELLABLE);
292        to_std_io_result(gio_result)
293    }
294}
295
296#[cfg(test)]
297mod tests {
298    use glib::*;
299    use std::io::Write;
300    use test_util::run_async;
301    use *;
302
303    #[test]
304    fn splice_async() {
305        let ret = run_async(|tx, l| {
306            let input = MemoryInputStream::new();
307            let b = Bytes::from_owned(vec![1, 2, 3]);
308            input.add_bytes(&b);
309
310            let strm = MemoryOutputStream::new_resizable();
311            strm.splice_async(
312                &input,
313                OutputStreamSpliceFlags::CLOSE_SOURCE,
314                PRIORITY_DEFAULT_IDLE,
315                ::NONE_CANCELLABLE,
316                move |ret| {
317                    tx.send(ret).unwrap();
318                    l.quit();
319                },
320            );
321        });
322
323        assert_eq!(ret.unwrap(), 3);
324    }
325
326    #[test]
327    fn write_async() {
328        let ret = run_async(|tx, l| {
329            let strm = MemoryOutputStream::new_resizable();
330
331            let buf = vec![1, 2, 3];
332            strm.write_async(buf, PRIORITY_DEFAULT_IDLE, ::NONE_CANCELLABLE, move |ret| {
333                tx.send(ret).unwrap();
334                l.quit();
335            });
336        });
337
338        let (buf, size) = ret.unwrap();
339        assert_eq!(buf, vec![1, 2, 3]);
340        assert_eq!(size, 3);
341    }
342
343    #[test]
344    #[cfg(any(feature = "v2_44", feature = "dox"))]
345    fn write_all_async() {
346        let ret = run_async(|tx, l| {
347            let strm = MemoryOutputStream::new_resizable();
348
349            let buf = vec![1, 2, 3];
350            strm.write_all_async(buf, PRIORITY_DEFAULT_IDLE, ::NONE_CANCELLABLE, move |ret| {
351                tx.send(ret).unwrap();
352                l.quit();
353            });
354        });
355
356        let (buf, size, err) = ret.unwrap();
357        assert_eq!(buf, vec![1, 2, 3]);
358        assert_eq!(size, 3);
359        assert!(err.is_none());
360    }
361
362    #[test]
363    fn write_bytes_async() {
364        let ret = run_async(|tx, l| {
365            let strm = MemoryOutputStream::new_resizable();
366
367            let b = Bytes::from_owned(vec![1, 2, 3]);
368            strm.write_bytes_async(&b, PRIORITY_DEFAULT_IDLE, ::NONE_CANCELLABLE, move |ret| {
369                tx.send(ret).unwrap();
370                l.quit();
371            });
372        });
373
374        assert_eq!(ret.unwrap(), 3);
375    }
376
377    #[test]
378    fn std_io_write() {
379        let b = Bytes::from_owned(vec![1, 2, 3]);
380        let mut write = MemoryOutputStream::new_resizable().into_write();
381
382        let ret = write.write(&b);
383
384        let stream = write.into_output_stream();
385        stream.close(::NONE_CANCELLABLE).unwrap();
386        assert_eq!(ret.unwrap(), 3);
387        assert_eq!(stream.steal_as_bytes().unwrap(), [1, 2, 3].as_ref());
388    }
389
390    #[test]
391    fn into_output_stream() {
392        let stream = MemoryOutputStream::new_resizable();
393        let stream_clone = stream.clone();
394        let stream = stream.into_write().into_output_stream();
395
396        assert_eq!(stream, stream_clone);
397    }
398}