Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39/// A closure expression bundled with any singleton references it captures.
40///
41/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
42/// alongside the closure's expression. This allows per-closure tracking of singleton
43/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
44pub struct ClosureExpr {
45    pub(crate) expr: DebugExpr,
46    /// Each entry is `(HydroNode::Reference, is_mut: bool)`.
47    /// The index in the Vec determines the ident name via [`handoff_ref_ident`].
48    /// The `access_counter` was assigned at staging time in code order.
49    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
50}
51
52impl Clone for ClosureExpr {
53    fn clone(&self) -> Self {
54        Self {
55            expr: self.expr.clone(),
56            singleton_refs: self
57                .singleton_refs
58                .iter()
59                .map(|(node, is_mut)| {
60                    let HydroNode::Reference {
61                        inner,
62                        kind,
63                        access_counter,
64                        metadata,
65                    } = node
66                    else {
67                        panic!("singleton_refs should only contain HydroNode::Reference");
68                    };
69                    (
70                        HydroNode::Reference {
71                            inner: SharedNode(Rc::clone(&inner.0)),
72                            kind: *kind,
73                            access_counter: access_counter.freeze(),
74                            metadata: metadata.clone(),
75                        },
76                        *is_mut,
77                    )
78                })
79                .collect(),
80        }
81    }
82}
83
84impl Hash for ClosureExpr {
85    fn hash<H: Hasher>(&self, state: &mut H) {
86        self.expr.hash(state);
87        // singleton_refs are structural children (like HydroIrMetadata), not
88        // identity-defining. Two closures with the same expr but different
89        // captured refs are the same closure text — the refs only affect codegen.
90    }
91}
92
93impl serde::Serialize for ClosureExpr {
94    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95        use serde::ser::SerializeStruct;
96        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97        s.serialize_field("expr", &self.expr)?;
98        s.serialize_field(
99            "singleton_refs",
100            &SerializableSingletonRefs(&self.singleton_refs),
101        )?;
102        s.end()
103    }
104}
105
106struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110        use serde::ser::SerializeSeq;
111        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112        for (node, is_mut) in self.0.iter() {
113            seq.serialize_element(&(node, is_mut))?;
114        }
115        seq.end()
116    }
117}
118
119impl Debug for ClosureExpr {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        Debug::fmt(&self.expr, f)
122    }
123}
124
125impl Display for ClosureExpr {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        Display::fmt(&self.expr, f)
128    }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132    fn from(expr: syn::Expr) -> Self {
133        Self {
134            expr: DebugExpr(Box::new(expr)),
135            singleton_refs: Vec::new(),
136        }
137    }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141    fn from(expr: DebugExpr) -> Self {
142        Self {
143            expr,
144            singleton_refs: Vec::new(),
145        }
146    }
147}
148
149impl ClosureExpr {
150    pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151        Self {
152            expr,
153            singleton_refs,
154        }
155    }
156
157    pub fn has_mut_ref(&self) -> bool {
158        self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159    }
160
161    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162        Self {
163            expr: self.expr.clone(),
164            singleton_refs: self
165                .singleton_refs
166                .iter()
167                .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168                .collect(),
169        }
170    }
171
172    pub fn transform_children(
173        &mut self,
174        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175        seen_tees: &mut SeenSharedNodes,
176    ) {
177        for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178            transform(ref_node, seen_tees);
179        }
180    }
181
182    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
183    /// replacing local singleton ref idents with `#{N} dfir_ident` or `#{N} mut dfir_ident` references.
184    #[cfg(feature = "build")]
185    pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186        if self.singleton_refs.is_empty() {
187            self.expr.0.to_token_stream()
188        } else {
189            assert!(
190                ident_stack.len() >= self.singleton_refs.len(),
191                "ident_stack has {} entries but expected at least {} for singleton_refs",
192                ident_stack.len(),
193                self.singleton_refs.len()
194            );
195            let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197            let mut let_bindings = Vec::new();
198            for ((i, (ref_node, is_mut)), ref_ident) in
199                self.singleton_refs.iter().enumerate().zip(ref_idents)
200            {
201                let HydroNode::Reference { access_counter, .. } = ref_node else {
202                    panic!("ClosureExpression expected references to `HydroNode::Reference`");
203                };
204                let group = access_counter.frozen_group();
205                // TODO(mingwei): proper spanning?
206                let local_ident = handoff_ref_ident(i);
207                let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208                let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209                let mut_token = is_mut.then(|| quote!(mut));
210                let binding = quote! {
211                    let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212                };
213                let_bindings.push(binding);
214            }
215
216            let expr = &self.expr.0;
217            quote! {
218                {
219                    #( #let_bindings )*
220                    #expr
221                }
222            }
223        }
224    }
225}
226
227/// Wrapper that displays only the tokens of a parsed expr.
228///
229/// Boxes `syn::Type` which is ~240 bytes.
230#[derive(Clone, Hash)]
231pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235        serializer.serialize_str(&self.to_string())
236    }
237}
238
239impl From<syn::Expr> for DebugExpr {
240    fn from(expr: syn::Expr) -> Self {
241        Self(Box::new(expr))
242    }
243}
244
245impl Deref for DebugExpr {
246    type Target = syn::Expr;
247
248    fn deref(&self) -> &Self::Target {
249        &self.0
250    }
251}
252
253impl ToTokens for DebugExpr {
254    fn to_tokens(&self, tokens: &mut TokenStream) {
255        self.0.to_tokens(tokens);
256    }
257}
258
259impl Debug for DebugExpr {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        write!(f, "{}", self.0.to_token_stream())
262    }
263}
264
265impl Display for DebugExpr {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        let original = self.0.as_ref().clone();
268        let simplified = simplify_q_macro(original);
269
270        // For now, just use quote formatting without trying to parse as a statement
271        // This avoids the syn::parse_quote! issues entirely
272        write!(f, "q!({})", quote::quote!(#simplified))
273    }
274}
275
276/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
277fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278    if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279        // Look for calls to stageleft::runtime_support::fn*
280        && is_stageleft_runtime_support_call(&path_expr.path)
281        && let syn::Expr::Block(b) = &call.args[0]
282        && b.block.stmts.len() == 3
283        && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284    // skip the first two, which are imports
285    {
286        let mut e = e.clone();
287        while let syn::Expr::Block(ref mut block) = e
288            && block.block.stmts.len() == 1
289            && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290        {
291            e = inner_e;
292        }
293
294        e
295    } else {
296        expr
297    }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301    // Check if this is a call to stageleft::runtime_support::fn*
302    if let Some(last_segment) = path.segments.last() {
303        let fn_name = last_segment.ident.to_string();
304        path.segments.len() > 2
305            && path.segments[0].ident == "stageleft"
306            && path.segments[1].ident == "runtime_support"
307            && fn_name.contains("_type_hint")
308    } else {
309        false
310    }
311}
312
313/// Debug displays the type's tokens.
314///
315/// Boxes `syn::Type` which is ~320 bytes.
316#[derive(Clone, PartialEq, Eq, Hash)]
317pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320    fn from(t: syn::Type) -> Self {
321        Self(Box::new(t))
322    }
323}
324
325impl Deref for DebugType {
326    type Target = syn::Type;
327
328    fn deref(&self) -> &Self::Target {
329        &self.0
330    }
331}
332
333impl ToTokens for DebugType {
334    fn to_tokens(&self, tokens: &mut TokenStream) {
335        self.0.to_tokens(tokens);
336    }
337}
338
339impl Debug for DebugType {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        write!(f, "{}", self.0.to_token_stream())
342    }
343}
344
345impl serde::Serialize for DebugType {
346    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348    }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352    backtrace: &Backtrace,
353    serializer: S,
354) -> Result<S::Ok, S::Error> {
355    match backtrace.format_span() {
356        Some(span) => serializer.serialize_some(&span),
357        None => serializer.serialize_none(),
358    }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362    ident: &syn::Ident,
363    serializer: S,
364) -> Result<S::Ok, S::Error> {
365    serializer.serialize_str(&ident.to_string())
366}
367
368pub enum DebugInstantiate {
369    Building,
370    Finalized(Box<DebugInstantiateFinalized>),
371}
372
373impl serde::Serialize for DebugInstantiate {
374    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375        match self {
376            DebugInstantiate::Building => {
377                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378            }
379            DebugInstantiate::Finalized(_) => {
380                panic!(
381                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382                )
383            }
384        }
385    }
386}
387
388#[cfg_attr(
389    not(feature = "build"),
390    expect(
391        dead_code,
392        reason = "sink, source unused without `feature = \"build\"`."
393    )
394)]
395pub struct DebugInstantiateFinalized {
396    sink: syn::Expr,
397    source: syn::Expr,
398    connect_fn: Option<Box<dyn FnOnce()>>,
399}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402    fn from(f: DebugInstantiateFinalized) -> Self {
403        Self::Finalized(Box::new(f))
404    }
405}
406
407impl Debug for DebugInstantiate {
408    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409        write!(f, "<network instantiate>")
410    }
411}
412
413impl Hash for DebugInstantiate {
414    fn hash<H: Hasher>(&self, _state: &mut H) {
415        // Do nothing
416    }
417}
418
419impl Clone for DebugInstantiate {
420    fn clone(&self) -> Self {
421        match self {
422            DebugInstantiate::Building => DebugInstantiate::Building,
423            DebugInstantiate::Finalized(_) => {
424                panic!("DebugInstantiate::Finalized should not be cloned")
425            }
426        }
427    }
428}
429
430/// Tracks the instantiation state of a `ClusterMembers` source.
431///
432/// During `compile_network`, the first `ClusterMembers` node for a given
433/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
434/// receives the expression returned by `Deploy::cluster_membership_stream`.
435/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
436/// during code-gen they simply reference the tee output of the first node
437/// instead of creating a redundant `source_stream`.
438#[derive(Debug, Hash, Clone, serde::Serialize)]
439pub enum ClusterMembersState {
440    /// Not yet instantiated.
441    Uninit,
442    /// The primary instance: holds the stream expression and will emit
443    /// `source_stream(expr) -> tee()` during code-gen.
444    Stream(DebugExpr),
445    /// A secondary instance that references the tee output of the primary.
446    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
447    /// can derive the deterministic tee ident without extra state.
448    Tee(LocationId, LocationId),
449}
450
451/// A source in a Hydro graph, where data enters the graph.
452#[derive(Debug, Hash, Clone, serde::Serialize)]
453pub enum HydroSource {
454    Stream(DebugExpr),
455    ExternalNetwork(),
456    Iter(DebugExpr),
457    Spin(),
458    ClusterMembers(LocationId, ClusterMembersState),
459    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
460    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
461}
462
463#[cfg(feature = "build")]
464/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
465/// and simulations.
466///
467/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
468/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
469pub trait DfirBuilder {
470    /// Whether the representation of singletons should include intermediate states.
471    fn singleton_intermediates(&self) -> bool;
472
473    /// Gets the DFIR builder for the given location, creating it if necessary.
474    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
475
476    #[expect(clippy::too_many_arguments, reason = "TODO")]
477    fn batch(
478        &mut self,
479        in_ident: syn::Ident,
480        in_location: &LocationId,
481        in_kind: &CollectionKind,
482        out_ident: &syn::Ident,
483        out_location: &LocationId,
484        op_meta: &HydroIrOpMetadata,
485        fold_hooked_idents: &HashSet<String>,
486    );
487    fn yield_from_tick(
488        &mut self,
489        in_ident: syn::Ident,
490        in_location: &LocationId,
491        in_kind: &CollectionKind,
492        out_ident: &syn::Ident,
493        out_location: &LocationId,
494    );
495
496    fn begin_atomic(
497        &mut self,
498        in_ident: syn::Ident,
499        in_location: &LocationId,
500        in_kind: &CollectionKind,
501        out_ident: &syn::Ident,
502        out_location: &LocationId,
503        op_meta: &HydroIrOpMetadata,
504    );
505    fn end_atomic(
506        &mut self,
507        in_ident: syn::Ident,
508        in_location: &LocationId,
509        in_kind: &CollectionKind,
510        out_ident: &syn::Ident,
511    );
512
513    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
514    fn observe_nondet(
515        &mut self,
516        trusted: bool,
517        location: &LocationId,
518        in_ident: syn::Ident,
519        in_kind: &CollectionKind,
520        out_ident: &syn::Ident,
521        out_kind: &CollectionKind,
522        op_meta: &HydroIrOpMetadata,
523    );
524
525    #[expect(clippy::too_many_arguments, reason = "TODO")]
526    fn merge_ordered(
527        &mut self,
528        location: &LocationId,
529        first_ident: syn::Ident,
530        second_ident: syn::Ident,
531        out_ident: &syn::Ident,
532        in_kind: &CollectionKind,
533        op_meta: &HydroIrOpMetadata,
534        operator_tag: Option<&str>,
535    );
536
537    #[expect(clippy::too_many_arguments, reason = "TODO")]
538    fn create_network(
539        &mut self,
540        from: &LocationId,
541        to: &LocationId,
542        input_ident: syn::Ident,
543        out_ident: &syn::Ident,
544        serialize: Option<&DebugExpr>,
545        sink: syn::Expr,
546        source: syn::Expr,
547        deserialize: Option<&DebugExpr>,
548        tag_id: StmtId,
549        networking_info: &crate::networking::NetworkingInfo,
550    );
551
552    fn create_external_source(
553        &mut self,
554        on: &LocationId,
555        source_expr: syn::Expr,
556        out_ident: &syn::Ident,
557        deserialize: Option<&DebugExpr>,
558        tag_id: StmtId,
559    );
560
561    fn create_external_output(
562        &mut self,
563        on: &LocationId,
564        sink_expr: syn::Expr,
565        input_ident: &syn::Ident,
566        serialize: Option<&DebugExpr>,
567        tag_id: StmtId,
568    );
569
570    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
571    /// Returns the new input ident to use for the fold if a hook was emitted.
572    fn emit_fold_hook(
573        &mut self,
574        location: &LocationId,
575        in_ident: &syn::Ident,
576        in_kind: &CollectionKind,
577        op_meta: &HydroIrOpMetadata,
578    ) -> Option<syn::Ident>;
579
580    /// Inserts necessary code to validate a manual assertion that at this point the
581    /// input live collection is consistent. In production, this is a no-op, but in simulation
582    /// this will (not yet implemented) inject assertions that validate consistency.
583    fn assert_is_consistent(
584        &mut self,
585        trusted: bool,
586        location: &LocationId,
587        in_ident: syn::Ident,
588        out_ident: &syn::Ident,
589    );
590
591    /// Observes non-determinism introduced by a mut closure operating on a non-strict
592    /// (unordered / at-least-once) input. In production this is identity; in simulation
593    /// it delegates to `observe_nondet` with the strict output kind.
594    fn observe_for_mut(
595        &mut self,
596        location: &LocationId,
597        in_ident: syn::Ident,
598        in_kind: &CollectionKind,
599        out_ident: &syn::Ident,
600        op_meta: &HydroIrOpMetadata,
601    );
602
603    fn create_versioned_network_fork(
604        &mut self,
605        channel_id: u32,
606        dest: &LocationId,
607        senders: Vec<(LocationId, syn::Ident, Option<DebugExpr>)>,
608        tag_id: StmtId,
609    );
610
611    fn create_versioned_network(
612        &mut self,
613        channel_id: u32,
614        source: &LocationId,
615        dest: &LocationId,
616        out_ident: &syn::Ident,
617        deserialize: Option<&DebugExpr>,
618        tag_id: StmtId,
619    );
620}
621
622#[cfg(feature = "build")]
623impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
624    fn singleton_intermediates(&self) -> bool {
625        false
626    }
627
628    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
629        self.entry(location.root().key())
630            .expect("location was removed")
631            .or_default()
632    }
633
634    fn batch(
635        &mut self,
636        in_ident: syn::Ident,
637        in_location: &LocationId,
638        in_kind: &CollectionKind,
639        out_ident: &syn::Ident,
640        _out_location: &LocationId,
641        _op_meta: &HydroIrOpMetadata,
642        _fold_hooked_idents: &HashSet<String>,
643    ) {
644        let builder = self.get_dfir_mut(in_location.root());
645        if in_kind.is_bounded()
646            && matches!(
647                in_kind,
648                CollectionKind::Singleton { .. }
649                    | CollectionKind::Optional { .. }
650                    | CollectionKind::KeyedSingleton { .. }
651            )
652        {
653            assert!(in_location.is_top_level());
654            builder.add_dfir(
655                parse_quote! {
656                    #out_ident = #in_ident -> persist::<'static>();
657                },
658                None,
659                None,
660            );
661        } else {
662            builder.add_dfir(
663                parse_quote! {
664                    #out_ident = #in_ident;
665                },
666                None,
667                None,
668            );
669        }
670    }
671
672    fn yield_from_tick(
673        &mut self,
674        in_ident: syn::Ident,
675        in_location: &LocationId,
676        _in_kind: &CollectionKind,
677        out_ident: &syn::Ident,
678        _out_location: &LocationId,
679    ) {
680        let builder = self.get_dfir_mut(in_location.root());
681        builder.add_dfir(
682            parse_quote! {
683                #out_ident = #in_ident;
684            },
685            None,
686            None,
687        );
688    }
689
690    fn begin_atomic(
691        &mut self,
692        in_ident: syn::Ident,
693        in_location: &LocationId,
694        _in_kind: &CollectionKind,
695        out_ident: &syn::Ident,
696        _out_location: &LocationId,
697        _op_meta: &HydroIrOpMetadata,
698    ) {
699        let builder = self.get_dfir_mut(in_location.root());
700        builder.add_dfir(
701            parse_quote! {
702                #out_ident = #in_ident;
703            },
704            None,
705            None,
706        );
707    }
708
709    fn end_atomic(
710        &mut self,
711        in_ident: syn::Ident,
712        in_location: &LocationId,
713        _in_kind: &CollectionKind,
714        out_ident: &syn::Ident,
715    ) {
716        let builder = self.get_dfir_mut(in_location.root());
717        builder.add_dfir(
718            parse_quote! {
719                #out_ident = #in_ident;
720            },
721            None,
722            None,
723        );
724    }
725
726    fn observe_nondet(
727        &mut self,
728        _trusted: bool,
729        location: &LocationId,
730        in_ident: syn::Ident,
731        _in_kind: &CollectionKind,
732        out_ident: &syn::Ident,
733        _out_kind: &CollectionKind,
734        _op_meta: &HydroIrOpMetadata,
735    ) {
736        let builder = self.get_dfir_mut(location);
737        builder.add_dfir(
738            parse_quote! {
739                #out_ident = #in_ident;
740            },
741            None,
742            None,
743        );
744    }
745
746    fn merge_ordered(
747        &mut self,
748        location: &LocationId,
749        first_ident: syn::Ident,
750        second_ident: syn::Ident,
751        out_ident: &syn::Ident,
752        _in_kind: &CollectionKind,
753        _op_meta: &HydroIrOpMetadata,
754        operator_tag: Option<&str>,
755    ) {
756        let builder = self.get_dfir_mut(location);
757        builder.add_dfir(
758            parse_quote! {
759                #out_ident = union();
760                #first_ident -> [0]#out_ident;
761                #second_ident -> [1]#out_ident;
762            },
763            None,
764            operator_tag,
765        );
766    }
767
768    fn create_network(
769        &mut self,
770        from: &LocationId,
771        to: &LocationId,
772        input_ident: syn::Ident,
773        out_ident: &syn::Ident,
774        serialize: Option<&DebugExpr>,
775        sink: syn::Expr,
776        source: syn::Expr,
777        deserialize: Option<&DebugExpr>,
778        tag_id: StmtId,
779        _networking_info: &crate::networking::NetworkingInfo,
780    ) {
781        let sender_builder = self.get_dfir_mut(from);
782        if let Some(serialize_pipeline) = serialize {
783            sender_builder.add_dfir(
784                parse_quote! {
785                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
786                },
787                None,
788                // operator tag separates send and receive, which otherwise have the same next_stmt_id
789                Some(&format!("send{}", tag_id)),
790            );
791        } else {
792            sender_builder.add_dfir(
793                parse_quote! {
794                    #input_ident -> dest_sink(#sink);
795                },
796                None,
797                Some(&format!("send{}", tag_id)),
798            );
799        }
800
801        let receiver_builder = self.get_dfir_mut(to);
802        if let Some(deserialize_pipeline) = deserialize {
803            receiver_builder.add_dfir(
804                parse_quote! {
805                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
806                },
807                None,
808                Some(&format!("recv{}", tag_id)),
809            );
810        } else {
811            receiver_builder.add_dfir(
812                parse_quote! {
813                    #out_ident = source_stream(#source);
814                },
815                None,
816                Some(&format!("recv{}", tag_id)),
817            );
818        }
819    }
820
821    fn create_external_source(
822        &mut self,
823        on: &LocationId,
824        source_expr: syn::Expr,
825        out_ident: &syn::Ident,
826        deserialize: Option<&DebugExpr>,
827        tag_id: StmtId,
828    ) {
829        let receiver_builder = self.get_dfir_mut(on);
830        if let Some(deserialize_pipeline) = deserialize {
831            receiver_builder.add_dfir(
832                parse_quote! {
833                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
834                },
835                None,
836                Some(&format!("recv{}", tag_id)),
837            );
838        } else {
839            receiver_builder.add_dfir(
840                parse_quote! {
841                    #out_ident = source_stream(#source_expr);
842                },
843                None,
844                Some(&format!("recv{}", tag_id)),
845            );
846        }
847    }
848
849    fn create_external_output(
850        &mut self,
851        on: &LocationId,
852        sink_expr: syn::Expr,
853        input_ident: &syn::Ident,
854        serialize: Option<&DebugExpr>,
855        tag_id: StmtId,
856    ) {
857        let sender_builder = self.get_dfir_mut(on);
858        if let Some(serialize_fn) = serialize {
859            sender_builder.add_dfir(
860                parse_quote! {
861                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
862                },
863                None,
864                // operator tag separates send and receive, which otherwise have the same next_stmt_id
865                Some(&format!("send{}", tag_id)),
866            );
867        } else {
868            sender_builder.add_dfir(
869                parse_quote! {
870                    #input_ident -> dest_sink(#sink_expr);
871                },
872                None,
873                Some(&format!("send{}", tag_id)),
874            );
875        }
876    }
877
878    fn emit_fold_hook(
879        &mut self,
880        _location: &LocationId,
881        _in_ident: &syn::Ident,
882        _in_kind: &CollectionKind,
883        _op_meta: &HydroIrOpMetadata,
884    ) -> Option<syn::Ident> {
885        None
886    }
887
888    fn assert_is_consistent(
889        &mut self,
890        _trusted: bool,
891        location: &LocationId,
892        in_ident: syn::Ident,
893        out_ident: &syn::Ident,
894    ) {
895        let builder = self.get_dfir_mut(location);
896        builder.add_dfir(
897            parse_quote! {
898                #out_ident = #in_ident;
899            },
900            None,
901            None,
902        );
903    }
904
905    fn observe_for_mut(
906        &mut self,
907        location: &LocationId,
908        in_ident: syn::Ident,
909        _in_kind: &CollectionKind,
910        out_ident: &syn::Ident,
911        _op_meta: &HydroIrOpMetadata,
912    ) {
913        let builder = self.get_dfir_mut(location);
914        builder.add_dfir(
915            parse_quote! {
916                #out_ident = #in_ident;
917            },
918            None,
919            None,
920        );
921    }
922
923    fn create_versioned_network_fork(
924        &mut self,
925        _channel_id: u32,
926        _dest: &LocationId,
927        _senders: Vec<(LocationId, syn::Ident, Option<DebugExpr>)>,
928        _tag_id: StmtId,
929    ) {
930        unreachable!(
931            "HydroNode::VersionedNetworkFork is only produced by the multi-version simulator merge \
932             pass and cannot be emitted by the non-simulation builder"
933        );
934    }
935
936    fn create_versioned_network(
937        &mut self,
938        _channel_id: u32,
939        _source: &LocationId,
940        _dest: &LocationId,
941        _out_ident: &syn::Ident,
942        _deserialize: Option<&DebugExpr>,
943        _tag_id: StmtId,
944    ) {
945        unreachable!(
946            "HydroNode::VersionedNetwork is only produced by the multi-version simulator merge \
947             pass and cannot be emitted by the non-simulation builder"
948        );
949    }
950}
951
952#[cfg(feature = "build")]
953pub enum BuildersOrCallback<'a, L, N>
954where
955    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
956    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
957{
958    Builders(&'a mut dyn DfirBuilder),
959    Callback(L, N),
960}
961
962/// An root in a Hydro graph, which is an pipeline that doesn't emit
963/// any downstream values. Traversals over the dataflow graph and
964/// generating DFIR IR start from roots.
965#[derive(Debug, Hash, serde::Serialize)]
966pub enum HydroRoot {
967    ForEach {
968        f: ClosureExpr,
969        input: Box<HydroNode>,
970        op_metadata: HydroIrOpMetadata,
971    },
972    SendExternal {
973        to_external_key: LocationKey,
974        to_port_id: ExternalPortId,
975        to_many: bool,
976        unpaired: bool,
977        serialize_fn: Option<DebugExpr>,
978        instantiate_fn: DebugInstantiate,
979        input: Box<HydroNode>,
980        op_metadata: HydroIrOpMetadata,
981    },
982    DestSink {
983        sink: DebugExpr,
984        input: Box<HydroNode>,
985        op_metadata: HydroIrOpMetadata,
986    },
987    CycleSink {
988        cycle_id: CycleId,
989        input: Box<HydroNode>,
990        op_metadata: HydroIrOpMetadata,
991    },
992    EmbeddedOutput {
993        #[serde(serialize_with = "serialize_ident")]
994        ident: syn::Ident,
995        input: Box<HydroNode>,
996        op_metadata: HydroIrOpMetadata,
997    },
998    Null {
999        input: Box<HydroNode>,
1000        op_metadata: HydroIrOpMetadata,
1001    },
1002}
1003
1004impl HydroRoot {
1005    #[cfg(feature = "build")]
1006    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
1007    pub fn compile_network<'a, D>(
1008        &mut self,
1009        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
1010        seen_tees: &mut SeenSharedNodes,
1011        seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
1012        processes: &SparseSecondaryMap<LocationKey, D::Process>,
1013        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
1014        externals: &SparseSecondaryMap<LocationKey, D::External>,
1015        env: &mut D::InstantiateEnv,
1016    ) where
1017        D: Deploy<'a>,
1018    {
1019        let refcell_extra_stmts = RefCell::new(extra_stmts);
1020        let refcell_env = RefCell::new(env);
1021        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
1022        self.transform_bottom_up(
1023            &mut |l| {
1024                if let HydroRoot::SendExternal {
1025                    input,
1026                    to_external_key,
1027                    to_port_id,
1028                    to_many,
1029                    unpaired,
1030                    instantiate_fn,
1031                    ..
1032                } = l
1033                {
1034                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1035                        DebugInstantiate::Building => {
1036                            let to_node = externals
1037                                .get(*to_external_key)
1038                                .unwrap_or_else(|| {
1039                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
1040                                })
1041                                .clone();
1042
1043                            match input.metadata().location_id.root() {
1044                                &LocationId::Process(process_key) => {
1045                                    if *to_many {
1046                                        (
1047                                            (
1048                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1049                                                parse_quote!(DUMMY),
1050                                            ),
1051                                            Box::new(|| {}) as Box<dyn FnOnce()>,
1052                                        )
1053                                    } else {
1054                                        let from_node = processes
1055                                            .get(process_key)
1056                                            .unwrap_or_else(|| {
1057                                                panic!("A process used in the graph was not instantiated: {}", process_key)
1058                                            })
1059                                            .clone();
1060
1061                                        let sink_port = from_node.next_port();
1062                                        let source_port = to_node.next_port();
1063
1064                                        if *unpaired {
1065                                            use stageleft::quote_type;
1066                                            use tokio_util::codec::LengthDelimitedCodec;
1067
1068                                            to_node.register(*to_port_id, source_port.clone());
1069
1070                                            let _ = D::e2o_source(
1071                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1072                                                &to_node, &source_port,
1073                                                &from_node, &sink_port,
1074                                                &quote_type::<LengthDelimitedCodec>(),
1075                                                format!("{}_{}", *to_external_key, *to_port_id)
1076                                            );
1077                                        }
1078
1079                                        (
1080                                            (
1081                                                D::o2e_sink(
1082                                                    &from_node,
1083                                                    &sink_port,
1084                                                    &to_node,
1085                                                    &source_port,
1086                                                    format!("{}_{}", *to_external_key, *to_port_id)
1087                                                ),
1088                                                parse_quote!(DUMMY),
1089                                            ),
1090                                            if *unpaired {
1091                                                D::e2o_connect(
1092                                                    &to_node,
1093                                                    &source_port,
1094                                                    &from_node,
1095                                                    &sink_port,
1096                                                    *to_many,
1097                                                    NetworkHint::Auto,
1098                                                )
1099                                            } else {
1100                                                Box::new(|| {}) as Box<dyn FnOnce()>
1101                                            },
1102                                        )
1103                                    }
1104                                }
1105                                LocationId::Cluster(cluster_key) => {
1106                                    let from_node = clusters
1107                                        .get(*cluster_key)
1108                                        .unwrap_or_else(|| {
1109                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1110                                        })
1111                                        .clone();
1112
1113                                    let sink_port = from_node.next_port();
1114                                    let source_port = to_node.next_port();
1115
1116                                    if *unpaired {
1117                                        to_node.register(*to_port_id, source_port.clone());
1118                                    }
1119
1120                                    (
1121                                        (
1122                                            D::m2e_sink(
1123                                                &from_node,
1124                                                &sink_port,
1125                                                &to_node,
1126                                                &source_port,
1127                                                format!("{}_{}", *to_external_key, *to_port_id)
1128                                            ),
1129                                            parse_quote!(DUMMY),
1130                                        ),
1131                                        Box::new(|| {}) as Box<dyn FnOnce()>,
1132                                    )
1133                                }
1134                                _ => panic!()
1135                            }
1136                        },
1137
1138                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1139                    };
1140
1141                    *instantiate_fn = DebugInstantiateFinalized {
1142                        sink: sink_expr,
1143                        source: source_expr,
1144                        connect_fn: Some(connect_fn),
1145                    }
1146                    .into();
1147                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1148                    let element_type = match &input.metadata().collection_kind {
1149                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1150                        _ => panic!("Embedded output must have Stream collection kind"),
1151                    };
1152                    let location_key = match input.metadata().location_id.root() {
1153                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1154                        _ => panic!("Embedded output must be on a process or cluster"),
1155                    };
1156                    D::register_embedded_output(
1157                        &mut refcell_env.borrow_mut(),
1158                        location_key,
1159                        ident,
1160                        &element_type,
1161                    );
1162                }
1163            },
1164            &mut |n| {
1165                if let HydroNode::Network {
1166                    name,
1167                    networking_info,
1168                    input,
1169                    instantiate_fn,
1170                    metadata,
1171                    ..
1172                } = n
1173                {
1174                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1175                        DebugInstantiate::Building => instantiate_network::<D>(
1176                            &mut refcell_env.borrow_mut(),
1177                            input.metadata().location_id.root(),
1178                            metadata.location_id.root(),
1179                            processes,
1180                            clusters,
1181                            name.as_deref(),
1182                            networking_info,
1183                        ),
1184
1185                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1186                    };
1187
1188                    *instantiate_fn = DebugInstantiateFinalized {
1189                        sink: sink_expr,
1190                        source: source_expr,
1191                        connect_fn: Some(connect_fn),
1192                    }
1193                    .into();
1194                } else if let HydroNode::ExternalInput {
1195                    from_external_key,
1196                    from_port_id,
1197                    from_many,
1198                    codec_type,
1199                    port_hint,
1200                    instantiate_fn,
1201                    metadata,
1202                    ..
1203                } = n
1204                {
1205                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1206                        DebugInstantiate::Building => {
1207                            let from_node = externals
1208                                .get(*from_external_key)
1209                                .unwrap_or_else(|| {
1210                                    panic!(
1211                                        "A external used in the graph was not instantiated: {}",
1212                                        from_external_key,
1213                                    )
1214                                })
1215                                .clone();
1216
1217                            match metadata.location_id.root() {
1218                                &LocationId::Process(process_key) => {
1219                                    let to_node = processes
1220                                        .get(process_key)
1221                                        .unwrap_or_else(|| {
1222                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1223                                        })
1224                                        .clone();
1225
1226                                    let sink_port = from_node.next_port();
1227                                    let source_port = to_node.next_port();
1228
1229                                    from_node.register(*from_port_id, sink_port.clone());
1230
1231                                    (
1232                                        (
1233                                            parse_quote!(DUMMY),
1234                                            if *from_many {
1235                                                D::e2o_many_source(
1236                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1237                                                    &to_node, &source_port,
1238                                                    codec_type.0.as_ref(),
1239                                                    format!("{}_{}", *from_external_key, *from_port_id)
1240                                                )
1241                                            } else {
1242                                                D::e2o_source(
1243                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1244                                                    &from_node, &sink_port,
1245                                                    &to_node, &source_port,
1246                                                    codec_type.0.as_ref(),
1247                                                    format!("{}_{}", *from_external_key, *from_port_id)
1248                                                )
1249                                            },
1250                                        ),
1251                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1252                                    )
1253                                }
1254                                LocationId::Cluster(cluster_key) => {
1255                                    let to_node = clusters
1256                                        .get(*cluster_key)
1257                                        .unwrap_or_else(|| {
1258                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1259                                        })
1260                                        .clone();
1261
1262                                    let sink_port = from_node.next_port();
1263                                    let source_port = to_node.next_port();
1264
1265                                    from_node.register(*from_port_id, sink_port.clone());
1266
1267                                    (
1268                                        (
1269                                            parse_quote!(DUMMY),
1270                                            D::e2m_source(
1271                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1272                                                &from_node, &sink_port,
1273                                                &to_node, &source_port,
1274                                                codec_type.0.as_ref(),
1275                                                format!("{}_{}", *from_external_key, *from_port_id)
1276                                            ),
1277                                        ),
1278                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1279                                    )
1280                                }
1281                                _ => panic!()
1282                            }
1283                        },
1284
1285                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1286                    };
1287
1288                    *instantiate_fn = DebugInstantiateFinalized {
1289                        sink: sink_expr,
1290                        source: source_expr,
1291                        connect_fn: Some(connect_fn),
1292                    }
1293                    .into();
1294                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1295                    let element_type = match &metadata.collection_kind {
1296                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1297                        _ => panic!("Embedded source must have Stream collection kind"),
1298                    };
1299                    let location_key = match metadata.location_id.root() {
1300                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1301                        _ => panic!("Embedded source must be on a process or cluster"),
1302                    };
1303                    D::register_embedded_stream_input(
1304                        &mut refcell_env.borrow_mut(),
1305                        location_key,
1306                        ident,
1307                        &element_type,
1308                    );
1309                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1310                    let element_type = match &metadata.collection_kind {
1311                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1312                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1313                    };
1314                    let location_key = match metadata.location_id.root() {
1315                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1316                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1317                    };
1318                    D::register_embedded_singleton_input(
1319                        &mut refcell_env.borrow_mut(),
1320                        location_key,
1321                        ident,
1322                        &element_type,
1323                    );
1324                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1325                    match state {
1326                        ClusterMembersState::Uninit => {
1327                            let at_location = metadata.location_id.root().clone();
1328                            let key = (at_location.clone(), location_id.key());
1329                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1330                                // First occurrence: call cluster_membership_stream and mark as Stream.
1331                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1332                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1333                                    &(),
1334                                );
1335                                *state = ClusterMembersState::Stream(expr.into());
1336                            } else {
1337                                // Already instantiated for this (at, target) pair: just tee.
1338                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1339                            }
1340                        }
1341                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1342                            panic!("cluster members already finalized");
1343                        }
1344                    }
1345                }
1346            },
1347            seen_tees,
1348            false,
1349        );
1350    }
1351
1352    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1353        self.transform_bottom_up(
1354            &mut |l| {
1355                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1356                    match instantiate_fn {
1357                        DebugInstantiate::Building => panic!("network not built"),
1358
1359                        DebugInstantiate::Finalized(finalized) => {
1360                            (finalized.connect_fn.take().unwrap())();
1361                        }
1362                    }
1363                }
1364            },
1365            &mut |n| {
1366                if let HydroNode::Network { instantiate_fn, .. }
1367                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1368                {
1369                    match instantiate_fn {
1370                        DebugInstantiate::Building => panic!("network not built"),
1371
1372                        DebugInstantiate::Finalized(finalized) => {
1373                            (finalized.connect_fn.take().unwrap())();
1374                        }
1375                    }
1376                }
1377            },
1378            seen_tees,
1379            false,
1380        );
1381    }
1382
1383    pub fn transform_bottom_up(
1384        &mut self,
1385        transform_root: &mut impl FnMut(&mut HydroRoot),
1386        transform_node: &mut impl FnMut(&mut HydroNode),
1387        seen_tees: &mut SeenSharedNodes,
1388        check_well_formed: bool,
1389    ) {
1390        self.transform_children(
1391            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1392            seen_tees,
1393        );
1394
1395        transform_root(self);
1396    }
1397
1398    pub fn transform_children(
1399        &mut self,
1400        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1401        seen_tees: &mut SeenSharedNodes,
1402    ) {
1403        match self {
1404            HydroRoot::ForEach { f, input, .. } => {
1405                f.transform_children(&mut transform, seen_tees);
1406                transform(input, seen_tees);
1407            }
1408            HydroRoot::SendExternal { input, .. }
1409            | HydroRoot::DestSink { input, .. }
1410            | HydroRoot::CycleSink { input, .. }
1411            | HydroRoot::EmbeddedOutput { input, .. }
1412            | HydroRoot::Null { input, .. } => {
1413                transform(input, seen_tees);
1414            }
1415        }
1416    }
1417
1418    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1419        match self {
1420            HydroRoot::ForEach {
1421                f,
1422                input,
1423                op_metadata,
1424            } => HydroRoot::ForEach {
1425                f: f.deep_clone(seen_tees),
1426                input: Box::new(input.deep_clone(seen_tees)),
1427                op_metadata: op_metadata.clone(),
1428            },
1429            HydroRoot::SendExternal {
1430                to_external_key,
1431                to_port_id,
1432                to_many,
1433                unpaired,
1434                serialize_fn,
1435                instantiate_fn,
1436                input,
1437                op_metadata,
1438            } => HydroRoot::SendExternal {
1439                to_external_key: *to_external_key,
1440                to_port_id: *to_port_id,
1441                to_many: *to_many,
1442                unpaired: *unpaired,
1443                serialize_fn: serialize_fn.clone(),
1444                instantiate_fn: instantiate_fn.clone(),
1445                input: Box::new(input.deep_clone(seen_tees)),
1446                op_metadata: op_metadata.clone(),
1447            },
1448            HydroRoot::DestSink {
1449                sink,
1450                input,
1451                op_metadata,
1452            } => HydroRoot::DestSink {
1453                sink: sink.clone(),
1454                input: Box::new(input.deep_clone(seen_tees)),
1455                op_metadata: op_metadata.clone(),
1456            },
1457            HydroRoot::CycleSink {
1458                cycle_id,
1459                input,
1460                op_metadata,
1461            } => HydroRoot::CycleSink {
1462                cycle_id: *cycle_id,
1463                input: Box::new(input.deep_clone(seen_tees)),
1464                op_metadata: op_metadata.clone(),
1465            },
1466            HydroRoot::EmbeddedOutput {
1467                ident,
1468                input,
1469                op_metadata,
1470            } => HydroRoot::EmbeddedOutput {
1471                ident: ident.clone(),
1472                input: Box::new(input.deep_clone(seen_tees)),
1473                op_metadata: op_metadata.clone(),
1474            },
1475            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1476                input: Box::new(input.deep_clone(seen_tees)),
1477                op_metadata: op_metadata.clone(),
1478            },
1479        }
1480    }
1481
1482    #[cfg(feature = "build")]
1483    pub fn emit(
1484        &mut self,
1485        graph_builders: &mut dyn DfirBuilder,
1486        seen_tees: &mut SeenSharedNodes,
1487        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1488        next_stmt_id: &mut crate::Counter<StmtId>,
1489        fold_hooked_idents: &mut HashSet<String>,
1490    ) {
1491        self.emit_core(
1492            &mut BuildersOrCallback::<
1493                fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1494                fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1495            >::Builders(graph_builders),
1496            seen_tees,
1497            built_tees,
1498            next_stmt_id,
1499            fold_hooked_idents,
1500        );
1501    }
1502
1503    #[cfg(feature = "build")]
1504    pub fn emit_core(
1505        &mut self,
1506        builders_or_callback: &mut BuildersOrCallback<
1507            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1508            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1509        >,
1510        seen_tees: &mut SeenSharedNodes,
1511        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1512        next_stmt_id: &mut crate::Counter<StmtId>,
1513        fold_hooked_idents: &mut HashSet<String>,
1514    ) {
1515        match self {
1516            HydroRoot::ForEach { f, input, .. } => {
1517                let input_ident = input.emit_core(
1518                    builders_or_callback,
1519                    seen_tees,
1520                    built_tees,
1521                    next_stmt_id,
1522                    fold_hooked_idents,
1523                );
1524
1525                let stmt_id = next_stmt_id.get_and_increment();
1526
1527                match builders_or_callback {
1528                    BuildersOrCallback::Builders(graph_builders) => {
1529                        let mut ident_stack: Vec<syn::Ident> = Vec::new();
1530
1531                        // Look up each captured ref's ident from built_tees
1532                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
1533                            let HydroNode::Reference { inner, .. } = ref_node else {
1534                                panic!("singleton_refs should only contain HydroNode::Reference");
1535                            };
1536                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1537                            let idents = built_tees.get(&ptr).expect(
1538                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
1539                            );
1540                            ident_stack.push(idents[0].clone());
1541                        }
1542
1543                        let f_tokens = f.emit_tokens(&mut ident_stack);
1544
1545                        graph_builders
1546                            .get_dfir_mut(&input.metadata().location_id)
1547                            .add_dfir(
1548                                parse_quote! {
1549                                    #input_ident -> for_each(#f_tokens);
1550                                },
1551                                None,
1552                                Some(&stmt_id.to_string()),
1553                            );
1554                    }
1555                    BuildersOrCallback::Callback(leaf_callback, _) => {
1556                        leaf_callback(self, next_stmt_id);
1557                    }
1558                }
1559            }
1560
1561            HydroRoot::SendExternal {
1562                serialize_fn,
1563                instantiate_fn,
1564                input,
1565                ..
1566            } => {
1567                let input_ident = input.emit_core(
1568                    builders_or_callback,
1569                    seen_tees,
1570                    built_tees,
1571                    next_stmt_id,
1572                    fold_hooked_idents,
1573                );
1574
1575                let stmt_id = next_stmt_id.get_and_increment();
1576
1577                match builders_or_callback {
1578                    BuildersOrCallback::Builders(graph_builders) => {
1579                        let (sink_expr, _) = match instantiate_fn {
1580                            DebugInstantiate::Building => (
1581                                syn::parse_quote!(DUMMY_SINK),
1582                                syn::parse_quote!(DUMMY_SOURCE),
1583                            ),
1584
1585                            DebugInstantiate::Finalized(finalized) => {
1586                                (finalized.sink.clone(), finalized.source.clone())
1587                            }
1588                        };
1589
1590                        graph_builders.create_external_output(
1591                            &input.metadata().location_id,
1592                            sink_expr,
1593                            &input_ident,
1594                            serialize_fn.as_ref(),
1595                            stmt_id,
1596                        );
1597                    }
1598                    BuildersOrCallback::Callback(leaf_callback, _) => {
1599                        leaf_callback(self, next_stmt_id);
1600                    }
1601                }
1602            }
1603
1604            HydroRoot::DestSink { sink, input, .. } => {
1605                let input_ident = input.emit_core(
1606                    builders_or_callback,
1607                    seen_tees,
1608                    built_tees,
1609                    next_stmt_id,
1610                    fold_hooked_idents,
1611                );
1612
1613                let stmt_id = next_stmt_id.get_and_increment();
1614
1615                match builders_or_callback {
1616                    BuildersOrCallback::Builders(graph_builders) => {
1617                        graph_builders
1618                            .get_dfir_mut(&input.metadata().location_id)
1619                            .add_dfir(
1620                                parse_quote! {
1621                                    #input_ident -> dest_sink(#sink);
1622                                },
1623                                None,
1624                                Some(&stmt_id.to_string()),
1625                            );
1626                    }
1627                    BuildersOrCallback::Callback(leaf_callback, _) => {
1628                        leaf_callback(self, next_stmt_id);
1629                    }
1630                }
1631            }
1632
1633            HydroRoot::CycleSink {
1634                cycle_id, input, ..
1635            } => {
1636                let input_ident = input.emit_core(
1637                    builders_or_callback,
1638                    seen_tees,
1639                    built_tees,
1640                    next_stmt_id,
1641                    fold_hooked_idents,
1642                );
1643
1644                match builders_or_callback {
1645                    BuildersOrCallback::Builders(graph_builders) => {
1646                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1647                            CollectionKind::KeyedSingleton {
1648                                key_type,
1649                                value_type,
1650                                ..
1651                            }
1652                            | CollectionKind::KeyedStream {
1653                                key_type,
1654                                value_type,
1655                                ..
1656                            } => {
1657                                parse_quote!((#key_type, #value_type))
1658                            }
1659                            CollectionKind::Stream { element_type, .. }
1660                            | CollectionKind::Singleton { element_type, .. }
1661                            | CollectionKind::Optional { element_type, .. } => {
1662                                parse_quote!(#element_type)
1663                            }
1664                        };
1665
1666                        let cycle_id_ident = cycle_id.as_ident();
1667                        graph_builders
1668                            .get_dfir_mut(&input.metadata().location_id)
1669                            .add_dfir(
1670                                parse_quote! {
1671                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1672                                },
1673                                None,
1674                                None,
1675                            );
1676                    }
1677                    // No ID, no callback
1678                    BuildersOrCallback::Callback(_, _) => {}
1679                }
1680            }
1681
1682            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1683                let input_ident = input.emit_core(
1684                    builders_or_callback,
1685                    seen_tees,
1686                    built_tees,
1687                    next_stmt_id,
1688                    fold_hooked_idents,
1689                );
1690
1691                let stmt_id = next_stmt_id.get_and_increment();
1692
1693                match builders_or_callback {
1694                    BuildersOrCallback::Builders(graph_builders) => {
1695                        graph_builders
1696                            .get_dfir_mut(&input.metadata().location_id)
1697                            .add_dfir(
1698                                parse_quote! {
1699                                    #input_ident -> for_each(&mut #ident);
1700                                },
1701                                None,
1702                                Some(&stmt_id.to_string()),
1703                            );
1704                    }
1705                    BuildersOrCallback::Callback(leaf_callback, _) => {
1706                        leaf_callback(self, next_stmt_id);
1707                    }
1708                }
1709            }
1710
1711            HydroRoot::Null { input, .. } => {
1712                let input_ident = input.emit_core(
1713                    builders_or_callback,
1714                    seen_tees,
1715                    built_tees,
1716                    next_stmt_id,
1717                    fold_hooked_idents,
1718                );
1719
1720                let stmt_id = next_stmt_id.get_and_increment();
1721
1722                match builders_or_callback {
1723                    BuildersOrCallback::Builders(graph_builders) => {
1724                        graph_builders
1725                            .get_dfir_mut(&input.metadata().location_id)
1726                            .add_dfir(
1727                                parse_quote! {
1728                                    #input_ident -> for_each(|_| {});
1729                                },
1730                                None,
1731                                Some(&stmt_id.to_string()),
1732                            );
1733                    }
1734                    BuildersOrCallback::Callback(leaf_callback, _) => {
1735                        leaf_callback(self, next_stmt_id);
1736                    }
1737                }
1738            }
1739        }
1740    }
1741
1742    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1743        match self {
1744            HydroRoot::ForEach { op_metadata, .. }
1745            | HydroRoot::SendExternal { op_metadata, .. }
1746            | HydroRoot::DestSink { op_metadata, .. }
1747            | HydroRoot::CycleSink { op_metadata, .. }
1748            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1749            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1750        }
1751    }
1752
1753    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1754        match self {
1755            HydroRoot::ForEach { op_metadata, .. }
1756            | HydroRoot::SendExternal { op_metadata, .. }
1757            | HydroRoot::DestSink { op_metadata, .. }
1758            | HydroRoot::CycleSink { op_metadata, .. }
1759            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1760            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1761        }
1762    }
1763
1764    pub fn input(&self) -> &HydroNode {
1765        match self {
1766            HydroRoot::ForEach { input, .. }
1767            | HydroRoot::SendExternal { input, .. }
1768            | HydroRoot::DestSink { input, .. }
1769            | HydroRoot::CycleSink { input, .. }
1770            | HydroRoot::EmbeddedOutput { input, .. }
1771            | HydroRoot::Null { input, .. } => input,
1772        }
1773    }
1774
1775    pub fn input_metadata(&self) -> &HydroIrMetadata {
1776        self.input().metadata()
1777    }
1778
1779    pub fn print_root(&self) -> String {
1780        match self {
1781            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1782            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1783            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1784            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1785            HydroRoot::EmbeddedOutput { ident, .. } => {
1786                format!("EmbeddedOutput({})", ident)
1787            }
1788            HydroRoot::Null { .. } => "Null".to_owned(),
1789        }
1790    }
1791
1792    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1793        match self {
1794            HydroRoot::ForEach { f, .. } => {
1795                transform(&mut f.expr);
1796            }
1797            HydroRoot::DestSink { sink, .. } => {
1798                transform(sink);
1799            }
1800            HydroRoot::SendExternal { .. }
1801            | HydroRoot::CycleSink { .. }
1802            | HydroRoot::EmbeddedOutput { .. }
1803            | HydroRoot::Null { .. } => {}
1804        }
1805    }
1806}
1807
1808#[cfg(feature = "build")]
1809fn tick_of(loc: &LocationId) -> Option<ClockId> {
1810    match loc {
1811        LocationId::Tick(id, _) => Some(*id),
1812        LocationId::Atomic(inner) => tick_of(inner),
1813        _ => None,
1814    }
1815}
1816
1817#[cfg(feature = "build")]
1818fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1819    match loc {
1820        LocationId::Tick(id, inner) => {
1821            *id = uf_find(uf, *id);
1822            remap_location(inner, uf);
1823        }
1824        LocationId::Atomic(inner) => {
1825            remap_location(inner, uf);
1826        }
1827        LocationId::Process(_) | LocationId::Cluster(_) => {}
1828    }
1829}
1830
1831#[cfg(feature = "build")]
1832fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1833    let p = *parent.get(&x).unwrap_or(&x);
1834    if p == x {
1835        return x;
1836    }
1837    let root = uf_find(parent, p);
1838    parent.insert(x, root);
1839    root
1840}
1841
1842#[cfg(feature = "build")]
1843fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1844    let ra = uf_find(parent, a);
1845    let rb = uf_find(parent, b);
1846    if ra != rb {
1847        parent.insert(ra, rb);
1848    }
1849}
1850
1851/// Traverse the IR to build a union-find that unifies tick IDs connected
1852/// through `Batch` and `YieldConcat` nodes at atomic boundaries, then
1853/// rewrite all `LocationId`s to use the representative tick ID.
1854#[cfg(feature = "build")]
1855pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1856    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1857
1858    // Pass 1: collect unifications.
1859    transform_bottom_up(
1860        ir,
1861        &mut |_| {},
1862        &mut |node: &mut HydroNode| match node {
1863            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
1864                if let (Some(a), Some(b)) = (
1865                    tick_of(&inner.metadata().location_id),
1866                    tick_of(&metadata.location_id),
1867                ) {
1868                    uf_union(&mut uf, a, b);
1869                }
1870            }
1871            HydroNode::Chain {
1872                first,
1873                second,
1874                metadata,
1875            }
1876            | HydroNode::ChainFirst {
1877                first,
1878                second,
1879                metadata,
1880            }
1881            | HydroNode::MergeOrdered {
1882                first,
1883                second,
1884                metadata,
1885            } => {
1886                if let (Some(a), Some(b)) = (
1887                    tick_of(&first.metadata().location_id),
1888                    tick_of(&metadata.location_id),
1889                ) {
1890                    uf_union(&mut uf, a, b);
1891                }
1892                if let (Some(a), Some(b)) = (
1893                    tick_of(&second.metadata().location_id),
1894                    tick_of(&metadata.location_id),
1895                ) {
1896                    uf_union(&mut uf, a, b);
1897                }
1898            }
1899            _ => {}
1900        },
1901        false,
1902    );
1903
1904    // Pass 2: rewrite all LocationIds.
1905    transform_bottom_up(
1906        ir,
1907        &mut |_| {},
1908        &mut |node: &mut HydroNode| {
1909            remap_location(&mut node.metadata_mut().location_id, &mut uf);
1910        },
1911        false,
1912    );
1913}
1914
1915#[cfg(feature = "build")]
1916pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1917    let mut builders = SecondaryMap::new();
1918    let mut seen_tees = HashMap::new();
1919    let mut built_tees = HashMap::new();
1920    let mut next_stmt_id = crate::Counter::<StmtId>::default();
1921    let mut fold_hooked_idents = HashSet::new();
1922    for leaf in ir {
1923        leaf.emit(
1924            &mut builders,
1925            &mut seen_tees,
1926            &mut built_tees,
1927            &mut next_stmt_id,
1928            &mut fold_hooked_idents,
1929        );
1930    }
1931    builders
1932}
1933
1934#[cfg(feature = "build")]
1935pub fn traverse_dfir(
1936    ir: &mut [HydroRoot],
1937    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1938    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1939) {
1940    let mut seen_tees = HashMap::new();
1941    let mut built_tees = HashMap::new();
1942    let mut next_stmt_id = crate::Counter::<StmtId>::default();
1943    let mut fold_hooked_idents = HashSet::new();
1944    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1945    ir.iter_mut().for_each(|leaf| {
1946        leaf.emit_core(
1947            &mut callback,
1948            &mut seen_tees,
1949            &mut built_tees,
1950            &mut next_stmt_id,
1951            &mut fold_hooked_idents,
1952        );
1953    });
1954}
1955
1956pub fn transform_bottom_up(
1957    ir: &mut [HydroRoot],
1958    transform_root: &mut impl FnMut(&mut HydroRoot),
1959    transform_node: &mut impl FnMut(&mut HydroNode),
1960    check_well_formed: bool,
1961) {
1962    let mut seen_tees = HashMap::new();
1963    ir.iter_mut().for_each(|leaf| {
1964        leaf.transform_bottom_up(
1965            transform_root,
1966            transform_node,
1967            &mut seen_tees,
1968            check_well_formed,
1969        );
1970    });
1971}
1972
1973pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1974    let mut seen_tees = HashMap::new();
1975    ir.iter()
1976        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1977        .collect()
1978}
1979
1980type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1981thread_local! {
1982    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1983    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1984    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1985    /// on subsequent encounters, preventing infinite loops.
1986    static SERIALIZED_SHARED: PrintedTees
1987        = const { RefCell::new(None) };
1988}
1989
1990pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1991    PRINTED_TEES.with(|printed_tees| {
1992        let mut printed_tees_mut = printed_tees.borrow_mut();
1993        *printed_tees_mut = Some((0, HashMap::new()));
1994        drop(printed_tees_mut);
1995
1996        let ret = f();
1997
1998        let mut printed_tees_mut = printed_tees.borrow_mut();
1999        *printed_tees_mut = None;
2000
2001        ret
2002    })
2003}
2004
2005/// Runs `f` with a fresh shared-node deduplication scope for serialization.
2006/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
2007/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
2008/// back-reference.  The tracking state is restored when `f` returns or panics.
2009pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
2010    let _guard = SerializedSharedGuard::enter();
2011    f()
2012}
2013
2014/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
2015/// making `serialize_dedup_shared` re-entrant and panic-safe.
2016struct SerializedSharedGuard {
2017    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
2018}
2019
2020impl SerializedSharedGuard {
2021    fn enter() -> Self {
2022        let previous = SERIALIZED_SHARED.with(|cell| {
2023            let mut guard = cell.borrow_mut();
2024            guard.replace((0, HashMap::new()))
2025        });
2026        Self { previous }
2027    }
2028}
2029
2030impl Drop for SerializedSharedGuard {
2031    fn drop(&mut self) {
2032        SERIALIZED_SHARED.with(|cell| {
2033            *cell.borrow_mut() = self.previous.take();
2034        });
2035    }
2036}
2037
2038pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2039
2040impl serde::Serialize for SharedNode {
2041    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
2042    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
2043    /// same subtree every time and, if the graph ever contains a cycle, loop
2044    /// forever.
2045    ///
2046    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
2047    /// integer id.  The first time we see a pointer we assign it the next id and
2048    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
2049    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
2050    /// recursion.  Requires an active `serialize_dedup_shared` scope.
2051    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2052        SERIALIZED_SHARED.with(|cell| {
2053            let mut guard = cell.borrow_mut();
2054            // (next_id, pointer → assigned_id)
2055            let state = guard.as_mut().ok_or_else(|| {
2056                serde::ser::Error::custom(
2057                    "SharedNode serialization requires an active serialize_dedup_shared scope",
2058                )
2059            })?;
2060            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2061
2062            if let Some(&id) = state.1.get(&ptr) {
2063                drop(guard);
2064                use serde::ser::SerializeMap;
2065                let mut map = serializer.serialize_map(Some(1))?;
2066                map.serialize_entry("$shared_ref", &id)?;
2067                map.end()
2068            } else {
2069                let id = state.0;
2070                state.0 += 1;
2071                state.1.insert(ptr, id);
2072                drop(guard);
2073
2074                use serde::ser::SerializeMap;
2075                let mut map = serializer.serialize_map(Some(2))?;
2076                map.serialize_entry("$shared", &id)?;
2077                map.serialize_entry("node", &*self.0.borrow())?;
2078                map.end()
2079            }
2080        })
2081    }
2082}
2083
2084impl SharedNode {
2085    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2086        Rc::as_ptr(&self.0)
2087    }
2088}
2089
2090impl Debug for SharedNode {
2091    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2092        PRINTED_TEES.with(|printed_tees| {
2093            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2094            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2095
2096            if let Some(printed_tees_mut) = printed_tees_mut {
2097                if let Some(existing) = printed_tees_mut
2098                    .1
2099                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2100                {
2101                    write!(f, "<shared {}>", existing)
2102                } else {
2103                    let next_id = printed_tees_mut.0;
2104                    printed_tees_mut.0 += 1;
2105                    printed_tees_mut
2106                        .1
2107                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2108                    drop(printed_tees_mut_borrow);
2109                    write!(f, "<shared {}>: ", next_id)?;
2110                    Debug::fmt(&self.0.borrow(), f)
2111                }
2112            } else {
2113                drop(printed_tees_mut_borrow);
2114                write!(f, "<shared>: ")?;
2115                Debug::fmt(&self.0.borrow(), f)
2116            }
2117        })
2118    }
2119}
2120
2121impl Hash for SharedNode {
2122    fn hash<H: Hasher>(&self, state: &mut H) {
2123        self.0.borrow_mut().hash(state);
2124    }
2125}
2126
2127/// A counter for tracking singleton access groups on a `HydroNode::Reference`.
2128///
2129/// Each mutable access increments the counter (before and after) to isolate itself in its own group;
2130/// immutable accesses share the current group.
2131#[derive(Debug)]
2132pub enum AccessCounter {
2133    Counting(Cell<u32>),
2134    Frozen(u32),
2135}
2136
2137impl AccessCounter {
2138    pub fn new() -> Self {
2139        Self::Counting(Cell::new(0))
2140    }
2141
2142    /// Assign the next access group for this reference.
2143    /// Mutable accesses get an isolated group (counter increments before and after).
2144    /// Immutable accesses share the current group.
2145    pub fn next_group(&self, is_mut: bool) -> Self {
2146        let AccessCounter::Counting(count) = self else {
2147            panic!("Cannot count on `AccessCounter::Frozen`");
2148        };
2149        let c = if is_mut {
2150            let c = count.get() + 1;
2151            count.set(c + 1);
2152            c
2153        } else {
2154            count.get()
2155        };
2156        Self::Frozen(c)
2157    }
2158
2159    /// Creates a frozen counter to prevent further counting.
2160    pub fn freeze(&self) -> Self {
2161        Self::Frozen(match self {
2162            Self::Counting(count) => count.get(),
2163            Self::Frozen(count) => *count,
2164        })
2165    }
2166
2167    pub fn frozen_group(&self) -> u32 {
2168        let Self::Frozen(count) = self else {
2169            panic!("`AccessCounter` not frozen");
2170        };
2171        *count
2172    }
2173}
2174
2175impl Default for AccessCounter {
2176    fn default() -> Self {
2177        Self::new()
2178    }
2179}
2180
2181impl Hash for AccessCounter {
2182    fn hash<H: Hasher>(&self, _state: &mut H) {
2183        // Access counter does not participate in hashing — it is runtime bookkeeping.
2184    }
2185}
2186
2187impl serde::Serialize for AccessCounter {
2188    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2189        let count = match self {
2190            AccessCounter::Counting(count) => count.get(),
2191            AccessCounter::Frozen(count) => *count,
2192        };
2193        count.serialize(serializer)
2194    }
2195}
2196
2197#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2198pub enum BoundKind {
2199    Unbounded,
2200    Bounded,
2201}
2202
2203#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2204pub enum StreamOrder {
2205    NoOrder,
2206    TotalOrder,
2207}
2208
2209#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2210pub enum StreamRetry {
2211    AtLeastOnce,
2212    ExactlyOnce,
2213}
2214
2215#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2216pub enum KeyedSingletonBoundKind {
2217    Unbounded,
2218    MonotonicKeys,
2219    MonotonicValue,
2220    BoundedValue,
2221    Bounded,
2222}
2223
2224#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2225pub enum SingletonBoundKind {
2226    Unbounded,
2227    Monotonic,
2228    Bounded,
2229}
2230
2231#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2232pub enum CollectionKind {
2233    Stream {
2234        bound: BoundKind,
2235        order: StreamOrder,
2236        retry: StreamRetry,
2237        element_type: DebugType,
2238    },
2239    Singleton {
2240        bound: SingletonBoundKind,
2241        element_type: DebugType,
2242    },
2243    Optional {
2244        bound: BoundKind,
2245        element_type: DebugType,
2246    },
2247    KeyedStream {
2248        bound: BoundKind,
2249        value_order: StreamOrder,
2250        value_retry: StreamRetry,
2251        key_type: DebugType,
2252        value_type: DebugType,
2253    },
2254    KeyedSingleton {
2255        bound: KeyedSingletonBoundKind,
2256        key_type: DebugType,
2257        value_type: DebugType,
2258    },
2259}
2260
2261impl CollectionKind {
2262    pub fn is_bounded(&self) -> bool {
2263        matches!(
2264            self,
2265            CollectionKind::Stream {
2266                bound: BoundKind::Bounded,
2267                ..
2268            } | CollectionKind::Singleton {
2269                bound: SingletonBoundKind::Bounded,
2270                ..
2271            } | CollectionKind::Optional {
2272                bound: BoundKind::Bounded,
2273                ..
2274            } | CollectionKind::KeyedStream {
2275                bound: BoundKind::Bounded,
2276                ..
2277            } | CollectionKind::KeyedSingleton {
2278                bound: KeyedSingletonBoundKind::Bounded,
2279                ..
2280            }
2281        )
2282    }
2283
2284    /// Returns whether this collection kind is already "strict" (TotalOrder + ExactlyOnce),
2285    /// meaning no non-determinism needs to be observed for mut closures.
2286    pub fn is_strict(&self) -> bool {
2287        match self {
2288            CollectionKind::Stream { order, retry, .. } => {
2289                *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2290            }
2291            CollectionKind::KeyedStream {
2292                value_order,
2293                value_retry,
2294                ..
2295            } => {
2296                *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2297            }
2298            // Singletons/Optionals/KeyedSingletons do not have observable
2299            // non-determinism other than snapshots / batching
2300            CollectionKind::Singleton { .. }
2301            | CollectionKind::Optional { .. }
2302            | CollectionKind::KeyedSingleton { .. } => true,
2303        }
2304    }
2305
2306    /// Creates a "strict" version of this kind with TotalOrder and ExactlyOnce.
2307    pub fn strict_kind(&self) -> CollectionKind {
2308        match self {
2309            CollectionKind::Stream {
2310                bound,
2311                element_type,
2312                ..
2313            } => CollectionKind::Stream {
2314                bound: bound.clone(),
2315                order: StreamOrder::TotalOrder,
2316                retry: StreamRetry::ExactlyOnce,
2317                element_type: element_type.clone(),
2318            },
2319            CollectionKind::KeyedStream {
2320                bound,
2321                key_type,
2322                value_type,
2323                ..
2324            } => CollectionKind::KeyedStream {
2325                bound: bound.clone(),
2326                value_order: StreamOrder::TotalOrder,
2327                value_retry: StreamRetry::ExactlyOnce,
2328                key_type: key_type.clone(),
2329                value_type: value_type.clone(),
2330            },
2331            other => other.clone(),
2332        }
2333    }
2334}
2335
2336#[derive(Clone, serde::Serialize)]
2337pub struct HydroIrMetadata {
2338    pub location_id: LocationId,
2339    pub collection_kind: CollectionKind,
2340    pub consistency: Option<ClusterConsistency>,
2341    pub cardinality: Option<usize>,
2342    pub tag: Option<String>,
2343    pub op: HydroIrOpMetadata,
2344}
2345
2346// HydroIrMetadata shouldn't be used to hash or compare
2347impl Hash for HydroIrMetadata {
2348    fn hash<H: Hasher>(&self, _: &mut H) {}
2349}
2350
2351impl PartialEq for HydroIrMetadata {
2352    fn eq(&self, _: &Self) -> bool {
2353        true
2354    }
2355}
2356
2357impl Eq for HydroIrMetadata {}
2358
2359impl Debug for HydroIrMetadata {
2360    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2361        f.debug_struct("HydroIrMetadata")
2362            .field("location_id", &self.location_id)
2363            .field("collection_kind", &self.collection_kind)
2364            .finish()
2365    }
2366}
2367
2368/// Metadata that is specific to the operator itself, rather than its outputs.
2369/// This is available on _both_ inner nodes and roots.
2370#[derive(Clone, serde::Serialize)]
2371pub struct HydroIrOpMetadata {
2372    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2373    pub backtrace: Backtrace,
2374    pub cpu_usage: Option<f64>,
2375    pub network_recv_cpu_usage: Option<f64>,
2376    pub id: Option<usize>,
2377}
2378
2379impl HydroIrOpMetadata {
2380    #[expect(
2381        clippy::new_without_default,
2382        reason = "explicit calls to new ensure correct backtrace bounds"
2383    )]
2384    pub fn new() -> HydroIrOpMetadata {
2385        Self::new_with_skip(1)
2386    }
2387
2388    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2389        HydroIrOpMetadata {
2390            backtrace: Backtrace::get_backtrace(2 + skip_count),
2391            cpu_usage: None,
2392            network_recv_cpu_usage: None,
2393            id: None,
2394        }
2395    }
2396}
2397
2398impl Debug for HydroIrOpMetadata {
2399    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2400        f.debug_struct("HydroIrOpMetadata").finish()
2401    }
2402}
2403
2404impl Hash for HydroIrOpMetadata {
2405    fn hash<H: Hasher>(&self, _: &mut H) {}
2406}
2407
2408/// An intermediate node in a Hydro graph, which consumes data
2409/// from upstream nodes and emits data to downstream nodes.
2410#[derive(Debug, Hash, serde::Serialize)]
2411pub enum HydroNode {
2412    Placeholder,
2413
2414    /// Manually "casts" between two different collection kinds.
2415    ///
2416    /// Using this IR node requires special care, since it bypasses many of Hydro's core
2417    /// correctness checks. In particular, the user must ensure that every possible
2418    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
2419    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
2420    /// collection. This ensures that the simulator does not miss any possible outputs.
2421    Cast {
2422        inner: Box<HydroNode>,
2423        metadata: HydroIrMetadata,
2424    },
2425
2426    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
2427    /// interpretation of the input stream.
2428    ///
2429    /// In production, this simply passes through the input, but in simulation, this operator
2430    /// explicitly selects a randomized interpretation.
2431    ObserveNonDet {
2432        inner: Box<HydroNode>,
2433        trusted: bool, // if true, we do not need to simulate non-determinism
2434        metadata: HydroIrMetadata,
2435    },
2436
2437    Source {
2438        source: HydroSource,
2439        metadata: HydroIrMetadata,
2440    },
2441
2442    SingletonSource {
2443        value: DebugExpr,
2444        first_tick_only: bool,
2445        metadata: HydroIrMetadata,
2446    },
2447
2448    CycleSource {
2449        cycle_id: CycleId,
2450        metadata: HydroIrMetadata,
2451    },
2452
2453    Tee {
2454        inner: SharedNode,
2455        metadata: HydroIrMetadata,
2456    },
2457
2458    /// A reference materialization point. Wraps a SharedNode so that:
2459    /// - The pipe output delivers data to one consumer
2460    /// - `#var` references can borrow the value from the slot
2461    ///
2462    /// In DFIR codegen, emits `ident = inner_ident -> singleton()` or `-> optional()` or
2463    /// `-> handoff()` depending on `kind`.
2464    ///
2465    /// Uses the same `built_tees` dedup pattern as `Tee`.
2466    Reference {
2467        inner: SharedNode,
2468        kind: crate::handoff_ref::HandoffRefKind,
2469        access_counter: AccessCounter,
2470        metadata: HydroIrMetadata,
2471    },
2472
2473    Partition {
2474        inner: SharedNode,
2475        f: ClosureExpr,
2476        is_true: bool,
2477        metadata: HydroIrMetadata,
2478    },
2479
2480    BeginAtomic {
2481        inner: Box<HydroNode>,
2482        metadata: HydroIrMetadata,
2483    },
2484
2485    EndAtomic {
2486        inner: Box<HydroNode>,
2487        metadata: HydroIrMetadata,
2488    },
2489
2490    Batch {
2491        inner: Box<HydroNode>,
2492        metadata: HydroIrMetadata,
2493    },
2494
2495    YieldConcat {
2496        inner: Box<HydroNode>,
2497        metadata: HydroIrMetadata,
2498    },
2499
2500    Chain {
2501        first: Box<HydroNode>,
2502        second: Box<HydroNode>,
2503        metadata: HydroIrMetadata,
2504    },
2505
2506    MergeOrdered {
2507        first: Box<HydroNode>,
2508        second: Box<HydroNode>,
2509        metadata: HydroIrMetadata,
2510    },
2511
2512    ChainFirst {
2513        first: Box<HydroNode>,
2514        second: Box<HydroNode>,
2515        metadata: HydroIrMetadata,
2516    },
2517
2518    CrossProduct {
2519        left: Box<HydroNode>,
2520        right: Box<HydroNode>,
2521        metadata: HydroIrMetadata,
2522    },
2523
2524    CrossSingleton {
2525        left: Box<HydroNode>,
2526        right: Box<HydroNode>,
2527        metadata: HydroIrMetadata,
2528    },
2529
2530    Join {
2531        left: Box<HydroNode>,
2532        right: Box<HydroNode>,
2533        metadata: HydroIrMetadata,
2534    },
2535
2536    /// Asymmetric join where the right (build) side is bounded.
2537    /// The build side is accumulated (stratum-delayed) into a hash table,
2538    /// then the left (probe) side streams through preserving its ordering.
2539    JoinHalf {
2540        left: Box<HydroNode>,
2541        right: Box<HydroNode>,
2542        metadata: HydroIrMetadata,
2543    },
2544
2545    Difference {
2546        pos: Box<HydroNode>,
2547        neg: Box<HydroNode>,
2548        metadata: HydroIrMetadata,
2549    },
2550
2551    AntiJoin {
2552        pos: Box<HydroNode>,
2553        neg: Box<HydroNode>,
2554        metadata: HydroIrMetadata,
2555    },
2556
2557    ResolveFutures {
2558        input: Box<HydroNode>,
2559        metadata: HydroIrMetadata,
2560    },
2561    ResolveFuturesBlocking {
2562        input: Box<HydroNode>,
2563        metadata: HydroIrMetadata,
2564    },
2565    ResolveFuturesOrdered {
2566        input: Box<HydroNode>,
2567        metadata: HydroIrMetadata,
2568    },
2569
2570    Map {
2571        f: ClosureExpr,
2572        input: Box<HydroNode>,
2573        metadata: HydroIrMetadata,
2574    },
2575    FlatMap {
2576        f: ClosureExpr,
2577        input: Box<HydroNode>,
2578        metadata: HydroIrMetadata,
2579    },
2580    FlatMapStreamBlocking {
2581        f: ClosureExpr,
2582        input: Box<HydroNode>,
2583        metadata: HydroIrMetadata,
2584    },
2585    Filter {
2586        f: ClosureExpr,
2587        input: Box<HydroNode>,
2588        metadata: HydroIrMetadata,
2589    },
2590    FilterMap {
2591        f: ClosureExpr,
2592        input: Box<HydroNode>,
2593        metadata: HydroIrMetadata,
2594    },
2595
2596    DeferTick {
2597        input: Box<HydroNode>,
2598        metadata: HydroIrMetadata,
2599    },
2600    Enumerate {
2601        input: Box<HydroNode>,
2602        metadata: HydroIrMetadata,
2603    },
2604    Inspect {
2605        f: ClosureExpr,
2606        input: Box<HydroNode>,
2607        metadata: HydroIrMetadata,
2608    },
2609
2610    Unique {
2611        input: Box<HydroNode>,
2612        metadata: HydroIrMetadata,
2613    },
2614
2615    Sort {
2616        input: Box<HydroNode>,
2617        metadata: HydroIrMetadata,
2618    },
2619    Fold {
2620        init: ClosureExpr,
2621        acc: ClosureExpr,
2622        input: Box<HydroNode>,
2623        metadata: HydroIrMetadata,
2624    },
2625
2626    Scan {
2627        init: ClosureExpr,
2628        acc: ClosureExpr,
2629        input: Box<HydroNode>,
2630        metadata: HydroIrMetadata,
2631    },
2632    ScanAsyncBlocking {
2633        init: ClosureExpr,
2634        acc: ClosureExpr,
2635        input: Box<HydroNode>,
2636        metadata: HydroIrMetadata,
2637    },
2638    FoldKeyed {
2639        init: ClosureExpr,
2640        acc: ClosureExpr,
2641        input: Box<HydroNode>,
2642        metadata: HydroIrMetadata,
2643    },
2644
2645    Reduce {
2646        f: ClosureExpr,
2647        input: Box<HydroNode>,
2648        metadata: HydroIrMetadata,
2649    },
2650    ReduceKeyed {
2651        f: ClosureExpr,
2652        input: Box<HydroNode>,
2653        metadata: HydroIrMetadata,
2654    },
2655    ReduceKeyedWatermark {
2656        f: ClosureExpr,
2657        input: Box<HydroNode>,
2658        watermark: Box<HydroNode>,
2659        metadata: HydroIrMetadata,
2660    },
2661
2662    Network {
2663        name: Option<String>,
2664        networking_info: crate::networking::NetworkingInfo,
2665        serialize_fn: Option<DebugExpr>,
2666        instantiate_fn: DebugInstantiate,
2667        deserialize_fn: Option<DebugExpr>,
2668        input: Box<HydroNode>,
2669        metadata: HydroIrMetadata,
2670    },
2671
2672    VersionedNetworkFork {
2673        channel_id: u32,
2674        channel_name: String,
2675        senders: Vec<(u32, Box<HydroNode>, Option<DebugExpr>)>,
2676        metadata: HydroIrMetadata,
2677    },
2678
2679    VersionedNetwork {
2680        fork: SharedNode,
2681        version: u32,
2682        deserialize_fn: Option<DebugExpr>,
2683        metadata: HydroIrMetadata,
2684    },
2685
2686    ExternalInput {
2687        from_external_key: LocationKey,
2688        from_port_id: ExternalPortId,
2689        from_many: bool,
2690        codec_type: DebugType,
2691        #[serde(skip)]
2692        port_hint: NetworkHint,
2693        instantiate_fn: DebugInstantiate,
2694        deserialize_fn: Option<DebugExpr>,
2695        metadata: HydroIrMetadata,
2696    },
2697
2698    Counter {
2699        tag: String,
2700        duration: DebugExpr,
2701        prefix: String,
2702        input: Box<HydroNode>,
2703        metadata: HydroIrMetadata,
2704    },
2705
2706    AssertIsConsistent {
2707        inner: Box<HydroNode>,
2708        trusted: bool,
2709        metadata: HydroIrMetadata,
2710    },
2711
2712    UnboundSingleton {
2713        inner: Box<HydroNode>,
2714        metadata: HydroIrMetadata,
2715    },
2716}
2717
2718pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2719pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2720
2721/// If `f` has a mut singleton ref and `in_kind` is non-strict, emits an
2722/// `observe_for_mut` node and returns the new ident. Otherwise returns
2723/// `in_ident` unchanged. Always consumes a stmt_id when applicable.
2724#[cfg(feature = "build")]
2725fn maybe_observe_for_mut(
2726    f: &ClosureExpr,
2727    in_ident: syn::Ident,
2728    in_location: &LocationId,
2729    in_kind: &CollectionKind,
2730    op_meta: &HydroIrOpMetadata,
2731    builders_or_callback: &mut BuildersOrCallback<
2732        impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
2733        impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
2734    >,
2735    next_stmt_id: &mut crate::Counter<StmtId>,
2736) -> syn::Ident {
2737    if f.has_mut_ref() && !in_kind.is_strict() {
2738        let observe_stmt_id = next_stmt_id.get_and_increment();
2739        let observe_ident =
2740            syn::Ident::new(&format!("stream_{}", observe_stmt_id), Span::call_site());
2741        if let BuildersOrCallback::Builders(graph_builders) = builders_or_callback {
2742            graph_builders.observe_for_mut(in_location, in_ident, in_kind, &observe_ident, op_meta);
2743        }
2744        observe_ident
2745    } else {
2746        in_ident
2747    }
2748}
2749
2750impl HydroNode {
2751    pub fn transform_bottom_up(
2752        &mut self,
2753        transform: &mut impl FnMut(&mut HydroNode),
2754        seen_tees: &mut SeenSharedNodes,
2755        check_well_formed: bool,
2756    ) {
2757        self.transform_children(
2758            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2759            seen_tees,
2760        );
2761
2762        transform(self);
2763
2764        let self_location = self.metadata().location_id.root();
2765
2766        if check_well_formed {
2767            match &*self {
2768                HydroNode::Network { .. } => {}
2769                _ => {
2770                    self.input_metadata().iter().for_each(|i| {
2771                        if i.location_id.root() != self_location {
2772                            panic!(
2773                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2774                                i,
2775                                i.location_id.root(),
2776                                self,
2777                                self_location
2778                            )
2779                        }
2780                    });
2781                }
2782            }
2783        }
2784    }
2785
2786    #[inline(always)]
2787    pub fn transform_children(
2788        &mut self,
2789        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2790        seen_tees: &mut SeenSharedNodes,
2791    ) {
2792        match self {
2793            HydroNode::Placeholder => {
2794                panic!();
2795            }
2796
2797            HydroNode::Source { .. }
2798            | HydroNode::SingletonSource { .. }
2799            | HydroNode::CycleSource { .. }
2800            | HydroNode::ExternalInput { .. } => {}
2801
2802            HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2803                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2804                    *inner = SharedNode(transformed.clone());
2805                } else {
2806                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2807                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2808                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2809                    transform(&mut orig, seen_tees);
2810                    *transformed_cell.borrow_mut() = orig;
2811                    *inner = SharedNode(transformed_cell);
2812                }
2813            }
2814
2815            HydroNode::Partition { inner, f, .. } => {
2816                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2817                    *inner = SharedNode(transformed.clone());
2818                } else {
2819                    f.transform_children(&mut transform, seen_tees);
2820                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2821                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2822                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2823                    transform(&mut orig, seen_tees);
2824                    *transformed_cell.borrow_mut() = orig;
2825                    *inner = SharedNode(transformed_cell);
2826                }
2827            }
2828
2829            HydroNode::Cast { inner, .. }
2830            | HydroNode::ObserveNonDet { inner, .. }
2831            | HydroNode::BeginAtomic { inner, .. }
2832            | HydroNode::EndAtomic { inner, .. }
2833            | HydroNode::Batch { inner, .. }
2834            | HydroNode::YieldConcat { inner, .. }
2835            | HydroNode::UnboundSingleton { inner, .. }
2836            | HydroNode::AssertIsConsistent { inner, .. } => {
2837                transform(inner.as_mut(), seen_tees);
2838            }
2839
2840            HydroNode::Chain { first, second, .. } => {
2841                transform(first.as_mut(), seen_tees);
2842                transform(second.as_mut(), seen_tees);
2843            }
2844
2845            HydroNode::MergeOrdered { first, second, .. } => {
2846                transform(first.as_mut(), seen_tees);
2847                transform(second.as_mut(), seen_tees);
2848            }
2849
2850            HydroNode::ChainFirst { first, second, .. } => {
2851                transform(first.as_mut(), seen_tees);
2852                transform(second.as_mut(), seen_tees);
2853            }
2854
2855            HydroNode::CrossSingleton { left, right, .. }
2856            | HydroNode::CrossProduct { left, right, .. }
2857            | HydroNode::Join { left, right, .. }
2858            | HydroNode::JoinHalf { left, right, .. } => {
2859                transform(left.as_mut(), seen_tees);
2860                transform(right.as_mut(), seen_tees);
2861            }
2862
2863            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2864                transform(pos.as_mut(), seen_tees);
2865                transform(neg.as_mut(), seen_tees);
2866            }
2867
2868            HydroNode::Map { f, input, .. } => {
2869                f.transform_children(&mut transform, seen_tees);
2870                transform(input.as_mut(), seen_tees);
2871            }
2872            HydroNode::FlatMap { f, input, .. }
2873            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2874            | HydroNode::Filter { f, input, .. }
2875            | HydroNode::FilterMap { f, input, .. }
2876            | HydroNode::Inspect { f, input, .. }
2877            | HydroNode::Reduce { f, input, .. }
2878            | HydroNode::ReduceKeyed { f, input, .. } => {
2879                f.transform_children(&mut transform, seen_tees);
2880                transform(input.as_mut(), seen_tees);
2881            }
2882            HydroNode::ReduceKeyedWatermark {
2883                f,
2884                input,
2885                watermark,
2886                ..
2887            } => {
2888                f.transform_children(&mut transform, seen_tees);
2889                transform(input.as_mut(), seen_tees);
2890                transform(watermark.as_mut(), seen_tees);
2891            }
2892            HydroNode::Fold {
2893                init, acc, input, ..
2894            }
2895            | HydroNode::Scan {
2896                init, acc, input, ..
2897            }
2898            | HydroNode::ScanAsyncBlocking {
2899                init, acc, input, ..
2900            }
2901            | HydroNode::FoldKeyed {
2902                init, acc, input, ..
2903            } => {
2904                init.transform_children(&mut transform, seen_tees);
2905                acc.transform_children(&mut transform, seen_tees);
2906                transform(input.as_mut(), seen_tees);
2907            }
2908            HydroNode::ResolveFutures { input, .. }
2909            | HydroNode::ResolveFuturesBlocking { input, .. }
2910            | HydroNode::ResolveFuturesOrdered { input, .. }
2911            | HydroNode::Sort { input, .. }
2912            | HydroNode::DeferTick { input, .. }
2913            | HydroNode::Enumerate { input, .. }
2914            | HydroNode::Unique { input, .. }
2915            | HydroNode::Network { input, .. }
2916            | HydroNode::Counter { input, .. } => {
2917                transform(input.as_mut(), seen_tees);
2918            }
2919
2920            HydroNode::VersionedNetworkFork { senders, .. } => {
2921                for (_version, sender, _serialize) in senders.iter_mut() {
2922                    transform(sender.as_mut(), seen_tees);
2923                }
2924            }
2925
2926            HydroNode::VersionedNetwork { fork, .. } => {
2927                if let Some(transformed) = seen_tees.get(&fork.as_ptr()) {
2928                    *fork = SharedNode(transformed.clone());
2929                } else {
2930                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2931                    seen_tees.insert(fork.as_ptr(), transformed_cell.clone());
2932                    let mut orig = fork.0.replace(HydroNode::Placeholder);
2933                    transform(&mut orig, seen_tees);
2934                    *transformed_cell.borrow_mut() = orig;
2935                    *fork = SharedNode(transformed_cell);
2936                }
2937            }
2938        }
2939    }
2940
2941    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2942        match self {
2943            HydroNode::Placeholder => HydroNode::Placeholder,
2944            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2945                inner: Box::new(inner.deep_clone(seen_tees)),
2946                metadata: metadata.clone(),
2947            },
2948            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2949                inner: Box::new(inner.deep_clone(seen_tees)),
2950                metadata: metadata.clone(),
2951            },
2952            HydroNode::ObserveNonDet {
2953                inner,
2954                trusted,
2955                metadata,
2956            } => HydroNode::ObserveNonDet {
2957                inner: Box::new(inner.deep_clone(seen_tees)),
2958                trusted: *trusted,
2959                metadata: metadata.clone(),
2960            },
2961            HydroNode::AssertIsConsistent {
2962                inner,
2963                trusted,
2964                metadata,
2965            } => HydroNode::AssertIsConsistent {
2966                inner: Box::new(inner.deep_clone(seen_tees)),
2967                trusted: *trusted,
2968                metadata: metadata.clone(),
2969            },
2970            HydroNode::Source { source, metadata } => HydroNode::Source {
2971                source: source.clone(),
2972                metadata: metadata.clone(),
2973            },
2974            HydroNode::SingletonSource {
2975                value,
2976                first_tick_only,
2977                metadata,
2978            } => HydroNode::SingletonSource {
2979                value: value.clone(),
2980                first_tick_only: *first_tick_only,
2981                metadata: metadata.clone(),
2982            },
2983            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2984                cycle_id: *cycle_id,
2985                metadata: metadata.clone(),
2986            },
2987            HydroNode::Tee { inner, metadata }
2988            | HydroNode::Reference {
2989                inner, metadata, ..
2990            } => {
2991                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2992                    SharedNode(transformed.clone())
2993                } else {
2994                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2995                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2996                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2997                    *new_rc.borrow_mut() = cloned;
2998                    SharedNode(new_rc)
2999                };
3000                if let HydroNode::Reference {
3001                    kind,
3002                    access_counter,
3003                    ..
3004                } = self
3005                {
3006                    HydroNode::Reference {
3007                        inner: cloned_inner,
3008                        kind: *kind,
3009                        access_counter: access_counter.freeze(),
3010                        metadata: metadata.clone(),
3011                    }
3012                } else {
3013                    HydroNode::Tee {
3014                        inner: cloned_inner,
3015                        metadata: metadata.clone(),
3016                    }
3017                }
3018            }
3019            HydroNode::Partition {
3020                inner,
3021                f,
3022                is_true,
3023                metadata,
3024            } => {
3025                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
3026                    HydroNode::Partition {
3027                        inner: SharedNode(transformed.clone()),
3028                        f: f.deep_clone(seen_tees),
3029                        is_true: *is_true,
3030                        metadata: metadata.clone(),
3031                    }
3032                } else {
3033                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
3034                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
3035                    let cloned = inner.0.borrow().deep_clone(seen_tees);
3036                    *new_rc.borrow_mut() = cloned;
3037                    HydroNode::Partition {
3038                        inner: SharedNode(new_rc),
3039                        f: f.deep_clone(seen_tees),
3040                        is_true: *is_true,
3041                        metadata: metadata.clone(),
3042                    }
3043                }
3044            }
3045            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
3046                inner: Box::new(inner.deep_clone(seen_tees)),
3047                metadata: metadata.clone(),
3048            },
3049            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
3050                inner: Box::new(inner.deep_clone(seen_tees)),
3051                metadata: metadata.clone(),
3052            },
3053            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
3054                inner: Box::new(inner.deep_clone(seen_tees)),
3055                metadata: metadata.clone(),
3056            },
3057            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
3058                inner: Box::new(inner.deep_clone(seen_tees)),
3059                metadata: metadata.clone(),
3060            },
3061            HydroNode::Chain {
3062                first,
3063                second,
3064                metadata,
3065            } => HydroNode::Chain {
3066                first: Box::new(first.deep_clone(seen_tees)),
3067                second: Box::new(second.deep_clone(seen_tees)),
3068                metadata: metadata.clone(),
3069            },
3070            HydroNode::MergeOrdered {
3071                first,
3072                second,
3073                metadata,
3074            } => HydroNode::MergeOrdered {
3075                first: Box::new(first.deep_clone(seen_tees)),
3076                second: Box::new(second.deep_clone(seen_tees)),
3077                metadata: metadata.clone(),
3078            },
3079            HydroNode::ChainFirst {
3080                first,
3081                second,
3082                metadata,
3083            } => HydroNode::ChainFirst {
3084                first: Box::new(first.deep_clone(seen_tees)),
3085                second: Box::new(second.deep_clone(seen_tees)),
3086                metadata: metadata.clone(),
3087            },
3088            HydroNode::CrossProduct {
3089                left,
3090                right,
3091                metadata,
3092            } => HydroNode::CrossProduct {
3093                left: Box::new(left.deep_clone(seen_tees)),
3094                right: Box::new(right.deep_clone(seen_tees)),
3095                metadata: metadata.clone(),
3096            },
3097            HydroNode::CrossSingleton {
3098                left,
3099                right,
3100                metadata,
3101            } => HydroNode::CrossSingleton {
3102                left: Box::new(left.deep_clone(seen_tees)),
3103                right: Box::new(right.deep_clone(seen_tees)),
3104                metadata: metadata.clone(),
3105            },
3106            HydroNode::Join {
3107                left,
3108                right,
3109                metadata,
3110            } => HydroNode::Join {
3111                left: Box::new(left.deep_clone(seen_tees)),
3112                right: Box::new(right.deep_clone(seen_tees)),
3113                metadata: metadata.clone(),
3114            },
3115            HydroNode::JoinHalf {
3116                left,
3117                right,
3118                metadata,
3119            } => HydroNode::JoinHalf {
3120                left: Box::new(left.deep_clone(seen_tees)),
3121                right: Box::new(right.deep_clone(seen_tees)),
3122                metadata: metadata.clone(),
3123            },
3124            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3125                pos: Box::new(pos.deep_clone(seen_tees)),
3126                neg: Box::new(neg.deep_clone(seen_tees)),
3127                metadata: metadata.clone(),
3128            },
3129            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3130                pos: Box::new(pos.deep_clone(seen_tees)),
3131                neg: Box::new(neg.deep_clone(seen_tees)),
3132                metadata: metadata.clone(),
3133            },
3134            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3135                input: Box::new(input.deep_clone(seen_tees)),
3136                metadata: metadata.clone(),
3137            },
3138            HydroNode::ResolveFuturesBlocking { input, metadata } => {
3139                HydroNode::ResolveFuturesBlocking {
3140                    input: Box::new(input.deep_clone(seen_tees)),
3141                    metadata: metadata.clone(),
3142                }
3143            }
3144            HydroNode::ResolveFuturesOrdered { input, metadata } => {
3145                HydroNode::ResolveFuturesOrdered {
3146                    input: Box::new(input.deep_clone(seen_tees)),
3147                    metadata: metadata.clone(),
3148                }
3149            }
3150            HydroNode::Map { f, input, metadata } => HydroNode::Map {
3151                f: f.deep_clone(seen_tees),
3152                input: Box::new(input.deep_clone(seen_tees)),
3153                metadata: metadata.clone(),
3154            },
3155            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3156                f: f.deep_clone(seen_tees),
3157                input: Box::new(input.deep_clone(seen_tees)),
3158                metadata: metadata.clone(),
3159            },
3160            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3161                HydroNode::FlatMapStreamBlocking {
3162                    f: f.deep_clone(seen_tees),
3163                    input: Box::new(input.deep_clone(seen_tees)),
3164                    metadata: metadata.clone(),
3165                }
3166            }
3167            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3168                f: f.deep_clone(seen_tees),
3169                input: Box::new(input.deep_clone(seen_tees)),
3170                metadata: metadata.clone(),
3171            },
3172            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3173                f: f.deep_clone(seen_tees),
3174                input: Box::new(input.deep_clone(seen_tees)),
3175                metadata: metadata.clone(),
3176            },
3177            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3178                input: Box::new(input.deep_clone(seen_tees)),
3179                metadata: metadata.clone(),
3180            },
3181            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3182                input: Box::new(input.deep_clone(seen_tees)),
3183                metadata: metadata.clone(),
3184            },
3185            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3186                f: f.deep_clone(seen_tees),
3187                input: Box::new(input.deep_clone(seen_tees)),
3188                metadata: metadata.clone(),
3189            },
3190            HydroNode::Unique { input, metadata } => HydroNode::Unique {
3191                input: Box::new(input.deep_clone(seen_tees)),
3192                metadata: metadata.clone(),
3193            },
3194            HydroNode::Sort { input, metadata } => HydroNode::Sort {
3195                input: Box::new(input.deep_clone(seen_tees)),
3196                metadata: metadata.clone(),
3197            },
3198            HydroNode::Fold {
3199                init,
3200                acc,
3201                input,
3202                metadata,
3203            } => HydroNode::Fold {
3204                init: init.deep_clone(seen_tees),
3205                acc: acc.deep_clone(seen_tees),
3206                input: Box::new(input.deep_clone(seen_tees)),
3207                metadata: metadata.clone(),
3208            },
3209            HydroNode::Scan {
3210                init,
3211                acc,
3212                input,
3213                metadata,
3214            } => HydroNode::Scan {
3215                init: init.deep_clone(seen_tees),
3216                acc: acc.deep_clone(seen_tees),
3217                input: Box::new(input.deep_clone(seen_tees)),
3218                metadata: metadata.clone(),
3219            },
3220            HydroNode::ScanAsyncBlocking {
3221                init,
3222                acc,
3223                input,
3224                metadata,
3225            } => HydroNode::ScanAsyncBlocking {
3226                init: init.deep_clone(seen_tees),
3227                acc: acc.deep_clone(seen_tees),
3228                input: Box::new(input.deep_clone(seen_tees)),
3229                metadata: metadata.clone(),
3230            },
3231            HydroNode::FoldKeyed {
3232                init,
3233                acc,
3234                input,
3235                metadata,
3236            } => HydroNode::FoldKeyed {
3237                init: init.deep_clone(seen_tees),
3238                acc: acc.deep_clone(seen_tees),
3239                input: Box::new(input.deep_clone(seen_tees)),
3240                metadata: metadata.clone(),
3241            },
3242            HydroNode::ReduceKeyedWatermark {
3243                f,
3244                input,
3245                watermark,
3246                metadata,
3247            } => HydroNode::ReduceKeyedWatermark {
3248                f: f.deep_clone(seen_tees),
3249                input: Box::new(input.deep_clone(seen_tees)),
3250                watermark: Box::new(watermark.deep_clone(seen_tees)),
3251                metadata: metadata.clone(),
3252            },
3253            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3254                f: f.deep_clone(seen_tees),
3255                input: Box::new(input.deep_clone(seen_tees)),
3256                metadata: metadata.clone(),
3257            },
3258            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3259                f: f.deep_clone(seen_tees),
3260                input: Box::new(input.deep_clone(seen_tees)),
3261                metadata: metadata.clone(),
3262            },
3263            HydroNode::Network {
3264                name,
3265                networking_info,
3266                serialize_fn,
3267                instantiate_fn,
3268                deserialize_fn,
3269                input,
3270                metadata,
3271            } => HydroNode::Network {
3272                name: name.clone(),
3273                networking_info: networking_info.clone(),
3274                serialize_fn: serialize_fn.clone(),
3275                instantiate_fn: instantiate_fn.clone(),
3276                deserialize_fn: deserialize_fn.clone(),
3277                input: Box::new(input.deep_clone(seen_tees)),
3278                metadata: metadata.clone(),
3279            },
3280            HydroNode::ExternalInput {
3281                from_external_key,
3282                from_port_id,
3283                from_many,
3284                codec_type,
3285                port_hint,
3286                instantiate_fn,
3287                deserialize_fn,
3288                metadata,
3289            } => HydroNode::ExternalInput {
3290                from_external_key: *from_external_key,
3291                from_port_id: *from_port_id,
3292                from_many: *from_many,
3293                codec_type: codec_type.clone(),
3294                port_hint: *port_hint,
3295                instantiate_fn: instantiate_fn.clone(),
3296                deserialize_fn: deserialize_fn.clone(),
3297                metadata: metadata.clone(),
3298            },
3299            HydroNode::Counter {
3300                tag,
3301                duration,
3302                prefix,
3303                input,
3304                metadata,
3305            } => HydroNode::Counter {
3306                tag: tag.clone(),
3307                duration: duration.clone(),
3308                prefix: prefix.clone(),
3309                input: Box::new(input.deep_clone(seen_tees)),
3310                metadata: metadata.clone(),
3311            },
3312            HydroNode::VersionedNetworkFork {
3313                channel_id,
3314                channel_name,
3315                senders,
3316                metadata,
3317            } => HydroNode::VersionedNetworkFork {
3318                channel_id: *channel_id,
3319                channel_name: channel_name.clone(),
3320                senders: senders
3321                    .iter()
3322                    .map(|(version, sender, serialize)| {
3323                        (
3324                            *version,
3325                            Box::new(sender.deep_clone(seen_tees)),
3326                            serialize.clone(),
3327                        )
3328                    })
3329                    .collect(),
3330                metadata: metadata.clone(),
3331            },
3332            HydroNode::VersionedNetwork {
3333                fork,
3334                version,
3335                deserialize_fn,
3336                metadata,
3337            } => {
3338                let cloned_fork = if let Some(transformed) = seen_tees.get(&fork.as_ptr()) {
3339                    SharedNode(transformed.clone())
3340                } else {
3341                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
3342                    seen_tees.insert(fork.as_ptr(), new_rc.clone());
3343                    let cloned = fork.0.borrow().deep_clone(seen_tees);
3344                    *new_rc.borrow_mut() = cloned;
3345                    SharedNode(new_rc)
3346                };
3347                HydroNode::VersionedNetwork {
3348                    fork: cloned_fork,
3349                    version: *version,
3350                    deserialize_fn: deserialize_fn.clone(),
3351                    metadata: metadata.clone(),
3352                }
3353            }
3354        }
3355    }
3356
3357    #[cfg(feature = "build")]
3358    pub fn emit_core(
3359        &mut self,
3360        builders_or_callback: &mut BuildersOrCallback<
3361            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3362            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3363        >,
3364        seen_tees: &mut SeenSharedNodes,
3365        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3366        next_stmt_id: &mut crate::Counter<StmtId>,
3367        fold_hooked_idents: &mut HashSet<String>,
3368    ) -> syn::Ident {
3369        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3370
3371        self.transform_bottom_up(
3372            &mut |node: &mut HydroNode| {
3373                let out_location = node.metadata().location_id.clone();
3374                match node {
3375                    HydroNode::Placeholder => {
3376                        panic!()
3377                    }
3378
3379                    HydroNode::Cast { .. } => {
3380                        // Cast passes through the input ident unchanged
3381                        // The input ident is already on the stack from processing the child
3382                        let _ = next_stmt_id.get_and_increment();
3383                        match builders_or_callback {
3384                            BuildersOrCallback::Builders(_) => {}
3385                            BuildersOrCallback::Callback(_, node_callback) => {
3386                                node_callback(node, next_stmt_id);
3387                            }
3388                        }
3389                        // input_ident stays on stack as output
3390                    }
3391
3392                    HydroNode::UnboundSingleton { .. } => {
3393                        let inner_ident = ident_stack.pop().unwrap();
3394
3395                        let stmt_id = next_stmt_id.get_and_increment();
3396                        let out_ident =
3397                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3398
3399                        match builders_or_callback {
3400                            BuildersOrCallback::Builders(graph_builders) => {
3401                                if graph_builders.singleton_intermediates() {
3402                                    let builder = graph_builders.get_dfir_mut(&out_location);
3403                                    builder.add_dfir(
3404                                        parse_quote! {
3405                                            #out_ident = #inner_ident;
3406                                        },
3407                                        None,
3408                                        None,
3409                                    );
3410                                } else {
3411                                    let builder = graph_builders.get_dfir_mut(&out_location);
3412                                    builder.add_dfir(
3413                                        parse_quote! {
3414                                            #out_ident = #inner_ident -> persist::<'static>();
3415                                        },
3416                                        None,
3417                                        None,
3418                                    );
3419                                }
3420                            }
3421                            BuildersOrCallback::Callback(_, node_callback) => {
3422                                node_callback(node, next_stmt_id);
3423                            }
3424                        }
3425
3426                        ident_stack.push(out_ident);
3427                    }
3428
3429                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3430                        let inner_ident = ident_stack.pop().unwrap();
3431
3432                        let stmt_id = next_stmt_id.get_and_increment();
3433                        let out_ident =
3434                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3435
3436                        match builders_or_callback {
3437                            BuildersOrCallback::Builders(graph_builders) => {
3438                                graph_builders.assert_is_consistent(
3439                                    *trusted,
3440                                    &inner.metadata().location_id,
3441                                    inner_ident,
3442                                    &out_ident,
3443                                );
3444                            }
3445                            BuildersOrCallback::Callback(_, node_callback) => {
3446                                node_callback(node, next_stmt_id);
3447                            }
3448                        }
3449
3450                        ident_stack.push(out_ident);
3451                    }
3452
3453                    HydroNode::ObserveNonDet {
3454                        inner,
3455                        trusted,
3456                        metadata,
3457                        ..
3458                    } => {
3459                        let inner_ident = ident_stack.pop().unwrap();
3460
3461                        let stmt_id = next_stmt_id.get_and_increment();
3462                        let observe_ident =
3463                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3464
3465                        match builders_or_callback {
3466                            BuildersOrCallback::Builders(graph_builders) => {
3467                                graph_builders.observe_nondet(
3468                                    *trusted,
3469                                    &inner.metadata().location_id,
3470                                    inner_ident,
3471                                    &inner.metadata().collection_kind,
3472                                    &observe_ident,
3473                                    &metadata.collection_kind,
3474                                    &metadata.op,
3475                                );
3476                            }
3477                            BuildersOrCallback::Callback(_, node_callback) => {
3478                                node_callback(node, next_stmt_id);
3479                            }
3480                        }
3481
3482                        ident_stack.push(observe_ident);
3483                    }
3484
3485                    HydroNode::Batch {
3486                        inner, metadata, ..
3487                    } => {
3488                        let inner_ident = ident_stack.pop().unwrap();
3489
3490                        let stmt_id = next_stmt_id.get_and_increment();
3491                        let batch_ident =
3492                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3493
3494                        match builders_or_callback {
3495                            BuildersOrCallback::Builders(graph_builders) => {
3496                                graph_builders.batch(
3497                                    inner_ident,
3498                                    &inner.metadata().location_id,
3499                                    &inner.metadata().collection_kind,
3500                                    &batch_ident,
3501                                    &out_location,
3502                                    &metadata.op,
3503                                    fold_hooked_idents,
3504                                );
3505                            }
3506                            BuildersOrCallback::Callback(_, node_callback) => {
3507                                node_callback(node, next_stmt_id);
3508                            }
3509                        }
3510
3511                        ident_stack.push(batch_ident);
3512                    }
3513
3514                    HydroNode::YieldConcat { inner, .. } => {
3515                        let inner_ident = ident_stack.pop().unwrap();
3516
3517                        let stmt_id = next_stmt_id.get_and_increment();
3518                        let yield_ident =
3519                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3520
3521                        match builders_or_callback {
3522                            BuildersOrCallback::Builders(graph_builders) => {
3523                                graph_builders.yield_from_tick(
3524                                    inner_ident,
3525                                    &inner.metadata().location_id,
3526                                    &inner.metadata().collection_kind,
3527                                    &yield_ident,
3528                                    &out_location,
3529                                );
3530                            }
3531                            BuildersOrCallback::Callback(_, node_callback) => {
3532                                node_callback(node, next_stmt_id);
3533                            }
3534                        }
3535
3536                        ident_stack.push(yield_ident);
3537                    }
3538
3539                    HydroNode::BeginAtomic { inner, metadata } => {
3540                        let inner_ident = ident_stack.pop().unwrap();
3541
3542                        let stmt_id = next_stmt_id.get_and_increment();
3543                        let begin_ident =
3544                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3545
3546                        match builders_or_callback {
3547                            BuildersOrCallback::Builders(graph_builders) => {
3548                                graph_builders.begin_atomic(
3549                                    inner_ident,
3550                                    &inner.metadata().location_id,
3551                                    &inner.metadata().collection_kind,
3552                                    &begin_ident,
3553                                    &out_location,
3554                                    &metadata.op,
3555                                );
3556                            }
3557                            BuildersOrCallback::Callback(_, node_callback) => {
3558                                node_callback(node, next_stmt_id);
3559                            }
3560                        }
3561
3562                        ident_stack.push(begin_ident);
3563                    }
3564
3565                    HydroNode::EndAtomic { inner, .. } => {
3566                        let inner_ident = ident_stack.pop().unwrap();
3567
3568                        let stmt_id = next_stmt_id.get_and_increment();
3569                        let end_ident =
3570                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3571
3572                        match builders_or_callback {
3573                            BuildersOrCallback::Builders(graph_builders) => {
3574                                graph_builders.end_atomic(
3575                                    inner_ident,
3576                                    &inner.metadata().location_id,
3577                                    &inner.metadata().collection_kind,
3578                                    &end_ident,
3579                                );
3580                            }
3581                            BuildersOrCallback::Callback(_, node_callback) => {
3582                                node_callback(node, next_stmt_id);
3583                            }
3584                        }
3585
3586                        ident_stack.push(end_ident);
3587                    }
3588
3589                    HydroNode::Source {
3590                        source, metadata, ..
3591                    } => {
3592                        if let HydroSource::ExternalNetwork() = source {
3593                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3594                        } else {
3595                            let stmt_id = next_stmt_id.get_and_increment();
3596                            let source_ident =
3597                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3598
3599                            let source_stmt = match source {
3600                                HydroSource::Stream(expr) => {
3601                                    debug_assert!(metadata.location_id.is_top_level());
3602                                    parse_quote! {
3603                                        #source_ident = source_stream(#expr);
3604                                    }
3605                                }
3606
3607                                HydroSource::ExternalNetwork() => {
3608                                    unreachable!()
3609                                }
3610
3611                                HydroSource::Iter(expr) => {
3612                                    if metadata.location_id.is_top_level() {
3613                                        parse_quote! {
3614                                            #source_ident = source_iter(#expr);
3615                                        }
3616                                    } else {
3617                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
3618                                        parse_quote! {
3619                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3620                                        }
3621                                    }
3622                                }
3623
3624                                HydroSource::Spin() => {
3625                                    debug_assert!(metadata.location_id.is_top_level());
3626                                    parse_quote! {
3627                                        #source_ident = spin();
3628                                    }
3629                                }
3630
3631                                HydroSource::ClusterMembers(target_loc, state) => {
3632                                    debug_assert!(metadata.location_id.is_top_level());
3633
3634                                    let members_tee_ident = syn::Ident::new(
3635                                        &format!(
3636                                            "__cluster_members_tee_{}_{}",
3637                                            metadata.location_id.root().key(),
3638                                            target_loc.key(),
3639                                        ),
3640                                        Span::call_site(),
3641                                    );
3642
3643                                    match state {
3644                                        ClusterMembersState::Stream(d) => {
3645                                            parse_quote! {
3646                                                #members_tee_ident = source_stream(#d) -> tee();
3647                                                #source_ident = #members_tee_ident;
3648                                            }
3649                                        },
3650                                        ClusterMembersState::Uninit => syn::parse_quote! {
3651                                            #source_ident = source_stream(DUMMY);
3652                                        },
3653                                        ClusterMembersState::Tee(..) => parse_quote! {
3654                                            #source_ident = #members_tee_ident;
3655                                        },
3656                                    }
3657                                }
3658
3659                                HydroSource::Embedded(ident) => {
3660                                    parse_quote! {
3661                                        #source_ident = source_stream(#ident);
3662                                    }
3663                                }
3664
3665                                HydroSource::EmbeddedSingleton(ident) => {
3666                                    parse_quote! {
3667                                        #source_ident = source_iter([#ident]);
3668                                    }
3669                                }
3670                            };
3671
3672                            match builders_or_callback {
3673                                BuildersOrCallback::Builders(graph_builders) => {
3674                                    let builder = graph_builders.get_dfir_mut(&out_location);
3675                                    builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3676                                }
3677                                BuildersOrCallback::Callback(_, node_callback) => {
3678                                    node_callback(node, next_stmt_id);
3679                                }
3680                            }
3681
3682                            ident_stack.push(source_ident);
3683                        }
3684                    }
3685
3686                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3687                        let stmt_id = next_stmt_id.get_and_increment();
3688                        let source_ident =
3689                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3690
3691                        match builders_or_callback {
3692                            BuildersOrCallback::Builders(graph_builders) => {
3693                                let builder = graph_builders.get_dfir_mut(&out_location);
3694
3695                                if *first_tick_only {
3696                                    assert!(
3697                                        !metadata.location_id.is_top_level(),
3698                                        "first_tick_only SingletonSource must be inside a tick"
3699                                    );
3700                                }
3701
3702                                if *first_tick_only
3703                                    || (metadata.location_id.is_top_level()
3704                                        && metadata.collection_kind.is_bounded())
3705                                {
3706                                    builder.add_dfir(
3707                                        parse_quote! {
3708                                            #source_ident = source_iter([#value]);
3709                                        },
3710                                        None,
3711                                        Some(&stmt_id.to_string()),
3712                                    );
3713                                } else {
3714                                    builder.add_dfir(
3715                                        parse_quote! {
3716                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3717                                        },
3718                                        None,
3719                                        Some(&stmt_id.to_string()),
3720                                    );
3721                                }
3722                            }
3723                            BuildersOrCallback::Callback(_, node_callback) => {
3724                                node_callback(node, next_stmt_id);
3725                            }
3726                        }
3727
3728                        ident_stack.push(source_ident);
3729                    }
3730
3731                    HydroNode::CycleSource { cycle_id, .. } => {
3732                        let ident = cycle_id.as_ident();
3733
3734                        // consume a stmt id even though we did not emit anything so that we can instrument this
3735                        let _ = next_stmt_id.get_and_increment();
3736
3737                        match builders_or_callback {
3738                            BuildersOrCallback::Builders(_) => {}
3739                            BuildersOrCallback::Callback(_, node_callback) => {
3740                                node_callback(node, next_stmt_id);
3741                            }
3742                        }
3743
3744                        ident_stack.push(ident);
3745                    }
3746
3747                    HydroNode::Tee { inner, .. } => {
3748                        // we consume a stmt id regardless of if we emit the tee() operator,
3749                        // so that during rewrites we touch all recipients of the tee()
3750                        let stmt_id = next_stmt_id.get_and_increment();
3751
3752                        let ret_ident = if let Some(built_idents) =
3753                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3754                        {
3755                            match builders_or_callback {
3756                                BuildersOrCallback::Builders(_) => {}
3757                                BuildersOrCallback::Callback(_, node_callback) => {
3758                                    node_callback(node, next_stmt_id);
3759                                }
3760                            }
3761
3762                            built_idents[0].clone()
3763                        } else {
3764                            // The inner node was already processed by transform_bottom_up,
3765                            // so its ident is on the stack
3766                            let inner_ident = ident_stack.pop().unwrap();
3767
3768                            let tee_ident =
3769                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3770
3771                            built_tees.insert(
3772                                inner.0.as_ref() as *const RefCell<HydroNode>,
3773                                vec![tee_ident.clone()],
3774                            );
3775
3776                            match builders_or_callback {
3777                                BuildersOrCallback::Builders(graph_builders) => {
3778                                    // NOTE: With `forward_ref`, the fold codegen may not have
3779                                    // run yet when we reach this tee, so `fold_hooked_idents`
3780                                    // might not contain the inner ident. In that case we won't
3781                                    // propagate the "hooked" status to the tee and the
3782                                    // downstream singleton batch will use the normal
3783                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3784                                    // This is not a soundness issue: the fallback hook still
3785                                    // produces correct behavior, just with a redundant decision
3786                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3787                                    // fix ordering so forward_ref folds are always processed
3788                                    // before their downstream tees.
3789                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3790                                        fold_hooked_idents.insert(tee_ident.to_string());
3791                                    }
3792                                    let builder = graph_builders.get_dfir_mut(&out_location);
3793                                    builder.add_dfir(
3794                                        parse_quote! {
3795                                            #tee_ident = #inner_ident -> tee();
3796                                        },
3797                                        None,
3798                                        Some(&stmt_id.to_string()),
3799                                    );
3800                                }
3801                                BuildersOrCallback::Callback(_, node_callback) => {
3802                                    node_callback(node, next_stmt_id);
3803                                }
3804                            }
3805
3806                            tee_ident
3807                        };
3808
3809                        ident_stack.push(ret_ident);
3810                    }
3811
3812                    HydroNode::Reference { inner, kind, .. } => {
3813                        // we consume a stmt id regardless of if we emit the operator,
3814                        // so that during rewrites we touch all recipients
3815                        let stmt_id = next_stmt_id.get_and_increment();
3816
3817                        let ret_ident = if let Some(built_idents) =
3818                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3819                        {
3820                            built_idents[0].clone()
3821                        } else {
3822                            let inner_ident = ident_stack.pop().unwrap();
3823
3824                            let ref_ident =
3825                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3826
3827                            built_tees.insert(
3828                                inner.0.as_ref() as *const RefCell<HydroNode>,
3829                                vec![ref_ident.clone()],
3830                            );
3831
3832                            match builders_or_callback {
3833                                BuildersOrCallback::Builders(graph_builders) => {
3834                                    let builder = graph_builders.get_dfir_mut(&out_location);
3835                                    let op_ident = syn::Ident::new(
3836                                        match kind {
3837                                            crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3838                                            crate::handoff_ref::HandoffRefKind::Optional => "optional",
3839                                            crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3840                                        },
3841                                        Span::call_site(),
3842                                    );
3843                                    builder.add_dfir(
3844                                        parse_quote! {
3845                                            #ref_ident = #inner_ident -> #op_ident();
3846                                        },
3847                                        None,
3848                                        Some(&stmt_id.to_string()),
3849                                    );
3850                                }
3851                                BuildersOrCallback::Callback(_, node_callback) => {
3852                                    node_callback(node, next_stmt_id);
3853                                }
3854                            }
3855
3856                            ref_ident
3857                        };
3858
3859                        ident_stack.push(ret_ident);
3860                    }
3861
3862                    HydroNode::Partition {
3863                        inner, f, is_true, metadata,
3864                    } => {
3865                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3866                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3867                        let stmt_id = next_stmt_id.get_and_increment();
3868
3869                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3870                            match builders_or_callback {
3871                                BuildersOrCallback::Builders(_) => {}
3872                                BuildersOrCallback::Callback(_, node_callback) => {
3873                                    node_callback(node, next_stmt_id);
3874                                }
3875                            }
3876
3877                            let idx = if is_true { 0 } else { 1 };
3878                            built_idents[idx].clone()
3879                        } else {
3880                            // The inner node was already processed by transform_bottom_up,
3881                            // so its ident is on the stack
3882                            let inner_ident = ident_stack.pop().unwrap();
3883                            let f_tokens = f.emit_tokens(&mut ident_stack);
3884
3885                            let inner_ident = {
3886                                let inner_borrow = inner.0.borrow();
3887                                maybe_observe_for_mut(
3888                                    f, inner_ident,
3889                                    &inner_borrow.metadata().location_id,
3890                                    &inner_borrow.metadata().collection_kind,
3891                                    &metadata.op,
3892                                    builders_or_callback, next_stmt_id,
3893                                )
3894                            };
3895
3896                            let partition_ident = syn::Ident::new(
3897                                &format!("stream_{}_partition", stmt_id),
3898                                Span::call_site(),
3899                            );
3900                            let true_ident = syn::Ident::new(
3901                                &format!("stream_{}_true", stmt_id),
3902                                Span::call_site(),
3903                            );
3904                            let false_ident = syn::Ident::new(
3905                                &format!("stream_{}_false", stmt_id),
3906                                Span::call_site(),
3907                            );
3908
3909                            built_tees.insert(
3910                                ptr,
3911                                vec![true_ident.clone(), false_ident.clone()],
3912                            );
3913
3914                            let stmt_id = next_stmt_id.get_and_increment();
3915                            match builders_or_callback {
3916                                BuildersOrCallback::Builders(graph_builders) => {
3917                                    let builder = graph_builders.get_dfir_mut(&out_location);
3918                                    builder.add_dfir(
3919                                        parse_quote! {
3920                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3921                                            #true_ident = #partition_ident[0];
3922                                            #false_ident = #partition_ident[1];
3923                                        },
3924                                        None,
3925                                        Some(&stmt_id.to_string()),
3926                                    );
3927                                }
3928                                BuildersOrCallback::Callback(_, node_callback) => {
3929                                    node_callback(node, next_stmt_id);
3930                                }
3931                            }
3932
3933                            if is_true { true_ident } else { false_ident }
3934                        };
3935
3936                        ident_stack.push(ret_ident);
3937                    }
3938
3939                    HydroNode::Chain { .. } => {
3940                        // Children are processed left-to-right, so second is on top
3941                        let second_ident = ident_stack.pop().unwrap();
3942                        let first_ident = ident_stack.pop().unwrap();
3943
3944                        let stmt_id = next_stmt_id.get_and_increment();
3945                        let chain_ident =
3946                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3947
3948                        match builders_or_callback {
3949                            BuildersOrCallback::Builders(graph_builders) => {
3950                                let builder = graph_builders.get_dfir_mut(&out_location);
3951                                builder.add_dfir(
3952                                    parse_quote! {
3953                                        #chain_ident = chain();
3954                                        #first_ident -> [0]#chain_ident;
3955                                        #second_ident -> [1]#chain_ident;
3956                                    },
3957                                    None,
3958                                    Some(&stmt_id.to_string()),
3959                                );
3960                            }
3961                            BuildersOrCallback::Callback(_, node_callback) => {
3962                                node_callback(node, next_stmt_id);
3963                            }
3964                        }
3965
3966                        ident_stack.push(chain_ident);
3967                    }
3968
3969                    HydroNode::MergeOrdered { first, metadata, .. } => {
3970                        let second_ident = ident_stack.pop().unwrap();
3971                        let first_ident = ident_stack.pop().unwrap();
3972
3973                        let stmt_id = next_stmt_id.get_and_increment();
3974                        let merge_ident =
3975                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3976
3977                        match builders_or_callback {
3978                            BuildersOrCallback::Builders(graph_builders) => {
3979                                graph_builders.merge_ordered(
3980                                    &first.metadata().location_id,
3981                                    first_ident,
3982                                    second_ident,
3983                                    &merge_ident,
3984                                    &first.metadata().collection_kind,
3985                                    &metadata.op,
3986                                    Some(&stmt_id.to_string()),
3987                                );
3988                            }
3989                            BuildersOrCallback::Callback(_, node_callback) => {
3990                                node_callback(node, next_stmt_id);
3991                            }
3992                        }
3993
3994                        ident_stack.push(merge_ident);
3995                    }
3996
3997                    HydroNode::ChainFirst { .. } => {
3998                        let second_ident = ident_stack.pop().unwrap();
3999                        let first_ident = ident_stack.pop().unwrap();
4000
4001                        let stmt_id = next_stmt_id.get_and_increment();
4002                        let chain_ident =
4003                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4004
4005                        match builders_or_callback {
4006                            BuildersOrCallback::Builders(graph_builders) => {
4007                                let builder = graph_builders.get_dfir_mut(&out_location);
4008                                builder.add_dfir(
4009                                    parse_quote! {
4010                                        #chain_ident = chain_first_n(1);
4011                                        #first_ident -> [0]#chain_ident;
4012                                        #second_ident -> [1]#chain_ident;
4013                                    },
4014                                    None,
4015                                    Some(&stmt_id.to_string()),
4016                                );
4017                            }
4018                            BuildersOrCallback::Callback(_, node_callback) => {
4019                                node_callback(node, next_stmt_id);
4020                            }
4021                        }
4022
4023                        ident_stack.push(chain_ident);
4024                    }
4025
4026                    HydroNode::CrossSingleton { right, .. } => {
4027                        let right_ident = ident_stack.pop().unwrap();
4028                        let left_ident = ident_stack.pop().unwrap();
4029
4030                        let stmt_id = next_stmt_id.get_and_increment();
4031                        let cross_ident =
4032                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4033
4034                        match builders_or_callback {
4035                            BuildersOrCallback::Builders(graph_builders) => {
4036                                let builder = graph_builders.get_dfir_mut(&out_location);
4037
4038                                if right.metadata().location_id.is_top_level()
4039                                    && right.metadata().collection_kind.is_bounded()
4040                                {
4041                                    builder.add_dfir(
4042                                        parse_quote! {
4043                                            #cross_ident = cross_singleton::<'static>();
4044                                            #left_ident -> [input]#cross_ident;
4045                                            #right_ident -> [single]#cross_ident;
4046                                        },
4047                                        None,
4048                                        Some(&stmt_id.to_string()),
4049                                    );
4050                                } else {
4051                                    builder.add_dfir(
4052                                        parse_quote! {
4053                                            #cross_ident = cross_singleton();
4054                                            #left_ident -> [input]#cross_ident;
4055                                            #right_ident -> [single]#cross_ident;
4056                                        },
4057                                        None,
4058                                        Some(&stmt_id.to_string()),
4059                                    );
4060                                }
4061                            }
4062                            BuildersOrCallback::Callback(_, node_callback) => {
4063                                node_callback(node, next_stmt_id);
4064                            }
4065                        }
4066
4067                        ident_stack.push(cross_ident);
4068                    }
4069
4070                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
4071                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
4072                            parse_quote!(cross_join_multiset)
4073                        } else {
4074                            parse_quote!(join_multiset)
4075                        };
4076
4077                        let (HydroNode::CrossProduct { left, right, .. }
4078                        | HydroNode::Join { left, right, .. }) = node
4079                        else {
4080                            unreachable!()
4081                        };
4082
4083                        let is_top_level = left.metadata().location_id.is_top_level()
4084                            && right.metadata().location_id.is_top_level();
4085                        let left_lifetime = if left.metadata().location_id.is_top_level() {
4086                            quote!('static)
4087                        } else {
4088                            quote!('tick)
4089                        };
4090
4091                        let right_lifetime = if right.metadata().location_id.is_top_level() {
4092                            quote!('static)
4093                        } else {
4094                            quote!('tick)
4095                        };
4096
4097                        let right_ident = ident_stack.pop().unwrap();
4098                        let left_ident = ident_stack.pop().unwrap();
4099
4100                        let stmt_id = next_stmt_id.get_and_increment();
4101                        let stream_ident =
4102                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4103
4104                        match builders_or_callback {
4105                            BuildersOrCallback::Builders(graph_builders) => {
4106                                let builder = graph_builders.get_dfir_mut(&out_location);
4107                                builder.add_dfir(
4108                                    if is_top_level {
4109                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
4110                                        // a multiset_delta() to negate the replay behavior
4111                                        parse_quote! {
4112                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
4113                                            #left_ident -> [0]#stream_ident;
4114                                            #right_ident -> [1]#stream_ident;
4115                                        }
4116                                    } else {
4117                                        parse_quote! {
4118                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
4119                                            #left_ident -> [0]#stream_ident;
4120                                            #right_ident -> [1]#stream_ident;
4121                                        }
4122                                    }
4123                                    ,
4124                                    None,
4125                                    Some(&stmt_id.to_string()),
4126                                );
4127                            }
4128                            BuildersOrCallback::Callback(_, node_callback) => {
4129                                node_callback(node, next_stmt_id);
4130                            }
4131                        }
4132
4133                        ident_stack.push(stream_ident);
4134                    }
4135
4136                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
4137                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
4138                            parse_quote!(difference)
4139                        } else {
4140                            parse_quote!(anti_join)
4141                        };
4142
4143                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
4144                            node
4145                        else {
4146                            unreachable!()
4147                        };
4148
4149                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
4150                            quote!('static)
4151                        } else {
4152                            quote!('tick)
4153                        };
4154
4155                        let neg_ident = ident_stack.pop().unwrap();
4156                        let pos_ident = ident_stack.pop().unwrap();
4157
4158                        let stmt_id = next_stmt_id.get_and_increment();
4159                        let stream_ident =
4160                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4161
4162                        match builders_or_callback {
4163                            BuildersOrCallback::Builders(graph_builders) => {
4164                                let builder = graph_builders.get_dfir_mut(&out_location);
4165                                builder.add_dfir(
4166                                    parse_quote! {
4167                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
4168                                        #pos_ident -> [pos]#stream_ident;
4169                                        #neg_ident -> [neg]#stream_ident;
4170                                    },
4171                                    None,
4172                                    Some(&stmt_id.to_string()),
4173                                );
4174                            }
4175                            BuildersOrCallback::Callback(_, node_callback) => {
4176                                node_callback(node, next_stmt_id);
4177                            }
4178                        }
4179
4180                        ident_stack.push(stream_ident);
4181                    }
4182
4183                    HydroNode::JoinHalf { .. } => {
4184                        let HydroNode::JoinHalf { right, .. } = node else {
4185                            unreachable!()
4186                        };
4187
4188                        assert!(
4189                            right.metadata().collection_kind.is_bounded(),
4190                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
4191                            right.metadata().collection_kind
4192                        );
4193
4194                        let build_lifetime = if right.metadata().location_id.is_top_level() {
4195                            quote!('static)
4196                        } else {
4197                            quote!('tick)
4198                        };
4199
4200                        let build_ident = ident_stack.pop().unwrap();
4201                        let probe_ident = ident_stack.pop().unwrap();
4202
4203                        let stmt_id = next_stmt_id.get_and_increment();
4204                        let stream_ident =
4205                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4206
4207                        match builders_or_callback {
4208                            BuildersOrCallback::Builders(graph_builders) => {
4209                                let builder = graph_builders.get_dfir_mut(&out_location);
4210                                builder.add_dfir(
4211                                    parse_quote! {
4212                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
4213                                        #probe_ident -> [probe]#stream_ident;
4214                                        #build_ident -> [build]#stream_ident;
4215                                    },
4216                                    None,
4217                                    Some(&stmt_id.to_string()),
4218                                );
4219                            }
4220                            BuildersOrCallback::Callback(_, node_callback) => {
4221                                node_callback(node, next_stmt_id);
4222                            }
4223                        }
4224
4225                        ident_stack.push(stream_ident);
4226                    }
4227
4228                    HydroNode::ResolveFutures { .. } => {
4229                        let input_ident = ident_stack.pop().unwrap();
4230
4231                        let stmt_id = next_stmt_id.get_and_increment();
4232                        let futures_ident =
4233                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4234
4235                        match builders_or_callback {
4236                            BuildersOrCallback::Builders(graph_builders) => {
4237                                let builder = graph_builders.get_dfir_mut(&out_location);
4238                                builder.add_dfir(
4239                                    parse_quote! {
4240                                        #futures_ident = #input_ident -> resolve_futures();
4241                                    },
4242                                    None,
4243                                    Some(&stmt_id.to_string()),
4244                                );
4245                            }
4246                            BuildersOrCallback::Callback(_, node_callback) => {
4247                                node_callback(node, next_stmt_id);
4248                            }
4249                        }
4250
4251                        ident_stack.push(futures_ident);
4252                    }
4253
4254                    HydroNode::ResolveFuturesBlocking { .. } => {
4255                        let input_ident = ident_stack.pop().unwrap();
4256
4257                        let stmt_id = next_stmt_id.get_and_increment();
4258                        let futures_ident =
4259                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4260
4261                        match builders_or_callback {
4262                            BuildersOrCallback::Builders(graph_builders) => {
4263                                let builder = graph_builders.get_dfir_mut(&out_location);
4264                                builder.add_dfir(
4265                                    parse_quote! {
4266                                        #futures_ident = #input_ident -> resolve_futures_blocking();
4267                                    },
4268                                    None,
4269                                    Some(&stmt_id.to_string()),
4270                                );
4271                            }
4272                            BuildersOrCallback::Callback(_, node_callback) => {
4273                                node_callback(node, next_stmt_id);
4274                            }
4275                        }
4276
4277                        ident_stack.push(futures_ident);
4278                    }
4279
4280                    HydroNode::ResolveFuturesOrdered { .. } => {
4281                        let input_ident = ident_stack.pop().unwrap();
4282
4283                        let stmt_id = next_stmt_id.get_and_increment();
4284                        let futures_ident =
4285                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4286
4287                        match builders_or_callback {
4288                            BuildersOrCallback::Builders(graph_builders) => {
4289                                let builder = graph_builders.get_dfir_mut(&out_location);
4290                                builder.add_dfir(
4291                                    parse_quote! {
4292                                        #futures_ident = #input_ident -> resolve_futures_ordered();
4293                                    },
4294                                    None,
4295                                    Some(&stmt_id.to_string()),
4296                                );
4297                            }
4298                            BuildersOrCallback::Callback(_, node_callback) => {
4299                                node_callback(node, next_stmt_id);
4300                            }
4301                        }
4302
4303                        ident_stack.push(futures_ident);
4304                    }
4305
4306                    HydroNode::Map {
4307                        f,
4308                        input,
4309                        metadata,
4310                    } => {
4311                        // Pop input ident (pushed last by transform_children).
4312                        let input_ident = ident_stack.pop().unwrap();
4313                        let f_tokens = f.emit_tokens(&mut ident_stack);
4314
4315                        let input_ident = maybe_observe_for_mut(
4316                            f,
4317                            input_ident,
4318                            &input.metadata().location_id,
4319                            &input.metadata().collection_kind,
4320                            &metadata.op,
4321                            builders_or_callback,
4322                            next_stmt_id,
4323                        );
4324
4325                        let stmt_id = next_stmt_id.get_and_increment();
4326                        let map_ident =
4327                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4328
4329                        match builders_or_callback {
4330                            BuildersOrCallback::Builders(graph_builders) => {
4331                                let builder = graph_builders.get_dfir_mut(&out_location);
4332                                builder.add_dfir(
4333                                    parse_quote! {
4334                                        #map_ident = #input_ident -> map(#f_tokens);
4335                                    },
4336                                    None,
4337                                    Some(&stmt_id.to_string()),
4338                                );
4339                            }
4340                            BuildersOrCallback::Callback(_, node_callback) => {
4341                                node_callback(node, next_stmt_id);
4342                            }
4343                        }
4344
4345                        ident_stack.push(map_ident);
4346                    }
4347
4348                    HydroNode::FlatMap { f, input, metadata } => {
4349                        let input_ident = ident_stack.pop().unwrap();
4350                        let f_tokens = f.emit_tokens(&mut ident_stack);
4351
4352                        let input_ident = maybe_observe_for_mut(
4353                            f, input_ident,
4354                            &input.metadata().location_id,
4355                            &input.metadata().collection_kind,
4356                            &metadata.op,
4357                            builders_or_callback, next_stmt_id,
4358                        );
4359
4360                        let stmt_id = next_stmt_id.get_and_increment();
4361                        let flat_map_ident =
4362                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4363
4364                        match builders_or_callback {
4365                            BuildersOrCallback::Builders(graph_builders) => {
4366                                let builder = graph_builders.get_dfir_mut(&out_location);
4367                                builder.add_dfir(
4368                                    parse_quote! {
4369                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4370                                    },
4371                                    None,
4372                                    Some(&stmt_id.to_string()),
4373                                );
4374                            }
4375                            BuildersOrCallback::Callback(_, node_callback) => {
4376                                node_callback(node, next_stmt_id);
4377                            }
4378                        }
4379
4380                        ident_stack.push(flat_map_ident);
4381                    }
4382
4383                    HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
4384                        let input_ident = ident_stack.pop().unwrap();
4385                        let f_tokens = f.emit_tokens(&mut ident_stack);
4386
4387                        let input_ident = maybe_observe_for_mut(
4388                            f, input_ident,
4389                            &input.metadata().location_id,
4390                            &input.metadata().collection_kind,
4391                            &metadata.op,
4392                            builders_or_callback, next_stmt_id,
4393                        );
4394
4395                        let stmt_id = next_stmt_id.get_and_increment();
4396                        let flat_map_stream_blocking_ident =
4397                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4398
4399                        match builders_or_callback {
4400                            BuildersOrCallback::Builders(graph_builders) => {
4401                                let builder = graph_builders.get_dfir_mut(&out_location);
4402                                builder.add_dfir(
4403                                    parse_quote! {
4404                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4405                                    },
4406                                    None,
4407                                    Some(&stmt_id.to_string()),
4408                                );
4409                            }
4410                            BuildersOrCallback::Callback(_, node_callback) => {
4411                                node_callback(node, next_stmt_id);
4412                            }
4413                        }
4414
4415                        ident_stack.push(flat_map_stream_blocking_ident);
4416                    }
4417
4418                    HydroNode::Filter { f, input, metadata } => {
4419                        let input_ident = ident_stack.pop().unwrap();
4420                        let f_tokens = f.emit_tokens(&mut ident_stack);
4421
4422                        let input_ident = maybe_observe_for_mut(
4423                            f, input_ident,
4424                            &input.metadata().location_id,
4425                            &input.metadata().collection_kind,
4426                            &metadata.op,
4427                            builders_or_callback, next_stmt_id,
4428                        );
4429
4430                        let stmt_id = next_stmt_id.get_and_increment();
4431                        let filter_ident =
4432                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4433
4434                        match builders_or_callback {
4435                            BuildersOrCallback::Builders(graph_builders) => {
4436                                let builder = graph_builders.get_dfir_mut(&out_location);
4437                                builder.add_dfir(
4438                                    parse_quote! {
4439                                        #filter_ident = #input_ident -> filter(#f_tokens);
4440                                    },
4441                                    None,
4442                                    Some(&stmt_id.to_string()),
4443                                );
4444                            }
4445                            BuildersOrCallback::Callback(_, node_callback) => {
4446                                node_callback(node, next_stmt_id);
4447                            }
4448                        }
4449
4450                        ident_stack.push(filter_ident);
4451                    }
4452
4453                    HydroNode::FilterMap { f, input, metadata } => {
4454                        let input_ident = ident_stack.pop().unwrap();
4455                        let f_tokens = f.emit_tokens(&mut ident_stack);
4456
4457                        let input_ident = maybe_observe_for_mut(
4458                            f, input_ident,
4459                            &input.metadata().location_id,
4460                            &input.metadata().collection_kind,
4461                            &metadata.op,
4462                            builders_or_callback, next_stmt_id,
4463                        );
4464
4465                        let stmt_id = next_stmt_id.get_and_increment();
4466                        let filter_map_ident =
4467                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4468
4469                        match builders_or_callback {
4470                            BuildersOrCallback::Builders(graph_builders) => {
4471                                let builder = graph_builders.get_dfir_mut(&out_location);
4472                                builder.add_dfir(
4473                                    parse_quote! {
4474                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4475                                    },
4476                                    None,
4477                                    Some(&stmt_id.to_string()),
4478                                );
4479                            }
4480                            BuildersOrCallback::Callback(_, node_callback) => {
4481                                node_callback(node, next_stmt_id);
4482                            }
4483                        }
4484
4485                        ident_stack.push(filter_map_ident);
4486                    }
4487
4488                    HydroNode::Sort { .. } => {
4489                        let input_ident = ident_stack.pop().unwrap();
4490
4491                        let stmt_id = next_stmt_id.get_and_increment();
4492                        let sort_ident =
4493                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4494
4495                        match builders_or_callback {
4496                            BuildersOrCallback::Builders(graph_builders) => {
4497                                let builder = graph_builders.get_dfir_mut(&out_location);
4498                                builder.add_dfir(
4499                                    parse_quote! {
4500                                        #sort_ident = #input_ident -> sort();
4501                                    },
4502                                    None,
4503                                    Some(&stmt_id.to_string()),
4504                                );
4505                            }
4506                            BuildersOrCallback::Callback(_, node_callback) => {
4507                                node_callback(node, next_stmt_id);
4508                            }
4509                        }
4510
4511                        ident_stack.push(sort_ident);
4512                    }
4513
4514                    HydroNode::DeferTick { .. } => {
4515                        let input_ident = ident_stack.pop().unwrap();
4516
4517                        let stmt_id = next_stmt_id.get_and_increment();
4518                        let defer_tick_ident =
4519                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4520
4521                        match builders_or_callback {
4522                            BuildersOrCallback::Builders(graph_builders) => {
4523                                let builder = graph_builders.get_dfir_mut(&out_location);
4524                                builder.add_dfir(
4525                                    parse_quote! {
4526                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4527                                    },
4528                                    None,
4529                                    Some(&stmt_id.to_string()),
4530                                );
4531                            }
4532                            BuildersOrCallback::Callback(_, node_callback) => {
4533                                node_callback(node, next_stmt_id);
4534                            }
4535                        }
4536
4537                        ident_stack.push(defer_tick_ident);
4538                    }
4539
4540                    HydroNode::Enumerate { input, .. } => {
4541                        let input_ident = ident_stack.pop().unwrap();
4542
4543                        let stmt_id = next_stmt_id.get_and_increment();
4544                        let enumerate_ident =
4545                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4546
4547                        match builders_or_callback {
4548                            BuildersOrCallback::Builders(graph_builders) => {
4549                                let builder = graph_builders.get_dfir_mut(&out_location);
4550                                let lifetime = if input.metadata().location_id.is_top_level() {
4551                                    quote!('static)
4552                                } else {
4553                                    quote!('tick)
4554                                };
4555                                builder.add_dfir(
4556                                    parse_quote! {
4557                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4558                                    },
4559                                    None,
4560                                    Some(&stmt_id.to_string()),
4561                                );
4562                            }
4563                            BuildersOrCallback::Callback(_, node_callback) => {
4564                                node_callback(node, next_stmt_id);
4565                            }
4566                        }
4567
4568                        ident_stack.push(enumerate_ident);
4569                    }
4570
4571                    HydroNode::Inspect { f, input, metadata } => {
4572                        let input_ident = ident_stack.pop().unwrap();
4573                        let f_tokens = f.emit_tokens(&mut ident_stack);
4574
4575                        let input_ident = maybe_observe_for_mut(
4576                            f, input_ident,
4577                            &input.metadata().location_id,
4578                            &input.metadata().collection_kind,
4579                            &metadata.op,
4580                            builders_or_callback, next_stmt_id,
4581                        );
4582
4583                        let stmt_id = next_stmt_id.get_and_increment();
4584                        let inspect_ident =
4585                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4586
4587                        match builders_or_callback {
4588                            BuildersOrCallback::Builders(graph_builders) => {
4589                                let builder = graph_builders.get_dfir_mut(&out_location);
4590                                builder.add_dfir(
4591                                    parse_quote! {
4592                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4593                                    },
4594                                    None,
4595                                    Some(&stmt_id.to_string()),
4596                                );
4597                            }
4598                            BuildersOrCallback::Callback(_, node_callback) => {
4599                                node_callback(node, next_stmt_id);
4600                            }
4601                        }
4602
4603                        ident_stack.push(inspect_ident);
4604                    }
4605
4606                    HydroNode::Unique { input, .. } => {
4607                        let input_ident = ident_stack.pop().unwrap();
4608
4609                        let stmt_id = next_stmt_id.get_and_increment();
4610                        let unique_ident =
4611                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4612
4613                        match builders_or_callback {
4614                            BuildersOrCallback::Builders(graph_builders) => {
4615                                let builder = graph_builders.get_dfir_mut(&out_location);
4616                                let lifetime = if input.metadata().location_id.is_top_level() {
4617                                    quote!('static)
4618                                } else {
4619                                    quote!('tick)
4620                                };
4621
4622                                builder.add_dfir(
4623                                    parse_quote! {
4624                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4625                                    },
4626                                    None,
4627                                    Some(&stmt_id.to_string()),
4628                                );
4629                            }
4630                            BuildersOrCallback::Callback(_, node_callback) => {
4631                                node_callback(node, next_stmt_id);
4632                            }
4633                        }
4634
4635                        ident_stack.push(unique_ident);
4636                    }
4637
4638                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4639                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4640                            if input.metadata().location_id.is_top_level()
4641                                && input.metadata().collection_kind.is_bounded()
4642                            {
4643                                parse_quote!(fold_no_replay)
4644                            } else {
4645                                parse_quote!(fold)
4646                            }
4647                        } else if matches!(node, HydroNode::Scan { .. }) {
4648                            parse_quote!(scan)
4649                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4650                            parse_quote!(scan_async_blocking)
4651                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4652                            if input.metadata().location_id.is_top_level()
4653                                && input.metadata().collection_kind.is_bounded()
4654                            {
4655                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4656                            } else {
4657                                parse_quote!(fold_keyed)
4658                            }
4659                        } else {
4660                            unreachable!()
4661                        };
4662
4663                        let (HydroNode::Fold { input, .. }
4664                        | HydroNode::FoldKeyed { input, .. }
4665                        | HydroNode::Scan { input, .. }
4666                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4667                        else {
4668                            unreachable!()
4669                        };
4670
4671                        let lifetime = if input.metadata().location_id.is_top_level() {
4672                            quote!('static)
4673                        } else {
4674                            quote!('tick)
4675                        };
4676
4677                        let input_ident = ident_stack.pop().unwrap();
4678
4679                        let (HydroNode::Fold { init, acc, .. }
4680                        | HydroNode::FoldKeyed { init, acc, .. }
4681                        | HydroNode::Scan { init, acc, .. }
4682                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4683                        else {
4684                            unreachable!()
4685                        };
4686
4687                        let acc_tokens = acc.emit_tokens(&mut ident_stack);
4688                        let init_tokens = init.emit_tokens(&mut ident_stack);
4689
4690                        let stmt_id = next_stmt_id.get_and_increment();
4691                        let fold_ident =
4692                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4693
4694                        match builders_or_callback {
4695                            BuildersOrCallback::Builders(graph_builders) => {
4696                                if matches!(node, HydroNode::Fold { .. })
4697                                    && node.metadata().location_id.is_top_level()
4698                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4699                                    && graph_builders.singleton_intermediates()
4700                                    && !node.metadata().collection_kind.is_bounded()
4701                                {
4702                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4703                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4704                                        &input.metadata().location_id,
4705                                        &input_ident,
4706                                        &input.metadata().collection_kind,
4707                                        &node.metadata().op,
4708                                    );
4709
4710                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4711                                        let acc: syn::Expr = parse_quote!({
4712                                            let mut __inner = #acc_tokens;
4713                                            move |__state, __batch: Vec<_>| {
4714                                                if __batch.is_empty() {
4715                                                    return None;
4716                                                }
4717                                                for __value in __batch {
4718                                                    __inner(__state, __value);
4719                                                }
4720                                                Some(__state.clone())
4721                                            }
4722                                        });
4723                                        (hooked, acc)
4724                                    } else {
4725                                        let acc: syn::Expr = parse_quote!({
4726                                            let mut __inner = #acc_tokens;
4727                                            move |__state, __value| {
4728                                                __inner(__state, __value);
4729                                                Some(__state.clone())
4730                                            }
4731                                        });
4732                                        (&input_ident, acc)
4733                                    };
4734
4735                                    let builder = graph_builders.get_dfir_mut(&out_location);
4736                                    builder.add_dfir(
4737                                        parse_quote! {
4738                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4739                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4740                                            #fold_ident = chain();
4741                                        },
4742                                        None,
4743                                        Some(&stmt_id.to_string()),
4744                                    );
4745
4746                                    if hooked_input_ident.is_some() {
4747                                        fold_hooked_idents.insert(fold_ident.to_string());
4748                                    }
4749                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4750                                    && node.metadata().location_id.is_top_level()
4751                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4752                                    && graph_builders.singleton_intermediates()
4753                                    && !node.metadata().collection_kind.is_bounded()
4754                                {
4755                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4756                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4757                                        &input.metadata().location_id,
4758                                        &input_ident,
4759                                        &input.metadata().collection_kind,
4760                                        &node.metadata().op,
4761                                    );
4762                                    let builder = graph_builders.get_dfir_mut(&out_location);
4763
4764                                    let wrapped_acc: syn::Expr = parse_quote!({
4765                                        let mut __init = #init_tokens;
4766                                        let mut __inner = #acc_tokens;
4767                                        move |__state, __kv: (_, _)| {
4768                                            // TODO(shadaj): we can avoid the clone when the entry exists
4769                                            let __state = __state
4770                                                .entry(::std::clone::Clone::clone(&__kv.0))
4771                                                .or_insert_with(|| (__init)());
4772                                            __inner(__state, __kv.1);
4773                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4774                                        }
4775                                    });
4776
4777                                    if let Some(hooked_input_ident) = hooked_input_ident {
4778                                        builder.add_dfir(
4779                                            parse_quote! {
4780                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4781                                            },
4782                                            None,
4783                                            Some(&stmt_id.to_string()),
4784                                        );
4785
4786                                        fold_hooked_idents.insert(fold_ident.to_string());
4787                                    } else {
4788                                        builder.add_dfir(
4789                                            parse_quote! {
4790                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4791                                            },
4792                                            None,
4793                                            Some(&stmt_id.to_string()),
4794                                        );
4795                                    }
4796                                } else if (matches!(node, HydroNode::Fold { .. })
4797                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4798                                    && !node.metadata().location_id.is_top_level()
4799                                    && graph_builders.singleton_intermediates()
4800                                {
4801                                    let input_ref = match &*node {
4802                                        HydroNode::Fold { input, .. } => input,
4803                                        HydroNode::FoldKeyed { input, .. } => input,
4804                                        _ => unreachable!(),
4805                                    };
4806                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4807                                        &input_ref.metadata().location_id,
4808                                        &input_ident,
4809                                        &input_ref.metadata().collection_kind,
4810                                        &node.metadata().op,
4811                                    );
4812
4813                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4814                                    let builder = graph_builders.get_dfir_mut(&out_location);
4815                                    builder.add_dfir(
4816                                        parse_quote! {
4817                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4818                                        },
4819                                        None,
4820                                        Some(&stmt_id.to_string()),
4821                                    );
4822                                } else {
4823                                    let builder = graph_builders.get_dfir_mut(&out_location);
4824                                    builder.add_dfir(
4825                                        parse_quote! {
4826                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4827                                        },
4828                                        None,
4829                                        Some(&stmt_id.to_string()),
4830                                    );
4831                                }
4832                            }
4833                            BuildersOrCallback::Callback(_, node_callback) => {
4834                                node_callback(node, next_stmt_id);
4835                            }
4836                        }
4837
4838                        ident_stack.push(fold_ident);
4839                    }
4840
4841                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4842                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4843                            if input.metadata().location_id.is_top_level()
4844                                && input.metadata().collection_kind.is_bounded()
4845                            {
4846                                parse_quote!(reduce_no_replay)
4847                            } else {
4848                                parse_quote!(reduce)
4849                            }
4850                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4851                            if input.metadata().location_id.is_top_level()
4852                                && input.metadata().collection_kind.is_bounded()
4853                            {
4854                                todo!(
4855                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4856                                )
4857                            } else {
4858                                parse_quote!(reduce_keyed)
4859                            }
4860                        } else {
4861                            unreachable!()
4862                        };
4863
4864                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4865                        else {
4866                            unreachable!()
4867                        };
4868
4869                        let lifetime = if input.metadata().location_id.is_top_level() {
4870                            quote!('static)
4871                        } else {
4872                            quote!('tick)
4873                        };
4874
4875                        let input_ident = ident_stack.pop().unwrap();
4876
4877                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4878                        else {
4879                            unreachable!()
4880                        };
4881
4882                        let f_tokens = f.emit_tokens(&mut ident_stack);
4883
4884                        let stmt_id = next_stmt_id.get_and_increment();
4885                        let reduce_ident =
4886                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4887
4888                        match builders_or_callback {
4889                            BuildersOrCallback::Builders(graph_builders) => {
4890                                if matches!(node, HydroNode::Reduce { .. })
4891                                    && node.metadata().location_id.is_top_level()
4892                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4893                                    && graph_builders.singleton_intermediates()
4894                                    && !node.metadata().collection_kind.is_bounded()
4895                                {
4896                                    todo!(
4897                                        "Reduce with optional intermediates is not yet supported in simulator"
4898                                    );
4899                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4900                                    && node.metadata().location_id.is_top_level()
4901                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4902                                    && graph_builders.singleton_intermediates()
4903                                    && !node.metadata().collection_kind.is_bounded()
4904                                {
4905                                    todo!(
4906                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4907                                    );
4908                                } else {
4909                                    let builder = graph_builders.get_dfir_mut(&out_location);
4910                                    builder.add_dfir(
4911                                        parse_quote! {
4912                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4913                                        },
4914                                        None,
4915                                        Some(&stmt_id.to_string()),
4916                                    );
4917                                }
4918                            }
4919                            BuildersOrCallback::Callback(_, node_callback) => {
4920                                node_callback(node, next_stmt_id);
4921                            }
4922                        }
4923
4924                        ident_stack.push(reduce_ident);
4925                    }
4926
4927                    HydroNode::ReduceKeyedWatermark {
4928                        f,
4929                        input,
4930                        metadata,
4931                        ..
4932                    } => {
4933                        let lifetime = if input.metadata().location_id.is_top_level() {
4934                            quote!('static)
4935                        } else {
4936                            quote!('tick)
4937                        };
4938
4939                        // watermark is processed second, so it's on top
4940                        let watermark_ident = ident_stack.pop().unwrap();
4941                        let input_ident = ident_stack.pop().unwrap();
4942                        let f_tokens = f.emit_tokens(&mut ident_stack);
4943
4944                        let stmt_id = next_stmt_id.get_and_increment();
4945                        let chain_ident = syn::Ident::new(
4946                            &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4947                            Span::call_site(),
4948                        );
4949
4950                        let fold_ident =
4951                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4952
4953                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4954                            && input.metadata().collection_kind.is_bounded()
4955                        {
4956                            parse_quote!(fold_no_replay)
4957                        } else {
4958                            parse_quote!(fold)
4959                        };
4960
4961                        match builders_or_callback {
4962                            BuildersOrCallback::Builders(graph_builders) => {
4963                                if metadata.location_id.is_top_level()
4964                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4965                                    && graph_builders.singleton_intermediates()
4966                                    && !metadata.collection_kind.is_bounded()
4967                                {
4968                                    todo!(
4969                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4970                                    )
4971                                } else {
4972                                    let builder = graph_builders.get_dfir_mut(&out_location);
4973                                    builder.add_dfir(
4974                                        parse_quote! {
4975                                            #chain_ident = chain();
4976                                            #input_ident
4977                                                -> map(|x| (Some(x), None))
4978                                                -> [0]#chain_ident;
4979                                            #watermark_ident
4980                                                -> map(|watermark| (None, Some(watermark)))
4981                                                -> [1]#chain_ident;
4982
4983                                            #fold_ident = #chain_ident
4984                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4985                                                    let __reduce_keyed_fn = #f_tokens;
4986                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4987                                                        if let Some((k, v)) = opt_payload {
4988                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4989                                                                if k < curr_watermark {
4990                                                                    return;
4991                                                                }
4992                                                            }
4993                                                            match map.entry(k) {
4994                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4995                                                                    e.insert(v);
4996                                                                }
4997                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4998                                                                    __reduce_keyed_fn(e.get_mut(), v);
4999                                                                }
5000                                                            }
5001                                                        } else {
5002                                                            let watermark = opt_watermark.unwrap();
5003                                                            if let Some(curr_watermark) = *opt_curr_watermark {
5004                                                                if watermark <= curr_watermark {
5005                                                                    return;
5006                                                                }
5007                                                            }
5008                                                            map.retain(|k, _| *k >= watermark);
5009                                                            *opt_curr_watermark = Some(watermark);
5010                                                        }
5011                                                    }
5012                                                })
5013                                                -> flat_map(|(map, _curr_watermark)| map);
5014                                        },
5015                                        None,
5016                                        Some(&stmt_id.to_string()),
5017                                    );
5018                                }
5019                            }
5020                            BuildersOrCallback::Callback(_, node_callback) => {
5021                                node_callback(node, next_stmt_id);
5022                            }
5023                        }
5024
5025                        ident_stack.push(fold_ident);
5026                    }
5027
5028                    HydroNode::Network {
5029                        networking_info,
5030                        serialize_fn: serialize_pipeline,
5031                        instantiate_fn,
5032                        deserialize_fn: deserialize_pipeline,
5033                        input,
5034                        ..
5035                    } => {
5036                        let input_ident = ident_stack.pop().unwrap();
5037
5038                        let stmt_id = next_stmt_id.get_and_increment();
5039                        let receiver_stream_ident =
5040                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5041
5042                        match builders_or_callback {
5043                            BuildersOrCallback::Builders(graph_builders) => {
5044                                let (sink_expr, source_expr) = match instantiate_fn {
5045                                    DebugInstantiate::Building => (
5046                                        syn::parse_quote!(DUMMY_SINK),
5047                                        syn::parse_quote!(DUMMY_SOURCE),
5048                                    ),
5049
5050                                    DebugInstantiate::Finalized(finalized) => {
5051                                        (finalized.sink.clone(), finalized.source.clone())
5052                                    }
5053                                };
5054
5055                                graph_builders.create_network(
5056                                    &input.metadata().location_id,
5057                                    &out_location,
5058                                    input_ident,
5059                                    &receiver_stream_ident,
5060                                    serialize_pipeline.as_ref(),
5061                                    sink_expr,
5062                                    source_expr,
5063                                    deserialize_pipeline.as_ref(),
5064                                    stmt_id,
5065                                    networking_info,
5066                                );
5067                            }
5068                            BuildersOrCallback::Callback(_, node_callback) => {
5069                                node_callback(node, next_stmt_id);
5070                            }
5071                        }
5072
5073                        ident_stack.push(receiver_stream_ident);
5074                    }
5075
5076                    HydroNode::ExternalInput {
5077                        instantiate_fn,
5078                        deserialize_fn: deserialize_pipeline,
5079                        ..
5080                    } => {
5081                        let stmt_id = next_stmt_id.get_and_increment();
5082                        let receiver_stream_ident =
5083                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5084
5085                        match builders_or_callback {
5086                            BuildersOrCallback::Builders(graph_builders) => {
5087                                let (_, source_expr) = match instantiate_fn {
5088                                    DebugInstantiate::Building => (
5089                                        syn::parse_quote!(DUMMY_SINK),
5090                                        syn::parse_quote!(DUMMY_SOURCE),
5091                                    ),
5092
5093                                    DebugInstantiate::Finalized(finalized) => {
5094                                        (finalized.sink.clone(), finalized.source.clone())
5095                                    }
5096                                };
5097
5098                                graph_builders.create_external_source(
5099                                    &out_location,
5100                                    source_expr,
5101                                    &receiver_stream_ident,
5102                                    deserialize_pipeline.as_ref(),
5103                                    stmt_id,
5104                                );
5105                            }
5106                            BuildersOrCallback::Callback(_, node_callback) => {
5107                                node_callback(node, next_stmt_id);
5108                            }
5109                        }
5110
5111                        ident_stack.push(receiver_stream_ident);
5112                    }
5113
5114                    HydroNode::Counter {
5115                        tag,
5116                        duration,
5117                        prefix,
5118                        ..
5119                    } => {
5120                        let input_ident = ident_stack.pop().unwrap();
5121
5122                        let stmt_id = next_stmt_id.get_and_increment();
5123                        let counter_ident =
5124                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5125
5126                        match builders_or_callback {
5127                            BuildersOrCallback::Builders(graph_builders) => {
5128                                let arg = format!("{}({})", prefix, tag);
5129                                let builder = graph_builders.get_dfir_mut(&out_location);
5130                                builder.add_dfir(
5131                                    parse_quote! {
5132                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
5133                                    },
5134                                    None,
5135                                    Some(&stmt_id.to_string()),
5136                                );
5137                            }
5138                            BuildersOrCallback::Callback(_, node_callback) => {
5139                                node_callback(node, next_stmt_id);
5140                            }
5141                        }
5142
5143                        ident_stack.push(counter_ident);
5144                    }
5145
5146                    HydroNode::VersionedNetworkFork {
5147                        channel_id,
5148                        senders,
5149                        metadata,
5150                        ..
5151                    } => {
5152                        let mut sender_idents: Vec<syn::Ident> =
5153                            (0..senders.len()).map(|_| ident_stack.pop().unwrap()).collect();
5154                        sender_idents.reverse();
5155
5156                        let stmt_id = next_stmt_id.get_and_increment();
5157
5158                        match builders_or_callback {
5159                            BuildersOrCallback::Builders(graph_builders) => {
5160                                let sender_args: Vec<(LocationId, syn::Ident, Option<DebugExpr>)> =
5161                                    senders
5162                                        .iter()
5163                                        .zip(sender_idents)
5164                                        .map(|((_version, sender, serialize), ident)| {
5165                                            (
5166                                                sender.metadata().location_id.clone(),
5167                                                ident,
5168                                                serialize.clone(),
5169                                            )
5170                                        })
5171                                        .collect();
5172                                graph_builders.create_versioned_network_fork(
5173                                    *channel_id,
5174                                    &metadata.location_id,
5175                                    sender_args,
5176                                    stmt_id,
5177                                );
5178                            }
5179                            BuildersOrCallback::Callback(_, node_callback) => {
5180                                node_callback(node, next_stmt_id);
5181                            }
5182                        }
5183                    }
5184
5185                    HydroNode::VersionedNetwork {
5186                        fork,
5187                        deserialize_fn,
5188                        metadata,
5189                        ..
5190                    } => {
5191                        let stmt_id = next_stmt_id.get_and_increment();
5192                        let receiver_stream_ident =
5193                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5194
5195                        // The wire element type is determined by the channel's *source* kind, which
5196                        // all senders share; read it from the shared fork's first sender.
5197                        let (channel_id, source_loc) = {
5198                            let fork_ref = fork.0.borrow();
5199                            let HydroNode::VersionedNetworkFork {
5200                                channel_id,
5201                                senders,
5202                                ..
5203                            } = &*fork_ref
5204                            else {
5205                                unreachable!("VersionedNetwork.fork must be a VersionedNetworkFork");
5206                            };
5207                            let source_loc = senders
5208                                .first()
5209                                .map(|(_v, sender, _s)| sender.metadata().location_id.clone())
5210                                .expect("a VersionedNetworkFork always has at least one sender");
5211                            (*channel_id, source_loc)
5212                        };
5213
5214                        match builders_or_callback {
5215                            BuildersOrCallback::Builders(graph_builders) => {
5216                                graph_builders.create_versioned_network(
5217                                    channel_id,
5218                                    &source_loc,
5219                                    &metadata.location_id,
5220                                    &receiver_stream_ident,
5221                                    deserialize_fn.as_ref(),
5222                                    stmt_id,
5223                                );
5224                            }
5225                            BuildersOrCallback::Callback(_, node_callback) => {
5226                                node_callback(node, next_stmt_id);
5227                            }
5228                        }
5229
5230                        ident_stack.push(receiver_stream_ident);
5231                    }
5232                }
5233            },
5234            seen_tees,
5235            false,
5236        );
5237
5238        let ret = ident_stack
5239            .pop()
5240            .expect("ident_stack should have exactly one element after traversal");
5241        assert!(
5242            ident_stack.is_empty(),
5243            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5244             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5245            ident_stack.len()
5246        );
5247        ret
5248    }
5249
5250    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5251        match self {
5252            HydroNode::Placeholder => {
5253                panic!()
5254            }
5255            HydroNode::Cast { .. }
5256            | HydroNode::ObserveNonDet { .. }
5257            | HydroNode::UnboundSingleton { .. }
5258            | HydroNode::AssertIsConsistent { .. } => {}
5259            HydroNode::Source { source, .. } => match source {
5260                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5261                HydroSource::ExternalNetwork()
5262                | HydroSource::Spin()
5263                | HydroSource::ClusterMembers(_, _)
5264                | HydroSource::Embedded(_)
5265                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
5266            },
5267            HydroNode::SingletonSource { value, .. } => {
5268                transform(value);
5269            }
5270            HydroNode::CycleSource { .. }
5271            | HydroNode::Tee { .. }
5272            | HydroNode::Reference { .. }
5273            | HydroNode::YieldConcat { .. }
5274            | HydroNode::BeginAtomic { .. }
5275            | HydroNode::EndAtomic { .. }
5276            | HydroNode::Batch { .. }
5277            | HydroNode::Chain { .. }
5278            | HydroNode::MergeOrdered { .. }
5279            | HydroNode::ChainFirst { .. }
5280            | HydroNode::CrossProduct { .. }
5281            | HydroNode::CrossSingleton { .. }
5282            | HydroNode::ResolveFutures { .. }
5283            | HydroNode::ResolveFuturesBlocking { .. }
5284            | HydroNode::ResolveFuturesOrdered { .. }
5285            | HydroNode::Join { .. }
5286            | HydroNode::JoinHalf { .. }
5287            | HydroNode::Difference { .. }
5288            | HydroNode::AntiJoin { .. }
5289            | HydroNode::DeferTick { .. }
5290            | HydroNode::Enumerate { .. }
5291            | HydroNode::Unique { .. }
5292            | HydroNode::Sort { .. }
5293            | HydroNode::VersionedNetworkFork { .. }
5294            | HydroNode::VersionedNetwork { .. } => {}
5295            HydroNode::Map { f, .. }
5296            | HydroNode::FlatMap { f, .. }
5297            | HydroNode::FlatMapStreamBlocking { f, .. }
5298            | HydroNode::Filter { f, .. }
5299            | HydroNode::FilterMap { f, .. }
5300            | HydroNode::Inspect { f, .. }
5301            | HydroNode::Partition { f, .. }
5302            | HydroNode::Reduce { f, .. }
5303            | HydroNode::ReduceKeyed { f, .. }
5304            | HydroNode::ReduceKeyedWatermark { f, .. } => {
5305                transform(&mut f.expr);
5306            }
5307            HydroNode::Fold { init, acc, .. }
5308            | HydroNode::Scan { init, acc, .. }
5309            | HydroNode::ScanAsyncBlocking { init, acc, .. }
5310            | HydroNode::FoldKeyed { init, acc, .. } => {
5311                transform(&mut init.expr);
5312                transform(&mut acc.expr);
5313            }
5314            HydroNode::Network {
5315                serialize_fn,
5316                deserialize_fn,
5317                ..
5318            } => {
5319                if let Some(serialize_fn) = serialize_fn {
5320                    transform(serialize_fn);
5321                }
5322                if let Some(deserialize_fn) = deserialize_fn {
5323                    transform(deserialize_fn);
5324                }
5325            }
5326            HydroNode::ExternalInput { deserialize_fn, .. } => {
5327                if let Some(deserialize_fn) = deserialize_fn {
5328                    transform(deserialize_fn);
5329                }
5330            }
5331            HydroNode::Counter { duration, .. } => {
5332                transform(duration);
5333            }
5334        }
5335    }
5336
5337    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5338        &self.metadata().op
5339    }
5340
5341    pub fn metadata(&self) -> &HydroIrMetadata {
5342        match self {
5343            HydroNode::Placeholder => {
5344                panic!()
5345            }
5346            HydroNode::VersionedNetworkFork { metadata, .. }
5347            | HydroNode::VersionedNetwork { metadata, .. } => metadata,
5348            HydroNode::Cast { metadata, .. }
5349            | HydroNode::ObserveNonDet { metadata, .. }
5350            | HydroNode::AssertIsConsistent { metadata, .. }
5351            | HydroNode::UnboundSingleton { metadata, .. }
5352            | HydroNode::Source { metadata, .. }
5353            | HydroNode::SingletonSource { metadata, .. }
5354            | HydroNode::CycleSource { metadata, .. }
5355            | HydroNode::Tee { metadata, .. }
5356            | HydroNode::Reference { metadata, .. }
5357            | HydroNode::Partition { metadata, .. }
5358            | HydroNode::YieldConcat { metadata, .. }
5359            | HydroNode::BeginAtomic { metadata, .. }
5360            | HydroNode::EndAtomic { metadata, .. }
5361            | HydroNode::Batch { metadata, .. }
5362            | HydroNode::Chain { metadata, .. }
5363            | HydroNode::MergeOrdered { metadata, .. }
5364            | HydroNode::ChainFirst { metadata, .. }
5365            | HydroNode::CrossProduct { metadata, .. }
5366            | HydroNode::CrossSingleton { metadata, .. }
5367            | HydroNode::Join { metadata, .. }
5368            | HydroNode::JoinHalf { metadata, .. }
5369            | HydroNode::Difference { metadata, .. }
5370            | HydroNode::AntiJoin { metadata, .. }
5371            | HydroNode::ResolveFutures { metadata, .. }
5372            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5373            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5374            | HydroNode::Map { metadata, .. }
5375            | HydroNode::FlatMap { metadata, .. }
5376            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5377            | HydroNode::Filter { metadata, .. }
5378            | HydroNode::FilterMap { metadata, .. }
5379            | HydroNode::DeferTick { metadata, .. }
5380            | HydroNode::Enumerate { metadata, .. }
5381            | HydroNode::Inspect { metadata, .. }
5382            | HydroNode::Unique { metadata, .. }
5383            | HydroNode::Sort { metadata, .. }
5384            | HydroNode::Scan { metadata, .. }
5385            | HydroNode::ScanAsyncBlocking { metadata, .. }
5386            | HydroNode::Fold { metadata, .. }
5387            | HydroNode::FoldKeyed { metadata, .. }
5388            | HydroNode::Reduce { metadata, .. }
5389            | HydroNode::ReduceKeyed { metadata, .. }
5390            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5391            | HydroNode::ExternalInput { metadata, .. }
5392            | HydroNode::Network { metadata, .. }
5393            | HydroNode::Counter { metadata, .. } => metadata,
5394        }
5395    }
5396
5397    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5398        &mut self.metadata_mut().op
5399    }
5400
5401    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5402        match self {
5403            HydroNode::Placeholder => {
5404                panic!()
5405            }
5406            HydroNode::VersionedNetworkFork { metadata, .. }
5407            | HydroNode::VersionedNetwork { metadata, .. } => metadata,
5408            HydroNode::Cast { metadata, .. }
5409            | HydroNode::ObserveNonDet { metadata, .. }
5410            | HydroNode::AssertIsConsistent { metadata, .. }
5411            | HydroNode::UnboundSingleton { metadata, .. }
5412            | HydroNode::Source { metadata, .. }
5413            | HydroNode::SingletonSource { metadata, .. }
5414            | HydroNode::CycleSource { metadata, .. }
5415            | HydroNode::Tee { metadata, .. }
5416            | HydroNode::Reference { metadata, .. }
5417            | HydroNode::Partition { metadata, .. }
5418            | HydroNode::YieldConcat { metadata, .. }
5419            | HydroNode::BeginAtomic { metadata, .. }
5420            | HydroNode::EndAtomic { metadata, .. }
5421            | HydroNode::Batch { metadata, .. }
5422            | HydroNode::Chain { metadata, .. }
5423            | HydroNode::MergeOrdered { metadata, .. }
5424            | HydroNode::ChainFirst { metadata, .. }
5425            | HydroNode::CrossProduct { metadata, .. }
5426            | HydroNode::CrossSingleton { metadata, .. }
5427            | HydroNode::Join { metadata, .. }
5428            | HydroNode::JoinHalf { metadata, .. }
5429            | HydroNode::Difference { metadata, .. }
5430            | HydroNode::AntiJoin { metadata, .. }
5431            | HydroNode::ResolveFutures { metadata, .. }
5432            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5433            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5434            | HydroNode::Map { metadata, .. }
5435            | HydroNode::FlatMap { metadata, .. }
5436            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5437            | HydroNode::Filter { metadata, .. }
5438            | HydroNode::FilterMap { metadata, .. }
5439            | HydroNode::DeferTick { metadata, .. }
5440            | HydroNode::Enumerate { metadata, .. }
5441            | HydroNode::Inspect { metadata, .. }
5442            | HydroNode::Unique { metadata, .. }
5443            | HydroNode::Sort { metadata, .. }
5444            | HydroNode::Scan { metadata, .. }
5445            | HydroNode::ScanAsyncBlocking { metadata, .. }
5446            | HydroNode::Fold { metadata, .. }
5447            | HydroNode::FoldKeyed { metadata, .. }
5448            | HydroNode::Reduce { metadata, .. }
5449            | HydroNode::ReduceKeyed { metadata, .. }
5450            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5451            | HydroNode::ExternalInput { metadata, .. }
5452            | HydroNode::Network { metadata, .. }
5453            | HydroNode::Counter { metadata, .. } => metadata,
5454        }
5455    }
5456
5457    pub fn input(&self) -> Vec<&HydroNode> {
5458        match self {
5459            HydroNode::Placeholder => {
5460                panic!()
5461            }
5462            HydroNode::Source { .. }
5463            | HydroNode::SingletonSource { .. }
5464            | HydroNode::ExternalInput { .. }
5465            | HydroNode::CycleSource { .. }
5466            | HydroNode::Tee { .. }
5467            | HydroNode::Reference { .. }
5468            | HydroNode::Partition { .. }
5469            | HydroNode::VersionedNetwork { .. } => {
5470                // Tee/Partition/VersionedNetwork find their input in separate special ways
5471                vec![]
5472            }
5473            HydroNode::Cast { inner, .. }
5474            | HydroNode::ObserveNonDet { inner, .. }
5475            | HydroNode::YieldConcat { inner, .. }
5476            | HydroNode::BeginAtomic { inner, .. }
5477            | HydroNode::EndAtomic { inner, .. }
5478            | HydroNode::Batch { inner, .. }
5479            | HydroNode::UnboundSingleton { inner, .. }
5480            | HydroNode::AssertIsConsistent { inner, .. } => {
5481                vec![inner]
5482            }
5483            HydroNode::Chain { first, second, .. } => {
5484                vec![first, second]
5485            }
5486            HydroNode::MergeOrdered { first, second, .. } => {
5487                vec![first, second]
5488            }
5489            HydroNode::ChainFirst { first, second, .. } => {
5490                vec![first, second]
5491            }
5492            HydroNode::CrossProduct { left, right, .. }
5493            | HydroNode::CrossSingleton { left, right, .. }
5494            | HydroNode::Join { left, right, .. }
5495            | HydroNode::JoinHalf { left, right, .. } => {
5496                vec![left, right]
5497            }
5498            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5499                vec![pos, neg]
5500            }
5501            HydroNode::Map { input, .. }
5502            | HydroNode::FlatMap { input, .. }
5503            | HydroNode::FlatMapStreamBlocking { input, .. }
5504            | HydroNode::Filter { input, .. }
5505            | HydroNode::FilterMap { input, .. }
5506            | HydroNode::Sort { input, .. }
5507            | HydroNode::DeferTick { input, .. }
5508            | HydroNode::Enumerate { input, .. }
5509            | HydroNode::Inspect { input, .. }
5510            | HydroNode::Unique { input, .. }
5511            | HydroNode::Network { input, .. }
5512            | HydroNode::Counter { input, .. }
5513            | HydroNode::ResolveFutures { input, .. }
5514            | HydroNode::ResolveFuturesBlocking { input, .. }
5515            | HydroNode::ResolveFuturesOrdered { input, .. }
5516            | HydroNode::Fold { input, .. }
5517            | HydroNode::FoldKeyed { input, .. }
5518            | HydroNode::Reduce { input, .. }
5519            | HydroNode::ReduceKeyed { input, .. }
5520            | HydroNode::Scan { input, .. }
5521            | HydroNode::ScanAsyncBlocking { input, .. } => {
5522                vec![input]
5523            }
5524            HydroNode::ReduceKeyedWatermark {
5525                input, watermark, ..
5526            } => {
5527                vec![input, watermark]
5528            }
5529            HydroNode::VersionedNetworkFork { senders, .. } => senders
5530                .iter()
5531                .map(|(_version, sender, _serialize)| sender.as_ref())
5532                .collect(),
5533        }
5534    }
5535
5536    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5537        self.input()
5538            .iter()
5539            .map(|input_node| input_node.metadata())
5540            .collect()
5541    }
5542
5543    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5544    /// has other live references, meaning the upstream is already driven
5545    /// by another consumer and does not need a Null sink.
5546    pub fn is_shared_with_others(&self) -> bool {
5547        match self {
5548            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5549                Rc::strong_count(&inner.0) > 1
5550            }
5551            // A zero-output reference node is valid in DFIR (it drains itself at
5552            // end of tick), so it doesn't need to be driven by another consumer.
5553            HydroNode::Reference { .. } => false,
5554            _ => false,
5555        }
5556    }
5557
5558    pub fn print_root(&self) -> String {
5559        match self {
5560            HydroNode::Placeholder => {
5561                panic!()
5562            }
5563            HydroNode::Cast { .. } => "Cast()".to_owned(),
5564            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5565            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5566            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5567            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5568            HydroNode::SingletonSource {
5569                value,
5570                first_tick_only,
5571                ..
5572            } => format!(
5573                "SingletonSource({:?}, first_tick_only={})",
5574                value, first_tick_only
5575            ),
5576            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5577            HydroNode::Tee { inner, .. } => {
5578                format!("Tee({})", inner.0.borrow().print_root())
5579            }
5580            HydroNode::Reference { inner, kind, .. } => {
5581                format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5582            }
5583            HydroNode::Partition { f, is_true, .. } => {
5584                format!("Partition({:?}, is_true={})", f, is_true)
5585            }
5586            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5587            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5588            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5589            HydroNode::Batch { .. } => "Batch()".to_owned(),
5590            HydroNode::Chain { first, second, .. } => {
5591                format!("Chain({}, {})", first.print_root(), second.print_root())
5592            }
5593            HydroNode::MergeOrdered { first, second, .. } => {
5594                format!(
5595                    "MergeOrdered({}, {})",
5596                    first.print_root(),
5597                    second.print_root()
5598                )
5599            }
5600            HydroNode::ChainFirst { first, second, .. } => {
5601                format!(
5602                    "ChainFirst({}, {})",
5603                    first.print_root(),
5604                    second.print_root()
5605                )
5606            }
5607            HydroNode::CrossProduct { left, right, .. } => {
5608                format!(
5609                    "CrossProduct({}, {})",
5610                    left.print_root(),
5611                    right.print_root()
5612                )
5613            }
5614            HydroNode::CrossSingleton { left, right, .. } => {
5615                format!(
5616                    "CrossSingleton({}, {})",
5617                    left.print_root(),
5618                    right.print_root()
5619                )
5620            }
5621            HydroNode::Join { left, right, .. } => {
5622                format!("Join({}, {})", left.print_root(), right.print_root())
5623            }
5624            HydroNode::JoinHalf { left, right, .. } => {
5625                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5626            }
5627            HydroNode::Difference { pos, neg, .. } => {
5628                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5629            }
5630            HydroNode::AntiJoin { pos, neg, .. } => {
5631                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5632            }
5633            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5634            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5635            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5636            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5637            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5638            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5639            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5640            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5641            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5642            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5643            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5644            HydroNode::Unique { .. } => "Unique()".to_owned(),
5645            HydroNode::Sort { .. } => "Sort()".to_owned(),
5646            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5647            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5648            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5649                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5650            }
5651            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5652            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5653            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5654            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5655            HydroNode::Network { .. } => "Network()".to_owned(),
5656            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5657            HydroNode::Counter { tag, duration, .. } => {
5658                format!("Counter({:?}, {:?})", tag, duration)
5659            }
5660            HydroNode::VersionedNetworkFork {
5661                channel_name,
5662                senders,
5663                ..
5664            } => {
5665                let versions: Vec<u32> = senders.iter().map(|(v, _, _)| *v).collect();
5666                format!(
5667                    "VersionedNetworkFork({}, senders={:?})",
5668                    channel_name, versions
5669                )
5670            }
5671            HydroNode::VersionedNetwork { version, .. } => {
5672                format!("VersionedNetwork(v{})", version)
5673            }
5674        }
5675    }
5676}
5677
5678#[cfg(feature = "build")]
5679fn instantiate_network<'a, D>(
5680    env: &mut D::InstantiateEnv,
5681    from_location: &LocationId,
5682    to_location: &LocationId,
5683    processes: &SparseSecondaryMap<LocationKey, D::Process>,
5684    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
5685    name: Option<&str>,
5686    networking_info: &crate::networking::NetworkingInfo,
5687) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
5688where
5689    D: Deploy<'a>,
5690{
5691    let ((sink, source), connect_fn) = match (from_location, to_location) {
5692        (&LocationId::Process(from), &LocationId::Process(to)) => {
5693            let from_node = processes
5694                .get(from)
5695                .unwrap_or_else(|| {
5696                    panic!("A process used in the graph was not instantiated: {}", from)
5697                })
5698                .clone();
5699            let to_node = processes
5700                .get(to)
5701                .unwrap_or_else(|| {
5702                    panic!("A process used in the graph was not instantiated: {}", to)
5703                })
5704                .clone();
5705
5706            let sink_port = from_node.next_port();
5707            let source_port = to_node.next_port();
5708
5709            (
5710                D::o2o_sink_source(
5711                    env,
5712                    &from_node,
5713                    &sink_port,
5714                    &to_node,
5715                    &source_port,
5716                    name,
5717                    networking_info,
5718                ),
5719                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
5720            )
5721        }
5722        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
5723            let from_node = processes
5724                .get(from)
5725                .unwrap_or_else(|| {
5726                    panic!("A process used in the graph was not instantiated: {}", from)
5727                })
5728                .clone();
5729            let to_node = clusters
5730                .get(to)
5731                .unwrap_or_else(|| {
5732                    panic!("A cluster used in the graph was not instantiated: {}", to)
5733                })
5734                .clone();
5735
5736            let sink_port = from_node.next_port();
5737            let source_port = to_node.next_port();
5738
5739            (
5740                D::o2m_sink_source(
5741                    env,
5742                    &from_node,
5743                    &sink_port,
5744                    &to_node,
5745                    &source_port,
5746                    name,
5747                    networking_info,
5748                ),
5749                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
5750            )
5751        }
5752        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
5753            let from_node = clusters
5754                .get(from)
5755                .unwrap_or_else(|| {
5756                    panic!("A cluster used in the graph was not instantiated: {}", from)
5757                })
5758                .clone();
5759            let to_node = processes
5760                .get(to)
5761                .unwrap_or_else(|| {
5762                    panic!("A process used in the graph was not instantiated: {}", to)
5763                })
5764                .clone();
5765
5766            let sink_port = from_node.next_port();
5767            let source_port = to_node.next_port();
5768
5769            (
5770                D::m2o_sink_source(
5771                    env,
5772                    &from_node,
5773                    &sink_port,
5774                    &to_node,
5775                    &source_port,
5776                    name,
5777                    networking_info,
5778                ),
5779                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
5780            )
5781        }
5782        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
5783            let from_node = clusters
5784                .get(from)
5785                .unwrap_or_else(|| {
5786                    panic!("A cluster used in the graph was not instantiated: {}", from)
5787                })
5788                .clone();
5789            let to_node = clusters
5790                .get(to)
5791                .unwrap_or_else(|| {
5792                    panic!("A cluster used in the graph was not instantiated: {}", to)
5793                })
5794                .clone();
5795
5796            let sink_port = from_node.next_port();
5797            let source_port = to_node.next_port();
5798
5799            (
5800                D::m2m_sink_source(
5801                    env,
5802                    &from_node,
5803                    &sink_port,
5804                    &to_node,
5805                    &source_port,
5806                    name,
5807                    networking_info,
5808                ),
5809                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
5810            )
5811        }
5812        (LocationId::Tick(_, _), _) => panic!(),
5813        (_, LocationId::Tick(_, _)) => panic!(),
5814        (LocationId::Atomic(_), _) => panic!(),
5815        (_, LocationId::Atomic(_)) => panic!(),
5816    };
5817    (sink, source, connect_fn)
5818}
5819
5820#[cfg(test)]
5821mod serde_test;
5822
5823#[cfg(test)]
5824mod test {
5825    use std::mem::size_of;
5826
5827    use stageleft::{QuotedWithContext, q};
5828
5829    use super::*;
5830
5831    #[test]
5832    #[cfg_attr(
5833        not(feature = "build"),
5834        ignore = "expects inclusion of feature-gated fields"
5835    )]
5836    fn hydro_node_size() {
5837        assert_eq!(size_of::<HydroNode>(), 264);
5838    }
5839
5840    #[test]
5841    #[cfg_attr(
5842        not(feature = "build"),
5843        ignore = "expects inclusion of feature-gated fields"
5844    )]
5845    fn hydro_root_size() {
5846        assert_eq!(size_of::<HydroRoot>(), 136);
5847    }
5848
5849    #[test]
5850    fn test_simplify_q_macro_basic() {
5851        // Test basic non-q! expression
5852        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
5853        let result = simplify_q_macro(simple_expr.clone());
5854        assert_eq!(result, simple_expr);
5855    }
5856
5857    #[test]
5858    fn test_simplify_q_macro_actual_stageleft_call() {
5859        // Test a simplified version of what a real stageleft call might look like
5860        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
5861        let result = simplify_q_macro(stageleft_call);
5862        // This should be processed by our visitor and simplified to q!(...)
5863        // since we detect the stageleft::runtime_support::fn_* pattern
5864        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5865    }
5866
5867    #[test]
5868    fn test_closure_no_pipe_at_start() {
5869        // Test a closure that does not start with a pipe
5870        let stageleft_call = q!({
5871            let foo = 123;
5872            move |b: usize| b + foo
5873        })
5874        .splice_fn1_ctx(&());
5875        let result = simplify_q_macro(stageleft_call);
5876        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5877    }
5878}