gio/
pollable_output_stream.rs

1// Copyright 2013-2018, The Gtk-rs Project Developers.
2// See the COPYRIGHT file at the top-level directory of this distribution.
3// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
4
5use fragile::Fragile;
6use gio_sys;
7use glib;
8use glib::object::{Cast, IsA};
9use glib::translate::*;
10use glib_sys;
11use std::cell::RefCell;
12use std::mem::transmute;
13use Cancellable;
14use PollableOutputStream;
15
16#[cfg(feature = "futures")]
17use futures::future::Future;
18#[cfg(feature = "futures")]
19use futures::stream::Stream;
20
21pub trait PollableOutputStreamExtManual {
22    fn create_source<F>(
23        &self,
24        cancellable: Option<&Cancellable>,
25        name: Option<&str>,
26        priority: glib::Priority,
27        func: F,
28    ) -> glib::Source
29    where
30        F: FnMut(&Self) -> glib::Continue + 'static;
31
32    #[cfg(feature = "futures")]
33    fn create_source_future(
34        &self,
35        cancellable: Option<&Cancellable>,
36        priority: glib::Priority,
37    ) -> Box<dyn Future<Output = ()> + std::marker::Unpin>;
38
39    #[cfg(feature = "futures")]
40    fn create_source_stream(
41        &self,
42        cancellable: Option<&Cancellable>,
43        priority: glib::Priority,
44    ) -> Box<dyn Stream<Item = ()> + std::marker::Unpin>;
45}
46
47impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O {
48    fn create_source<F>(
49        &self,
50        cancellable: Option<&Cancellable>,
51        name: Option<&str>,
52        priority: glib::Priority,
53        func: F,
54    ) -> glib::Source
55    where
56        F: FnMut(&Self) -> glib::Continue + 'static,
57    {
58        #[cfg_attr(feature = "cargo-clippy", allow(transmute_ptr_to_ref))]
59        unsafe extern "C" fn trampoline<O: IsA<PollableOutputStream>>(
60            stream: *mut gio_sys::GPollableOutputStream,
61            func: glib_sys::gpointer,
62        ) -> glib_sys::gboolean {
63            let func: &Fragile<RefCell<Box<dyn FnMut(&O) -> glib::Continue + 'static>>> =
64                transmute(func);
65            let func = func.get();
66            let mut func = func.borrow_mut();
67            (&mut *func)(&PollableOutputStream::from_glib_borrow(stream).unsafe_cast()).to_glib()
68        }
69        unsafe extern "C" fn destroy_closure<O>(ptr: glib_sys::gpointer) {
70            Box::<Fragile<RefCell<Box<dyn FnMut(&O) -> glib::Continue + 'static>>>>::from_raw(
71                ptr as *mut _,
72            );
73        }
74        let cancellable = cancellable.to_glib_none();
75        unsafe {
76            let source = gio_sys::g_pollable_output_stream_create_source(
77                self.as_ref().to_glib_none().0,
78                cancellable.0,
79            );
80
81            let trampoline = trampoline::<Self> as glib_sys::gpointer;
82            glib_sys::g_source_set_callback(
83                source,
84                Some(transmute(trampoline)),
85                into_raw(func),
86                Some(destroy_closure::<Self>),
87            );
88            glib_sys::g_source_set_priority(source, priority.to_glib());
89
90            if let Some(name) = name {
91                glib_sys::g_source_set_name(source, name.to_glib_none().0);
92            }
93
94            from_glib_full(source)
95        }
96    }
97
98    #[cfg(feature = "futures")]
99    fn create_source_future(
100        &self,
101        cancellable: Option<&Cancellable>,
102        priority: glib::Priority,
103    ) -> Box<dyn Future<Output = ()> + std::marker::Unpin> {
104        let cancellable: Option<Cancellable> = cancellable.cloned();
105
106        let obj = Fragile::new(self.clone());
107        Box::new(glib::SourceFuture::new(move |send| {
108            let mut send = Some(send);
109            obj.get()
110                .create_source(cancellable.as_ref(), None, priority, move |_| {
111                    let _ = send.take().unwrap().send(());
112                    glib::Continue(false)
113                })
114        }))
115    }
116
117    #[cfg(feature = "futures")]
118    fn create_source_stream(
119        &self,
120        cancellable: Option<&Cancellable>,
121        priority: glib::Priority,
122    ) -> Box<dyn Stream<Item = ()> + std::marker::Unpin> {
123        let cancellable: Option<Cancellable> = cancellable.cloned();
124
125        let obj = Fragile::new(self.clone());
126        Box::new(glib::SourceStream::new(move |send| {
127            let send = Some(send);
128            obj.get()
129                .create_source(cancellable.as_ref(), None, priority, move |_| {
130                    if send.as_ref().unwrap().unbounded_send(()).is_err() {
131                        glib::Continue(false)
132                    } else {
133                        glib::Continue(true)
134                    }
135                })
136        }))
137    }
138}
139
140fn into_raw<O, F: FnMut(&O) -> glib::Continue + 'static>(func: F) -> glib_sys::gpointer {
141    let func: Box<Fragile<RefCell<Box<dyn FnMut(&O) -> glib::Continue + 'static>>>> =
142        Box::new(Fragile::new(RefCell::new(Box::new(func))));
143    Box::into_raw(func) as glib_sys::gpointer
144}