1use std::fmt::{Debug, Formatter};
14use std::marker::PhantomData;
15
16use super::{Location, LocationId};
17use crate::compile::builder::FlowState;
18use crate::location::LocationKey;
19use crate::staging_util::Invariant;
20
21pub struct Process<'a, ProcessTag = ()> {
38 pub(crate) key: LocationKey,
39 pub(crate) flow_state: FlowState,
40 pub(crate) _phantom: Invariant<'a, ProcessTag>,
41}
42
43impl<P> Debug for Process<'_, P> {
44 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45 write!(f, "Process({})", self.key)
46 }
47}
48
49impl<P> Eq for Process<'_, P> {}
50impl<P> PartialEq for Process<'_, P> {
51 fn eq(&self, other: &Self) -> bool {
52 self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
53 }
54}
55
56impl<P> Clone for Process<'_, P> {
57 fn clone(&self) -> Self {
58 Process {
59 key: self.key,
60 flow_state: self.flow_state.clone(),
61 _phantom: PhantomData,
62 }
63 }
64}
65
66impl<'a, P> super::dynamic::DynLocation for Process<'a, P> {
67 fn id(&self) -> LocationId {
68 LocationId::Process(self.key)
69 }
70
71 fn flow_state(&self) -> &FlowState {
72 &self.flow_state
73 }
74
75 fn is_top_level() -> bool {
76 true
77 }
78
79 fn multiversioned(&self) -> bool {
80 false }
82}
83
84impl<'a, P> Location<'a> for Process<'a, P> {
85 type Root = Self;
86
87 fn root(&self) -> Self::Root {
88 self.clone()
89 }
90}
91
#[cfg(feature = "sim")]
impl<'a, P> Process<'a, P> {
    /// Creates a simulator-side sender together with a stream, located in an `Atomic`
    /// wrapper around this process, that is fed from an external input port.
    ///
    /// In order, the call:
    /// 1. takes the next clock id from the flow state and builds an `Atomic` location over
    ///    a `Tick` holding that id and a clone of this process;
    /// 2. builds an `External` handle keyed `LocationKey::FIRST` that shares this flow
    ///    state, and reserves the next external port id;
    /// 3. creates a stream backed by a `HydroNode::ExternalInput` on that port, using a
    ///    `LengthDelimitedCodec` and a generated closure that bincode-deserializes each
    ///    frame into `T` (the generated closure unwraps both the frame and the decode);
    /// 4. pushes a `HydroRoot::SendExternal` root on the same port whose input is an empty
    ///    `source_iter` stream and whose serializer is bincode
    ///    (NOTE(review): presumably this pairs the port in the send direction — confirm).
    ///
    /// Returns the `SimAtomicSender` wrapping the reserved port id, and the stream.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_atomic_input<
        T,
        O: crate::live_collections::stream::Ordering,
        R: crate::live_collections::stream::Retries,
    >(
        &self,
    ) -> (
        crate::sim::SimAtomicSender<T, O, R>,
        crate::live_collections::stream::Stream<
            T,
            super::Atomic<Self>,
            crate::live_collections::boundedness::Unbounded,
            O,
            R,
        >,
    )
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        use std::marker::PhantomData;

        use stageleft::quote_type;
        use tokio_util::codec::LengthDelimitedCodec;

        use crate::compile::ir::{DebugInstantiate, HydroNode, HydroRoot};
        use crate::live_collections::boundedness::Unbounded;
        use crate::live_collections::stream::Stream;
        use crate::location::dynamic::DynLocation;
        use crate::location::tick::Atomic;
        use crate::location::{Location, LocationKey, NetworkHint, Tick};
        use crate::staging_util::get_this_crate;

        // Atomic location over a tick with a newly allocated clock id.
        let clock_id = self.flow_state().borrow_mut().next_clock_id();
        let atomic = Atomic {
            tick: Tick {
                id: clock_id,
                l: self.clone(),
            },
        };

        // External endpoint handle sharing this process's flow state.
        let external: super::External<'a, ()> = super::External {
            key: LocationKey::FIRST,
            flow_state: self.flow_state().clone(),
            _phantom: PhantomData,
        };

        // One port id is used for the input node, the send root and the returned sender.
        let port_id = self.flow_state().borrow_mut().next_external_port();

        let this_crate = get_this_crate();
        let recv_ty = quote_type::<T>();

        // Generated closure: unwrap the incoming frame, then bincode-decode it into `T`.
        let decode: syn::Expr = syn::parse_quote! {
            |res| {
                let b = res.unwrap();
                #this_crate::runtime_support::bincode::deserialize::<#recv_ty>(&b).unwrap()
            }
        };

        let kind = Stream::<T, Atomic<Self>, Unbounded, O, R>::collection_kind();
        let input_node = HydroNode::ExternalInput {
            from_external_key: external.key,
            from_port_id: port_id,
            from_many: false,
            codec_type: quote_type::<LengthDelimitedCodec>().into(),
            port_hint: NetworkHint::Auto,
            instantiate_fn: DebugInstantiate::Building,
            deserialize_fn: Some(decode.into()),
            metadata: atomic.new_node_metadata(kind),
        };
        let stream: Stream<T, Atomic<Self>, Unbounded, O, R> =
            Stream::new(atomic.clone(), input_node);

        // Empty source whose IR node becomes the input of the send root below.
        let empty_stream: Stream<T, Self, _, _, _> = self.source_iter(stageleft::q!([]));

        let send_ty = quote_type::<T>();
        let encode: syn::Expr = syn::parse_quote! {
            #this_crate::runtime_support::stageleft::runtime_support::fn1_type_hint::<#send_ty, _>(
                |b| #this_crate::runtime_support::bincode::serialize(&b).unwrap().into()
            )
        };

        let send_root = HydroRoot::SendExternal {
            to_external_key: external.key,
            to_port_id: port_id,
            to_many: false,
            unpaired: false,
            serialize_fn: Some(encode.into()),
            instantiate_fn: DebugInstantiate::Building,
            // Move the IR node out of the empty stream, leaving a placeholder behind.
            input: Box::new(empty_stream.ir_node.replace(HydroNode::Placeholder)),
            op_metadata: crate::compile::ir::HydroIrOpMetadata::new(),
        };
        self.flow_state().borrow_mut().push_root(send_root);

        let sender = crate::sim::SimAtomicSender(crate::sim::SimSender(port_id, PhantomData));
        (sender, stream)
    }
}