Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35    AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
36    ValidIdempotenceFor, ValidMutCommutativityFor, ValidMutIdempotenceFor,
37};
38
39pub mod networking;
40
41/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
42#[sealed::sealed]
43pub trait Ordering:
44    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
45{
46    /// The [`StreamOrder`] corresponding to this type.
47    const ORDERING_KIND: StreamOrder;
48}
49
50/// Marks the stream as being totally ordered, which means that there are
51/// no sources of non-determinism (other than intentional ones) that will
52/// affect the order of elements.
53pub enum TotalOrder {}
54
55#[sealed::sealed]
56impl Ordering for TotalOrder {
57    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
58}
59
60/// Marks the stream as having no order, which means that the order of
61/// elements may be affected by non-determinism.
62///
63/// This restricts certain operators, such as `fold` and `reduce`, to only
64/// be used with commutative aggregation functions.
65pub enum NoOrder {}
66
67#[sealed::sealed]
68impl Ordering for NoOrder {
69    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
70}
71
72/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
73/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
74/// have `Self` guarantees instead.
75#[sealed::sealed]
76pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
77#[sealed::sealed]
78impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
79
80/// Helper trait for determining the weakest of two orderings.
81#[sealed::sealed]
82pub trait MinOrder<Other: ?Sized> {
83    /// The weaker of the two orderings.
84    type Min: Ordering;
85}
86
87#[sealed::sealed]
88impl<O: Ordering> MinOrder<O> for TotalOrder {
89    type Min = O;
90}
91
92#[sealed::sealed]
93impl<O: Ordering> MinOrder<O> for NoOrder {
94    type Min = NoOrder;
95}
96
97/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
98#[sealed::sealed]
99pub trait Retries:
100    MinRetries<Self, Min = Self>
101    + MinRetries<ExactlyOnce, Min = Self>
102    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
103{
104    /// The [`StreamRetry`] corresponding to this type.
105    const RETRIES_KIND: StreamRetry;
106}
107
108/// Marks the stream as having deterministic message cardinality, with no
109/// possibility of duplicates.
110pub enum ExactlyOnce {}
111
112#[sealed::sealed]
113impl Retries for ExactlyOnce {
114    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
115}
116
117/// Marks the stream as having non-deterministic message cardinality, which
118/// means that duplicates may occur, but messages will not be dropped.
119pub enum AtLeastOnce {}
120
121#[sealed::sealed]
122impl Retries for AtLeastOnce {
123    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
124}
125
126/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
127/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
128/// have `Self` guarantees instead.
129#[sealed::sealed]
130pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
131#[sealed::sealed]
132impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
133
134/// Helper trait for determining the weakest of two retry guarantees.
135#[sealed::sealed]
136pub trait MinRetries<Other: ?Sized> {
137    /// The weaker of the two retry guarantees.
138    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
139}
140
141#[sealed::sealed]
142impl<R: Retries> MinRetries<R> for ExactlyOnce {
143    type Min = R;
144}
145
146#[sealed::sealed]
147impl<R: Retries> MinRetries<R> for AtLeastOnce {
148    type Min = AtLeastOnce;
149}
150
151#[sealed::sealed]
152#[diagnostic::on_unimplemented(
153    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
154    label = "required here",
155    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
156)]
157/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
158pub trait IsOrdered: Ordering {}
159
160#[sealed::sealed]
161#[diagnostic::do_not_recommend]
162impl IsOrdered for TotalOrder {}
163
164#[sealed::sealed]
165#[diagnostic::on_unimplemented(
166    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
167    label = "required here",
168    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
169)]
170/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
171pub trait IsExactlyOnce: Retries {}
172
173#[sealed::sealed]
174#[diagnostic::do_not_recommend]
175impl IsExactlyOnce for ExactlyOnce {}
176
177/// Streaming sequence of elements with type `Type`.
178///
179/// This live collection represents a growing sequence of elements, with new elements being
180/// asynchronously appended to the end of the sequence. This can be used to model the arrival
181/// of network input, such as API requests, or streaming ingestion.
182///
183/// By default, all streams have deterministic ordering and each element is materialized exactly
184/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
185/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
186/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
187///
188/// Type Parameters:
189/// - `Type`: the type of elements in the stream
190/// - `Loc`: the location where the stream is being materialized
191/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
192/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
193///   (default is [`TotalOrder`])
194/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
195///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
196pub struct Stream<
197    Type,
198    Loc,
199    Bound: Boundedness = Unbounded,
200    Order: Ordering = TotalOrder,
201    Retry: Retries = ExactlyOnce,
202> {
203    pub(crate) location: Loc,
204    pub(crate) ir_node: RefCell<HydroNode>,
205    pub(crate) flow_state: FlowState,
206
207    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
208}
209
210impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
211    fn drop(&mut self) {
212        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
213        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
214            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
215                input: Box::new(ir_node),
216                op_metadata: HydroIrOpMetadata::new(),
217            });
218        }
219    }
220}
221
222impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
223    for Stream<T, L, Unbounded, O, R>
224where
225    L: Location<'a>,
226{
227    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
228        let new_meta = stream
229            .location
230            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
231
232        Stream {
233            location: stream.location.clone(),
234            flow_state: stream.flow_state.clone(),
235            ir_node: RefCell::new(HydroNode::Cast {
236                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
237                metadata: new_meta,
238            }),
239            _phantom: PhantomData,
240        }
241    }
242}
243
244impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
245    for Stream<T, L, B, NoOrder, R>
246where
247    L: Location<'a>,
248{
249    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
250        stream.weaken_ordering()
251    }
252}
253
254impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
255    for Stream<T, L, B, O, AtLeastOnce>
256where
257    L: Location<'a>,
258{
259    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
260        stream.weaken_retries()
261    }
262}
263
264impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
265where
266    L: Location<'a>,
267{
268    fn defer_tick(self) -> Self {
269        Stream::defer_tick(self)
270    }
271}
272
273impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
274    for Stream<T, Tick<L>, Bounded, O, R>
275where
276    L: Location<'a>,
277{
278    type Location = Tick<L>;
279
280    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
281        Stream::new(
282            location.clone(),
283            HydroNode::CycleSource {
284                cycle_id,
285                metadata: location.new_node_metadata(Self::collection_kind()),
286            },
287        )
288    }
289}
290
291impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
292    for Stream<T, Tick<L>, Bounded, O, R>
293where
294    L: Location<'a>,
295{
296    type Location = Tick<L>;
297
298    fn location(&self) -> &Self::Location {
299        self.location()
300    }
301
302    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
303        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
304            location.clone(),
305            HydroNode::DeferTick {
306                input: Box::new(HydroNode::CycleSource {
307                    cycle_id,
308                    metadata: location.new_node_metadata(Self::collection_kind()),
309                }),
310                metadata: location.new_node_metadata(Self::collection_kind()),
311            },
312        );
313
314        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
315    }
316}
317
318impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
319    for Stream<T, Tick<L>, Bounded, O, R>
320where
321    L: Location<'a>,
322{
323    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
324        assert_eq!(
325            Location::id(&self.location),
326            expected_location,
327            "locations do not match"
328        );
329        self.location
330            .flow_state()
331            .borrow_mut()
332            .push_root(HydroRoot::CycleSink {
333                cycle_id,
334                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
335                op_metadata: HydroIrOpMetadata::new(),
336            });
337    }
338}
339
340impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
341    for Stream<T, L, B, O, R>
342where
343    L: Location<'a>,
344{
345    type Location = L;
346
347    fn create_source(cycle_id: CycleId, location: L) -> Self {
348        Stream::new(
349            location.clone(),
350            HydroNode::CycleSource {
351                cycle_id,
352                metadata: location.new_node_metadata(Self::collection_kind()),
353            },
354        )
355    }
356}
357
358impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
359    for Stream<T, L, B, O, R>
360where
361    L: Location<'a>,
362{
363    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
364        assert_eq!(
365            Location::id(&self.location),
366            expected_location,
367            "locations do not match"
368        );
369        self.location
370            .flow_state()
371            .borrow_mut()
372            .push_root(HydroRoot::CycleSink {
373                cycle_id,
374                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
375                op_metadata: HydroIrOpMetadata::new(),
376            });
377    }
378}
379
380impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
381where
382    T: Clone,
383    L: Location<'a>,
384{
385    fn clone(&self) -> Self {
386        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
387            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
388            *self.ir_node.borrow_mut() = HydroNode::Tee {
389                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
390                metadata: self.location.new_node_metadata(Self::collection_kind()),
391            };
392        }
393
394        let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
395            unreachable!()
396        };
397        Stream {
398            location: self.location.clone(),
399            flow_state: self.flow_state.clone(),
400            ir_node: HydroNode::Tee {
401                inner: SharedNode(inner.0.clone()),
402                metadata: metadata.clone(),
403            }
404            .into(),
405            _phantom: PhantomData,
406        }
407    }
408}
409
410impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
411where
412    L: Location<'a>,
413{
414    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
415        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
416        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
417
418        let flow_state = location.flow_state().clone();
419        Stream {
420            location,
421            flow_state,
422            ir_node: RefCell::new(ir_node),
423            _phantom: PhantomData,
424        }
425    }
426
427    /// Returns the [`Location`] where this stream is being materialized.
428    pub fn location(&self) -> &L {
429        &self.location
430    }
431
432    /// Weakens the consistency of this live collection to not guarantee any consistency across
433    /// cluster members (if this collection is on a cluster).
434    pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
435    where
436        L: Location<'a>,
437    {
438        if L::consistency()
439            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
440        {
441            // already no consistency
442            Stream::new(
443                self.location.drop_consistency(),
444                self.ir_node.replace(HydroNode::Placeholder),
445            )
446        } else {
447            Stream::new(
448                self.location.drop_consistency(),
449                HydroNode::Cast {
450                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
451                    metadata: self.location.drop_consistency().new_node_metadata(Stream::<
452                        T,
453                        L::DropConsistency,
454                        B,
455                        O,
456                        R,
457                    >::collection_kind(
458                    )),
459                },
460            )
461        }
462    }
463
464    /// Casts this live collection to have the consistency guarantees specified in the given
465    /// location type parameter. The developer must ensure that the strengthened consistency
466    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
467    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
468        self,
469        _proof: impl crate::properties::ConsistencyProof,
470    ) -> Stream<T, L2, B, O, R>
471    where
472        L: Location<'a>,
473    {
474        if L::consistency() == L2::consistency() {
475            Stream::new(
476                self.location.with_consistency_of(),
477                self.ir_node.replace(HydroNode::Placeholder),
478            )
479        } else {
480            Stream::new(
481                self.location.with_consistency_of(),
482                HydroNode::AssertIsConsistent {
483                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
484                    trusted: false,
485                    metadata: self
486                        .location
487                        .clone()
488                        .with_consistency_of::<L2>()
489                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
490                },
491            )
492        }
493    }
494
495    pub(crate) fn assert_has_consistency_of_trusted<
496        L2: Location<'a, DropConsistency = L::DropConsistency>,
497    >(
498        self,
499        _proof: impl crate::properties::ConsistencyProof,
500    ) -> Stream<T, L2, B, O, R>
501    where
502        L: Location<'a>,
503    {
504        if L::consistency() == L2::consistency() {
505            Stream::new(
506                self.location.with_consistency_of(),
507                self.ir_node.replace(HydroNode::Placeholder),
508            )
509        } else {
510            Stream::new(
511                self.location.with_consistency_of(),
512                HydroNode::AssertIsConsistent {
513                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
514                    trusted: true,
515                    metadata: self
516                        .location
517                        .clone()
518                        .with_consistency_of::<L2>()
519                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
520                },
521            )
522        }
523    }
524
525    pub(crate) fn collection_kind() -> CollectionKind {
526        CollectionKind::Stream {
527            bound: B::BOUND_KIND,
528            order: O::ORDERING_KIND,
529            retry: R::RETRIES_KIND,
530            element_type: quote_type::<T>().into(),
531        }
532    }
533
534    /// Produces a stream based on invoking `f` on each element.
535    /// If you do not want to modify the stream and instead only want to view
536    /// each item use [`Stream::inspect`] instead.
537    ///
538    /// # Example
539    /// ```rust
540    /// # #[cfg(feature = "deploy")] {
541    /// # use hydro_lang::prelude::*;
542    /// # use futures::StreamExt;
543    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
544    /// let words = process.source_iter(q!(vec!["hello", "world"]));
545    /// words.map(q!(|x| x.to_uppercase()))
546    /// # }, |mut stream| async move {
547    /// # for w in vec!["HELLO", "WORLD"] {
548    /// #     assert_eq!(stream.next().await.unwrap(), w);
549    /// # }
550    /// # }));
551    /// # }
552    /// ```
553    pub fn map<U, F, C, I, const WAS_MUT: bool>(
554        self,
555        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
556    ) -> Stream<U, L, B, O, R>
557    where
558        F: FnMut(T) -> U + 'a,
559        C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
560        I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
561    {
562        let f = crate::singleton_ref::with_singleton_capture(|| {
563            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
564            proof.register_proof(&expr);
565            expr.into()
566        });
567        Stream::new(
568            self.location.clone(),
569            HydroNode::Map {
570                f,
571                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
572                metadata: self
573                    .location
574                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
575            },
576        )
577    }
578
579    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
580    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
581    /// for the output type `U` must produce items in a **deterministic** order.
582    ///
583    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
584    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
585    ///
586    /// # Example
587    /// ```rust
588    /// # #[cfg(feature = "deploy")] {
589    /// # use hydro_lang::prelude::*;
590    /// # use futures::StreamExt;
591    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
592    /// process
593    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
594    ///     .flat_map_ordered(q!(|x| x))
595    /// # }, |mut stream| async move {
596    /// // 1, 2, 3, 4
597    /// # for w in (1..5) {
598    /// #     assert_eq!(stream.next().await.unwrap(), w);
599    /// # }
600    /// # }));
601    /// # }
602    /// ```
603    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
604    where
605        I: IntoIterator<Item = U>,
606        F: Fn(T) -> I + 'a,
607    {
608        let f = crate::singleton_ref::with_singleton_capture(|| {
609            f.splice_fn1_ctx(&self.location).into()
610        });
611        Stream::new(
612            self.location.clone(),
613            HydroNode::FlatMap {
614                f,
615                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
616                metadata: self
617                    .location
618                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
619            },
620        )
621    }
622
623    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
624    /// for the output type `U` to produce items in any order.
625    ///
626    /// # Example
627    /// ```rust
628    /// # #[cfg(feature = "deploy")] {
629    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
630    /// # use futures::StreamExt;
631    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
632    /// process
633    ///     .source_iter(q!(vec![
634    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
635    ///         std::collections::HashSet::from_iter(vec![3, 4]),
636    ///     ]))
637    ///     .flat_map_unordered(q!(|x| x))
638    /// # }, |mut stream| async move {
639    /// // 1, 2, 3, 4, but in no particular order
640    /// # let mut results = Vec::new();
641    /// # for w in (1..5) {
642    /// #     results.push(stream.next().await.unwrap());
643    /// # }
644    /// # results.sort();
645    /// # assert_eq!(results, vec![1, 2, 3, 4]);
646    /// # }));
647    /// # }
648    /// ```
649    pub fn flat_map_unordered<U, I, F>(
650        self,
651        f: impl IntoQuotedMut<'a, F, L>,
652    ) -> Stream<U, L, B, NoOrder, R>
653    where
654        I: IntoIterator<Item = U>,
655        F: Fn(T) -> I + 'a,
656    {
657        let f = crate::singleton_ref::with_singleton_capture(|| {
658            f.splice_fn1_ctx(&self.location).into()
659        });
660        Stream::new(
661            self.location.clone(),
662            HydroNode::FlatMap {
663                f,
664                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
665                metadata: self
666                    .location
667                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
668            },
669        )
670    }
671
672    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
673    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
674    ///
675    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
676    /// not deterministic, use [`Stream::flatten_unordered`] instead.
677    ///
678    /// ```rust
679    /// # #[cfg(feature = "deploy")] {
680    /// # use hydro_lang::prelude::*;
681    /// # use futures::StreamExt;
682    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
683    /// process
684    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
685    ///     .flatten_ordered()
686    /// # }, |mut stream| async move {
687    /// // 1, 2, 3, 4
688    /// # for w in (1..5) {
689    /// #     assert_eq!(stream.next().await.unwrap(), w);
690    /// # }
691    /// # }));
692    /// # }
693    /// ```
694    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
695    where
696        T: IntoIterator<Item = U>,
697    {
698        self.flat_map_ordered(q!(|d| d))
699    }
700
701    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
702    /// for the element type `T` to produce items in any order.
703    ///
704    /// # Example
705    /// ```rust
706    /// # #[cfg(feature = "deploy")] {
707    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
708    /// # use futures::StreamExt;
709    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
710    /// process
711    ///     .source_iter(q!(vec![
712    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
713    ///         std::collections::HashSet::from_iter(vec![3, 4]),
714    ///     ]))
715    ///     .flatten_unordered()
716    /// # }, |mut stream| async move {
717    /// // 1, 2, 3, 4, but in no particular order
718    /// # let mut results = Vec::new();
719    /// # for w in (1..5) {
720    /// #     results.push(stream.next().await.unwrap());
721    /// # }
722    /// # results.sort();
723    /// # assert_eq!(results, vec![1, 2, 3, 4]);
724    /// # }));
725    /// # }
726    /// ```
727    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
728    where
729        T: IntoIterator<Item = U>,
730    {
731        self.flat_map_unordered(q!(|d| d))
732    }
733
734    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
735    /// then emit the elements of that stream one by one. When the inner stream yields
736    /// `Pending`, this operator yields as well.
737    pub fn flat_map_stream_blocking<U, S, F>(
738        self,
739        f: impl IntoQuotedMut<'a, F, L>,
740    ) -> Stream<U, L, B, O, R>
741    where
742        S: futures::Stream<Item = U>,
743        F: Fn(T) -> S + 'a,
744    {
745        let f = f.splice_fn1_ctx(&self.location).into();
746        Stream::new(
747            self.location.clone(),
748            HydroNode::FlatMapStreamBlocking {
749                f,
750                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
751                metadata: self
752                    .location
753                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
754            },
755        )
756    }
757
758    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
759    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
760    /// yields as well.
761    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
762    where
763        T: futures::Stream<Item = U>,
764    {
765        self.flat_map_stream_blocking(q!(|d| d))
766    }
767
768    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
769    /// `f`, preserving the order of the elements.
770    ///
771    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
772    /// not modify or take ownership of the values. If you need to modify the values while filtering
773    /// use [`Stream::filter_map`] instead.
774    ///
775    /// # Example
776    /// ```rust
777    /// # #[cfg(feature = "deploy")] {
778    /// # use hydro_lang::prelude::*;
779    /// # use futures::StreamExt;
780    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
781    /// process
782    ///     .source_iter(q!(vec![1, 2, 3, 4]))
783    ///     .filter(q!(|&x| x > 2))
784    /// # }, |mut stream| async move {
785    /// // 3, 4
786    /// # for w in (3..5) {
787    /// #     assert_eq!(stream.next().await.unwrap(), w);
788    /// # }
789    /// # }));
790    /// # }
791    /// ```
792    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
793    where
794        F: Fn(&T) -> bool + 'a,
795    {
796        let f = crate::singleton_ref::with_singleton_capture(|| {
797            f.splice_fn1_borrow_ctx(&self.location).into()
798        });
799        Stream::new(
800            self.location.clone(),
801            HydroNode::Filter {
802                f,
803                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
804                metadata: self.location.new_node_metadata(Self::collection_kind()),
805            },
806        )
807    }
808
809    /// Splits the stream into two streams based on a predicate, without cloning elements.
810    ///
811    /// Elements for which `f` returns `true` are sent to the first output stream,
812    /// and elements for which `f` returns `false` are sent to the second output stream.
813    ///
814    /// Unlike using `filter` twice, this only evaluates the predicate once per element
815    /// and does not require `T: Clone`.
816    ///
817    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
818    /// the predicate is only used for routing; the element itself is moved to the
819    /// appropriate output stream.
820    ///
821    /// # Example
822    /// ```rust
823    /// # #[cfg(feature = "deploy")] {
824    /// # use hydro_lang::prelude::*;
825    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
826    /// # use futures::StreamExt;
827    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
828    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
829    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
830    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
831    /// evens.map(q!(|x| (x, true)))
832    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
833    /// # }, |mut stream| async move {
834    /// # let mut results = Vec::new();
835    /// # for _ in 0..6 {
836    /// #     results.push(stream.next().await.unwrap());
837    /// # }
838    /// # results.sort();
839    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
840    /// # }));
841    /// # }
842    /// ```
843    #[expect(
844        clippy::type_complexity,
845        reason = "return type mirrors the input stream type"
846    )]
847    pub fn partition<F>(
848        self,
849        f: impl IntoQuotedMut<'a, F, L>,
850    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
851    where
852        F: Fn(&T) -> bool + 'a,
853    {
854        let f = crate::singleton_ref::with_singleton_capture(|| {
855            f.splice_fn1_borrow_ctx(&self.location).into()
856        });
857        let shared = SharedNode(Rc::new(RefCell::new(
858            self.ir_node.replace(HydroNode::Placeholder),
859        )));
860
861        let true_stream = Stream::new(
862            self.location.clone(),
863            HydroNode::Partition {
864                inner: SharedNode(shared.0.clone()),
865                f: f.clone(),
866                is_true: true,
867                metadata: self.location.new_node_metadata(Self::collection_kind()),
868            },
869        );
870
871        let false_stream = Stream::new(
872            self.location.clone(),
873            HydroNode::Partition {
874                inner: SharedNode(shared.0),
875                f,
876                is_true: false,
877                metadata: self.location.new_node_metadata(Self::collection_kind()),
878            },
879        );
880
881        (true_stream, false_stream)
882    }
883
884    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
885    ///
886    /// # Example
887    /// ```rust
888    /// # #[cfg(feature = "deploy")] {
889    /// # use hydro_lang::prelude::*;
890    /// # use futures::StreamExt;
891    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
892    /// process
893    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
894    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
895    /// # }, |mut stream| async move {
896    /// // 1, 2
897    /// # for w in (1..3) {
898    /// #     assert_eq!(stream.next().await.unwrap(), w);
899    /// # }
900    /// # }));
901    /// # }
902    /// ```
903    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
904    where
905        F: Fn(T) -> Option<U> + 'a,
906    {
907        let f = crate::singleton_ref::with_singleton_capture(|| {
908            f.splice_fn1_ctx(&self.location).into()
909        });
910        Stream::new(
911            self.location.clone(),
912            HydroNode::FilterMap {
913                f,
914                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
915                metadata: self
916                    .location
917                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
918            },
919        )
920    }
921
922    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
923    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
924    /// If `other` is an empty [`Optional`], no values will be produced.
925    ///
926    /// # Example
927    /// ```rust
928    /// # #[cfg(feature = "deploy")] {
929    /// # use hydro_lang::prelude::*;
930    /// # use futures::StreamExt;
931    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
932    /// let tick = process.tick();
933    /// let batch = process
934    ///   .source_iter(q!(vec![1, 2, 3, 4]))
935    ///   .batch(&tick, nondet!(/** test */));
936    /// let count = batch.clone().count(); // `count()` returns a singleton
937    /// batch.cross_singleton(count).all_ticks()
938    /// # }, |mut stream| async move {
939    /// // (1, 4), (2, 4), (3, 4), (4, 4)
940    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
941    /// #     assert_eq!(stream.next().await.unwrap(), w);
942    /// # }
943    /// # }));
944    /// # }
945    /// ```
946    pub fn cross_singleton<O2>(
947        self,
948        other: impl Into<Optional<O2, L, Bounded>>,
949    ) -> Stream<(T, O2), L, B, O, R>
950    where
951        O2: Clone,
952    {
953        let other: Optional<O2, L, Bounded> = other.into();
954        check_matching_location(&self.location, &other.location);
955
956        Stream::new(
957            self.location.clone(),
958            HydroNode::CrossSingleton {
959                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
960                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
961                metadata: self
962                    .location
963                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
964            },
965        )
966    }
967
968    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
969    ///
970    /// # Example
971    /// ```rust
972    /// # #[cfg(feature = "deploy")] {
973    /// # use hydro_lang::prelude::*;
974    /// # use futures::StreamExt;
975    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
976    /// let tick = process.tick();
977    /// // ticks are lazy by default, forces the second tick to run
978    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
979    ///
980    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
981    /// let batch_first_tick = process
982    ///   .source_iter(q!(vec![1, 2, 3, 4]))
983    ///   .batch(&tick, nondet!(/** test */));
984    /// let batch_second_tick = process
985    ///   .source_iter(q!(vec![5, 6, 7, 8]))
986    ///   .batch(&tick, nondet!(/** test */))
987    ///   .defer_tick();
988    /// batch_first_tick.chain(batch_second_tick)
989    ///   .filter_if(signal)
990    ///   .all_ticks()
991    /// # }, |mut stream| async move {
992    /// // [1, 2, 3, 4]
993    /// # for w in vec![1, 2, 3, 4] {
994    /// #     assert_eq!(stream.next().await.unwrap(), w);
995    /// # }
996    /// # }));
997    /// # }
998    /// ```
999    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1000        self.cross_singleton(signal.filter(q!(|b| *b)))
1001            .map(q!(|(d, _)| d))
1002    }
1003
1004    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
1005    ///
1006    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
1007    /// leader of a cluster.
1008    ///
1009    /// # Example
1010    /// ```rust
1011    /// # #[cfg(feature = "deploy")] {
1012    /// # use hydro_lang::prelude::*;
1013    /// # use futures::StreamExt;
1014    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1015    /// let tick = process.tick();
1016    /// // ticks are lazy by default, forces the second tick to run
1017    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1018    ///
1019    /// let batch_first_tick = process
1020    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1021    ///   .batch(&tick, nondet!(/** test */));
1022    /// let batch_second_tick = process
1023    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1024    ///   .batch(&tick, nondet!(/** test */))
1025    ///   .defer_tick(); // appears on the second tick
1026    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1027    /// batch_first_tick.chain(batch_second_tick)
1028    ///   .filter_if_some(some_on_first_tick)
1029    ///   .all_ticks()
1030    /// # }, |mut stream| async move {
1031    /// // [1, 2, 3, 4]
1032    /// # for w in vec![1, 2, 3, 4] {
1033    /// #     assert_eq!(stream.next().await.unwrap(), w);
1034    /// # }
1035    /// # }));
1036    /// # }
1037    /// ```
1038    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1039    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1040        self.filter_if(signal.is_some())
1041    }
1042
1043    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
1044    ///
1045    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
1046    /// some local state.
1047    ///
1048    /// # Example
1049    /// ```rust
1050    /// # #[cfg(feature = "deploy")] {
1051    /// # use hydro_lang::prelude::*;
1052    /// # use futures::StreamExt;
1053    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1054    /// let tick = process.tick();
1055    /// // ticks are lazy by default, forces the second tick to run
1056    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1057    ///
1058    /// let batch_first_tick = process
1059    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1060    ///   .batch(&tick, nondet!(/** test */));
1061    /// let batch_second_tick = process
1062    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1063    ///   .batch(&tick, nondet!(/** test */))
1064    ///   .defer_tick(); // appears on the second tick
1065    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1066    /// batch_first_tick.chain(batch_second_tick)
1067    ///   .filter_if_none(some_on_first_tick)
1068    ///   .all_ticks()
1069    /// # }, |mut stream| async move {
1070    /// // [5, 6, 7, 8]
1071    /// # for w in vec![5, 6, 7, 8] {
1072    /// #     assert_eq!(stream.next().await.unwrap(), w);
1073    /// # }
1074    /// # }));
1075    /// # }
1076    /// ```
1077    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1078    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1079        self.filter_if(other.is_none())
1080    }
1081
1082    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1083    /// returning all tupled pairs.
1084    ///
1085    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1086    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1087    /// symmetric hash join is used and ordering is [`NoOrder`].
1088    ///
1089    /// # Example
1090    /// ```rust
1091    /// # #[cfg(feature = "deploy")] {
1092    /// # use hydro_lang::prelude::*;
1093    /// # use std::collections::HashSet;
1094    /// # use futures::StreamExt;
1095    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1096    /// let tick = process.tick();
1097    /// let stream1 = process.source_iter(q!(vec![1, 2]));
1098    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1099    /// stream1.cross_product(stream2)
1100    /// # }, |mut stream| async move {
1101    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1102    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1103    /// # stream.map(|i| assert!(expected.contains(&i)));
1104    /// # }));
1105    /// # }
1106    #[expect(
1107        clippy::type_complexity,
1108        reason = "MinRetries projection in return type"
1109    )]
1110    pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1111        self,
1112        other: Stream<T2, L, B2, O2, R2>,
1113    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1114    where
1115        T: Clone,
1116        T2: Clone,
1117        R: MinRetries<R2>,
1118    {
1119        self.map(q!(|v| ((), v)))
1120            .join(other.map(q!(|v| ((), v))))
1121            .map(q!(|((), (v1, v2))| (v1, v2)))
1122    }
1123
1124    /// Takes one stream as input and filters out any duplicate occurrences. The output
1125    /// contains all unique values from the input.
1126    ///
1127    /// # Example
1128    /// ```rust
1129    /// # #[cfg(feature = "deploy")] {
1130    /// # use hydro_lang::prelude::*;
1131    /// # use futures::StreamExt;
1132    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1133    /// let tick = process.tick();
1134    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1135    /// # }, |mut stream| async move {
1136    /// # for w in vec![1, 2, 3, 4] {
1137    /// #     assert_eq!(stream.next().await.unwrap(), w);
1138    /// # }
1139    /// # }));
1140    /// # }
1141    /// ```
1142    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1143    where
1144        T: Eq + Hash,
1145    {
1146        Stream::new(
1147            self.location.clone(),
1148            HydroNode::Unique {
1149                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1150                metadata: self
1151                    .location
1152                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1153            },
1154        )
1155    }
1156
1157    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1158    ///
1159    /// The `other` stream must be [`Bounded`], since this function will wait until
1160    /// all its elements are available before producing any output.
1161    /// # Example
1162    /// ```rust
1163    /// # #[cfg(feature = "deploy")] {
1164    /// # use hydro_lang::prelude::*;
1165    /// # use futures::StreamExt;
1166    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1167    /// let tick = process.tick();
1168    /// let stream = process
1169    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1170    ///   .batch(&tick, nondet!(/** test */));
1171    /// let batch = process
1172    ///   .source_iter(q!(vec![1, 2]))
1173    ///   .batch(&tick, nondet!(/** test */));
1174    /// stream.filter_not_in(batch).all_ticks()
1175    /// # }, |mut stream| async move {
1176    /// # for w in vec![3, 4] {
1177    /// #     assert_eq!(stream.next().await.unwrap(), w);
1178    /// # }
1179    /// # }));
1180    /// # }
1181    /// ```
1182    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1183    where
1184        T: Eq + Hash,
1185        B2: IsBounded,
1186    {
1187        check_matching_location(&self.location, &other.location);
1188
1189        Stream::new(
1190            self.location.clone(),
1191            HydroNode::Difference {
1192                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1193                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1194                metadata: self
1195                    .location
1196                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1197            },
1198        )
1199    }
1200
1201    /// An operator which allows you to "inspect" each element of a stream without
1202    /// modifying it. The closure `f` is called on a reference to each item. This is
1203    /// mainly useful for debugging, and should not be used to generate side-effects.
1204    ///
1205    /// # Example
1206    /// ```rust
1207    /// # #[cfg(feature = "deploy")] {
1208    /// # use hydro_lang::prelude::*;
1209    /// # use futures::StreamExt;
1210    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1211    /// let nums = process.source_iter(q!(vec![1, 2]));
1212    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1213    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1214    /// # }, |mut stream| async move {
1215    /// # for w in vec![1, 2] {
1216    /// #     assert_eq!(stream.next().await.unwrap(), w);
1217    /// # }
1218    /// # }));
1219    /// # }
1220    /// ```
1221    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L::DropConsistency>) -> Self
1222    where
1223        F: Fn(&T) + 'a,
1224    {
1225        let f = crate::singleton_ref::with_singleton_capture(|| {
1226            f.splice_fn1_borrow_ctx(&self.location.drop_consistency())
1227                .into()
1228        });
1229
1230        Stream::new(
1231            self.location.clone(),
1232            HydroNode::Inspect {
1233                f,
1234                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1235                metadata: self.location.new_node_metadata(Self::collection_kind()),
1236            },
1237        )
1238    }
1239
1240    /// Executes the provided closure for every element in this stream.
1241    ///
1242    /// Because the closure may have side effects, the stream must have deterministic order
1243    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1244    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1245    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1246    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1247    where
1248        O: IsOrdered,
1249        R: IsExactlyOnce,
1250    {
1251        let f = f.splice_fn1_ctx(&self.location).into();
1252        self.location
1253            .flow_state()
1254            .borrow_mut()
1255            .push_root(HydroRoot::ForEach {
1256                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1257                f,
1258                op_metadata: HydroIrOpMetadata::new(),
1259            });
1260    }
1261
1262    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1263    /// TCP socket to some other server. You should _not_ use this API for interacting with
1264    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1265    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1266    /// interaction with asynchronous sinks.
1267    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1268    where
1269        O: IsOrdered,
1270        R: IsExactlyOnce,
1271        S: 'a + futures::Sink<T> + Unpin,
1272    {
1273        self.location
1274            .flow_state()
1275            .borrow_mut()
1276            .push_root(HydroRoot::DestSink {
1277                sink: sink.splice_typed_ctx(&self.location).into(),
1278                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1279                op_metadata: HydroIrOpMetadata::new(),
1280            });
1281    }
1282
1283    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1284    ///
1285    /// # Example
1286    /// ```rust
1287    /// # #[cfg(feature = "deploy")] {
1288    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1289    /// # use futures::StreamExt;
1290    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1291    /// let tick = process.tick();
1292    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1293    /// numbers.enumerate()
1294    /// # }, |mut stream| async move {
1295    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1296    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1297    /// #     assert_eq!(stream.next().await.unwrap(), w);
1298    /// # }
1299    /// # }));
1300    /// # }
1301    /// ```
1302    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1303    where
1304        O: IsOrdered,
1305        R: IsExactlyOnce,
1306    {
1307        Stream::new(
1308            self.location.clone(),
1309            HydroNode::Enumerate {
1310                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1311                metadata: self.location.new_node_metadata(Stream::<
1312                    (usize, T),
1313                    L,
1314                    B,
1315                    TotalOrder,
1316                    ExactlyOnce,
1317                >::collection_kind()),
1318            },
1319        )
1320    }
1321
1322    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1323    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1324    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1325    ///
1326    /// Depending on the input stream guarantees, the closure may need to be commutative
1327    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1328    ///
1329    /// # Example
1330    /// ```rust
1331    /// # #[cfg(feature = "deploy")] {
1332    /// # use hydro_lang::prelude::*;
1333    /// # use futures::StreamExt;
1334    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1335    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1336    /// words
1337    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1338    ///     .into_stream()
1339    /// # }, |mut stream| async move {
1340    /// // "HELLOWORLD"
1341    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1342    /// # }));
1343    /// # }
1344    /// ```
1345    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1346        self,
1347        init: impl IntoQuotedMut<'a, I, L>,
1348        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1349    ) -> Singleton<A, L, B2>
1350    where
1351        I: Fn() -> A + 'a,
1352        F: 'a + Fn(&mut A, T),
1353        C: ValidCommutativityFor<O>,
1354        Idemp: ValidIdempotenceFor<R>,
1355        B: ApplyMonotoneStream<M, B2>,
1356    {
1357        let init = init.splice_fn0_ctx(&self.location).into();
1358        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1359        proof.register_proof(&comb);
1360
1361        // Only assume_retries (for idempotence), not assume_ordering.
1362        // The fold hook in the simulator handles ordering non-determinism directly.
1363        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1364        let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1365
1366        let core = HydroNode::Fold {
1367            init,
1368            acc: comb.into(),
1369            input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1370            metadata: retried
1371                .location
1372                .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1373            // we do not guarantee consistency at this point because if the algebraic properties
1374            // do not hold in practice, replica consistency may fail to be maintained, so we
1375            // would like the simulator to assert consistency; in the future, this will be dynamic
1376            // based on the proof mechanism
1377        };
1378
1379        Singleton::new(retried.location.clone(), core)
1380            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1381    }
1382
1383    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1384    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1385    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1386    /// reference, so that it can be modified in place.
1387    ///
1388    /// Depending on the input stream guarantees, the closure may need to be commutative
1389    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1390    ///
1391    /// # Example
1392    /// ```rust
1393    /// # #[cfg(feature = "deploy")] {
1394    /// # use hydro_lang::prelude::*;
1395    /// # use futures::StreamExt;
1396    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1397    /// let bools = process.source_iter(q!(vec![false, true, false]));
1398    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1399    /// # }, |mut stream| async move {
1400    /// // true
1401    /// # assert_eq!(stream.next().await.unwrap(), true);
1402    /// # }));
1403    /// # }
1404    /// ```
1405    pub fn reduce<F, C, Idemp>(
1406        self,
1407        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1408    ) -> Optional<T, L, B>
1409    where
1410        F: Fn(&mut T, T) + 'a,
1411        C: ValidCommutativityFor<O>,
1412        Idemp: ValidIdempotenceFor<R>,
1413    {
1414        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1415        proof.register_proof(&f);
1416
1417        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1418        let ordered_etc: Stream<T, L::DropConsistency, B> =
1419            self.assume_retries(nondet).assume_ordering(nondet);
1420
1421        let core = HydroNode::Reduce {
1422            f: f.into(),
1423            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1424            metadata: ordered_etc
1425                .location
1426                .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1427        };
1428
1429        Optional::new(ordered_etc.location.clone(), core)
1430            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1431    }
1432
1433    /// Computes the maximum element in the stream as an [`Optional`], which
1434    /// will be empty until the first element in the input arrives.
1435    ///
1436    /// # Example
1437    /// ```rust
1438    /// # #[cfg(feature = "deploy")] {
1439    /// # use hydro_lang::prelude::*;
1440    /// # use futures::StreamExt;
1441    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1442    /// let tick = process.tick();
1443    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1444    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1445    /// batch.max().all_ticks()
1446    /// # }, |mut stream| async move {
1447    /// // 4
1448    /// # assert_eq!(stream.next().await.unwrap(), 4);
1449    /// # }));
1450    /// # }
1451    /// ```
1452    pub fn max(self) -> Optional<T, L, B>
1453    where
1454        T: Ord,
1455    {
1456        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1457            .assume_ordering_trusted_bounded::<TotalOrder>(
1458                nondet!(/** max is commutative, but order affects intermediates */),
1459            )
1460            .reduce(q!(|curr, new| {
1461                if new > *curr {
1462                    *curr = new;
1463                }
1464            }))
1465    }
1466
1467    /// Computes the minimum element in the stream as an [`Optional`], which
1468    /// will be empty until the first element in the input arrives.
1469    ///
1470    /// # Example
1471    /// ```rust
1472    /// # #[cfg(feature = "deploy")] {
1473    /// # use hydro_lang::prelude::*;
1474    /// # use futures::StreamExt;
1475    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1476    /// let tick = process.tick();
1477    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1478    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1479    /// batch.min().all_ticks()
1480    /// # }, |mut stream| async move {
1481    /// // 1
1482    /// # assert_eq!(stream.next().await.unwrap(), 1);
1483    /// # }));
1484    /// # }
1485    /// ```
1486    pub fn min(self) -> Optional<T, L, B>
1487    where
1488        T: Ord,
1489    {
1490        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1491            .assume_ordering_trusted_bounded::<TotalOrder>(
1492                nondet!(/** max is commutative, but order affects intermediates */),
1493            )
1494            .reduce(q!(|curr, new| {
1495                if new < *curr {
1496                    *curr = new;
1497                }
1498            }))
1499    }
1500
1501    /// Computes the first element in the stream as an [`Optional`], which
1502    /// will be empty until the first element in the input arrives.
1503    ///
1504    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1505    /// re-ordering of elements may cause the first element to change.
1506    ///
1507    /// # Example
1508    /// ```rust
1509    /// # #[cfg(feature = "deploy")] {
1510    /// # use hydro_lang::prelude::*;
1511    /// # use futures::StreamExt;
1512    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1513    /// let tick = process.tick();
1514    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1515    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1516    /// batch.first().all_ticks()
1517    /// # }, |mut stream| async move {
1518    /// // 1
1519    /// # assert_eq!(stream.next().await.unwrap(), 1);
1520    /// # }));
1521    /// # }
1522    /// ```
1523    pub fn first(self) -> Optional<T, L, B>
1524    where
1525        O: IsOrdered,
1526    {
1527        self.make_totally_ordered()
1528            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1529            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1530            .reduce(q!(|_, _| {}))
1531    }
1532
1533    /// Computes the last element in the stream as an [`Optional`], which
1534    /// will be empty until an element in the input arrives.
1535    ///
1536    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1537    /// re-ordering of elements may cause the last element to change.
1538    ///
1539    /// # Example
1540    /// ```rust
1541    /// # #[cfg(feature = "deploy")] {
1542    /// # use hydro_lang::prelude::*;
1543    /// # use futures::StreamExt;
1544    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1545    /// let tick = process.tick();
1546    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1547    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1548    /// batch.last().all_ticks()
1549    /// # }, |mut stream| async move {
1550    /// // 4
1551    /// # assert_eq!(stream.next().await.unwrap(), 4);
1552    /// # }));
1553    /// # }
1554    /// ```
1555    pub fn last(self) -> Optional<T, L, B>
1556    where
1557        O: IsOrdered,
1558    {
1559        self.make_totally_ordered()
1560            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1561            .reduce(q!(|curr, new| *curr = new))
1562    }
1563
1564    /// Returns a stream containing at most the first `n` elements of the input stream,
1565    /// preserving the original order. Similar to `LIMIT` in SQL.
1566    ///
1567    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1568    /// retries, since the result depends on the order and cardinality of elements.
1569    ///
1570    /// # Example
1571    /// ```rust
1572    /// # #[cfg(feature = "deploy")] {
1573    /// # use hydro_lang::prelude::*;
1574    /// # use futures::StreamExt;
1575    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1576    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1577    /// numbers.limit(q!(3))
1578    /// # }, |mut stream| async move {
1579    /// // 10, 20, 30
1580    /// # for w in vec![10, 20, 30] {
1581    /// #     assert_eq!(stream.next().await.unwrap(), w);
1582    /// # }
1583    /// # }));
1584    /// # }
1585    /// ```
1586    pub fn limit(
1587        self,
1588        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1589    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1590    where
1591        O: IsOrdered,
1592        R: IsExactlyOnce,
1593    {
1594        self.generator(
1595            q!(|| 0usize),
1596            q!(move |count, item| {
1597                if *count == n {
1598                    Generate::Break
1599                } else {
1600                    *count += 1;
1601                    if *count == n {
1602                        Generate::Return(item)
1603                    } else {
1604                        Generate::Yield(item)
1605                    }
1606                }
1607            }),
1608        )
1609    }
1610
1611    /// Collects all the elements of this stream into a single [`Vec`] element.
1612    ///
1613    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1614    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1615    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1616    /// the vector at an arbitrary point in time.
1617    ///
1618    /// # Example
1619    /// ```rust
1620    /// # #[cfg(feature = "deploy")] {
1621    /// # use hydro_lang::prelude::*;
1622    /// # use futures::StreamExt;
1623    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1624    /// let tick = process.tick();
1625    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1626    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1627    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1628    /// # }, |mut stream| async move {
1629    /// // [ vec![1, 2, 3, 4] ]
1630    /// # for w in vec![vec![1, 2, 3, 4]] {
1631    /// #     assert_eq!(stream.next().await.unwrap(), w);
1632    /// # }
1633    /// # }));
1634    /// # }
1635    /// ```
1636    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1637    where
1638        O: IsOrdered,
1639        R: IsExactlyOnce,
1640    {
1641        self.make_totally_ordered().make_exactly_once().fold(
1642            q!(|| vec![]),
1643            q!(|acc, v| {
1644                acc.push(v);
1645            }),
1646        )
1647    }
1648
1649    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1650    /// and emitting each intermediate result.
1651    ///
1652    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1653    /// containing all intermediate accumulated values. The scan operation can also terminate early
1654    /// by returning `None`.
1655    ///
1656    /// The function takes a mutable reference to the accumulator and the current element, and returns
1657    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1658    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1659    ///
1660    /// # Examples
1661    ///
1662    /// Basic usage - running sum:
1663    /// ```rust
1664    /// # #[cfg(feature = "deploy")] {
1665    /// # use hydro_lang::prelude::*;
1666    /// # use futures::StreamExt;
1667    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1668    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1669    ///     q!(|| 0),
1670    ///     q!(|acc, x| {
1671    ///         *acc += x;
1672    ///         Some(*acc)
1673    ///     }),
1674    /// )
1675    /// # }, |mut stream| async move {
1676    /// // Output: 1, 3, 6, 10
1677    /// # for w in vec![1, 3, 6, 10] {
1678    /// #     assert_eq!(stream.next().await.unwrap(), w);
1679    /// # }
1680    /// # }));
1681    /// # }
1682    /// ```
1683    ///
1684    /// Early termination example:
1685    /// ```rust
1686    /// # #[cfg(feature = "deploy")] {
1687    /// # use hydro_lang::prelude::*;
1688    /// # use futures::StreamExt;
1689    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1690    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1691    ///     q!(|| 1),
1692    ///     q!(|state, x| {
1693    ///         *state = *state * x;
1694    ///         if *state > 6 {
1695    ///             None // Terminate the stream
1696    ///         } else {
1697    ///             Some(-*state)
1698    ///         }
1699    ///     }),
1700    /// )
1701    /// # }, |mut stream| async move {
1702    /// // Output: -1, -2, -6
1703    /// # for w in vec![-1, -2, -6] {
1704    /// #     assert_eq!(stream.next().await.unwrap(), w);
1705    /// # }
1706    /// # }));
1707    /// # }
1708    /// ```
1709    pub fn scan<A, U, I, F>(
1710        self,
1711        init: impl IntoQuotedMut<'a, I, L>,
1712        f: impl IntoQuotedMut<'a, F, L>,
1713    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1714    where
1715        O: IsOrdered,
1716        R: IsExactlyOnce,
1717        I: Fn() -> A + 'a,
1718        F: Fn(&mut A, T) -> Option<U> + 'a,
1719    {
1720        let init = init.splice_fn0_ctx(&self.location).into();
1721        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1722
1723        Stream::new(
1724            self.location.clone(),
1725            HydroNode::Scan {
1726                init,
1727                acc: f,
1728                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1729                metadata: self.location.new_node_metadata(
1730                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1731                ),
1732            },
1733        )
1734    }
1735
1736    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1737    /// stream, maintaining an internal state (accumulator) and emitting the values returned
1738    /// by the function.
1739    ///
1740    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1741    /// future. The future is polled to completion. If it resolves to `Some`, the value is
1742    /// emitted. If it resolves to `None`, the item is filtered out.
1743    ///
1744    /// # Examples
1745    ///
1746    /// ```rust
1747    /// # #[cfg(feature = "deploy")] {
1748    /// # use hydro_lang::prelude::*;
1749    /// # use futures::StreamExt;
1750    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1751    /// process
1752    ///     .source_iter(q!(vec![1, 2, 3, 4]))
1753    ///     .scan_async_blocking(
1754    ///         q!(|| 0),
1755    ///         q!(|acc, x| {
1756    ///             *acc += x;
1757    ///             let val = *acc;
1758    ///             async move { Some(val) }
1759    ///         }),
1760    ///     )
1761    /// # }, |mut stream| async move {
1762    /// // Output: 1, 3, 6, 10
1763    /// # for w in vec![1, 3, 6, 10] {
1764    /// #     assert_eq!(stream.next().await.unwrap(), w);
1765    /// # }
1766    /// # }));
1767    /// # }
1768    /// ```
1769    pub fn scan_async_blocking<A, U, I, F, Fut>(
1770        self,
1771        init: impl IntoQuotedMut<'a, I, L>,
1772        f: impl IntoQuotedMut<'a, F, L>,
1773    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1774    where
1775        O: IsOrdered,
1776        R: IsExactlyOnce,
1777        I: Fn() -> A + 'a,
1778        F: Fn(&mut A, T) -> Fut + 'a,
1779        Fut: Future<Output = Option<U>> + 'a,
1780    {
1781        let init = init.splice_fn0_ctx(&self.location).into();
1782        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1783
1784        Stream::new(
1785            self.location.clone(),
1786            HydroNode::ScanAsyncBlocking {
1787                init,
1788                acc: f,
1789                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1790                metadata: self.location.new_node_metadata(
1791                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1792                ),
1793            },
1794        )
1795    }
1796
1797    /// Iteratively processes the elements of the stream using a state machine that can yield
1798    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1799    /// syntax in Rust, without requiring special syntax.
1800    ///
1801    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1802    /// state. The second argument defines the processing logic, taking in a mutable reference
1803    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1804    /// variants define what is emitted and whether further inputs should be processed.
1805    ///
1806    /// # Example
1807    /// ```rust
1808    /// # #[cfg(feature = "deploy")] {
1809    /// # use hydro_lang::prelude::*;
1810    /// # use futures::StreamExt;
1811    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1812    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1813    ///     q!(|| 0),
1814    ///     q!(|acc, x| {
1815    ///         *acc += x;
1816    ///         if *acc > 100 {
1817    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1818    ///         } else if *acc % 2 == 0 {
1819    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1820    ///         } else {
1821    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1822    ///         }
1823    ///     }),
1824    /// )
1825    /// # }, |mut stream| async move {
1826    /// // Output: "even", "done!"
1827    /// # let mut results = Vec::new();
1828    /// # for _ in 0..2 {
1829    /// #     results.push(stream.next().await.unwrap());
1830    /// # }
1831    /// # results.sort();
1832    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1833    /// # }));
1834    /// # }
1835    /// ```
1836    pub fn generator<A, U, I, F>(
1837        self,
1838        init: impl IntoQuotedMut<'a, I, L> + Copy,
1839        f: impl IntoQuotedMut<'a, F, L> + Copy,
1840    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1841    where
1842        O: IsOrdered,
1843        R: IsExactlyOnce,
1844        I: Fn() -> A + 'a,
1845        F: Fn(&mut A, T) -> Generate<U> + 'a,
1846    {
1847        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1848        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1849
1850        let this = self.make_totally_ordered().make_exactly_once();
1851
1852        // State is Option<Option<A>>:
1853        //   None = not yet initialized
1854        //   Some(Some(a)) = active with state a
1855        //   Some(None) = terminated
1856        let scan_init = q!(|| None)
1857            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1858            .into();
1859        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1860            if state.is_none() {
1861                *state = Some(Some(init()));
1862            }
1863            match state {
1864                Some(Some(state_value)) => match f(state_value, v) {
1865                    Generate::Yield(out) => Some(Some(out)),
1866                    Generate::Return(out) => {
1867                        *state = Some(None);
1868                        Some(Some(out))
1869                    }
1870                    // Unlike KeyedStream, we can terminate the scan directly on
1871                    // Break/Return because there is only one state (no other keys
1872                    // that still need processing).
1873                    Generate::Break => None,
1874                    Generate::Continue => Some(None),
1875                },
1876                // State is Some(None) after Return; terminate the scan.
1877                _ => None,
1878            }
1879        })
1880        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1881        .into();
1882
1883        let scan_node = HydroNode::Scan {
1884            init: scan_init,
1885            acc: scan_f,
1886            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1887            metadata: this.location.new_node_metadata(Stream::<
1888                Option<U>,
1889                L,
1890                B,
1891                TotalOrder,
1892                ExactlyOnce,
1893            >::collection_kind()),
1894        };
1895
1896        let flatten_f = q!(|d| d)
1897            .splice_fn1_ctx::<Option<U>, _>(&this.location)
1898            .into();
1899        let flatten_node = HydroNode::FlatMap {
1900            f: flatten_f,
1901            input: Box::new(scan_node),
1902            metadata: this
1903                .location
1904                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1905        };
1906
1907        Stream::new(this.location.clone(), flatten_node)
1908    }
1909
1910    /// Given a time interval, returns a stream corresponding to samples taken from the
1911    /// stream roughly at that interval. The output will have elements in the same order
1912    /// as the input, but with arbitrary elements skipped between samples. There is also
1913    /// no guarantee on the exact timing of the samples.
1914    ///
1915    /// # Non-Determinism
1916    /// The output stream is non-deterministic in which elements are sampled, since this
1917    /// is controlled by a clock.
1918    pub fn sample_every(
1919        self,
1920        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1921        nondet: NonDet,
1922    ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1923    where
1924        L: TopLevel<'a>,
1925    {
1926        let samples = self.location.source_interval(interval);
1927
1928        let tick = self.location.tick();
1929        self.batch(&tick, nondet)
1930            .filter_if(samples.batch(&tick, nondet).first().is_some())
1931            .all_ticks()
1932            .weaken_retries()
1933    }
1934
1935    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1936    /// stream has not emitted a value since that duration.
1937    ///
1938    /// # Non-Determinism
1939    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1940    /// samples take place, timeouts may be non-deterministically generated or missed,
1941    /// and the notification of the timeout may be delayed as well. There is also no
1942    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1943    /// detected based on when the next sample is taken.
1944    pub fn timeout(
1945        self,
1946        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
1947        nondet: NonDet,
1948    ) -> Optional<(), L::DropConsistency, Unbounded>
1949    where
1950        L: TopLevel<'a>,
1951    {
1952        let tick = self.location.tick();
1953
1954        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1955            q!(|| None),
1956            q!(
1957                |latest, _| {
1958                    *latest = Some(Instant::now());
1959                },
1960                commutative = manual_proof!(/** TODO */)
1961            ),
1962        );
1963
1964        latest_received
1965            .snapshot(&tick, nondet)
1966            .filter_map(q!(move |latest_received| {
1967                if let Some(latest_received) = latest_received {
1968                    if Instant::now().duration_since(latest_received) > duration {
1969                        Some(())
1970                    } else {
1971                        None
1972                    }
1973                } else {
1974                    Some(())
1975                }
1976            }))
1977            .latest()
1978    }
1979
1980    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1981    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1982    ///
1983    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1984    /// processed before an acknowledgement is emitted.
1985    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1986        let id = self.location.flow_state().borrow_mut().next_clock_id();
1987        let out_location = Atomic {
1988            tick: Tick {
1989                id,
1990                l: self.location.clone(),
1991            },
1992        };
1993        Stream::new(
1994            out_location.clone(),
1995            HydroNode::BeginAtomic {
1996                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1997                metadata: out_location
1998                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1999            },
2000        )
2001    }
2002
2003    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2004    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2005    /// the order of the input. The output stream will execute in the [`Tick`] that was
2006    /// used to create the atomic section.
2007    ///
2008    /// # Non-Determinism
2009    /// The batch boundaries are non-deterministic and may change across executions.
2010    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2011        self,
2012        tick: &Tick<L2>,
2013        _nondet: NonDet,
2014    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2015        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2016        Stream::new(
2017            tick.drop_consistency(),
2018            HydroNode::Batch {
2019                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2020                metadata: tick
2021                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2022            },
2023        )
2024    }
2025
2026    /// An operator which allows you to "name" a `HydroNode`.
2027    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
2028    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2029        {
2030            let mut node = self.ir_node.borrow_mut();
2031            let metadata = node.metadata_mut();
2032            metadata.tag = Some(name.to_owned());
2033        }
2034        self
2035    }
2036
2037    /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
2038    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
2039    /// so uses must be carefully vetted.
2040    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2041    where
2042        B: IsBounded,
2043    {
2044        Optional::new(
2045            self.location.clone(),
2046            HydroNode::Cast {
2047                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2048                metadata: self
2049                    .location
2050                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2051            },
2052        )
2053    }
2054
2055    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2056        if O::ORDERING_KIND == O2::ORDERING_KIND {
2057            Stream::new(
2058                self.location.clone(),
2059                self.ir_node.replace(HydroNode::Placeholder),
2060            )
2061        } else {
2062            panic!(
2063                "Runtime ordering {:?} did not match requested cast {:?}.",
2064                O::ORDERING_KIND,
2065                O2::ORDERING_KIND
2066            )
2067        }
2068    }
2069
2070    /// Explicitly "casts" the stream to a type with a different ordering
2071    /// guarantee. Useful in unsafe code where the ordering cannot be proven
2072    /// by the type-system.
2073    ///
2074    /// # Non-Determinism
2075    /// This function is used as an escape hatch, and any mistakes in the
2076    /// provided ordering guarantee will propagate into the guarantees
2077    /// for the rest of the program.
2078    pub fn assume_ordering<O2: Ordering>(
2079        self,
2080        _nondet: NonDet,
2081    ) -> Stream<T, L::DropConsistency, B, O2, R> {
2082        if O::ORDERING_KIND == O2::ORDERING_KIND {
2083            self.use_ordering_type().weaken_consistency()
2084        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2085            // We can always weaken the ordering guarantee
2086            let target_location = self.location().drop_consistency();
2087            Stream::new(
2088                target_location.clone(),
2089                HydroNode::Cast {
2090                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2091                    metadata: target_location
2092                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2093                },
2094            )
2095        } else {
2096            let target_location = self.location().drop_consistency();
2097            Stream::new(
2098                target_location.clone(),
2099                HydroNode::ObserveNonDet {
2100                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2101                    trusted: false,
2102                    metadata: target_location
2103                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2104                },
2105            )
2106        }
2107    }
2108
2109    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2110    // intermediate states will not be revealed
2111    fn assume_ordering_trusted_bounded<O2: Ordering>(
2112        self,
2113        nondet: NonDet,
2114    ) -> Stream<T, L, B, O2, R> {
2115        if B::BOUNDED {
2116            self.assume_ordering_trusted(nondet)
2117        } else {
2118            let self_location = self.location.clone();
2119            let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2120            Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2121        }
2122    }
2123
2124    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2125    // is not observable
2126    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2127        self,
2128        _nondet: NonDet,
2129    ) -> Stream<T, L, B, O2, R> {
2130        if O::ORDERING_KIND == O2::ORDERING_KIND {
2131            self.use_ordering_type()
2132        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2133            // We can always weaken the ordering guarantee
2134            Stream::new(
2135                self.location.clone(),
2136                HydroNode::Cast {
2137                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2138                    metadata: self
2139                        .location
2140                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2141                },
2142            )
2143        } else {
2144            Stream::new(
2145                self.location.clone(),
2146                HydroNode::ObserveNonDet {
2147                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2148                    trusted: true,
2149                    metadata: self
2150                        .location
2151                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2152                },
2153            )
2154        }
2155    }
2156
2157    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2158    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2159    /// which is always safe because that is the weakest possible guarantee.
2160    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2161        self.weaken_ordering::<NoOrder>()
2162    }
2163
2164    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2165    /// enforcing that `O2` is weaker than the input ordering guarantee.
2166    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2167        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2168        self.assume_ordering_trusted::<O2>(nondet)
2169    }
2170
2171    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2172    /// implies that `O == TotalOrder`.
2173    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2174    where
2175        O: IsOrdered,
2176    {
2177        self.assume_ordering_trusted(nondet!(/** no-op */))
2178    }
2179
2180    /// Explicitly "casts" the stream to a type with a different retries
2181    /// guarantee. Useful in unsafe code where the lack of retries cannot
2182    /// be proven by the type-system.
2183    ///
2184    /// # Non-Determinism
2185    /// This function is used as an escape hatch, and any mistakes in the
2186    /// provided retries guarantee will propagate into the guarantees
2187    /// for the rest of the program.
2188    pub fn assume_retries<R2: Retries>(
2189        self,
2190        _nondet: NonDet,
2191    ) -> Stream<T, L::DropConsistency, B, O, R2> {
2192        if R::RETRIES_KIND == R2::RETRIES_KIND {
2193            Stream::new(
2194                self.location.drop_consistency(),
2195                self.ir_node.replace(HydroNode::Placeholder),
2196            )
2197        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2198            // We can always weaken the retries guarantee
2199            let target_location = self.location.drop_consistency();
2200            Stream::new(
2201                target_location.clone(),
2202                HydroNode::Cast {
2203                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2204                    metadata: target_location
2205                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2206                },
2207            )
2208        } else {
2209            let target_location = self.location.drop_consistency();
2210            Stream::new(
2211                target_location.clone(),
2212                HydroNode::ObserveNonDet {
2213                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2214                    trusted: false,
2215                    metadata: target_location
2216                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2217                },
2218            )
2219        }
2220    }
2221
2222    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2223    // is not observable
2224    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2225        if R::RETRIES_KIND == R2::RETRIES_KIND {
2226            Stream::new(
2227                self.location.clone(),
2228                self.ir_node.replace(HydroNode::Placeholder),
2229            )
2230        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2231            // We can always weaken the retries guarantee
2232            Stream::new(
2233                self.location.clone(),
2234                HydroNode::Cast {
2235                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2236                    metadata: self
2237                        .location
2238                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2239                },
2240            )
2241        } else {
2242            Stream::new(
2243                self.location.clone(),
2244                HydroNode::ObserveNonDet {
2245                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2246                    trusted: true,
2247                    metadata: self
2248                        .location
2249                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2250                },
2251            )
2252        }
2253    }
2254
2255    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2256    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2257    /// which is always safe because that is the weakest possible guarantee.
2258    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2259        self.weaken_retries::<AtLeastOnce>()
2260    }
2261
2262    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2263    /// enforcing that `R2` is weaker than the input retries guarantee.
2264    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2265        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2266        self.assume_retries_trusted::<R2>(nondet)
2267    }
2268
2269    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2270    /// implies that `R == ExactlyOnce`.
2271    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2272    where
2273        R: IsExactlyOnce,
2274    {
2275        self.assume_retries_trusted(nondet!(/** no-op */))
2276    }
2277
2278    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2279    /// implies that `B == Bounded`.
2280    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2281    where
2282        B: IsBounded,
2283    {
2284        self.weaken_boundedness()
2285    }
2286
2287    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2288    /// which implies that `B == Bounded`.
2289    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2290        if B::BOUNDED == B2::BOUNDED {
2291            Stream::new(
2292                self.location.clone(),
2293                self.ir_node.replace(HydroNode::Placeholder),
2294            )
2295        } else {
2296            // We can always weaken the boundedness
2297            Stream::new(
2298                self.location.clone(),
2299                HydroNode::Cast {
2300                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2301                    metadata: self
2302                        .location
2303                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2304                },
2305            )
2306        }
2307    }
2308}
2309
2310impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2311where
2312    L: Location<'a>,
2313{
2314    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2315    ///
2316    /// # Example
2317    /// ```rust
2318    /// # #[cfg(feature = "deploy")] {
2319    /// # use hydro_lang::prelude::*;
2320    /// # use futures::StreamExt;
2321    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2322    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2323    /// # }, |mut stream| async move {
2324    /// // 1, 2, 3
2325    /// # for w in vec![1, 2, 3] {
2326    /// #     assert_eq!(stream.next().await.unwrap(), w);
2327    /// # }
2328    /// # }));
2329    /// # }
2330    /// ```
2331    pub fn cloned(self) -> Stream<T, L, B, O, R>
2332    where
2333        T: Clone,
2334    {
2335        self.map(q!(|d| d.clone()))
2336    }
2337}
2338
2339impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2340where
2341    L: Location<'a>,
2342{
2343    /// Computes the number of elements in the stream as a [`Singleton`].
2344    ///
2345    /// # Example
2346    /// ```rust
2347    /// # #[cfg(feature = "deploy")] {
2348    /// # use hydro_lang::prelude::*;
2349    /// # use futures::StreamExt;
2350    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2351    /// let tick = process.tick();
2352    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2353    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2354    /// batch.count().all_ticks()
2355    /// # }, |mut stream| async move {
2356    /// // 4
2357    /// # assert_eq!(stream.next().await.unwrap(), 4);
2358    /// # }));
2359    /// # }
2360    /// ```
2361    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2362        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2363            /// Order does not affect eventual count, and also does not affect intermediate states.
2364        ))
2365        .fold(
2366            q!(|| 0usize),
2367            q!(
2368                |count, _| *count += 1,
2369                monotone = manual_proof!(/** += 1 is monotone */)
2370            ),
2371        )
2372    }
2373}
2374
2375impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2376    /// Produces a new stream that merges the elements of the two input streams.
2377    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2378    ///
2379    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2380    /// [`Bounded`], you can use [`Stream::chain`] instead.
2381    ///
2382    /// # Example
2383    /// ```rust
2384    /// # #[cfg(feature = "deploy")] {
2385    /// # use hydro_lang::prelude::*;
2386    /// # use futures::StreamExt;
2387    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2388    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2389    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2390    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2391    /// # }, |mut stream| async move {
2392    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2393    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2394    /// #     assert_eq!(stream.next().await.unwrap(), w);
2395    /// # }
2396    /// # }));
2397    /// # }
2398    /// ```
2399    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2400        self,
2401        other: Stream<T, L, Unbounded, O2, R2>,
2402    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2403    where
2404        R: MinRetries<R2>,
2405    {
2406        Stream::new(
2407            self.location.clone(),
2408            HydroNode::Chain {
2409                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2410                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2411                metadata: self.location.new_node_metadata(Stream::<
2412                    T,
2413                    L,
2414                    Unbounded,
2415                    NoOrder,
2416                    <R as MinRetries<R2>>::Min,
2417                >::collection_kind()),
2418            },
2419        )
2420    }
2421
2422    /// Deprecated: use [`Stream::merge_unordered`] instead.
2423    #[deprecated(note = "use `merge_unordered` instead")]
2424    pub fn interleave<O2: Ordering, R2: Retries>(
2425        self,
2426        other: Stream<T, L, Unbounded, O2, R2>,
2427    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2428    where
2429        R: MinRetries<R2>,
2430    {
2431        self.merge_unordered(other)
2432    }
2433}
2434
2435impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2436    /// Produces a new stream that combines the elements of the two input streams,
2437    /// preserving the relative order of elements within each input.
2438    ///
2439    /// # Non-Determinism
2440    /// The order in which elements *across* the two streams will be interleaved is
2441    /// non-deterministic, so the order of elements will vary across runs. If the output
2442    /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2443    /// but emits an unordered stream. For deterministic first-then-second ordering on
2444    /// bounded streams, use [`Stream::chain`].
2445    ///
2446    /// # Example
2447    /// ```rust
2448    /// # #[cfg(feature = "deploy")] {
2449    /// # use hydro_lang::prelude::*;
2450    /// # use futures::StreamExt;
2451    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2452    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2453    /// # process.source_iter(q!(vec![1, 3])).into();
2454    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2455    /// # }, |mut stream| async move {
2456    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2457    /// # for w in vec![1, 3, 2, 4] {
2458    /// #     assert_eq!(stream.next().await.unwrap(), w);
2459    /// # }
2460    /// # }));
2461    /// # }
2462    /// ```
2463    pub fn merge_ordered<R2: Retries>(
2464        self,
2465        other: Stream<T, L, B, TotalOrder, R2>,
2466        _nondet: NonDet,
2467    ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2468    where
2469        R: MinRetries<R2>,
2470    {
2471        let target_location = self.location().drop_consistency();
2472        Stream::new(
2473            target_location.clone(),
2474            HydroNode::MergeOrdered {
2475                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2476                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2477                metadata: target_location.new_node_metadata(Stream::<
2478                    T,
2479                    L::DropConsistency,
2480                    B,
2481                    TotalOrder,
2482                    <R as MinRetries<R2>>::Min,
2483                >::collection_kind()),
2484            },
2485        )
2486    }
2487}
2488
2489impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2490where
2491    L: Location<'a>,
2492{
2493    /// Produces a new stream that emits the input elements in sorted order.
2494    ///
2495    /// The input stream can have any ordering guarantee, but the output stream
2496    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2497    /// elements in the input stream are available, so it requires the input stream
2498    /// to be [`Bounded`].
2499    ///
2500    /// # Example
2501    /// ```rust
2502    /// # #[cfg(feature = "deploy")] {
2503    /// # use hydro_lang::prelude::*;
2504    /// # use futures::StreamExt;
2505    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2506    /// let tick = process.tick();
2507    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2508    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2509    /// batch.sort().all_ticks()
2510    /// # }, |mut stream| async move {
2511    /// // 1, 2, 3, 4
2512    /// # for w in (1..5) {
2513    /// #     assert_eq!(stream.next().await.unwrap(), w);
2514    /// # }
2515    /// # }));
2516    /// # }
2517    /// ```
2518    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2519    where
2520        B: IsBounded,
2521        T: Ord,
2522    {
2523        let this = self.make_bounded();
2524        Stream::new(
2525            this.location.clone(),
2526            HydroNode::Sort {
2527                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2528                metadata: this
2529                    .location
2530                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2531            },
2532        )
2533    }
2534
2535    /// Produces a new stream that first emits the elements of the `self` stream,
2536    /// and then emits the elements of the `other` stream. The output stream has
2537    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2538    /// [`TotalOrder`] guarantee.
2539    ///
2540    /// Currently, both input streams must be [`Bounded`]. This operator will block
2541    /// on the first stream until all its elements are available. In a future version,
2542    /// we will relax the requirement on the `other` stream.
2543    ///
2544    /// # Example
2545    /// ```rust
2546    /// # #[cfg(feature = "deploy")] {
2547    /// # use hydro_lang::prelude::*;
2548    /// # use futures::StreamExt;
2549    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2550    /// let tick = process.tick();
2551    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2552    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2553    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2554    /// # }, |mut stream| async move {
2555    /// // 2, 3, 4, 5, 1, 2, 3, 4
2556    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2557    /// #     assert_eq!(stream.next().await.unwrap(), w);
2558    /// # }
2559    /// # }));
2560    /// # }
2561    /// ```
2562    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2563        self,
2564        other: Stream<T, L, B2, O2, R2>,
2565    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2566    where
2567        B: IsBounded,
2568        O: MinOrder<O2>,
2569        R: MinRetries<R2>,
2570    {
2571        check_matching_location(&self.location, &other.location);
2572
2573        Stream::new(
2574            self.location.clone(),
2575            HydroNode::Chain {
2576                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2577                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2578                metadata: self.location.new_node_metadata(Stream::<
2579                    T,
2580                    L,
2581                    B2,
2582                    <O as MinOrder<O2>>::Min,
2583                    <R as MinRetries<R2>>::Min,
2584                >::collection_kind()),
2585            },
2586        )
2587    }
2588
2589    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2590    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2591    /// because this is compiled into a nested loop.
2592    #[expect(
2593        clippy::type_complexity,
2594        reason = "MinRetries projection in return type"
2595    )]
2596    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2597        self,
2598        other: Stream<T2, L, Bounded, O2, R2>,
2599    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2600    where
2601        B: IsBounded,
2602        T: Clone,
2603        T2: Clone,
2604        R: MinRetries<R2>,
2605    {
2606        let this = self.make_bounded();
2607        check_matching_location(&this.location, &other.location);
2608
2609        Stream::new(
2610            this.location.clone(),
2611            HydroNode::CrossProduct {
2612                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2613                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2614                metadata: this.location.new_node_metadata(Stream::<
2615                    (T, T2),
2616                    L,
2617                    Bounded,
2618                    <O2 as MinOrder<O>>::Min,
2619                    <R as MinRetries<R2>>::Min,
2620                >::collection_kind()),
2621            },
2622        )
2623    }
2624
2625    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2626    /// `self` used as the values for *each* key.
2627    ///
2628    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2629    /// values. For example, it can be used to send the same set of elements to several cluster
2630    /// members, if the membership information is available as a [`KeyedSingleton`].
2631    ///
2632    /// # Example
2633    /// ```rust
2634    /// # #[cfg(feature = "deploy")] {
2635    /// # use hydro_lang::prelude::*;
2636    /// # use futures::StreamExt;
2637    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2638    /// # let tick = process.tick();
2639    /// let keyed_singleton = // { 1: (), 2: () }
2640    /// # process
2641    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2642    /// #     .into_keyed()
2643    /// #     .batch(&tick, nondet!(/** test */))
2644    /// #     .first();
2645    /// let stream = // [ "a", "b" ]
2646    /// # process
2647    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2648    /// #     .batch(&tick, nondet!(/** test */));
2649    /// stream.repeat_with_keys(keyed_singleton)
2650    /// # .entries().all_ticks()
2651    /// # }, |mut stream| async move {
2652    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2653    /// # let mut results = Vec::new();
2654    /// # for _ in 0..4 {
2655    /// #     results.push(stream.next().await.unwrap());
2656    /// # }
2657    /// # results.sort();
2658    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2659    /// # }));
2660    /// # }
2661    /// ```
2662    pub fn repeat_with_keys<K, V2>(
2663        self,
2664        keys: KeyedSingleton<K, V2, L, Bounded>,
2665    ) -> KeyedStream<K, T, L, Bounded, O, R>
2666    where
2667        B: IsBounded,
2668        K: Clone,
2669        T: Clone,
2670    {
2671        keys.keys()
2672            .assume_ordering_trusted::<TotalOrder>(
2673                nondet!(/** keyed stream does not depend on ordering of keys */),
2674            )
2675            .cross_product_nested_loop(self.make_bounded())
2676            .into_keyed()
2677    }
2678
2679    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2680    /// execution until all results are available. The output order is based on when futures
2681    /// complete, and may be different than the input order.
2682    ///
2683    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2684    /// while futures are pending, this variant blocks until the futures resolve.
2685    ///
2686    /// # Example
2687    /// ```rust
2688    /// # #[cfg(feature = "deploy")] {
2689    /// # use std::collections::HashSet;
2690    /// # use futures::StreamExt;
2691    /// # use hydro_lang::prelude::*;
2692    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2693    /// process
2694    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2695    ///     .map(q!(|x| async move {
2696    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2697    ///         x
2698    ///     }))
2699    ///     .resolve_futures_blocking()
2700    /// #   },
2701    /// #   |mut stream| async move {
2702    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2703    /// #       let mut output = HashSet::new();
2704    /// #       for _ in 1..10 {
2705    /// #           output.insert(stream.next().await.unwrap());
2706    /// #       }
2707    /// #       assert_eq!(
2708    /// #           output,
2709    /// #           HashSet::<i32>::from_iter(1..10)
2710    /// #       );
2711    /// #   },
2712    /// # ));
2713    /// # }
2714    /// ```
2715    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2716    where
2717        T: Future,
2718    {
2719        Stream::new(
2720            self.location.clone(),
2721            HydroNode::ResolveFuturesBlocking {
2722                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2723                metadata: self
2724                    .location
2725                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2726            },
2727        )
2728    }
2729
2730    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2731    ///
2732    /// # Example
2733    /// ```rust
2734    /// # #[cfg(feature = "deploy")] {
2735    /// # use hydro_lang::prelude::*;
2736    /// # use futures::StreamExt;
2737    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2738    /// let tick = process.tick();
2739    /// let empty: Stream<i32, _, Bounded> = process
2740    ///   .source_iter(q!(Vec::<i32>::new()))
2741    ///   .batch(&tick, nondet!(/** test */));
2742    /// empty.is_empty().all_ticks()
2743    /// # }, |mut stream| async move {
2744    /// // true
2745    /// # assert_eq!(stream.next().await.unwrap(), true);
2746    /// # }));
2747    /// # }
2748    /// ```
2749    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2750    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2751    where
2752        B: IsBounded,
2753    {
2754        self.make_bounded()
2755            .assume_ordering_trusted::<TotalOrder>(
2756                nondet!(/** is_empty intermediates unaffected by order */),
2757            )
2758            .first()
2759            .is_none()
2760    }
2761}
2762
2763impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2764where
2765    L: Location<'a>,
2766{
2767    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2768    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2769    /// by equi-joining the two streams on the key attribute `K`.
2770    ///
2771    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2772    /// and streams the left side through, preserving the left side's ordering. When both
2773    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2774    ///
2775    /// # Example
2776    /// ```rust
2777    /// # #[cfg(feature = "deploy")] {
2778    /// # use hydro_lang::prelude::*;
2779    /// # use std::collections::HashSet;
2780    /// # use futures::StreamExt;
2781    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2782    /// let tick = process.tick();
2783    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2784    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2785    /// stream1.join(stream2)
2786    /// # }, |mut stream| async move {
2787    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2788    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2789    /// # stream.map(|i| assert!(expected.contains(&i)));
2790    /// # }));
2791    /// # }
2792    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2793        self,
2794        n: Stream<(K, V2), L, B2, O2, R2>,
2795    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2796    where
2797        K: Eq + Hash + Clone,
2798        R: MinRetries<R2>,
2799        V1: Clone,
2800        V2: Clone,
2801    {
2802        check_matching_location(&self.location, &n.location);
2803
2804        let ir_node = if B2::BOUNDED {
2805            HydroNode::JoinHalf {
2806                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2807                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2808                metadata: self.location.new_node_metadata(Stream::<
2809                    (K, (V1, V2)),
2810                    L,
2811                    B,
2812                    B2::PreserveOrderIfBounded<O>,
2813                    <R as MinRetries<R2>>::Min,
2814                >::collection_kind()),
2815            }
2816        } else {
2817            HydroNode::Join {
2818                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2819                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2820                metadata: self.location.new_node_metadata(Stream::<
2821                    (K, (V1, V2)),
2822                    L,
2823                    B,
2824                    B2::PreserveOrderIfBounded<O>,
2825                    <R as MinRetries<R2>>::Min,
2826                >::collection_kind()),
2827            }
2828        };
2829
2830        Stream::new(self.location.clone(), ir_node)
2831    }
2832
2833    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2834    /// computes the anti-join of the items in the input -- i.e. returns
2835    /// unique items in the first input that do not have a matching key
2836    /// in the second input.
2837    ///
2838    /// # Example
2839    /// ```rust
2840    /// # #[cfg(feature = "deploy")] {
2841    /// # use hydro_lang::prelude::*;
2842    /// # use futures::StreamExt;
2843    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2844    /// let tick = process.tick();
2845    /// let stream = process
2846    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2847    ///   .batch(&tick, nondet!(/** test */));
2848    /// let batch = process
2849    ///   .source_iter(q!(vec![1, 2]))
2850    ///   .batch(&tick, nondet!(/** test */));
2851    /// stream.anti_join(batch).all_ticks()
2852    /// # }, |mut stream| async move {
2853    /// # for w in vec![(3, 'c'), (4, 'd')] {
2854    /// #     assert_eq!(stream.next().await.unwrap(), w);
2855    /// # }
2856    /// # }));
2857    /// # }
2858    pub fn anti_join<O2: Ordering, R2: Retries>(
2859        self,
2860        n: Stream<K, L, Bounded, O2, R2>,
2861    ) -> Stream<(K, V1), L, B, O, R>
2862    where
2863        K: Eq + Hash,
2864    {
2865        check_matching_location(&self.location, &n.location);
2866
2867        Stream::new(
2868            self.location.clone(),
2869            HydroNode::AntiJoin {
2870                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2871                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2872                metadata: self
2873                    .location
2874                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2875            },
2876        )
2877    }
2878}
2879
2880impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2881    Stream<(K, V), L, B, O, R>
2882{
2883    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2884    /// is used as the key and the second element is added to the entries associated with that key.
2885    ///
2886    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2887    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2888    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2889    /// total ordering _within_ each group but no ordering _across_ groups.
2890    ///
2891    /// # Example
2892    /// ```rust
2893    /// # #[cfg(feature = "deploy")] {
2894    /// # use hydro_lang::prelude::*;
2895    /// # use futures::StreamExt;
2896    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2897    /// process
2898    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2899    ///     .into_keyed()
2900    /// #   .entries()
2901    /// # }, |mut stream| async move {
2902    /// // { 1: [2, 3], 2: [4] }
2903    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2904    /// #     assert_eq!(stream.next().await.unwrap(), w);
2905    /// # }
2906    /// # }));
2907    /// # }
2908    /// ```
2909    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2910        KeyedStream::new(
2911            self.location.clone(),
2912            HydroNode::Cast {
2913                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2914                metadata: self
2915                    .location
2916                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2917            },
2918        )
2919    }
2920}
2921
2922impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2923where
2924    K: Eq + Hash,
2925    L: Location<'a>,
2926{
2927    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2928    /// # Example
2929    /// ```rust
2930    /// # #[cfg(feature = "deploy")] {
2931    /// # use hydro_lang::prelude::*;
2932    /// # use futures::StreamExt;
2933    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2934    /// let tick = process.tick();
2935    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2936    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2937    /// batch.keys().all_ticks()
2938    /// # }, |mut stream| async move {
2939    /// // 1, 2
2940    /// # assert_eq!(stream.next().await.unwrap(), 1);
2941    /// # assert_eq!(stream.next().await.unwrap(), 2);
2942    /// # }));
2943    /// # }
2944    /// ```
2945    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2946        self.into_keyed()
2947            .fold(
2948                q!(|| ()),
2949                q!(
2950                    |_, _| {},
2951                    commutative = manual_proof!(/** values are ignored */),
2952                    idempotent = manual_proof!(/** values are ignored */)
2953                ),
2954            )
2955            .keys()
2956    }
2957}
2958
2959impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2960where
2961    L: Location<'a>,
2962{
2963    /// Returns a stream corresponding to the latest batch of elements being atomically
2964    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2965    /// the order of the input.
2966    ///
2967    /// # Non-Determinism
2968    /// The batch boundaries are non-deterministic and may change across executions.
2969    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2970        self,
2971        tick: &Tick<L2>,
2972        _nondet: NonDet,
2973    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2974        Stream::new(
2975            tick.drop_consistency(),
2976            HydroNode::Batch {
2977                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2978                metadata: tick
2979                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2980            },
2981        )
2982    }
2983
2984    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2985    /// See [`Stream::atomic`] for more details.
2986    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2987        Stream::new(
2988            self.location.tick.l.clone(),
2989            HydroNode::EndAtomic {
2990                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2991                metadata: self
2992                    .location
2993                    .tick
2994                    .l
2995                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2996            },
2997        )
2998    }
2999}
3000
3001impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3002where
3003    L: TopLevel<'a>,
3004    F: Future<Output = T>,
3005{
3006    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3007    /// Future outputs are produced as available, regardless of input arrival order.
3008    ///
3009    /// # Example
3010    /// ```rust
3011    /// # #[cfg(feature = "deploy")] {
3012    /// # use std::collections::HashSet;
3013    /// # use futures::StreamExt;
3014    /// # use hydro_lang::prelude::*;
3015    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3016    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3017    ///     .map(q!(|x| async move {
3018    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3019    ///         x
3020    ///     }))
3021    ///     .resolve_futures()
3022    /// #   },
3023    /// #   |mut stream| async move {
3024    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
3025    /// #       let mut output = HashSet::new();
3026    /// #       for _ in 1..10 {
3027    /// #           output.insert(stream.next().await.unwrap());
3028    /// #       }
3029    /// #       assert_eq!(
3030    /// #           output,
3031    /// #           HashSet::<i32>::from_iter(1..10)
3032    /// #       );
3033    /// #   },
3034    /// # ));
3035    /// # }
3036    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3037        Stream::new(
3038            self.location.clone(),
3039            HydroNode::ResolveFutures {
3040                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3041                metadata: self
3042                    .location
3043                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3044            },
3045        )
3046    }
3047
3048    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3049    /// Future outputs are produced in the same order as the input stream.
3050    ///
3051    /// # Example
3052    /// ```rust
3053    /// # #[cfg(feature = "deploy")] {
3054    /// # use std::collections::HashSet;
3055    /// # use futures::StreamExt;
3056    /// # use hydro_lang::prelude::*;
3057    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3058    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3059    ///     .map(q!(|x| async move {
3060    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3061    ///         x
3062    ///     }))
3063    ///     .resolve_futures_ordered()
3064    /// #   },
3065    /// #   |mut stream| async move {
3066    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
3067    /// #       let mut output = Vec::new();
3068    /// #       for _ in 1..10 {
3069    /// #           output.push(stream.next().await.unwrap());
3070    /// #       }
3071    /// #       assert_eq!(
3072    /// #           output,
3073    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3074    /// #       );
3075    /// #   },
3076    /// # ));
3077    /// # }
3078    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3079        Stream::new(
3080            self.location.clone(),
3081            HydroNode::ResolveFuturesOrdered {
3082                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3083                metadata: self
3084                    .location
3085                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3086            },
3087        )
3088    }
3089}
3090
3091impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3092where
3093    L: Location<'a>,
3094{
3095    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3096    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3097    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3098        Stream::new(
3099            self.location.outer().clone(),
3100            HydroNode::YieldConcat {
3101                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3102                metadata: self
3103                    .location
3104                    .outer()
3105                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3106            },
3107        )
3108    }
3109
3110    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3111    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3112    ///
3113    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3114    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3115    /// stream's [`Tick`] context.
3116    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3117        let out_location = Atomic {
3118            tick: self.location.clone(),
3119        };
3120
3121        Stream::new(
3122            out_location.clone(),
3123            HydroNode::YieldConcat {
3124                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3125                metadata: out_location
3126                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3127            },
3128        )
3129    }
3130
3131    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
3132    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
3133    /// input.
3134    ///
3135    /// This API is particularly useful for stateful computation on batches of data, such as
3136    /// maintaining an accumulated state that is up to date with the current batch.
3137    ///
3138    /// # Example
3139    /// ```rust
3140    /// # #[cfg(feature = "deploy")] {
3141    /// # use hydro_lang::prelude::*;
3142    /// # use futures::StreamExt;
3143    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3144    /// let tick = process.tick();
3145    /// # // ticks are lazy by default, forces the second tick to run
3146    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3147    /// # let batch_first_tick = process
3148    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
3149    /// #  .batch(&tick, nondet!(/** test */));
3150    /// # let batch_second_tick = process
3151    /// #   .source_iter(q!(vec![5, 6, 7]))
3152    /// #   .batch(&tick, nondet!(/** test */))
3153    /// #   .defer_tick(); // appears on the second tick
3154    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3155    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3156    ///
3157    /// input.batch(&tick, nondet!(/** test */))
3158    ///     .across_ticks(|s| s.count()).all_ticks()
3159    /// # }, |mut stream| async move {
3160    /// // [4, 7]
3161    /// assert_eq!(stream.next().await.unwrap(), 4);
3162    /// assert_eq!(stream.next().await.unwrap(), 7);
3163    /// # }));
3164    /// # }
3165    /// ```
3166    pub fn across_ticks<Out: BatchAtomic<'a>>(
3167        self,
3168        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3169    ) -> Out::Batched {
3170        thunk(self.all_ticks_atomic()).batched_atomic()
3171    }
3172
3173    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3174    /// always has the elements of `self` at tick `T - 1`.
3175    ///
3176    /// At tick `0`, the output stream is empty, since there is no previous tick.
3177    ///
3178    /// This operator enables stateful iterative processing with ticks, by sending data from one
3179    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3180    ///
3181    /// # Example
3182    /// ```rust
3183    /// # #[cfg(feature = "deploy")] {
3184    /// # use hydro_lang::prelude::*;
3185    /// # use futures::StreamExt;
3186    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3187    /// let tick = process.tick();
3188    /// // ticks are lazy by default, forces the second tick to run
3189    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3190    ///
3191    /// let batch_first_tick = process
3192    ///   .source_iter(q!(vec![1, 2, 3, 4]))
3193    ///   .batch(&tick, nondet!(/** test */));
3194    /// let batch_second_tick = process
3195    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
3196    ///   .batch(&tick, nondet!(/** test */))
3197    ///   .defer_tick(); // appears on the second tick
3198    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3199    ///
3200    /// changes_across_ticks.clone().filter_not_in(
3201    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
3202    /// ).all_ticks()
3203    /// # }, |mut stream| async move {
3204    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3205    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3206    /// #     assert_eq!(stream.next().await.unwrap(), w);
3207    /// # }
3208    /// # }));
3209    /// # }
3210    /// ```
3211    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3212        Stream::new(
3213            self.location.clone(),
3214            HydroNode::DeferTick {
3215                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3216                metadata: self
3217                    .location
3218                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3219            },
3220        )
3221    }
3222}
3223
3224#[cfg(test)]
3225mod tests {
3226    #[cfg(feature = "deploy")]
3227    use futures::{SinkExt, StreamExt};
3228    #[cfg(feature = "deploy")]
3229    use hydro_deploy::Deployment;
3230    #[cfg(feature = "deploy")]
3231    use serde::{Deserialize, Serialize};
3232    #[cfg(any(feature = "deploy", feature = "sim"))]
3233    use stageleft::q;
3234
3235    #[cfg(any(feature = "deploy", feature = "sim"))]
3236    use crate::compile::builder::FlowBuilder;
3237    #[cfg(feature = "deploy")]
3238    use crate::live_collections::sliced::sliced;
3239    #[cfg(feature = "deploy")]
3240    use crate::live_collections::stream::ExactlyOnce;
3241    #[cfg(feature = "sim")]
3242    use crate::live_collections::stream::NoOrder;
3243    #[cfg(any(feature = "deploy", feature = "sim"))]
3244    use crate::live_collections::stream::TotalOrder;
3245    #[cfg(any(feature = "deploy", feature = "sim"))]
3246    use crate::location::Location;
3247    #[cfg(feature = "sim")]
3248    use crate::networking::TCP;
3249    #[cfg(any(feature = "deploy", feature = "sim"))]
3250    use crate::nondet::nondet;
3251
3252    mod backtrace_chained_ops;
3253
3254    #[cfg(feature = "deploy")]
3255    struct P1 {}
3256    #[cfg(feature = "deploy")]
3257    struct P2 {}
3258
3259    #[cfg(feature = "deploy")]
3260    #[derive(Serialize, Deserialize, Debug)]
3261    struct SendOverNetwork {
3262        n: u32,
3263    }
3264
3265    #[cfg(feature = "deploy")]
3266    #[tokio::test]
3267    async fn first_ten_distributed() {
3268        use crate::networking::TCP;
3269
3270        let mut deployment = Deployment::new();
3271
3272        let mut flow = FlowBuilder::new();
3273        let first_node = flow.process::<P1>();
3274        let second_node = flow.process::<P2>();
3275        let external = flow.external::<P2>();
3276
3277        let numbers = first_node.source_iter(q!(0..10));
3278        let out_port = numbers
3279            .map(q!(|n| SendOverNetwork { n }))
3280            .send(&second_node, TCP.fail_stop().bincode())
3281            .send_bincode_external(&external);
3282
3283        let nodes = flow
3284            .with_process(&first_node, deployment.Localhost())
3285            .with_process(&second_node, deployment.Localhost())
3286            .with_external(&external, deployment.Localhost())
3287            .deploy(&mut deployment);
3288
3289        deployment.deploy().await.unwrap();
3290
3291        let mut external_out = nodes.connect(out_port).await;
3292
3293        deployment.start().await.unwrap();
3294
3295        for i in 0..10 {
3296            assert_eq!(external_out.next().await.unwrap().n, i);
3297        }
3298    }
3299
3300    #[cfg(feature = "deploy")]
3301    #[tokio::test]
3302    async fn first_cardinality() {
3303        let mut deployment = Deployment::new();
3304
3305        let mut flow = FlowBuilder::new();
3306        let node = flow.process::<()>();
3307        let external = flow.external::<()>();
3308
3309        let node_tick = node.tick();
3310        let count = node_tick
3311            .singleton(q!([1, 2, 3]))
3312            .into_stream()
3313            .flatten_ordered()
3314            .first()
3315            .into_stream()
3316            .count()
3317            .all_ticks()
3318            .send_bincode_external(&external);
3319
3320        let nodes = flow
3321            .with_process(&node, deployment.Localhost())
3322            .with_external(&external, deployment.Localhost())
3323            .deploy(&mut deployment);
3324
3325        deployment.deploy().await.unwrap();
3326
3327        let mut external_out = nodes.connect(count).await;
3328
3329        deployment.start().await.unwrap();
3330
3331        assert_eq!(external_out.next().await.unwrap(), 1);
3332    }
3333
3334    #[cfg(feature = "deploy")]
3335    #[tokio::test]
3336    async fn unbounded_reduce_remembers_state() {
3337        let mut deployment = Deployment::new();
3338
3339        let mut flow = FlowBuilder::new();
3340        let node = flow.process::<()>();
3341        let external = flow.external::<()>();
3342
3343        let (input_port, input) = node.source_external_bincode(&external);
3344        let out = input
3345            .reduce(q!(|acc, v| *acc += v))
3346            .sample_eager(nondet!(/** test */))
3347            .send_bincode_external(&external);
3348
3349        let nodes = flow
3350            .with_process(&node, deployment.Localhost())
3351            .with_external(&external, deployment.Localhost())
3352            .deploy(&mut deployment);
3353
3354        deployment.deploy().await.unwrap();
3355
3356        let mut external_in = nodes.connect(input_port).await;
3357        let mut external_out = nodes.connect(out).await;
3358
3359        deployment.start().await.unwrap();
3360
3361        external_in.send(1).await.unwrap();
3362        assert_eq!(external_out.next().await.unwrap(), 1);
3363
3364        external_in.send(2).await.unwrap();
3365        assert_eq!(external_out.next().await.unwrap(), 3);
3366    }
3367
3368    #[cfg(feature = "deploy")]
3369    #[tokio::test]
3370    async fn top_level_bounded_cross_singleton() {
3371        let mut deployment = Deployment::new();
3372
3373        let mut flow = FlowBuilder::new();
3374        let node = flow.process::<()>();
3375        let external = flow.external::<()>();
3376
3377        let (input_port, input) =
3378            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3379
3380        let out = input
3381            .cross_singleton(
3382                node.source_iter(q!(vec![1, 2, 3]))
3383                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3384            )
3385            .send_bincode_external(&external);
3386
3387        let nodes = flow
3388            .with_process(&node, deployment.Localhost())
3389            .with_external(&external, deployment.Localhost())
3390            .deploy(&mut deployment);
3391
3392        deployment.deploy().await.unwrap();
3393
3394        let mut external_in = nodes.connect(input_port).await;
3395        let mut external_out = nodes.connect(out).await;
3396
3397        deployment.start().await.unwrap();
3398
3399        external_in.send(1).await.unwrap();
3400        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3401
3402        external_in.send(2).await.unwrap();
3403        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3404    }
3405
3406    #[cfg(feature = "deploy")]
3407    #[tokio::test]
3408    async fn top_level_bounded_reduce_cardinality() {
3409        let mut deployment = Deployment::new();
3410
3411        let mut flow = FlowBuilder::new();
3412        let node = flow.process::<()>();
3413        let external = flow.external::<()>();
3414
3415        let (input_port, input) =
3416            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3417
3418        let out = sliced! {
3419            let input = use(input, nondet!(/** test */));
3420            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3421            input.cross_singleton(v.into_stream().count())
3422        }
3423        .send_bincode_external(&external);
3424
3425        let nodes = flow
3426            .with_process(&node, deployment.Localhost())
3427            .with_external(&external, deployment.Localhost())
3428            .deploy(&mut deployment);
3429
3430        deployment.deploy().await.unwrap();
3431
3432        let mut external_in = nodes.connect(input_port).await;
3433        let mut external_out = nodes.connect(out).await;
3434
3435        deployment.start().await.unwrap();
3436
3437        external_in.send(1).await.unwrap();
3438        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3439
3440        external_in.send(2).await.unwrap();
3441        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3442    }
3443
3444    #[cfg(feature = "deploy")]
3445    #[tokio::test]
3446    async fn top_level_bounded_into_singleton_cardinality() {
3447        let mut deployment = Deployment::new();
3448
3449        let mut flow = FlowBuilder::new();
3450        let node = flow.process::<()>();
3451        let external = flow.external::<()>();
3452
3453        let (input_port, input) =
3454            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3455
3456        let out = sliced! {
3457            let input = use(input, nondet!(/** test */));
3458            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3459            input.cross_singleton(v.into_stream().count())
3460        }
3461        .send_bincode_external(&external);
3462
3463        let nodes = flow
3464            .with_process(&node, deployment.Localhost())
3465            .with_external(&external, deployment.Localhost())
3466            .deploy(&mut deployment);
3467
3468        deployment.deploy().await.unwrap();
3469
3470        let mut external_in = nodes.connect(input_port).await;
3471        let mut external_out = nodes.connect(out).await;
3472
3473        deployment.start().await.unwrap();
3474
3475        external_in.send(1).await.unwrap();
3476        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3477
3478        external_in.send(2).await.unwrap();
3479        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3480    }
3481
3482    #[cfg(feature = "deploy")]
3483    #[tokio::test]
3484    async fn atomic_fold_replays_each_tick() {
3485        let mut deployment = Deployment::new();
3486
3487        let mut flow = FlowBuilder::new();
3488        let node = flow.process::<()>();
3489        let external = flow.external::<()>();
3490
3491        let (input_port, input) =
3492            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3493        let tick = node.tick();
3494
3495        let out = input
3496            .batch(&tick, nondet!(/** test */))
3497            .cross_singleton(
3498                node.source_iter(q!(vec![1, 2, 3]))
3499                    .atomic()
3500                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3501                    .snapshot_atomic(&tick, nondet!(/** test */)),
3502            )
3503            .all_ticks()
3504            .send_bincode_external(&external);
3505
3506        let nodes = flow
3507            .with_process(&node, deployment.Localhost())
3508            .with_external(&external, deployment.Localhost())
3509            .deploy(&mut deployment);
3510
3511        deployment.deploy().await.unwrap();
3512
3513        let mut external_in = nodes.connect(input_port).await;
3514        let mut external_out = nodes.connect(out).await;
3515
3516        deployment.start().await.unwrap();
3517
3518        external_in.send(1).await.unwrap();
3519        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3520
3521        external_in.send(2).await.unwrap();
3522        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3523    }
3524
3525    #[cfg(feature = "deploy")]
3526    #[tokio::test]
3527    async fn unbounded_scan_remembers_state() {
3528        let mut deployment = Deployment::new();
3529
3530        let mut flow = FlowBuilder::new();
3531        let node = flow.process::<()>();
3532        let external = flow.external::<()>();
3533
3534        let (input_port, input) = node.source_external_bincode(&external);
3535        let out = input
3536            .scan(
3537                q!(|| 0),
3538                q!(|acc, v| {
3539                    *acc += v;
3540                    Some(*acc)
3541                }),
3542            )
3543            .send_bincode_external(&external);
3544
3545        let nodes = flow
3546            .with_process(&node, deployment.Localhost())
3547            .with_external(&external, deployment.Localhost())
3548            .deploy(&mut deployment);
3549
3550        deployment.deploy().await.unwrap();
3551
3552        let mut external_in = nodes.connect(input_port).await;
3553        let mut external_out = nodes.connect(out).await;
3554
3555        deployment.start().await.unwrap();
3556
3557        external_in.send(1).await.unwrap();
3558        assert_eq!(external_out.next().await.unwrap(), 1);
3559
3560        external_in.send(2).await.unwrap();
3561        assert_eq!(external_out.next().await.unwrap(), 3);
3562    }
3563
3564    #[cfg(feature = "deploy")]
3565    #[tokio::test]
3566    async fn unbounded_enumerate_remembers_state() {
3567        let mut deployment = Deployment::new();
3568
3569        let mut flow = FlowBuilder::new();
3570        let node = flow.process::<()>();
3571        let external = flow.external::<()>();
3572
3573        let (input_port, input) = node.source_external_bincode(&external);
3574        let out = input.enumerate().send_bincode_external(&external);
3575
3576        let nodes = flow
3577            .with_process(&node, deployment.Localhost())
3578            .with_external(&external, deployment.Localhost())
3579            .deploy(&mut deployment);
3580
3581        deployment.deploy().await.unwrap();
3582
3583        let mut external_in = nodes.connect(input_port).await;
3584        let mut external_out = nodes.connect(out).await;
3585
3586        deployment.start().await.unwrap();
3587
3588        external_in.send(1).await.unwrap();
3589        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3590
3591        external_in.send(2).await.unwrap();
3592        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3593    }
3594
3595    #[cfg(feature = "deploy")]
3596    #[tokio::test]
3597    async fn unbounded_unique_remembers_state() {
3598        let mut deployment = Deployment::new();
3599
3600        let mut flow = FlowBuilder::new();
3601        let node = flow.process::<()>();
3602        let external = flow.external::<()>();
3603
3604        let (input_port, input) =
3605            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3606        let out = input.unique().send_bincode_external(&external);
3607
3608        let nodes = flow
3609            .with_process(&node, deployment.Localhost())
3610            .with_external(&external, deployment.Localhost())
3611            .deploy(&mut deployment);
3612
3613        deployment.deploy().await.unwrap();
3614
3615        let mut external_in = nodes.connect(input_port).await;
3616        let mut external_out = nodes.connect(out).await;
3617
3618        deployment.start().await.unwrap();
3619
3620        external_in.send(1).await.unwrap();
3621        assert_eq!(external_out.next().await.unwrap(), 1);
3622
3623        external_in.send(2).await.unwrap();
3624        assert_eq!(external_out.next().await.unwrap(), 2);
3625
3626        external_in.send(1).await.unwrap();
3627        external_in.send(3).await.unwrap();
3628        assert_eq!(external_out.next().await.unwrap(), 3);
3629    }
3630
3631    #[cfg(feature = "sim")]
3632    #[test]
3633    #[should_panic]
3634    fn sim_batch_nondet_size() {
3635        let mut flow = FlowBuilder::new();
3636        let node = flow.process::<()>();
3637
3638        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3639
3640        let tick = node.tick();
3641        let out_recv = input
3642            .batch(&tick, nondet!(/** test */))
3643            .count()
3644            .all_ticks()
3645            .sim_output();
3646
3647        flow.sim().exhaustive(async || {
3648            in_send.send(());
3649            in_send.send(());
3650            in_send.send(());
3651
3652            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3653        });
3654    }
3655
3656    #[cfg(feature = "sim")]
3657    #[test]
3658    fn sim_batch_preserves_order() {
3659        let mut flow = FlowBuilder::new();
3660        let node = flow.process::<()>();
3661
3662        let (in_send, input) = node.sim_input();
3663
3664        let tick = node.tick();
3665        let out_recv = input
3666            .batch(&tick, nondet!(/** test */))
3667            .all_ticks()
3668            .sim_output();
3669
3670        flow.sim().exhaustive(async || {
3671            in_send.send(1);
3672            in_send.send(2);
3673            in_send.send(3);
3674
3675            out_recv.assert_yields_only([1, 2, 3]).await;
3676        });
3677    }
3678
3679    #[cfg(feature = "sim")]
3680    #[test]
3681    #[should_panic]
3682    fn sim_batch_unordered_shuffles() {
3683        let mut flow = FlowBuilder::new();
3684        let node = flow.process::<()>();
3685
3686        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3687
3688        let tick = node.tick();
3689        let batch = input.batch(&tick, nondet!(/** test */));
3690        let out_recv = batch
3691            .clone()
3692            .min()
3693            .zip(batch.max())
3694            .all_ticks()
3695            .sim_output();
3696
3697        flow.sim().exhaustive(async || {
3698            in_send.send_many_unordered([1, 2, 3]);
3699
3700            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3701                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3702            }
3703        });
3704    }
3705
3706    #[cfg(feature = "sim")]
3707    #[test]
3708    fn sim_batch_unordered_shuffles_count() {
3709        let mut flow = FlowBuilder::new();
3710        let node = flow.process::<()>();
3711
3712        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3713
3714        let tick = node.tick();
3715        let batch = input.batch(&tick, nondet!(/** test */));
3716        let out_recv = batch.all_ticks().sim_output();
3717
3718        let instance_count = flow.sim().exhaustive(async || {
3719            in_send.send_many_unordered([1, 2, 3, 4]);
3720            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3721        });
3722
3723        assert_eq!(
3724            instance_count,
3725            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3726        )
3727    }
3728
3729    #[cfg(feature = "sim")]
3730    #[test]
3731    #[should_panic]
3732    fn sim_observe_order_batched() {
3733        let mut flow = FlowBuilder::new();
3734        let node = flow.process::<()>();
3735
3736        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3737
3738        let tick = node.tick();
3739        let batch = input.batch(&tick, nondet!(/** test */));
3740        let out_recv = batch
3741            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3742            .all_ticks()
3743            .sim_output();
3744
3745        flow.sim().exhaustive(async || {
3746            in_send.send_many_unordered([1, 2, 3, 4]);
3747            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3748        });
3749    }
3750
3751    #[cfg(feature = "sim")]
3752    #[test]
3753    fn sim_observe_order_batched_count() {
3754        let mut flow = FlowBuilder::new();
3755        let node = flow.process::<()>();
3756
3757        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3758
3759        let tick = node.tick();
3760        let batch = input.batch(&tick, nondet!(/** test */));
3761        let out_recv = batch
3762            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3763            .all_ticks()
3764            .sim_output();
3765
3766        let instance_count = flow.sim().exhaustive(async || {
3767            in_send.send_many_unordered([1, 2, 3, 4]);
3768            let _ = out_recv.collect::<Vec<_>>().await;
3769        });
3770
3771        assert_eq!(
3772            instance_count,
3773            192 // 4! * 2^{4 - 1}
3774        )
3775    }
3776
3777    #[cfg(feature = "sim")]
3778    #[test]
3779    fn sim_unordered_count_instance_count() {
3780        let mut flow = FlowBuilder::new();
3781        let node = flow.process::<()>();
3782
3783        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3784
3785        let tick = node.tick();
3786        let out_recv = input
3787            .count()
3788            .snapshot(&tick, nondet!(/** test */))
3789            .all_ticks()
3790            .sim_output();
3791
3792        let instance_count = flow.sim().exhaustive(async || {
3793            in_send.send_many_unordered([1, 2, 3, 4]);
3794            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3795        });
3796
3797        assert_eq!(
3798            instance_count,
3799            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3800        )
3801    }
3802
3803    #[cfg(feature = "sim")]
3804    #[test]
3805    fn sim_top_level_assume_ordering() {
3806        let mut flow = FlowBuilder::new();
3807        let node = flow.process::<()>();
3808
3809        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3810
3811        let out_recv = input
3812            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3813            .sim_output();
3814
3815        let instance_count = flow.sim().exhaustive(async || {
3816            in_send.send_many_unordered([1, 2, 3]);
3817            let mut out = out_recv.collect::<Vec<_>>().await;
3818            out.sort();
3819            assert_eq!(out, vec![1, 2, 3]);
3820        });
3821
3822        assert_eq!(instance_count, 6)
3823    }
3824
3825    #[cfg(feature = "sim")]
3826    #[test]
3827    fn sim_top_level_assume_ordering_cycle_back() {
3828        let mut flow = FlowBuilder::new();
3829        let node = flow.process::<()>();
3830        let node2 = flow.process::<()>();
3831
3832        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3833
3834        let (complete_cycle_back, cycle_back) =
3835            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3836        let ordered = input
3837            .merge_unordered(cycle_back)
3838            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3839        complete_cycle_back.complete(
3840            ordered
3841                .clone()
3842                .map(q!(|v| v + 1))
3843                .filter(q!(|v| v % 2 == 1))
3844                .send(&node2, TCP.fail_stop().bincode())
3845                .send(&node, TCP.fail_stop().bincode()),
3846        );
3847
3848        let out_recv = ordered.sim_output();
3849
3850        let mut saw = false;
3851        let instance_count = flow.sim().exhaustive(async || {
3852            in_send.send_many_unordered([0, 2]);
3853            let out = out_recv.collect::<Vec<_>>().await;
3854
3855            if out.starts_with(&[0, 1, 2]) {
3856                saw = true;
3857            }
3858        });
3859
3860        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3861        assert_eq!(instance_count, 6);
3862    }
3863
3864    #[cfg(feature = "sim")]
3865    #[test]
3866    fn sim_top_level_assume_ordering_cycle_back_tick() {
3867        let mut flow = FlowBuilder::new();
3868        let node = flow.process::<()>();
3869        let node2 = flow.process::<()>();
3870
3871        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3872
3873        let (complete_cycle_back, cycle_back) =
3874            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3875        let ordered = input
3876            .merge_unordered(cycle_back)
3877            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3878        complete_cycle_back.complete(
3879            ordered
3880                .clone()
3881                .batch(&node.tick(), nondet!(/** test */))
3882                .all_ticks()
3883                .map(q!(|v| v + 1))
3884                .filter(q!(|v| v % 2 == 1))
3885                .send(&node2, TCP.fail_stop().bincode())
3886                .send(&node, TCP.fail_stop().bincode()),
3887        );
3888
3889        let out_recv = ordered.sim_output();
3890
3891        let mut saw = false;
3892        let instance_count = flow.sim().exhaustive(async || {
3893            in_send.send_many_unordered([0, 2]);
3894            let out = out_recv.collect::<Vec<_>>().await;
3895
3896            if out.starts_with(&[0, 1, 2]) {
3897                saw = true;
3898            }
3899        });
3900
3901        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3902        assert_eq!(instance_count, 58);
3903    }
3904
3905    #[cfg(feature = "sim")]
3906    #[test]
3907    fn sim_top_level_assume_ordering_multiple() {
3908        let mut flow = FlowBuilder::new();
3909        let node = flow.process::<()>();
3910        let node2 = flow.process::<()>();
3911
3912        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3913        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3914
3915        let (complete_cycle_back, cycle_back) =
3916            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3917        let input1_ordered = input
3918            .clone()
3919            .merge_unordered(cycle_back)
3920            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3921        let foo = input1_ordered
3922            .clone()
3923            .map(q!(|v| v + 3))
3924            .weaken_ordering::<NoOrder>()
3925            .merge_unordered(input2)
3926            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3927
3928        complete_cycle_back.complete(
3929            foo.filter(q!(|v| *v == 3))
3930                .send(&node2, TCP.fail_stop().bincode())
3931                .send(&node, TCP.fail_stop().bincode()),
3932        );
3933
3934        let out_recv = input1_ordered.sim_output();
3935
3936        let mut saw = false;
3937        let instance_count = flow.sim().exhaustive(async || {
3938            in_send.send_many_unordered([0, 1]);
3939            let out = out_recv.collect::<Vec<_>>().await;
3940
3941            if out.starts_with(&[0, 3, 1]) {
3942                saw = true;
3943            }
3944        });
3945
3946        assert!(saw, "did not see an instance with 0, 3, 1 in order");
3947        assert_eq!(instance_count, 24);
3948    }
3949
3950    #[cfg(feature = "sim")]
3951    #[test]
3952    fn sim_atomic_assume_ordering_cycle_back() {
3953        let mut flow = FlowBuilder::new();
3954        let node = flow.process::<()>();
3955        let node2 = flow.process::<()>();
3956
3957        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3958
3959        let (complete_cycle_back, cycle_back) =
3960            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3961        let ordered = input
3962            .merge_unordered(cycle_back)
3963            .atomic()
3964            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3965            .end_atomic();
3966        complete_cycle_back.complete(
3967            ordered
3968                .clone()
3969                .map(q!(|v| v + 1))
3970                .filter(q!(|v| v % 2 == 1))
3971                .send(&node2, TCP.fail_stop().bincode())
3972                .send(&node, TCP.fail_stop().bincode()),
3973        );
3974
3975        let out_recv = ordered.sim_output();
3976
3977        let instance_count = flow.sim().exhaustive(async || {
3978            in_send.send_many_unordered([0, 2]);
3979            let out = out_recv.collect::<Vec<_>>().await;
3980            assert_eq!(out.len(), 4);
3981        });
3982        assert_eq!(instance_count, 22);
3983    }
3984
3985    #[cfg(feature = "deploy")]
3986    #[tokio::test]
3987    async fn partition_evens_odds() {
3988        let mut deployment = Deployment::new();
3989
3990        let mut flow = FlowBuilder::new();
3991        let node = flow.process::<()>();
3992        let external = flow.external::<()>();
3993
3994        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3995        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3996        let evens_port = evens.send_bincode_external(&external);
3997        let odds_port = odds.send_bincode_external(&external);
3998
3999        let nodes = flow
4000            .with_process(&node, deployment.Localhost())
4001            .with_external(&external, deployment.Localhost())
4002            .deploy(&mut deployment);
4003
4004        deployment.deploy().await.unwrap();
4005
4006        let mut evens_out = nodes.connect(evens_port).await;
4007        let mut odds_out = nodes.connect(odds_port).await;
4008
4009        deployment.start().await.unwrap();
4010
4011        let mut even_results = Vec::new();
4012        for _ in 0..3 {
4013            even_results.push(evens_out.next().await.unwrap());
4014        }
4015        even_results.sort();
4016        assert_eq!(even_results, vec![2, 4, 6]);
4017
4018        let mut odd_results = Vec::new();
4019        for _ in 0..3 {
4020            odd_results.push(odds_out.next().await.unwrap());
4021        }
4022        odd_results.sort();
4023        assert_eq!(odd_results, vec![1, 3, 5]);
4024    }
4025
4026    #[cfg(feature = "deploy")]
4027    #[tokio::test]
4028    async fn unconsumed_inspect_still_runs() {
4029        use crate::deploy::DeployCrateWrapper;
4030
4031        let mut deployment = Deployment::new();
4032
4033        let mut flow = FlowBuilder::new();
4034        let node = flow.process::<()>();
4035
4036        // The return value of .inspect() is intentionally dropped.
4037        // Before the Null-root fix, this would silently do nothing.
4038        node.source_iter(q!(0..5))
4039            .inspect(q!(|x| println!("inspect: {}", x)));
4040
4041        let nodes = flow
4042            .with_process(&node, deployment.Localhost())
4043            .deploy(&mut deployment);
4044
4045        deployment.deploy().await.unwrap();
4046
4047        let mut stdout = nodes.get_process(&node).stdout();
4048
4049        deployment.start().await.unwrap();
4050
4051        let mut lines = Vec::new();
4052        for _ in 0..5 {
4053            lines.push(stdout.recv().await.unwrap());
4054        }
4055        lines.sort();
4056        assert_eq!(
4057            lines,
4058            vec![
4059                "inspect: 0",
4060                "inspect: 1",
4061                "inspect: 2",
4062                "inspect: 3",
4063                "inspect: 4",
4064            ]
4065        );
4066    }
4067
4068    #[cfg(feature = "sim")]
4069    #[test]
4070    fn sim_limit() {
4071        let mut flow = FlowBuilder::new();
4072        let node = flow.process::<()>();
4073
4074        let (in_send, input) = node.sim_input();
4075
4076        let out_recv = input.limit(q!(3)).sim_output();
4077
4078        flow.sim().exhaustive(async || {
4079            in_send.send(1);
4080            in_send.send(2);
4081            in_send.send(3);
4082            in_send.send(4);
4083            in_send.send(5);
4084
4085            out_recv.assert_yields_only([1, 2, 3]).await;
4086        });
4087    }
4088
4089    #[cfg(feature = "sim")]
4090    #[test]
4091    fn sim_limit_zero() {
4092        let mut flow = FlowBuilder::new();
4093        let node = flow.process::<()>();
4094
4095        let (in_send, input) = node.sim_input();
4096
4097        let out_recv = input.limit(q!(0)).sim_output();
4098
4099        flow.sim().exhaustive(async || {
4100            in_send.send(1);
4101            in_send.send(2);
4102
4103            out_recv.assert_yields_only::<i32, _>([]).await;
4104        });
4105    }
4106
4107    #[cfg(feature = "sim")]
4108    #[test]
4109    fn sim_merge_ordered() {
4110        let mut flow = FlowBuilder::new();
4111        let node = flow.process::<()>();
4112
4113        let (in_send, input) = node.sim_input();
4114        let (in_send2, input2) = node.sim_input();
4115
4116        let out_recv = input
4117            .merge_ordered(input2, nondet!(/** test */))
4118            .sim_output();
4119
4120        let mut saw_out_of_order = false;
4121        let instances = flow.sim().exhaustive(async || {
4122            in_send.send(1);
4123            in_send.send(2);
4124            in_send2.send(3);
4125            in_send2.send(4);
4126
4127            let out = out_recv.collect::<Vec<_>>().await;
4128
4129            if out == [1, 3, 2, 4] {
4130                saw_out_of_order = true;
4131            }
4132
4133            // Assert ordering preservation: elements from each input must
4134            // appear in their original relative order.
4135            let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4136            let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4137            assert_eq!(
4138                first_elements,
4139                vec![1, 2],
4140                "first input order violated: {:?}",
4141                out
4142            );
4143            assert_eq!(
4144                second_elements,
4145                vec![3, 4],
4146                "second input order violated: {:?}",
4147                out
4148            );
4149
4150            first_elements.append(&mut second_elements);
4151            first_elements.sort();
4152            assert_eq!(first_elements, vec![1, 2, 3, 4]);
4153        });
4154
4155        assert!(saw_out_of_order);
4156        assert_eq!(instances, 6);
4157    }
4158
4159    /// Tests that merge_ordered passes through elements when only one input
4160    /// has data.
4161    #[cfg(feature = "sim")]
4162    #[test]
4163    fn sim_merge_ordered_one_empty() {
4164        let mut flow = FlowBuilder::new();
4165        let node = flow.process::<()>();
4166
4167        let (in_send, input) = node.sim_input();
4168        let (_in_send2, input2) = node.sim_input();
4169
4170        let out_recv = input
4171            .merge_ordered(input2, nondet!(/** test */))
4172            .sim_output();
4173
4174        let instances = flow.sim().exhaustive(async || {
4175            in_send.send(1);
4176            in_send.send(2);
4177
4178            let out = out_recv.collect::<Vec<_>>().await;
4179            assert_eq!(out, vec![1, 2]);
4180        });
4181
4182        // Only one possible interleaving when one input is empty
4183        assert_eq!(instances, 1);
4184    }
4185
4186    /// Tests that merge_ordered correctly handles feedback cycles.
4187    /// An element output from merge_ordered is filtered and cycled back to
4188    /// one of its inputs. The one-at-a-time release must allow the cycled-back
4189    /// element to arrive and potentially be emitted before elements still
4190    /// waiting on the other input.
4191    #[cfg(feature = "sim")]
4192    #[test]
4193    fn sim_merge_ordered_cycle_back() {
4194        let mut flow = FlowBuilder::new();
4195        let node = flow.process::<()>();
4196
4197        let (in_send, input) = node.sim_input();
4198
4199        // Create a forward ref for the cycle back
4200        let (complete_cycle_back, cycle_back) =
4201            node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4202
4203        // merge_ordered: input (external) with cycle_back
4204        let merged = input.merge_ordered(cycle_back, nondet!(/** test */));
4205
4206        // Cycle back: elements equal to 1 get mapped to 10 and fed back
4207        complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4208
4209        let out_recv = merged.sim_output();
4210
4211        // Send 1 and 2. Element 1 should cycle back as 10.
4212        // Valid orderings must have 1 before 10 (since 10 depends on 1).
4213        let mut saw_cycle_before_second = false;
4214        flow.sim().exhaustive(async || {
4215            in_send.send(1);
4216            in_send.send(2);
4217
4218            let out = out_recv.collect::<Vec<_>>().await;
4219
4220            // 10 must always come after 1 (causal dependency)
4221            let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4222            let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4223            assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4224
4225            // Check if we see [1, 10, 2] — the cycled element beats the second input
4226            if out == [1, 10, 2] {
4227                saw_cycle_before_second = true;
4228            }
4229
4230            let mut sorted = out;
4231            sorted.sort();
4232            assert_eq!(sorted, vec![1, 2, 10]);
4233        });
4234
4235        assert!(
4236            saw_cycle_before_second,
4237            "never saw the cycled element arrive before the second input element"
4238        );
4239    }
4240
4241    /// Tests that merge_ordered correctly interleaves when one input has a
4242    /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
4243    /// element 2 should be able to appear after b's elements.
4244    #[cfg(feature = "sim")]
4245    #[test]
4246    fn sim_merge_ordered_delayed() {
4247        let mut flow = FlowBuilder::new();
4248        let node = flow.process::<()>();
4249
4250        let (in_send, input) = node.sim_input();
4251        let (in_send2, input2) = node.sim_input();
4252
4253        let out_recv = input
4254            .merge_ordered(input2, nondet!(/** test */))
4255            .sim_output();
4256
4257        let mut saw_delayed_interleaving = false;
4258        flow.sim().exhaustive(async || {
4259            // Send 1 from a, and 3, 4 from b
4260            in_send.send(1);
4261            in_send2.send(3);
4262            in_send2.send(4);
4263
4264            // Collect what's available so far
4265            let first_batch = out_recv.collect::<Vec<_>>().await;
4266
4267            // Now send the delayed element 2 from a
4268            in_send.send(2);
4269            let second_batch = out_recv.collect::<Vec<_>>().await;
4270
4271            let mut all: Vec<_> = first_batch
4272                .iter()
4273                .chain(second_batch.iter())
4274                .copied()
4275                .collect();
4276
4277            // Check if we saw [1, 3, 4, 2] — the delayed interleaving
4278            if all == [1, 3, 4, 2] {
4279                saw_delayed_interleaving = true;
4280            }
4281
4282            all.sort();
4283            assert_eq!(all, vec![1, 2, 3, 4]);
4284        });
4285
4286        assert!(saw_delayed_interleaving);
4287    }
4288
4289    /// Deploy test: merge_ordered with a delayed element on one input.
4290    /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4291    /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4292    /// both inputs are pulled and the delayed element arrives later.
4293    #[cfg(feature = "deploy")]
4294    #[tokio::test]
4295    async fn deploy_merge_ordered_delayed() {
4296        let mut deployment = Deployment::new();
4297
4298        let mut flow = FlowBuilder::new();
4299        let node = flow.process::<()>();
4300        let external = flow.external::<()>();
4301
4302        let (input_a_port, input_a) = node.source_external_bincode(&external);
4303        let (input_b_port, input_b) = node.source_external_bincode(&external);
4304
4305        let out = input_a
4306            .assume_ordering(nondet!(/** test */))
4307            .merge_ordered(
4308                input_b.assume_ordering(nondet!(/** test */)),
4309                nondet!(/** test */),
4310            )
4311            .send_bincode_external(&external);
4312
4313        let nodes = flow
4314            .with_process(&node, deployment.Localhost())
4315            .with_external(&external, deployment.Localhost())
4316            .deploy(&mut deployment);
4317
4318        deployment.deploy().await.unwrap();
4319
4320        let mut ext_a = nodes.connect(input_a_port).await;
4321        let mut ext_b = nodes.connect(input_b_port).await;
4322        let mut ext_out = nodes.connect(out).await;
4323
4324        deployment.start().await.unwrap();
4325
4326        // Send a=1, b=3, b=4
4327        ext_a.send(1).await.unwrap();
4328        ext_b.send(3).await.unwrap();
4329        ext_b.send(4).await.unwrap();
4330
4331        // Collect the first 3 elements
4332        let mut received = Vec::new();
4333        for _ in 0..3 {
4334            received.push(ext_out.next().await.unwrap());
4335        }
4336
4337        // Now send the delayed a=2
4338        ext_a.send(2).await.unwrap();
4339        received.push(ext_out.next().await.unwrap());
4340
4341        // All elements should be present
4342        received.sort();
4343        assert_eq!(received, vec![1, 2, 3, 4]);
4344    }
4345
4346    #[cfg(feature = "deploy")]
4347    #[tokio::test]
4348    async fn monotone_fold_threshold() {
4349        use crate::properties::manual_proof;
4350
4351        let mut deployment = Deployment::new();
4352
4353        let mut flow = FlowBuilder::new();
4354        let node = flow.process::<()>();
4355        let external = flow.external::<()>();
4356
4357        let in_unbounded: super::Stream<_, _> =
4358            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4359        let sum = in_unbounded.fold(
4360            q!(|| 0),
4361            q!(
4362                |sum, v| {
4363                    *sum += v;
4364                },
4365                monotone = manual_proof!(/** test */)
4366            ),
4367        );
4368
4369        let threshold_out = sum
4370            .threshold_greater_or_equal(node.singleton(q!(7)))
4371            .send_bincode_external(&external);
4372
4373        let nodes = flow
4374            .with_process(&node, deployment.Localhost())
4375            .with_external(&external, deployment.Localhost())
4376            .deploy(&mut deployment);
4377
4378        deployment.deploy().await.unwrap();
4379
4380        let mut threshold_out = nodes.connect(threshold_out).await;
4381
4382        deployment.start().await.unwrap();
4383
4384        assert_eq!(threshold_out.next().await.unwrap(), 7);
4385    }
4386
4387    #[cfg(feature = "deploy")]
4388    #[tokio::test]
4389    async fn monotone_count_threshold() {
4390        let mut deployment = Deployment::new();
4391
4392        let mut flow = FlowBuilder::new();
4393        let node = flow.process::<()>();
4394        let external = flow.external::<()>();
4395
4396        let in_unbounded: super::Stream<_, _> =
4397            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4398        let sum = in_unbounded.count();
4399
4400        let threshold_out = sum
4401            .threshold_greater_or_equal(node.singleton(q!(3)))
4402            .send_bincode_external(&external);
4403
4404        let nodes = flow
4405            .with_process(&node, deployment.Localhost())
4406            .with_external(&external, deployment.Localhost())
4407            .deploy(&mut deployment);
4408
4409        deployment.deploy().await.unwrap();
4410
4411        let mut threshold_out = nodes.connect(threshold_out).await;
4412
4413        deployment.start().await.unwrap();
4414
4415        assert_eq!(threshold_out.next().await.unwrap(), 3);
4416    }
4417
4418    #[cfg(feature = "deploy")]
4419    #[tokio::test]
4420    async fn monotone_map_order_preserving_threshold() {
4421        use crate::properties::manual_proof;
4422
4423        let mut deployment = Deployment::new();
4424
4425        let mut flow = FlowBuilder::new();
4426        let node = flow.process::<()>();
4427        let external = flow.external::<()>();
4428
4429        let in_unbounded: super::Stream<_, _> =
4430            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4431        let sum = in_unbounded.fold(
4432            q!(|| 0),
4433            q!(
4434                |sum, v| {
4435                    *sum += v;
4436                },
4437                monotone = manual_proof!(/** test */)
4438            ),
4439        );
4440
4441        // map with order_preserving should preserve monotonicity
4442        let doubled = sum.map(q!(
4443            |v| v * 2,
4444            order_preserving = manual_proof!(/** doubling preserves order */)
4445        ));
4446
4447        let threshold_out = doubled
4448            .threshold_greater_or_equal(node.singleton(q!(14)))
4449            .send_bincode_external(&external);
4450
4451        let nodes = flow
4452            .with_process(&node, deployment.Localhost())
4453            .with_external(&external, deployment.Localhost())
4454            .deploy(&mut deployment);
4455
4456        deployment.deploy().await.unwrap();
4457
4458        let mut threshold_out = nodes.connect(threshold_out).await;
4459
4460        deployment.start().await.unwrap();
4461
4462        assert_eq!(threshold_out.next().await.unwrap(), 14);
4463    }
4464
4465    // === Compile-time type tests for join/cross_product ordering ===
4466
4467    #[cfg(any(feature = "deploy", feature = "sim"))]
4468    mod join_ordering_type_tests {
4469        use crate::live_collections::boundedness::{Bounded, Unbounded};
4470        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4471        use crate::location::{Location, Process};
4472
4473        #[expect(dead_code, reason = "compile-time type test")]
4474        fn join_unbounded_with_bounded_preserves_order<'a>(
4475            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4476            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4477        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4478            left.join(right)
4479        }
4480
4481        #[expect(dead_code, reason = "compile-time type test")]
4482        fn join_unbounded_with_unbounded_is_no_order<'a>(
4483            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4484            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4485        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4486            left.join(right)
4487        }
4488
4489        #[expect(dead_code, reason = "compile-time type test")]
4490        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4491            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4492            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4493        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4494            left.join(right)
4495        }
4496
4497        #[expect(dead_code, reason = "compile-time type test")]
4498        fn join_unbounded_noorder_with_bounded<'a>(
4499            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4500            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4501        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4502            left.join(right)
4503        }
4504
4505        // === Compile-time type tests for cross_product ordering ===
4506
4507        #[expect(dead_code, reason = "compile-time type test")]
4508        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4509            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4510            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4511        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4512            left.cross_product(right)
4513        }
4514
4515        #[expect(dead_code, reason = "compile-time type test")]
4516        fn cross_product_bounded_with_bounded_preserves_order<'a>(
4517            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4518            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4519        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4520            left.cross_product(right)
4521        }
4522
4523        #[expect(dead_code, reason = "compile-time type test")]
4524        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4525            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4526            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4527        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4528            left.cross_product(right)
4529        }
4530    } // mod join_ordering_type_tests
4531
4532    // === Runtime correctness tests for bounded join/cross_product ===
4533
4534    #[cfg(feature = "sim")]
4535    #[test]
4536    fn cross_product_mixed_boundedness_correctness() {
4537        use stageleft::q;
4538
4539        use crate::compile::builder::FlowBuilder;
4540        use crate::nondet::nondet;
4541
4542        let mut flow = FlowBuilder::new();
4543        let process = flow.process::<()>();
4544        let tick = process.tick();
4545
4546        let left = process.source_iter(q!(vec![1, 2]));
4547        let right = process
4548            .source_iter(q!(vec!['a', 'b']))
4549            .batch(&tick, nondet!(/** test */))
4550            .all_ticks();
4551
4552        let out = left.cross_product(right).sim_output();
4553
4554        flow.sim().exhaustive(async || {
4555            out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4556                .await;
4557        });
4558    }
4559
4560    #[cfg(feature = "sim")]
4561    #[test]
4562    fn join_mixed_boundedness_correctness() {
4563        use stageleft::q;
4564
4565        use crate::compile::builder::FlowBuilder;
4566        use crate::nondet::nondet;
4567
4568        let mut flow = FlowBuilder::new();
4569        let process = flow.process::<()>();
4570        let tick = process.tick();
4571
4572        let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4573        let right = process
4574            .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4575            .batch(&tick, nondet!(/** test */))
4576            .all_ticks();
4577
4578        let out = left.join(right).sim_output();
4579
4580        flow.sim().exhaustive(async || {
4581            out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4582                .await;
4583        });
4584    }
4585
4586    #[cfg(feature = "sim")]
4587    #[test]
4588    fn sim_merge_unordered_independent_atomics() {
4589        let mut flow = FlowBuilder::new();
4590        let node = flow.process::<()>();
4591
4592        let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4593        let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4594
4595        let out = input1
4596            .atomic()
4597            .merge_unordered(input2.atomic())
4598            .end_atomic()
4599            .sim_output();
4600
4601        flow.sim().exhaustive(async || {
4602            in1_send.send(1);
4603            in2_send.send(2);
4604
4605            out.assert_yields_only_unordered(vec![1, 2]).await;
4606        });
4607    }
4608}