Upgrade futures to 0.3.30

This project was upgraded with external_updater.
Usage: tools/external_updater/updater.sh update external/rust/crates/futures
For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md

Test: TreeHugger
Change-Id: Icb56a87578eeb10d440723d206b5e32d6686fbaf
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index c5e5d7e..f60a8e9 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
 {
   "git": {
-    "sha1": "5e3693a350f96244151081d2c030208cd15f9572"
+    "sha1": "de1a0fd64a1bcae9a1534ed4da1699632993cc26"
   },
   "path_in_vcs": "futures"
 }
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 46c18a2..f6f44d5 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@
     host_supported: true,
     crate_name: "futures",
     cargo_env_compat: true,
-    cargo_pkg_version: "0.3.26",
+    cargo_pkg_version: "0.3.30",
     srcs: ["src/lib.rs"],
     edition: "2018",
     features: [
diff --git a/Cargo.toml b/Cargo.toml
index 51e052a..5272299 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,9 +11,9 @@
 
 [package]
 edition = "2018"
-rust-version = "1.45"
+rust-version = "1.56"
 name = "futures"
-version = "0.3.26"
+version = "0.3.30"
 description = """
 An implementation of futures and streams featuring zero allocations,
 composability, and iterator-like interfaces.
@@ -47,33 +47,33 @@
 ]
 
 [dependencies.futures-channel]
-version = "0.3.26"
+version = "0.3.30"
 features = ["sink"]
 default-features = false
 
 [dependencies.futures-core]
-version = "0.3.26"
+version = "0.3.30"
 default-features = false
 
 [dependencies.futures-executor]
-version = "0.3.26"
+version = "0.3.30"
 optional = true
 default-features = false
 
 [dependencies.futures-io]
-version = "0.3.26"
+version = "0.3.30"
 default-features = false
 
 [dependencies.futures-sink]
-version = "0.3.26"
+version = "0.3.30"
 default-features = false
 
 [dependencies.futures-task]
-version = "0.3.26"
+version = "0.3.30"
 default-features = false
 
 [dependencies.futures-util]
-version = "0.3.26"
+version = "0.3.30"
 features = ["sink"]
 default-features = false
 
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index e7a5f38..6208f61 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,8 +1,8 @@
 [package]
 name = "futures"
-version = "0.3.26"
+version = "0.3.30"
 edition = "2018"
-rust-version = "1.45"
+rust-version = "1.56"
 license = "MIT OR Apache-2.0"
 readme = "../README.md"
 keywords = ["futures", "async", "future"]
@@ -15,13 +15,13 @@
 categories = ["asynchronous"]
 
 [dependencies]
-futures-core = { path = "../futures-core", version = "0.3.26", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.26", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.26", default-features = false, features = ["sink"] }
-futures-executor = { path = "../futures-executor", version = "0.3.26", default-features = false, optional = true }
-futures-io = { path = "../futures-io", version = "0.3.26", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.26", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.26", default-features = false, features = ["sink"] }
+futures-core = { path = "../futures-core", version = "0.3.30", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.30", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.30", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.30", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.30", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.30", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.30", default-features = false, features = ["sink"] }
 
 [dev-dependencies]
 futures-executor = { path = "../futures-executor", features = ["thread-pool"] }
diff --git a/METADATA b/METADATA
index 240fa20..06699ef 100644
--- a/METADATA
+++ b/METADATA
@@ -1,23 +1,20 @@
 # This project was upgraded with external_updater.
-# Usage: tools/external_updater/updater.sh update rust/crates/futures
-# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+# Usage: tools/external_updater/updater.sh update external/rust/crates/futures
+# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md
 
 name: "futures"
 description: "An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces."
 third_party {
-  url {
-    type: HOMEPAGE
-    value: "https://crates.io/crates/futures"
-  }
-  url {
-    type: ARCHIVE
-    value: "https://static.crates.io/crates/futures/futures-0.3.26.crate"
-  }
-  version: "0.3.26"
   license_type: NOTICE
   last_upgrade_date {
-    year: 2023
+    year: 2024
     month: 2
-    day: 15
+    day: 1
+  }
+  homepage: "https://crates.io/crates/futures"
+  identifier {
+    type: "Archive"
+    value: "https://static.crates.io/crates/futures/futures-0.3.30.crate"
+    version: "0.3.30"
   }
 }
diff --git a/README.md b/README.md
index 45e1f5b..355d607 100644
--- a/README.md
+++ b/README.md
@@ -38,7 +38,7 @@
 futures = "0.3"
 ```
 
-The current `futures` requires Rust 1.45 or later.
+The current `futures` requires Rust 1.56 or later.
 
 ### Feature `std`
 
diff --git a/tests/auto_traits.rs b/tests/auto_traits.rs
index 5fc0f7d..004fda1 100644
--- a/tests/auto_traits.rs
+++ b/tests/auto_traits.rs
@@ -18,6 +18,8 @@
 pub type SendTryFuture<T = *const (), E = *const ()> = SendFuture<Result<T, E>>;
 pub type SyncFuture<T = *const ()> = Pin<Box<dyn Future<Output = T> + Sync>>;
 pub type SyncTryFuture<T = *const (), E = *const ()> = SyncFuture<Result<T, E>>;
+pub type SendSyncFuture<T = *const ()> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;
+pub type SendSyncTryFuture<T = *const (), E = *const ()> = SendSyncFuture<Result<T, E>>;
 pub type UnpinFuture<T = PhantomPinned> = LocalFuture<T>;
 pub type UnpinTryFuture<T = PhantomPinned, E = PhantomPinned> = UnpinFuture<Result<T, E>>;
 pub struct PinnedFuture<T = PhantomPinned>(PhantomPinned, PhantomData<T>);
@@ -35,6 +37,8 @@
 pub type SendTryStream<T = *const (), E = *const ()> = SendStream<Result<T, E>>;
 pub type SyncStream<T = *const ()> = Pin<Box<dyn Stream<Item = T> + Sync>>;
 pub type SyncTryStream<T = *const (), E = *const ()> = SyncStream<Result<T, E>>;
+pub type SendSyncStream<T = *const ()> = Pin<Box<dyn Stream<Item = T> + Send + Sync>>;
+pub type SendSyncTryStream<T = *const (), E = *const ()> = SendSyncStream<Result<T, E>>;
 pub type UnpinStream<T = PhantomPinned> = LocalStream<T>;
 pub type UnpinTryStream<T = PhantomPinned, E = PhantomPinned> = UnpinStream<Result<T, E>>;
 pub struct PinnedStream<T = PhantomPinned>(PhantomPinned, PhantomData<T>);
@@ -365,9 +369,10 @@
     assert_impl!(JoinAll<SendFuture<()>>: Send);
     assert_not_impl!(JoinAll<LocalFuture>: Send);
     assert_not_impl!(JoinAll<SendFuture>: Send);
-    assert_impl!(JoinAll<SyncFuture<()>>: Sync);
-    assert_not_impl!(JoinAll<LocalFuture>: Sync);
-    assert_not_impl!(JoinAll<SyncFuture>: Sync);
+    assert_impl!(JoinAll<SendSyncFuture<()>>: Sync);
+    assert_not_impl!(JoinAll<SendFuture<()>>: Sync);
+    assert_not_impl!(JoinAll<SyncFuture<()>>: Sync);
+    assert_not_impl!(JoinAll<SendSyncFuture>: Sync);
     assert_impl!(JoinAll<PinnedFuture>: Unpin);
 
     assert_impl!(Lazy<()>: Send);
@@ -579,9 +584,10 @@
     assert_impl!(TryJoinAll<SendTryFuture<(), ()>>: Send);
     assert_not_impl!(TryJoinAll<LocalTryFuture>: Send);
     assert_not_impl!(TryJoinAll<SendTryFuture>: Send);
-    assert_impl!(TryJoinAll<SyncTryFuture<(), ()>>: Sync);
-    assert_not_impl!(TryJoinAll<LocalTryFuture>: Sync);
-    assert_not_impl!(TryJoinAll<SyncTryFuture>: Sync);
+    assert_impl!(TryJoinAll<SendSyncTryFuture<(), ()>>: Sync);
+    assert_not_impl!(TryJoinAll<SendTryFuture<(), ()>>: Sync);
+    assert_not_impl!(TryJoinAll<SyncTryFuture<(), ()>>: Sync);
+    assert_not_impl!(TryJoinAll<SendSyncTryFuture>: Sync);
     assert_impl!(TryJoinAll<PinnedTryFuture>: Unpin);
 
     assert_impl!(TrySelect<SendFuture, SendFuture>: Send);
@@ -1118,10 +1124,9 @@
     assert_not_impl!(Buffered<SendStream<SendFuture>>: Send);
     assert_not_impl!(Buffered<SendStream<LocalFuture>>: Send);
     assert_not_impl!(Buffered<LocalStream<SendFuture<()>>>: Send);
-    assert_impl!(Buffered<SyncStream<SyncFuture<()>>>: Sync);
-    assert_not_impl!(Buffered<SyncStream<SyncFuture>>: Sync);
-    assert_not_impl!(Buffered<SyncStream<LocalFuture>>: Sync);
-    assert_not_impl!(Buffered<LocalStream<SyncFuture<()>>>: Sync);
+    assert_impl!(Buffered<SyncStream<SendSyncFuture<()>>>: Sync);
+    assert_not_impl!(Buffered<SyncStream<SyncFuture<()>>>: Sync);
+    assert_not_impl!(Buffered<LocalStream<SendSyncFuture<()>>>: Sync);
     assert_impl!(Buffered<UnpinStream<PinnedFuture>>: Unpin);
     assert_not_impl!(Buffered<PinnedStream<PinnedFuture>>: Unpin);
 
@@ -1303,9 +1308,10 @@
     assert_impl!(FuturesOrdered<SendFuture<()>>: Send);
     assert_not_impl!(FuturesOrdered<SendFuture>: Send);
     assert_not_impl!(FuturesOrdered<SendFuture>: Send);
-    assert_impl!(FuturesOrdered<SyncFuture<()>>: Sync);
-    assert_not_impl!(FuturesOrdered<LocalFuture<()>>: Sync);
-    assert_not_impl!(FuturesOrdered<LocalFuture<()>>: Sync);
+    assert_impl!(FuturesOrdered<SendSyncFuture<()>>: Sync);
+    assert_not_impl!(FuturesOrdered<SyncFuture<()>>: Sync);
+    assert_not_impl!(FuturesOrdered<SendFuture<()>>: Sync);
+    assert_not_impl!(FuturesOrdered<SendSyncFuture>: Sync);
     assert_impl!(FuturesOrdered<PinnedFuture>: Unpin);
 
     assert_impl!(FuturesUnordered<()>: Send);
@@ -1647,11 +1653,12 @@
     assert_not_impl!(TryBuffered<SendTryStream<SendTryFuture<(), *const ()>>>: Send);
     assert_not_impl!(TryBuffered<SendTryStream<LocalTryFuture<(), ()>>>: Send);
     assert_not_impl!(TryBuffered<LocalTryStream<SendTryFuture<(), ()>>>: Send);
-    assert_impl!(TryBuffered<SyncTryStream<SyncTryFuture<(), ()>>>: Sync);
-    assert_not_impl!(TryBuffered<SyncTryStream<SyncTryFuture<*const (), ()>>>: Sync);
-    assert_not_impl!(TryBuffered<SyncTryStream<SyncTryFuture<(), *const ()>>>: Sync);
-    assert_not_impl!(TryBuffered<SyncTryStream<LocalTryFuture<(), ()>>>: Sync);
-    assert_not_impl!(TryBuffered<LocalTryStream<SyncTryFuture<(), ()>>>: Sync);
+    assert_impl!(TryBuffered<SyncTryStream<SendSyncTryFuture<(), ()>>>: Sync);
+    assert_not_impl!(TryBuffered<SyncTryStream<SendSyncTryFuture<*const (), ()>>>: Sync);
+    assert_not_impl!(TryBuffered<SyncTryStream<SendSyncTryFuture<(), *const ()>>>: Sync);
+    assert_not_impl!(TryBuffered<SyncTryStream<SendTryFuture<(), ()>>>: Sync);
+    assert_not_impl!(TryBuffered<SyncTryStream<SyncTryFuture<(), ()>>>: Sync);
+    assert_not_impl!(TryBuffered<LocalTryStream<SendSyncTryFuture<(), ()>>>: Sync);
     assert_impl!(TryBuffered<UnpinTryStream<PinnedTryFuture>>: Unpin);
     assert_not_impl!(TryBuffered<PinnedTryStream<UnpinTryFuture>>: Unpin);
 
diff --git a/tests/bilock.rs b/tests/bilock.rs
new file mode 100644
index 0000000..b103487
--- /dev/null
+++ b/tests/bilock.rs
@@ -0,0 +1,104 @@
+#![cfg(feature = "bilock")]
+
+use futures::executor::block_on;
+use futures::future;
+use futures::stream;
+use futures::task::{Context, Poll};
+use futures::Future;
+use futures::StreamExt;
+use futures_test::task::noop_context;
+use futures_util::lock::BiLock;
+use std::pin::Pin;
+use std::thread;
+
+#[test]
+fn smoke() {
+    let future = future::lazy(|cx| {
+        let (a, b) = BiLock::new(1);
+
+        {
+            let mut lock = match a.poll_lock(cx) {
+                Poll::Ready(l) => l,
+                Poll::Pending => panic!("poll not ready"),
+            };
+            assert_eq!(*lock, 1);
+            *lock = 2;
+
+            assert!(b.poll_lock(cx).is_pending());
+            assert!(a.poll_lock(cx).is_pending());
+        }
+
+        assert!(b.poll_lock(cx).is_ready());
+        assert!(a.poll_lock(cx).is_ready());
+
+        {
+            let lock = match b.poll_lock(cx) {
+                Poll::Ready(l) => l,
+                Poll::Pending => panic!("poll not ready"),
+            };
+            assert_eq!(*lock, 2);
+        }
+
+        assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
+
+        Ok::<(), ()>(())
+    });
+
+    assert_eq!(block_on(future), Ok(()));
+}
+
+#[test]
+fn concurrent() {
+    const N: usize = 10000;
+    let mut cx = noop_context();
+    let (a, b) = BiLock::new(0);
+
+    let a = Increment { a: Some(a), remaining: N };
+    let b = stream::iter(0..N).fold(b, |b, _n| async {
+        let mut g = b.lock().await;
+        *g += 1;
+        drop(g);
+        b
+    });
+
+    let t1 = thread::spawn(move || block_on(a));
+    let b = block_on(b);
+    let a = t1.join().unwrap();
+
+    match a.poll_lock(&mut cx) {
+        Poll::Ready(l) => assert_eq!(*l, 2 * N),
+        Poll::Pending => panic!("poll not ready"),
+    }
+    match b.poll_lock(&mut cx) {
+        Poll::Ready(l) => assert_eq!(*l, 2 * N),
+        Poll::Pending => panic!("poll not ready"),
+    }
+
+    assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
+
+    struct Increment {
+        remaining: usize,
+        a: Option<BiLock<usize>>,
+    }
+
+    impl Future for Increment {
+        type Output = BiLock<usize>;
+
+        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<BiLock<usize>> {
+            loop {
+                if self.remaining == 0 {
+                    return self.a.take().unwrap().into();
+                }
+
+                let a = self.a.as_mut().unwrap();
+                let mut a = match a.poll_lock(cx) {
+                    Poll::Ready(l) => l,
+                    Poll::Pending => return Poll::Pending,
+                };
+                *a += 1;
+                drop(a);
+                self.remaining -= 1;
+            }
+        }
+    }
+}
diff --git a/tests/stream.rs b/tests/stream.rs
index 5cde458..6cbef75 100644
--- a/tests/stream.rs
+++ b/tests/stream.rs
@@ -14,6 +14,7 @@
 use futures::task::Poll;
 use futures::{ready, FutureExt};
 use futures_core::Stream;
+use futures_executor::ThreadPool;
 use futures_test::task::noop_context;
 
 #[test]
@@ -65,6 +66,7 @@
     use futures::task::*;
     use std::convert::identity;
     use std::pin::Pin;
+    use std::sync::atomic::{AtomicBool, Ordering};
     use std::thread;
     use std::time::Duration;
 
@@ -322,6 +324,78 @@
             assert_eq!(values, (0..60).collect::<Vec<u8>>());
         });
     }
+
+    fn timeout<I: Clone>(time: Duration, value: I) -> impl Future<Output = I> {
+        let ready = Arc::new(AtomicBool::new(false));
+        let mut spawned = false;
+
+        future::poll_fn(move |cx| {
+            if !spawned {
+                let waker = cx.waker().clone();
+                let ready = ready.clone();
+
+                std::thread::spawn(move || {
+                    std::thread::sleep(time);
+                    ready.store(true, Ordering::Release);
+
+                    waker.wake_by_ref()
+                });
+                spawned = true;
+            }
+
+            if ready.load(Ordering::Acquire) {
+                Poll::Ready(value.clone())
+            } else {
+                Poll::Pending
+            }
+        })
+    }
+
+    fn build_nested_fu<S: Stream + Unpin>(st: S) -> impl Stream<Item = S::Item> + Unpin
+    where
+        S::Item: Clone,
+    {
+        let inner = st
+            .then(|item| timeout(Duration::from_millis(50), item))
+            .enumerate()
+            .map(|(idx, value)| {
+                stream::once(if idx % 2 == 0 {
+                    future::ready(value).left_future()
+                } else {
+                    timeout(Duration::from_millis(100), value).right_future()
+                })
+            })
+            .flatten_unordered(None);
+
+        stream::once(future::ready(inner)).flatten_unordered(None)
+    }
+
+    // nested `flatten_unordered`
+    let te = ThreadPool::new().unwrap();
+    let base_handle = te
+        .spawn_with_handle(async move {
+            let fu = build_nested_fu(stream::iter(1..=10));
+
+            assert_eq!(fu.count().await, 10);
+        })
+        .unwrap();
+
+    block_on(base_handle);
+
+    let empty_state_move_handle = te
+        .spawn_with_handle(async move {
+            let mut fu = build_nested_fu(stream::iter(1..10));
+            {
+                let mut cx = noop_context();
+                let _ = fu.poll_next_unpin(&mut cx);
+                let _ = fu.poll_next_unpin(&mut cx);
+            }
+
+            assert_eq!(fu.count().await, 9);
+        })
+        .unwrap();
+
+    block_on(empty_state_move_handle);
 }
 
 #[test]
@@ -461,3 +535,43 @@
         assert_eq!(count.get(), times_should_poll + 1);
     }
 }
+
+async fn is_even(number: u8) -> bool {
+    number % 2 == 0
+}
+
+#[test]
+fn all() {
+    block_on(async {
+        let empty: [u8; 0] = [];
+        let st = stream::iter(empty);
+        let all = st.all(is_even).await;
+        assert!(all);
+
+        let st = stream::iter([2, 4, 6, 8]);
+        let all = st.all(is_even).await;
+        assert!(all);
+
+        let st = stream::iter([2, 3, 4]);
+        let all = st.all(is_even).await;
+        assert!(!all);
+    });
+}
+
+#[test]
+fn any() {
+    block_on(async {
+        let empty: [u8; 0] = [];
+        let st = stream::iter(empty);
+        let any = st.any(is_even).await;
+        assert!(!any);
+
+        let st = stream::iter([1, 2, 3]);
+        let any = st.any(is_even).await;
+        assert!(any);
+
+        let st = stream::iter([1, 3, 5]);
+        let any = st.any(is_even).await;
+        assert!(!any);
+    });
+}
diff --git a/tests/stream_futures_unordered.rs b/tests/stream_futures_unordered.rs
index b568280..7bdf543 100644
--- a/tests/stream_futures_unordered.rs
+++ b/tests/stream_futures_unordered.rs
@@ -381,3 +381,28 @@
     tasks.clear();
     assert!(!tasks.is_terminated());
 }
+
+// https://github.com/rust-lang/futures-rs/issues/2529#issuecomment-997290279
+#[test]
+fn clear_in_loop() {
+    const N: usize =
+        if cfg!(miri) || option_env!("QEMU_LD_PREFIX").is_some() { 100 } else { 10_000 };
+    futures::executor::block_on(async {
+        async fn task() {
+            let (s, r) = oneshot::channel();
+            std::thread::spawn(|| {
+                std::thread::sleep(std::time::Duration::from_micros(100));
+                let _ = s.send(());
+            });
+            r.await.unwrap()
+        }
+        let mut futures = FuturesUnordered::new();
+        for _ in 0..N {
+            for _ in 0..24 {
+                futures.push(task());
+            }
+            let _ = futures.next().await;
+            futures.clear();
+        }
+    });
+}
diff --git a/tests/stream_try_stream.rs b/tests/stream_try_stream.rs
index 194e74d..ef38c51 100644
--- a/tests/stream_try_stream.rs
+++ b/tests/stream_try_stream.rs
@@ -1,7 +1,13 @@
+use core::pin::Pin;
+use std::convert::Infallible;
+
 use futures::{
-    stream::{self, StreamExt, TryStreamExt},
+    stream::{self, repeat, Repeat, StreamExt, TryStreamExt},
     task::Poll,
+    Stream,
 };
+use futures_executor::block_on;
+use futures_task::Context;
 use futures_test::task::noop_context;
 
 #[test]
@@ -36,3 +42,142 @@
         .boxed();
     assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
 }
+
+#[test]
+fn try_flatten_unordered() {
+    let test_st = stream::iter(1..7)
+        .map(|val: u32| {
+            if val % 2 == 0 {
+                Ok(stream::unfold((val, 1), |(val, pow)| async move {
+                    Some((val.pow(pow), (val, pow + 1)))
+                })
+                .take(3)
+                .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) }))
+            } else {
+                Err(val)
+            }
+        })
+        .map_ok(Box::pin)
+        .try_flatten_unordered(None);
+
+    block_on(async move {
+        assert_eq!(
+            // All numbers can be divided by 16 and odds must be `Err`
+            // For all basic evens we must have powers from 1 to 3
+            vec![
+                Err(1),
+                Err(3),
+                Err(5),
+                Ok(2),
+                Ok(4),
+                Ok(6),
+                Ok(4),
+                Err(16),
+                Ok(36),
+                Ok(8),
+                Err(64),
+                Ok(216)
+            ],
+            test_st.collect::<Vec<_>>().await
+        )
+    });
+
+    #[derive(Clone, Debug)]
+    struct ErrorStream {
+        error_after: usize,
+        polled: usize,
+    }
+
+    impl Stream for ErrorStream {
+        type Item = Result<Repeat<Result<(), ()>>, ()>;
+
+        fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
+            if self.polled > self.error_after {
+                panic!("Polled after error");
+            } else {
+                let out =
+                    if self.polled == self.error_after { Err(()) } else { Ok(repeat(Ok(()))) };
+                self.polled += 1;
+                Poll::Ready(Some(out))
+            }
+        }
+    }
+
+    block_on(async move {
+        let mut st = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None);
+        let mut ctr = 0;
+        while (st.try_next().await).is_ok() {
+            ctr += 1;
+        }
+        assert_eq!(ctr, 0);
+
+        assert_eq!(
+            ErrorStream { error_after: 10, polled: 0 }
+                .try_flatten_unordered(None)
+                .inspect_ok(|_| panic!("Unexpected `Ok`"))
+                .try_collect::<Vec<_>>()
+                .await,
+            Err(())
+        );
+
+        let mut taken = 0;
+        assert_eq!(
+            ErrorStream { error_after: 10, polled: 0 }
+                .map_ok(|st| st.take(3))
+                .try_flatten_unordered(1)
+                .inspect(|_| taken += 1)
+                .try_fold((), |(), res| async move { Ok(res) })
+                .await,
+            Err(())
+        );
+        assert_eq!(taken, 31);
+    })
+}
+
+async fn is_even(number: u8) -> bool {
+    number % 2 == 0
+}
+
+#[test]
+fn try_all() {
+    block_on(async {
+        let empty: [Result<u8, Infallible>; 0] = [];
+        let st = stream::iter(empty);
+        let all = st.try_all(is_even).await;
+        assert_eq!(Ok(true), all);
+
+        let st = stream::iter([Ok::<_, Infallible>(2), Ok(4), Ok(6), Ok(8)]);
+        let all = st.try_all(is_even).await;
+        assert_eq!(Ok(true), all);
+
+        let st = stream::iter([Ok::<_, Infallible>(2), Ok(3), Ok(4)]);
+        let all = st.try_all(is_even).await;
+        assert_eq!(Ok(false), all);
+
+        let st = stream::iter([Ok(2), Ok(4), Err("err"), Ok(8)]);
+        let all = st.try_all(is_even).await;
+        assert_eq!(Err("err"), all);
+    });
+}
+
+#[test]
+fn try_any() {
+    block_on(async {
+        let empty: [Result<u8, Infallible>; 0] = [];
+        let st = stream::iter(empty);
+        let any = st.try_any(is_even).await;
+        assert_eq!(Ok(false), any);
+
+        let st = stream::iter([Ok::<_, Infallible>(1), Ok(2), Ok(3)]);
+        let any = st.try_any(is_even).await;
+        assert_eq!(Ok(true), any);
+
+        let st = stream::iter([Ok::<_, Infallible>(1), Ok(3), Ok(5)]);
+        let any = st.try_any(is_even).await;
+        assert_eq!(Ok(false), any);
+
+        let st = stream::iter([Ok(1), Ok(3), Err("err"), Ok(8)]);
+        let any = st.try_any(is_even).await;
+        assert_eq!(Err("err"), any);
+    });
+}
diff --git a/tests_disabled/bilock.rs b/tests_disabled/bilock.rs
deleted file mode 100644
index 0166ca4..0000000
--- a/tests_disabled/bilock.rs
+++ /dev/null
@@ -1,102 +0,0 @@
-use futures::future;
-use futures::stream;
-use futures::task;
-use futures_util::lock::BiLock;
-use std::thread;
-
-// mod support;
-// use support::*;
-
-#[test]
-fn smoke() {
-    let future = future::lazy(|_| {
-        let (a, b) = BiLock::new(1);
-
-        {
-            let mut lock = match a.poll_lock() {
-                Poll::Ready(l) => l,
-                Poll::Pending => panic!("poll not ready"),
-            };
-            assert_eq!(*lock, 1);
-            *lock = 2;
-
-            assert!(b.poll_lock().is_pending());
-            assert!(a.poll_lock().is_pending());
-        }
-
-        assert!(b.poll_lock().is_ready());
-        assert!(a.poll_lock().is_ready());
-
-        {
-            let lock = match b.poll_lock() {
-                Poll::Ready(l) => l,
-                Poll::Pending => panic!("poll not ready"),
-            };
-            assert_eq!(*lock, 2);
-        }
-
-        assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
-
-        Ok::<(), ()>(())
-    });
-
-    assert!(task::spawn(future)
-        .poll_future_notify(&notify_noop(), 0)
-        .expect("failure in poll")
-        .is_ready());
-}
-
-#[test]
-fn concurrent() {
-    const N: usize = 10000;
-    let (a, b) = BiLock::new(0);
-
-    let a = Increment { a: Some(a), remaining: N };
-    let b = stream::iter_ok(0..N).fold(b, |b, _n| {
-        b.lock().map(|mut b| {
-            *b += 1;
-            b.unlock()
-        })
-    });
-
-    let t1 = thread::spawn(move || a.wait());
-    let b = b.wait().expect("b error");
-    let a = t1.join().unwrap().expect("a error");
-
-    match a.poll_lock() {
-        Poll::Ready(l) => assert_eq!(*l, 2 * N),
-        Poll::Pending => panic!("poll not ready"),
-    }
-    match b.poll_lock() {
-        Poll::Ready(l) => assert_eq!(*l, 2 * N),
-        Poll::Pending => panic!("poll not ready"),
-    }
-
-    assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
-
-    struct Increment {
-        remaining: usize,
-        a: Option<BiLock<usize>>,
-    }
-
-    impl Future for Increment {
-        type Item = BiLock<usize>;
-        type Error = ();
-
-        fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
-            loop {
-                if self.remaining == 0 {
-                    return Ok(self.a.take().unwrap().into());
-                }
-
-                let a = self.a.as_ref().unwrap();
-                let mut a = match a.poll_lock() {
-                    Poll::Ready(l) => l,
-                    Poll::Pending => return Ok(Poll::Pending),
-                };
-                self.remaining -= 1;
-                *a += 1;
-            }
-        }
-    }
-}