Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28#[cfg(feature = "build")]
29use crate::compile::builder::StmtId;
30use crate::compile::builder::{CycleId, ExternalPortId};
31#[cfg(feature = "build")]
32use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35#[cfg(feature = "build")]
36use crate::singleton_ref::singleton_ref_ident;
37
38pub mod backtrace;
39use backtrace::Backtrace;
40
41/// A closure expression bundled with any singleton references it captures.
42///
43/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
44/// alongside the closure's expression. This allows per-closure tracking of singleton
45/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
46pub struct ClosureExpr {
47    pub(crate) expr: DebugExpr,
48    /// Each entry is `(singleton_node, is_mut)`.
49    /// The index in the Vec determines the ident name via [`singleton_ref_ident`].
50    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
51}
52
53impl Clone for ClosureExpr {
54    fn clone(&self) -> Self {
55        Self {
56            expr: self.expr.clone(),
57            singleton_refs: self
58                .singleton_refs
59                .iter()
60                .map(|(node, is_mut)| {
61                    let HydroNode::Singleton { inner, metadata } = node else {
62                        panic!("singleton_refs should only contain HydroNode::Singleton");
63                    };
64                    (
65                        HydroNode::Singleton {
66                            inner: SharedNode(Rc::clone(&inner.0)),
67                            metadata: metadata.clone(),
68                        },
69                        *is_mut,
70                    )
71                })
72                .collect(),
73        }
74    }
75}
76
77impl Hash for ClosureExpr {
78    fn hash<H: Hasher>(&self, state: &mut H) {
79        self.expr.hash(state);
80        // singleton_refs are structural children (like HydroIrMetadata), not
81        // identity-defining. Two closures with the same expr but different
82        // captured refs are the same closure text — the refs only affect codegen.
83    }
84}
85
86impl serde::Serialize for ClosureExpr {
87    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
88        use serde::ser::SerializeStruct;
89        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
90        s.serialize_field("expr", &self.expr)?;
91        s.serialize_field(
92            "singleton_refs",
93            &SerializableSingletonRefs(&self.singleton_refs),
94        )?;
95        s.end()
96    }
97}
98
99struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
100
101impl serde::Serialize for SerializableSingletonRefs<'_> {
102    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
103        use serde::ser::SerializeSeq;
104        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
105        for (node, is_mut) in self.0.iter() {
106            seq.serialize_element(&(node, is_mut))?;
107        }
108        seq.end()
109    }
110}
111
112impl Debug for ClosureExpr {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        Debug::fmt(&self.expr, f)
115    }
116}
117
118impl Display for ClosureExpr {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        Display::fmt(&self.expr, f)
121    }
122}
123
124impl From<syn::Expr> for ClosureExpr {
125    fn from(expr: syn::Expr) -> Self {
126        Self {
127            expr: DebugExpr(Box::new(expr)),
128            singleton_refs: Vec::new(),
129        }
130    }
131}
132
133impl From<DebugExpr> for ClosureExpr {
134    fn from(expr: DebugExpr) -> Self {
135        Self {
136            expr,
137            singleton_refs: Vec::new(),
138        }
139    }
140}
141
142impl ClosureExpr {
143    pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
144        Self {
145            expr,
146            singleton_refs,
147        }
148    }
149
150    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
151        Self {
152            expr: self.expr.clone(),
153            singleton_refs: self
154                .singleton_refs
155                .iter()
156                .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
157                .collect(),
158        }
159    }
160
161    pub fn transform_children(
162        &mut self,
163        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
164        seen_tees: &mut SeenSharedNodes,
165    ) {
166        for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
167            transform(ref_node, seen_tees);
168        }
169    }
170
171    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
172    /// replacing local singleton ref idents with `#{N} dfir_ident` or `#{N} mut dfir_ident` references.
173    #[cfg(feature = "build")]
174    pub fn emit_tokens(
175        &self,
176        ident_stack: &mut Vec<syn::Ident>,
177        access_counters: &mut HashMap<*const RefCell<HydroNode>, u32>,
178    ) -> TokenStream {
179        if self.singleton_refs.is_empty() {
180            self.expr.0.to_token_stream()
181        } else {
182            assert!(
183                ident_stack.len() >= self.singleton_refs.len(),
184                "ident_stack has {} entries but expected at least {} for singleton_refs",
185                ident_stack.len(),
186                self.singleton_refs.len()
187            );
188            let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
189
190            let mut let_bindings = Vec::new();
191            for ((i, (node, is_mut)), ref_ident) in
192                self.singleton_refs.iter().enumerate().zip(ref_idents)
193            {
194                let HydroNode::Singleton { inner, .. } = node else {
195                    panic!("singleton_refs should only contain HydroNode::Singleton");
196                };
197                let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
198                let counter = access_counters.entry(ptr).or_insert(0);
199                let group = if *is_mut {
200                    *counter += 1;
201                    let g = *counter;
202                    *counter += 1;
203                    g
204                } else {
205                    *counter
206                };
207
208                // TODO(mingwei): proper spanning?
209                let local_ident = singleton_ref_ident(i);
210                let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
211                let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
212                let mut_token = is_mut.then(|| quote!(mut));
213                let binding = quote! {
214                    let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
215                };
216                let_bindings.push(binding);
217            }
218
219            let expr = &self.expr.0;
220            quote! {
221                {
222                    #( #let_bindings )*
223                    #expr
224                }
225            }
226        }
227    }
228}
229
230/// Wrapper that displays only the tokens of a parsed expr.
231///
232/// Boxes `syn::Type` which is ~240 bytes.
233#[derive(Clone, Hash)]
234pub struct DebugExpr(pub Box<syn::Expr>);
235
236impl serde::Serialize for DebugExpr {
237    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
238        serializer.serialize_str(&self.to_string())
239    }
240}
241
242impl From<syn::Expr> for DebugExpr {
243    fn from(expr: syn::Expr) -> Self {
244        Self(Box::new(expr))
245    }
246}
247
248impl Deref for DebugExpr {
249    type Target = syn::Expr;
250
251    fn deref(&self) -> &Self::Target {
252        &self.0
253    }
254}
255
256impl ToTokens for DebugExpr {
257    fn to_tokens(&self, tokens: &mut TokenStream) {
258        self.0.to_tokens(tokens);
259    }
260}
261
262impl Debug for DebugExpr {
263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264        write!(f, "{}", self.0.to_token_stream())
265    }
266}
267
268impl Display for DebugExpr {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        let original = self.0.as_ref().clone();
271        let simplified = simplify_q_macro(original);
272
273        // For now, just use quote formatting without trying to parse as a statement
274        // This avoids the syn::parse_quote! issues entirely
275        write!(f, "q!({})", quote::quote!(#simplified))
276    }
277}
278
279/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
280fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
281    // Try to parse the token string as a syn::Expr
282    // Use a visitor to simplify q! macro expansions
283    let mut simplifier = QMacroSimplifier::new();
284    simplifier.visit_expr_mut(&mut expr);
285
286    // If we found and simplified a q! macro, return the simplified version
287    if let Some(simplified) = simplifier.simplified_result {
288        simplified
289    } else {
290        expr
291    }
292}
293
294/// AST visitor that simplifies q! macro expansions
295#[derive(Default)]
296pub struct QMacroSimplifier {
297    pub simplified_result: Option<syn::Expr>,
298}
299
300impl QMacroSimplifier {
301    pub fn new() -> Self {
302        Self::default()
303    }
304}
305
306impl VisitMut for QMacroSimplifier {
307    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
308        // Check if we already found a result to avoid further processing
309        if self.simplified_result.is_some() {
310            return;
311        }
312
313        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
314            // Look for calls to stageleft::runtime_support::fn*
315            && self.is_stageleft_runtime_support_call(&path_expr.path)
316            // Try to extract the closure from the arguments
317            && let Some(closure) = self.extract_closure_from_args(&call.args)
318        {
319            self.simplified_result = Some(closure);
320            return;
321        }
322
323        // Continue visiting child expressions using the default implementation
324        // Use the default visitor to avoid infinite recursion
325        syn::visit_mut::visit_expr_mut(self, expr);
326    }
327}
328
329impl QMacroSimplifier {
330    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
331        // Check if this is a call to stageleft::runtime_support::fn*
332        if let Some(last_segment) = path.segments.last() {
333            let fn_name = last_segment.ident.to_string();
334            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
335            fn_name.contains("_type_hint")
336                && path.segments.len() > 2
337                && path.segments[0].ident == "stageleft"
338                && path.segments[1].ident == "runtime_support"
339        } else {
340            false
341        }
342    }
343
344    fn extract_closure_from_args(
345        &self,
346        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
347    ) -> Option<syn::Expr> {
348        // Look through the arguments for a closure expression
349        for arg in args {
350            if let syn::Expr::Closure(_) = arg {
351                return Some(arg.clone());
352            }
353            // Also check for closures nested in other expressions (like blocks)
354            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
355                return Some(closure_expr);
356            }
357        }
358        None
359    }
360
361    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
362        let mut visitor = ClosureFinder {
363            found_closure: None,
364            prefer_inner_blocks: true,
365        };
366        visitor.visit_expr(expr);
367        visitor.found_closure
368    }
369}
370
371/// Visitor that finds closures in expressions with special block handling
372struct ClosureFinder {
373    found_closure: Option<syn::Expr>,
374    prefer_inner_blocks: bool,
375}
376
377impl<'ast> Visit<'ast> for ClosureFinder {
378    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
379        // If we already found a closure, don't continue searching
380        if self.found_closure.is_some() {
381            return;
382        }
383
384        match expr {
385            syn::Expr::Closure(_) => {
386                self.found_closure = Some(expr.clone());
387            }
388            syn::Expr::Block(block) if self.prefer_inner_blocks => {
389                // Special handling for blocks - look for inner blocks that contain closures
390                for stmt in &block.block.stmts {
391                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
392                        && let syn::Expr::Block(_) = stmt_expr
393                    {
394                        // Check if this nested block contains a closure
395                        let mut inner_visitor = ClosureFinder {
396                            found_closure: None,
397                            prefer_inner_blocks: false, // Avoid infinite recursion
398                        };
399                        inner_visitor.visit_expr(stmt_expr);
400                        if inner_visitor.found_closure.is_some() {
401                            // Found a closure in an inner block, return that block
402                            self.found_closure = Some(stmt_expr.clone());
403                            return;
404                        }
405                    }
406                }
407
408                // If no inner block with closure found, continue with normal visitation
409                visit::visit_expr(self, expr);
410
411                // If we found a closure, just return the closure itself, not the whole block
412                // unless we're in the special case where we want the containing block
413                if self.found_closure.is_some() {
414                    // The closure was found during visitation, no need to wrap in block
415                }
416            }
417            _ => {
418                // Use default visitor behavior for all other expressions
419                visit::visit_expr(self, expr);
420            }
421        }
422    }
423}
424
425/// Debug displays the type's tokens.
426///
427/// Boxes `syn::Type` which is ~320 bytes.
428#[derive(Clone, PartialEq, Eq, Hash)]
429pub struct DebugType(pub Box<syn::Type>);
430
431impl From<syn::Type> for DebugType {
432    fn from(t: syn::Type) -> Self {
433        Self(Box::new(t))
434    }
435}
436
437impl Deref for DebugType {
438    type Target = syn::Type;
439
440    fn deref(&self) -> &Self::Target {
441        &self.0
442    }
443}
444
445impl ToTokens for DebugType {
446    fn to_tokens(&self, tokens: &mut TokenStream) {
447        self.0.to_tokens(tokens);
448    }
449}
450
451impl Debug for DebugType {
452    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453        write!(f, "{}", self.0.to_token_stream())
454    }
455}
456
457impl serde::Serialize for DebugType {
458    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
459        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
460    }
461}
462
463fn serialize_backtrace_as_span<S: serde::Serializer>(
464    backtrace: &Backtrace,
465    serializer: S,
466) -> Result<S::Ok, S::Error> {
467    match backtrace.format_span() {
468        Some(span) => serializer.serialize_some(&span),
469        None => serializer.serialize_none(),
470    }
471}
472
473fn serialize_ident<S: serde::Serializer>(
474    ident: &syn::Ident,
475    serializer: S,
476) -> Result<S::Ok, S::Error> {
477    serializer.serialize_str(&ident.to_string())
478}
479
480pub enum DebugInstantiate {
481    Building,
482    Finalized(Box<DebugInstantiateFinalized>),
483}
484
485impl serde::Serialize for DebugInstantiate {
486    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
487        match self {
488            DebugInstantiate::Building => {
489                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
490            }
491            DebugInstantiate::Finalized(_) => {
492                panic!(
493                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
494                )
495            }
496        }
497    }
498}
499
500#[cfg_attr(
501    not(feature = "build"),
502    expect(
503        dead_code,
504        reason = "sink, source unused without `feature = \"build\"`."
505    )
506)]
507pub struct DebugInstantiateFinalized {
508    sink: syn::Expr,
509    source: syn::Expr,
510    connect_fn: Option<Box<dyn FnOnce()>>,
511}
512
513impl From<DebugInstantiateFinalized> for DebugInstantiate {
514    fn from(f: DebugInstantiateFinalized) -> Self {
515        Self::Finalized(Box::new(f))
516    }
517}
518
519impl Debug for DebugInstantiate {
520    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
521        write!(f, "<network instantiate>")
522    }
523}
524
525impl Hash for DebugInstantiate {
526    fn hash<H: Hasher>(&self, _state: &mut H) {
527        // Do nothing
528    }
529}
530
531impl Clone for DebugInstantiate {
532    fn clone(&self) -> Self {
533        match self {
534            DebugInstantiate::Building => DebugInstantiate::Building,
535            DebugInstantiate::Finalized(_) => {
536                panic!("DebugInstantiate::Finalized should not be cloned")
537            }
538        }
539    }
540}
541
542/// Tracks the instantiation state of a `ClusterMembers` source.
543///
544/// During `compile_network`, the first `ClusterMembers` node for a given
545/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
546/// receives the expression returned by `Deploy::cluster_membership_stream`.
547/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
548/// during code-gen they simply reference the tee output of the first node
549/// instead of creating a redundant `source_stream`.
550#[derive(Debug, Hash, Clone, serde::Serialize)]
551pub enum ClusterMembersState {
552    /// Not yet instantiated.
553    Uninit,
554    /// The primary instance: holds the stream expression and will emit
555    /// `source_stream(expr) -> tee()` during code-gen.
556    Stream(DebugExpr),
557    /// A secondary instance that references the tee output of the primary.
558    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
559    /// can derive the deterministic tee ident without extra state.
560    Tee(LocationId, LocationId),
561}
562
563/// A source in a Hydro graph, where data enters the graph.
564#[derive(Debug, Hash, Clone, serde::Serialize)]
565pub enum HydroSource {
566    Stream(DebugExpr),
567    ExternalNetwork(),
568    Iter(DebugExpr),
569    Spin(),
570    ClusterMembers(LocationId, ClusterMembersState),
571    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
572    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
573}
574
575#[cfg(feature = "build")]
576/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
577/// and simulations.
578///
579/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
580/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
581pub trait DfirBuilder {
582    /// Whether the representation of singletons should include intermediate states.
583    fn singleton_intermediates(&self) -> bool;
584
585    /// Gets the DFIR builder for the given location, creating it if necessary.
586    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
587
588    #[expect(clippy::too_many_arguments, reason = "TODO")]
589    fn batch(
590        &mut self,
591        in_ident: syn::Ident,
592        in_location: &LocationId,
593        in_kind: &CollectionKind,
594        out_ident: &syn::Ident,
595        out_location: &LocationId,
596        op_meta: &HydroIrOpMetadata,
597        fold_hooked_idents: &HashSet<String>,
598    );
599    fn yield_from_tick(
600        &mut self,
601        in_ident: syn::Ident,
602        in_location: &LocationId,
603        in_kind: &CollectionKind,
604        out_ident: &syn::Ident,
605        out_location: &LocationId,
606    );
607
608    fn begin_atomic(
609        &mut self,
610        in_ident: syn::Ident,
611        in_location: &LocationId,
612        in_kind: &CollectionKind,
613        out_ident: &syn::Ident,
614        out_location: &LocationId,
615        op_meta: &HydroIrOpMetadata,
616    );
617    fn end_atomic(
618        &mut self,
619        in_ident: syn::Ident,
620        in_location: &LocationId,
621        in_kind: &CollectionKind,
622        out_ident: &syn::Ident,
623    );
624
625    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
626    fn observe_nondet(
627        &mut self,
628        trusted: bool,
629        location: &LocationId,
630        in_ident: syn::Ident,
631        in_kind: &CollectionKind,
632        out_ident: &syn::Ident,
633        out_kind: &CollectionKind,
634        op_meta: &HydroIrOpMetadata,
635    );
636
637    #[expect(clippy::too_many_arguments, reason = "TODO")]
638    fn merge_ordered(
639        &mut self,
640        location: &LocationId,
641        first_ident: syn::Ident,
642        second_ident: syn::Ident,
643        out_ident: &syn::Ident,
644        in_kind: &CollectionKind,
645        op_meta: &HydroIrOpMetadata,
646        operator_tag: Option<&str>,
647    );
648
649    #[expect(clippy::too_many_arguments, reason = "TODO")]
650    fn create_network(
651        &mut self,
652        from: &LocationId,
653        to: &LocationId,
654        input_ident: syn::Ident,
655        out_ident: &syn::Ident,
656        serialize: Option<&DebugExpr>,
657        sink: syn::Expr,
658        source: syn::Expr,
659        deserialize: Option<&DebugExpr>,
660        tag_id: StmtId,
661        networking_info: &crate::networking::NetworkingInfo,
662    );
663
664    fn create_external_source(
665        &mut self,
666        on: &LocationId,
667        source_expr: syn::Expr,
668        out_ident: &syn::Ident,
669        deserialize: Option<&DebugExpr>,
670        tag_id: StmtId,
671    );
672
673    fn create_external_output(
674        &mut self,
675        on: &LocationId,
676        sink_expr: syn::Expr,
677        input_ident: &syn::Ident,
678        serialize: Option<&DebugExpr>,
679        tag_id: StmtId,
680    );
681
682    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
683    /// Returns the new input ident to use for the fold if a hook was emitted.
684    fn emit_fold_hook(
685        &mut self,
686        location: &LocationId,
687        in_ident: &syn::Ident,
688        in_kind: &CollectionKind,
689        op_meta: &HydroIrOpMetadata,
690    ) -> Option<syn::Ident>;
691
692    /// Inserts necessary code to validate a manual assertion that at this point the
693    /// input live collection is consistent. In production, this is a no-op, but in simulation
694    /// this will (not yet implemented) inject assertions that validate consistency.
695    fn assert_is_consistent(
696        &mut self,
697        trusted: bool,
698        location: &LocationId,
699        in_ident: syn::Ident,
700        out_ident: &syn::Ident,
701    );
702}
703
704#[cfg(feature = "build")]
705impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
706    fn singleton_intermediates(&self) -> bool {
707        false
708    }
709
710    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
711        self.entry(location.root().key())
712            .expect("location was removed")
713            .or_default()
714    }
715
716    fn batch(
717        &mut self,
718        in_ident: syn::Ident,
719        in_location: &LocationId,
720        in_kind: &CollectionKind,
721        out_ident: &syn::Ident,
722        _out_location: &LocationId,
723        _op_meta: &HydroIrOpMetadata,
724        _fold_hooked_idents: &HashSet<String>,
725    ) {
726        let builder = self.get_dfir_mut(in_location.root());
727        if in_kind.is_bounded()
728            && matches!(
729                in_kind,
730                CollectionKind::Singleton { .. }
731                    | CollectionKind::Optional { .. }
732                    | CollectionKind::KeyedSingleton { .. }
733            )
734        {
735            assert!(in_location.is_top_level());
736            builder.add_dfir(
737                parse_quote! {
738                    #out_ident = #in_ident -> persist::<'static>();
739                },
740                None,
741                None,
742            );
743        } else {
744            builder.add_dfir(
745                parse_quote! {
746                    #out_ident = #in_ident;
747                },
748                None,
749                None,
750            );
751        }
752    }
753
754    fn yield_from_tick(
755        &mut self,
756        in_ident: syn::Ident,
757        in_location: &LocationId,
758        _in_kind: &CollectionKind,
759        out_ident: &syn::Ident,
760        _out_location: &LocationId,
761    ) {
762        let builder = self.get_dfir_mut(in_location.root());
763        builder.add_dfir(
764            parse_quote! {
765                #out_ident = #in_ident;
766            },
767            None,
768            None,
769        );
770    }
771
772    fn begin_atomic(
773        &mut self,
774        in_ident: syn::Ident,
775        in_location: &LocationId,
776        _in_kind: &CollectionKind,
777        out_ident: &syn::Ident,
778        _out_location: &LocationId,
779        _op_meta: &HydroIrOpMetadata,
780    ) {
781        let builder = self.get_dfir_mut(in_location.root());
782        builder.add_dfir(
783            parse_quote! {
784                #out_ident = #in_ident;
785            },
786            None,
787            None,
788        );
789    }
790
791    fn end_atomic(
792        &mut self,
793        in_ident: syn::Ident,
794        in_location: &LocationId,
795        _in_kind: &CollectionKind,
796        out_ident: &syn::Ident,
797    ) {
798        let builder = self.get_dfir_mut(in_location.root());
799        builder.add_dfir(
800            parse_quote! {
801                #out_ident = #in_ident;
802            },
803            None,
804            None,
805        );
806    }
807
808    fn observe_nondet(
809        &mut self,
810        _trusted: bool,
811        location: &LocationId,
812        in_ident: syn::Ident,
813        _in_kind: &CollectionKind,
814        out_ident: &syn::Ident,
815        _out_kind: &CollectionKind,
816        _op_meta: &HydroIrOpMetadata,
817    ) {
818        let builder = self.get_dfir_mut(location);
819        builder.add_dfir(
820            parse_quote! {
821                #out_ident = #in_ident;
822            },
823            None,
824            None,
825        );
826    }
827
828    fn merge_ordered(
829        &mut self,
830        location: &LocationId,
831        first_ident: syn::Ident,
832        second_ident: syn::Ident,
833        out_ident: &syn::Ident,
834        _in_kind: &CollectionKind,
835        _op_meta: &HydroIrOpMetadata,
836        operator_tag: Option<&str>,
837    ) {
838        let builder = self.get_dfir_mut(location);
839        builder.add_dfir(
840            parse_quote! {
841                #out_ident = union();
842                #first_ident -> [0]#out_ident;
843                #second_ident -> [1]#out_ident;
844            },
845            None,
846            operator_tag,
847        );
848    }
849
850    fn create_network(
851        &mut self,
852        from: &LocationId,
853        to: &LocationId,
854        input_ident: syn::Ident,
855        out_ident: &syn::Ident,
856        serialize: Option<&DebugExpr>,
857        sink: syn::Expr,
858        source: syn::Expr,
859        deserialize: Option<&DebugExpr>,
860        tag_id: StmtId,
861        _networking_info: &crate::networking::NetworkingInfo,
862    ) {
863        let sender_builder = self.get_dfir_mut(from);
864        if let Some(serialize_pipeline) = serialize {
865            sender_builder.add_dfir(
866                parse_quote! {
867                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
868                },
869                None,
870                // operator tag separates send and receive, which otherwise have the same next_stmt_id
871                Some(&format!("send{}", tag_id)),
872            );
873        } else {
874            sender_builder.add_dfir(
875                parse_quote! {
876                    #input_ident -> dest_sink(#sink);
877                },
878                None,
879                Some(&format!("send{}", tag_id)),
880            );
881        }
882
883        let receiver_builder = self.get_dfir_mut(to);
884        if let Some(deserialize_pipeline) = deserialize {
885            receiver_builder.add_dfir(
886                parse_quote! {
887                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
888                },
889                None,
890                Some(&format!("recv{}", tag_id)),
891            );
892        } else {
893            receiver_builder.add_dfir(
894                parse_quote! {
895                    #out_ident = source_stream(#source);
896                },
897                None,
898                Some(&format!("recv{}", tag_id)),
899            );
900        }
901    }
902
903    fn create_external_source(
904        &mut self,
905        on: &LocationId,
906        source_expr: syn::Expr,
907        out_ident: &syn::Ident,
908        deserialize: Option<&DebugExpr>,
909        tag_id: StmtId,
910    ) {
911        let receiver_builder = self.get_dfir_mut(on);
912        if let Some(deserialize_pipeline) = deserialize {
913            receiver_builder.add_dfir(
914                parse_quote! {
915                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
916                },
917                None,
918                Some(&format!("recv{}", tag_id)),
919            );
920        } else {
921            receiver_builder.add_dfir(
922                parse_quote! {
923                    #out_ident = source_stream(#source_expr);
924                },
925                None,
926                Some(&format!("recv{}", tag_id)),
927            );
928        }
929    }
930
931    fn create_external_output(
932        &mut self,
933        on: &LocationId,
934        sink_expr: syn::Expr,
935        input_ident: &syn::Ident,
936        serialize: Option<&DebugExpr>,
937        tag_id: StmtId,
938    ) {
939        let sender_builder = self.get_dfir_mut(on);
940        if let Some(serialize_fn) = serialize {
941            sender_builder.add_dfir(
942                parse_quote! {
943                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
944                },
945                None,
946                // operator tag separates send and receive, which otherwise have the same next_stmt_id
947                Some(&format!("send{}", tag_id)),
948            );
949        } else {
950            sender_builder.add_dfir(
951                parse_quote! {
952                    #input_ident -> dest_sink(#sink_expr);
953                },
954                None,
955                Some(&format!("send{}", tag_id)),
956            );
957        }
958    }
959
960    fn emit_fold_hook(
961        &mut self,
962        _location: &LocationId,
963        _in_ident: &syn::Ident,
964        _in_kind: &CollectionKind,
965        _op_meta: &HydroIrOpMetadata,
966    ) -> Option<syn::Ident> {
967        None
968    }
969
970    fn assert_is_consistent(
971        &mut self,
972        _trusted: bool,
973        location: &LocationId,
974        in_ident: syn::Ident,
975        out_ident: &syn::Ident,
976    ) {
977        let builder = self.get_dfir_mut(location);
978        builder.add_dfir(
979            parse_quote! {
980                #out_ident = #in_ident;
981            },
982            None,
983            None,
984        );
985    }
986}
987
988#[cfg(feature = "build")]
989pub enum BuildersOrCallback<'a, L, N>
990where
991    L: FnMut(&mut HydroRoot, &mut StmtId),
992    N: FnMut(&mut HydroNode, &mut StmtId),
993{
994    Builders(&'a mut dyn DfirBuilder),
995    Callback(L, N),
996}
997
998/// An root in a Hydro graph, which is an pipeline that doesn't emit
999/// any downstream values. Traversals over the dataflow graph and
1000/// generating DFIR IR start from roots.
1001#[derive(Debug, Hash, serde::Serialize)]
1002pub enum HydroRoot {
1003    ForEach {
1004        f: DebugExpr,
1005        input: Box<HydroNode>,
1006        op_metadata: HydroIrOpMetadata,
1007    },
1008    SendExternal {
1009        to_external_key: LocationKey,
1010        to_port_id: ExternalPortId,
1011        to_many: bool,
1012        unpaired: bool,
1013        serialize_fn: Option<DebugExpr>,
1014        instantiate_fn: DebugInstantiate,
1015        input: Box<HydroNode>,
1016        op_metadata: HydroIrOpMetadata,
1017    },
1018    DestSink {
1019        sink: DebugExpr,
1020        input: Box<HydroNode>,
1021        op_metadata: HydroIrOpMetadata,
1022    },
1023    CycleSink {
1024        cycle_id: CycleId,
1025        input: Box<HydroNode>,
1026        op_metadata: HydroIrOpMetadata,
1027    },
1028    EmbeddedOutput {
1029        #[serde(serialize_with = "serialize_ident")]
1030        ident: syn::Ident,
1031        input: Box<HydroNode>,
1032        op_metadata: HydroIrOpMetadata,
1033    },
1034    Null {
1035        input: Box<HydroNode>,
1036        op_metadata: HydroIrOpMetadata,
1037    },
1038}
1039
1040impl HydroRoot {
1041    #[cfg(feature = "build")]
1042    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
1043    pub fn compile_network<'a, D>(
1044        &mut self,
1045        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
1046        seen_tees: &mut SeenSharedNodes,
1047        seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
1048        processes: &SparseSecondaryMap<LocationKey, D::Process>,
1049        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
1050        externals: &SparseSecondaryMap<LocationKey, D::External>,
1051        env: &mut D::InstantiateEnv,
1052    ) where
1053        D: Deploy<'a>,
1054    {
1055        let refcell_extra_stmts = RefCell::new(extra_stmts);
1056        let refcell_env = RefCell::new(env);
1057        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
1058        self.transform_bottom_up(
1059            &mut |l| {
1060                if let HydroRoot::SendExternal {
1061                    input,
1062                    to_external_key,
1063                    to_port_id,
1064                    to_many,
1065                    unpaired,
1066                    instantiate_fn,
1067                    ..
1068                } = l
1069                {
1070                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1071                        DebugInstantiate::Building => {
1072                            let to_node = externals
1073                                .get(*to_external_key)
1074                                .unwrap_or_else(|| {
1075                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
1076                                })
1077                                .clone();
1078
1079                            match input.metadata().location_id.root() {
1080                                &LocationId::Process(process_key) => {
1081                                    if *to_many {
1082                                        (
1083                                            (
1084                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1085                                                parse_quote!(DUMMY),
1086                                            ),
1087                                            Box::new(|| {}) as Box<dyn FnOnce()>,
1088                                        )
1089                                    } else {
1090                                        let from_node = processes
1091                                            .get(process_key)
1092                                            .unwrap_or_else(|| {
1093                                                panic!("A process used in the graph was not instantiated: {}", process_key)
1094                                            })
1095                                            .clone();
1096
1097                                        let sink_port = from_node.next_port();
1098                                        let source_port = to_node.next_port();
1099
1100                                        if *unpaired {
1101                                            use stageleft::quote_type;
1102                                            use tokio_util::codec::LengthDelimitedCodec;
1103
1104                                            to_node.register(*to_port_id, source_port.clone());
1105
1106                                            let _ = D::e2o_source(
1107                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1108                                                &to_node, &source_port,
1109                                                &from_node, &sink_port,
1110                                                &quote_type::<LengthDelimitedCodec>(),
1111                                                format!("{}_{}", *to_external_key, *to_port_id)
1112                                            );
1113                                        }
1114
1115                                        (
1116                                            (
1117                                                D::o2e_sink(
1118                                                    &from_node,
1119                                                    &sink_port,
1120                                                    &to_node,
1121                                                    &source_port,
1122                                                    format!("{}_{}", *to_external_key, *to_port_id)
1123                                                ),
1124                                                parse_quote!(DUMMY),
1125                                            ),
1126                                            if *unpaired {
1127                                                D::e2o_connect(
1128                                                    &to_node,
1129                                                    &source_port,
1130                                                    &from_node,
1131                                                    &sink_port,
1132                                                    *to_many,
1133                                                    NetworkHint::Auto,
1134                                                )
1135                                            } else {
1136                                                Box::new(|| {}) as Box<dyn FnOnce()>
1137                                            },
1138                                        )
1139                                    }
1140                                }
1141                                LocationId::Cluster(cluster_key) => {
1142                                    let from_node = clusters
1143                                        .get(*cluster_key)
1144                                        .unwrap_or_else(|| {
1145                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1146                                        })
1147                                        .clone();
1148
1149                                    let sink_port = from_node.next_port();
1150                                    let source_port = to_node.next_port();
1151
1152                                    if *unpaired {
1153                                        to_node.register(*to_port_id, source_port.clone());
1154                                    }
1155
1156                                    (
1157                                        (
1158                                            D::m2e_sink(
1159                                                &from_node,
1160                                                &sink_port,
1161                                                &to_node,
1162                                                &source_port,
1163                                                format!("{}_{}", *to_external_key, *to_port_id)
1164                                            ),
1165                                            parse_quote!(DUMMY),
1166                                        ),
1167                                        Box::new(|| {}) as Box<dyn FnOnce()>,
1168                                    )
1169                                }
1170                                _ => panic!()
1171                            }
1172                        },
1173
1174                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1175                    };
1176
1177                    *instantiate_fn = DebugInstantiateFinalized {
1178                        sink: sink_expr,
1179                        source: source_expr,
1180                        connect_fn: Some(connect_fn),
1181                    }
1182                    .into();
1183                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1184                    let element_type = match &input.metadata().collection_kind {
1185                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1186                        _ => panic!("Embedded output must have Stream collection kind"),
1187                    };
1188                    let location_key = match input.metadata().location_id.root() {
1189                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1190                        _ => panic!("Embedded output must be on a process or cluster"),
1191                    };
1192                    D::register_embedded_output(
1193                        &mut refcell_env.borrow_mut(),
1194                        location_key,
1195                        ident,
1196                        &element_type,
1197                    );
1198                }
1199            },
1200            &mut |n| {
1201                if let HydroNode::Network {
1202                    name,
1203                    networking_info,
1204                    input,
1205                    instantiate_fn,
1206                    metadata,
1207                    ..
1208                } = n
1209                {
1210                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1211                        DebugInstantiate::Building => instantiate_network::<D>(
1212                            &mut refcell_env.borrow_mut(),
1213                            input.metadata().location_id.root(),
1214                            metadata.location_id.root(),
1215                            processes,
1216                            clusters,
1217                            name.as_deref(),
1218                            networking_info,
1219                        ),
1220
1221                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1222                    };
1223
1224                    *instantiate_fn = DebugInstantiateFinalized {
1225                        sink: sink_expr,
1226                        source: source_expr,
1227                        connect_fn: Some(connect_fn),
1228                    }
1229                    .into();
1230                } else if let HydroNode::ExternalInput {
1231                    from_external_key,
1232                    from_port_id,
1233                    from_many,
1234                    codec_type,
1235                    port_hint,
1236                    instantiate_fn,
1237                    metadata,
1238                    ..
1239                } = n
1240                {
1241                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1242                        DebugInstantiate::Building => {
1243                            let from_node = externals
1244                                .get(*from_external_key)
1245                                .unwrap_or_else(|| {
1246                                    panic!(
1247                                        "A external used in the graph was not instantiated: {}",
1248                                        from_external_key,
1249                                    )
1250                                })
1251                                .clone();
1252
1253                            match metadata.location_id.root() {
1254                                &LocationId::Process(process_key) => {
1255                                    let to_node = processes
1256                                        .get(process_key)
1257                                        .unwrap_or_else(|| {
1258                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1259                                        })
1260                                        .clone();
1261
1262                                    let sink_port = from_node.next_port();
1263                                    let source_port = to_node.next_port();
1264
1265                                    from_node.register(*from_port_id, sink_port.clone());
1266
1267                                    (
1268                                        (
1269                                            parse_quote!(DUMMY),
1270                                            if *from_many {
1271                                                D::e2o_many_source(
1272                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1273                                                    &to_node, &source_port,
1274                                                    codec_type.0.as_ref(),
1275                                                    format!("{}_{}", *from_external_key, *from_port_id)
1276                                                )
1277                                            } else {
1278                                                D::e2o_source(
1279                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1280                                                    &from_node, &sink_port,
1281                                                    &to_node, &source_port,
1282                                                    codec_type.0.as_ref(),
1283                                                    format!("{}_{}", *from_external_key, *from_port_id)
1284                                                )
1285                                            },
1286                                        ),
1287                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1288                                    )
1289                                }
1290                                LocationId::Cluster(cluster_key) => {
1291                                    let to_node = clusters
1292                                        .get(*cluster_key)
1293                                        .unwrap_or_else(|| {
1294                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1295                                        })
1296                                        .clone();
1297
1298                                    let sink_port = from_node.next_port();
1299                                    let source_port = to_node.next_port();
1300
1301                                    from_node.register(*from_port_id, sink_port.clone());
1302
1303                                    (
1304                                        (
1305                                            parse_quote!(DUMMY),
1306                                            D::e2m_source(
1307                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1308                                                &from_node, &sink_port,
1309                                                &to_node, &source_port,
1310                                                codec_type.0.as_ref(),
1311                                                format!("{}_{}", *from_external_key, *from_port_id)
1312                                            ),
1313                                        ),
1314                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1315                                    )
1316                                }
1317                                _ => panic!()
1318                            }
1319                        },
1320
1321                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1322                    };
1323
1324                    *instantiate_fn = DebugInstantiateFinalized {
1325                        sink: sink_expr,
1326                        source: source_expr,
1327                        connect_fn: Some(connect_fn),
1328                    }
1329                    .into();
1330                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1331                    let element_type = match &metadata.collection_kind {
1332                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1333                        _ => panic!("Embedded source must have Stream collection kind"),
1334                    };
1335                    let location_key = match metadata.location_id.root() {
1336                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1337                        _ => panic!("Embedded source must be on a process or cluster"),
1338                    };
1339                    D::register_embedded_stream_input(
1340                        &mut refcell_env.borrow_mut(),
1341                        location_key,
1342                        ident,
1343                        &element_type,
1344                    );
1345                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1346                    let element_type = match &metadata.collection_kind {
1347                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1348                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1349                    };
1350                    let location_key = match metadata.location_id.root() {
1351                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1352                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1353                    };
1354                    D::register_embedded_singleton_input(
1355                        &mut refcell_env.borrow_mut(),
1356                        location_key,
1357                        ident,
1358                        &element_type,
1359                    );
1360                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1361                    match state {
1362                        ClusterMembersState::Uninit => {
1363                            let at_location = metadata.location_id.root().clone();
1364                            let key = (at_location.clone(), location_id.key());
1365                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1366                                // First occurrence: call cluster_membership_stream and mark as Stream.
1367                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1368                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1369                                    &(),
1370                                );
1371                                *state = ClusterMembersState::Stream(expr.into());
1372                            } else {
1373                                // Already instantiated for this (at, target) pair: just tee.
1374                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1375                            }
1376                        }
1377                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1378                            panic!("cluster members already finalized");
1379                        }
1380                    }
1381                }
1382            },
1383            seen_tees,
1384            false,
1385        );
1386    }
1387
1388    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1389        self.transform_bottom_up(
1390            &mut |l| {
1391                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1392                    match instantiate_fn {
1393                        DebugInstantiate::Building => panic!("network not built"),
1394
1395                        DebugInstantiate::Finalized(finalized) => {
1396                            (finalized.connect_fn.take().unwrap())();
1397                        }
1398                    }
1399                }
1400            },
1401            &mut |n| {
1402                if let HydroNode::Network { instantiate_fn, .. }
1403                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1404                {
1405                    match instantiate_fn {
1406                        DebugInstantiate::Building => panic!("network not built"),
1407
1408                        DebugInstantiate::Finalized(finalized) => {
1409                            (finalized.connect_fn.take().unwrap())();
1410                        }
1411                    }
1412                }
1413            },
1414            seen_tees,
1415            false,
1416        );
1417    }
1418
1419    pub fn transform_bottom_up(
1420        &mut self,
1421        transform_root: &mut impl FnMut(&mut HydroRoot),
1422        transform_node: &mut impl FnMut(&mut HydroNode),
1423        seen_tees: &mut SeenSharedNodes,
1424        check_well_formed: bool,
1425    ) {
1426        self.transform_children(
1427            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1428            seen_tees,
1429        );
1430
1431        transform_root(self);
1432    }
1433
1434    pub fn transform_children(
1435        &mut self,
1436        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1437        seen_tees: &mut SeenSharedNodes,
1438    ) {
1439        match self {
1440            HydroRoot::ForEach { input, .. }
1441            | HydroRoot::SendExternal { input, .. }
1442            | HydroRoot::DestSink { input, .. }
1443            | HydroRoot::CycleSink { input, .. }
1444            | HydroRoot::EmbeddedOutput { input, .. }
1445            | HydroRoot::Null { input, .. } => {
1446                transform(input, seen_tees);
1447            }
1448        }
1449    }
1450
1451    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1452        match self {
1453            HydroRoot::ForEach {
1454                f,
1455                input,
1456                op_metadata,
1457            } => HydroRoot::ForEach {
1458                f: f.clone(),
1459                input: Box::new(input.deep_clone(seen_tees)),
1460                op_metadata: op_metadata.clone(),
1461            },
1462            HydroRoot::SendExternal {
1463                to_external_key,
1464                to_port_id,
1465                to_many,
1466                unpaired,
1467                serialize_fn,
1468                instantiate_fn,
1469                input,
1470                op_metadata,
1471            } => HydroRoot::SendExternal {
1472                to_external_key: *to_external_key,
1473                to_port_id: *to_port_id,
1474                to_many: *to_many,
1475                unpaired: *unpaired,
1476                serialize_fn: serialize_fn.clone(),
1477                instantiate_fn: instantiate_fn.clone(),
1478                input: Box::new(input.deep_clone(seen_tees)),
1479                op_metadata: op_metadata.clone(),
1480            },
1481            HydroRoot::DestSink {
1482                sink,
1483                input,
1484                op_metadata,
1485            } => HydroRoot::DestSink {
1486                sink: sink.clone(),
1487                input: Box::new(input.deep_clone(seen_tees)),
1488                op_metadata: op_metadata.clone(),
1489            },
1490            HydroRoot::CycleSink {
1491                cycle_id,
1492                input,
1493                op_metadata,
1494            } => HydroRoot::CycleSink {
1495                cycle_id: *cycle_id,
1496                input: Box::new(input.deep_clone(seen_tees)),
1497                op_metadata: op_metadata.clone(),
1498            },
1499            HydroRoot::EmbeddedOutput {
1500                ident,
1501                input,
1502                op_metadata,
1503            } => HydroRoot::EmbeddedOutput {
1504                ident: ident.clone(),
1505                input: Box::new(input.deep_clone(seen_tees)),
1506                op_metadata: op_metadata.clone(),
1507            },
1508            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1509                input: Box::new(input.deep_clone(seen_tees)),
1510                op_metadata: op_metadata.clone(),
1511            },
1512        }
1513    }
1514
1515    #[cfg(feature = "build")]
1516    pub fn emit(
1517        &mut self,
1518        graph_builders: &mut dyn DfirBuilder,
1519        seen_tees: &mut SeenSharedNodes,
1520        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1521        next_stmt_id: &mut StmtId,
1522        fold_hooked_idents: &mut HashSet<String>,
1523    ) {
1524        self.emit_core(
1525            &mut BuildersOrCallback::<
1526                fn(&mut HydroRoot, &mut StmtId),
1527                fn(&mut HydroNode, &mut StmtId),
1528            >::Builders(graph_builders),
1529            seen_tees,
1530            built_tees,
1531            next_stmt_id,
1532            fold_hooked_idents,
1533        );
1534    }
1535
1536    #[cfg(feature = "build")]
1537    pub fn emit_core(
1538        &mut self,
1539        builders_or_callback: &mut BuildersOrCallback<
1540            impl FnMut(&mut HydroRoot, &mut StmtId),
1541            impl FnMut(&mut HydroNode, &mut StmtId),
1542        >,
1543        seen_tees: &mut SeenSharedNodes,
1544        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1545        next_stmt_id: &mut StmtId,
1546        fold_hooked_idents: &mut HashSet<String>,
1547    ) {
1548        match self {
1549            HydroRoot::ForEach { f, input, .. } => {
1550                let input_ident = input.emit_core(
1551                    builders_or_callback,
1552                    seen_tees,
1553                    built_tees,
1554                    next_stmt_id,
1555                    fold_hooked_idents,
1556                );
1557
1558                match builders_or_callback {
1559                    BuildersOrCallback::Builders(graph_builders) => {
1560                        graph_builders
1561                            .get_dfir_mut(&input.metadata().location_id)
1562                            .add_dfir(
1563                                parse_quote! {
1564                                    #input_ident -> for_each(#f);
1565                                },
1566                                None,
1567                                Some(&next_stmt_id.to_string()),
1568                            );
1569                    }
1570                    BuildersOrCallback::Callback(leaf_callback, _) => {
1571                        leaf_callback(self, next_stmt_id);
1572                    }
1573                }
1574
1575                let _ = next_stmt_id.get_and_increment();
1576            }
1577
1578            HydroRoot::SendExternal {
1579                serialize_fn,
1580                instantiate_fn,
1581                input,
1582                ..
1583            } => {
1584                let input_ident = input.emit_core(
1585                    builders_or_callback,
1586                    seen_tees,
1587                    built_tees,
1588                    next_stmt_id,
1589                    fold_hooked_idents,
1590                );
1591
1592                match builders_or_callback {
1593                    BuildersOrCallback::Builders(graph_builders) => {
1594                        let (sink_expr, _) = match instantiate_fn {
1595                            DebugInstantiate::Building => (
1596                                syn::parse_quote!(DUMMY_SINK),
1597                                syn::parse_quote!(DUMMY_SOURCE),
1598                            ),
1599
1600                            DebugInstantiate::Finalized(finalized) => {
1601                                (finalized.sink.clone(), finalized.source.clone())
1602                            }
1603                        };
1604
1605                        graph_builders.create_external_output(
1606                            &input.metadata().location_id,
1607                            sink_expr,
1608                            &input_ident,
1609                            serialize_fn.as_ref(),
1610                            *next_stmt_id,
1611                        );
1612                    }
1613                    BuildersOrCallback::Callback(leaf_callback, _) => {
1614                        leaf_callback(self, next_stmt_id);
1615                    }
1616                }
1617
1618                let _ = next_stmt_id.get_and_increment();
1619            }
1620
1621            HydroRoot::DestSink { sink, input, .. } => {
1622                let input_ident = input.emit_core(
1623                    builders_or_callback,
1624                    seen_tees,
1625                    built_tees,
1626                    next_stmt_id,
1627                    fold_hooked_idents,
1628                );
1629
1630                match builders_or_callback {
1631                    BuildersOrCallback::Builders(graph_builders) => {
1632                        graph_builders
1633                            .get_dfir_mut(&input.metadata().location_id)
1634                            .add_dfir(
1635                                parse_quote! {
1636                                    #input_ident -> dest_sink(#sink);
1637                                },
1638                                None,
1639                                Some(&next_stmt_id.to_string()),
1640                            );
1641                    }
1642                    BuildersOrCallback::Callback(leaf_callback, _) => {
1643                        leaf_callback(self, next_stmt_id);
1644                    }
1645                }
1646
1647                let _ = next_stmt_id.get_and_increment();
1648            }
1649
1650            HydroRoot::CycleSink {
1651                cycle_id, input, ..
1652            } => {
1653                let input_ident = input.emit_core(
1654                    builders_or_callback,
1655                    seen_tees,
1656                    built_tees,
1657                    next_stmt_id,
1658                    fold_hooked_idents,
1659                );
1660
1661                match builders_or_callback {
1662                    BuildersOrCallback::Builders(graph_builders) => {
1663                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1664                            CollectionKind::KeyedSingleton {
1665                                key_type,
1666                                value_type,
1667                                ..
1668                            }
1669                            | CollectionKind::KeyedStream {
1670                                key_type,
1671                                value_type,
1672                                ..
1673                            } => {
1674                                parse_quote!((#key_type, #value_type))
1675                            }
1676                            CollectionKind::Stream { element_type, .. }
1677                            | CollectionKind::Singleton { element_type, .. }
1678                            | CollectionKind::Optional { element_type, .. } => {
1679                                parse_quote!(#element_type)
1680                            }
1681                        };
1682
1683                        let cycle_id_ident = cycle_id.as_ident();
1684                        graph_builders
1685                            .get_dfir_mut(&input.metadata().location_id)
1686                            .add_dfir(
1687                                parse_quote! {
1688                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1689                                },
1690                                None,
1691                                None,
1692                            );
1693                    }
1694                    // No ID, no callback
1695                    BuildersOrCallback::Callback(_, _) => {}
1696                }
1697            }
1698
1699            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1700                let input_ident = input.emit_core(
1701                    builders_or_callback,
1702                    seen_tees,
1703                    built_tees,
1704                    next_stmt_id,
1705                    fold_hooked_idents,
1706                );
1707
1708                match builders_or_callback {
1709                    BuildersOrCallback::Builders(graph_builders) => {
1710                        graph_builders
1711                            .get_dfir_mut(&input.metadata().location_id)
1712                            .add_dfir(
1713                                parse_quote! {
1714                                    #input_ident -> for_each(&mut #ident);
1715                                },
1716                                None,
1717                                Some(&next_stmt_id.to_string()),
1718                            );
1719                    }
1720                    BuildersOrCallback::Callback(leaf_callback, _) => {
1721                        leaf_callback(self, next_stmt_id);
1722                    }
1723                }
1724
1725                let _ = next_stmt_id.get_and_increment();
1726            }
1727
1728            HydroRoot::Null { input, .. } => {
1729                let input_ident = input.emit_core(
1730                    builders_or_callback,
1731                    seen_tees,
1732                    built_tees,
1733                    next_stmt_id,
1734                    fold_hooked_idents,
1735                );
1736
1737                match builders_or_callback {
1738                    BuildersOrCallback::Builders(graph_builders) => {
1739                        graph_builders
1740                            .get_dfir_mut(&input.metadata().location_id)
1741                            .add_dfir(
1742                                parse_quote! {
1743                                    #input_ident -> for_each(|_| {});
1744                                },
1745                                None,
1746                                Some(&next_stmt_id.to_string()),
1747                            );
1748                    }
1749                    BuildersOrCallback::Callback(leaf_callback, _) => {
1750                        leaf_callback(self, next_stmt_id);
1751                    }
1752                }
1753
1754                let _ = next_stmt_id.get_and_increment();
1755            }
1756        }
1757    }
1758
1759    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1760        match self {
1761            HydroRoot::ForEach { op_metadata, .. }
1762            | HydroRoot::SendExternal { op_metadata, .. }
1763            | HydroRoot::DestSink { op_metadata, .. }
1764            | HydroRoot::CycleSink { op_metadata, .. }
1765            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1766            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1767        }
1768    }
1769
1770    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1771        match self {
1772            HydroRoot::ForEach { op_metadata, .. }
1773            | HydroRoot::SendExternal { op_metadata, .. }
1774            | HydroRoot::DestSink { op_metadata, .. }
1775            | HydroRoot::CycleSink { op_metadata, .. }
1776            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1777            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1778        }
1779    }
1780
1781    pub fn input(&self) -> &HydroNode {
1782        match self {
1783            HydroRoot::ForEach { input, .. }
1784            | HydroRoot::SendExternal { input, .. }
1785            | HydroRoot::DestSink { input, .. }
1786            | HydroRoot::CycleSink { input, .. }
1787            | HydroRoot::EmbeddedOutput { input, .. }
1788            | HydroRoot::Null { input, .. } => input,
1789        }
1790    }
1791
1792    pub fn input_metadata(&self) -> &HydroIrMetadata {
1793        self.input().metadata()
1794    }
1795
1796    pub fn print_root(&self) -> String {
1797        match self {
1798            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1799            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1800            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1801            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1802            HydroRoot::EmbeddedOutput { ident, .. } => {
1803                format!("EmbeddedOutput({})", ident)
1804            }
1805            HydroRoot::Null { .. } => "Null".to_owned(),
1806        }
1807    }
1808
1809    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1810        match self {
1811            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1812                transform(f);
1813            }
1814            HydroRoot::SendExternal { .. }
1815            | HydroRoot::CycleSink { .. }
1816            | HydroRoot::EmbeddedOutput { .. }
1817            | HydroRoot::Null { .. } => {}
1818        }
1819    }
1820}
1821
1822#[cfg(feature = "build")]
1823fn tick_of(loc: &LocationId) -> Option<ClockId> {
1824    match loc {
1825        LocationId::Tick(id, _) => Some(*id),
1826        LocationId::Atomic(inner) => tick_of(inner),
1827        _ => None,
1828    }
1829}
1830
1831#[cfg(feature = "build")]
1832fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1833    match loc {
1834        LocationId::Tick(id, inner) => {
1835            *id = uf_find(uf, *id);
1836            remap_location(inner, uf);
1837        }
1838        LocationId::Atomic(inner) => {
1839            remap_location(inner, uf);
1840        }
1841        LocationId::Process(_) | LocationId::Cluster(_) => {}
1842    }
1843}
1844
1845#[cfg(feature = "build")]
1846fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1847    let p = *parent.get(&x).unwrap_or(&x);
1848    if p == x {
1849        return x;
1850    }
1851    let root = uf_find(parent, p);
1852    parent.insert(x, root);
1853    root
1854}
1855
1856#[cfg(feature = "build")]
1857fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1858    let ra = uf_find(parent, a);
1859    let rb = uf_find(parent, b);
1860    if ra != rb {
1861        parent.insert(ra, rb);
1862    }
1863}
1864
1865/// Traverse the IR to build a union-find that unifies tick IDs connected
1866/// through `Batch` and `YieldConcat` nodes at atomic boundaries, then
1867/// rewrite all `LocationId`s to use the representative tick ID.
1868#[cfg(feature = "build")]
1869pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1870    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1871
1872    // Pass 1: collect unifications.
1873    transform_bottom_up(
1874        ir,
1875        &mut |_| {},
1876        &mut |node: &mut HydroNode| match node {
1877            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
1878                if let (Some(a), Some(b)) = (
1879                    tick_of(&inner.metadata().location_id),
1880                    tick_of(&metadata.location_id),
1881                ) {
1882                    uf_union(&mut uf, a, b);
1883                }
1884            }
1885            HydroNode::Chain {
1886                first,
1887                second,
1888                metadata,
1889            }
1890            | HydroNode::ChainFirst {
1891                first,
1892                second,
1893                metadata,
1894            }
1895            | HydroNode::MergeOrdered {
1896                first,
1897                second,
1898                metadata,
1899            } => {
1900                if let (Some(a), Some(b)) = (
1901                    tick_of(&first.metadata().location_id),
1902                    tick_of(&metadata.location_id),
1903                ) {
1904                    uf_union(&mut uf, a, b);
1905                }
1906                if let (Some(a), Some(b)) = (
1907                    tick_of(&second.metadata().location_id),
1908                    tick_of(&metadata.location_id),
1909                ) {
1910                    uf_union(&mut uf, a, b);
1911                }
1912            }
1913            _ => {}
1914        },
1915        false,
1916    );
1917
1918    // Pass 2: rewrite all LocationIds.
1919    transform_bottom_up(
1920        ir,
1921        &mut |_| {},
1922        &mut |node: &mut HydroNode| {
1923            remap_location(&mut node.metadata_mut().location_id, &mut uf);
1924        },
1925        false,
1926    );
1927}
1928
1929#[cfg(feature = "build")]
1930pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1931    let mut builders = SecondaryMap::new();
1932    let mut seen_tees = HashMap::new();
1933    let mut built_tees = HashMap::new();
1934    let mut next_stmt_id = StmtId::default();
1935    let mut fold_hooked_idents = HashSet::new();
1936    for leaf in ir {
1937        leaf.emit(
1938            &mut builders,
1939            &mut seen_tees,
1940            &mut built_tees,
1941            &mut next_stmt_id,
1942            &mut fold_hooked_idents,
1943        );
1944    }
1945    builders
1946}
1947
1948#[cfg(feature = "build")]
1949pub fn traverse_dfir(
1950    ir: &mut [HydroRoot],
1951    transform_root: impl FnMut(&mut HydroRoot, &mut StmtId),
1952    transform_node: impl FnMut(&mut HydroNode, &mut StmtId),
1953) {
1954    let mut seen_tees = HashMap::new();
1955    let mut built_tees = HashMap::new();
1956    let mut next_stmt_id = StmtId::default();
1957    let mut fold_hooked_idents = HashSet::new();
1958    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1959    ir.iter_mut().for_each(|leaf| {
1960        leaf.emit_core(
1961            &mut callback,
1962            &mut seen_tees,
1963            &mut built_tees,
1964            &mut next_stmt_id,
1965            &mut fold_hooked_idents,
1966        );
1967    });
1968}
1969
1970pub fn transform_bottom_up(
1971    ir: &mut [HydroRoot],
1972    transform_root: &mut impl FnMut(&mut HydroRoot),
1973    transform_node: &mut impl FnMut(&mut HydroNode),
1974    check_well_formed: bool,
1975) {
1976    let mut seen_tees = HashMap::new();
1977    ir.iter_mut().for_each(|leaf| {
1978        leaf.transform_bottom_up(
1979            transform_root,
1980            transform_node,
1981            &mut seen_tees,
1982            check_well_formed,
1983        );
1984    });
1985}
1986
1987pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1988    let mut seen_tees = HashMap::new();
1989    ir.iter()
1990        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1991        .collect()
1992}
1993
1994type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1995thread_local! {
1996    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1997    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1998    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1999    /// on subsequent encounters, preventing infinite loops.
2000    static SERIALIZED_SHARED: PrintedTees
2001        = const { RefCell::new(None) };
2002}
2003
2004pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
2005    PRINTED_TEES.with(|printed_tees| {
2006        let mut printed_tees_mut = printed_tees.borrow_mut();
2007        *printed_tees_mut = Some((0, HashMap::new()));
2008        drop(printed_tees_mut);
2009
2010        let ret = f();
2011
2012        let mut printed_tees_mut = printed_tees.borrow_mut();
2013        *printed_tees_mut = None;
2014
2015        ret
2016    })
2017}
2018
2019/// Runs `f` with a fresh shared-node deduplication scope for serialization.
2020/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
2021/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
2022/// back-reference.  The tracking state is restored when `f` returns or panics.
2023pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
2024    let _guard = SerializedSharedGuard::enter();
2025    f()
2026}
2027
2028/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
2029/// making `serialize_dedup_shared` re-entrant and panic-safe.
2030struct SerializedSharedGuard {
2031    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
2032}
2033
2034impl SerializedSharedGuard {
2035    fn enter() -> Self {
2036        let previous = SERIALIZED_SHARED.with(|cell| {
2037            let mut guard = cell.borrow_mut();
2038            guard.replace((0, HashMap::new()))
2039        });
2040        Self { previous }
2041    }
2042}
2043
2044impl Drop for SerializedSharedGuard {
2045    fn drop(&mut self) {
2046        SERIALIZED_SHARED.with(|cell| {
2047            *cell.borrow_mut() = self.previous.take();
2048        });
2049    }
2050}
2051
2052pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2053
2054impl serde::Serialize for SharedNode {
2055    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
2056    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
2057    /// same subtree every time and, if the graph ever contains a cycle, loop
2058    /// forever.
2059    ///
2060    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
2061    /// integer id.  The first time we see a pointer we assign it the next id and
2062    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
2063    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
2064    /// recursion.  Requires an active `serialize_dedup_shared` scope.
2065    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2066        SERIALIZED_SHARED.with(|cell| {
2067            let mut guard = cell.borrow_mut();
2068            // (next_id, pointer → assigned_id)
2069            let state = guard.as_mut().ok_or_else(|| {
2070                serde::ser::Error::custom(
2071                    "SharedNode serialization requires an active serialize_dedup_shared scope",
2072                )
2073            })?;
2074            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2075
2076            if let Some(&id) = state.1.get(&ptr) {
2077                drop(guard);
2078                use serde::ser::SerializeMap;
2079                let mut map = serializer.serialize_map(Some(1))?;
2080                map.serialize_entry("$shared_ref", &id)?;
2081                map.end()
2082            } else {
2083                let id = state.0;
2084                state.0 += 1;
2085                state.1.insert(ptr, id);
2086                drop(guard);
2087
2088                use serde::ser::SerializeMap;
2089                let mut map = serializer.serialize_map(Some(2))?;
2090                map.serialize_entry("$shared", &id)?;
2091                map.serialize_entry("node", &*self.0.borrow())?;
2092                map.end()
2093            }
2094        })
2095    }
2096}
2097
2098impl SharedNode {
2099    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2100        Rc::as_ptr(&self.0)
2101    }
2102}
2103
2104impl Debug for SharedNode {
2105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2106        PRINTED_TEES.with(|printed_tees| {
2107            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2108            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2109
2110            if let Some(printed_tees_mut) = printed_tees_mut {
2111                if let Some(existing) = printed_tees_mut
2112                    .1
2113                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2114                {
2115                    write!(f, "<shared {}>", existing)
2116                } else {
2117                    let next_id = printed_tees_mut.0;
2118                    printed_tees_mut.0 += 1;
2119                    printed_tees_mut
2120                        .1
2121                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2122                    drop(printed_tees_mut_borrow);
2123                    write!(f, "<shared {}>: ", next_id)?;
2124                    Debug::fmt(&self.0.borrow(), f)
2125                }
2126            } else {
2127                drop(printed_tees_mut_borrow);
2128                write!(f, "<shared>: ")?;
2129                Debug::fmt(&self.0.borrow(), f)
2130            }
2131        })
2132    }
2133}
2134
2135impl Hash for SharedNode {
2136    fn hash<H: Hasher>(&self, state: &mut H) {
2137        self.0.borrow_mut().hash(state);
2138    }
2139}
2140
2141#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2142pub enum BoundKind {
2143    Unbounded,
2144    Bounded,
2145}
2146
2147#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2148pub enum StreamOrder {
2149    NoOrder,
2150    TotalOrder,
2151}
2152
2153#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2154pub enum StreamRetry {
2155    AtLeastOnce,
2156    ExactlyOnce,
2157}
2158
2159#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2160pub enum KeyedSingletonBoundKind {
2161    Unbounded,
2162    MonotonicValue,
2163    BoundedValue,
2164    Bounded,
2165}
2166
2167#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2168pub enum SingletonBoundKind {
2169    Unbounded,
2170    Monotonic,
2171    Bounded,
2172}
2173
2174#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2175pub enum CollectionKind {
2176    Stream {
2177        bound: BoundKind,
2178        order: StreamOrder,
2179        retry: StreamRetry,
2180        element_type: DebugType,
2181    },
2182    Singleton {
2183        bound: SingletonBoundKind,
2184        element_type: DebugType,
2185    },
2186    Optional {
2187        bound: BoundKind,
2188        element_type: DebugType,
2189    },
2190    KeyedStream {
2191        bound: BoundKind,
2192        value_order: StreamOrder,
2193        value_retry: StreamRetry,
2194        key_type: DebugType,
2195        value_type: DebugType,
2196    },
2197    KeyedSingleton {
2198        bound: KeyedSingletonBoundKind,
2199        key_type: DebugType,
2200        value_type: DebugType,
2201    },
2202}
2203
2204impl CollectionKind {
2205    pub fn is_bounded(&self) -> bool {
2206        matches!(
2207            self,
2208            CollectionKind::Stream {
2209                bound: BoundKind::Bounded,
2210                ..
2211            } | CollectionKind::Singleton {
2212                bound: SingletonBoundKind::Bounded,
2213                ..
2214            } | CollectionKind::Optional {
2215                bound: BoundKind::Bounded,
2216                ..
2217            } | CollectionKind::KeyedStream {
2218                bound: BoundKind::Bounded,
2219                ..
2220            } | CollectionKind::KeyedSingleton {
2221                bound: KeyedSingletonBoundKind::Bounded,
2222                ..
2223            }
2224        )
2225    }
2226}
2227
2228#[derive(Clone, serde::Serialize)]
2229pub struct HydroIrMetadata {
2230    pub location_id: LocationId,
2231    pub collection_kind: CollectionKind,
2232    pub consistency: Option<ClusterConsistency>,
2233    pub cardinality: Option<usize>,
2234    pub tag: Option<String>,
2235    pub op: HydroIrOpMetadata,
2236}
2237
2238// HydroIrMetadata shouldn't be used to hash or compare
2239impl Hash for HydroIrMetadata {
2240    fn hash<H: Hasher>(&self, _: &mut H) {}
2241}
2242
2243impl PartialEq for HydroIrMetadata {
2244    fn eq(&self, _: &Self) -> bool {
2245        true
2246    }
2247}
2248
2249impl Eq for HydroIrMetadata {}
2250
2251impl Debug for HydroIrMetadata {
2252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2253        f.debug_struct("HydroIrMetadata")
2254            .field("location_id", &self.location_id)
2255            .field("collection_kind", &self.collection_kind)
2256            .finish()
2257    }
2258}
2259
2260/// Metadata that is specific to the operator itself, rather than its outputs.
2261/// This is available on _both_ inner nodes and roots.
2262#[derive(Clone, serde::Serialize)]
2263pub struct HydroIrOpMetadata {
2264    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2265    pub backtrace: Backtrace,
2266    pub cpu_usage: Option<f64>,
2267    pub network_recv_cpu_usage: Option<f64>,
2268    pub id: Option<usize>,
2269}
2270
2271impl HydroIrOpMetadata {
2272    #[expect(
2273        clippy::new_without_default,
2274        reason = "explicit calls to new ensure correct backtrace bounds"
2275    )]
2276    pub fn new() -> HydroIrOpMetadata {
2277        Self::new_with_skip(1)
2278    }
2279
2280    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2281        HydroIrOpMetadata {
2282            backtrace: Backtrace::get_backtrace(2 + skip_count),
2283            cpu_usage: None,
2284            network_recv_cpu_usage: None,
2285            id: None,
2286        }
2287    }
2288}
2289
2290impl Debug for HydroIrOpMetadata {
2291    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2292        f.debug_struct("HydroIrOpMetadata").finish()
2293    }
2294}
2295
2296impl Hash for HydroIrOpMetadata {
2297    fn hash<H: Hasher>(&self, _: &mut H) {}
2298}
2299
2300/// An intermediate node in a Hydro graph, which consumes data
2301/// from upstream nodes and emits data to downstream nodes.
2302#[derive(Debug, Hash, serde::Serialize)]
2303pub enum HydroNode {
2304    Placeholder,
2305
2306    /// Manually "casts" between two different collection kinds.
2307    ///
2308    /// Using this IR node requires special care, since it bypasses many of Hydro's core
2309    /// correctness checks. In particular, the user must ensure that every possible
2310    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
2311    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
2312    /// collection. This ensures that the simulator does not miss any possible outputs.
2313    Cast {
2314        inner: Box<HydroNode>,
2315        metadata: HydroIrMetadata,
2316    },
2317
2318    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
2319    /// interpretation of the input stream.
2320    ///
2321    /// In production, this simply passes through the input, but in simulation, this operator
2322    /// explicitly selects a randomized interpretation.
2323    ObserveNonDet {
2324        inner: Box<HydroNode>,
2325        trusted: bool, // if true, we do not need to simulate non-determinism
2326        metadata: HydroIrMetadata,
2327    },
2328
2329    Source {
2330        source: HydroSource,
2331        metadata: HydroIrMetadata,
2332    },
2333
2334    SingletonSource {
2335        value: DebugExpr,
2336        first_tick_only: bool,
2337        metadata: HydroIrMetadata,
2338    },
2339
2340    CycleSource {
2341        cycle_id: CycleId,
2342        metadata: HydroIrMetadata,
2343    },
2344
2345    Tee {
2346        inner: SharedNode,
2347        metadata: HydroIrMetadata,
2348    },
2349
2350    /// A singleton materialization point. Wraps a SharedNode so that:
2351    /// - The pipe output delivers the single item to one consumer
2352    /// - `#var` references can borrow the value from the singleton slot
2353    ///
2354    /// In DFIR codegen, emits `ident = inner_ident -> singleton()`.
2355    ///
2356    /// Uses the same `built_tees` dedup pattern as `Tee`.
2357    Singleton {
2358        inner: SharedNode,
2359        metadata: HydroIrMetadata,
2360    },
2361
2362    Partition {
2363        inner: SharedNode,
2364        f: ClosureExpr,
2365        is_true: bool,
2366        metadata: HydroIrMetadata,
2367    },
2368
2369    BeginAtomic {
2370        inner: Box<HydroNode>,
2371        metadata: HydroIrMetadata,
2372    },
2373
2374    EndAtomic {
2375        inner: Box<HydroNode>,
2376        metadata: HydroIrMetadata,
2377    },
2378
2379    Batch {
2380        inner: Box<HydroNode>,
2381        metadata: HydroIrMetadata,
2382    },
2383
2384    YieldConcat {
2385        inner: Box<HydroNode>,
2386        metadata: HydroIrMetadata,
2387    },
2388
2389    Chain {
2390        first: Box<HydroNode>,
2391        second: Box<HydroNode>,
2392        metadata: HydroIrMetadata,
2393    },
2394
2395    MergeOrdered {
2396        first: Box<HydroNode>,
2397        second: Box<HydroNode>,
2398        metadata: HydroIrMetadata,
2399    },
2400
2401    ChainFirst {
2402        first: Box<HydroNode>,
2403        second: Box<HydroNode>,
2404        metadata: HydroIrMetadata,
2405    },
2406
2407    CrossProduct {
2408        left: Box<HydroNode>,
2409        right: Box<HydroNode>,
2410        metadata: HydroIrMetadata,
2411    },
2412
2413    CrossSingleton {
2414        left: Box<HydroNode>,
2415        right: Box<HydroNode>,
2416        metadata: HydroIrMetadata,
2417    },
2418
2419    Join {
2420        left: Box<HydroNode>,
2421        right: Box<HydroNode>,
2422        metadata: HydroIrMetadata,
2423    },
2424
2425    /// Asymmetric join where the right (build) side is bounded.
2426    /// The build side is accumulated (stratum-delayed) into a hash table,
2427    /// then the left (probe) side streams through preserving its ordering.
2428    JoinHalf {
2429        left: Box<HydroNode>,
2430        right: Box<HydroNode>,
2431        metadata: HydroIrMetadata,
2432    },
2433
2434    Difference {
2435        pos: Box<HydroNode>,
2436        neg: Box<HydroNode>,
2437        metadata: HydroIrMetadata,
2438    },
2439
2440    AntiJoin {
2441        pos: Box<HydroNode>,
2442        neg: Box<HydroNode>,
2443        metadata: HydroIrMetadata,
2444    },
2445
2446    ResolveFutures {
2447        input: Box<HydroNode>,
2448        metadata: HydroIrMetadata,
2449    },
2450    ResolveFuturesBlocking {
2451        input: Box<HydroNode>,
2452        metadata: HydroIrMetadata,
2453    },
2454    ResolveFuturesOrdered {
2455        input: Box<HydroNode>,
2456        metadata: HydroIrMetadata,
2457    },
2458
2459    Map {
2460        f: ClosureExpr,
2461        input: Box<HydroNode>,
2462        metadata: HydroIrMetadata,
2463    },
2464    FlatMap {
2465        f: ClosureExpr,
2466        input: Box<HydroNode>,
2467        metadata: HydroIrMetadata,
2468    },
2469    FlatMapStreamBlocking {
2470        f: ClosureExpr,
2471        input: Box<HydroNode>,
2472        metadata: HydroIrMetadata,
2473    },
2474    Filter {
2475        f: ClosureExpr,
2476        input: Box<HydroNode>,
2477        metadata: HydroIrMetadata,
2478    },
2479    FilterMap {
2480        f: ClosureExpr,
2481        input: Box<HydroNode>,
2482        metadata: HydroIrMetadata,
2483    },
2484
2485    DeferTick {
2486        input: Box<HydroNode>,
2487        metadata: HydroIrMetadata,
2488    },
2489    Enumerate {
2490        input: Box<HydroNode>,
2491        metadata: HydroIrMetadata,
2492    },
2493    Inspect {
2494        f: ClosureExpr,
2495        input: Box<HydroNode>,
2496        metadata: HydroIrMetadata,
2497    },
2498
2499    Unique {
2500        input: Box<HydroNode>,
2501        metadata: HydroIrMetadata,
2502    },
2503
2504    Sort {
2505        input: Box<HydroNode>,
2506        metadata: HydroIrMetadata,
2507    },
2508    Fold {
2509        init: ClosureExpr,
2510        acc: ClosureExpr,
2511        input: Box<HydroNode>,
2512        metadata: HydroIrMetadata,
2513    },
2514
2515    Scan {
2516        init: ClosureExpr,
2517        acc: ClosureExpr,
2518        input: Box<HydroNode>,
2519        metadata: HydroIrMetadata,
2520    },
2521    ScanAsyncBlocking {
2522        init: ClosureExpr,
2523        acc: ClosureExpr,
2524        input: Box<HydroNode>,
2525        metadata: HydroIrMetadata,
2526    },
2527    FoldKeyed {
2528        init: ClosureExpr,
2529        acc: ClosureExpr,
2530        input: Box<HydroNode>,
2531        metadata: HydroIrMetadata,
2532    },
2533
2534    Reduce {
2535        f: ClosureExpr,
2536        input: Box<HydroNode>,
2537        metadata: HydroIrMetadata,
2538    },
2539    ReduceKeyed {
2540        f: ClosureExpr,
2541        input: Box<HydroNode>,
2542        metadata: HydroIrMetadata,
2543    },
2544    ReduceKeyedWatermark {
2545        f: ClosureExpr,
2546        input: Box<HydroNode>,
2547        watermark: Box<HydroNode>,
2548        metadata: HydroIrMetadata,
2549    },
2550
2551    Network {
2552        name: Option<String>,
2553        networking_info: crate::networking::NetworkingInfo,
2554        serialize_fn: Option<DebugExpr>,
2555        instantiate_fn: DebugInstantiate,
2556        deserialize_fn: Option<DebugExpr>,
2557        input: Box<HydroNode>,
2558        metadata: HydroIrMetadata,
2559    },
2560
2561    ExternalInput {
2562        from_external_key: LocationKey,
2563        from_port_id: ExternalPortId,
2564        from_many: bool,
2565        codec_type: DebugType,
2566        #[serde(skip)]
2567        port_hint: NetworkHint,
2568        instantiate_fn: DebugInstantiate,
2569        deserialize_fn: Option<DebugExpr>,
2570        metadata: HydroIrMetadata,
2571    },
2572
2573    Counter {
2574        tag: String,
2575        duration: DebugExpr,
2576        prefix: String,
2577        input: Box<HydroNode>,
2578        metadata: HydroIrMetadata,
2579    },
2580
2581    AssertIsConsistent {
2582        inner: Box<HydroNode>,
2583        trusted: bool,
2584        metadata: HydroIrMetadata,
2585    },
2586
2587    UnboundSingleton {
2588        inner: Box<HydroNode>,
2589        metadata: HydroIrMetadata,
2590    },
2591}
2592
2593pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2594pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2595
2596impl HydroNode {
2597    pub fn transform_bottom_up(
2598        &mut self,
2599        transform: &mut impl FnMut(&mut HydroNode),
2600        seen_tees: &mut SeenSharedNodes,
2601        check_well_formed: bool,
2602    ) {
2603        self.transform_children(
2604            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2605            seen_tees,
2606        );
2607
2608        transform(self);
2609
2610        let self_location = self.metadata().location_id.root();
2611
2612        if check_well_formed {
2613            match &*self {
2614                HydroNode::Network { .. } => {}
2615                _ => {
2616                    self.input_metadata().iter().for_each(|i| {
2617                        if i.location_id.root() != self_location {
2618                            panic!(
2619                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2620                                i,
2621                                i.location_id.root(),
2622                                self,
2623                                self_location
2624                            )
2625                        }
2626                    });
2627                }
2628            }
2629        }
2630    }
2631
2632    #[inline(always)]
2633    pub fn transform_children(
2634        &mut self,
2635        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2636        seen_tees: &mut SeenSharedNodes,
2637    ) {
2638        match self {
2639            HydroNode::Placeholder => {
2640                panic!();
2641            }
2642
2643            HydroNode::Source { .. }
2644            | HydroNode::SingletonSource { .. }
2645            | HydroNode::CycleSource { .. }
2646            | HydroNode::ExternalInput { .. } => {}
2647
2648            HydroNode::Tee { inner, .. } | HydroNode::Singleton { inner, .. } => {
2649                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2650                    *inner = SharedNode(transformed.clone());
2651                } else {
2652                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2653                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2654                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2655                    transform(&mut orig, seen_tees);
2656                    *transformed_cell.borrow_mut() = orig;
2657                    *inner = SharedNode(transformed_cell);
2658                }
2659            }
2660
2661            HydroNode::Partition { inner, f, .. } => {
2662                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2663                    *inner = SharedNode(transformed.clone());
2664                } else {
2665                    f.transform_children(&mut transform, seen_tees);
2666                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2667                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2668                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2669                    transform(&mut orig, seen_tees);
2670                    *transformed_cell.borrow_mut() = orig;
2671                    *inner = SharedNode(transformed_cell);
2672                }
2673            }
2674
2675            HydroNode::Cast { inner, .. }
2676            | HydroNode::ObserveNonDet { inner, .. }
2677            | HydroNode::BeginAtomic { inner, .. }
2678            | HydroNode::EndAtomic { inner, .. }
2679            | HydroNode::Batch { inner, .. }
2680            | HydroNode::YieldConcat { inner, .. }
2681            | HydroNode::UnboundSingleton { inner, .. }
2682            | HydroNode::AssertIsConsistent { inner, .. } => {
2683                transform(inner.as_mut(), seen_tees);
2684            }
2685
2686            HydroNode::Chain { first, second, .. } => {
2687                transform(first.as_mut(), seen_tees);
2688                transform(second.as_mut(), seen_tees);
2689            }
2690
2691            HydroNode::MergeOrdered { first, second, .. } => {
2692                transform(first.as_mut(), seen_tees);
2693                transform(second.as_mut(), seen_tees);
2694            }
2695
2696            HydroNode::ChainFirst { first, second, .. } => {
2697                transform(first.as_mut(), seen_tees);
2698                transform(second.as_mut(), seen_tees);
2699            }
2700
2701            HydroNode::CrossSingleton { left, right, .. }
2702            | HydroNode::CrossProduct { left, right, .. }
2703            | HydroNode::Join { left, right, .. }
2704            | HydroNode::JoinHalf { left, right, .. } => {
2705                transform(left.as_mut(), seen_tees);
2706                transform(right.as_mut(), seen_tees);
2707            }
2708
2709            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2710                transform(pos.as_mut(), seen_tees);
2711                transform(neg.as_mut(), seen_tees);
2712            }
2713
2714            HydroNode::Map { f, input, .. } => {
2715                f.transform_children(&mut transform, seen_tees);
2716                transform(input.as_mut(), seen_tees);
2717            }
2718            HydroNode::FlatMap { f, input, .. }
2719            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2720            | HydroNode::Filter { f, input, .. }
2721            | HydroNode::FilterMap { f, input, .. }
2722            | HydroNode::Inspect { f, input, .. }
2723            | HydroNode::Reduce { f, input, .. }
2724            | HydroNode::ReduceKeyed { f, input, .. } => {
2725                f.transform_children(&mut transform, seen_tees);
2726                transform(input.as_mut(), seen_tees);
2727            }
2728            HydroNode::ReduceKeyedWatermark {
2729                f,
2730                input,
2731                watermark,
2732                ..
2733            } => {
2734                f.transform_children(&mut transform, seen_tees);
2735                transform(input.as_mut(), seen_tees);
2736                transform(watermark.as_mut(), seen_tees);
2737            }
2738            HydroNode::Fold {
2739                init, acc, input, ..
2740            }
2741            | HydroNode::Scan {
2742                init, acc, input, ..
2743            }
2744            | HydroNode::ScanAsyncBlocking {
2745                init, acc, input, ..
2746            }
2747            | HydroNode::FoldKeyed {
2748                init, acc, input, ..
2749            } => {
2750                init.transform_children(&mut transform, seen_tees);
2751                acc.transform_children(&mut transform, seen_tees);
2752                transform(input.as_mut(), seen_tees);
2753            }
2754            HydroNode::ResolveFutures { input, .. }
2755            | HydroNode::ResolveFuturesBlocking { input, .. }
2756            | HydroNode::ResolveFuturesOrdered { input, .. }
2757            | HydroNode::Sort { input, .. }
2758            | HydroNode::DeferTick { input, .. }
2759            | HydroNode::Enumerate { input, .. }
2760            | HydroNode::Unique { input, .. }
2761            | HydroNode::Network { input, .. }
2762            | HydroNode::Counter { input, .. } => {
2763                transform(input.as_mut(), seen_tees);
2764            }
2765        }
2766    }
2767
2768    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2769        match self {
2770            HydroNode::Placeholder => HydroNode::Placeholder,
2771            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2772                inner: Box::new(inner.deep_clone(seen_tees)),
2773                metadata: metadata.clone(),
2774            },
2775            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2776                inner: Box::new(inner.deep_clone(seen_tees)),
2777                metadata: metadata.clone(),
2778            },
2779            HydroNode::ObserveNonDet {
2780                inner,
2781                trusted,
2782                metadata,
2783            } => HydroNode::ObserveNonDet {
2784                inner: Box::new(inner.deep_clone(seen_tees)),
2785                trusted: *trusted,
2786                metadata: metadata.clone(),
2787            },
2788            HydroNode::AssertIsConsistent {
2789                inner,
2790                trusted,
2791                metadata,
2792            } => HydroNode::AssertIsConsistent {
2793                inner: Box::new(inner.deep_clone(seen_tees)),
2794                trusted: *trusted,
2795                metadata: metadata.clone(),
2796            },
2797            HydroNode::Source { source, metadata } => HydroNode::Source {
2798                source: source.clone(),
2799                metadata: metadata.clone(),
2800            },
2801            HydroNode::SingletonSource {
2802                value,
2803                first_tick_only,
2804                metadata,
2805            } => HydroNode::SingletonSource {
2806                value: value.clone(),
2807                first_tick_only: *first_tick_only,
2808                metadata: metadata.clone(),
2809            },
2810            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2811                cycle_id: *cycle_id,
2812                metadata: metadata.clone(),
2813            },
2814            HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
2815                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2816                    SharedNode(transformed.clone())
2817                } else {
2818                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2819                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2820                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2821                    *new_rc.borrow_mut() = cloned;
2822                    SharedNode(new_rc)
2823                };
2824                if matches!(self, HydroNode::Singleton { .. }) {
2825                    HydroNode::Singleton {
2826                        inner: cloned_inner,
2827                        metadata: metadata.clone(),
2828                    }
2829                } else {
2830                    HydroNode::Tee {
2831                        inner: cloned_inner,
2832                        metadata: metadata.clone(),
2833                    }
2834                }
2835            }
2836            HydroNode::Partition {
2837                inner,
2838                f,
2839                is_true,
2840                metadata,
2841            } => {
2842                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2843                    HydroNode::Partition {
2844                        inner: SharedNode(transformed.clone()),
2845                        f: f.deep_clone(seen_tees),
2846                        is_true: *is_true,
2847                        metadata: metadata.clone(),
2848                    }
2849                } else {
2850                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2851                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2852                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2853                    *new_rc.borrow_mut() = cloned;
2854                    HydroNode::Partition {
2855                        inner: SharedNode(new_rc),
2856                        f: f.deep_clone(seen_tees),
2857                        is_true: *is_true,
2858                        metadata: metadata.clone(),
2859                    }
2860                }
2861            }
2862            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2863                inner: Box::new(inner.deep_clone(seen_tees)),
2864                metadata: metadata.clone(),
2865            },
2866            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2867                inner: Box::new(inner.deep_clone(seen_tees)),
2868                metadata: metadata.clone(),
2869            },
2870            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2871                inner: Box::new(inner.deep_clone(seen_tees)),
2872                metadata: metadata.clone(),
2873            },
2874            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2875                inner: Box::new(inner.deep_clone(seen_tees)),
2876                metadata: metadata.clone(),
2877            },
2878            HydroNode::Chain {
2879                first,
2880                second,
2881                metadata,
2882            } => HydroNode::Chain {
2883                first: Box::new(first.deep_clone(seen_tees)),
2884                second: Box::new(second.deep_clone(seen_tees)),
2885                metadata: metadata.clone(),
2886            },
2887            HydroNode::MergeOrdered {
2888                first,
2889                second,
2890                metadata,
2891            } => HydroNode::MergeOrdered {
2892                first: Box::new(first.deep_clone(seen_tees)),
2893                second: Box::new(second.deep_clone(seen_tees)),
2894                metadata: metadata.clone(),
2895            },
2896            HydroNode::ChainFirst {
2897                first,
2898                second,
2899                metadata,
2900            } => HydroNode::ChainFirst {
2901                first: Box::new(first.deep_clone(seen_tees)),
2902                second: Box::new(second.deep_clone(seen_tees)),
2903                metadata: metadata.clone(),
2904            },
2905            HydroNode::CrossProduct {
2906                left,
2907                right,
2908                metadata,
2909            } => HydroNode::CrossProduct {
2910                left: Box::new(left.deep_clone(seen_tees)),
2911                right: Box::new(right.deep_clone(seen_tees)),
2912                metadata: metadata.clone(),
2913            },
2914            HydroNode::CrossSingleton {
2915                left,
2916                right,
2917                metadata,
2918            } => HydroNode::CrossSingleton {
2919                left: Box::new(left.deep_clone(seen_tees)),
2920                right: Box::new(right.deep_clone(seen_tees)),
2921                metadata: metadata.clone(),
2922            },
2923            HydroNode::Join {
2924                left,
2925                right,
2926                metadata,
2927            } => HydroNode::Join {
2928                left: Box::new(left.deep_clone(seen_tees)),
2929                right: Box::new(right.deep_clone(seen_tees)),
2930                metadata: metadata.clone(),
2931            },
2932            HydroNode::JoinHalf {
2933                left,
2934                right,
2935                metadata,
2936            } => HydroNode::JoinHalf {
2937                left: Box::new(left.deep_clone(seen_tees)),
2938                right: Box::new(right.deep_clone(seen_tees)),
2939                metadata: metadata.clone(),
2940            },
2941            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2942                pos: Box::new(pos.deep_clone(seen_tees)),
2943                neg: Box::new(neg.deep_clone(seen_tees)),
2944                metadata: metadata.clone(),
2945            },
2946            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2947                pos: Box::new(pos.deep_clone(seen_tees)),
2948                neg: Box::new(neg.deep_clone(seen_tees)),
2949                metadata: metadata.clone(),
2950            },
2951            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2952                input: Box::new(input.deep_clone(seen_tees)),
2953                metadata: metadata.clone(),
2954            },
2955            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2956                HydroNode::ResolveFuturesBlocking {
2957                    input: Box::new(input.deep_clone(seen_tees)),
2958                    metadata: metadata.clone(),
2959                }
2960            }
2961            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2962                HydroNode::ResolveFuturesOrdered {
2963                    input: Box::new(input.deep_clone(seen_tees)),
2964                    metadata: metadata.clone(),
2965                }
2966            }
2967            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2968                f: f.deep_clone(seen_tees),
2969                input: Box::new(input.deep_clone(seen_tees)),
2970                metadata: metadata.clone(),
2971            },
2972            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2973                f: f.deep_clone(seen_tees),
2974                input: Box::new(input.deep_clone(seen_tees)),
2975                metadata: metadata.clone(),
2976            },
2977            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2978                HydroNode::FlatMapStreamBlocking {
2979                    f: f.deep_clone(seen_tees),
2980                    input: Box::new(input.deep_clone(seen_tees)),
2981                    metadata: metadata.clone(),
2982                }
2983            }
2984            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2985                f: f.deep_clone(seen_tees),
2986                input: Box::new(input.deep_clone(seen_tees)),
2987                metadata: metadata.clone(),
2988            },
2989            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2990                f: f.deep_clone(seen_tees),
2991                input: Box::new(input.deep_clone(seen_tees)),
2992                metadata: metadata.clone(),
2993            },
2994            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2995                input: Box::new(input.deep_clone(seen_tees)),
2996                metadata: metadata.clone(),
2997            },
2998            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2999                input: Box::new(input.deep_clone(seen_tees)),
3000                metadata: metadata.clone(),
3001            },
3002            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3003                f: f.deep_clone(seen_tees),
3004                input: Box::new(input.deep_clone(seen_tees)),
3005                metadata: metadata.clone(),
3006            },
3007            HydroNode::Unique { input, metadata } => HydroNode::Unique {
3008                input: Box::new(input.deep_clone(seen_tees)),
3009                metadata: metadata.clone(),
3010            },
3011            HydroNode::Sort { input, metadata } => HydroNode::Sort {
3012                input: Box::new(input.deep_clone(seen_tees)),
3013                metadata: metadata.clone(),
3014            },
3015            HydroNode::Fold {
3016                init,
3017                acc,
3018                input,
3019                metadata,
3020            } => HydroNode::Fold {
3021                init: init.deep_clone(seen_tees),
3022                acc: acc.deep_clone(seen_tees),
3023                input: Box::new(input.deep_clone(seen_tees)),
3024                metadata: metadata.clone(),
3025            },
3026            HydroNode::Scan {
3027                init,
3028                acc,
3029                input,
3030                metadata,
3031            } => HydroNode::Scan {
3032                init: init.deep_clone(seen_tees),
3033                acc: acc.deep_clone(seen_tees),
3034                input: Box::new(input.deep_clone(seen_tees)),
3035                metadata: metadata.clone(),
3036            },
3037            HydroNode::ScanAsyncBlocking {
3038                init,
3039                acc,
3040                input,
3041                metadata,
3042            } => HydroNode::ScanAsyncBlocking {
3043                init: init.deep_clone(seen_tees),
3044                acc: acc.deep_clone(seen_tees),
3045                input: Box::new(input.deep_clone(seen_tees)),
3046                metadata: metadata.clone(),
3047            },
3048            HydroNode::FoldKeyed {
3049                init,
3050                acc,
3051                input,
3052                metadata,
3053            } => HydroNode::FoldKeyed {
3054                init: init.deep_clone(seen_tees),
3055                acc: acc.deep_clone(seen_tees),
3056                input: Box::new(input.deep_clone(seen_tees)),
3057                metadata: metadata.clone(),
3058            },
3059            HydroNode::ReduceKeyedWatermark {
3060                f,
3061                input,
3062                watermark,
3063                metadata,
3064            } => HydroNode::ReduceKeyedWatermark {
3065                f: f.deep_clone(seen_tees),
3066                input: Box::new(input.deep_clone(seen_tees)),
3067                watermark: Box::new(watermark.deep_clone(seen_tees)),
3068                metadata: metadata.clone(),
3069            },
3070            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3071                f: f.deep_clone(seen_tees),
3072                input: Box::new(input.deep_clone(seen_tees)),
3073                metadata: metadata.clone(),
3074            },
3075            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3076                f: f.deep_clone(seen_tees),
3077                input: Box::new(input.deep_clone(seen_tees)),
3078                metadata: metadata.clone(),
3079            },
3080            HydroNode::Network {
3081                name,
3082                networking_info,
3083                serialize_fn,
3084                instantiate_fn,
3085                deserialize_fn,
3086                input,
3087                metadata,
3088            } => HydroNode::Network {
3089                name: name.clone(),
3090                networking_info: networking_info.clone(),
3091                serialize_fn: serialize_fn.clone(),
3092                instantiate_fn: instantiate_fn.clone(),
3093                deserialize_fn: deserialize_fn.clone(),
3094                input: Box::new(input.deep_clone(seen_tees)),
3095                metadata: metadata.clone(),
3096            },
3097            HydroNode::ExternalInput {
3098                from_external_key,
3099                from_port_id,
3100                from_many,
3101                codec_type,
3102                port_hint,
3103                instantiate_fn,
3104                deserialize_fn,
3105                metadata,
3106            } => HydroNode::ExternalInput {
3107                from_external_key: *from_external_key,
3108                from_port_id: *from_port_id,
3109                from_many: *from_many,
3110                codec_type: codec_type.clone(),
3111                port_hint: *port_hint,
3112                instantiate_fn: instantiate_fn.clone(),
3113                deserialize_fn: deserialize_fn.clone(),
3114                metadata: metadata.clone(),
3115            },
3116            HydroNode::Counter {
3117                tag,
3118                duration,
3119                prefix,
3120                input,
3121                metadata,
3122            } => HydroNode::Counter {
3123                tag: tag.clone(),
3124                duration: duration.clone(),
3125                prefix: prefix.clone(),
3126                input: Box::new(input.deep_clone(seen_tees)),
3127                metadata: metadata.clone(),
3128            },
3129        }
3130    }
3131
3132    #[cfg(feature = "build")]
3133    pub fn emit_core(
3134        &mut self,
3135        builders_or_callback: &mut BuildersOrCallback<
3136            impl FnMut(&mut HydroRoot, &mut StmtId),
3137            impl FnMut(&mut HydroNode, &mut StmtId),
3138        >,
3139        seen_tees: &mut SeenSharedNodes,
3140        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3141        next_stmt_id: &mut StmtId,
3142        fold_hooked_idents: &mut HashSet<String>,
3143    ) -> syn::Ident {
3144        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3145        let mut singleton_access_counters: HashMap<*const RefCell<HydroNode>, u32> = HashMap::new();
3146
3147        self.transform_bottom_up(
3148            &mut |node: &mut HydroNode| {
3149                let out_location = node.metadata().location_id.clone();
3150                match node {
3151                    HydroNode::Placeholder => {
3152                        panic!()
3153                    }
3154
3155                    HydroNode::Cast { .. } => {
3156                        // Cast passes through the input ident unchanged
3157                        // The input ident is already on the stack from processing the child
3158                        match builders_or_callback {
3159                            BuildersOrCallback::Builders(_) => {}
3160                            BuildersOrCallback::Callback(_, node_callback) => {
3161                                node_callback(node, next_stmt_id);
3162                            }
3163                        }
3164
3165                        let _ = next_stmt_id.get_and_increment();
3166                        // input_ident stays on stack as output
3167                    }
3168
3169                    HydroNode::UnboundSingleton { .. } => {
3170                        let inner_ident = ident_stack.pop().unwrap();
3171
3172                        let out_ident =
3173                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3174
3175                        match builders_or_callback {
3176                            BuildersOrCallback::Builders(graph_builders) => {
3177                                if graph_builders.singleton_intermediates() {
3178                                    let builder = graph_builders.get_dfir_mut(&out_location);
3179                                    builder.add_dfir(
3180                                        parse_quote! {
3181                                            #out_ident = #inner_ident;
3182                                        },
3183                                        None,
3184                                        None,
3185                                    );
3186                                } else {
3187                                    let builder = graph_builders.get_dfir_mut(&out_location);
3188                                    builder.add_dfir(
3189                                        parse_quote! {
3190                                            #out_ident = #inner_ident -> persist::<'static>();
3191                                        },
3192                                        None,
3193                                        None,
3194                                    );
3195                                }
3196                            }
3197                            BuildersOrCallback::Callback(_, node_callback) => {
3198                                node_callback(node, next_stmt_id);
3199                            }
3200                        }
3201
3202                        let _ = next_stmt_id.get_and_increment();
3203
3204                        ident_stack.push(out_ident);
3205                    }
3206
3207                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3208                        let inner_ident = ident_stack.pop().unwrap();
3209
3210                        let out_ident =
3211                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3212
3213                        match builders_or_callback {
3214                            BuildersOrCallback::Builders(graph_builders) => {
3215                                graph_builders.assert_is_consistent(
3216                                    *trusted,
3217                                    &inner.metadata().location_id,
3218                                    inner_ident,
3219                                    &out_ident,
3220                                );
3221                            }
3222                            BuildersOrCallback::Callback(_, node_callback) => {
3223                                node_callback(node, next_stmt_id);
3224                            }
3225                        }
3226
3227                        let _ = next_stmt_id.get_and_increment();
3228
3229                        ident_stack.push(out_ident);
3230                    }
3231
3232                    HydroNode::ObserveNonDet {
3233                        inner,
3234                        trusted,
3235                        metadata,
3236                        ..
3237                    } => {
3238                        let inner_ident = ident_stack.pop().unwrap();
3239
3240                        let observe_ident =
3241                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3242
3243                        match builders_or_callback {
3244                            BuildersOrCallback::Builders(graph_builders) => {
3245                                graph_builders.observe_nondet(
3246                                    *trusted,
3247                                    &inner.metadata().location_id,
3248                                    inner_ident,
3249                                    &inner.metadata().collection_kind,
3250                                    &observe_ident,
3251                                    &metadata.collection_kind,
3252                                    &metadata.op,
3253                                );
3254                            }
3255                            BuildersOrCallback::Callback(_, node_callback) => {
3256                                node_callback(node, next_stmt_id);
3257                            }
3258                        }
3259
3260                        let _ = next_stmt_id.get_and_increment();
3261
3262                        ident_stack.push(observe_ident);
3263                    }
3264
3265                    HydroNode::Batch {
3266                        inner, metadata, ..
3267                    } => {
3268                        let inner_ident = ident_stack.pop().unwrap();
3269
3270                        let batch_ident =
3271                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3272
3273                        match builders_or_callback {
3274                            BuildersOrCallback::Builders(graph_builders) => {
3275                                graph_builders.batch(
3276                                    inner_ident,
3277                                    &inner.metadata().location_id,
3278                                    &inner.metadata().collection_kind,
3279                                    &batch_ident,
3280                                    &out_location,
3281                                    &metadata.op,
3282                                    fold_hooked_idents,
3283                                );
3284                            }
3285                            BuildersOrCallback::Callback(_, node_callback) => {
3286                                node_callback(node, next_stmt_id);
3287                            }
3288                        }
3289
3290                        let _ = next_stmt_id.get_and_increment();
3291
3292                        ident_stack.push(batch_ident);
3293                    }
3294
3295                    HydroNode::YieldConcat { inner, .. } => {
3296                        let inner_ident = ident_stack.pop().unwrap();
3297
3298                        let yield_ident =
3299                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3300
3301                        match builders_or_callback {
3302                            BuildersOrCallback::Builders(graph_builders) => {
3303                                graph_builders.yield_from_tick(
3304                                    inner_ident,
3305                                    &inner.metadata().location_id,
3306                                    &inner.metadata().collection_kind,
3307                                    &yield_ident,
3308                                    &out_location,
3309                                );
3310                            }
3311                            BuildersOrCallback::Callback(_, node_callback) => {
3312                                node_callback(node, next_stmt_id);
3313                            }
3314                        }
3315
3316                        let _ = next_stmt_id.get_and_increment();
3317
3318                        ident_stack.push(yield_ident);
3319                    }
3320
3321                    HydroNode::BeginAtomic { inner, metadata } => {
3322                        let inner_ident = ident_stack.pop().unwrap();
3323
3324                        let begin_ident =
3325                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3326
3327                        match builders_or_callback {
3328                            BuildersOrCallback::Builders(graph_builders) => {
3329                                graph_builders.begin_atomic(
3330                                    inner_ident,
3331                                    &inner.metadata().location_id,
3332                                    &inner.metadata().collection_kind,
3333                                    &begin_ident,
3334                                    &out_location,
3335                                    &metadata.op,
3336                                );
3337                            }
3338                            BuildersOrCallback::Callback(_, node_callback) => {
3339                                node_callback(node, next_stmt_id);
3340                            }
3341                        }
3342
3343                        let _ = next_stmt_id.get_and_increment();
3344
3345                        ident_stack.push(begin_ident);
3346                    }
3347
3348                    HydroNode::EndAtomic { inner, .. } => {
3349                        let inner_ident = ident_stack.pop().unwrap();
3350
3351                        let end_ident =
3352                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3353
3354                        match builders_or_callback {
3355                            BuildersOrCallback::Builders(graph_builders) => {
3356                                graph_builders.end_atomic(
3357                                    inner_ident,
3358                                    &inner.metadata().location_id,
3359                                    &inner.metadata().collection_kind,
3360                                    &end_ident,
3361                                );
3362                            }
3363                            BuildersOrCallback::Callback(_, node_callback) => {
3364                                node_callback(node, next_stmt_id);
3365                            }
3366                        }
3367
3368                        let _ = next_stmt_id.get_and_increment();
3369
3370                        ident_stack.push(end_ident);
3371                    }
3372
3373                    HydroNode::Source {
3374                        source, metadata, ..
3375                    } => {
3376                        if let HydroSource::ExternalNetwork() = source {
3377                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3378                        } else {
3379                            let source_ident =
3380                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3381
3382                            let source_stmt = match source {
3383                                HydroSource::Stream(expr) => {
3384                                    debug_assert!(metadata.location_id.is_top_level());
3385                                    parse_quote! {
3386                                        #source_ident = source_stream(#expr);
3387                                    }
3388                                }
3389
3390                                HydroSource::ExternalNetwork() => {
3391                                    unreachable!()
3392                                }
3393
3394                                HydroSource::Iter(expr) => {
3395                                    if metadata.location_id.is_top_level() {
3396                                        parse_quote! {
3397                                            #source_ident = source_iter(#expr);
3398                                        }
3399                                    } else {
3400                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
3401                                        parse_quote! {
3402                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3403                                        }
3404                                    }
3405                                }
3406
3407                                HydroSource::Spin() => {
3408                                    debug_assert!(metadata.location_id.is_top_level());
3409                                    parse_quote! {
3410                                        #source_ident = spin();
3411                                    }
3412                                }
3413
3414                                HydroSource::ClusterMembers(target_loc, state) => {
3415                                    debug_assert!(metadata.location_id.is_top_level());
3416
3417                                    let members_tee_ident = syn::Ident::new(
3418                                        &format!(
3419                                            "__cluster_members_tee_{}_{}",
3420                                            metadata.location_id.root().key(),
3421                                            target_loc.key(),
3422                                        ),
3423                                        Span::call_site(),
3424                                    );
3425
3426                                    match state {
3427                                        ClusterMembersState::Stream(d) => {
3428                                            parse_quote! {
3429                                                #members_tee_ident = source_stream(#d) -> tee();
3430                                                #source_ident = #members_tee_ident;
3431                                            }
3432                                        },
3433                                        ClusterMembersState::Uninit => syn::parse_quote! {
3434                                            #source_ident = source_stream(DUMMY);
3435                                        },
3436                                        ClusterMembersState::Tee(..) => parse_quote! {
3437                                            #source_ident = #members_tee_ident;
3438                                        },
3439                                    }
3440                                }
3441
3442                                HydroSource::Embedded(ident) => {
3443                                    parse_quote! {
3444                                        #source_ident = source_stream(#ident);
3445                                    }
3446                                }
3447
3448                                HydroSource::EmbeddedSingleton(ident) => {
3449                                    parse_quote! {
3450                                        #source_ident = source_iter([#ident]);
3451                                    }
3452                                }
3453                            };
3454
3455                            match builders_or_callback {
3456                                BuildersOrCallback::Builders(graph_builders) => {
3457                                    let builder = graph_builders.get_dfir_mut(&out_location);
3458                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3459                                }
3460                                BuildersOrCallback::Callback(_, node_callback) => {
3461                                    node_callback(node, next_stmt_id);
3462                                }
3463                            }
3464
3465                            let _ = next_stmt_id.get_and_increment();
3466
3467                            ident_stack.push(source_ident);
3468                        }
3469                    }
3470
3471                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3472                        let source_ident =
3473                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3474
3475                        match builders_or_callback {
3476                            BuildersOrCallback::Builders(graph_builders) => {
3477                                let builder = graph_builders.get_dfir_mut(&out_location);
3478
3479                                if *first_tick_only {
3480                                    assert!(
3481                                        !metadata.location_id.is_top_level(),
3482                                        "first_tick_only SingletonSource must be inside a tick"
3483                                    );
3484                                }
3485
3486                                if *first_tick_only
3487                                    || (metadata.location_id.is_top_level()
3488                                        && metadata.collection_kind.is_bounded())
3489                                {
3490                                    builder.add_dfir(
3491                                        parse_quote! {
3492                                            #source_ident = source_iter([#value]);
3493                                        },
3494                                        None,
3495                                        Some(&next_stmt_id.to_string()),
3496                                    );
3497                                } else {
3498                                    builder.add_dfir(
3499                                        parse_quote! {
3500                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3501                                        },
3502                                        None,
3503                                        Some(&next_stmt_id.to_string()),
3504                                    );
3505                                }
3506                            }
3507                            BuildersOrCallback::Callback(_, node_callback) => {
3508                                node_callback(node, next_stmt_id);
3509                            }
3510                        }
3511
3512                        let _ = next_stmt_id.get_and_increment();
3513
3514                        ident_stack.push(source_ident);
3515                    }
3516
3517                    HydroNode::CycleSource { cycle_id, .. } => {
3518                        let ident = cycle_id.as_ident();
3519
3520                        match builders_or_callback {
3521                            BuildersOrCallback::Builders(_) => {}
3522                            BuildersOrCallback::Callback(_, node_callback) => {
3523                                node_callback(node, next_stmt_id);
3524                            }
3525                        }
3526
3527                        // consume a stmt id even though we did not emit anything so that we can instrument this
3528                        let _ = next_stmt_id.get_and_increment();
3529
3530                        ident_stack.push(ident);
3531                    }
3532
3533                    HydroNode::Tee { inner, .. } => {
3534                        let ret_ident = if let Some(built_idents) =
3535                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3536                        {
3537                            match builders_or_callback {
3538                                BuildersOrCallback::Builders(_) => {}
3539                                BuildersOrCallback::Callback(_, node_callback) => {
3540                                    node_callback(node, next_stmt_id);
3541                                }
3542                            }
3543
3544                            built_idents[0].clone()
3545                        } else {
3546                            // The inner node was already processed by transform_bottom_up,
3547                            // so its ident is on the stack
3548                            let inner_ident = ident_stack.pop().unwrap();
3549
3550                            let tee_ident =
3551                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3552
3553                            built_tees.insert(
3554                                inner.0.as_ref() as *const RefCell<HydroNode>,
3555                                vec![tee_ident.clone()],
3556                            );
3557
3558                            match builders_or_callback {
3559                                BuildersOrCallback::Builders(graph_builders) => {
3560                                    // NOTE: With `forward_ref`, the fold codegen may not have
3561                                    // run yet when we reach this tee, so `fold_hooked_idents`
3562                                    // might not contain the inner ident. In that case we won't
3563                                    // propagate the "hooked" status to the tee and the
3564                                    // downstream singleton batch will use the normal
3565                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3566                                    // This is not a soundness issue: the fallback hook still
3567                                    // produces correct behavior, just with a redundant decision
3568                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3569                                    // fix ordering so forward_ref folds are always processed
3570                                    // before their downstream tees.
3571                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3572                                        fold_hooked_idents.insert(tee_ident.to_string());
3573                                    }
3574                                    let builder = graph_builders.get_dfir_mut(&out_location);
3575                                    builder.add_dfir(
3576                                        parse_quote! {
3577                                            #tee_ident = #inner_ident -> tee();
3578                                        },
3579                                        None,
3580                                        Some(&next_stmt_id.to_string()),
3581                                    );
3582                                }
3583                                BuildersOrCallback::Callback(_, node_callback) => {
3584                                    node_callback(node, next_stmt_id);
3585                                }
3586                            }
3587
3588                            tee_ident
3589                        };
3590
3591                        // we consume a stmt id regardless of if we emit the tee() operator,
3592                        // so that during rewrites we touch all recipients of the tee()
3593
3594                        let _ = next_stmt_id.get_and_increment();
3595                        ident_stack.push(ret_ident);
3596                    }
3597
3598                    HydroNode::Singleton { inner, .. } => {
3599                        let ret_ident = if let Some(built_idents) =
3600                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3601                        {
3602                            built_idents[0].clone()
3603                        } else {
3604                            let inner_ident = ident_stack.pop().unwrap();
3605
3606                            let singleton_ident =
3607                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3608
3609                            built_tees.insert(
3610                                inner.0.as_ref() as *const RefCell<HydroNode>,
3611                                vec![singleton_ident.clone()],
3612                            );
3613
3614                            match builders_or_callback {
3615                                BuildersOrCallback::Builders(graph_builders) => {
3616                                    let builder = graph_builders.get_dfir_mut(&out_location);
3617                                    builder.add_dfir(
3618                                        parse_quote! {
3619                                            #singleton_ident = #inner_ident -> singleton();
3620                                        },
3621                                        None,
3622                                        Some(&next_stmt_id.to_string()),
3623                                    );
3624                                }
3625                                BuildersOrCallback::Callback(_, node_callback) => {
3626                                    node_callback(node, next_stmt_id);
3627                                }
3628                            }
3629
3630                            singleton_ident
3631                        };
3632
3633                        // we consume a stmt id regardless of if we emit the singleton() operator,
3634                        // so that during rewrites we touch all recipients of the singleton()
3635                        let _ = next_stmt_id.get_and_increment();
3636                        ident_stack.push(ret_ident);
3637                    }
3638
3639                    HydroNode::Partition {
3640                        inner, f, is_true, ..
3641                    } => {
3642                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3643                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3644                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3645                            match builders_or_callback {
3646                                BuildersOrCallback::Builders(_) => {}
3647                                BuildersOrCallback::Callback(_, node_callback) => {
3648                                    node_callback(node, next_stmt_id);
3649                                }
3650                            }
3651
3652                            let idx = if is_true { 0 } else { 1 };
3653                            built_idents[idx].clone()
3654                        } else {
3655                            // The inner node was already processed by transform_bottom_up,
3656                            // so its ident is on the stack
3657                            let inner_ident = ident_stack.pop().unwrap();
3658                            let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
3659
3660                            let partition_ident = syn::Ident::new(
3661                                &format!("stream_{}_partition", *next_stmt_id),
3662                                Span::call_site(),
3663                            );
3664                            let true_ident = syn::Ident::new(
3665                                &format!("stream_{}_true", *next_stmt_id),
3666                                Span::call_site(),
3667                            );
3668                            let false_ident = syn::Ident::new(
3669                                &format!("stream_{}_false", *next_stmt_id),
3670                                Span::call_site(),
3671                            );
3672
3673                            built_tees.insert(
3674                                ptr,
3675                                vec![true_ident.clone(), false_ident.clone()],
3676                            );
3677
3678                            match builders_or_callback {
3679                                BuildersOrCallback::Builders(graph_builders) => {
3680                                    let builder = graph_builders.get_dfir_mut(&out_location);
3681                                    builder.add_dfir(
3682                                        parse_quote! {
3683                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3684                                            #true_ident = #partition_ident[0];
3685                                            #false_ident = #partition_ident[1];
3686                                        },
3687                                        None,
3688                                        Some(&next_stmt_id.to_string()),
3689                                    );
3690                                }
3691                                BuildersOrCallback::Callback(_, node_callback) => {
3692                                    node_callback(node, next_stmt_id);
3693                                }
3694                            }
3695
3696                            if is_true { true_ident } else { false_ident }
3697                        };
3698
3699                        let _ = next_stmt_id.get_and_increment();
3700                        ident_stack.push(ret_ident);
3701                    }
3702
3703                    HydroNode::Chain { .. } => {
3704                        // Children are processed left-to-right, so second is on top
3705                        let second_ident = ident_stack.pop().unwrap();
3706                        let first_ident = ident_stack.pop().unwrap();
3707
3708                        let chain_ident =
3709                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3710
3711                        match builders_or_callback {
3712                            BuildersOrCallback::Builders(graph_builders) => {
3713                                let builder = graph_builders.get_dfir_mut(&out_location);
3714                                builder.add_dfir(
3715                                    parse_quote! {
3716                                        #chain_ident = chain();
3717                                        #first_ident -> [0]#chain_ident;
3718                                        #second_ident -> [1]#chain_ident;
3719                                    },
3720                                    None,
3721                                    Some(&next_stmt_id.to_string()),
3722                                );
3723                            }
3724                            BuildersOrCallback::Callback(_, node_callback) => {
3725                                node_callback(node, next_stmt_id);
3726                            }
3727                        }
3728
3729                        let _ = next_stmt_id.get_and_increment();
3730
3731                        ident_stack.push(chain_ident);
3732                    }
3733
3734                    HydroNode::MergeOrdered { first, metadata, .. } => {
3735                        let second_ident = ident_stack.pop().unwrap();
3736                        let first_ident = ident_stack.pop().unwrap();
3737
3738                        let merge_ident =
3739                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3740
3741                        match builders_or_callback {
3742                            BuildersOrCallback::Builders(graph_builders) => {
3743                                graph_builders.merge_ordered(
3744                                    &first.metadata().location_id,
3745                                    first_ident,
3746                                    second_ident,
3747                                    &merge_ident,
3748                                    &first.metadata().collection_kind,
3749                                    &metadata.op,
3750                                    Some(&next_stmt_id.to_string()),
3751                                );
3752                            }
3753                            BuildersOrCallback::Callback(_, node_callback) => {
3754                                node_callback(node, next_stmt_id);
3755                            }
3756                        }
3757
3758                        let _ = next_stmt_id.get_and_increment();
3759
3760                        ident_stack.push(merge_ident);
3761                    }
3762
3763                    HydroNode::ChainFirst { .. } => {
3764                        let second_ident = ident_stack.pop().unwrap();
3765                        let first_ident = ident_stack.pop().unwrap();
3766
3767                        let chain_ident =
3768                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3769
3770                        match builders_or_callback {
3771                            BuildersOrCallback::Builders(graph_builders) => {
3772                                let builder = graph_builders.get_dfir_mut(&out_location);
3773                                builder.add_dfir(
3774                                    parse_quote! {
3775                                        #chain_ident = chain_first_n(1);
3776                                        #first_ident -> [0]#chain_ident;
3777                                        #second_ident -> [1]#chain_ident;
3778                                    },
3779                                    None,
3780                                    Some(&next_stmt_id.to_string()),
3781                                );
3782                            }
3783                            BuildersOrCallback::Callback(_, node_callback) => {
3784                                node_callback(node, next_stmt_id);
3785                            }
3786                        }
3787
3788                        let _ = next_stmt_id.get_and_increment();
3789
3790                        ident_stack.push(chain_ident);
3791                    }
3792
3793                    HydroNode::CrossSingleton { right, .. } => {
3794                        let right_ident = ident_stack.pop().unwrap();
3795                        let left_ident = ident_stack.pop().unwrap();
3796
3797                        let cross_ident =
3798                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3799
3800                        match builders_or_callback {
3801                            BuildersOrCallback::Builders(graph_builders) => {
3802                                let builder = graph_builders.get_dfir_mut(&out_location);
3803
3804                                if right.metadata().location_id.is_top_level()
3805                                    && right.metadata().collection_kind.is_bounded()
3806                                {
3807                                    builder.add_dfir(
3808                                        parse_quote! {
3809                                            #cross_ident = cross_singleton::<'static>();
3810                                            #left_ident -> [input]#cross_ident;
3811                                            #right_ident -> [single]#cross_ident;
3812                                        },
3813                                        None,
3814                                        Some(&next_stmt_id.to_string()),
3815                                    );
3816                                } else {
3817                                    builder.add_dfir(
3818                                        parse_quote! {
3819                                            #cross_ident = cross_singleton();
3820                                            #left_ident -> [input]#cross_ident;
3821                                            #right_ident -> [single]#cross_ident;
3822                                        },
3823                                        None,
3824                                        Some(&next_stmt_id.to_string()),
3825                                    );
3826                                }
3827                            }
3828                            BuildersOrCallback::Callback(_, node_callback) => {
3829                                node_callback(node, next_stmt_id);
3830                            }
3831                        }
3832
3833                        let _ = next_stmt_id.get_and_increment();
3834
3835                        ident_stack.push(cross_ident);
3836                    }
3837
3838                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3839                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3840                            parse_quote!(cross_join_multiset)
3841                        } else {
3842                            parse_quote!(join_multiset)
3843                        };
3844
3845                        let (HydroNode::CrossProduct { left, right, .. }
3846                        | HydroNode::Join { left, right, .. }) = node
3847                        else {
3848                            unreachable!()
3849                        };
3850
3851                        let is_top_level = left.metadata().location_id.is_top_level()
3852                            && right.metadata().location_id.is_top_level();
3853                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3854                            quote!('static)
3855                        } else {
3856                            quote!('tick)
3857                        };
3858
3859                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3860                            quote!('static)
3861                        } else {
3862                            quote!('tick)
3863                        };
3864
3865                        let right_ident = ident_stack.pop().unwrap();
3866                        let left_ident = ident_stack.pop().unwrap();
3867
3868                        let stream_ident =
3869                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3870
3871                        match builders_or_callback {
3872                            BuildersOrCallback::Builders(graph_builders) => {
3873                                let builder = graph_builders.get_dfir_mut(&out_location);
3874                                builder.add_dfir(
3875                                    if is_top_level {
3876                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3877                                        // a multiset_delta() to negate the replay behavior
3878                                        parse_quote! {
3879                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3880                                            #left_ident -> [0]#stream_ident;
3881                                            #right_ident -> [1]#stream_ident;
3882                                        }
3883                                    } else {
3884                                        parse_quote! {
3885                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3886                                            #left_ident -> [0]#stream_ident;
3887                                            #right_ident -> [1]#stream_ident;
3888                                        }
3889                                    }
3890                                    ,
3891                                    None,
3892                                    Some(&next_stmt_id.to_string()),
3893                                );
3894                            }
3895                            BuildersOrCallback::Callback(_, node_callback) => {
3896                                node_callback(node, next_stmt_id);
3897                            }
3898                        }
3899
3900                        let _ = next_stmt_id.get_and_increment();
3901
3902                        ident_stack.push(stream_ident);
3903                    }
3904
3905                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3906                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3907                            parse_quote!(difference)
3908                        } else {
3909                            parse_quote!(anti_join)
3910                        };
3911
3912                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3913                            node
3914                        else {
3915                            unreachable!()
3916                        };
3917
3918                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3919                            quote!('static)
3920                        } else {
3921                            quote!('tick)
3922                        };
3923
3924                        let neg_ident = ident_stack.pop().unwrap();
3925                        let pos_ident = ident_stack.pop().unwrap();
3926
3927                        let stream_ident =
3928                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3929
3930                        match builders_or_callback {
3931                            BuildersOrCallback::Builders(graph_builders) => {
3932                                let builder = graph_builders.get_dfir_mut(&out_location);
3933                                builder.add_dfir(
3934                                    parse_quote! {
3935                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3936                                        #pos_ident -> [pos]#stream_ident;
3937                                        #neg_ident -> [neg]#stream_ident;
3938                                    },
3939                                    None,
3940                                    Some(&next_stmt_id.to_string()),
3941                                );
3942                            }
3943                            BuildersOrCallback::Callback(_, node_callback) => {
3944                                node_callback(node, next_stmt_id);
3945                            }
3946                        }
3947
3948                        let _ = next_stmt_id.get_and_increment();
3949
3950                        ident_stack.push(stream_ident);
3951                    }
3952
3953                    HydroNode::JoinHalf { .. } => {
3954                        let HydroNode::JoinHalf { right, .. } = node else {
3955                            unreachable!()
3956                        };
3957
3958                        assert!(
3959                            right.metadata().collection_kind.is_bounded(),
3960                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3961                            right.metadata().collection_kind
3962                        );
3963
3964                        let build_lifetime = if right.metadata().location_id.is_top_level() {
3965                            quote!('static)
3966                        } else {
3967                            quote!('tick)
3968                        };
3969
3970                        let build_ident = ident_stack.pop().unwrap();
3971                        let probe_ident = ident_stack.pop().unwrap();
3972
3973                        let stream_ident =
3974                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3975
3976                        match builders_or_callback {
3977                            BuildersOrCallback::Builders(graph_builders) => {
3978                                let builder = graph_builders.get_dfir_mut(&out_location);
3979                                builder.add_dfir(
3980                                    parse_quote! {
3981                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3982                                        #probe_ident -> [probe]#stream_ident;
3983                                        #build_ident -> [build]#stream_ident;
3984                                    },
3985                                    None,
3986                                    Some(&next_stmt_id.to_string()),
3987                                );
3988                            }
3989                            BuildersOrCallback::Callback(_, node_callback) => {
3990                                node_callback(node, next_stmt_id);
3991                            }
3992                        }
3993
3994                        let _ = next_stmt_id.get_and_increment();
3995
3996                        ident_stack.push(stream_ident);
3997                    }
3998
3999                    HydroNode::ResolveFutures { .. } => {
4000                        let input_ident = ident_stack.pop().unwrap();
4001
4002                        let futures_ident =
4003                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4004
4005                        match builders_or_callback {
4006                            BuildersOrCallback::Builders(graph_builders) => {
4007                                let builder = graph_builders.get_dfir_mut(&out_location);
4008                                builder.add_dfir(
4009                                    parse_quote! {
4010                                        #futures_ident = #input_ident -> resolve_futures();
4011                                    },
4012                                    None,
4013                                    Some(&next_stmt_id.to_string()),
4014                                );
4015                            }
4016                            BuildersOrCallback::Callback(_, node_callback) => {
4017                                node_callback(node, next_stmt_id);
4018                            }
4019                        }
4020
4021                        let _ = next_stmt_id.get_and_increment();
4022
4023                        ident_stack.push(futures_ident);
4024                    }
4025
4026                    HydroNode::ResolveFuturesBlocking { .. } => {
4027                        let input_ident = ident_stack.pop().unwrap();
4028
4029                        let futures_ident =
4030                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4031
4032                        match builders_or_callback {
4033                            BuildersOrCallback::Builders(graph_builders) => {
4034                                let builder = graph_builders.get_dfir_mut(&out_location);
4035                                builder.add_dfir(
4036                                    parse_quote! {
4037                                        #futures_ident = #input_ident -> resolve_futures_blocking();
4038                                    },
4039                                    None,
4040                                    Some(&next_stmt_id.to_string()),
4041                                );
4042                            }
4043                            BuildersOrCallback::Callback(_, node_callback) => {
4044                                node_callback(node, next_stmt_id);
4045                            }
4046                        }
4047
4048                        let _ = next_stmt_id.get_and_increment();
4049
4050                        ident_stack.push(futures_ident);
4051                    }
4052
4053                    HydroNode::ResolveFuturesOrdered { .. } => {
4054                        let input_ident = ident_stack.pop().unwrap();
4055
4056                        let futures_ident =
4057                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4058
4059                        match builders_or_callback {
4060                            BuildersOrCallback::Builders(graph_builders) => {
4061                                let builder = graph_builders.get_dfir_mut(&out_location);
4062                                builder.add_dfir(
4063                                    parse_quote! {
4064                                        #futures_ident = #input_ident -> resolve_futures_ordered();
4065                                    },
4066                                    None,
4067                                    Some(&next_stmt_id.to_string()),
4068                                );
4069                            }
4070                            BuildersOrCallback::Callback(_, node_callback) => {
4071                                node_callback(node, next_stmt_id);
4072                            }
4073                        }
4074
4075                        let _ = next_stmt_id.get_and_increment();
4076
4077                        ident_stack.push(futures_ident);
4078                    }
4079
4080                    HydroNode::Map { f, .. } => {
4081                        // Pop input ident (pushed last by transform_children).
4082                        let input_ident = ident_stack.pop().unwrap();
4083                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4084
4085                        let map_ident =
4086                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4087
4088                        match builders_or_callback {
4089                            BuildersOrCallback::Builders(graph_builders) => {
4090                                let builder = graph_builders.get_dfir_mut(&out_location);
4091                                builder.add_dfir(
4092                                    parse_quote! {
4093                                        #map_ident = #input_ident -> map(#f_tokens);
4094                                    },
4095                                    None,
4096                                    Some(&next_stmt_id.to_string()),
4097                                );
4098                            }
4099                            BuildersOrCallback::Callback(_, node_callback) => {
4100                                node_callback(node, next_stmt_id);
4101                            }
4102                        }
4103
4104                        let _ = next_stmt_id.get_and_increment();
4105
4106                        ident_stack.push(map_ident);
4107                    }
4108
4109                    HydroNode::FlatMap { f, .. } => {
4110                        let input_ident = ident_stack.pop().unwrap();
4111                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4112
4113                        let flat_map_ident =
4114                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4115
4116                        match builders_or_callback {
4117                            BuildersOrCallback::Builders(graph_builders) => {
4118                                let builder = graph_builders.get_dfir_mut(&out_location);
4119                                builder.add_dfir(
4120                                    parse_quote! {
4121                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4122                                    },
4123                                    None,
4124                                    Some(&next_stmt_id.to_string()),
4125                                );
4126                            }
4127                            BuildersOrCallback::Callback(_, node_callback) => {
4128                                node_callback(node, next_stmt_id);
4129                            }
4130                        }
4131
4132                        let _ = next_stmt_id.get_and_increment();
4133
4134                        ident_stack.push(flat_map_ident);
4135                    }
4136
4137                    HydroNode::FlatMapStreamBlocking { f, .. } => {
4138                        let input_ident = ident_stack.pop().unwrap();
4139                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4140
4141                        let flat_map_stream_blocking_ident =
4142                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4143
4144                        match builders_or_callback {
4145                            BuildersOrCallback::Builders(graph_builders) => {
4146                                let builder = graph_builders.get_dfir_mut(&out_location);
4147                                builder.add_dfir(
4148                                    parse_quote! {
4149                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4150                                    },
4151                                    None,
4152                                    Some(&next_stmt_id.to_string()),
4153                                );
4154                            }
4155                            BuildersOrCallback::Callback(_, node_callback) => {
4156                                node_callback(node, next_stmt_id);
4157                            }
4158                        }
4159
4160                        let _ = next_stmt_id.get_and_increment();
4161
4162                        ident_stack.push(flat_map_stream_blocking_ident);
4163                    }
4164
4165                    HydroNode::Filter { f, .. } => {
4166                        let input_ident = ident_stack.pop().unwrap();
4167                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4168
4169                        let filter_ident =
4170                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4171
4172                        match builders_or_callback {
4173                            BuildersOrCallback::Builders(graph_builders) => {
4174                                let builder = graph_builders.get_dfir_mut(&out_location);
4175                                builder.add_dfir(
4176                                    parse_quote! {
4177                                        #filter_ident = #input_ident -> filter(#f_tokens);
4178                                    },
4179                                    None,
4180                                    Some(&next_stmt_id.to_string()),
4181                                );
4182                            }
4183                            BuildersOrCallback::Callback(_, node_callback) => {
4184                                node_callback(node, next_stmt_id);
4185                            }
4186                        }
4187
4188                        let _ = next_stmt_id.get_and_increment();
4189
4190                        ident_stack.push(filter_ident);
4191                    }
4192
4193                    HydroNode::FilterMap { f, .. } => {
4194                        let input_ident = ident_stack.pop().unwrap();
4195                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4196
4197                        let filter_map_ident =
4198                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4199
4200                        match builders_or_callback {
4201                            BuildersOrCallback::Builders(graph_builders) => {
4202                                let builder = graph_builders.get_dfir_mut(&out_location);
4203                                builder.add_dfir(
4204                                    parse_quote! {
4205                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4206                                    },
4207                                    None,
4208                                    Some(&next_stmt_id.to_string()),
4209                                );
4210                            }
4211                            BuildersOrCallback::Callback(_, node_callback) => {
4212                                node_callback(node, next_stmt_id);
4213                            }
4214                        }
4215
4216                        let _ = next_stmt_id.get_and_increment();
4217
4218                        ident_stack.push(filter_map_ident);
4219                    }
4220
4221                    HydroNode::Sort { .. } => {
4222                        let input_ident = ident_stack.pop().unwrap();
4223
4224                        let sort_ident =
4225                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4226
4227                        match builders_or_callback {
4228                            BuildersOrCallback::Builders(graph_builders) => {
4229                                let builder = graph_builders.get_dfir_mut(&out_location);
4230                                builder.add_dfir(
4231                                    parse_quote! {
4232                                        #sort_ident = #input_ident -> sort();
4233                                    },
4234                                    None,
4235                                    Some(&next_stmt_id.to_string()),
4236                                );
4237                            }
4238                            BuildersOrCallback::Callback(_, node_callback) => {
4239                                node_callback(node, next_stmt_id);
4240                            }
4241                        }
4242
4243                        let _ = next_stmt_id.get_and_increment();
4244
4245                        ident_stack.push(sort_ident);
4246                    }
4247
4248                    HydroNode::DeferTick { .. } => {
4249                        let input_ident = ident_stack.pop().unwrap();
4250
4251                        let defer_tick_ident =
4252                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4253
4254                        match builders_or_callback {
4255                            BuildersOrCallback::Builders(graph_builders) => {
4256                                let builder = graph_builders.get_dfir_mut(&out_location);
4257                                builder.add_dfir(
4258                                    parse_quote! {
4259                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4260                                    },
4261                                    None,
4262                                    Some(&next_stmt_id.to_string()),
4263                                );
4264                            }
4265                            BuildersOrCallback::Callback(_, node_callback) => {
4266                                node_callback(node, next_stmt_id);
4267                            }
4268                        }
4269
4270                        let _ = next_stmt_id.get_and_increment();
4271
4272                        ident_stack.push(defer_tick_ident);
4273                    }
4274
4275                    HydroNode::Enumerate { input, .. } => {
4276                        let input_ident = ident_stack.pop().unwrap();
4277
4278                        let enumerate_ident =
4279                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4280
4281                        match builders_or_callback {
4282                            BuildersOrCallback::Builders(graph_builders) => {
4283                                let builder = graph_builders.get_dfir_mut(&out_location);
4284                                let lifetime = if input.metadata().location_id.is_top_level() {
4285                                    quote!('static)
4286                                } else {
4287                                    quote!('tick)
4288                                };
4289                                builder.add_dfir(
4290                                    parse_quote! {
4291                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4292                                    },
4293                                    None,
4294                                    Some(&next_stmt_id.to_string()),
4295                                );
4296                            }
4297                            BuildersOrCallback::Callback(_, node_callback) => {
4298                                node_callback(node, next_stmt_id);
4299                            }
4300                        }
4301
4302                        let _ = next_stmt_id.get_and_increment();
4303
4304                        ident_stack.push(enumerate_ident);
4305                    }
4306
4307                    HydroNode::Inspect { f, .. } => {
4308                        let input_ident = ident_stack.pop().unwrap();
4309                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4310
4311                        let inspect_ident =
4312                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4313
4314                        match builders_or_callback {
4315                            BuildersOrCallback::Builders(graph_builders) => {
4316                                let builder = graph_builders.get_dfir_mut(&out_location);
4317                                builder.add_dfir(
4318                                    parse_quote! {
4319                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4320                                    },
4321                                    None,
4322                                    Some(&next_stmt_id.to_string()),
4323                                );
4324                            }
4325                            BuildersOrCallback::Callback(_, node_callback) => {
4326                                node_callback(node, next_stmt_id);
4327                            }
4328                        }
4329
4330                        let _ = next_stmt_id.get_and_increment();
4331
4332                        ident_stack.push(inspect_ident);
4333                    }
4334
4335                    HydroNode::Unique { input, .. } => {
4336                        let input_ident = ident_stack.pop().unwrap();
4337
4338                        let unique_ident =
4339                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4340
4341                        match builders_or_callback {
4342                            BuildersOrCallback::Builders(graph_builders) => {
4343                                let builder = graph_builders.get_dfir_mut(&out_location);
4344                                let lifetime = if input.metadata().location_id.is_top_level() {
4345                                    quote!('static)
4346                                } else {
4347                                    quote!('tick)
4348                                };
4349
4350                                builder.add_dfir(
4351                                    parse_quote! {
4352                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4353                                    },
4354                                    None,
4355                                    Some(&next_stmt_id.to_string()),
4356                                );
4357                            }
4358                            BuildersOrCallback::Callback(_, node_callback) => {
4359                                node_callback(node, next_stmt_id);
4360                            }
4361                        }
4362
4363                        let _ = next_stmt_id.get_and_increment();
4364
4365                        ident_stack.push(unique_ident);
4366                    }
4367
4368                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4369                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4370                            if input.metadata().location_id.is_top_level()
4371                                && input.metadata().collection_kind.is_bounded()
4372                            {
4373                                parse_quote!(fold_no_replay)
4374                            } else {
4375                                parse_quote!(fold)
4376                            }
4377                        } else if matches!(node, HydroNode::Scan { .. }) {
4378                            parse_quote!(scan)
4379                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4380                            parse_quote!(scan_async_blocking)
4381                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4382                            if input.metadata().location_id.is_top_level()
4383                                && input.metadata().collection_kind.is_bounded()
4384                            {
4385                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4386                            } else {
4387                                parse_quote!(fold_keyed)
4388                            }
4389                        } else {
4390                            unreachable!()
4391                        };
4392
4393                        let (HydroNode::Fold { input, .. }
4394                        | HydroNode::FoldKeyed { input, .. }
4395                        | HydroNode::Scan { input, .. }
4396                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4397                        else {
4398                            unreachable!()
4399                        };
4400
4401                        let lifetime = if input.metadata().location_id.is_top_level() {
4402                            quote!('static)
4403                        } else {
4404                            quote!('tick)
4405                        };
4406
4407                        let input_ident = ident_stack.pop().unwrap();
4408
4409                        let (HydroNode::Fold { init, acc, .. }
4410                        | HydroNode::FoldKeyed { init, acc, .. }
4411                        | HydroNode::Scan { init, acc, .. }
4412                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4413                        else {
4414                            unreachable!()
4415                        };
4416
4417                        let acc_tokens = acc.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4418                        let init_tokens = init.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4419
4420                        let fold_ident =
4421                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4422
4423                        match builders_or_callback {
4424                            BuildersOrCallback::Builders(graph_builders) => {
4425                                if matches!(node, HydroNode::Fold { .. })
4426                                    && node.metadata().location_id.is_top_level()
4427                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4428                                    && graph_builders.singleton_intermediates()
4429                                    && !node.metadata().collection_kind.is_bounded()
4430                                {
4431                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4432                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4433                                        &input.metadata().location_id,
4434                                        &input_ident,
4435                                        &input.metadata().collection_kind,
4436                                        &node.metadata().op,
4437                                    );
4438
4439                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4440                                        let acc: syn::Expr = parse_quote!({
4441                                            let mut __inner = #acc_tokens;
4442                                            move |__state, __batch: Vec<_>| {
4443                                                if __batch.is_empty() {
4444                                                    return None;
4445                                                }
4446                                                for __value in __batch {
4447                                                    __inner(__state, __value);
4448                                                }
4449                                                Some(__state.clone())
4450                                            }
4451                                        });
4452                                        (hooked, acc)
4453                                    } else {
4454                                        let acc: syn::Expr = parse_quote!({
4455                                            let mut __inner = #acc_tokens;
4456                                            move |__state, __value| {
4457                                                __inner(__state, __value);
4458                                                Some(__state.clone())
4459                                            }
4460                                        });
4461                                        (&input_ident, acc)
4462                                    };
4463
4464                                    let builder = graph_builders.get_dfir_mut(&out_location);
4465                                    builder.add_dfir(
4466                                        parse_quote! {
4467                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4468                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4469                                            #fold_ident = chain();
4470                                        },
4471                                        None,
4472                                        Some(&next_stmt_id.to_string()),
4473                                    );
4474
4475                                    if hooked_input_ident.is_some() {
4476                                        fold_hooked_idents.insert(fold_ident.to_string());
4477                                    }
4478                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4479                                    && node.metadata().location_id.is_top_level()
4480                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4481                                    && graph_builders.singleton_intermediates()
4482                                    && !node.metadata().collection_kind.is_bounded()
4483                                {
4484                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4485                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4486                                        &input.metadata().location_id,
4487                                        &input_ident,
4488                                        &input.metadata().collection_kind,
4489                                        &node.metadata().op,
4490                                    );
4491                                    let builder = graph_builders.get_dfir_mut(&out_location);
4492
4493                                    let wrapped_acc: syn::Expr = parse_quote!({
4494                                        let mut __init = #init_tokens;
4495                                        let mut __inner = #acc_tokens;
4496                                        move |__state, __kv: (_, _)| {
4497                                            // TODO(shadaj): we can avoid the clone when the entry exists
4498                                            let __state = __state
4499                                                .entry(::std::clone::Clone::clone(&__kv.0))
4500                                                .or_insert_with(|| (__init)());
4501                                            __inner(__state, __kv.1);
4502                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4503                                        }
4504                                    });
4505
4506                                    if let Some(hooked_input_ident) = hooked_input_ident {
4507                                        builder.add_dfir(
4508                                            parse_quote! {
4509                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4510                                            },
4511                                            None,
4512                                            Some(&next_stmt_id.to_string()),
4513                                        );
4514
4515                                        fold_hooked_idents.insert(fold_ident.to_string());
4516                                    } else {
4517                                        builder.add_dfir(
4518                                            parse_quote! {
4519                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4520                                            },
4521                                            None,
4522                                            Some(&next_stmt_id.to_string()),
4523                                        );
4524                                    }
4525                                } else if (matches!(node, HydroNode::Fold { .. })
4526                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4527                                    && !node.metadata().location_id.is_top_level()
4528                                    && graph_builders.singleton_intermediates()
4529                                {
4530                                    let input_ref = match &*node {
4531                                        HydroNode::Fold { input, .. } => input,
4532                                        HydroNode::FoldKeyed { input, .. } => input,
4533                                        _ => unreachable!(),
4534                                    };
4535                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4536                                        &input_ref.metadata().location_id,
4537                                        &input_ident,
4538                                        &input_ref.metadata().collection_kind,
4539                                        &node.metadata().op,
4540                                    );
4541
4542                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4543                                    let builder = graph_builders.get_dfir_mut(&out_location);
4544                                    builder.add_dfir(
4545                                        parse_quote! {
4546                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4547                                        },
4548                                        None,
4549                                        Some(&next_stmt_id.to_string()),
4550                                    );
4551                                } else {
4552                                    let builder = graph_builders.get_dfir_mut(&out_location);
4553                                    builder.add_dfir(
4554                                        parse_quote! {
4555                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4556                                        },
4557                                        None,
4558                                        Some(&next_stmt_id.to_string()),
4559                                    );
4560                                }
4561                            }
4562                            BuildersOrCallback::Callback(_, node_callback) => {
4563                                node_callback(node, next_stmt_id);
4564                            }
4565                        }
4566
4567                        let _ = next_stmt_id.get_and_increment();
4568
4569                        ident_stack.push(fold_ident);
4570                    }
4571
4572                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4573                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4574                            if input.metadata().location_id.is_top_level()
4575                                && input.metadata().collection_kind.is_bounded()
4576                            {
4577                                parse_quote!(reduce_no_replay)
4578                            } else {
4579                                parse_quote!(reduce)
4580                            }
4581                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4582                            if input.metadata().location_id.is_top_level()
4583                                && input.metadata().collection_kind.is_bounded()
4584                            {
4585                                todo!(
4586                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4587                                )
4588                            } else {
4589                                parse_quote!(reduce_keyed)
4590                            }
4591                        } else {
4592                            unreachable!()
4593                        };
4594
4595                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4596                        else {
4597                            unreachable!()
4598                        };
4599
4600                        let lifetime = if input.metadata().location_id.is_top_level() {
4601                            quote!('static)
4602                        } else {
4603                            quote!('tick)
4604                        };
4605
4606                        let input_ident = ident_stack.pop().unwrap();
4607
4608                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4609                        else {
4610                            unreachable!()
4611                        };
4612
4613                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4614
4615                        let reduce_ident =
4616                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4617
4618                        match builders_or_callback {
4619                            BuildersOrCallback::Builders(graph_builders) => {
4620                                if matches!(node, HydroNode::Reduce { .. })
4621                                    && node.metadata().location_id.is_top_level()
4622                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4623                                    && graph_builders.singleton_intermediates()
4624                                    && !node.metadata().collection_kind.is_bounded()
4625                                {
4626                                    todo!(
4627                                        "Reduce with optional intermediates is not yet supported in simulator"
4628                                    );
4629                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4630                                    && node.metadata().location_id.is_top_level()
4631                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4632                                    && graph_builders.singleton_intermediates()
4633                                    && !node.metadata().collection_kind.is_bounded()
4634                                {
4635                                    todo!(
4636                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4637                                    );
4638                                } else {
4639                                    let builder = graph_builders.get_dfir_mut(&out_location);
4640                                    builder.add_dfir(
4641                                        parse_quote! {
4642                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4643                                        },
4644                                        None,
4645                                        Some(&next_stmt_id.to_string()),
4646                                    );
4647                                }
4648                            }
4649                            BuildersOrCallback::Callback(_, node_callback) => {
4650                                node_callback(node, next_stmt_id);
4651                            }
4652                        }
4653
4654                        let _ = next_stmt_id.get_and_increment();
4655
4656                        ident_stack.push(reduce_ident);
4657                    }
4658
4659                    HydroNode::ReduceKeyedWatermark {
4660                        f,
4661                        input,
4662                        metadata,
4663                        ..
4664                    } => {
4665                        let lifetime = if input.metadata().location_id.is_top_level() {
4666                            quote!('static)
4667                        } else {
4668                            quote!('tick)
4669                        };
4670
4671                        // watermark is processed second, so it's on top
4672                        let watermark_ident = ident_stack.pop().unwrap();
4673                        let input_ident = ident_stack.pop().unwrap();
4674                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4675
4676                        let chain_ident = syn::Ident::new(
4677                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4678                            Span::call_site(),
4679                        );
4680
4681                        let fold_ident =
4682                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4683
4684                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4685                            && input.metadata().collection_kind.is_bounded()
4686                        {
4687                            parse_quote!(fold_no_replay)
4688                        } else {
4689                            parse_quote!(fold)
4690                        };
4691
4692                        match builders_or_callback {
4693                            BuildersOrCallback::Builders(graph_builders) => {
4694                                if metadata.location_id.is_top_level()
4695                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4696                                    && graph_builders.singleton_intermediates()
4697                                    && !metadata.collection_kind.is_bounded()
4698                                {
4699                                    todo!(
4700                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4701                                    )
4702                                } else {
4703                                    let builder = graph_builders.get_dfir_mut(&out_location);
4704                                    builder.add_dfir(
4705                                        parse_quote! {
4706                                            #chain_ident = chain();
4707                                            #input_ident
4708                                                -> map(|x| (Some(x), None))
4709                                                -> [0]#chain_ident;
4710                                            #watermark_ident
4711                                                -> map(|watermark| (None, Some(watermark)))
4712                                                -> [1]#chain_ident;
4713
4714                                            #fold_ident = #chain_ident
4715                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4716                                                    let __reduce_keyed_fn = #f_tokens;
4717                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4718                                                        if let Some((k, v)) = opt_payload {
4719                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4720                                                                if k < curr_watermark {
4721                                                                    return;
4722                                                                }
4723                                                            }
4724                                                            match map.entry(k) {
4725                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4726                                                                    e.insert(v);
4727                                                                }
4728                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4729                                                                    __reduce_keyed_fn(e.get_mut(), v);
4730                                                                }
4731                                                            }
4732                                                        } else {
4733                                                            let watermark = opt_watermark.unwrap();
4734                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4735                                                                if watermark <= curr_watermark {
4736                                                                    return;
4737                                                                }
4738                                                            }
4739                                                            map.retain(|k, _| *k >= watermark);
4740                                                            *opt_curr_watermark = Some(watermark);
4741                                                        }
4742                                                    }
4743                                                })
4744                                                -> flat_map(|(map, _curr_watermark)| map);
4745                                        },
4746                                        None,
4747                                        Some(&next_stmt_id.to_string()),
4748                                    );
4749                                }
4750                            }
4751                            BuildersOrCallback::Callback(_, node_callback) => {
4752                                node_callback(node, next_stmt_id);
4753                            }
4754                        }
4755
4756                        let _ = next_stmt_id.get_and_increment();
4757
4758                        ident_stack.push(fold_ident);
4759                    }
4760
4761                    HydroNode::Network {
4762                        networking_info,
4763                        serialize_fn: serialize_pipeline,
4764                        instantiate_fn,
4765                        deserialize_fn: deserialize_pipeline,
4766                        input,
4767                        ..
4768                    } => {
4769                        let input_ident = ident_stack.pop().unwrap();
4770
4771                        let receiver_stream_ident =
4772                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4773
4774                        match builders_or_callback {
4775                            BuildersOrCallback::Builders(graph_builders) => {
4776                                let (sink_expr, source_expr) = match instantiate_fn {
4777                                    DebugInstantiate::Building => (
4778                                        syn::parse_quote!(DUMMY_SINK),
4779                                        syn::parse_quote!(DUMMY_SOURCE),
4780                                    ),
4781
4782                                    DebugInstantiate::Finalized(finalized) => {
4783                                        (finalized.sink.clone(), finalized.source.clone())
4784                                    }
4785                                };
4786
4787                                graph_builders.create_network(
4788                                    &input.metadata().location_id,
4789                                    &out_location,
4790                                    input_ident,
4791                                    &receiver_stream_ident,
4792                                    serialize_pipeline.as_ref(),
4793                                    sink_expr,
4794                                    source_expr,
4795                                    deserialize_pipeline.as_ref(),
4796                                    *next_stmt_id,
4797                                    networking_info,
4798                                );
4799                            }
4800                            BuildersOrCallback::Callback(_, node_callback) => {
4801                                node_callback(node, next_stmt_id);
4802                            }
4803                        }
4804
4805                        let _ = next_stmt_id.get_and_increment();
4806
4807                        ident_stack.push(receiver_stream_ident);
4808                    }
4809
4810                    HydroNode::ExternalInput {
4811                        instantiate_fn,
4812                        deserialize_fn: deserialize_pipeline,
4813                        ..
4814                    } => {
4815                        let receiver_stream_ident =
4816                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4817
4818                        match builders_or_callback {
4819                            BuildersOrCallback::Builders(graph_builders) => {
4820                                let (_, source_expr) = match instantiate_fn {
4821                                    DebugInstantiate::Building => (
4822                                        syn::parse_quote!(DUMMY_SINK),
4823                                        syn::parse_quote!(DUMMY_SOURCE),
4824                                    ),
4825
4826                                    DebugInstantiate::Finalized(finalized) => {
4827                                        (finalized.sink.clone(), finalized.source.clone())
4828                                    }
4829                                };
4830
4831                                graph_builders.create_external_source(
4832                                    &out_location,
4833                                    source_expr,
4834                                    &receiver_stream_ident,
4835                                    deserialize_pipeline.as_ref(),
4836                                    *next_stmt_id,
4837                                );
4838                            }
4839                            BuildersOrCallback::Callback(_, node_callback) => {
4840                                node_callback(node, next_stmt_id);
4841                            }
4842                        }
4843
4844                        let _ = next_stmt_id.get_and_increment();
4845
4846                        ident_stack.push(receiver_stream_ident);
4847                    }
4848
4849                    HydroNode::Counter {
4850                        tag,
4851                        duration,
4852                        prefix,
4853                        ..
4854                    } => {
4855                        let input_ident = ident_stack.pop().unwrap();
4856
4857                        let counter_ident =
4858                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4859
4860                        match builders_or_callback {
4861                            BuildersOrCallback::Builders(graph_builders) => {
4862                                let arg = format!("{}({})", prefix, tag);
4863                                let builder = graph_builders.get_dfir_mut(&out_location);
4864                                builder.add_dfir(
4865                                    parse_quote! {
4866                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4867                                    },
4868                                    None,
4869                                    Some(&next_stmt_id.to_string()),
4870                                );
4871                            }
4872                            BuildersOrCallback::Callback(_, node_callback) => {
4873                                node_callback(node, next_stmt_id);
4874                            }
4875                        }
4876
4877                        let _ = next_stmt_id.get_and_increment();
4878
4879                        ident_stack.push(counter_ident);
4880                    }
4881                }
4882            },
4883            seen_tees,
4884            false,
4885        );
4886
4887        let ret = ident_stack
4888            .pop()
4889            .expect("ident_stack should have exactly one element after traversal");
4890        assert!(
4891            ident_stack.is_empty(),
4892            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4893             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4894            ident_stack.len()
4895        );
4896        ret
4897    }
4898
4899    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4900        match self {
4901            HydroNode::Placeholder => {
4902                panic!()
4903            }
4904            HydroNode::Cast { .. }
4905            | HydroNode::ObserveNonDet { .. }
4906            | HydroNode::UnboundSingleton { .. }
4907            | HydroNode::AssertIsConsistent { .. } => {}
4908            HydroNode::Source { source, .. } => match source {
4909                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4910                HydroSource::ExternalNetwork()
4911                | HydroSource::Spin()
4912                | HydroSource::ClusterMembers(_, _)
4913                | HydroSource::Embedded(_)
4914                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4915            },
4916            HydroNode::SingletonSource { value, .. } => {
4917                transform(value);
4918            }
4919            HydroNode::CycleSource { .. }
4920            | HydroNode::Tee { .. }
4921            | HydroNode::Singleton { .. }
4922            | HydroNode::YieldConcat { .. }
4923            | HydroNode::BeginAtomic { .. }
4924            | HydroNode::EndAtomic { .. }
4925            | HydroNode::Batch { .. }
4926            | HydroNode::Chain { .. }
4927            | HydroNode::MergeOrdered { .. }
4928            | HydroNode::ChainFirst { .. }
4929            | HydroNode::CrossProduct { .. }
4930            | HydroNode::CrossSingleton { .. }
4931            | HydroNode::ResolveFutures { .. }
4932            | HydroNode::ResolveFuturesBlocking { .. }
4933            | HydroNode::ResolveFuturesOrdered { .. }
4934            | HydroNode::Join { .. }
4935            | HydroNode::JoinHalf { .. }
4936            | HydroNode::Difference { .. }
4937            | HydroNode::AntiJoin { .. }
4938            | HydroNode::DeferTick { .. }
4939            | HydroNode::Enumerate { .. }
4940            | HydroNode::Unique { .. }
4941            | HydroNode::Sort { .. } => {}
4942            HydroNode::Map { f, .. }
4943            | HydroNode::FlatMap { f, .. }
4944            | HydroNode::FlatMapStreamBlocking { f, .. }
4945            | HydroNode::Filter { f, .. }
4946            | HydroNode::FilterMap { f, .. }
4947            | HydroNode::Inspect { f, .. }
4948            | HydroNode::Partition { f, .. }
4949            | HydroNode::Reduce { f, .. }
4950            | HydroNode::ReduceKeyed { f, .. }
4951            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4952                transform(&mut f.expr);
4953            }
4954            HydroNode::Fold { init, acc, .. }
4955            | HydroNode::Scan { init, acc, .. }
4956            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4957            | HydroNode::FoldKeyed { init, acc, .. } => {
4958                transform(&mut init.expr);
4959                transform(&mut acc.expr);
4960            }
4961            HydroNode::Network {
4962                serialize_fn,
4963                deserialize_fn,
4964                ..
4965            } => {
4966                if let Some(serialize_fn) = serialize_fn {
4967                    transform(serialize_fn);
4968                }
4969                if let Some(deserialize_fn) = deserialize_fn {
4970                    transform(deserialize_fn);
4971                }
4972            }
4973            HydroNode::ExternalInput { deserialize_fn, .. } => {
4974                if let Some(deserialize_fn) = deserialize_fn {
4975                    transform(deserialize_fn);
4976                }
4977            }
4978            HydroNode::Counter { duration, .. } => {
4979                transform(duration);
4980            }
4981        }
4982    }
4983
4984    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4985        &self.metadata().op
4986    }
4987
4988    pub fn metadata(&self) -> &HydroIrMetadata {
4989        match self {
4990            HydroNode::Placeholder => {
4991                panic!()
4992            }
4993            HydroNode::Cast { metadata, .. }
4994            | HydroNode::ObserveNonDet { metadata, .. }
4995            | HydroNode::AssertIsConsistent { metadata, .. }
4996            | HydroNode::UnboundSingleton { metadata, .. }
4997            | HydroNode::Source { metadata, .. }
4998            | HydroNode::SingletonSource { metadata, .. }
4999            | HydroNode::CycleSource { metadata, .. }
5000            | HydroNode::Tee { metadata, .. }
5001            | HydroNode::Singleton { metadata, .. }
5002            | HydroNode::Partition { metadata, .. }
5003            | HydroNode::YieldConcat { metadata, .. }
5004            | HydroNode::BeginAtomic { metadata, .. }
5005            | HydroNode::EndAtomic { metadata, .. }
5006            | HydroNode::Batch { metadata, .. }
5007            | HydroNode::Chain { metadata, .. }
5008            | HydroNode::MergeOrdered { metadata, .. }
5009            | HydroNode::ChainFirst { metadata, .. }
5010            | HydroNode::CrossProduct { metadata, .. }
5011            | HydroNode::CrossSingleton { metadata, .. }
5012            | HydroNode::Join { metadata, .. }
5013            | HydroNode::JoinHalf { metadata, .. }
5014            | HydroNode::Difference { metadata, .. }
5015            | HydroNode::AntiJoin { metadata, .. }
5016            | HydroNode::ResolveFutures { metadata, .. }
5017            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5018            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5019            | HydroNode::Map { metadata, .. }
5020            | HydroNode::FlatMap { metadata, .. }
5021            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5022            | HydroNode::Filter { metadata, .. }
5023            | HydroNode::FilterMap { metadata, .. }
5024            | HydroNode::DeferTick { metadata, .. }
5025            | HydroNode::Enumerate { metadata, .. }
5026            | HydroNode::Inspect { metadata, .. }
5027            | HydroNode::Unique { metadata, .. }
5028            | HydroNode::Sort { metadata, .. }
5029            | HydroNode::Scan { metadata, .. }
5030            | HydroNode::ScanAsyncBlocking { metadata, .. }
5031            | HydroNode::Fold { metadata, .. }
5032            | HydroNode::FoldKeyed { metadata, .. }
5033            | HydroNode::Reduce { metadata, .. }
5034            | HydroNode::ReduceKeyed { metadata, .. }
5035            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5036            | HydroNode::ExternalInput { metadata, .. }
5037            | HydroNode::Network { metadata, .. }
5038            | HydroNode::Counter { metadata, .. } => metadata,
5039        }
5040    }
5041
5042    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5043        &mut self.metadata_mut().op
5044    }
5045
5046    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5047        match self {
5048            HydroNode::Placeholder => {
5049                panic!()
5050            }
5051            HydroNode::Cast { metadata, .. }
5052            | HydroNode::ObserveNonDet { metadata, .. }
5053            | HydroNode::AssertIsConsistent { metadata, .. }
5054            | HydroNode::UnboundSingleton { metadata, .. }
5055            | HydroNode::Source { metadata, .. }
5056            | HydroNode::SingletonSource { metadata, .. }
5057            | HydroNode::CycleSource { metadata, .. }
5058            | HydroNode::Tee { metadata, .. }
5059            | HydroNode::Singleton { metadata, .. }
5060            | HydroNode::Partition { metadata, .. }
5061            | HydroNode::YieldConcat { metadata, .. }
5062            | HydroNode::BeginAtomic { metadata, .. }
5063            | HydroNode::EndAtomic { metadata, .. }
5064            | HydroNode::Batch { metadata, .. }
5065            | HydroNode::Chain { metadata, .. }
5066            | HydroNode::MergeOrdered { metadata, .. }
5067            | HydroNode::ChainFirst { metadata, .. }
5068            | HydroNode::CrossProduct { metadata, .. }
5069            | HydroNode::CrossSingleton { metadata, .. }
5070            | HydroNode::Join { metadata, .. }
5071            | HydroNode::JoinHalf { metadata, .. }
5072            | HydroNode::Difference { metadata, .. }
5073            | HydroNode::AntiJoin { metadata, .. }
5074            | HydroNode::ResolveFutures { metadata, .. }
5075            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5076            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5077            | HydroNode::Map { metadata, .. }
5078            | HydroNode::FlatMap { metadata, .. }
5079            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5080            | HydroNode::Filter { metadata, .. }
5081            | HydroNode::FilterMap { metadata, .. }
5082            | HydroNode::DeferTick { metadata, .. }
5083            | HydroNode::Enumerate { metadata, .. }
5084            | HydroNode::Inspect { metadata, .. }
5085            | HydroNode::Unique { metadata, .. }
5086            | HydroNode::Sort { metadata, .. }
5087            | HydroNode::Scan { metadata, .. }
5088            | HydroNode::ScanAsyncBlocking { metadata, .. }
5089            | HydroNode::Fold { metadata, .. }
5090            | HydroNode::FoldKeyed { metadata, .. }
5091            | HydroNode::Reduce { metadata, .. }
5092            | HydroNode::ReduceKeyed { metadata, .. }
5093            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5094            | HydroNode::ExternalInput { metadata, .. }
5095            | HydroNode::Network { metadata, .. }
5096            | HydroNode::Counter { metadata, .. } => metadata,
5097        }
5098    }
5099
5100    pub fn input(&self) -> Vec<&HydroNode> {
5101        match self {
5102            HydroNode::Placeholder => {
5103                panic!()
5104            }
5105            HydroNode::Source { .. }
5106            | HydroNode::SingletonSource { .. }
5107            | HydroNode::ExternalInput { .. }
5108            | HydroNode::CycleSource { .. }
5109            | HydroNode::Tee { .. }
5110            | HydroNode::Singleton { .. }
5111            | HydroNode::Partition { .. } => {
5112                // Tee/Partition should find their input in separate special ways
5113                vec![]
5114            }
5115            HydroNode::Cast { inner, .. }
5116            | HydroNode::ObserveNonDet { inner, .. }
5117            | HydroNode::YieldConcat { inner, .. }
5118            | HydroNode::BeginAtomic { inner, .. }
5119            | HydroNode::EndAtomic { inner, .. }
5120            | HydroNode::Batch { inner, .. }
5121            | HydroNode::UnboundSingleton { inner, .. }
5122            | HydroNode::AssertIsConsistent { inner, .. } => {
5123                vec![inner]
5124            }
5125            HydroNode::Chain { first, second, .. } => {
5126                vec![first, second]
5127            }
5128            HydroNode::MergeOrdered { first, second, .. } => {
5129                vec![first, second]
5130            }
5131            HydroNode::ChainFirst { first, second, .. } => {
5132                vec![first, second]
5133            }
5134            HydroNode::CrossProduct { left, right, .. }
5135            | HydroNode::CrossSingleton { left, right, .. }
5136            | HydroNode::Join { left, right, .. }
5137            | HydroNode::JoinHalf { left, right, .. } => {
5138                vec![left, right]
5139            }
5140            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5141                vec![pos, neg]
5142            }
5143            HydroNode::Map { input, .. }
5144            | HydroNode::FlatMap { input, .. }
5145            | HydroNode::FlatMapStreamBlocking { input, .. }
5146            | HydroNode::Filter { input, .. }
5147            | HydroNode::FilterMap { input, .. }
5148            | HydroNode::Sort { input, .. }
5149            | HydroNode::DeferTick { input, .. }
5150            | HydroNode::Enumerate { input, .. }
5151            | HydroNode::Inspect { input, .. }
5152            | HydroNode::Unique { input, .. }
5153            | HydroNode::Network { input, .. }
5154            | HydroNode::Counter { input, .. }
5155            | HydroNode::ResolveFutures { input, .. }
5156            | HydroNode::ResolveFuturesBlocking { input, .. }
5157            | HydroNode::ResolveFuturesOrdered { input, .. }
5158            | HydroNode::Fold { input, .. }
5159            | HydroNode::FoldKeyed { input, .. }
5160            | HydroNode::Reduce { input, .. }
5161            | HydroNode::ReduceKeyed { input, .. }
5162            | HydroNode::Scan { input, .. }
5163            | HydroNode::ScanAsyncBlocking { input, .. } => {
5164                vec![input]
5165            }
5166            HydroNode::ReduceKeyedWatermark {
5167                input, watermark, ..
5168            } => {
5169                vec![input, watermark]
5170            }
5171        }
5172    }
5173
5174    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5175        self.input()
5176            .iter()
5177            .map(|input_node| input_node.metadata())
5178            .collect()
5179    }
5180
5181    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5182    /// has other live references, meaning the upstream is already driven
5183    /// by another consumer and does not need a Null sink.
5184    pub fn is_shared_with_others(&self) -> bool {
5185        match self {
5186            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5187                Rc::strong_count(&inner.0) > 1
5188            }
5189            // A zero-output singleton() is valid in DFIR (it drains itself at
5190            // end of tick), so it doesn't need to be driven by another consumer.
5191            HydroNode::Singleton { .. } => false,
5192            _ => false,
5193        }
5194    }
5195
5196    pub fn print_root(&self) -> String {
5197        match self {
5198            HydroNode::Placeholder => {
5199                panic!()
5200            }
5201            HydroNode::Cast { .. } => "Cast()".to_owned(),
5202            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5203            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5204            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5205            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5206            HydroNode::SingletonSource {
5207                value,
5208                first_tick_only,
5209                ..
5210            } => format!(
5211                "SingletonSource({:?}, first_tick_only={})",
5212                value, first_tick_only
5213            ),
5214            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5215            HydroNode::Tee { inner, .. } => {
5216                format!("Tee({})", inner.0.borrow().print_root())
5217            }
5218            HydroNode::Singleton { inner, .. } => {
5219                format!("Singleton({})", inner.0.borrow().print_root())
5220            }
5221            HydroNode::Partition { f, is_true, .. } => {
5222                format!("Partition({:?}, is_true={})", f, is_true)
5223            }
5224            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5225            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5226            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5227            HydroNode::Batch { .. } => "Batch()".to_owned(),
5228            HydroNode::Chain { first, second, .. } => {
5229                format!("Chain({}, {})", first.print_root(), second.print_root())
5230            }
5231            HydroNode::MergeOrdered { first, second, .. } => {
5232                format!(
5233                    "MergeOrdered({}, {})",
5234                    first.print_root(),
5235                    second.print_root()
5236                )
5237            }
5238            HydroNode::ChainFirst { first, second, .. } => {
5239                format!(
5240                    "ChainFirst({}, {})",
5241                    first.print_root(),
5242                    second.print_root()
5243                )
5244            }
5245            HydroNode::CrossProduct { left, right, .. } => {
5246                format!(
5247                    "CrossProduct({}, {})",
5248                    left.print_root(),
5249                    right.print_root()
5250                )
5251            }
5252            HydroNode::CrossSingleton { left, right, .. } => {
5253                format!(
5254                    "CrossSingleton({}, {})",
5255                    left.print_root(),
5256                    right.print_root()
5257                )
5258            }
5259            HydroNode::Join { left, right, .. } => {
5260                format!("Join({}, {})", left.print_root(), right.print_root())
5261            }
5262            HydroNode::JoinHalf { left, right, .. } => {
5263                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5264            }
5265            HydroNode::Difference { pos, neg, .. } => {
5266                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5267            }
5268            HydroNode::AntiJoin { pos, neg, .. } => {
5269                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5270            }
5271            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5272            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5273            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5274            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5275            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5276            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5277            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5278            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5279            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5280            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5281            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5282            HydroNode::Unique { .. } => "Unique()".to_owned(),
5283            HydroNode::Sort { .. } => "Sort()".to_owned(),
5284            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5285            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5286            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5287                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5288            }
5289            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5290            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5291            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5292            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5293            HydroNode::Network { .. } => "Network()".to_owned(),
5294            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5295            HydroNode::Counter { tag, duration, .. } => {
5296                format!("Counter({:?}, {:?})", tag, duration)
5297            }
5298        }
5299    }
5300}
5301
5302#[cfg(feature = "build")]
5303fn instantiate_network<'a, D>(
5304    env: &mut D::InstantiateEnv,
5305    from_location: &LocationId,
5306    to_location: &LocationId,
5307    processes: &SparseSecondaryMap<LocationKey, D::Process>,
5308    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
5309    name: Option<&str>,
5310    networking_info: &crate::networking::NetworkingInfo,
5311) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
5312where
5313    D: Deploy<'a>,
5314{
5315    let ((sink, source), connect_fn) = match (from_location, to_location) {
5316        (&LocationId::Process(from), &LocationId::Process(to)) => {
5317            let from_node = processes
5318                .get(from)
5319                .unwrap_or_else(|| {
5320                    panic!("A process used in the graph was not instantiated: {}", from)
5321                })
5322                .clone();
5323            let to_node = processes
5324                .get(to)
5325                .unwrap_or_else(|| {
5326                    panic!("A process used in the graph was not instantiated: {}", to)
5327                })
5328                .clone();
5329
5330            let sink_port = from_node.next_port();
5331            let source_port = to_node.next_port();
5332
5333            (
5334                D::o2o_sink_source(
5335                    env,
5336                    &from_node,
5337                    &sink_port,
5338                    &to_node,
5339                    &source_port,
5340                    name,
5341                    networking_info,
5342                ),
5343                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
5344            )
5345        }
5346        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
5347            let from_node = processes
5348                .get(from)
5349                .unwrap_or_else(|| {
5350                    panic!("A process used in the graph was not instantiated: {}", from)
5351                })
5352                .clone();
5353            let to_node = clusters
5354                .get(to)
5355                .unwrap_or_else(|| {
5356                    panic!("A cluster used in the graph was not instantiated: {}", to)
5357                })
5358                .clone();
5359
5360            let sink_port = from_node.next_port();
5361            let source_port = to_node.next_port();
5362
5363            (
5364                D::o2m_sink_source(
5365                    env,
5366                    &from_node,
5367                    &sink_port,
5368                    &to_node,
5369                    &source_port,
5370                    name,
5371                    networking_info,
5372                ),
5373                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
5374            )
5375        }
5376        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
5377            let from_node = clusters
5378                .get(from)
5379                .unwrap_or_else(|| {
5380                    panic!("A cluster used in the graph was not instantiated: {}", from)
5381                })
5382                .clone();
5383            let to_node = processes
5384                .get(to)
5385                .unwrap_or_else(|| {
5386                    panic!("A process used in the graph was not instantiated: {}", to)
5387                })
5388                .clone();
5389
5390            let sink_port = from_node.next_port();
5391            let source_port = to_node.next_port();
5392
5393            (
5394                D::m2o_sink_source(
5395                    env,
5396                    &from_node,
5397                    &sink_port,
5398                    &to_node,
5399                    &source_port,
5400                    name,
5401                    networking_info,
5402                ),
5403                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
5404            )
5405        }
5406        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
5407            let from_node = clusters
5408                .get(from)
5409                .unwrap_or_else(|| {
5410                    panic!("A cluster used in the graph was not instantiated: {}", from)
5411                })
5412                .clone();
5413            let to_node = clusters
5414                .get(to)
5415                .unwrap_or_else(|| {
5416                    panic!("A cluster used in the graph was not instantiated: {}", to)
5417                })
5418                .clone();
5419
5420            let sink_port = from_node.next_port();
5421            let source_port = to_node.next_port();
5422
5423            (
5424                D::m2m_sink_source(
5425                    env,
5426                    &from_node,
5427                    &sink_port,
5428                    &to_node,
5429                    &source_port,
5430                    name,
5431                    networking_info,
5432                ),
5433                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
5434            )
5435        }
5436        (LocationId::Tick(_, _), _) => panic!(),
5437        (_, LocationId::Tick(_, _)) => panic!(),
5438        (LocationId::Atomic(_), _) => panic!(),
5439        (_, LocationId::Atomic(_)) => panic!(),
5440    };
5441    (sink, source, connect_fn)
5442}
5443
5444#[cfg(test)]
5445mod serde_test;
5446
5447#[cfg(test)]
5448mod test {
5449    use std::mem::size_of;
5450
5451    use stageleft::{QuotedWithContext, q};
5452
5453    use super::*;
5454
5455    #[test]
5456    #[cfg_attr(
5457        not(feature = "build"),
5458        ignore = "expects inclusion of feature-gated fields"
5459    )]
5460    fn hydro_node_size() {
5461        assert_eq!(size_of::<HydroNode>(), 264);
5462    }
5463
5464    #[test]
5465    #[cfg_attr(
5466        not(feature = "build"),
5467        ignore = "expects inclusion of feature-gated fields"
5468    )]
5469    fn hydro_root_size() {
5470        assert_eq!(size_of::<HydroRoot>(), 136);
5471    }
5472
5473    #[test]
5474    fn test_simplify_q_macro_basic() {
5475        // Test basic non-q! expression
5476        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
5477        let result = simplify_q_macro(simple_expr.clone());
5478        assert_eq!(result, simple_expr);
5479    }
5480
5481    #[test]
5482    fn test_simplify_q_macro_actual_stageleft_call() {
5483        // Test a simplified version of what a real stageleft call might look like
5484        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
5485        let result = simplify_q_macro(stageleft_call);
5486        // This should be processed by our visitor and simplified to q!(...)
5487        // since we detect the stageleft::runtime_support::fn_* pattern
5488        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5489    }
5490
5491    #[test]
5492    fn test_closure_no_pipe_at_start() {
5493        // Test a closure that does not start with a pipe
5494        let stageleft_call = q!({
5495            let foo = 123;
5496            move |b: usize| b + foo
5497        })
5498        .splice_fn1_ctx(&());
5499        let result = simplify_q_macro(stageleft_call);
5500        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5501    }
5502}