Skip to main content

hydro_lang/compile/
built.rs

1use std::marker::PhantomData;
2
3use dfir_lang::graph::{
4    DfirGraph, FlatGraphBuilderOutput, eliminate_extra_unions_tees, partition_graph,
5};
6use slotmap::{SecondaryMap, SlotMap};
7
8use super::compiled::CompiledFlow;
9use super::deploy::{DeployFlow, DeployResult};
10use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
11use super::ir::{HydroRoot, emit};
12use crate::location::{Cluster, External, LocationKey, LocationType, Process};
13#[cfg(stageleft_runtime)]
14#[cfg(feature = "sim")]
15use crate::sim::{flow::SimFlow, graph::SimNode};
16use crate::staging_util::Invariant;
17#[cfg(stageleft_runtime)]
18#[cfg(feature = "viz")]
19use crate::viz::api::GraphApi;
20
21pub struct BuiltFlow<'a> {
22    pub(super) ir: Vec<HydroRoot>,
23    pub(super) locations: SlotMap<LocationKey, LocationType>,
24    pub(super) location_names: SecondaryMap<LocationKey, String>,
25
26    /// Compile-time sidecar directives extracted from the flow state.
27    pub(super) sidecars: Vec<super::builder::Sidecar>,
28
29    /// Application name used in telemetry.
30    pub(super) flow_name: String,
31
32    /// The program version each location belongs to (every location has an entry; 0 unless it is a
33    /// `next_version` successor).
34    #[cfg(feature = "sim")]
35    pub(super) location_version: SecondaryMap<LocationKey, u32>,
36
37    /// Maps from a given location to Version 0 of that location.
38    /// [`Cluster::next_version`](crate::location::Cluster::next_version)).
39    #[cfg(feature = "sim")]
40    pub(super) location_group: SecondaryMap<LocationKey, LocationKey>,
41
42    pub(super) _phantom: Invariant<'a>,
43}
44
45pub(crate) fn build_inner(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, DfirGraph> {
46    emit(ir)
47        .into_iter()
48        .map(|(k, v)| {
49            let FlatGraphBuilderOutput { mut flat_graph, .. } =
50                v.build().expect("Failed to build DFIR flat graph.");
51            eliminate_extra_unions_tees(&mut flat_graph);
52            let partitioned_graph =
53                partition_graph(flat_graph).expect("Failed to partition (cycle detected).");
54            (k, partitioned_graph)
55        })
56        .collect()
57}
58
59impl<'a> BuiltFlow<'a> {
60    /// Returns all [`HydroRoot`]s in the IR.
61    pub fn ir(&self) -> &[HydroRoot] {
62        &self.ir
63    }
64
65    /// Serialize the IR as JSON.
66    #[cfg(feature = "runtime_support")]
67    pub fn ir_json(&self) -> Result<String, serde_json::Error> {
68        super::ir::serialize_dedup_shared(|| serde_json::to_string_pretty(&self.ir))
69    }
70
71    /// Returns all raw location ID -> location name mappings.
72    pub fn location_names(&self) -> &SecondaryMap<LocationKey, String> {
73        &self.location_names
74    }
75
76    /// Get a GraphApi instance for this built flow
77    #[cfg(stageleft_runtime)]
78    #[cfg(feature = "viz")]
79    pub fn graph_api(&self) -> GraphApi<'_> {
80        GraphApi::new(&self.ir, self.location_names())
81    }
82
83    /// Render graph to string in the given format.
84    #[cfg(feature = "viz")]
85    pub fn render_graph(
86        &self,
87        format: crate::viz::config::GraphType,
88        use_short_labels: bool,
89        show_metadata: bool,
90    ) -> String {
91        self.graph_api()
92            .render(format, use_short_labels, show_metadata)
93    }
94
95    /// Write graph to file.
96    #[cfg(feature = "viz")]
97    pub fn write_graph_to_file(
98        &self,
99        format: crate::viz::config::GraphType,
100        filename: &str,
101        use_short_labels: bool,
102        show_metadata: bool,
103    ) -> Result<(), Box<dyn std::error::Error>> {
104        self.graph_api()
105            .write_to_file(format, filename, use_short_labels, show_metadata)
106    }
107
108    /// Generate graph based on CLI config. Returns Some(path) if written.
109    #[cfg(feature = "viz")]
110    pub fn generate_graph(
111        &self,
112        config: &crate::viz::config::GraphConfig,
113    ) -> Result<Option<String>, Box<dyn std::error::Error>> {
114        self.graph_api().generate_graph(config)
115    }
116
117    pub fn optimize_with(mut self, f: impl FnOnce(&mut [HydroRoot])) -> Self {
118        f(&mut self.ir);
119        self
120    }
121
122    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
123        self.into_deploy()
124    }
125
126    #[cfg(feature = "sim")]
127    /// Creates a simulation for this builder, which can be used to run deterministic simulations
128    /// of the Hydro program.
129    pub fn sim(self) -> SimFlow<'a> {
130        use std::cell::RefCell;
131        use std::rc::Rc;
132
133        use slotmap::SparseSecondaryMap;
134
135        use crate::sim::graph::SimNodePort;
136
137        let shared_port_counter = Rc::new(RefCell::new(crate::Counter::<SimNodePort>::default()));
138
139        let mut processes = SparseSecondaryMap::new();
140        let mut clusters = SparseSecondaryMap::new();
141        let externals = SparseSecondaryMap::new();
142
143        for (key, loc) in self.locations.iter() {
144            match loc {
145                LocationType::Process => {
146                    processes.insert(
147                        key,
148                        SimNode {
149                            shared_port_counter: shared_port_counter.clone(),
150                        },
151                    );
152                }
153                LocationType::Cluster => {
154                    clusters.insert(
155                        key,
156                        SimNode {
157                            shared_port_counter: shared_port_counter.clone(),
158                        },
159                    );
160                }
161                LocationType::External => {
162                    panic!("Sim cannot have externals");
163                }
164            }
165        }
166
167        SimFlow {
168            ir: self.ir,
169            processes,
170            clusters,
171            externals,
172            cluster_max_sizes: SparseSecondaryMap::new(),
173            externals_port_registry: Default::default(),
174            location_version: self.location_version,
175            location_group: self.location_group,
176            test_safety_only: false,
177            skip_consistency_assertions: false,
178            unit_test_fuzz_iterations: 8192,
179            _phantom: PhantomData,
180        }
181    }
182
183    pub fn into_deploy<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
184        let (processes, clusters, externals) = Default::default();
185        DeployFlow {
186            ir: self.ir,
187            locations: self.locations,
188            location_names: self.location_names,
189            processes,
190            clusters,
191            externals,
192            sidecars: self.sidecars,
193            flow_name: self.flow_name,
194            _phantom: PhantomData,
195        }
196    }
197
198    pub fn with_process<P, D: Deploy<'a>>(
199        self,
200        process: &Process<P>,
201        spec: impl IntoProcessSpec<'a, D>,
202    ) -> DeployFlow<'a, D> {
203        self.into_deploy().with_process(process, spec)
204    }
205
206    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
207        self,
208        spec: impl Fn() -> S,
209    ) -> DeployFlow<'a, D> {
210        self.into_deploy().with_remaining_processes(spec)
211    }
212
213    pub fn with_external<P, D: Deploy<'a>>(
214        self,
215        process: &External<P>,
216        spec: impl ExternalSpec<'a, D>,
217    ) -> DeployFlow<'a, D> {
218        self.into_deploy().with_external(process, spec)
219    }
220
221    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
222        self,
223        spec: impl Fn() -> S,
224    ) -> DeployFlow<'a, D> {
225        self.into_deploy().with_remaining_externals(spec)
226    }
227
228    pub fn with_cluster<C, D: Deploy<'a>>(
229        self,
230        cluster: &Cluster<C>,
231        spec: impl ClusterSpec<'a, D>,
232    ) -> DeployFlow<'a, D> {
233        self.into_deploy().with_cluster(cluster, spec)
234    }
235
236    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
237        self,
238        spec: impl Fn() -> S,
239    ) -> DeployFlow<'a, D> {
240        self.into_deploy().with_remaining_clusters(spec)
241    }
242
243    pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
244        self.into_deploy::<D>().compile()
245    }
246
247    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
248        self.into_deploy::<D>().deploy(env)
249    }
250}