glib/
main_context_channel.rs

1// Copyright 2019, The Gtk-rs Project Developers.
2// See the COPYRIGHT file at the top-level directory of this distribution.
3// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
4
5use get_thread_id;
6use glib_sys;
7use std::cell::RefCell;
8use std::collections::VecDeque;
9use std::fmt;
10use std::mem;
11use std::ptr;
12use std::sync::mpsc;
13use std::sync::{Arc, Condvar, Mutex};
14use translate::{mut_override, FromGlibPtrFull, FromGlibPtrNone, ToGlib, ToGlibPtr};
15use Continue;
16use MainContext;
17use Priority;
18use Source;
19use SourceId;
20
/// Lifecycle of the receiving side of a channel, as seen by the senders.
///
/// A new channel starts out as `NotAttached`, becomes `Attached` once
/// `Receiver::attach()` has created the `GSource`, and ends up as `Destroyed`
/// when either the unattached `Receiver` is dropped or the `GSource` is
/// finalized.
enum ChannelSourceState {
    /// The `Receiver` exists but no `GSource` has been created for it yet.
    NotAttached,
    /// The `Receiver` was attached; the pointer refers to the `GSource` that
    /// dispatches the queued items. The source may already have been destroyed,
    /// which is why users of this variant additionally check
    /// `g_source_is_destroyed()`.
    Attached(*mut glib_sys::GSource),
    /// The receiving side is gone and sending can never succeed again.
    Destroyed,
}

// The raw `GSource` pointer makes the enum neither `Send` nor `Sync` by default.
// Within this file the state lives inside `ChannelInner`, which is only reached
// through the channel's `Mutex`.
// NOTE(review): soundness additionally relies on the GLib functions called with
// this pointer being callable from any thread -- confirm against the GLib docs.
unsafe impl Send for ChannelSourceState {}
unsafe impl Sync for ChannelSourceState {}
29
/// Mutable state of a channel; always accessed through the `Mutex` stored in
/// `Channel`.
struct ChannelInner<T> {
    /// Items that were sent but not yet handed to the receiver callback,
    /// oldest first.
    queue: VecDeque<T>,
    /// Current state of the receiving side.
    source: ChannelSourceState,
}
34
35impl<T> ChannelInner<T> {
36    fn receiver_disconnected(&self) -> bool {
37        match self.source {
38            ChannelSourceState::Destroyed => true,
39            // Receiver exists but is already destroyed
40            ChannelSourceState::Attached(source)
41                if unsafe { glib_sys::g_source_is_destroyed(source) } != glib_sys::GFALSE =>
42            {
43                true
44            }
45            // Not attached yet so the Receiver still exists
46            ChannelSourceState::NotAttached => false,
47            // Receiver still running
48            ChannelSourceState::Attached(_) => false,
49        }
50    }
51
52    fn set_ready_time(&mut self, ready_time: i64) {
53        if let ChannelSourceState::Attached(source) = self.source {
54            unsafe {
55                glib_sys::g_source_set_ready_time(source, ready_time);
56            }
57        }
58    }
59
60    fn source(&self) -> Option<Source> {
61        match self.source {
62            // Receiver exists and is not destroyed yet
63            ChannelSourceState::Attached(source)
64                if unsafe { glib_sys::g_source_is_destroyed(source) == glib_sys::GFALSE } =>
65            {
66                Some(unsafe { Source::from_glib_none(source) })
67            }
68            _ => None,
69        }
70    }
71}
72
/// Extra state that only bounded (synchronous) channels carry.
struct ChannelBound {
    /// Maximum number of queued items; `0` makes every send wait until the
    /// receiver has taken the item out of the queue.
    bound: usize,
    /// Used to block senders; notified whenever an item is taken out of the
    /// queue or the receiving side goes away. Always used together with the
    /// channel's mutex.
    cond: Condvar,
}
77
/// Shared channel state: the mutex-protected queue/receiver state plus, for
/// bounded channels only, the bound and its condition variable.
///
/// Every sender and the receiving side hold one strong reference each, so a
/// strong count of 1 observed by the receiver means that all senders are gone.
struct Channel<T>(Arc<(Mutex<ChannelInner<T>>, Option<ChannelBound>)>);
79
80impl<T> Clone for Channel<T> {
81    fn clone(&self) -> Channel<T> {
82        Channel(self.0.clone())
83    }
84}
85
86impl<T> Channel<T> {
87    fn new(bound: Option<usize>) -> Channel<T> {
88        Channel(Arc::new((
89            Mutex::new(ChannelInner {
90                queue: VecDeque::new(),
91                source: ChannelSourceState::NotAttached,
92            }),
93            bound.map(|bound| ChannelBound {
94                bound,
95                cond: Condvar::new(),
96            }),
97        )))
98    }
99
100    fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
101        let mut inner = (self.0).0.lock().unwrap();
102
103        // If we have a bounded channel then we need to wait here until enough free space is
104        // available or the receiver disappears
105        //
106        // A special case here is a bound of 0: the queue must be empty for accepting
107        // new data and then we will again wait later for the data to be actually taken
108        // out
109        if let Some(ChannelBound { bound, ref cond }) = (self.0).1 {
110            while inner.queue.len() >= bound
111                && !inner.queue.is_empty()
112                && !inner.receiver_disconnected()
113            {
114                inner = cond.wait(inner).unwrap();
115            }
116        }
117
118        // Error out directly if the receiver is disconnected
119        if inner.receiver_disconnected() {
120            return Err(mpsc::SendError(t));
121        }
122
123        // Store the item on our queue
124        inner.queue.push_back(t);
125
126        // and then wake up the GSource
127        inner.set_ready_time(0);
128
129        // If we have a bound of 0 we need to wait until the receiver actually
130        // handled the data
131        if let Some(ChannelBound { bound: 0, ref cond }) = (self.0).1 {
132            while !inner.queue.is_empty() && !inner.receiver_disconnected() {
133                inner = cond.wait(inner).unwrap();
134            }
135
136            // If the receiver was destroyed in the meantime take out the item and report an error
137            if inner.receiver_disconnected() {
138                // If the item is not in the queue anymore then the receiver just handled it before
139                // getting disconnected and all is good
140                if let Some(t) = inner.queue.pop_front() {
141                    return Err(mpsc::SendError(t));
142                }
143            }
144        }
145
146        Ok(())
147    }
148
149    fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
150        let mut inner = (self.0).0.lock().unwrap();
151
152        let ChannelBound { bound, ref cond } = (self.0)
153            .1
154            .as_ref()
155            .expect("called try_send() on an unbounded channel");
156
157        // Check if the queue is full and handle the special case of a 0 bound
158        if inner.queue.len() >= *bound && !inner.queue.is_empty() {
159            return Err(mpsc::TrySendError::Full(t));
160        }
161
162        // Error out directly if the receiver is disconnected
163        if inner.receiver_disconnected() {
164            return Err(mpsc::TrySendError::Disconnected(t));
165        }
166
167        // Store the item on our queue
168        inner.queue.push_back(t);
169
170        // and then wake up the GSource
171        inner.set_ready_time(0);
172
173        // If we have a bound of 0 we need to wait until the receiver actually
174        // handled the data
175        if *bound == 0 {
176            while !inner.queue.is_empty() && !inner.receiver_disconnected() {
177                inner = cond.wait(inner).unwrap();
178            }
179
180            // If the receiver was destroyed in the meantime take out the item and report an error
181            if inner.receiver_disconnected() {
182                // If the item is not in the queue anymore then the receiver just handled it before
183                // getting disconnected and all is good
184                if let Some(t) = inner.queue.pop_front() {
185                    return Err(mpsc::TrySendError::Disconnected(t));
186                }
187            }
188        }
189
190        Ok(())
191    }
192
193    fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
194        let mut inner = (self.0).0.lock().unwrap();
195
196        // Pop item if we have any
197        if let Some(item) = inner.queue.pop_front() {
198            // Wake up a sender that is currently waiting, if any
199            if let Some(ChannelBound { ref cond, .. }) = (self.0).1 {
200                cond.notify_one();
201            }
202            return Ok(item);
203        }
204
205        // If there are no senders left we are disconnected or otherwise empty. That's the case if
206        // the only remaining strong reference is the one of the receiver
207        if Arc::strong_count(&self.0) == 1 {
208            Err(mpsc::TryRecvError::Disconnected)
209        } else {
210            Err(mpsc::TryRecvError::Empty)
211        }
212    }
213}
214
/// The custom `GSource` of an attached receiver.
///
/// The struct is `#[repr(C)]` and starts with the plain `GSource`, which is what
/// allows the callbacks in this file to cast the `*mut GSource` they are given
/// to a `*mut ChannelSource<T, F>`.
#[repr(C)]
struct ChannelSource<T, F: FnMut(T) -> Continue + 'static> {
    /// The embedded GLib source; must stay the first field.
    source: glib_sys::GSource,
    /// Id of the thread that attached the receiver; `dispatch` asserts that it
    /// is called from that same thread.
    thread_id: usize,
    /// The function table that was passed to `g_source_new()`, owned here and
    /// released in `finalize`.
    // NOTE(review): presumably GLib keeps using the pointer for the lifetime of
    // the source, which is why the box is stored here -- confirm.
    source_funcs: Option<Box<glib_sys::GSourceFuncs>>,
    /// The channel items are received from; taken out in `finalize`.
    channel: Option<Channel<T>>,
    /// The user callback that is invoked for every received item.
    callback: Option<RefCell<F>>,
}
223
224unsafe extern "C" fn prepare<T>(
225    source: *mut glib_sys::GSource,
226    timeout: *mut i32,
227) -> glib_sys::gboolean {
228    *timeout = -1;
229
230    // We're always ready when the ready time was set to 0. There
231    // will be at least one item or the senders are disconnected now
232    if glib_sys::g_source_get_ready_time(source) == 0 {
233        glib_sys::GTRUE
234    } else {
235        glib_sys::GFALSE
236    }
237}
238
239unsafe extern "C" fn check<T>(source: *mut glib_sys::GSource) -> glib_sys::gboolean {
240    // We're always ready when the ready time was set to 0. There
241    // will be at least one item or the senders are disconnected now
242    if glib_sys::g_source_get_ready_time(source) == 0 {
243        glib_sys::GTRUE
244    } else {
245        glib_sys::GFALSE
246    }
247}
248
249unsafe extern "C" fn dispatch<T, F: FnMut(T) -> Continue + 'static>(
250    source: *mut glib_sys::GSource,
251    callback: glib_sys::GSourceFunc,
252    _user_data: glib_sys::gpointer,
253) -> glib_sys::gboolean {
254    let source = &mut *(source as *mut ChannelSource<T, F>);
255    assert!(callback.is_none());
256
257    glib_sys::g_source_set_ready_time(&mut source.source, -1);
258
259    // Check the thread to ensure we're only ever called from the same thread
260    assert_eq!(
261        get_thread_id(),
262        source.thread_id,
263        "Source dispatched on a different thread than before"
264    );
265
266    // Now iterate over all items that we currently have in the channel until it is
267    // empty again. If all senders are disconnected at some point we remove the GSource
268    // from the main context it was attached to as it will never ever be called again.
269    let channel = source
270        .channel
271        .as_ref()
272        .expect("ChannelSource without Channel");
273    loop {
274        match channel.try_recv() {
275            Err(mpsc::TryRecvError::Empty) => break,
276            Err(mpsc::TryRecvError::Disconnected) => return glib_sys::G_SOURCE_REMOVE,
277            Ok(item) => {
278                let callback = source
279                    .callback
280                    .as_mut()
281                    .expect("ChannelSource called before Receiver was attached");
282                if (&mut *callback.borrow_mut())(item) == Continue(false) {
283                    return glib_sys::G_SOURCE_REMOVE;
284                }
285            }
286        }
287    }
288
289    glib_sys::G_SOURCE_CONTINUE
290}
291
292unsafe extern "C" fn finalize<T, F: FnMut(T) -> Continue + 'static>(
293    source: *mut glib_sys::GSource,
294) {
295    let source = &mut *(source as *mut ChannelSource<T, F>);
296
297    // Drop all memory we own by taking it out of the Options
298    let channel = source.channel.take().expect("Receiver without channel");
299
300    {
301        // Set the source inside the channel to None so that all senders know that there
302        // is no receiver left and wake up the condition variable if any
303        let mut inner = (channel.0).0.lock().unwrap();
304        inner.source = ChannelSourceState::Destroyed;
305        if let Some(ChannelBound { ref cond, .. }) = (channel.0).1 {
306            cond.notify_all();
307        }
308    }
309
310    let _ = source.callback.take();
311    let _ = source.source_funcs.take();
312}
313
/// A `Sender` that can be used to send items to the corresponding main context receiver.
///
/// This `Sender` behaves the same as `std::sync::mpsc::Sender`: sending never blocks
/// and fails only once the receiver is gone.
///
/// Dropping a `Sender` wakes up the attached receiver so that it can notice when
/// the last sender disappeared.
///
/// See [`MainContext::channel()`] for how to create such a `Sender`.
///
/// [`MainContext::channel()`]: struct.MainContext.html#method.channel
// The `Option` is only ever `None` while the sender is being dropped.
pub struct Sender<T>(Option<Channel<T>>);
322
323impl<T> fmt::Debug for Sender<T> {
324    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
325        f.debug_struct("Sender").finish()
326    }
327}
328
329impl<T> Clone for Sender<T> {
330    fn clone(&self) -> Sender<T> {
331        Sender(self.0.clone())
332    }
333}
334
335impl<T> Sender<T> {
336    /// Sends a value to the channel.
337    pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
338        self.0.as_ref().expect("Sender with no channel").send(t)
339    }
340}
341
342impl<T> Drop for Sender<T> {
343    fn drop(&mut self) {
344        unsafe {
345            // Wake up the receiver after dropping our own reference to ensure
346            // that after the last sender is dropped the receiver will see a strong
347            // reference count of exactly 1 by itself.
348            let channel = self.0.take().expect("Sender with no channel");
349
350            let source = {
351                let inner = (channel.0).0.lock().unwrap();
352
353                // Get a strong reference to the source
354                match inner.source() {
355                    None => return,
356                    Some(source) => source,
357                }
358            };
359
360            // Drop the channel and wake up the source/receiver
361            drop(channel);
362            glib_sys::g_source_set_ready_time(source.to_glib_none().0, 0);
363        }
364    }
365}
366
/// A `SyncSender` that can be used to send items to the corresponding main context receiver.
///
/// This `SyncSender` behaves the same as `std::sync::mpsc::SyncSender`: the channel
/// has a bounded capacity and `send()` blocks while it is full.
///
/// Dropping a `SyncSender` wakes up the attached receiver so that it can notice
/// when the last sender disappeared.
///
/// See [`MainContext::sync_channel()`] for how to create such a `SyncSender`.
///
/// [`MainContext::sync_channel()`]: struct.MainContext.html#method.sync_channel
// The `Option` is only ever `None` while the sender is being dropped.
pub struct SyncSender<T>(Option<Channel<T>>);
375
376impl<T> fmt::Debug for SyncSender<T> {
377    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
378        f.debug_struct("SyncSender").finish()
379    }
380}
381
382impl<T> Clone for SyncSender<T> {
383    fn clone(&self) -> SyncSender<T> {
384        SyncSender(self.0.clone())
385    }
386}
387
388impl<T> SyncSender<T> {
389    /// Sends a value to the channel and blocks if the channel is full.
390    pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
391        self.0.as_ref().expect("Sender with no channel").send(t)
392    }
393
394    /// Sends a value to the channel.
395    pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
396        self.0.as_ref().expect("Sender with no channel").try_send(t)
397    }
398}
399
400impl<T> Drop for SyncSender<T> {
401    fn drop(&mut self) {
402        unsafe {
403            // Wake up the receiver after dropping our own reference to ensure
404            // that after the last sender is dropped the receiver will see a strong
405            // reference count of exactly 1 by itself.
406            let channel = self.0.take().expect("Sender with no channel");
407
408            let source = {
409                let inner = (channel.0).0.lock().unwrap();
410
411                // Get a strong reference to the source
412                match inner.source() {
413                    None => return,
414                    Some(source) => source,
415                }
416            };
417
418            // Drop the channel and wake up the source/receiver
419            drop(channel);
420            glib_sys::g_source_set_ready_time(source.to_glib_none().0, 0);
421        }
422    }
423}
424
/// A `Receiver` that can be attached to a main context to receive items from its corresponding
/// `Sender` or `SyncSender`.
///
/// Items are not received by polling the `Receiver`; instead it is consumed by
/// `attach()`, which registers a closure that is called for every item.
/// Dropping a `Receiver` that was never attached makes all further sending fail.
///
/// See [`MainContext::channel()`] or [`MainContext::sync_channel()`] for how to create
/// such a `Receiver`.
///
/// [`MainContext::channel()`]: struct.MainContext.html#method.channel
/// [`MainContext::sync_channel()`]: struct.MainContext.html#method.sync_channel
// Fields: the channel (taken out by `attach()`) and the priority the source is
// created with.
pub struct Receiver<T>(Option<Channel<T>>, Priority);
434
435impl<T> fmt::Debug for Receiver<T> {
436    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
437        f.debug_struct("Receiver").finish()
438    }
439}
440
// It's safe to send the Receiver to other threads for attaching it as
// long as the items to be sent can also be sent between threads.
//
// An unattached receiver holds no `GSource` yet, only the shared channel with
// its queue of `T` and the priority.
// NOTE(review): this also relies on `Priority` being sendable -- confirm.
unsafe impl<T: Send> Send for Receiver<T> {}
444
445impl<T> Drop for Receiver<T> {
446    fn drop(&mut self) {
447        // If the receiver was never attached to a main context we need to let all the senders know
448        if let Some(channel) = self.0.take() {
449            let mut inner = (channel.0).0.lock().unwrap();
450            inner.source = ChannelSourceState::Destroyed;
451            if let Some(ChannelBound { ref cond, .. }) = (channel.0).1 {
452                cond.notify_all();
453            }
454        }
455    }
456}
457
impl<T> Receiver<T> {
    /// Attaches the receiver to the given `context` and calls `func` whenever an item is
    /// available on the channel.
    ///
    /// Passing `None` for the context will attach it to the thread default main context.
    ///
    /// `func` is called on the thread that attached the receiver. Returning
    /// `Continue(false)` from it removes the receiver from the main context again.
    ///
    /// Returns the id of the `GSource` that was created for the receiver.
    ///
    /// # Panics
    ///
    /// This function panics if called from a thread that is not the owner of the provided
    /// `context`, or, if `None` is provided, of the thread default main context.
    pub fn attach<F: FnMut(T) -> Continue + 'static>(
        mut self,
        context: Option<&MainContext>,
        func: F,
    ) -> SourceId {
        unsafe {
            // Taking the channel out turns the `Drop` impl of `Receiver` into a no-op,
            // from now on the channel is owned by the source.
            let channel = self.0.take().expect("Receiver without channel");

            // The function table is boxed so that its address stays the same when it is
            // moved into the source further below.
            let source_funcs = Box::new(glib_sys::GSourceFuncs {
                check: Some(check::<T>),
                prepare: Some(prepare::<T>),
                dispatch: Some(dispatch::<T, F>),
                finalize: Some(finalize::<T, F>),
                closure_callback: None,
                closure_marshal: None,
            });

            // Ask GLib for a source that is big enough for the whole `ChannelSource`,
            // i.e. the `GSource` followed by our own fields.
            let source = glib_sys::g_source_new(
                mut_override(&*source_funcs),
                mem::size_of::<ChannelSource<T, F>>() as u32,
            ) as *mut ChannelSource<T, F>;
            assert!(!source.is_null());

            // Set up the GSource
            {
                let source = &mut *source;
                // Senders are locked out while the source is being configured and
                // published to them.
                let mut inner = (channel.0).0.lock().unwrap();

                glib_sys::g_source_set_priority(mut_override(&source.source), self.1.to_glib());

                // We're immediately ready if the queue is not empty or if no sender is left at this point
                glib_sys::g_source_set_ready_time(
                    mut_override(&source.source),
                    if !inner.queue.is_empty() || Arc::strong_count(&channel.0) == 1 {
                        0
                    } else {
                        -1
                    },
                );
                // From here on senders wake up the source via its ready time
                inner.source = ChannelSourceState::Attached(&mut source.source);
            }

            // Store all our data inside our part of the GSource
            {
                let source = &mut *source;
                source.thread_id = get_thread_id();
                // `ptr::write` is used for the fields with destructors: a plain
                // assignment would first drop the previous field contents, which were
                // never initialized from Rust.
                ptr::write(&mut source.channel, Some(channel));
                ptr::write(&mut source.callback, Some(RefCell::new(func)));
                ptr::write(&mut source.source_funcs, Some(source_funcs));
            }

            // Take over the reference returned by `g_source_new()`; it is released
            // again when this wrapper goes out of scope at the end of the function.
            let source = Source::from_glib_full(mut_override(&(*source).source));
            if let Some(context) = context {
                assert!(context.is_owner());
                source.attach(Some(context))
            } else {
                let context = MainContext::ref_thread_default();
                assert!(context.is_owner());
                source.attach(Some(&context))
            }
        }
    }
}
531
532impl MainContext {
533    /// Creates a channel for a main context.
534    ///
535    /// The `Receiver` has to be attached to a main context at a later time, together with a
536    /// closure that will be called for every item sent to a `Sender`.
537    ///
538    /// The `Sender` can be cloned and both the `Sender` and `Receiver` can be sent to different
539    /// threads as long as the item type implements the `Send` trait.
540    ///
541    /// When the last `Sender` is dropped the channel is removed from the main context. If the
542    /// `Receiver` is dropped and not attached to a main context all sending to the `Sender`
543    /// will fail.
544    ///
545    /// The returned `Sender` behaves the same as `std::sync::mpsc::Sender`.
546    pub fn channel<T>(priority: Priority) -> (Sender<T>, Receiver<T>) {
547        let channel = Channel::new(None);
548        let receiver = Receiver(Some(channel.clone()), priority);
549        let sender = Sender(Some(channel));
550
551        (sender, receiver)
552    }
553
554    /// Creates a synchronous channel for a main context with a given bound on the capacity of the
555    /// channel.
556    ///
557    /// The `Receiver` has to be attached to a main context at a later time, together with a
558    /// closure that will be called for every item sent to a `SyncSender`.
559    ///
560    /// The `SyncSender` can be cloned and both the `SyncSender` and `Receiver` can be sent to different
561    /// threads as long as the item type implements the `Send` trait.
562    ///
563    /// When the last `SyncSender` is dropped the channel is removed from the main context. If the
564    /// `Receiver` is dropped and not attached to a main context all sending to the `SyncSender`
565    /// will fail.
566    ///
567    /// The returned `SyncSender` behaves the same as `std::sync::mpsc::SyncSender`.
568    pub fn sync_channel<T>(priority: Priority, bound: usize) -> (SyncSender<T>, Receiver<T>) {
569        let channel = Channel::new(Some(bound));
570        let receiver = Receiver(Some(channel.clone()), priority);
571        let sender = SyncSender(Some(channel));
572
573        (sender, receiver)
574    }
575}
576
// Tests for the main context channels. Most of them run a `MainLoop` on a
// freshly created `MainContext` that the test thread acquired before.
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::RefCell;
    use std::rc::Rc;
    use std::thread;
    use std::time;
    use MainLoop;

    // Items sent on an unbounded channel arrive at the attached closure.
    #[test]
    fn test_channel() {
        let c = MainContext::new();
        let l = MainLoop::new(Some(&c), false);

        c.acquire();

        let (sender, receiver) = MainContext::channel(Priority::default());

        let sum = Rc::new(RefCell::new(0));
        let sum_clone = sum.clone();
        let l_clone = l.clone();
        receiver.attach(Some(&c), move |item| {
            *sum_clone.borrow_mut() += item;
            // 1 + 2 + 3: all items arrived, stop receiving and quit the loop
            if *sum_clone.borrow() == 6 {
                l_clone.quit();
                Continue(false)
            } else {
                Continue(true)
            }
        });

        sender.send(1).unwrap();
        sender.send(2).unwrap();
        sender.send(3).unwrap();

        l.run();

        assert_eq!(*sum.borrow(), 6);
    }

    // Dropping the only sender removes the receiver from the main context and
    // drops the closure.
    #[test]
    fn test_drop_sender() {
        let c = MainContext::new();
        let l = MainLoop::new(Some(&c), false);

        c.acquire();

        let (sender, receiver) = MainContext::channel::<i32>(Priority::default());

        // Quits the main loop when dropped. It is owned by the closure below, so
        // `l.run()` can only return once the closure was dropped.
        struct Helper(MainLoop);
        impl Drop for Helper {
            fn drop(&mut self) {
                self.0.quit();
            }
        }

        let helper = Helper(l.clone());
        receiver.attach(Some(&c), move |_| {
            let _ = helper;

            Continue(true)
        });

        drop(sender);

        l.run();
    }

    // Sending fails after a never attached receiver was dropped.
    #[test]
    fn test_drop_receiver() {
        let (sender, receiver) = MainContext::channel::<i32>(Priority::default());

        drop(receiver);
        assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
    }

    // Sending fails after the source of an attached receiver was destroyed.
    #[test]
    fn test_remove_receiver() {
        let c = MainContext::new();

        c.acquire();

        let (sender, receiver) = MainContext::channel::<i32>(Priority::default());

        let source_id = receiver.attach(Some(&c), move |_| Continue(true));

        let source = c.find_source_by_id(&source_id).unwrap();
        source.destroy();

        assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
    }

    // Destroying the source and releasing the last reference to it drops the
    // closure, and sending fails afterwards.
    #[test]
    fn test_remove_receiver_and_drop_source() {
        let c = MainContext::new();

        c.acquire();

        let (sender, receiver) = MainContext::channel::<i32>(Priority::default());

        // Records in the shared flag that it was dropped
        struct Helper(Arc<Mutex<bool>>);
        impl Drop for Helper {
            fn drop(&mut self) {
                *self.0.lock().unwrap() = true;
            }
        }

        let dropped = Arc::new(Mutex::new(false));
        let helper = Helper(dropped.clone());
        let source_id = receiver.attach(Some(&c), move |_| {
            let _helper = &helper;
            Continue(true)
        });

        let source = c.find_source_by_id(&source_id).unwrap();
        source.destroy();

        // This should drop the closure
        drop(source);

        assert_eq!(*dropped.lock().unwrap(), true);
        assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
    }

    // A full bounded channel rejects `try_send()` and blocks `send()` until the
    // receiver consumed an item.
    #[test]
    fn test_sync_channel() {
        let c = MainContext::new();
        let l = MainLoop::new(Some(&c), false);

        c.acquire();

        let (sender, receiver) = MainContext::sync_channel(Priority::default(), 2);

        let sum = Rc::new(RefCell::new(0));
        let sum_clone = sum.clone();
        let l_clone = l.clone();
        receiver.attach(Some(&c), move |item| {
            *sum_clone.borrow_mut() += item;
            if *sum_clone.borrow() == 6 {
                l_clone.quit();
                Continue(false)
            } else {
                Continue(true)
            }
        });

        let (wait_sender, wait_receiver) = mpsc::channel();

        let thread = thread::spawn(move || {
            // The first two must succeed
            sender.try_send(1).unwrap();
            sender.try_send(2).unwrap();

            // The channel is full now, so this must fail
            assert!(sender.try_send(3).is_err());
            wait_sender.send(()).unwrap();

            // This will block
            sender.send(3).unwrap();
        });

        // Wait until the channel is full, and then another
        // 50ms to make sure the sender is blocked now and
        // can wake up properly once an item was consumed
        let _ = wait_receiver.recv().unwrap();
        thread::sleep(time::Duration::from_millis(50));
        l.run();

        thread.join().unwrap();

        assert_eq!(*sum.borrow(), 6);
    }

    // A sender that is blocked on a full channel is woken up with an error once
    // the receiver is removed from the main context.
    #[test]
    fn test_sync_channel_drop_wakeup() {
        let c = MainContext::new();
        let l = MainLoop::new(Some(&c), false);

        c.acquire();

        let (sender, receiver) = MainContext::sync_channel(Priority::default(), 3);

        let sum = Rc::new(RefCell::new(0));
        let sum_clone = sum.clone();
        let l_clone = l.clone();
        receiver.attach(Some(&c), move |item| {
            *sum_clone.borrow_mut() += item;
            // Stop receiving after the first three items although the sender
            // keeps sending more
            if *sum_clone.borrow() == 6 {
                l_clone.quit();
                Continue(false)
            } else {
                Continue(true)
            }
        });

        let (wait_sender, wait_receiver) = mpsc::channel();

        let thread = thread::spawn(move || {
            // The first three must succeed
            sender.try_send(1).unwrap();
            sender.try_send(2).unwrap();
            sender.try_send(3).unwrap();

            wait_sender.send(()).unwrap();
            for i in 4.. {
                // This will block at some point until the
                // receiver is removed from the main context
                if let Err(_) = sender.send(i) {
                    break;
                }
            }
        });

        // Wait until the channel is full, and then another
        // 50ms to make sure the sender is blocked now and
        // can wake up properly once the receiver is removed
        let _ = wait_receiver.recv().unwrap();
        thread::sleep(time::Duration::from_millis(50));
        l.run();

        thread.join().unwrap();

        assert_eq!(*sum.borrow(), 6);
    }

    // A sender that is blocked on a full channel is woken up with an error once
    // the never attached receiver is dropped.
    #[test]
    fn test_sync_channel_drop_receiver_wakeup() {
        let c = MainContext::new();

        c.acquire();

        let (sender, receiver) = MainContext::sync_channel(Priority::default(), 2);

        let (wait_sender, wait_receiver) = mpsc::channel();

        let thread = thread::spawn(move || {
            // The first two must succeed
            sender.try_send(1).unwrap();
            sender.try_send(2).unwrap();
            wait_sender.send(()).unwrap();

            // This will block and then error out because the receiver is destroyed
            assert!(sender.send(3).is_err());
        });

        // Wait until the channel is full, and then another
        // 50ms to make sure the sender is blocked now and
        // can wake up properly once the receiver was dropped
        let _ = wait_receiver.recv().unwrap();
        thread::sleep(time::Duration::from_millis(50));
        drop(receiver);
        thread.join().unwrap();
    }

    // With a bound of 0 every `send()` only returns after the receiver took the
    // item. The sending thread signals on a second channel before every send and
    // once after the last one, which lets the receiving side observe how far the
    // sender got.
    #[test]
    fn test_sync_channel_rendezvous() {
        let c = MainContext::new();
        let l = MainLoop::new(Some(&c), false);

        c.acquire();

        let (sender, receiver) = MainContext::sync_channel(Priority::default(), 0);

        let (wait_sender, wait_receiver) = mpsc::channel();

        let thread = thread::spawn(move || {
            wait_sender.send(()).unwrap();
            sender.send(1).unwrap();
            wait_sender.send(()).unwrap();
            sender.send(2).unwrap();
            wait_sender.send(()).unwrap();
            sender.send(3).unwrap();
            wait_sender.send(()).unwrap();
        });

        // Wait until the thread is started, then wait another 50ms and
        // during that time it must not have proceeded yet to send the
        // second item because we did not yet receive the first item.
        let _ = wait_receiver.recv().unwrap();
        assert_eq!(
            wait_receiver.recv_timeout(time::Duration::from_millis(50)),
            Err(mpsc::RecvTimeoutError::Timeout)
        );

        let sum = Rc::new(RefCell::new(0));
        let sum_clone = sum.clone();
        let l_clone = l.clone();
        receiver.attach(Some(&c), move |item| {
            // We consumed one item so there should be one item on
            // the other receiver now.
            let _ = wait_receiver.recv().unwrap();
            *sum_clone.borrow_mut() += item;
            if *sum_clone.borrow() == 6 {
                // This was the last item: the sending thread is finished after
                // its final signal and dropped `wait_sender`, so instead of
                // timing out the other channel must report the disconnection
                assert_eq!(
                    wait_receiver.recv_timeout(time::Duration::from_millis(50)),
                    Err(mpsc::RecvTimeoutError::Disconnected)
                );
                l_clone.quit();
                Continue(false)
            } else {
                // But as we didn't consume the next one yet, there must be no
                // other item available yet
                assert_eq!(
                    wait_receiver.recv_timeout(time::Duration::from_millis(50)),
                    Err(mpsc::RecvTimeoutError::Timeout)
                );
                Continue(true)
            }
        });
        l.run();

        thread.join().unwrap();

        assert_eq!(*sum.borrow(), 6);
    }
}