1use error::to_std_io_result;
6use gio_sys;
7use glib::object::IsA;
8use glib::translate::*;
9use glib::Priority;
10use glib_sys;
11use gobject_sys;
12use std::io;
13use std::mem;
14use std::ptr;
15use Cancellable;
16use Error;
17use OutputStream;
18use OutputStreamExt;
19
20#[cfg(feature = "futures")]
21use futures::future;
22
23pub trait OutputStreamExtManual: Sized + OutputStreamExt {
24 fn write_async<
25 B: AsRef<[u8]> + Send + 'static,
26 Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
27 >(
28 &self,
29 buffer: B,
30 io_priority: Priority,
31 cancellable: Option<&Cancellable>,
32 callback: Q,
33 );
34
35 fn write_all(
36 &self,
37 buffer: &[u8],
38 cancellable: Option<&Cancellable>,
39 ) -> Result<(usize, Option<Error>), Error>;
40
41 #[cfg(any(feature = "v2_44", feature = "dox"))]
42 fn write_all_async<
43 B: AsRef<[u8]> + Send + 'static,
44 Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
45 >(
46 &self,
47 buffer: B,
48 io_priority: Priority,
49 cancellable: Option<&Cancellable>,
50 callback: Q,
51 );
52
53 #[cfg(feature = "futures")]
54 fn write_async_future<'a, B: AsRef<[u8]> + Send + 'static>(
55 &self,
56 buffer: B,
57 io_priority: Priority,
58 ) -> Box<dyn future::Future<Output = Result<(B, usize), (B, Error)>> + std::marker::Unpin>;
59
60 #[cfg(feature = "futures")]
61 #[cfg(any(feature = "v2_44", feature = "dox"))]
62 fn write_all_async_future<'a, B: AsRef<[u8]> + Send + 'static>(
63 &self,
64 buffer: B,
65 io_priority: Priority,
66 ) -> Box<
67 dyn future::Future<Output = Result<(B, usize, Option<Error>), (B, Error)>>
68 + std::marker::Unpin,
69 >;
70
71 fn into_write(self) -> OutputStreamWrite<Self> {
72 OutputStreamWrite(self)
73 }
74}
75
76impl<O: IsA<OutputStream>> OutputStreamExtManual for O {
77 fn write_async<
78 B: AsRef<[u8]> + Send + 'static,
79 Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
80 >(
81 &self,
82 buffer: B,
83 io_priority: Priority,
84 cancellable: Option<&Cancellable>,
85 callback: Q,
86 ) {
87 let cancellable = cancellable.to_glib_none();
88 let user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
89 let (count, buffer_ptr) = {
91 let buffer = &(*user_data).as_ref().unwrap().1;
92 let slice = buffer.as_ref();
93 (slice.len(), slice.as_ptr())
94 };
95 unsafe extern "C" fn write_async_trampoline<
96 B: AsRef<[u8]> + Send + 'static,
97 Q: FnOnce(Result<(B, usize), (B, Error)>) + Send + 'static,
98 >(
99 _source_object: *mut gobject_sys::GObject,
100 res: *mut gio_sys::GAsyncResult,
101 user_data: glib_sys::gpointer,
102 ) {
103 let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
104 let (callback, buffer) = user_data.take().unwrap();
105
106 let mut error = ptr::null_mut();
107 let ret =
108 gio_sys::g_output_stream_write_finish(_source_object as *mut _, res, &mut error);
109 let result = if error.is_null() {
110 Ok((buffer, ret as usize))
111 } else {
112 Err((buffer, from_glib_full(error)))
113 };
114 callback(result);
115 }
116 let callback = write_async_trampoline::<B, Q>;
117 unsafe {
118 gio_sys::g_output_stream_write_async(
119 self.as_ref().to_glib_none().0,
120 mut_override(buffer_ptr),
121 count,
122 io_priority.to_glib(),
123 cancellable.0,
124 Some(callback),
125 Box::into_raw(user_data) as *mut _,
126 );
127 }
128 }
129
130 fn write_all(
131 &self,
132 buffer: &[u8],
133 cancellable: Option<&Cancellable>,
134 ) -> Result<(usize, Option<Error>), Error> {
135 let cancellable = cancellable.to_glib_none();
136 let count = buffer.len() as usize;
137 unsafe {
138 let mut bytes_written = mem::uninitialized();
139 let mut error = ptr::null_mut();
140 let _ = gio_sys::g_output_stream_write_all(
141 self.as_ref().to_glib_none().0,
142 buffer.to_glib_none().0,
143 count,
144 &mut bytes_written,
145 cancellable.0,
146 &mut error,
147 );
148
149 if error.is_null() {
150 Ok((bytes_written, None))
151 } else if bytes_written != 0 {
152 Ok((bytes_written, Some(from_glib_full(error))))
153 } else {
154 Err(from_glib_full(error))
155 }
156 }
157 }
158
159 #[cfg(any(feature = "v2_44", feature = "dox"))]
160 fn write_all_async<
161 'a,
162 B: AsRef<[u8]> + Send + 'static,
163 Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
164 >(
165 &self,
166 buffer: B,
167 io_priority: Priority,
168 cancellable: Option<&Cancellable>,
169 callback: Q,
170 ) {
171 let cancellable = cancellable.to_glib_none();
172 let user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
173 let (count, buffer_ptr) = {
175 let buffer = &(*user_data).as_ref().unwrap().1;
176 let slice = buffer.as_ref();
177 (slice.len(), slice.as_ptr())
178 };
179 unsafe extern "C" fn write_all_async_trampoline<
180 B: AsRef<[u8]> + Send + 'static,
181 Q: FnOnce(Result<(B, usize, Option<Error>), (B, Error)>) + Send + 'static,
182 >(
183 _source_object: *mut gobject_sys::GObject,
184 res: *mut gio_sys::GAsyncResult,
185 user_data: glib_sys::gpointer,
186 ) {
187 let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
188 let (callback, buffer) = user_data.take().unwrap();
189
190 let mut error = ptr::null_mut();
191 let mut bytes_written = mem::uninitialized();
192 let _ = gio_sys::g_output_stream_write_all_finish(
193 _source_object as *mut _,
194 res,
195 &mut bytes_written,
196 &mut error,
197 );
198 let result = if error.is_null() {
199 Ok((buffer, bytes_written, None))
200 } else if bytes_written != 0 {
201 Ok((buffer, bytes_written, from_glib_full(error)))
202 } else {
203 Err((buffer, from_glib_full(error)))
204 };
205 callback(result);
206 }
207 let callback = write_all_async_trampoline::<B, Q>;
208 unsafe {
209 gio_sys::g_output_stream_write_all_async(
210 self.as_ref().to_glib_none().0,
211 mut_override(buffer_ptr),
212 count,
213 io_priority.to_glib(),
214 cancellable.0,
215 Some(callback),
216 Box::into_raw(user_data) as *mut _,
217 );
218 }
219 }
220
221 #[cfg(feature = "futures")]
222 fn write_async_future<'a, B: AsRef<[u8]> + Send + 'static>(
223 &self,
224 buffer: B,
225 io_priority: Priority,
226 ) -> Box<dyn future::Future<Output = Result<(B, usize), (B, Error)>> + std::marker::Unpin> {
227 use GioFuture;
228
229 GioFuture::new(self, move |obj, send| {
230 use fragile::Fragile;
231
232 let cancellable = Cancellable::new();
233 let send = Fragile::new(send);
234 obj.write_async(buffer, io_priority, Some(&cancellable), move |res| {
235 let _ = send.into_inner().send(res);
236 });
237
238 cancellable
239 })
240 }
241
242 #[cfg(feature = "futures")]
243 #[cfg(any(feature = "v2_44", feature = "dox"))]
244 fn write_all_async_future<'a, B: AsRef<[u8]> + Send + 'static>(
245 &self,
246 buffer: B,
247 io_priority: Priority,
248 ) -> Box<
249 dyn future::Future<Output = Result<(B, usize, Option<Error>), (B, Error)>>
250 + std::marker::Unpin,
251 > {
252 use GioFuture;
253
254 GioFuture::new(self, move |obj, send| {
255 use fragile::Fragile;
256
257 let cancellable = Cancellable::new();
258 let send = Fragile::new(send);
259 obj.write_all_async(buffer, io_priority, Some(&cancellable), move |res| {
260 let _ = send.into_inner().send(res);
261 });
262
263 cancellable
264 })
265 }
266}
267
268#[derive(Debug, PartialEq, Eq, Hash, Clone)]
269pub struct OutputStreamWrite<T: OutputStreamExt>(T);
270
271impl<T: OutputStreamExt> OutputStreamWrite<T> {
272 pub fn into_output_stream(self) -> T {
273 self.0
274 }
275
276 pub fn output_stream(&self) -> &T {
277 &self.0
278 }
279}
280
281impl<T: OutputStreamExt> io::Write for OutputStreamWrite<T> {
282 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
283 let result = self
284 .0
285 .write(buf, ::NONE_CANCELLABLE)
286 .map(|size| size as usize);
287 to_std_io_result(result)
288 }
289
290 fn flush(&mut self) -> io::Result<()> {
291 let gio_result = self.0.flush(::NONE_CANCELLABLE);
292 to_std_io_result(gio_result)
293 }
294}
295
#[cfg(test)]
mod tests {
    use glib::*;
    use std::io::Write;
    use test_util::run_async;
    use *;

    // Splicing a three-byte memory input stream into a resizable memory output
    // stream must report three bytes.
    #[test]
    fn splice_async() {
        let outcome = run_async(|sender, main_loop| {
            let source = MemoryInputStream::new();
            let payload = Bytes::from_owned(vec![1, 2, 3]);
            source.add_bytes(&payload);

            let sink = MemoryOutputStream::new_resizable();
            sink.splice_async(
                &source,
                OutputStreamSpliceFlags::CLOSE_SOURCE,
                PRIORITY_DEFAULT_IDLE,
                ::NONE_CANCELLABLE,
                move |res| {
                    sender.send(res).unwrap();
                    main_loop.quit();
                },
            );
        });

        assert_eq!(outcome.unwrap(), 3);
    }

    // `write_async` must hand the buffer back unchanged along with the byte count.
    #[test]
    fn write_async() {
        let outcome = run_async(|sender, main_loop| {
            let sink = MemoryOutputStream::new_resizable();

            let data = vec![1, 2, 3];
            sink.write_async(data, PRIORITY_DEFAULT_IDLE, ::NONE_CANCELLABLE, move |res| {
                sender.send(res).unwrap();
                main_loop.quit();
            });
        });

        let (returned, written) = outcome.unwrap();
        assert_eq!(returned, vec![1, 2, 3]);
        assert_eq!(written, 3);
    }

    // `write_all_async` must hand the buffer back, report all bytes written and
    // carry no partial-write error.
    #[test]
    #[cfg(any(feature = "v2_44", feature = "dox"))]
    fn write_all_async() {
        let outcome = run_async(|sender, main_loop| {
            let sink = MemoryOutputStream::new_resizable();

            let data = vec![1, 2, 3];
            sink.write_all_async(data, PRIORITY_DEFAULT_IDLE, ::NONE_CANCELLABLE, move |res| {
                sender.send(res).unwrap();
                main_loop.quit();
            });
        });

        let (returned, written, partial_error) = outcome.unwrap();
        assert_eq!(returned, vec![1, 2, 3]);
        assert_eq!(written, 3);
        assert!(partial_error.is_none());
    }

    // Writing a three-byte `Bytes` asynchronously must report three bytes.
    #[test]
    fn write_bytes_async() {
        let outcome = run_async(|sender, main_loop| {
            let sink = MemoryOutputStream::new_resizable();

            let payload = Bytes::from_owned(vec![1, 2, 3]);
            sink.write_bytes_async(
                &payload,
                PRIORITY_DEFAULT_IDLE,
                ::NONE_CANCELLABLE,
                move |res| {
                    sender.send(res).unwrap();
                    main_loop.quit();
                },
            );
        });

        assert_eq!(outcome.unwrap(), 3);
    }

    // The `std::io::Write` adapter must push the bytes into the wrapped stream.
    #[test]
    fn std_io_write() {
        let payload = Bytes::from_owned(vec![1, 2, 3]);
        let mut writer = MemoryOutputStream::new_resizable().into_write();

        let written = writer.write(&payload);

        // Unwrap the adapter and close the stream before inspecting its contents.
        let sink = writer.into_output_stream();
        sink.close(::NONE_CANCELLABLE).unwrap();
        assert_eq!(written.unwrap(), 3);
        assert_eq!(sink.steal_as_bytes().unwrap(), [1, 2, 3].as_ref());
    }

    // Wrapping and unwrapping must yield a stream equal to a clone taken beforehand.
    #[test]
    fn into_output_stream() {
        let original = MemoryOutputStream::new_resizable();
        let reference = original.clone();
        let round_tripped = original.into_write().into_output_stream();

        assert_eq!(round_tripped, reference);
    }
}