gio/
pollable_input_stream.rs1use fragile::Fragile;
6use gio_sys;
7use glib;
8use glib::object::{Cast, IsA};
9use glib::translate::*;
10use glib_sys;
11use std::cell::RefCell;
12use std::mem::transmute;
13use std::ptr;
14use Cancellable;
15use Error;
16use PollableInputStream;
17
18#[cfg(feature = "futures")]
19use futures::future::Future;
20#[cfg(feature = "futures")]
21use futures::stream::Stream;
22
23pub trait PollableInputStreamExtManual: Sized {
24 fn create_source<F>(
25 &self,
26 cancellable: Option<&Cancellable>,
27 name: Option<&str>,
28 priority: glib::Priority,
29 func: F,
30 ) -> glib::Source
31 where
32 F: FnMut(&Self) -> glib::Continue + 'static;
33
34 #[cfg(feature = "futures")]
35 fn create_source_future(
36 &self,
37 cancellable: Option<&Cancellable>,
38 priority: glib::Priority,
39 ) -> Box<dyn Future<Output = ()> + std::marker::Unpin>;
40
41 #[cfg(feature = "futures")]
42 fn create_source_stream(
43 &self,
44 cancellable: Option<&Cancellable>,
45 priority: glib::Priority,
46 ) -> Box<dyn Stream<Item = ()> + std::marker::Unpin>;
47
48 fn read_nonblocking(
49 &self,
50 buffer: &mut [u8],
51 cancellable: Option<&Cancellable>,
52 ) -> Result<isize, Error>;
53}
54
55impl<O: IsA<PollableInputStream>> PollableInputStreamExtManual for O {
56 fn create_source<F>(
57 &self,
58 cancellable: Option<&Cancellable>,
59 name: Option<&str>,
60 priority: glib::Priority,
61 func: F,
62 ) -> glib::Source
63 where
64 F: FnMut(&Self) -> glib::Continue + 'static,
65 {
66 #[cfg_attr(feature = "cargo-clippy", allow(transmute_ptr_to_ref))]
67 unsafe extern "C" fn trampoline<O: IsA<PollableInputStream>>(
68 stream: *mut gio_sys::GPollableInputStream,
69 func: glib_sys::gpointer,
70 ) -> glib_sys::gboolean {
71 let func: &Fragile<RefCell<Box<dyn FnMut(&O) -> glib::Continue + 'static>>> =
72 transmute(func);
73 let func = func.get();
74 let mut func = func.borrow_mut();
75 (&mut *func)(&PollableInputStream::from_glib_borrow(stream).unsafe_cast()).to_glib()
76 }
77 unsafe extern "C" fn destroy_closure<O>(ptr: glib_sys::gpointer) {
78 Box::<Fragile<RefCell<Box<dyn FnMut(&O) -> glib::Continue + 'static>>>>::from_raw(
79 ptr as *mut _,
80 );
81 }
82 let cancellable = cancellable.to_glib_none();
83 unsafe {
84 let source = gio_sys::g_pollable_input_stream_create_source(
85 self.as_ref().to_glib_none().0,
86 cancellable.0,
87 );
88
89 let trampoline = trampoline::<Self> as glib_sys::gpointer;
90 glib_sys::g_source_set_callback(
91 source,
92 Some(transmute(trampoline)),
93 into_raw(func),
94 Some(destroy_closure::<Self>),
95 );
96 glib_sys::g_source_set_priority(source, priority.to_glib());
97
98 if let Some(name) = name {
99 glib_sys::g_source_set_name(source, name.to_glib_none().0);
100 }
101
102 from_glib_full(source)
103 }
104 }
105
106 fn read_nonblocking(
107 &self,
108 buffer: &mut [u8],
109 cancellable: Option<&Cancellable>,
110 ) -> Result<isize, Error> {
111 let cancellable = cancellable.to_glib_none();
112 let count = buffer.len() as usize;
113 unsafe {
114 let mut error = ptr::null_mut();
115 let ret = gio_sys::g_pollable_input_stream_read_nonblocking(
116 self.as_ref().to_glib_none().0,
117 buffer.to_glib_none().0,
118 count,
119 cancellable.0,
120 &mut error,
121 );
122 if error.is_null() {
123 Ok(ret)
124 } else {
125 Err(from_glib_full(error))
126 }
127 }
128 }
129
130 #[cfg(feature = "futures")]
131 fn create_source_future(
132 &self,
133 cancellable: Option<&Cancellable>,
134 priority: glib::Priority,
135 ) -> Box<dyn Future<Output = ()> + std::marker::Unpin> {
136 let cancellable: Option<Cancellable> = cancellable.cloned();
137
138 let obj = Fragile::new(self.clone());
139 Box::new(glib::SourceFuture::new(move |send| {
140 let mut send = Some(send);
141 obj.get()
142 .create_source(cancellable.as_ref(), None, priority, move |_| {
143 let _ = send.take().unwrap().send(());
144 glib::Continue(false)
145 })
146 }))
147 }
148
149 #[cfg(feature = "futures")]
150 fn create_source_stream(
151 &self,
152 cancellable: Option<&Cancellable>,
153 priority: glib::Priority,
154 ) -> Box<dyn Stream<Item = ()> + std::marker::Unpin> {
155 let cancellable: Option<Cancellable> = cancellable.cloned();
156
157 let obj = Fragile::new(self.clone());
158 Box::new(glib::SourceStream::new(move |send| {
159 obj.get()
160 .create_source(cancellable.as_ref(), None, priority, move |_| {
161 if send.unbounded_send(()).is_err() {
162 glib::Continue(false)
163 } else {
164 glib::Continue(true)
165 }
166 })
167 }))
168 }
169}
170
171fn into_raw<O, F: FnMut(&O) -> glib::Continue + 'static>(func: F) -> glib_sys::gpointer {
172 let func: Box<Fragile<RefCell<Box<dyn FnMut(&O) -> glib::Continue + 'static>>>> =
173 Box::new(Fragile::new(RefCell::new(Box::new(func))));
174 Box::into_raw(func) as glib_sys::gpointer
175}