1use get_thread_id;
6use glib_sys;
7use std::cell::RefCell;
8use std::collections::VecDeque;
9use std::fmt;
10use std::mem;
11use std::ptr;
12use std::sync::mpsc;
13use std::sync::{Arc, Condvar, Mutex};
14use translate::{mut_override, FromGlibPtrFull, FromGlibPtrNone, ToGlib, ToGlibPtr};
15use Continue;
16use MainContext;
17use Priority;
18use Source;
19use SourceId;
20
21enum ChannelSourceState {
22 NotAttached,
23 Attached(*mut glib_sys::GSource),
24 Destroyed,
25}
26
27unsafe impl Send for ChannelSourceState {}
28unsafe impl Sync for ChannelSourceState {}
29
30struct ChannelInner<T> {
31 queue: VecDeque<T>,
32 source: ChannelSourceState,
33}
34
35impl<T> ChannelInner<T> {
36 fn receiver_disconnected(&self) -> bool {
37 match self.source {
38 ChannelSourceState::Destroyed => true,
39 ChannelSourceState::Attached(source)
41 if unsafe { glib_sys::g_source_is_destroyed(source) } != glib_sys::GFALSE =>
42 {
43 true
44 }
45 ChannelSourceState::NotAttached => false,
47 ChannelSourceState::Attached(_) => false,
49 }
50 }
51
52 fn set_ready_time(&mut self, ready_time: i64) {
53 if let ChannelSourceState::Attached(source) = self.source {
54 unsafe {
55 glib_sys::g_source_set_ready_time(source, ready_time);
56 }
57 }
58 }
59
60 fn source(&self) -> Option<Source> {
61 match self.source {
62 ChannelSourceState::Attached(source)
64 if unsafe { glib_sys::g_source_is_destroyed(source) == glib_sys::GFALSE } =>
65 {
66 Some(unsafe { Source::from_glib_none(source) })
67 }
68 _ => None,
69 }
70 }
71}
72
73struct ChannelBound {
74 bound: usize,
75 cond: Condvar,
76}
77
78struct Channel<T>(Arc<(Mutex<ChannelInner<T>>, Option<ChannelBound>)>);
79
80impl<T> Clone for Channel<T> {
81 fn clone(&self) -> Channel<T> {
82 Channel(self.0.clone())
83 }
84}
85
86impl<T> Channel<T> {
87 fn new(bound: Option<usize>) -> Channel<T> {
88 Channel(Arc::new((
89 Mutex::new(ChannelInner {
90 queue: VecDeque::new(),
91 source: ChannelSourceState::NotAttached,
92 }),
93 bound.map(|bound| ChannelBound {
94 bound,
95 cond: Condvar::new(),
96 }),
97 )))
98 }
99
100 fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
101 let mut inner = (self.0).0.lock().unwrap();
102
103 if let Some(ChannelBound { bound, ref cond }) = (self.0).1 {
110 while inner.queue.len() >= bound
111 && !inner.queue.is_empty()
112 && !inner.receiver_disconnected()
113 {
114 inner = cond.wait(inner).unwrap();
115 }
116 }
117
118 if inner.receiver_disconnected() {
120 return Err(mpsc::SendError(t));
121 }
122
123 inner.queue.push_back(t);
125
126 inner.set_ready_time(0);
128
129 if let Some(ChannelBound { bound: 0, ref cond }) = (self.0).1 {
132 while !inner.queue.is_empty() && !inner.receiver_disconnected() {
133 inner = cond.wait(inner).unwrap();
134 }
135
136 if inner.receiver_disconnected() {
138 if let Some(t) = inner.queue.pop_front() {
141 return Err(mpsc::SendError(t));
142 }
143 }
144 }
145
146 Ok(())
147 }
148
149 fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
150 let mut inner = (self.0).0.lock().unwrap();
151
152 let ChannelBound { bound, ref cond } = (self.0)
153 .1
154 .as_ref()
155 .expect("called try_send() on an unbounded channel");
156
157 if inner.queue.len() >= *bound && !inner.queue.is_empty() {
159 return Err(mpsc::TrySendError::Full(t));
160 }
161
162 if inner.receiver_disconnected() {
164 return Err(mpsc::TrySendError::Disconnected(t));
165 }
166
167 inner.queue.push_back(t);
169
170 inner.set_ready_time(0);
172
173 if *bound == 0 {
176 while !inner.queue.is_empty() && !inner.receiver_disconnected() {
177 inner = cond.wait(inner).unwrap();
178 }
179
180 if inner.receiver_disconnected() {
182 if let Some(t) = inner.queue.pop_front() {
185 return Err(mpsc::TrySendError::Disconnected(t));
186 }
187 }
188 }
189
190 Ok(())
191 }
192
193 fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
194 let mut inner = (self.0).0.lock().unwrap();
195
196 if let Some(item) = inner.queue.pop_front() {
198 if let Some(ChannelBound { ref cond, .. }) = (self.0).1 {
200 cond.notify_one();
201 }
202 return Ok(item);
203 }
204
205 if Arc::strong_count(&self.0) == 1 {
208 Err(mpsc::TryRecvError::Disconnected)
209 } else {
210 Err(mpsc::TryRecvError::Empty)
211 }
212 }
213}
214
215#[repr(C)]
216struct ChannelSource<T, F: FnMut(T) -> Continue + 'static> {
217 source: glib_sys::GSource,
218 thread_id: usize,
219 source_funcs: Option<Box<glib_sys::GSourceFuncs>>,
220 channel: Option<Channel<T>>,
221 callback: Option<RefCell<F>>,
222}
223
224unsafe extern "C" fn prepare<T>(
225 source: *mut glib_sys::GSource,
226 timeout: *mut i32,
227) -> glib_sys::gboolean {
228 *timeout = -1;
229
230 if glib_sys::g_source_get_ready_time(source) == 0 {
233 glib_sys::GTRUE
234 } else {
235 glib_sys::GFALSE
236 }
237}
238
239unsafe extern "C" fn check<T>(source: *mut glib_sys::GSource) -> glib_sys::gboolean {
240 if glib_sys::g_source_get_ready_time(source) == 0 {
243 glib_sys::GTRUE
244 } else {
245 glib_sys::GFALSE
246 }
247}
248
249unsafe extern "C" fn dispatch<T, F: FnMut(T) -> Continue + 'static>(
250 source: *mut glib_sys::GSource,
251 callback: glib_sys::GSourceFunc,
252 _user_data: glib_sys::gpointer,
253) -> glib_sys::gboolean {
254 let source = &mut *(source as *mut ChannelSource<T, F>);
255 assert!(callback.is_none());
256
257 glib_sys::g_source_set_ready_time(&mut source.source, -1);
258
259 assert_eq!(
261 get_thread_id(),
262 source.thread_id,
263 "Source dispatched on a different thread than before"
264 );
265
266 let channel = source
270 .channel
271 .as_ref()
272 .expect("ChannelSource without Channel");
273 loop {
274 match channel.try_recv() {
275 Err(mpsc::TryRecvError::Empty) => break,
276 Err(mpsc::TryRecvError::Disconnected) => return glib_sys::G_SOURCE_REMOVE,
277 Ok(item) => {
278 let callback = source
279 .callback
280 .as_mut()
281 .expect("ChannelSource called before Receiver was attached");
282 if (&mut *callback.borrow_mut())(item) == Continue(false) {
283 return glib_sys::G_SOURCE_REMOVE;
284 }
285 }
286 }
287 }
288
289 glib_sys::G_SOURCE_CONTINUE
290}
291
292unsafe extern "C" fn finalize<T, F: FnMut(T) -> Continue + 'static>(
293 source: *mut glib_sys::GSource,
294) {
295 let source = &mut *(source as *mut ChannelSource<T, F>);
296
297 let channel = source.channel.take().expect("Receiver without channel");
299
300 {
301 let mut inner = (channel.0).0.lock().unwrap();
304 inner.source = ChannelSourceState::Destroyed;
305 if let Some(ChannelBound { ref cond, .. }) = (channel.0).1 {
306 cond.notify_all();
307 }
308 }
309
310 let _ = source.callback.take();
311 let _ = source.source_funcs.take();
312}
313
314pub struct Sender<T>(Option<Channel<T>>);
322
323impl<T> fmt::Debug for Sender<T> {
324 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
325 f.debug_struct("Sender").finish()
326 }
327}
328
329impl<T> Clone for Sender<T> {
330 fn clone(&self) -> Sender<T> {
331 Sender(self.0.clone())
332 }
333}
334
335impl<T> Sender<T> {
336 pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
338 self.0.as_ref().expect("Sender with no channel").send(t)
339 }
340}
341
342impl<T> Drop for Sender<T> {
343 fn drop(&mut self) {
344 unsafe {
345 let channel = self.0.take().expect("Sender with no channel");
349
350 let source = {
351 let inner = (channel.0).0.lock().unwrap();
352
353 match inner.source() {
355 None => return,
356 Some(source) => source,
357 }
358 };
359
360 drop(channel);
362 glib_sys::g_source_set_ready_time(source.to_glib_none().0, 0);
363 }
364 }
365}
366
367pub struct SyncSender<T>(Option<Channel<T>>);
375
376impl<T> fmt::Debug for SyncSender<T> {
377 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
378 f.debug_struct("SyncSender").finish()
379 }
380}
381
382impl<T> Clone for SyncSender<T> {
383 fn clone(&self) -> SyncSender<T> {
384 SyncSender(self.0.clone())
385 }
386}
387
388impl<T> SyncSender<T> {
389 pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
391 self.0.as_ref().expect("Sender with no channel").send(t)
392 }
393
394 pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
396 self.0.as_ref().expect("Sender with no channel").try_send(t)
397 }
398}
399
400impl<T> Drop for SyncSender<T> {
401 fn drop(&mut self) {
402 unsafe {
403 let channel = self.0.take().expect("Sender with no channel");
407
408 let source = {
409 let inner = (channel.0).0.lock().unwrap();
410
411 match inner.source() {
413 None => return,
414 Some(source) => source,
415 }
416 };
417
418 drop(channel);
420 glib_sys::g_source_set_ready_time(source.to_glib_none().0, 0);
421 }
422 }
423}
424
425pub struct Receiver<T>(Option<Channel<T>>, Priority);
434
435impl<T> fmt::Debug for Receiver<T> {
436 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
437 f.debug_struct("Receiver").finish()
438 }
439}
440
441unsafe impl<T: Send> Send for Receiver<T> {}
444
445impl<T> Drop for Receiver<T> {
446 fn drop(&mut self) {
447 if let Some(channel) = self.0.take() {
449 let mut inner = (channel.0).0.lock().unwrap();
450 inner.source = ChannelSourceState::Destroyed;
451 if let Some(ChannelBound { ref cond, .. }) = (channel.0).1 {
452 cond.notify_all();
453 }
454 }
455 }
456}
457
458impl<T> Receiver<T> {
459 pub fn attach<F: FnMut(T) -> Continue + 'static>(
469 mut self,
470 context: Option<&MainContext>,
471 func: F,
472 ) -> SourceId {
473 unsafe {
474 let channel = self.0.take().expect("Receiver without channel");
475
476 let source_funcs = Box::new(glib_sys::GSourceFuncs {
477 check: Some(check::<T>),
478 prepare: Some(prepare::<T>),
479 dispatch: Some(dispatch::<T, F>),
480 finalize: Some(finalize::<T, F>),
481 closure_callback: None,
482 closure_marshal: None,
483 });
484
485 let source = glib_sys::g_source_new(
486 mut_override(&*source_funcs),
487 mem::size_of::<ChannelSource<T, F>>() as u32,
488 ) as *mut ChannelSource<T, F>;
489 assert!(!source.is_null());
490
491 {
493 let source = &mut *source;
494 let mut inner = (channel.0).0.lock().unwrap();
495
496 glib_sys::g_source_set_priority(mut_override(&source.source), self.1.to_glib());
497
498 glib_sys::g_source_set_ready_time(
500 mut_override(&source.source),
501 if !inner.queue.is_empty() || Arc::strong_count(&channel.0) == 1 {
502 0
503 } else {
504 -1
505 },
506 );
507 inner.source = ChannelSourceState::Attached(&mut source.source);
508 }
509
510 {
512 let source = &mut *source;
513 source.thread_id = get_thread_id();
514 ptr::write(&mut source.channel, Some(channel));
515 ptr::write(&mut source.callback, Some(RefCell::new(func)));
516 ptr::write(&mut source.source_funcs, Some(source_funcs));
517 }
518
519 let source = Source::from_glib_full(mut_override(&(*source).source));
520 if let Some(context) = context {
521 assert!(context.is_owner());
522 source.attach(Some(context))
523 } else {
524 let context = MainContext::ref_thread_default();
525 assert!(context.is_owner());
526 source.attach(Some(&context))
527 }
528 }
529 }
530}
531
532impl MainContext {
533 pub fn channel<T>(priority: Priority) -> (Sender<T>, Receiver<T>) {
547 let channel = Channel::new(None);
548 let receiver = Receiver(Some(channel.clone()), priority);
549 let sender = Sender(Some(channel));
550
551 (sender, receiver)
552 }
553
554 pub fn sync_channel<T>(priority: Priority, bound: usize) -> (SyncSender<T>, Receiver<T>) {
569 let channel = Channel::new(Some(bound));
570 let receiver = Receiver(Some(channel.clone()), priority);
571 let sender = SyncSender(Some(channel));
572
573 (sender, receiver)
574 }
575}
576
577#[cfg(test)]
578mod tests {
579 use super::*;
580 use std::cell::RefCell;
581 use std::rc::Rc;
582 use std::thread;
583 use std::time;
584 use MainLoop;
585
586 #[test]
587 fn test_channel() {
588 let c = MainContext::new();
589 let l = MainLoop::new(Some(&c), false);
590
591 c.acquire();
592
593 let (sender, receiver) = MainContext::channel(Priority::default());
594
595 let sum = Rc::new(RefCell::new(0));
596 let sum_clone = sum.clone();
597 let l_clone = l.clone();
598 receiver.attach(Some(&c), move |item| {
599 *sum_clone.borrow_mut() += item;
600 if *sum_clone.borrow() == 6 {
601 l_clone.quit();
602 Continue(false)
603 } else {
604 Continue(true)
605 }
606 });
607
608 sender.send(1).unwrap();
609 sender.send(2).unwrap();
610 sender.send(3).unwrap();
611
612 l.run();
613
614 assert_eq!(*sum.borrow(), 6);
615 }
616
617 #[test]
618 fn test_drop_sender() {
619 let c = MainContext::new();
620 let l = MainLoop::new(Some(&c), false);
621
622 c.acquire();
623
624 let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
625
626 struct Helper(MainLoop);
627 impl Drop for Helper {
628 fn drop(&mut self) {
629 self.0.quit();
630 }
631 }
632
633 let helper = Helper(l.clone());
634 receiver.attach(Some(&c), move |_| {
635 let _ = helper;
636
637 Continue(true)
638 });
639
640 drop(sender);
641
642 l.run();
643 }
644
645 #[test]
646 fn test_drop_receiver() {
647 let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
648
649 drop(receiver);
650 assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
651 }
652
653 #[test]
654 fn test_remove_receiver() {
655 let c = MainContext::new();
656
657 c.acquire();
658
659 let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
660
661 let source_id = receiver.attach(Some(&c), move |_| Continue(true));
662
663 let source = c.find_source_by_id(&source_id).unwrap();
664 source.destroy();
665
666 assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
667 }
668
669 #[test]
670 fn test_remove_receiver_and_drop_source() {
671 let c = MainContext::new();
672
673 c.acquire();
674
675 let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
676
677 struct Helper(Arc<Mutex<bool>>);
678 impl Drop for Helper {
679 fn drop(&mut self) {
680 *self.0.lock().unwrap() = true;
681 }
682 }
683
684 let dropped = Arc::new(Mutex::new(false));
685 let helper = Helper(dropped.clone());
686 let source_id = receiver.attach(Some(&c), move |_| {
687 let _helper = &helper;
688 Continue(true)
689 });
690
691 let source = c.find_source_by_id(&source_id).unwrap();
692 source.destroy();
693
694 drop(source);
696
697 assert_eq!(*dropped.lock().unwrap(), true);
698 assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
699 }
700
701 #[test]
702 fn test_sync_channel() {
703 let c = MainContext::new();
704 let l = MainLoop::new(Some(&c), false);
705
706 c.acquire();
707
708 let (sender, receiver) = MainContext::sync_channel(Priority::default(), 2);
709
710 let sum = Rc::new(RefCell::new(0));
711 let sum_clone = sum.clone();
712 let l_clone = l.clone();
713 receiver.attach(Some(&c), move |item| {
714 *sum_clone.borrow_mut() += item;
715 if *sum_clone.borrow() == 6 {
716 l_clone.quit();
717 Continue(false)
718 } else {
719 Continue(true)
720 }
721 });
722
723 let (wait_sender, wait_receiver) = mpsc::channel();
724
725 let thread = thread::spawn(move || {
726 sender.try_send(1).unwrap();
728 sender.try_send(2).unwrap();
729
730 assert!(sender.try_send(3).is_err());
732 wait_sender.send(()).unwrap();
733
734 sender.send(3).unwrap();
736 });
737
738 let _ = wait_receiver.recv().unwrap();
742 thread::sleep(time::Duration::from_millis(50));
743 l.run();
744
745 thread.join().unwrap();
746
747 assert_eq!(*sum.borrow(), 6);
748 }
749
750 #[test]
751 fn test_sync_channel_drop_wakeup() {
752 let c = MainContext::new();
753 let l = MainLoop::new(Some(&c), false);
754
755 c.acquire();
756
757 let (sender, receiver) = MainContext::sync_channel(Priority::default(), 3);
758
759 let sum = Rc::new(RefCell::new(0));
760 let sum_clone = sum.clone();
761 let l_clone = l.clone();
762 receiver.attach(Some(&c), move |item| {
763 *sum_clone.borrow_mut() += item;
764 if *sum_clone.borrow() == 6 {
765 l_clone.quit();
766 Continue(false)
767 } else {
768 Continue(true)
769 }
770 });
771
772 let (wait_sender, wait_receiver) = mpsc::channel();
773
774 let thread = thread::spawn(move || {
775 sender.try_send(1).unwrap();
777 sender.try_send(2).unwrap();
778 sender.try_send(3).unwrap();
779
780 wait_sender.send(()).unwrap();
781 for i in 4.. {
782 if let Err(_) = sender.send(i) {
785 break;
786 }
787 }
788 });
789
790 let _ = wait_receiver.recv().unwrap();
794 thread::sleep(time::Duration::from_millis(50));
795 l.run();
796
797 thread.join().unwrap();
798
799 assert_eq!(*sum.borrow(), 6);
800 }
801
802 #[test]
803 fn test_sync_channel_drop_receiver_wakeup() {
804 let c = MainContext::new();
805
806 c.acquire();
807
808 let (sender, receiver) = MainContext::sync_channel(Priority::default(), 2);
809
810 let (wait_sender, wait_receiver) = mpsc::channel();
811
812 let thread = thread::spawn(move || {
813 sender.try_send(1).unwrap();
815 sender.try_send(2).unwrap();
816 wait_sender.send(()).unwrap();
817
818 assert!(sender.send(3).is_err());
820 });
821
822 let _ = wait_receiver.recv().unwrap();
826 thread::sleep(time::Duration::from_millis(50));
827 drop(receiver);
828 thread.join().unwrap();
829 }
830
831 #[test]
832 fn test_sync_channel_rendezvous() {
833 let c = MainContext::new();
834 let l = MainLoop::new(Some(&c), false);
835
836 c.acquire();
837
838 let (sender, receiver) = MainContext::sync_channel(Priority::default(), 0);
839
840 let (wait_sender, wait_receiver) = mpsc::channel();
841
842 let thread = thread::spawn(move || {
843 wait_sender.send(()).unwrap();
844 sender.send(1).unwrap();
845 wait_sender.send(()).unwrap();
846 sender.send(2).unwrap();
847 wait_sender.send(()).unwrap();
848 sender.send(3).unwrap();
849 wait_sender.send(()).unwrap();
850 });
851
852 let _ = wait_receiver.recv().unwrap();
856 assert_eq!(
857 wait_receiver.recv_timeout(time::Duration::from_millis(50)),
858 Err(mpsc::RecvTimeoutError::Timeout)
859 );
860
861 let sum = Rc::new(RefCell::new(0));
862 let sum_clone = sum.clone();
863 let l_clone = l.clone();
864 receiver.attach(Some(&c), move |item| {
865 let _ = wait_receiver.recv().unwrap();
868 *sum_clone.borrow_mut() += item;
869 if *sum_clone.borrow() == 6 {
870 assert_eq!(
873 wait_receiver.recv_timeout(time::Duration::from_millis(50)),
874 Err(mpsc::RecvTimeoutError::Disconnected)
875 );
876 l_clone.quit();
877 Continue(false)
878 } else {
879 assert_eq!(
882 wait_receiver.recv_timeout(time::Duration::from_millis(50)),
883 Err(mpsc::RecvTimeoutError::Timeout)
884 );
885 Continue(true)
886 }
887 });
888 l.run();
889
890 thread.join().unwrap();
891
892 assert_eq!(*sum.borrow(), 6);
893 }
894}