Skip to main content

hydro_lang/live_collections/keyed_stream/
networking.rs

1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{MinOrder, Ordering, Retries, Stream};
11#[cfg(stageleft_runtime)]
12use crate::location::dynamic::DynLocation;
13use crate::location::{Cluster, MemberId, Process};
14use crate::networking::{NetworkFor, TCP};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17    KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
20    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
21    /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
22    ///
23    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
24    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
25    /// API allows precise targeting of specific cluster members rather than broadcasting to
26    /// all members.
27    ///
28    /// # Example
29    /// ```rust
30    /// # #[cfg(feature = "deploy")] {
31    /// # use hydro_lang::prelude::*;
32    /// # use futures::StreamExt;
33    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
34    /// let p1 = flow.process::<()>();
35    /// let workers: Cluster<()> = flow.cluster::<()>();
36    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
37    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
38    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
39    ///     .into_keyed()
40    ///     .demux_bincode(&workers);
41    /// # on_worker.send_bincode(&p2).entries()
42    /// // if there are 4 members in the cluster, each receives one element
43    /// // - MemberId::<()>(0): [0]
44    /// // - MemberId::<()>(1): [1]
45    /// // - MemberId::<()>(2): [2]
46    /// // - MemberId::<()>(3): [3]
47    /// # }, |mut stream| async move {
48    /// # let mut results = Vec::new();
49    /// # for w in 0..4 {
50    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
51    /// # }
52    /// # results.sort();
53    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
54    /// # }));
55    /// # }
56    /// ```
57    pub fn demux_bincode(
58        self,
59        other: &Cluster<'a, L2>,
60    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
61    where
62        T: Serialize + DeserializeOwned,
63    {
64        self.demux(other, TCP.fail_stop().bincode())
65    }
66
67    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
68    /// identifying the recipient for each group and using the configuration in `via` to set up the
69    /// message transport.
70    ///
71    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
72    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
73    /// API allows precise targeting of specific cluster members rather than broadcasting to
74    /// all members.
75    ///
76    /// # Example
77    /// ```rust
78    /// # #[cfg(feature = "deploy")] {
79    /// # use hydro_lang::prelude::*;
80    /// # use futures::StreamExt;
81    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
82    /// let p1 = flow.process::<()>();
83    /// let workers: Cluster<()> = flow.cluster::<()>();
84    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
85    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
86    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
87    ///     .into_keyed()
88    ///     .demux(&workers, TCP.fail_stop().bincode());
89    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
90    /// // if there are 4 members in the cluster, each receives one element
91    /// // - MemberId::<()>(0): [0]
92    /// // - MemberId::<()>(1): [1]
93    /// // - MemberId::<()>(2): [2]
94    /// // - MemberId::<()>(3): [3]
95    /// # }, |mut stream| async move {
96    /// # let mut results = Vec::new();
97    /// # for w in 0..4 {
98    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
99    /// # }
100    /// # results.sort();
101    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
102    /// # }));
103    /// # }
104    /// ```
105    pub fn demux<N: NetworkFor<T>>(
106        self,
107        to: &Cluster<'a, L2>,
108        via: N,
109    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
110    where
111        T: Serialize + DeserializeOwned,
112        O: MinOrder<N::OrderingGuarantee>,
113    {
114        let serialize_pipeline = Some(N::serialize_thunk(true));
115
116        let deserialize_pipeline = Some(N::deserialize_thunk(None));
117
118        let name = via.name();
119        if to.multiversioned() && name.is_none() {
120            panic!(
121                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
122            );
123        }
124
125        Stream::new(
126            to.clone(),
127            HydroNode::Network {
128                name: name.map(ToOwned::to_owned),
129                networking_info: N::networking_info(),
130                serialize_fn: serialize_pipeline.map(|e| e.into()),
131                instantiate_fn: DebugInstantiate::Building,
132                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
133                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
134                metadata: to.new_node_metadata(Stream::<
135                    T,
136                    Cluster<'a, L2>,
137                    Unbounded,
138                    <O as MinOrder<N::OrderingGuarantee>>::Min,
139                    R,
140                >::collection_kind()),
141            },
142        )
143    }
144}
145
146impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
147    KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
148{
149    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
150    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
151    /// compound key where the first element is the recipient's [`MemberId`] and the second element
152    /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
153    /// messages.
154    ///
155    /// # Example
156    /// ```rust
157    /// # #[cfg(feature = "deploy")] {
158    /// # use hydro_lang::prelude::*;
159    /// # use futures::StreamExt;
160    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
161    /// let p1 = flow.process::<()>();
162    /// let workers: Cluster<()> = flow.cluster::<()>();
163    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
164    ///     .source_iter(q!(vec![0, 1, 2, 3]))
165    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
166    ///     .into_keyed();
167    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
168    /// # on_worker.entries().send_bincode(&p2).entries()
169    /// // if there are 4 members in the cluster, each receives one element
170    /// // - MemberId::<()>(0): { 0: [123] }
171    /// // - MemberId::<()>(1): { 1: [124] }
172    /// // - ...
173    /// # }, |mut stream| async move {
174    /// # let mut results = Vec::new();
175    /// # for w in 0..4 {
176    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
177    /// # }
178    /// # results.sort();
179    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
180    /// # }));
181    /// # }
182    /// ```
183    pub fn demux_bincode(
184        self,
185        other: &Cluster<'a, L2>,
186    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
187    where
188        K: Serialize + DeserializeOwned,
189        T: Serialize + DeserializeOwned,
190    {
191        self.demux(other, TCP.fail_stop().bincode())
192    }
193
194    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
195    /// compound key where the first element is the recipient's [`MemberId`] and the second element
196    /// is a key that will be sent along with the value, using the configuration in `via` to set up
197    /// the message transport.
198    ///
199    /// # Example
200    /// ```rust
201    /// # #[cfg(feature = "deploy")] {
202    /// # use hydro_lang::prelude::*;
203    /// # use futures::StreamExt;
204    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
205    /// let p1 = flow.process::<()>();
206    /// let workers: Cluster<()> = flow.cluster::<()>();
207    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
208    ///     .source_iter(q!(vec![0, 1, 2, 3]))
209    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
210    ///     .into_keyed();
211    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
212    /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
213    /// // if there are 4 members in the cluster, each receives one element
214    /// // - MemberId::<()>(0): { 0: [123] }
215    /// // - MemberId::<()>(1): { 1: [124] }
216    /// // - ...
217    /// # }, |mut stream| async move {
218    /// # let mut results = Vec::new();
219    /// # for w in 0..4 {
220    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
221    /// # }
222    /// # results.sort();
223    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
224    /// # }));
225    /// # }
226    /// ```
227    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
228    pub fn demux<N: NetworkFor<(K, T)>>(
229        self,
230        to: &Cluster<'a, L2>,
231        via: N,
232    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
233    where
234        K: Serialize + DeserializeOwned,
235        T: Serialize + DeserializeOwned,
236        O: MinOrder<N::OrderingGuarantee>,
237    {
238        let serialize_pipeline = Some(N::serialize_thunk(true));
239
240        let deserialize_pipeline = Some(N::deserialize_thunk(None));
241
242        let name = via.name();
243        if to.multiversioned() && name.is_none() {
244            panic!(
245                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
246            );
247        }
248
249        KeyedStream::new(
250            to.clone(),
251            HydroNode::Network {
252                name: name.map(ToOwned::to_owned),
253                networking_info: N::networking_info(),
254                serialize_fn: serialize_pipeline.map(|e| e.into()),
255                instantiate_fn: DebugInstantiate::Building,
256                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
257                input: Box::new(
258                    self.entries()
259                        .map(q!(|((id, k), v)| (id, (k, v))))
260                        .ir_node
261                        .replace(HydroNode::Placeholder),
262                ),
263                metadata: to.new_node_metadata(KeyedStream::<
264                    K,
265                    T,
266                    Cluster<'a, L2>,
267                    Unbounded,
268                    <O as MinOrder<N::OrderingGuarantee>>::Min,
269                    R,
270                >::collection_kind()),
271            },
272        )
273    }
274}
275
276impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
277    KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
278{
279    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
280    /// Sends each group of this stream at each source member to a specific member of a destination
281    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
282    /// [`bincode`] to serialize/deserialize messages.
283    ///
284    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
285    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
286    /// API allows precise targeting of specific cluster members rather than broadcasting to all
287    /// members.
288    ///
289    /// Each cluster member sends its local stream elements, and they are collected at each
290    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
291    ///
292    /// # Example
293    /// ```rust
294    /// # #[cfg(feature = "deploy")] {
295    /// # use hydro_lang::prelude::*;
296    /// # use futures::StreamExt;
297    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
298    /// # type Source = ();
299    /// # type Destination = ();
300    /// let source: Cluster<Source> = flow.cluster::<Source>();
301    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
302    ///     .source_iter(q!(vec![0, 1, 2, 3]))
303    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
304    ///     .into_keyed();
305    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
306    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
307    /// # all_received.entries().send_bincode(&p2).entries()
308    /// # }, |mut stream| async move {
309    /// // if there are 4 members in the destination cluster, each receives one message from each source member
310    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
311    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
312    /// // - ...
313    /// # let mut results = Vec::new();
314    /// # for w in 0..16 {
315    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
316    /// # }
317    /// # results.sort();
318    /// # assert_eq!(results, vec![
319    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
320    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
321    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
322    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
323    /// # ]);
324    /// # }));
325    /// # }
326    /// ```
327    pub fn demux_bincode(
328        self,
329        other: &Cluster<'a, L2>,
330    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
331    where
332        T: Serialize + DeserializeOwned,
333    {
334        self.demux(other, TCP.fail_stop().bincode())
335    }
336
337    /// Sends each group of this stream at each source member to a specific member of a destination
338    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
339    /// configuration in `via` to set up the message transport.
340    ///
341    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
342    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
343    /// API allows precise targeting of specific cluster members rather than broadcasting to all
344    /// members.
345    ///
346    /// Each cluster member sends its local stream elements, and they are collected at each
347    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
348    ///
349    /// # Example
350    /// ```rust
351    /// # #[cfg(feature = "deploy")] {
352    /// # use hydro_lang::prelude::*;
353    /// # use futures::StreamExt;
354    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
355    /// # type Source = ();
356    /// # type Destination = ();
357    /// let source: Cluster<Source> = flow.cluster::<Source>();
358    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
359    ///     .source_iter(q!(vec![0, 1, 2, 3]))
360    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
361    ///     .into_keyed();
362    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
363    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
364    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
365    /// # }, |mut stream| async move {
366    /// // if there are 4 members in the destination cluster, each receives one message from each source member
367    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
368    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
369    /// // - ...
370    /// # let mut results = Vec::new();
371    /// # for w in 0..16 {
372    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
373    /// # }
374    /// # results.sort();
375    /// # assert_eq!(results, vec![
376    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
377    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
378    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
379    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
380    /// # ]);
381    /// # }));
382    /// # }
383    /// ```
384    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
385    pub fn demux<N: NetworkFor<T>>(
386        self,
387        to: &Cluster<'a, L2>,
388        via: N,
389    ) -> KeyedStream<
390        MemberId<L>,
391        T,
392        Cluster<'a, L2>,
393        Unbounded,
394        <O as MinOrder<N::OrderingGuarantee>>::Min,
395        R,
396    >
397    where
398        T: Serialize + DeserializeOwned,
399        O: MinOrder<N::OrderingGuarantee>,
400    {
401        let serialize_pipeline = Some(N::serialize_thunk(true));
402
403        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
404
405        let name = via.name();
406        if to.multiversioned() && name.is_none() {
407            panic!(
408                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
409            );
410        }
411
412        KeyedStream::new(
413            to.clone(),
414            HydroNode::Network {
415                name: name.map(ToOwned::to_owned),
416                networking_info: N::networking_info(),
417                serialize_fn: serialize_pipeline.map(|e| e.into()),
418                instantiate_fn: DebugInstantiate::Building,
419                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
420                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
421                metadata: to.new_node_metadata(KeyedStream::<
422                    MemberId<L>,
423                    T,
424                    Cluster<'a, L2>,
425                    Unbounded,
426                    <O as MinOrder<N::OrderingGuarantee>>::Min,
427                    R,
428                >::collection_kind()),
429            },
430        )
431    }
432}
433
434impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
435    KeyedStream<K, V, Cluster<'a, L>, B, O, R>
436{
437    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
438    #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
439    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
440    /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
441    /// has a compound key where the first element is the sender's [`MemberId`] and the second
442    /// element is the original key.
443    ///
444    /// # Example
445    /// ```rust
446    /// # #[cfg(feature = "deploy")] {
447    /// # use hydro_lang::prelude::*;
448    /// # use futures::StreamExt;
449    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
450    /// # type Source = ();
451    /// # type Destination = ();
452    /// let source: Cluster<Source> = flow.cluster::<Source>();
453    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
454    ///     .source_iter(q!(vec![0, 1, 2, 3]))
455    ///     .map(q!(|x| (x, x + 123)))
456    ///     .into_keyed();
457    /// let destination_process = flow.process::<Destination>();
458    /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
459    /// # all_received.entries().send_bincode(&p2)
460    /// # }, |mut stream| async move {
461    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
462    /// // {
463    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
464    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
465    /// //     ...
466    /// // }
467    /// # let mut results = Vec::new();
468    /// # for w in 0..16 {
469    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
470    /// # }
471    /// # results.sort();
472    /// # assert_eq!(results, vec![
473    /// #   "((MemberId::<()>(0), 0), 123)",
474    /// #   "((MemberId::<()>(0), 1), 124)",
475    /// #   "((MemberId::<()>(0), 2), 125)",
476    /// #   "((MemberId::<()>(0), 3), 126)",
477    /// #   "((MemberId::<()>(1), 0), 123)",
478    /// #   "((MemberId::<()>(1), 1), 124)",
479    /// #   "((MemberId::<()>(1), 2), 125)",
480    /// #   "((MemberId::<()>(1), 3), 126)",
481    /// #   "((MemberId::<()>(2), 0), 123)",
482    /// #   "((MemberId::<()>(2), 1), 124)",
483    /// #   "((MemberId::<()>(2), 2), 125)",
484    /// #   "((MemberId::<()>(2), 3), 126)",
485    /// #   "((MemberId::<()>(3), 0), 123)",
486    /// #   "((MemberId::<()>(3), 1), 124)",
487    /// #   "((MemberId::<()>(3), 2), 125)",
488    /// #   "((MemberId::<()>(3), 3), 126)",
489    /// # ]);
490    /// # }));
491    /// # }
492    /// ```
493    pub fn send_bincode<L2>(
494        self,
495        other: &Process<'a, L2>,
496    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
497    where
498        K: Serialize + DeserializeOwned,
499        V: Serialize + DeserializeOwned,
500    {
501        self.send(other, TCP.fail_stop().bincode())
502    }
503
504    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
505    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
506    /// network, using the configuration in `via` to set up the message transport. The resulting
507    /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
508    /// the second element is the original key.
509    ///
510    /// # Example
511    /// ```rust
512    /// # #[cfg(feature = "deploy")] {
513    /// # use hydro_lang::prelude::*;
514    /// # use futures::StreamExt;
515    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
516    /// # type Source = ();
517    /// # type Destination = ();
518    /// let source: Cluster<Source> = flow.cluster::<Source>();
519    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
520    ///     .source_iter(q!(vec![0, 1, 2, 3]))
521    ///     .map(q!(|x| (x, x + 123)))
522    ///     .into_keyed();
523    /// let destination_process = flow.process::<Destination>();
524    /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
525    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
526    /// # }, |mut stream| async move {
527    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
528    /// // {
529    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
530    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
531    /// //     ...
532    /// // }
533    /// # let mut results = Vec::new();
534    /// # for w in 0..16 {
535    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
536    /// # }
537    /// # results.sort();
538    /// # assert_eq!(results, vec![
539    /// #   "((MemberId::<()>(0), 0), 123)",
540    /// #   "((MemberId::<()>(0), 1), 124)",
541    /// #   "((MemberId::<()>(0), 2), 125)",
542    /// #   "((MemberId::<()>(0), 3), 126)",
543    /// #   "((MemberId::<()>(1), 0), 123)",
544    /// #   "((MemberId::<()>(1), 1), 124)",
545    /// #   "((MemberId::<()>(1), 2), 125)",
546    /// #   "((MemberId::<()>(1), 3), 126)",
547    /// #   "((MemberId::<()>(2), 0), 123)",
548    /// #   "((MemberId::<()>(2), 1), 124)",
549    /// #   "((MemberId::<()>(2), 2), 125)",
550    /// #   "((MemberId::<()>(2), 3), 126)",
551    /// #   "((MemberId::<()>(3), 0), 123)",
552    /// #   "((MemberId::<()>(3), 1), 124)",
553    /// #   "((MemberId::<()>(3), 2), 125)",
554    /// #   "((MemberId::<()>(3), 3), 126)",
555    /// # ]);
556    /// # }));
557    /// # }
558    /// ```
559    pub fn send<L2, N: NetworkFor<(K, V)>>(
560        self,
561        to: &Process<'a, L2>,
562        via: N,
563    ) -> KeyedStream<
564        (MemberId<L>, K),
565        V,
566        Process<'a, L2>,
567        Unbounded,
568        <O as MinOrder<N::OrderingGuarantee>>::Min,
569        R,
570    >
571    where
572        K: Serialize + DeserializeOwned,
573        V: Serialize + DeserializeOwned,
574        O: MinOrder<N::OrderingGuarantee>,
575    {
576        let serialize_pipeline = Some(N::serialize_thunk(false));
577
578        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
579
580        let name = via.name();
581        if to.multiversioned() && name.is_none() {
582            panic!(
583                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
584            );
585        }
586
587        let raw_stream: Stream<
588            (MemberId<L>, (K, V)),
589            Process<'a, L2>,
590            Unbounded,
591            <O as MinOrder<N::OrderingGuarantee>>::Min,
592            R,
593        > = Stream::new(
594            to.clone(),
595            HydroNode::Network {
596                name: name.map(ToOwned::to_owned),
597                networking_info: N::networking_info(),
598                serialize_fn: serialize_pipeline.map(|e| e.into()),
599                instantiate_fn: DebugInstantiate::Building,
600                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
601                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
602                metadata: to.new_node_metadata(Stream::<
603                    (MemberId<L>, (K, V)),
604                    Cluster<'a, L2>,
605                    Unbounded,
606                    <O as MinOrder<N::OrderingGuarantee>>::Min,
607                    R,
608                >::collection_kind()),
609            },
610        );
611
612        raw_stream
613            .map(q!(|(sender, (k, v))| ((sender, k), v)))
614            .into_keyed()
615    }
616}