blob: 0cea233936e02d0d1df49274a0f981e3a86580e6 [file] [log] [blame]
//! Tests copied from `std::sync::mpsc`.
//!
//! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but
//! modified to work with `crossbeam-channel` instead.
//!
//! Minor tweaks were needed to make the tests compile:
//!
//! - Replace `box` syntax with `Box::new`.
//! - Replace all uses of `Select` with `select!`.
//! - Change the imports.
//! - Join all spawned threads.
//! - Remove the assertion from the `oneshot_multi_thread_send_close_stress` tests.
//!
//! Source:
//! - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc
//!
//! Copyright & License:
//! - Copyright 2013-2014 The Rust Project Developers
//! - Apache License, Version 2.0 or MIT license, at your option
//! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT
//! - https://www.rust-lang.org/en-US/legal.html
22
Jeff Vander Stoep109440f2024-02-01 13:35:38 +010023#![allow(clippy::match_single_binding, clippy::redundant_clone)]
David LeGare54fc8482022-03-01 18:58:39 +000024
Jakub Kotur2b588ff2020-12-21 17:28:14 +010025use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
26use std::sync::mpsc::{SendError, TrySendError};
27use std::thread::JoinHandle;
28use std::time::Duration;
29
30use crossbeam_channel as cc;
31
32pub struct Sender<T> {
33 pub inner: cc::Sender<T>,
34}
35
36impl<T> Sender<T> {
37 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
38 self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
39 }
40}
41
42impl<T> Clone for Sender<T> {
43 fn clone(&self) -> Sender<T> {
44 Sender {
45 inner: self.inner.clone(),
46 }
47 }
48}
49
50pub struct SyncSender<T> {
51 pub inner: cc::Sender<T>,
52}
53
54impl<T> SyncSender<T> {
55 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
56 self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
57 }
58
59 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
60 self.inner.try_send(t).map_err(|err| match err {
61 cc::TrySendError::Full(m) => TrySendError::Full(m),
62 cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m),
63 })
64 }
65}
66
67impl<T> Clone for SyncSender<T> {
68 fn clone(&self) -> SyncSender<T> {
69 SyncSender {
70 inner: self.inner.clone(),
71 }
72 }
73}
74
75pub struct Receiver<T> {
76 pub inner: cc::Receiver<T>,
77}
78
79impl<T> Receiver<T> {
80 pub fn try_recv(&self) -> Result<T, TryRecvError> {
81 self.inner.try_recv().map_err(|err| match err {
82 cc::TryRecvError::Empty => TryRecvError::Empty,
83 cc::TryRecvError::Disconnected => TryRecvError::Disconnected,
84 })
85 }
86
87 pub fn recv(&self) -> Result<T, RecvError> {
88 self.inner.recv().map_err(|_| RecvError)
89 }
90
91 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
92 self.inner.recv_timeout(timeout).map_err(|err| match err {
93 cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
94 cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
95 })
96 }
97
98 pub fn iter(&self) -> Iter<T> {
99 Iter { inner: self }
100 }
101
102 pub fn try_iter(&self) -> TryIter<T> {
103 TryIter { inner: self }
104 }
105}
106
107impl<'a, T> IntoIterator for &'a Receiver<T> {
108 type Item = T;
109 type IntoIter = Iter<'a, T>;
110
111 fn into_iter(self) -> Iter<'a, T> {
112 self.iter()
113 }
114}
115
116impl<T> IntoIterator for Receiver<T> {
117 type Item = T;
118 type IntoIter = IntoIter<T>;
119
120 fn into_iter(self) -> IntoIter<T> {
121 IntoIter { inner: self }
122 }
123}
124
125pub struct TryIter<'a, T: 'a> {
126 inner: &'a Receiver<T>,
127}
128
129impl<'a, T> Iterator for TryIter<'a, T> {
130 type Item = T;
131
132 fn next(&mut self) -> Option<T> {
133 self.inner.try_recv().ok()
134 }
135}
136
137pub struct Iter<'a, T: 'a> {
138 inner: &'a Receiver<T>,
139}
140
141impl<'a, T> Iterator for Iter<'a, T> {
142 type Item = T;
143
144 fn next(&mut self) -> Option<T> {
145 self.inner.recv().ok()
146 }
147}
148
149pub struct IntoIter<T> {
150 inner: Receiver<T>,
151}
152
153impl<T> Iterator for IntoIter<T> {
154 type Item = T;
155
156 fn next(&mut self) -> Option<T> {
157 self.inner.recv().ok()
158 }
159}
160
161pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
162 let (s, r) = cc::unbounded();
163 let s = Sender { inner: s };
164 let r = Receiver { inner: r };
165 (s, r)
166}
167
168pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
169 let (s, r) = cc::bounded(bound);
170 let s = SyncSender { inner: s };
171 let r = Receiver { inner: r };
172 (s, r)
173}
174
// Adapter for the old `Select`-style syntax: each `pat = rx.method() => expr`
// arm is forwarded to `crossbeam_channel_internal!`, operating on the wrapper's
// `inner` receiver and handing the arm a result whose error type is
// `std::sync::mpsc::RecvError`.
macro_rules! select {
    (
        $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
    ) => ({
        cc::crossbeam_channel_internal! {
            $(
                $meth(($rx).inner) -> outcome => {
                    // Replace the crossbeam error with the std one before binding.
                    let $name = outcome.map_err(|_| ::std::sync::mpsc::RecvError);
                    $code
                }
            )+
        }
    })
}
189
190// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
191mod channel_tests {
192 use super::*;
193
194 use std::env;
195 use std::thread;
196 use std::time::{Duration, Instant};
197
198 pub fn stress_factor() -> usize {
199 match env::var("RUST_TEST_STRESS") {
200 Ok(val) => val.parse().unwrap(),
201 Err(..) => 1,
202 }
203 }
204
205 #[test]
206 fn smoke() {
207 let (tx, rx) = channel::<i32>();
208 tx.send(1).unwrap();
209 assert_eq!(rx.recv().unwrap(), 1);
210 }
211
212 #[test]
213 fn drop_full() {
214 let (tx, _rx) = channel::<Box<isize>>();
215 tx.send(Box::new(1)).unwrap();
216 }
217
218 #[test]
219 fn drop_full_shared() {
220 let (tx, _rx) = channel::<Box<isize>>();
221 drop(tx.clone());
222 drop(tx.clone());
223 tx.send(Box::new(1)).unwrap();
224 }
225
226 #[test]
227 fn smoke_shared() {
228 let (tx, rx) = channel::<i32>();
229 tx.send(1).unwrap();
230 assert_eq!(rx.recv().unwrap(), 1);
231 let tx = tx.clone();
232 tx.send(1).unwrap();
233 assert_eq!(rx.recv().unwrap(), 1);
234 }
235
236 #[test]
237 fn smoke_threads() {
238 let (tx, rx) = channel::<i32>();
239 let t = thread::spawn(move || {
240 tx.send(1).unwrap();
241 });
242 assert_eq!(rx.recv().unwrap(), 1);
243 t.join().unwrap();
244 }
245
246 #[test]
247 fn smoke_port_gone() {
248 let (tx, rx) = channel::<i32>();
249 drop(rx);
250 assert!(tx.send(1).is_err());
251 }
252
253 #[test]
254 fn smoke_shared_port_gone() {
255 let (tx, rx) = channel::<i32>();
256 drop(rx);
257 assert!(tx.send(1).is_err())
258 }
259
260 #[test]
261 fn smoke_shared_port_gone2() {
262 let (tx, rx) = channel::<i32>();
263 drop(rx);
264 let tx2 = tx.clone();
265 drop(tx);
266 assert!(tx2.send(1).is_err());
267 }
268
269 #[test]
270 fn port_gone_concurrent() {
271 let (tx, rx) = channel::<i32>();
272 let t = thread::spawn(move || {
273 rx.recv().unwrap();
274 });
275 while tx.send(1).is_ok() {}
276 t.join().unwrap();
277 }
278
279 #[test]
280 fn port_gone_concurrent_shared() {
281 let (tx, rx) = channel::<i32>();
282 let tx2 = tx.clone();
283 let t = thread::spawn(move || {
284 rx.recv().unwrap();
285 });
286 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
287 t.join().unwrap();
288 }
289
290 #[test]
291 fn smoke_chan_gone() {
292 let (tx, rx) = channel::<i32>();
293 drop(tx);
294 assert!(rx.recv().is_err());
295 }
296
297 #[test]
298 fn smoke_chan_gone_shared() {
299 let (tx, rx) = channel::<()>();
300 let tx2 = tx.clone();
301 drop(tx);
302 drop(tx2);
303 assert!(rx.recv().is_err());
304 }
305
306 #[test]
307 fn chan_gone_concurrent() {
308 let (tx, rx) = channel::<i32>();
309 let t = thread::spawn(move || {
310 tx.send(1).unwrap();
311 tx.send(1).unwrap();
312 });
313 while rx.recv().is_ok() {}
314 t.join().unwrap();
315 }
316
317 #[test]
318 fn stress() {
David LeGare54fc8482022-03-01 18:58:39 +0000319 #[cfg(miri)]
David LeGare45db9642022-06-28 21:31:41 +0000320 const COUNT: usize = 100;
David LeGare54fc8482022-03-01 18:58:39 +0000321 #[cfg(not(miri))]
322 const COUNT: usize = 10000;
323
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100324 let (tx, rx) = channel::<i32>();
325 let t = thread::spawn(move || {
David LeGare54fc8482022-03-01 18:58:39 +0000326 for _ in 0..COUNT {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100327 tx.send(1).unwrap();
328 }
329 });
David LeGare54fc8482022-03-01 18:58:39 +0000330 for _ in 0..COUNT {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100331 assert_eq!(rx.recv().unwrap(), 1);
332 }
333 t.join().ok().unwrap();
334 }
335
336 #[test]
337 fn stress_shared() {
Jeff Vander Stoepeb774e82022-12-08 15:18:00 +0100338 let amt: u32 = if cfg!(miri) { 100 } else { 10_000 };
339 let nthreads: u32 = if cfg!(miri) { 4 } else { 8 };
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100340 let (tx, rx) = channel::<i32>();
341
342 let t = thread::spawn(move || {
Jeff Vander Stoepeb774e82022-12-08 15:18:00 +0100343 for _ in 0..amt * nthreads {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100344 assert_eq!(rx.recv().unwrap(), 1);
345 }
David LeGare54fc8482022-03-01 18:58:39 +0000346 assert!(rx.try_recv().is_err());
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100347 });
348
Jeff Vander Stoepeb774e82022-12-08 15:18:00 +0100349 let mut ts = Vec::with_capacity(nthreads as usize);
350 for _ in 0..nthreads {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100351 let tx = tx.clone();
352 let t = thread::spawn(move || {
Jeff Vander Stoepeb774e82022-12-08 15:18:00 +0100353 for _ in 0..amt {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100354 tx.send(1).unwrap();
355 }
356 });
357 ts.push(t);
358 }
359 drop(tx);
360 t.join().ok().unwrap();
361 for t in ts {
362 t.join().unwrap();
363 }
364 }
365
366 #[test]
367 fn send_from_outside_runtime() {
368 let (tx1, rx1) = channel::<()>();
369 let (tx2, rx2) = channel::<i32>();
370 let t1 = thread::spawn(move || {
371 tx1.send(()).unwrap();
372 for _ in 0..40 {
373 assert_eq!(rx2.recv().unwrap(), 1);
374 }
375 });
376 rx1.recv().unwrap();
377 let t2 = thread::spawn(move || {
378 for _ in 0..40 {
379 tx2.send(1).unwrap();
380 }
381 });
382 t1.join().ok().unwrap();
383 t2.join().ok().unwrap();
384 }
385
386 #[test]
387 fn recv_from_outside_runtime() {
388 let (tx, rx) = channel::<i32>();
389 let t = thread::spawn(move || {
390 for _ in 0..40 {
391 assert_eq!(rx.recv().unwrap(), 1);
392 }
393 });
394 for _ in 0..40 {
395 tx.send(1).unwrap();
396 }
397 t.join().ok().unwrap();
398 }
399
400 #[test]
401 fn no_runtime() {
402 let (tx1, rx1) = channel::<i32>();
403 let (tx2, rx2) = channel::<i32>();
404 let t1 = thread::spawn(move || {
405 assert_eq!(rx1.recv().unwrap(), 1);
406 tx2.send(2).unwrap();
407 });
408 let t2 = thread::spawn(move || {
409 tx1.send(1).unwrap();
410 assert_eq!(rx2.recv().unwrap(), 2);
411 });
412 t1.join().ok().unwrap();
413 t2.join().ok().unwrap();
414 }
415
416 #[test]
417 fn oneshot_single_thread_close_port_first() {
418 // Simple test of closing without sending
419 let (_tx, rx) = channel::<i32>();
420 drop(rx);
421 }
422
423 #[test]
424 fn oneshot_single_thread_close_chan_first() {
425 // Simple test of closing without sending
426 let (tx, _rx) = channel::<i32>();
427 drop(tx);
428 }
429
430 #[test]
431 fn oneshot_single_thread_send_port_close() {
432 // Testing that the sender cleans up the payload if receiver is closed
433 let (tx, rx) = channel::<Box<i32>>();
434 drop(rx);
435 assert!(tx.send(Box::new(0)).is_err());
436 }
437
438 #[test]
439 fn oneshot_single_thread_recv_chan_close() {
440 let (tx, rx) = channel::<i32>();
441 drop(tx);
442 assert_eq!(rx.recv(), Err(RecvError));
443 }
444
445 #[test]
446 fn oneshot_single_thread_send_then_recv() {
447 let (tx, rx) = channel::<Box<i32>>();
448 tx.send(Box::new(10)).unwrap();
449 assert!(*rx.recv().unwrap() == 10);
450 }
451
452 #[test]
453 fn oneshot_single_thread_try_send_open() {
454 let (tx, rx) = channel::<i32>();
455 assert!(tx.send(10).is_ok());
456 assert!(rx.recv().unwrap() == 10);
457 }
458
459 #[test]
460 fn oneshot_single_thread_try_send_closed() {
461 let (tx, rx) = channel::<i32>();
462 drop(rx);
463 assert!(tx.send(10).is_err());
464 }
465
466 #[test]
467 fn oneshot_single_thread_try_recv_open() {
468 let (tx, rx) = channel::<i32>();
469 tx.send(10).unwrap();
470 assert!(rx.recv() == Ok(10));
471 }
472
473 #[test]
474 fn oneshot_single_thread_try_recv_closed() {
475 let (tx, rx) = channel::<i32>();
476 drop(tx);
477 assert!(rx.recv().is_err());
478 }
479
480 #[test]
481 fn oneshot_single_thread_peek_data() {
482 let (tx, rx) = channel::<i32>();
483 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
484 tx.send(10).unwrap();
485 assert_eq!(rx.try_recv(), Ok(10));
486 }
487
488 #[test]
489 fn oneshot_single_thread_peek_close() {
490 let (tx, rx) = channel::<i32>();
491 drop(tx);
492 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
493 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
494 }
495
496 #[test]
497 fn oneshot_single_thread_peek_open() {
498 let (_tx, rx) = channel::<i32>();
499 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
500 }
501
502 #[test]
503 fn oneshot_multi_task_recv_then_send() {
504 let (tx, rx) = channel::<Box<i32>>();
505 let t = thread::spawn(move || {
506 assert!(*rx.recv().unwrap() == 10);
507 });
508
509 tx.send(Box::new(10)).unwrap();
510 t.join().unwrap();
511 }
512
513 #[test]
514 fn oneshot_multi_task_recv_then_close() {
515 let (tx, rx) = channel::<Box<i32>>();
516 let t = thread::spawn(move || {
517 drop(tx);
518 });
519 thread::spawn(move || {
520 assert_eq!(rx.recv(), Err(RecvError));
521 })
522 .join()
523 .unwrap();
524 t.join().unwrap();
525 }
526
527 #[test]
528 fn oneshot_multi_thread_close_stress() {
529 let stress_factor = stress_factor();
530 let mut ts = Vec::with_capacity(stress_factor);
531 for _ in 0..stress_factor {
532 let (tx, rx) = channel::<i32>();
533 let t = thread::spawn(move || {
534 drop(rx);
535 });
536 ts.push(t);
537 drop(tx);
538 }
539 for t in ts {
540 t.join().unwrap();
541 }
542 }
543
544 #[test]
545 fn oneshot_multi_thread_send_close_stress() {
546 let stress_factor = stress_factor();
547 let mut ts = Vec::with_capacity(2 * stress_factor);
548 for _ in 0..stress_factor {
549 let (tx, rx) = channel::<i32>();
550 let t = thread::spawn(move || {
551 drop(rx);
552 });
553 ts.push(t);
554 thread::spawn(move || {
555 let _ = tx.send(1);
556 })
557 .join()
558 .unwrap();
559 }
560 for t in ts {
561 t.join().unwrap();
562 }
563 }
564
565 #[test]
566 fn oneshot_multi_thread_recv_close_stress() {
567 let stress_factor = stress_factor();
568 let mut ts = Vec::with_capacity(2 * stress_factor);
569 for _ in 0..stress_factor {
570 let (tx, rx) = channel::<i32>();
571 let t = thread::spawn(move || {
572 thread::spawn(move || {
573 assert_eq!(rx.recv(), Err(RecvError));
574 })
575 .join()
576 .unwrap();
577 });
578 ts.push(t);
579 let t2 = thread::spawn(move || {
580 let t = thread::spawn(move || {
581 drop(tx);
582 });
583 t.join().unwrap();
584 });
585 ts.push(t2);
586 }
587 for t in ts {
588 t.join().unwrap();
589 }
590 }
591
592 #[test]
593 fn oneshot_multi_thread_send_recv_stress() {
594 let stress_factor = stress_factor();
595 let mut ts = Vec::with_capacity(stress_factor);
596 for _ in 0..stress_factor {
597 let (tx, rx) = channel::<Box<isize>>();
598 let t = thread::spawn(move || {
599 tx.send(Box::new(10)).unwrap();
600 });
601 ts.push(t);
602 assert!(*rx.recv().unwrap() == 10);
603 }
604 for t in ts {
605 t.join().unwrap();
606 }
607 }
608
609 #[test]
610 fn stream_send_recv_stress() {
611 let stress_factor = stress_factor();
612 let mut ts = Vec::with_capacity(2 * stress_factor);
613 for _ in 0..stress_factor {
614 let (tx, rx) = channel();
615
616 if let Some(t) = send(tx, 0) {
617 ts.push(t);
618 }
619 if let Some(t2) = recv(rx, 0) {
620 ts.push(t2);
621 }
622
623 fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
624 if i == 10 {
625 return None;
626 }
627
628 Some(thread::spawn(move || {
629 tx.send(Box::new(i)).unwrap();
630 send(tx, i + 1);
631 }))
632 }
633
634 fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
635 if i == 10 {
636 return None;
637 }
638
639 Some(thread::spawn(move || {
640 assert!(*rx.recv().unwrap() == i);
641 recv(rx, i + 1);
642 }))
643 }
644 }
645 for t in ts {
646 t.join().unwrap();
647 }
648 }
649
650 #[test]
651 fn oneshot_single_thread_recv_timeout() {
652 let (tx, rx) = channel();
653 tx.send(()).unwrap();
654 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
655 assert_eq!(
656 rx.recv_timeout(Duration::from_millis(1)),
657 Err(RecvTimeoutError::Timeout)
658 );
659 tx.send(()).unwrap();
660 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
661 }
662
663 #[test]
664 fn stress_recv_timeout_two_threads() {
665 let (tx, rx) = channel();
666 let stress = stress_factor() + 100;
667 let timeout = Duration::from_millis(100);
668
669 let t = thread::spawn(move || {
670 for i in 0..stress {
671 if i % 2 == 0 {
672 thread::sleep(timeout * 2);
673 }
674 tx.send(1usize).unwrap();
675 }
676 });
677
678 let mut recv_count = 0;
679 loop {
680 match rx.recv_timeout(timeout) {
681 Ok(n) => {
682 assert_eq!(n, 1usize);
683 recv_count += 1;
684 }
685 Err(RecvTimeoutError::Timeout) => continue,
686 Err(RecvTimeoutError::Disconnected) => break,
687 }
688 }
689
690 assert_eq!(recv_count, stress);
691 t.join().unwrap()
692 }
693
694 #[test]
695 fn recv_timeout_upgrade() {
696 let (tx, rx) = channel::<()>();
697 let timeout = Duration::from_millis(1);
698 let _tx_clone = tx.clone();
699
700 let start = Instant::now();
701 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
702 assert!(Instant::now() >= start + timeout);
703 }
704
705 #[test]
706 fn stress_recv_timeout_shared() {
707 let (tx, rx) = channel();
708 let stress = stress_factor() + 100;
709
710 let mut ts = Vec::with_capacity(stress);
711 for i in 0..stress {
712 let tx = tx.clone();
713 let t = thread::spawn(move || {
714 thread::sleep(Duration::from_millis(i as u64 * 10));
715 tx.send(1usize).unwrap();
716 });
717 ts.push(t);
718 }
719
720 drop(tx);
721
722 let mut recv_count = 0;
723 loop {
724 match rx.recv_timeout(Duration::from_millis(10)) {
725 Ok(n) => {
726 assert_eq!(n, 1usize);
727 recv_count += 1;
728 }
729 Err(RecvTimeoutError::Timeout) => continue,
730 Err(RecvTimeoutError::Disconnected) => break,
731 }
732 }
733
734 assert_eq!(recv_count, stress);
735 for t in ts {
736 t.join().unwrap();
737 }
738 }
739
740 #[test]
741 fn recv_a_lot() {
David LeGare54fc8482022-03-01 18:58:39 +0000742 #[cfg(miri)]
David LeGare45db9642022-06-28 21:31:41 +0000743 const N: usize = 50;
David LeGare54fc8482022-03-01 18:58:39 +0000744 #[cfg(not(miri))]
745 const N: usize = 10000;
746
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100747 // Regression test that we don't run out of stack in scheduler context
748 let (tx, rx) = channel();
David LeGare54fc8482022-03-01 18:58:39 +0000749 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100750 tx.send(()).unwrap();
751 }
David LeGare54fc8482022-03-01 18:58:39 +0000752 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100753 rx.recv().unwrap();
754 }
755 }
756
757 #[test]
758 fn shared_recv_timeout() {
759 let (tx, rx) = channel();
760 let total = 5;
761 let mut ts = Vec::with_capacity(total);
762 for _ in 0..total {
763 let tx = tx.clone();
764 let t = thread::spawn(move || {
765 tx.send(()).unwrap();
766 });
767 ts.push(t);
768 }
769
770 for _ in 0..total {
771 rx.recv().unwrap();
772 }
773
774 assert_eq!(
775 rx.recv_timeout(Duration::from_millis(1)),
776 Err(RecvTimeoutError::Timeout)
777 );
778 tx.send(()).unwrap();
779 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
780 for t in ts {
781 t.join().unwrap();
782 }
783 }
784
785 #[test]
786 fn shared_chan_stress() {
787 let (tx, rx) = channel();
788 let total = stress_factor() + 100;
789 let mut ts = Vec::with_capacity(total);
790 for _ in 0..total {
791 let tx = tx.clone();
792 let t = thread::spawn(move || {
793 tx.send(()).unwrap();
794 });
795 ts.push(t);
796 }
797
798 for _ in 0..total {
799 rx.recv().unwrap();
800 }
801 for t in ts {
802 t.join().unwrap();
803 }
804 }
805
806 #[test]
807 fn test_nested_recv_iter() {
808 let (tx, rx) = channel::<i32>();
809 let (total_tx, total_rx) = channel::<i32>();
810
811 let t = thread::spawn(move || {
812 let mut acc = 0;
813 for x in rx.iter() {
814 acc += x;
815 }
816 total_tx.send(acc).unwrap();
817 });
818
819 tx.send(3).unwrap();
820 tx.send(1).unwrap();
821 tx.send(2).unwrap();
822 drop(tx);
823 assert_eq!(total_rx.recv().unwrap(), 6);
824 t.join().unwrap();
825 }
826
827 #[test]
828 fn test_recv_iter_break() {
829 let (tx, rx) = channel::<i32>();
830 let (count_tx, count_rx) = channel();
831
832 let t = thread::spawn(move || {
833 let mut count = 0;
834 for x in rx.iter() {
835 if count >= 3 {
836 break;
837 } else {
838 count += x;
839 }
840 }
841 count_tx.send(count).unwrap();
842 });
843
844 tx.send(2).unwrap();
845 tx.send(2).unwrap();
846 tx.send(2).unwrap();
847 let _ = tx.send(2);
848 drop(tx);
849 assert_eq!(count_rx.recv().unwrap(), 4);
850 t.join().unwrap();
851 }
852
853 #[test]
854 fn test_recv_try_iter() {
855 let (request_tx, request_rx) = channel();
856 let (response_tx, response_rx) = channel();
857
858 // Request `x`s until we have `6`.
859 let t = thread::spawn(move || {
860 let mut count = 0;
861 loop {
862 for x in response_rx.try_iter() {
863 count += x;
864 if count == 6 {
865 return count;
866 }
867 }
868 request_tx.send(()).unwrap();
869 }
870 });
871
872 for _ in request_rx.iter() {
873 if response_tx.send(2).is_err() {
874 break;
875 }
876 }
877
878 assert_eq!(t.join().unwrap(), 6);
879 }
880
881 #[test]
882 fn test_recv_into_iter_owned() {
883 let mut iter = {
884 let (tx, rx) = channel::<i32>();
885 tx.send(1).unwrap();
886 tx.send(2).unwrap();
887
888 rx.into_iter()
889 };
890 assert_eq!(iter.next().unwrap(), 1);
891 assert_eq!(iter.next().unwrap(), 2);
David LeGare54fc8482022-03-01 18:58:39 +0000892 assert!(iter.next().is_none());
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100893 }
894
895 #[test]
896 fn test_recv_into_iter_borrowed() {
897 let (tx, rx) = channel::<i32>();
898 tx.send(1).unwrap();
899 tx.send(2).unwrap();
900 drop(tx);
901 let mut iter = (&rx).into_iter();
902 assert_eq!(iter.next().unwrap(), 1);
903 assert_eq!(iter.next().unwrap(), 2);
David LeGare54fc8482022-03-01 18:58:39 +0000904 assert!(iter.next().is_none());
Jakub Kotur2b588ff2020-12-21 17:28:14 +0100905 }
906
907 #[test]
908 fn try_recv_states() {
909 let (tx1, rx1) = channel::<i32>();
910 let (tx2, rx2) = channel::<()>();
911 let (tx3, rx3) = channel::<()>();
912 let t = thread::spawn(move || {
913 rx2.recv().unwrap();
914 tx1.send(1).unwrap();
915 tx3.send(()).unwrap();
916 rx2.recv().unwrap();
917 drop(tx1);
918 tx3.send(()).unwrap();
919 });
920
921 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
922 tx2.send(()).unwrap();
923 rx3.recv().unwrap();
924 assert_eq!(rx1.try_recv(), Ok(1));
925 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
926 tx2.send(()).unwrap();
927 rx3.recv().unwrap();
928 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
929 t.join().unwrap();
930 }
931
932 // This bug used to end up in a livelock inside of the Receiver destructor
933 // because the internal state of the Shared packet was corrupted
934 #[test]
935 fn destroy_upgraded_shared_port_when_sender_still_active() {
936 let (tx, rx) = channel();
937 let (tx2, rx2) = channel();
938 let t = thread::spawn(move || {
939 rx.recv().unwrap(); // wait on a oneshot
940 drop(rx); // destroy a shared
941 tx2.send(()).unwrap();
942 });
943 // make sure the other thread has gone to sleep
944 for _ in 0..5000 {
945 thread::yield_now();
946 }
947
948 // upgrade to a shared chan and send a message
949 let tx2 = tx.clone();
950 drop(tx);
951 tx2.send(()).unwrap();
952
953 // wait for the child thread to exit before we exit
954 rx2.recv().unwrap();
955 t.join().unwrap();
956 }
957
958 #[test]
959 fn issue_32114() {
960 let (tx, _) = channel();
961 let _ = tx.send(123);
962 assert_eq!(tx.send(123), Err(SendError(123)));
963 }
964}
965
966// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
967mod sync_channel_tests {
968 use super::*;
969
970 use std::env;
971 use std::thread;
972 use std::time::Duration;
973
974 pub fn stress_factor() -> usize {
975 match env::var("RUST_TEST_STRESS") {
976 Ok(val) => val.parse().unwrap(),
977 Err(..) => 1,
978 }
979 }
980
981 #[test]
982 fn smoke() {
983 let (tx, rx) = sync_channel::<i32>(1);
984 tx.send(1).unwrap();
985 assert_eq!(rx.recv().unwrap(), 1);
986 }
987
988 #[test]
989 fn drop_full() {
990 let (tx, _rx) = sync_channel::<Box<isize>>(1);
991 tx.send(Box::new(1)).unwrap();
992 }
993
994 #[test]
995 fn smoke_shared() {
996 let (tx, rx) = sync_channel::<i32>(1);
997 tx.send(1).unwrap();
998 assert_eq!(rx.recv().unwrap(), 1);
999 let tx = tx.clone();
1000 tx.send(1).unwrap();
1001 assert_eq!(rx.recv().unwrap(), 1);
1002 }
1003
1004 #[test]
1005 fn recv_timeout() {
1006 let (tx, rx) = sync_channel::<i32>(1);
1007 assert_eq!(
1008 rx.recv_timeout(Duration::from_millis(1)),
1009 Err(RecvTimeoutError::Timeout)
1010 );
1011 tx.send(1).unwrap();
1012 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
1013 }
1014
1015 #[test]
1016 fn smoke_threads() {
1017 let (tx, rx) = sync_channel::<i32>(0);
1018 let t = thread::spawn(move || {
1019 tx.send(1).unwrap();
1020 });
1021 assert_eq!(rx.recv().unwrap(), 1);
1022 t.join().unwrap();
1023 }
1024
1025 #[test]
1026 fn smoke_port_gone() {
1027 let (tx, rx) = sync_channel::<i32>(0);
1028 drop(rx);
1029 assert!(tx.send(1).is_err());
1030 }
1031
1032 #[test]
1033 fn smoke_shared_port_gone2() {
1034 let (tx, rx) = sync_channel::<i32>(0);
1035 drop(rx);
1036 let tx2 = tx.clone();
1037 drop(tx);
1038 assert!(tx2.send(1).is_err());
1039 }
1040
1041 #[test]
1042 fn port_gone_concurrent() {
1043 let (tx, rx) = sync_channel::<i32>(0);
1044 let t = thread::spawn(move || {
1045 rx.recv().unwrap();
1046 });
1047 while tx.send(1).is_ok() {}
1048 t.join().unwrap();
1049 }
1050
1051 #[test]
1052 fn port_gone_concurrent_shared() {
1053 let (tx, rx) = sync_channel::<i32>(0);
1054 let tx2 = tx.clone();
1055 let t = thread::spawn(move || {
1056 rx.recv().unwrap();
1057 });
1058 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1059 t.join().unwrap();
1060 }
1061
1062 #[test]
1063 fn smoke_chan_gone() {
1064 let (tx, rx) = sync_channel::<i32>(0);
1065 drop(tx);
1066 assert!(rx.recv().is_err());
1067 }
1068
1069 #[test]
1070 fn smoke_chan_gone_shared() {
1071 let (tx, rx) = sync_channel::<()>(0);
1072 let tx2 = tx.clone();
1073 drop(tx);
1074 drop(tx2);
1075 assert!(rx.recv().is_err());
1076 }
1077
1078 #[test]
1079 fn chan_gone_concurrent() {
1080 let (tx, rx) = sync_channel::<i32>(0);
1081 let t = thread::spawn(move || {
1082 tx.send(1).unwrap();
1083 tx.send(1).unwrap();
1084 });
1085 while rx.recv().is_ok() {}
1086 t.join().unwrap();
1087 }
1088
1089 #[test]
1090 fn stress() {
David LeGare54fc8482022-03-01 18:58:39 +00001091 #[cfg(miri)]
1092 const N: usize = 100;
1093 #[cfg(not(miri))]
1094 const N: usize = 10000;
1095
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001096 let (tx, rx) = sync_channel::<i32>(0);
1097 let t = thread::spawn(move || {
David LeGare54fc8482022-03-01 18:58:39 +00001098 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001099 tx.send(1).unwrap();
1100 }
1101 });
David LeGare54fc8482022-03-01 18:58:39 +00001102 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001103 assert_eq!(rx.recv().unwrap(), 1);
1104 }
1105 t.join().unwrap();
1106 }
1107
1108 #[test]
1109 fn stress_recv_timeout_two_threads() {
David LeGare54fc8482022-03-01 18:58:39 +00001110 #[cfg(miri)]
1111 const N: usize = 100;
1112 #[cfg(not(miri))]
1113 const N: usize = 10000;
1114
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001115 let (tx, rx) = sync_channel::<i32>(0);
1116
1117 let t = thread::spawn(move || {
David LeGare54fc8482022-03-01 18:58:39 +00001118 for _ in 0..N {
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001119 tx.send(1).unwrap();
1120 }
1121 });
1122
1123 let mut recv_count = 0;
1124 loop {
1125 match rx.recv_timeout(Duration::from_millis(1)) {
1126 Ok(v) => {
1127 assert_eq!(v, 1);
1128 recv_count += 1;
1129 }
1130 Err(RecvTimeoutError::Timeout) => continue,
1131 Err(RecvTimeoutError::Disconnected) => break,
1132 }
1133 }
1134
David LeGare54fc8482022-03-01 18:58:39 +00001135 assert_eq!(recv_count, N);
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001136 t.join().unwrap();
1137 }
1138
1139 #[test]
1140 fn stress_recv_timeout_shared() {
David LeGare54fc8482022-03-01 18:58:39 +00001141 #[cfg(miri)]
1142 const AMT: u32 = 100;
1143 #[cfg(not(miri))]
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001144 const AMT: u32 = 1000;
1145 const NTHREADS: u32 = 8;
1146 let (tx, rx) = sync_channel::<i32>(0);
1147 let (dtx, drx) = sync_channel::<()>(0);
1148
1149 let t = thread::spawn(move || {
1150 let mut recv_count = 0;
1151 loop {
1152 match rx.recv_timeout(Duration::from_millis(10)) {
1153 Ok(v) => {
1154 assert_eq!(v, 1);
1155 recv_count += 1;
1156 }
1157 Err(RecvTimeoutError::Timeout) => continue,
1158 Err(RecvTimeoutError::Disconnected) => break,
1159 }
1160 }
1161
1162 assert_eq!(recv_count, AMT * NTHREADS);
1163 assert!(rx.try_recv().is_err());
1164
1165 dtx.send(()).unwrap();
1166 });
1167
1168 let mut ts = Vec::with_capacity(NTHREADS as usize);
1169 for _ in 0..NTHREADS {
1170 let tx = tx.clone();
1171 let t = thread::spawn(move || {
1172 for _ in 0..AMT {
1173 tx.send(1).unwrap();
1174 }
1175 });
1176 ts.push(t);
1177 }
1178
1179 drop(tx);
1180
1181 drx.recv().unwrap();
1182 for t in ts {
1183 t.join().unwrap();
1184 }
1185 t.join().unwrap();
1186 }
1187
1188 #[test]
1189 fn stress_shared() {
David LeGare54fc8482022-03-01 18:58:39 +00001190 #[cfg(miri)]
1191 const AMT: u32 = 100;
1192 #[cfg(not(miri))]
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001193 const AMT: u32 = 1000;
1194 const NTHREADS: u32 = 8;
1195 let (tx, rx) = sync_channel::<i32>(0);
1196 let (dtx, drx) = sync_channel::<()>(0);
1197
1198 let t = thread::spawn(move || {
1199 for _ in 0..AMT * NTHREADS {
1200 assert_eq!(rx.recv().unwrap(), 1);
1201 }
David LeGare54fc8482022-03-01 18:58:39 +00001202 assert!(rx.try_recv().is_err());
Jakub Kotur2b588ff2020-12-21 17:28:14 +01001203 dtx.send(()).unwrap();
1204 });
1205
1206 let mut ts = Vec::with_capacity(NTHREADS as usize);
1207 for _ in 0..NTHREADS {
1208 let tx = tx.clone();
1209 let t = thread::spawn(move || {
1210 for _ in 0..AMT {
1211 tx.send(1).unwrap();
1212 }
1213 });
1214 ts.push(t);
1215 }
1216 drop(tx);
1217 drx.recv().unwrap();
1218 for t in ts {
1219 t.join().unwrap();
1220 }
1221 t.join().unwrap();
1222 }
1223
1224 #[test]
1225 fn oneshot_single_thread_close_port_first() {
1226 // Simple test of closing without sending
1227 let (_tx, rx) = sync_channel::<i32>(0);
1228 drop(rx);
1229 }
1230
1231 #[test]
1232 fn oneshot_single_thread_close_chan_first() {
1233 // Simple test of closing without sending
1234 let (tx, _rx) = sync_channel::<i32>(0);
1235 drop(tx);
1236 }
1237
1238 #[test]
1239 fn oneshot_single_thread_send_port_close() {
1240 // Testing that the sender cleans up the payload if receiver is closed
1241 let (tx, rx) = sync_channel::<Box<i32>>(0);
1242 drop(rx);
1243 assert!(tx.send(Box::new(0)).is_err());
1244 }
1245
1246 #[test]
1247 fn oneshot_single_thread_recv_chan_close() {
1248 let (tx, rx) = sync_channel::<i32>(0);
1249 drop(tx);
1250 assert_eq!(rx.recv(), Err(RecvError));
1251 }
1252
1253 #[test]
1254 fn oneshot_single_thread_send_then_recv() {
1255 let (tx, rx) = sync_channel::<Box<i32>>(1);
1256 tx.send(Box::new(10)).unwrap();
1257 assert!(*rx.recv().unwrap() == 10);
1258 }
1259
1260 #[test]
1261 fn oneshot_single_thread_try_send_open() {
1262 let (tx, rx) = sync_channel::<i32>(1);
1263 assert_eq!(tx.try_send(10), Ok(()));
1264 assert!(rx.recv().unwrap() == 10);
1265 }
1266
// `try_send` after the receiver has been dropped must hand the value back
// inside `TrySendError::Disconnected`.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);

    let outcome = sender.try_send(10);
    assert_eq!(outcome, Err(TrySendError::Disconnected(10)));
}
1273
// With bound 0 and a live receiver that is not currently receiving,
// `try_send` must hand the value back inside `TrySendError::Full`.
#[test]
fn oneshot_single_thread_try_send_closed2() {
    let (sender, _receiver) = sync_channel::<i32>(0);

    let outcome = sender.try_send(10);
    assert_eq!(outcome, Err(TrySendError::Full(10)));
}
1279
// Buffers one value in a bound-1 `sync_channel` and reads it back.
// Note: despite the test name, this calls the blocking `recv`, not `try_recv`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (tx, rx) = sync_channel::<i32>(1);
    tx.send(10).unwrap();
    // `assert_eq!` prints both operands on failure, unlike `assert!(a == b)`.
    assert_eq!(rx.recv(), Ok(10));
}
1286
// Receiving after the only sender has been dropped must report an error.
// Note: despite the test name, this calls the blocking `recv`.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);

    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1293
// A value buffered before the sender was dropped must still be delivered;
// only the following `try_recv` reports `Disconnected`.
#[test]
fn oneshot_single_thread_try_recv_closed_with_data() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(10).unwrap();
    drop(sender);

    let first = receiver.try_recv();
    assert_eq!(first, Ok(10));

    let second = receiver.try_recv();
    assert_eq!(second, Err(TryRecvError::Disconnected));
}
1302
// `try_recv` reports `Empty` before anything is sent and returns the value
// once one has been buffered.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = sync_channel::<i32>(1);

    let before = receiver.try_recv();
    assert_eq!(before, Err(TryRecvError::Empty));

    sender.send(10).unwrap();

    let after = receiver.try_recv();
    assert_eq!(after, Ok(10));
}
1310
// Once the sender is gone, `try_recv` reports `Disconnected`, and keeps
// reporting it on a repeated call.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);

    for _ in 0..2 {
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
1318
// With a live sender and nothing sent, `try_recv` reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = sync_channel::<i32>(0);

    let outcome = receiver.try_recv();
    assert_eq!(outcome, Err(TryRecvError::Empty));
}
1324
// A spawned thread receives on a bound-0 `sync_channel` while the test thread
// sends a single boxed value to it.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    let t = thread::spawn(move || {
        // `assert_eq!` prints both operands on failure, unlike `assert!(a == b)`.
        assert_eq!(*rx.recv().unwrap(), 10);
    });

    tx.send(Box::new(10)).unwrap();
    // Joining surfaces a failed assertion in the receiver thread.
    t.join().unwrap();
}
1335
// One thread drops the sender while another thread receives; the receive must
// end with `RecvError`. The receiving thread is joined first, then the closer.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = sync_channel::<Box<i32>>(0);

    let closer = thread::spawn(move || drop(sender));
    let waiter = thread::spawn(move || {
        assert_eq!(receiver.recv(), Err(RecvError));
    });

    waiter.join().unwrap();
    closer.join().unwrap();
}
1349
// Repeats `stress_factor()` times: create a bound-0 `sync_channel`, drop the
// receiver on a fresh thread and the sender on the test thread. All spawned
// threads are joined at the end.
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();

    let handles: Vec<_> = (0..rounds)
        .map(|_| {
            let (sender, receiver) = sync_channel::<i32>(0);
            let handle = thread::spawn(move || drop(receiver));
            drop(sender);
            handle
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
1366
// Repeats `stress_factor()` times: one thread drops the receiver while a
// second thread attempts a send. The sending thread is joined inside the loop;
// the receiver-dropping threads are joined after it.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    let mut droppers = Vec::with_capacity(rounds);

    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);
        droppers.push(thread::spawn(move || drop(receiver)));

        let pusher = thread::spawn(move || {
            // The outcome of the send is deliberately not asserted on.
            let _ = sender.send(1);
        });
        pusher.join().unwrap();
    }

    for dropper in droppers {
        dropper.join().unwrap();
    }
}
1387
// Repeats `stress_factor()` times: a nested thread blocks in `recv` while
// another nested thread drops the sender; the receive must end with
// `RecvError`.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    let rounds = stress_factor();
    // Two outer threads are spawned per round.
    let mut handles = Vec::with_capacity(2 * rounds);
    for _ in 0..rounds {
        let (tx, rx) = sync_channel::<i32>(0);

        let receiver_side = thread::spawn(move || {
            thread::spawn(move || {
                assert_eq!(rx.recv(), Err(RecvError));
            })
            .join()
            .unwrap();
        });
        handles.push(receiver_side);

        let closer_side = thread::spawn(move || {
            // Join the inner thread as well, so that no spawned thread is left
            // detached when the test returns (the receiver side already does).
            thread::spawn(move || {
                drop(tx);
            })
            .join()
            .unwrap();
        });
        handles.push(closer_side);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
1413
// Repeats `stress_factor()` times: a spawned thread sends one boxed value on a
// fresh bound-0 `sync_channel` and the test thread receives it.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    let stress_factor = stress_factor();
    let mut ts = Vec::with_capacity(stress_factor);
    for _ in 0..stress_factor {
        let (tx, rx) = sync_channel::<Box<i32>>(0);
        let t = thread::spawn(move || {
            tx.send(Box::new(10)).unwrap();
        });
        ts.push(t);
        // `assert_eq!` prints both operands on failure, unlike `assert!(a == b)`.
        assert_eq!(*rx.recv().unwrap(), 10);
    }
    for t in ts {
        t.join().unwrap();
    }
}
1430
// Repeats `stress_factor()` times: a chain of sender threads pushes the values
// 0..10 through a bound-0 `sync_channel`, one value per thread, while a chain
// of receiver threads checks that they arrive in that order.
#[test]
fn stream_send_recv_stress() {
    // Spawns a thread that sends `i`, then continues the chain with `i + 1`.
    // Returns `None` once the chain has reached 10.
    fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
        if i == 10 {
            return None;
        }

        Some(thread::spawn(move || {
            tx.send(Box::new(i)).unwrap();
            // Join the next link so that a panic further down the chain
            // propagates to the handle the test joins, instead of being lost
            // in a detached thread.
            if let Some(next) = send(tx, i + 1) {
                next.join().unwrap();
            }
        }))
    }

    // Spawns a thread that expects to receive `i`, then continues the chain
    // with `i + 1`. Returns `None` once the chain has reached 10.
    fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
        if i == 10 {
            return None;
        }

        Some(thread::spawn(move || {
            assert_eq!(*rx.recv().unwrap(), i);
            // See `send`: joining keeps nested assertion failures observable.
            if let Some(next) = recv(rx, i + 1) {
                next.join().unwrap();
            }
        }))
    }

    let stress_factor = stress_factor();
    // One sender chain and one receiver chain per round.
    let mut ts = Vec::with_capacity(2 * stress_factor);
    for _ in 0..stress_factor {
        let (tx, rx) = sync_channel::<Box<i32>>(0);

        if let Some(t) = send(tx, 0) {
            ts.push(t);
        }
        if let Some(t) = recv(rx, 0) {
            ts.push(t);
        }
    }
    for t in ts {
        t.join().unwrap();
    }
}
1471
// Regression test that we don't run out of stack in scheduler context.
// Fills a `sync_channel` of bound `N` with `N` unit messages and then drains
// all of them; `N` is reduced when running under Miri.
#[test]
fn recv_a_lot() {
    #[cfg(miri)]
    const N: usize = 100;
    #[cfg(not(miri))]
    const N: usize = 10_000;

    let (sender, receiver) = sync_channel(N);

    (0..N).for_each(|_| sender.send(()).unwrap());
    (0..N).for_each(|_| receiver.recv().unwrap());
}
1488
// Spawns `stress_factor() + 100` threads that each send one unit message
// through a clone of the same sender; the test thread receives exactly that
// many messages and then joins every thread.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = sync_channel(0);
    let total = stress_factor() + 100;

    let workers: Vec<_> = (0..total)
        .map(|_| {
            let sender = sender.clone();
            thread::spawn(move || {
                sender.send(()).unwrap();
            })
        })
        .collect();

    for _ in 0..total {
        receiver.recv().unwrap();
    }

    for worker in workers {
        worker.join().unwrap();
    }
}
1509
// A worker thread sums everything yielded by `rx.iter()` and reports the sum
// on a second channel once the iteration ends; the test sends 3, 1, 2, drops
// the sender and expects a total of 6.
#[test]
fn test_nested_recv_iter() {
    let (value_tx, value_rx) = sync_channel::<i32>(0);
    let (sum_tx, sum_rx) = sync_channel::<i32>(0);

    let summer = thread::spawn(move || {
        let mut running_sum = 0;
        for value in value_rx.iter() {
            running_sum += value;
        }
        sum_tx.send(running_sum).unwrap();
    });

    for value in [3, 1, 2] {
        value_tx.send(value).unwrap();
    }
    // Dropping the sender is what lets the worker's iteration finish.
    drop(value_tx);

    assert_eq!(sum_rx.recv().unwrap(), 6);
    summer.join().unwrap();
}
1530
// A worker thread accumulates values from `rx.iter()` while its running count
// is below 3 and breaks out otherwise, then reports the count. With 2s being
// sent the expected reported count is 4.
#[test]
fn test_recv_iter_break() {
    let (value_tx, value_rx) = sync_channel::<i32>(0);
    let (count_tx, count_rx) = sync_channel(0);

    let counter = thread::spawn(move || {
        let mut count = 0;
        for value in value_rx.iter() {
            if count < 3 {
                count += value;
            } else {
                break;
            }
        }
        count_tx.send(count).unwrap();
    });

    value_tx.send(2).unwrap();
    value_tx.send(2).unwrap();
    value_tx.send(2).unwrap();
    // Best-effort extra send; its outcome is deliberately ignored.
    let _ = value_tx.try_send(2);
    drop(value_tx);

    assert_eq!(count_rx.recv().unwrap(), 4);
    counter.join().unwrap();
}
1556
// Steps a worker thread through two phases using a "go" and an "ack" channel
// and checks what `try_recv` on the data channel reports in each state:
// `Empty` before any send, `Ok(1)` after the send, `Empty` again, and
// `Disconnected` after the worker dropped the data sender.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = sync_channel::<i32>(1);
    let (go_tx, go_rx) = sync_channel::<()>(1);
    let (ack_tx, ack_rx) = sync_channel::<()>(1);

    let worker = thread::spawn(move || {
        // Phase 1: publish one value.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        ack_tx.send(()).unwrap();

        // Phase 2: disconnect the data channel.
        go_rx.recv().unwrap();
        drop(data_tx);
        ack_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));

    worker.join().unwrap();
}
1581
// This bug used to end up in a livelock inside of the Receiver destructor
// because the internal state of the Shared packet was corrupted
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (msg_tx, msg_rx) = sync_channel::<()>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    let child = thread::spawn(move || {
        // Block until the single message arrives, then destroy the receiver
        // and report completion.
        msg_rx.recv().unwrap();
        drop(msg_rx);
        done_tx.send(()).unwrap();
    });

    // Give the child thread ample opportunity to block in `recv`.
    (0..5000).for_each(|_| thread::yield_now());

    // Clone the sender, drop the original, and send through the clone.
    let cloned_tx = msg_tx.clone();
    drop(msg_tx);
    cloned_tx.send(()).unwrap();

    // Wait for the child's completion signal before joining it.
    done_rx.recv().unwrap();
    child.join().unwrap();
}
1607
// A send on a bound-0 `sync_channel` succeeds when another thread receives.
#[test]
fn send1() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });

    let outcome = sender.send(1);
    assert_eq!(outcome, Ok(()));

    consumer.join().unwrap();
}
1617
// A send on a bound-0 `sync_channel` fails when the only thing the other
// thread does with the receiver is drop it.
#[test]
fn send2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let dropper = thread::spawn(move || drop(receiver));

    let outcome = sender.send(1);
    assert!(outcome.is_err());

    dropper.join().unwrap();
}
1627
// With bound 1, the first send is buffered and succeeds; the second send must
// fail because the other thread drops the receiver without receiving.
#[test]
fn send3() {
    let (sender, receiver) = sync_channel::<i32>(1);

    let buffered = sender.send(1);
    assert_eq!(buffered, Ok(()));

    let dropper = thread::spawn(move || drop(receiver));

    let rejected = sender.send(1);
    assert!(rejected.is_err());

    dropper.join().unwrap();
}
1638
// Two threads each send through their own clone of a bound-0 sender; after
// the receiver is dropped both sends must fail. Each thread signals on a
// separate `channel` once its assertion has passed.
#[test]
fn send4() {
    let (tx_a, rx) = sync_channel::<i32>(0);
    let tx_b = tx_a.clone();
    let (done_a, done_rx) = channel();
    let done_b = done_a.clone();

    let first = thread::spawn(move || {
        assert!(tx_a.send(1).is_err());
        done_a.send(()).unwrap();
    });
    let second = thread::spawn(move || {
        assert!(tx_b.send(2).is_err());
        done_b.send(()).unwrap();
    });

    drop(rx);

    // One completion signal per sending thread.
    for _ in 0..2 {
        done_rx.recv().unwrap();
    }

    first.join().unwrap();
    second.join().unwrap();
}
1659
// With bound 0 and nobody receiving, `try_send` returns the value in `Full`.
#[test]
fn try_send1() {
    let (sender, _receiver) = sync_channel::<i32>(0);

    let outcome = sender.try_send(1);
    assert_eq!(outcome, Err(TrySendError::Full(1)));
}
1665
// With bound 1, the first `try_send` succeeds and the second one, made while
// the first value is still buffered, returns the value in `Full`.
#[test]
fn try_send2() {
    let (sender, _receiver) = sync_channel::<i32>(1);

    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));

    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Full(1)));
}
1672
// After a successful `try_send`, dropping the receiver makes the next
// `try_send` return the value in `Disconnected`.
#[test]
fn try_send3() {
    let (sender, receiver) = sync_channel::<i32>(1);

    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));

    drop(receiver);

    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Disconnected(1)));
}
1680
// Runs a small ping-pong 100 times: the test thread `try_send`s on one bound-3
// `sync_channel`, a helper thread receives it and `try_send`s a reply on a
// second one, and the test thread waits for that reply.
#[test]
fn issue_15761() {
    fn repro() {
        let (tx1, rx1) = sync_channel::<()>(3);
        let (tx2, rx2) = sync_channel::<()>(3);

        let t = thread::spawn(move || {
            rx1.recv().unwrap();
            tx2.try_send(()).unwrap();
        });

        tx1.try_send(()).unwrap();
        rx2.recv().unwrap();
        // Join the helper so no thread outlives the iteration, consistent with
        // the other tests in this file.
        t.join().unwrap();
    }

    for _ in 0..100 {
        repro()
    }
}
1700}
1701
1702// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs
mod select_tests {
    use super::*;

    use std::thread;

    // Walks two `channel` pairs through four states (message on the first,
    // message on the second, first sender dropped, second sender dropped) and
    // checks which `select!` arm fires in each.
    #[test]
    fn smoke() {
        let (left_tx, left_rx) = channel::<i32>();
        let (right_tx, right_rx) = channel::<i32>();

        left_tx.send(1).unwrap();
        select! {
            got = left_rx.recv() => assert_eq!(got.unwrap(), 1),
            _unexpected = right_rx.recv() => panic!()
        }

        right_tx.send(2).unwrap();
        select! {
            _unexpected = left_rx.recv() => panic!(),
            got = right_rx.recv() => assert_eq!(got.unwrap(), 2)
        }

        drop(left_tx);
        select! {
            got = left_rx.recv() => assert!(got.is_err()),
            _unexpected = right_rx.recv() => panic!()
        }

        drop(right_tx);
        select! {
            got = right_rx.recv() => assert!(got.is_err())
        }
    }

    // Five receivers are selected on; only the fifth has a message, so only
    // its arm may fire.
    #[test]
    fn smoke2() {
        let (_idle_tx_a, rx_a) = channel::<i32>();
        let (_idle_tx_b, rx_b) = channel::<i32>();
        let (_idle_tx_c, rx_c) = channel::<i32>();
        let (_idle_tx_d, rx_d) = channel::<i32>();
        let (live_tx, rx_e) = channel::<i32>();

        live_tx.send(4).unwrap();
        select! {
            _unexpected = rx_a.recv() => panic!("1"),
            _unexpected = rx_b.recv() => panic!("2"),
            _unexpected = rx_c.recv() => panic!("3"),
            _unexpected = rx_d.recv() => panic!("4"),
            got = rx_e.recv() => assert_eq!(got.unwrap(), 4)
        }
    }

    // A receiver whose sender has been dropped is selected with an error,
    // while an open but empty receiver is not selected.
    #[test]
    fn closed() {
        let (_open_tx, open_rx) = channel::<i32>();
        let (closed_tx, closed_rx) = channel::<i32>();
        drop(closed_tx);

        select! {
            _unexpected = open_rx.recv() => panic!(),
            got = closed_rx.recv() => assert!(got.is_err())
        }
    }

    // `select!` wakes up first for a message sent by another thread after some
    // yields, and later for the error that follows once that thread has
    // finished and its sender is gone.
    #[test]
    fn unblocks() {
        let (data_tx, data_rx) = channel::<i32>();
        let (_idle_tx, idle_rx) = channel::<i32>();
        let (resume_tx, resume_rx) = channel::<i32>();

        let helper = thread::spawn(move || {
            (0..20).for_each(|_| thread::yield_now());
            data_tx.send(1).unwrap();
            resume_rx.recv().unwrap();
            (0..20).for_each(|_| thread::yield_now());
        });

        select! {
            got = data_rx.recv() => assert_eq!(got.unwrap(), 1),
            _unexpected = idle_rx.recv() => panic!()
        }
        resume_tx.send(1).unwrap();
        select! {
            got = data_rx.recv() => assert!(got.is_err()),
            _unexpected = idle_rx.recv() => panic!()
        }
        helper.join().unwrap();
    }

    // A helper thread sends one message on each of two channels; two
    // consecutive `select!` calls must consume both, leaving both receivers
    // empty.
    #[test]
    fn both_ready() {
        let (one_tx, one_rx) = channel::<i32>();
        let (two_tx, two_rx) = channel::<i32>();
        let (finish_tx, finish_rx) = channel::<()>();

        let helper = thread::spawn(move || {
            (0..20).for_each(|_| thread::yield_now());
            one_tx.send(1).unwrap();
            two_tx.send(2).unwrap();
            // Keep both senders alive until the main thread has checked for
            // `Empty`.
            finish_rx.recv().unwrap();
        });

        select! {
            got = one_rx.recv() => { assert_eq!(got.unwrap(), 1); },
            got = two_rx.recv() => { assert_eq!(got.unwrap(), 2); }
        }
        select! {
            got = one_rx.recv() => { assert_eq!(got.unwrap(), 1); },
            got = two_rx.recv() => { assert_eq!(got.unwrap(), 2); }
        }
        assert_eq!(one_rx.try_recv(), Err(TryRecvError::Empty));
        assert_eq!(two_rx.try_recv(), Err(TryRecvError::Empty));
        finish_tx.send(()).unwrap();
        helper.join().unwrap();
    }

    // A helper thread alternates between two channels, sending even numbers on
    // one and odd numbers on the other, and waits for an acknowledgement after
    // each message. The main thread checks that every `select!` delivers the
    // expected number on the expected channel. `AMT` is reduced under Miri.
    #[test]
    fn stress() {
        #[cfg(miri)]
        const AMT: i32 = 100;
        #[cfg(not(miri))]
        const AMT: i32 = 10_000;

        let (even_tx, even_rx) = channel::<i32>();
        let (odd_tx, odd_rx) = channel::<i32>();
        let (ack_tx, ack_rx) = channel::<()>();

        let producer = thread::spawn(move || {
            for step in 0..AMT {
                if step % 2 == 0 {
                    even_tx.send(step).unwrap();
                } else {
                    odd_tx.send(step).unwrap();
                }
                ack_rx.recv().unwrap();
            }
        });

        for step in 0..AMT {
            select! {
                even = even_rx.recv() => { assert!(step % 2 == 0 && step == even.unwrap()); },
                odd = odd_rx.recv() => { assert!(step % 2 == 1 && step == odd.unwrap()); }
            }
            ack_tx.send(()).unwrap();
        }
        producer.join().unwrap();
    }

    // A helper thread clones a sender (discarding the clone) before sending on
    // the original; the `select!` on the main thread must still be woken
    // through the data receiver.
    #[allow(unused_must_use)]
    #[test]
    fn cloning() {
        let (data_tx, data_rx) = channel::<i32>();
        let (_idle_tx, idle_rx) = channel::<i32>();
        let (step_tx, step_rx) = channel::<()>();

        let helper = thread::spawn(move || {
            step_rx.recv().unwrap();
            data_tx.clone();
            assert_eq!(step_rx.try_recv(), Err(TryRecvError::Empty));
            data_tx.send(2).unwrap();
            step_rx.recv().unwrap();
        });

        step_tx.send(()).unwrap();
        select! {
            _got = data_rx.recv() => {},
            _unexpected = idle_rx.recv() => panic!()
        }
        step_tx.send(()).unwrap();
        helper.join().unwrap();
    }

    // Same scenario as `cloning`: the sender is cloned on the helper thread
    // before the message is sent, and the data arm of `select!` must fire.
    #[allow(unused_must_use)]
    #[test]
    fn cloning2() {
        let (data_tx, data_rx) = channel::<i32>();
        let (_idle_tx, idle_rx) = channel::<i32>();
        let (step_tx, step_rx) = channel::<()>();

        let helper = thread::spawn(move || {
            step_rx.recv().unwrap();
            data_tx.clone();
            assert_eq!(step_rx.try_recv(), Err(TryRecvError::Empty));
            data_tx.send(2).unwrap();
            step_rx.recv().unwrap();
        });

        step_tx.send(()).unwrap();
        select! {
            _got = data_rx.recv() => {},
            _unexpected = idle_rx.recv() => panic!()
        }
        step_tx.send(()).unwrap();
        helper.join().unwrap();
    }

    // Cloning and immediately dropping a sender while another thread is
    // selecting on its receiver must not fire that receiver's arm; only the
    // message sent on the second channel may wake the `select!`.
    #[test]
    fn cloning3() {
        let (quiet_tx, quiet_rx) = channel::<()>();
        let (wake_tx, wake_rx) = channel::<()>();
        let (done_tx, done_rx) = channel::<()>();

        let selector = thread::spawn(move || {
            select! {
                _ = quiet_rx.recv() => panic!(),
                _ = wake_rx.recv() => {}
            }
            done_tx.send(()).unwrap();
        });

        (0..1000).for_each(|_| thread::yield_now());
        drop(quiet_tx.clone());
        wake_tx.send(()).unwrap();
        done_rx.recv().unwrap();
        selector.join().unwrap();
    }

    // `select!` with a named binding on a receiver that already holds one
    // message.
    #[test]
    fn preflight1() {
        let (sender, receiver) = channel();
        sender.send(()).unwrap();
        select! {
            _msg = receiver.recv() => {}
        }
    }

    // `select!` with a named binding on a receiver that already holds two
    // messages.
    #[test]
    fn preflight2() {
        let (sender, receiver) = channel();
        for _ in 0..2 {
            sender.send(()).unwrap();
        }
        select! {
            _msg = receiver.recv() => {}
        }
    }

    // `select!` with a named binding after the sender has been cloned and the
    // clone dropped, with one message pending.
    #[test]
    fn preflight3() {
        let (sender, receiver) = channel();
        drop(sender.clone());
        sender.send(()).unwrap();
        select! {
            _msg = receiver.recv() => {}
        }
    }

    // Like `preflight1`, but the arm uses the wildcard pattern.
    #[test]
    fn preflight4() {
        let (sender, receiver) = channel();
        sender.send(()).unwrap();
        select! {
            _ = receiver.recv() => {}
        }
    }

    // Like `preflight2`, but the arm uses the wildcard pattern.
    #[test]
    fn preflight5() {
        let (sender, receiver) = channel();
        for _ in 0..2 {
            sender.send(()).unwrap();
        }
        select! {
            _ = receiver.recv() => {}
        }
    }

    // Like `preflight3`, but the arm uses the wildcard pattern.
    #[test]
    fn preflight6() {
        let (sender, receiver) = channel();
        drop(sender.clone());
        sender.send(()).unwrap();
        select! {
            _ = receiver.recv() => {}
        }
    }

    // `select!` on a receiver whose sender was dropped without sending.
    #[test]
    fn preflight7() {
        let (sender, receiver) = channel::<()>();
        drop(sender);
        select! {
            _ = receiver.recv() => {}
        }
    }

    // `select!` on a receiver whose only message has already been consumed and
    // whose sender is gone.
    #[test]
    fn preflight8() {
        let (sender, receiver) = channel();
        sender.send(()).unwrap();
        drop(sender);
        receiver.recv().unwrap();
        select! {
            _ = receiver.recv() => {}
        }
    }

    // Like `preflight8`, but the sender is additionally cloned (and the clone
    // dropped) before the message is sent.
    #[test]
    fn preflight9() {
        let (sender, receiver) = channel();
        drop(sender.clone());
        sender.send(()).unwrap();
        drop(sender);
        receiver.recv().unwrap();
        select! {
            _ = receiver.recv() => {}
        }
    }

    // A thread selects on a fresh, never-used channel; the main thread yields
    // for a while, sends one message, and waits for the thread's
    // acknowledgement.
    #[test]
    fn oneshot_data_waiting() {
        let (wake_tx, wake_rx) = channel();
        let (ack_tx, ack_rx) = channel();

        let selector = thread::spawn(move || {
            select! {
                _msg = wake_rx.recv() => {}
            }
            ack_tx.send(()).unwrap();
        });

        (0..100).for_each(|_| thread::yield_now());
        wake_tx.send(()).unwrap();
        ack_rx.recv().unwrap();
        selector.join().unwrap();
    }

    // Same as `oneshot_data_waiting`, except that two messages have already
    // been sent and received on the channel before the thread selects on it.
    #[test]
    fn stream_data_waiting() {
        let (wake_tx, wake_rx) = channel();
        let (ack_tx, ack_rx) = channel();

        // Push two messages through the channel up front.
        wake_tx.send(()).unwrap();
        wake_tx.send(()).unwrap();
        wake_rx.recv().unwrap();
        wake_rx.recv().unwrap();

        let selector = thread::spawn(move || {
            select! {
                _msg = wake_rx.recv() => {}
            }
            ack_tx.send(()).unwrap();
        });

        (0..100).for_each(|_| thread::yield_now());
        wake_tx.send(()).unwrap();
        ack_rx.recv().unwrap();
        selector.join().unwrap();
    }

    // Same as `oneshot_data_waiting`, except that the sender has been cloned
    // (clone dropped) and one message has already passed through the channel
    // before the thread selects on it.
    #[test]
    fn shared_data_waiting() {
        let (wake_tx, wake_rx) = channel();
        let (ack_tx, ack_rx) = channel();

        drop(wake_tx.clone());
        wake_tx.send(()).unwrap();
        wake_rx.recv().unwrap();

        let selector = thread::spawn(move || {
            select! {
                _msg = wake_rx.recv() => {}
            }
            ack_tx.send(()).unwrap();
        });

        (0..100).for_each(|_| thread::yield_now());
        wake_tx.send(()).unwrap();
        ack_rx.recv().unwrap();
        selector.join().unwrap();
    }

    // `select!` on a bound-1 `sync_channel` that already holds a value.
    #[test]
    fn sync1() {
        let (sender, receiver) = sync_channel::<i32>(1);
        sender.send(1).unwrap();
        select! {
            got = receiver.recv() => { assert_eq!(got.unwrap(), 1); }
        }
    }

    // `select!` on a bound-0 `sync_channel` whose value is sent by another
    // thread after some yields.
    #[test]
    fn sync2() {
        let (sender, receiver) = sync_channel::<i32>(0);

        let producer = thread::spawn(move || {
            (0..100).for_each(|_| thread::yield_now());
            sender.send(1).unwrap();
        });

        select! {
            got = receiver.recv() => { assert_eq!(got.unwrap(), 1); }
        }
        producer.join().unwrap();
    }

    // Selects over a bound-0 `sync_channel` and a `channel`, each fed by its
    // own thread. Whichever arm fires first checks its own value and then
    // receives the other channel's value directly.
    #[test]
    fn sync3() {
        let (sync_tx, sync_rx) = sync_channel::<i32>(0);
        let (plain_tx, plain_rx) = channel::<i32>();

        let sync_producer = thread::spawn(move || {
            sync_tx.send(1).unwrap();
        });
        let plain_producer = thread::spawn(move || {
            plain_tx.send(2).unwrap();
        });

        select! {
            got = sync_rx.recv() => {
                let value = got.unwrap();
                assert_eq!(value, 1);
                assert_eq!(plain_rx.recv().unwrap(), 2);
            },
            got = plain_rx.recv() => {
                let value = got.unwrap();
                assert_eq!(value, 2);
                assert_eq!(sync_rx.recv().unwrap(), 1);
            }
        }

        sync_producer.join().unwrap();
        plain_producer.join().unwrap();
    }
}
2125}