hydro_lang/live_collections/keyed_stream/networking.rs
1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{MinOrder, Ordering, Retries, Stream};
11#[cfg(stageleft_runtime)]
12use crate::location::dynamic::DynLocation;
13use crate::location::{Cluster, MemberId, Process};
14use crate::networking::{NetworkFor, TCP};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17 KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
20 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
21 /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
22 ///
23 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
24 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
25 /// API allows precise targeting of specific cluster members rather than broadcasting to
26 /// all members.
27 ///
28 /// # Example
29 /// ```rust
30 /// # #[cfg(feature = "deploy")] {
31 /// # use hydro_lang::prelude::*;
32 /// # use futures::StreamExt;
33 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
34 /// let p1 = flow.process::<()>();
35 /// let workers: Cluster<()> = flow.cluster::<()>();
36 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
37 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
38 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
39 /// .into_keyed()
40 /// .demux_bincode(&workers);
41 /// # on_worker.send_bincode(&p2).entries()
42 /// // if there are 4 members in the cluster, each receives one element
43 /// // - MemberId::<()>(0): [0]
44 /// // - MemberId::<()>(1): [1]
45 /// // - MemberId::<()>(2): [2]
46 /// // - MemberId::<()>(3): [3]
47 /// # }, |mut stream| async move {
48 /// # let mut results = Vec::new();
49 /// # for w in 0..4 {
50 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
51 /// # }
52 /// # results.sort();
53 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
54 /// # }));
55 /// # }
56 /// ```
57 pub fn demux_bincode(
58 self,
59 other: &Cluster<'a, L2>,
60 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
61 where
62 T: Serialize + DeserializeOwned,
63 {
64 self.demux(other, TCP.fail_stop().bincode())
65 }
66
67 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
68 /// identifying the recipient for each group and using the configuration in `via` to set up the
69 /// message transport.
70 ///
71 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
72 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
73 /// API allows precise targeting of specific cluster members rather than broadcasting to
74 /// all members.
75 ///
76 /// # Example
77 /// ```rust
78 /// # #[cfg(feature = "deploy")] {
79 /// # use hydro_lang::prelude::*;
80 /// # use futures::StreamExt;
81 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
82 /// let p1 = flow.process::<()>();
83 /// let workers: Cluster<()> = flow.cluster::<()>();
84 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
85 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
86 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
87 /// .into_keyed()
88 /// .demux(&workers, TCP.fail_stop().bincode());
89 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
90 /// // if there are 4 members in the cluster, each receives one element
91 /// // - MemberId::<()>(0): [0]
92 /// // - MemberId::<()>(1): [1]
93 /// // - MemberId::<()>(2): [2]
94 /// // - MemberId::<()>(3): [3]
95 /// # }, |mut stream| async move {
96 /// # let mut results = Vec::new();
97 /// # for w in 0..4 {
98 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
99 /// # }
100 /// # results.sort();
101 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
102 /// # }));
103 /// # }
104 /// ```
105 pub fn demux<N: NetworkFor<T>>(
106 self,
107 to: &Cluster<'a, L2>,
108 via: N,
109 ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
110 where
111 T: Serialize + DeserializeOwned,
112 O: MinOrder<N::OrderingGuarantee>,
113 {
114 let serialize_pipeline = Some(N::serialize_thunk(true));
115
116 let deserialize_pipeline = Some(N::deserialize_thunk(None));
117
118 let name = via.name();
119 if to.multiversioned() && name.is_none() {
120 panic!(
121 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
122 );
123 }
124
125 Stream::new(
126 to.clone(),
127 HydroNode::Network {
128 name: name.map(ToOwned::to_owned),
129 networking_info: N::networking_info(),
130 serialize_fn: serialize_pipeline.map(|e| e.into()),
131 instantiate_fn: DebugInstantiate::Building,
132 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
133 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
134 metadata: to.new_node_metadata(Stream::<
135 T,
136 Cluster<'a, L2>,
137 Unbounded,
138 <O as MinOrder<N::OrderingGuarantee>>::Min,
139 R,
140 >::collection_kind()),
141 },
142 )
143 }
144}
145
146impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
147 KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
148{
149 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
150 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
151 /// compound key where the first element is the recipient's [`MemberId`] and the second element
152 /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
153 /// messages.
154 ///
155 /// # Example
156 /// ```rust
157 /// # #[cfg(feature = "deploy")] {
158 /// # use hydro_lang::prelude::*;
159 /// # use futures::StreamExt;
160 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
161 /// let p1 = flow.process::<()>();
162 /// let workers: Cluster<()> = flow.cluster::<()>();
163 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
164 /// .source_iter(q!(vec![0, 1, 2, 3]))
165 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
166 /// .into_keyed();
167 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
168 /// # on_worker.entries().send_bincode(&p2).entries()
169 /// // if there are 4 members in the cluster, each receives one element
170 /// // - MemberId::<()>(0): { 0: [123] }
171 /// // - MemberId::<()>(1): { 1: [124] }
172 /// // - ...
173 /// # }, |mut stream| async move {
174 /// # let mut results = Vec::new();
175 /// # for w in 0..4 {
176 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
177 /// # }
178 /// # results.sort();
179 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
180 /// # }));
181 /// # }
182 /// ```
183 pub fn demux_bincode(
184 self,
185 other: &Cluster<'a, L2>,
186 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
187 where
188 K: Serialize + DeserializeOwned,
189 T: Serialize + DeserializeOwned,
190 {
191 self.demux(other, TCP.fail_stop().bincode())
192 }
193
194 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
195 /// compound key where the first element is the recipient's [`MemberId`] and the second element
196 /// is a key that will be sent along with the value, using the configuration in `via` to set up
197 /// the message transport.
198 ///
199 /// # Example
200 /// ```rust
201 /// # #[cfg(feature = "deploy")] {
202 /// # use hydro_lang::prelude::*;
203 /// # use futures::StreamExt;
204 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
205 /// let p1 = flow.process::<()>();
206 /// let workers: Cluster<()> = flow.cluster::<()>();
207 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
208 /// .source_iter(q!(vec![0, 1, 2, 3]))
209 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
210 /// .into_keyed();
211 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
212 /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
213 /// // if there are 4 members in the cluster, each receives one element
214 /// // - MemberId::<()>(0): { 0: [123] }
215 /// // - MemberId::<()>(1): { 1: [124] }
216 /// // - ...
217 /// # }, |mut stream| async move {
218 /// # let mut results = Vec::new();
219 /// # for w in 0..4 {
220 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
221 /// # }
222 /// # results.sort();
223 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
224 /// # }));
225 /// # }
226 /// ```
227 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
228 pub fn demux<N: NetworkFor<(K, T)>>(
229 self,
230 to: &Cluster<'a, L2>,
231 via: N,
232 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
233 where
234 K: Serialize + DeserializeOwned,
235 T: Serialize + DeserializeOwned,
236 O: MinOrder<N::OrderingGuarantee>,
237 {
238 let serialize_pipeline = Some(N::serialize_thunk(true));
239
240 let deserialize_pipeline = Some(N::deserialize_thunk(None));
241
242 let name = via.name();
243 if to.multiversioned() && name.is_none() {
244 panic!(
245 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
246 );
247 }
248
249 KeyedStream::new(
250 to.clone(),
251 HydroNode::Network {
252 name: name.map(ToOwned::to_owned),
253 networking_info: N::networking_info(),
254 serialize_fn: serialize_pipeline.map(|e| e.into()),
255 instantiate_fn: DebugInstantiate::Building,
256 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
257 input: Box::new(
258 self.entries()
259 .map(q!(|((id, k), v)| (id, (k, v))))
260 .ir_node
261 .replace(HydroNode::Placeholder),
262 ),
263 metadata: to.new_node_metadata(KeyedStream::<
264 K,
265 T,
266 Cluster<'a, L2>,
267 Unbounded,
268 <O as MinOrder<N::OrderingGuarantee>>::Min,
269 R,
270 >::collection_kind()),
271 },
272 )
273 }
274}
275
276impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
277 KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
278{
279 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
280 /// Sends each group of this stream at each source member to a specific member of a destination
281 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
282 /// [`bincode`] to serialize/deserialize messages.
283 ///
284 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
285 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
286 /// API allows precise targeting of specific cluster members rather than broadcasting to all
287 /// members.
288 ///
289 /// Each cluster member sends its local stream elements, and they are collected at each
290 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
291 ///
292 /// # Example
293 /// ```rust
294 /// # #[cfg(feature = "deploy")] {
295 /// # use hydro_lang::prelude::*;
296 /// # use futures::StreamExt;
297 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
298 /// # type Source = ();
299 /// # type Destination = ();
300 /// let source: Cluster<Source> = flow.cluster::<Source>();
301 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
302 /// .source_iter(q!(vec![0, 1, 2, 3]))
303 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
304 /// .into_keyed();
305 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
306 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
307 /// # all_received.entries().send_bincode(&p2).entries()
308 /// # }, |mut stream| async move {
309 /// // if there are 4 members in the destination cluster, each receives one message from each source member
310 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
311 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
312 /// // - ...
313 /// # let mut results = Vec::new();
314 /// # for w in 0..16 {
315 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
316 /// # }
317 /// # results.sort();
318 /// # assert_eq!(results, vec![
319 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
320 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
321 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
322 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
323 /// # ]);
324 /// # }));
325 /// # }
326 /// ```
327 pub fn demux_bincode(
328 self,
329 other: &Cluster<'a, L2>,
330 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
331 where
332 T: Serialize + DeserializeOwned,
333 {
334 self.demux(other, TCP.fail_stop().bincode())
335 }
336
337 /// Sends each group of this stream at each source member to a specific member of a destination
338 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
339 /// configuration in `via` to set up the message transport.
340 ///
341 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
342 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
343 /// API allows precise targeting of specific cluster members rather than broadcasting to all
344 /// members.
345 ///
346 /// Each cluster member sends its local stream elements, and they are collected at each
347 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
348 ///
349 /// # Example
350 /// ```rust
351 /// # #[cfg(feature = "deploy")] {
352 /// # use hydro_lang::prelude::*;
353 /// # use futures::StreamExt;
354 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
355 /// # type Source = ();
356 /// # type Destination = ();
357 /// let source: Cluster<Source> = flow.cluster::<Source>();
358 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
359 /// .source_iter(q!(vec![0, 1, 2, 3]))
360 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
361 /// .into_keyed();
362 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
363 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
364 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
365 /// # }, |mut stream| async move {
366 /// // if there are 4 members in the destination cluster, each receives one message from each source member
367 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
368 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
369 /// // - ...
370 /// # let mut results = Vec::new();
371 /// # for w in 0..16 {
372 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
373 /// # }
374 /// # results.sort();
375 /// # assert_eq!(results, vec![
376 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
377 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
378 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
379 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
380 /// # ]);
381 /// # }));
382 /// # }
383 /// ```
384 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
385 pub fn demux<N: NetworkFor<T>>(
386 self,
387 to: &Cluster<'a, L2>,
388 via: N,
389 ) -> KeyedStream<
390 MemberId<L>,
391 T,
392 Cluster<'a, L2>,
393 Unbounded,
394 <O as MinOrder<N::OrderingGuarantee>>::Min,
395 R,
396 >
397 where
398 T: Serialize + DeserializeOwned,
399 O: MinOrder<N::OrderingGuarantee>,
400 {
401 let serialize_pipeline = Some(N::serialize_thunk(true));
402
403 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
404
405 let name = via.name();
406 if to.multiversioned() && name.is_none() {
407 panic!(
408 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
409 );
410 }
411
412 KeyedStream::new(
413 to.clone(),
414 HydroNode::Network {
415 name: name.map(ToOwned::to_owned),
416 networking_info: N::networking_info(),
417 serialize_fn: serialize_pipeline.map(|e| e.into()),
418 instantiate_fn: DebugInstantiate::Building,
419 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
420 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
421 metadata: to.new_node_metadata(KeyedStream::<
422 MemberId<L>,
423 T,
424 Cluster<'a, L2>,
425 Unbounded,
426 <O as MinOrder<N::OrderingGuarantee>>::Min,
427 R,
428 >::collection_kind()),
429 },
430 )
431 }
432}
433
434impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
435 KeyedStream<K, V, Cluster<'a, L>, B, O, R>
436{
437 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
438 #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
439 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
440 /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
441 /// has a compound key where the first element is the sender's [`MemberId`] and the second
442 /// element is the original key.
443 ///
444 /// # Example
445 /// ```rust
446 /// # #[cfg(feature = "deploy")] {
447 /// # use hydro_lang::prelude::*;
448 /// # use futures::StreamExt;
449 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
450 /// # type Source = ();
451 /// # type Destination = ();
452 /// let source: Cluster<Source> = flow.cluster::<Source>();
453 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
454 /// .source_iter(q!(vec![0, 1, 2, 3]))
455 /// .map(q!(|x| (x, x + 123)))
456 /// .into_keyed();
457 /// let destination_process = flow.process::<Destination>();
458 /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
459 /// # all_received.entries().send_bincode(&p2)
460 /// # }, |mut stream| async move {
461 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
462 /// // {
463 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
464 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
465 /// // ...
466 /// // }
467 /// # let mut results = Vec::new();
468 /// # for w in 0..16 {
469 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
470 /// # }
471 /// # results.sort();
472 /// # assert_eq!(results, vec![
473 /// # "((MemberId::<()>(0), 0), 123)",
474 /// # "((MemberId::<()>(0), 1), 124)",
475 /// # "((MemberId::<()>(0), 2), 125)",
476 /// # "((MemberId::<()>(0), 3), 126)",
477 /// # "((MemberId::<()>(1), 0), 123)",
478 /// # "((MemberId::<()>(1), 1), 124)",
479 /// # "((MemberId::<()>(1), 2), 125)",
480 /// # "((MemberId::<()>(1), 3), 126)",
481 /// # "((MemberId::<()>(2), 0), 123)",
482 /// # "((MemberId::<()>(2), 1), 124)",
483 /// # "((MemberId::<()>(2), 2), 125)",
484 /// # "((MemberId::<()>(2), 3), 126)",
485 /// # "((MemberId::<()>(3), 0), 123)",
486 /// # "((MemberId::<()>(3), 1), 124)",
487 /// # "((MemberId::<()>(3), 2), 125)",
488 /// # "((MemberId::<()>(3), 3), 126)",
489 /// # ]);
490 /// # }));
491 /// # }
492 /// ```
493 pub fn send_bincode<L2>(
494 self,
495 other: &Process<'a, L2>,
496 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
497 where
498 K: Serialize + DeserializeOwned,
499 V: Serialize + DeserializeOwned,
500 {
501 self.send(other, TCP.fail_stop().bincode())
502 }
503
504 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
505 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
506 /// network, using the configuration in `via` to set up the message transport. The resulting
507 /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
508 /// the second element is the original key.
509 ///
510 /// # Example
511 /// ```rust
512 /// # #[cfg(feature = "deploy")] {
513 /// # use hydro_lang::prelude::*;
514 /// # use futures::StreamExt;
515 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
516 /// # type Source = ();
517 /// # type Destination = ();
518 /// let source: Cluster<Source> = flow.cluster::<Source>();
519 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
520 /// .source_iter(q!(vec![0, 1, 2, 3]))
521 /// .map(q!(|x| (x, x + 123)))
522 /// .into_keyed();
523 /// let destination_process = flow.process::<Destination>();
524 /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
525 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
526 /// # }, |mut stream| async move {
527 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
528 /// // {
529 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
530 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
531 /// // ...
532 /// // }
533 /// # let mut results = Vec::new();
534 /// # for w in 0..16 {
535 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
536 /// # }
537 /// # results.sort();
538 /// # assert_eq!(results, vec![
539 /// # "((MemberId::<()>(0), 0), 123)",
540 /// # "((MemberId::<()>(0), 1), 124)",
541 /// # "((MemberId::<()>(0), 2), 125)",
542 /// # "((MemberId::<()>(0), 3), 126)",
543 /// # "((MemberId::<()>(1), 0), 123)",
544 /// # "((MemberId::<()>(1), 1), 124)",
545 /// # "((MemberId::<()>(1), 2), 125)",
546 /// # "((MemberId::<()>(1), 3), 126)",
547 /// # "((MemberId::<()>(2), 0), 123)",
548 /// # "((MemberId::<()>(2), 1), 124)",
549 /// # "((MemberId::<()>(2), 2), 125)",
550 /// # "((MemberId::<()>(2), 3), 126)",
551 /// # "((MemberId::<()>(3), 0), 123)",
552 /// # "((MemberId::<()>(3), 1), 124)",
553 /// # "((MemberId::<()>(3), 2), 125)",
554 /// # "((MemberId::<()>(3), 3), 126)",
555 /// # ]);
556 /// # }));
557 /// # }
558 /// ```
559 pub fn send<L2, N: NetworkFor<(K, V)>>(
560 self,
561 to: &Process<'a, L2>,
562 via: N,
563 ) -> KeyedStream<
564 (MemberId<L>, K),
565 V,
566 Process<'a, L2>,
567 Unbounded,
568 <O as MinOrder<N::OrderingGuarantee>>::Min,
569 R,
570 >
571 where
572 K: Serialize + DeserializeOwned,
573 V: Serialize + DeserializeOwned,
574 O: MinOrder<N::OrderingGuarantee>,
575 {
576 let serialize_pipeline = Some(N::serialize_thunk(false));
577
578 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
579
580 let name = via.name();
581 if to.multiversioned() && name.is_none() {
582 panic!(
583 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
584 );
585 }
586
587 let raw_stream: Stream<
588 (MemberId<L>, (K, V)),
589 Process<'a, L2>,
590 Unbounded,
591 <O as MinOrder<N::OrderingGuarantee>>::Min,
592 R,
593 > = Stream::new(
594 to.clone(),
595 HydroNode::Network {
596 name: name.map(ToOwned::to_owned),
597 networking_info: N::networking_info(),
598 serialize_fn: serialize_pipeline.map(|e| e.into()),
599 instantiate_fn: DebugInstantiate::Building,
600 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
601 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
602 metadata: to.new_node_metadata(Stream::<
603 (MemberId<L>, (K, V)),
604 Cluster<'a, L2>,
605 Unbounded,
606 <O as MinOrder<N::OrderingGuarantee>>::Min,
607 R,
608 >::collection_kind()),
609 },
610 );
611
612 raw_stream
613 .map(q!(|(sender, (k, v))| ((sender, k), v)))
614 .into_keyed()
615 }
616}