Skip to main content

hydro_lang/location/
process.rs

1//! Definition of the [`Process`] location type, representing a single-node
2//! compute location in a distributed Hydro program.
3//!
4//! A [`Process`] is the simplest kind of location: it corresponds to exactly one
5//! machine (or OS process) and all live collections placed on it are materialized
6//! on that single node. Use a process when the computation does not need to be
7//! replicated or partitioned across multiple nodes.
8//!
9//! Processes are created via [`FlowBuilder::process`](crate::compile::builder::FlowBuilder::process)
10//! and are parameterized by a **tag type** (`ProcessTag`) that lets the type
11//! system distinguish different processes at compile time.
12
13use std::fmt::{Debug, Formatter};
14use std::marker::PhantomData;
15
16use super::{Location, LocationId};
17use crate::compile::builder::FlowState;
18use crate::location::LocationKey;
19use crate::staging_util::Invariant;
20
21/// A single-node location in a distributed Hydro program.
22///
23/// `Process` represents exactly one machine (or OS process) and is one of the
24/// core location types that implements the [`Location`] trait. Live collections
25/// placed on a `Process` are materialized entirely on that single node.
26///
27/// The type parameter `ProcessTag` is a compile-time marker that differentiates
28/// distinct processes in the same dataflow graph (e.g. `Process<'a, Leader>` vs
29/// `Process<'a, Follower>`). It defaults to `()` when only one process is
30/// needed.
31///
32/// # Creating a Process
33/// ```rust,ignore
34/// let mut flow = FlowBuilder::new();
35/// let node = flow.process::<MyTag>();
36/// ```
37pub struct Process<'a, ProcessTag = ()> {
38    pub(crate) key: LocationKey,
39    pub(crate) flow_state: FlowState,
40    pub(crate) _phantom: Invariant<'a, ProcessTag>,
41}
42
43impl<P> Debug for Process<'_, P> {
44    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45        write!(f, "Process({})", self.key)
46    }
47}
48
49impl<P> Eq for Process<'_, P> {}
50impl<P> PartialEq for Process<'_, P> {
51    fn eq(&self, other: &Self) -> bool {
52        self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
53    }
54}
55
56impl<P> Clone for Process<'_, P> {
57    fn clone(&self) -> Self {
58        Process {
59            key: self.key,
60            flow_state: self.flow_state.clone(),
61            _phantom: PhantomData,
62        }
63    }
64}
65
66impl<'a, P> super::dynamic::DynLocation for Process<'a, P> {
67    fn id(&self) -> LocationId {
68        LocationId::Process(self.key)
69    }
70
71    fn flow_state(&self) -> &FlowState {
72        &self.flow_state
73    }
74
75    fn is_top_level() -> bool {
76        true
77    }
78
79    fn multiversioned(&self) -> bool {
80        false // processes are always single-versioned
81    }
82}
83
84impl<'a, P> Location<'a> for Process<'a, P> {
85    type Root = Self;
86
87    fn root(&self) -> Self::Root {
88        self.clone()
89    }
90}
91
#[cfg(feature = "sim")]
impl<'a, P> Process<'a, P> {
    /// Creates a simulated input port whose stream lives inside an atomic
    /// region of this process; intended for tests.
    ///
    /// Because the source sits directly in the atomic tick, values that are
    /// sent become visible in the next atomic slice; no extra tick is needed
    /// to batch them in. `send_atomic` is synchronous.
    ///
    /// Returns the sender handle for the test side together with the
    /// unbounded stream, located at `Atomic<Self>`, that receives the values.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_atomic_input<
        T,
        O: crate::live_collections::stream::Ordering,
        R: crate::live_collections::stream::Retries,
    >(
        &self,
    ) -> (
        crate::sim::SimAtomicSender<T, O, R>,
        crate::live_collections::stream::Stream<
            T,
            super::Atomic<Self>,
            crate::live_collections::boundedness::Unbounded,
            O,
            R,
        >,
    )
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        use std::marker::PhantomData;

        use stageleft::quote_type;
        use tokio_util::codec::LengthDelimitedCodec;

        use crate::compile::ir::{DebugInstantiate, HydroNode, HydroRoot};
        use crate::live_collections::boundedness::Unbounded;
        use crate::live_collections::stream::Stream;
        use crate::location::dynamic::DynLocation;
        use crate::location::tick::Atomic;
        use crate::location::{Location, LocationKey, NetworkHint, Tick};
        use crate::staging_util::get_this_crate;

        // A fresh clock backs the atomic region that hosts the source.
        let clock_id = self.flow_state().borrow_mut().next_clock_id();
        let atomic = Atomic {
            tick: Tick {
                id: clock_id,
                l: self.clone(),
            },
        };

        // The external endpoint is addressed with the first location key and
        // shares this process's flow state.
        let external: super::External<'a, ()> = super::External {
            key: LocationKey::FIRST,
            flow_state: self.flow_state().clone(),
            _phantom: PhantomData,
        };

        // One port id is used for both the receive and the (dummy) send side.
        let port_id = self.flow_state().borrow_mut().next_external_port();

        let crate_root = get_this_crate();
        let recv_type = quote_type::<T>();

        // Staged closure that bincode-decodes each received frame into `T`.
        let decode_fn: syn::Expr = syn::parse_quote! {
            |res| {
                let b = res.unwrap();
                #crate_root::runtime_support::bincode::deserialize::<#recv_type>(&b).unwrap()
            }
        };

        // The source node is created at the atomic location itself.
        let input_stream: Stream<T, Atomic<Self>, Unbounded, O, R> = Stream::new(
            atomic.clone(),
            HydroNode::ExternalInput {
                from_external_key: external.key,
                from_port_id: port_id,
                from_many: false,
                codec_type: quote_type::<LengthDelimitedCodec>().into(),
                port_hint: NetworkHint::Auto,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: Some(decode_fn.into()),
                metadata: atomic.new_node_metadata(
                    <Stream<T, Atomic<Self>, Unbounded, O, R>>::collection_kind(),
                ),
            },
        );

        // Pair the external port with a send side fed by an empty stream, so
        // the port is registered as paired even though nothing is ever sent.
        let nothing_to_send: Stream<T, Self, _, _, _> = self.source_iter(stageleft::q!([]));
        let send_type = quote_type::<T>();
        let encode_fn: syn::Expr = syn::parse_quote! {
            #crate_root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#send_type, _>(
                |b| #crate_root::runtime_support::bincode::serialize(&b).unwrap().into()
            )
        };

        // Take the IR node out of the empty stream, leaving a placeholder behind.
        let send_input = nothing_to_send.ir_node.replace(HydroNode::Placeholder);
        let send_root = HydroRoot::SendExternal {
            to_external_key: external.key,
            to_port_id: port_id,
            to_many: false,
            unpaired: false,
            serialize_fn: Some(encode_fn.into()),
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(send_input),
            op_metadata: crate::compile::ir::HydroIrOpMetadata::new(),
        };
        self.flow_state().borrow_mut().push_root(send_root);

        let sender = crate::sim::SimAtomicSender(crate::sim::SimSender(port_id, PhantomData));
        (sender, input_stream)
    }
}