hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16 ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::keyed_singleton::KeyedSingletonBound;
27use crate::live_collections::stream::{
28 AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{
37 AggFuncAlgebra, ApplyMonotoneKeyedStream, ValidCommutativityFor, ValidIdempotenceFor,
38 manual_proof,
39};
40
41pub mod networking;
42
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this collection is materialized.
    pub(crate) location: Loc,
    // The IR node backing this collection; consumed by swapping in
    // `HydroNode::Placeholder` (see the `Drop` impl below).
    pub(crate) ir_node: RefCell<HydroNode>,
    // Shared handle to the flow graph builder state; used to register roots.
    pub(crate) flow_state: FlowState,

    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
75
76impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
77 fn drop(&mut self) {
78 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
79 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
80 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
81 input: Box::new(ir_node),
82 op_metadata: HydroIrOpMetadata::new(),
83 });
84 }
85 }
86}
87
88impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
89 for KeyedStream<K, V, L, Unbounded, O, R>
90where
91 L: Location<'a>,
92{
93 fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
94 let new_meta = stream
95 .location
96 .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
97
98 KeyedStream {
99 location: stream.location.clone(),
100 flow_state: stream.flow_state.clone(),
101 ir_node: RefCell::new(HydroNode::Cast {
102 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
103 metadata: new_meta,
104 }),
105 _phantom: PhantomData,
106 }
107 }
108}
109
110impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
111 for KeyedStream<K, V, L, B, NoOrder, R>
112where
113 L: Location<'a>,
114{
115 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
116 stream.weaken_ordering()
117 }
118}
119
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Fully-qualified call so this resolves to the inherent `defer_tick`
        // (presumably defined later in this file — inherent methods take
        // precedence over trait methods), not back into this trait impl.
        KeyedStream::defer_tick(self)
    }
}
128
129impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
130 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
131where
132 L: Location<'a>,
133{
134 type Location = Tick<L>;
135
136 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
137 KeyedStream {
138 flow_state: location.flow_state().clone(),
139 location: location.clone(),
140 ir_node: RefCell::new(HydroNode::CycleSource {
141 cycle_id,
142 metadata: location.new_node_metadata(
143 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
144 ),
145 }),
146 _phantom: PhantomData,
147 }
148 }
149}
150
151impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
152 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
153where
154 L: Location<'a>,
155{
156 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
157 assert_eq!(
158 Location::id(&self.location),
159 expected_location,
160 "locations do not match"
161 );
162
163 self.location
164 .flow_state()
165 .borrow_mut()
166 .push_root(HydroRoot::CycleSink {
167 cycle_id,
168 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
169 op_metadata: HydroIrOpMetadata::new(),
170 });
171 }
172}
173
174impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
175 for KeyedStream<K, V, L, B, O, R>
176where
177 L: Location<'a> + NoTick,
178{
179 type Location = L;
180
181 fn create_source(cycle_id: CycleId, location: L) -> Self {
182 KeyedStream {
183 flow_state: location.flow_state().clone(),
184 location: location.clone(),
185 ir_node: RefCell::new(HydroNode::CycleSource {
186 cycle_id,
187 metadata: location
188 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
189 }),
190 _phantom: PhantomData,
191 }
192 }
193}
194
195impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
196 for KeyedStream<K, V, L, B, O, R>
197where
198 L: Location<'a> + NoTick,
199{
200 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201 assert_eq!(
202 Location::id(&self.location),
203 expected_location,
204 "locations do not match"
205 );
206 self.location
207 .flow_state()
208 .borrow_mut()
209 .push_root(HydroRoot::CycleSink {
210 cycle_id,
211 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212 op_metadata: HydroIrOpMetadata::new(),
213 });
214 }
215}
216
impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
{
    fn clone(&self) -> Self {
        // On the first clone, rewrite our own IR node in place into a shared
        // `Tee`, so that `self` and every clone reference the same inner node.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // `self.ir_node` is now guaranteed to be a `Tee`; hand out another
        // handle that shares the same inner `Rc`.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            KeyedStream {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: HydroNode::Tee {
                    inner: SharedNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            // Unreachable: the branch above just ensured the node is a `Tee`.
            unreachable!()
        }
    }
}
245
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant independently decides two things: whether an element is emitted
/// now, and whether future inputs will continue to be processed.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
258
259impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
260 KeyedStream<K, V, L, B, O, R>
261{
262 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
263 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
264 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
265
266 let flow_state = location.flow_state().clone();
267 KeyedStream {
268 location,
269 flow_state,
270 ir_node: RefCell::new(ir_node),
271 _phantom: PhantomData,
272 }
273 }
274
275 /// Returns the [`CollectionKind`] corresponding to this type.
276 pub fn collection_kind() -> CollectionKind {
277 CollectionKind::KeyedStream {
278 bound: B::BOUND_KIND,
279 value_order: O::ORDERING_KIND,
280 value_retry: R::RETRIES_KIND,
281 key_type: stageleft::quote_type::<K>().into(),
282 value_type: stageleft::quote_type::<V>().into(),
283 }
284 }
285
    /// Returns a reference to the [`Location`] where this keyed stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }
290
291 /// Turns this [`KeyedStream`] into a [`Stream`] preserving ordering, under the invariant
292 /// assumption that there is at most one key. If this invariant is broken, the program
293 /// may exhibit undefined behavior, so uses must be carefully vetted.
294 pub(crate) fn cast_at_most_one_key(self) -> Stream<(K, V), L, B, O, R> {
295 Stream::new(
296 self.location.clone(),
297 HydroNode::Cast {
298 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
299 metadata: self
300 .location
301 .new_node_metadata(Stream::<(K, V), L, B, O, R>::collection_kind()),
302 },
303 )
304 }
305
306 /// Turns this [`KeyedStream`] into a [`KeyedSingleton`], under the invariant assumption that
307 /// there is at most one entry per key. If this invariant is broken, the program may exhibit
308 /// undefined behavior, so uses must be carefully vetted.
309 pub(crate) fn cast_at_most_one_entry_per_key(
310 self,
311 ) -> KeyedSingleton<K, V, L, B::WithBoundedValue> {
312 KeyedSingleton::new(
313 self.location.clone(),
314 HydroNode::Cast {
315 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
316 metadata: self.location.new_node_metadata(KeyedSingleton::<
317 K,
318 V,
319 L,
320 B::WithBoundedValue,
321 >::collection_kind()),
322 },
323 )
324 }
325
326 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> KeyedStream<K, V, L, B, O2, R> {
327 if O::ORDERING_KIND == O2::ORDERING_KIND {
328 KeyedStream::new(
329 self.location.clone(),
330 self.ir_node.replace(HydroNode::Placeholder),
331 )
332 } else {
333 panic!(
334 "Runtime ordering {:?} did not match requested cast {:?}.",
335 O::ORDERING_KIND,
336 O2::ORDERING_KIND
337 )
338 }
339 }
340
341 /// Explicitly "casts" the keyed stream to a type with a different ordering
342 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
343 /// by the type-system.
344 ///
345 /// # Non-Determinism
346 /// This function is used as an escape hatch, and any mistakes in the
347 /// provided ordering guarantee will propagate into the guarantees
348 /// for the rest of the program.
349 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
350 if O::ORDERING_KIND == O2::ORDERING_KIND {
351 KeyedStream::new(
352 self.location.clone(),
353 self.ir_node.replace(HydroNode::Placeholder),
354 )
355 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
356 // We can always weaken the ordering guarantee
357 KeyedStream::new(
358 self.location.clone(),
359 HydroNode::Cast {
360 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
361 metadata: self
362 .location
363 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
364 },
365 )
366 } else {
367 KeyedStream::new(
368 self.location.clone(),
369 HydroNode::ObserveNonDet {
370 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
371 trusted: false,
372 metadata: self
373 .location
374 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
375 },
376 )
377 }
378 }
379
380 fn assume_ordering_trusted<O2: Ordering>(
381 self,
382 _nondet: NonDet,
383 ) -> KeyedStream<K, V, L, B, O2, R> {
384 if O::ORDERING_KIND == O2::ORDERING_KIND {
385 KeyedStream::new(
386 self.location.clone(),
387 self.ir_node.replace(HydroNode::Placeholder),
388 )
389 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
390 // We can always weaken the ordering guarantee
391 KeyedStream::new(
392 self.location.clone(),
393 HydroNode::Cast {
394 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
395 metadata: self
396 .location
397 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
398 },
399 )
400 } else {
401 KeyedStream::new(
402 self.location.clone(),
403 HydroNode::ObserveNonDet {
404 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
405 trusted: true,
406 metadata: self
407 .location
408 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
409 },
410 )
411 }
412 }
413
414 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
415 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
416 /// which is always safe because that is the weakest possible guarantee.
417 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
418 self.weaken_ordering::<NoOrder>()
419 }
420
421 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
422 /// enforcing that `O2` is weaker than the input ordering guarantee.
423 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
424 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
425 self.assume_ordering::<O2>(nondet)
426 }
427
428 /// Explicitly "casts" the keyed stream to a type with a different retries
429 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
430 /// be proven by the type-system.
431 ///
432 /// # Non-Determinism
433 /// This function is used as an escape hatch, and any mistakes in the
434 /// provided retries guarantee will propagate into the guarantees
435 /// for the rest of the program.
436 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
437 if R::RETRIES_KIND == R2::RETRIES_KIND {
438 KeyedStream::new(
439 self.location.clone(),
440 self.ir_node.replace(HydroNode::Placeholder),
441 )
442 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
443 // We can always weaken the retries guarantee
444 KeyedStream::new(
445 self.location.clone(),
446 HydroNode::Cast {
447 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
448 metadata: self
449 .location
450 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
451 },
452 )
453 } else {
454 KeyedStream::new(
455 self.location.clone(),
456 HydroNode::ObserveNonDet {
457 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
458 trusted: false,
459 metadata: self
460 .location
461 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
462 },
463 )
464 }
465 }
466
467 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
468 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
469 /// which is always safe because that is the weakest possible guarantee.
470 pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
471 self.weaken_retries::<AtLeastOnce>()
472 }
473
474 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
475 /// enforcing that `R2` is weaker than the input retries guarantee.
476 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
477 let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
478 self.assume_retries::<R2>(nondet)
479 }
480
481 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
482 /// implies that `O == TotalOrder`.
483 pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
484 where
485 O: IsOrdered,
486 {
487 self.assume_ordering(nondet!(/** no-op */))
488 }
489
490 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
491 /// implies that `R == ExactlyOnce`.
492 pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
493 where
494 R: IsExactlyOnce,
495 {
496 self.assume_retries(nondet!(/** no-op */))
497 }
498
499 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
500 /// implies that `B == Bounded`.
501 pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
502 where
503 B: IsBounded,
504 {
505 self.weaken_boundedness()
506 }
507
    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`.
    ///
    /// NOTE(review): the signature carries no `B: IsBounded` bound even though callers
    /// such as [`KeyedStream::make_bounded`] invoke it under that assumption; when the
    /// kinds differ, the IR `Cast` below is emitted unconditionally — presumably later
    /// IR validation rejects unsound casts. TODO confirm.
    pub fn weaken_boundedness<B2: Boundedness>(self) -> KeyedStream<K, V, L, B2, O, R> {
        if B::BOUNDED == B2::BOUNDED {
            // Same boundedness kind: just re-type the handle, no IR node needed.
            KeyedStream::new(
                self.location.clone(),
                self.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            // We can always weaken the boundedness
            KeyedStream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    metadata: self
                        .location
                        .new_node_metadata(KeyedStream::<K, V, L, B2, O, R>::collection_kind()),
                },
            )
        }
    }
529
530 /// Flattens the keyed stream into an unordered stream of key-value pairs.
531 ///
532 /// # Example
533 /// ```rust
534 /// # #[cfg(feature = "deploy")] {
535 /// # use hydro_lang::prelude::*;
536 /// # use futures::StreamExt;
537 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
538 /// process
539 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
540 /// .into_keyed()
541 /// .entries()
542 /// # }, |mut stream| async move {
543 /// // (1, 2), (1, 3), (2, 4) in any order
544 /// # let mut results = Vec::new();
545 /// # for _ in 0..3 {
546 /// # results.push(stream.next().await.unwrap());
547 /// # }
548 /// # results.sort();
549 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
550 /// # }));
551 /// # }
552 /// ```
553 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
554 Stream::new(
555 self.location.clone(),
556 HydroNode::Cast {
557 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
558 metadata: self
559 .location
560 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
561 },
562 )
563 }
564
565 /// Flattens the keyed stream into a totally ordered stream of key-value pairs,
566 /// preserving the order of values within each key group but non-deterministically
567 /// interleaving across keys.
568 ///
569 /// Requires the keyed stream to be totally ordered within each group (`O: IsOrdered`).
570 ///
571 /// # Non-Determinism
572 /// The interleaving of entries across different keys is non-deterministic.
573 /// Within each key, the original order is preserved.
574 pub fn entries_partially_ordered(self, _nondet: NonDet) -> Stream<(K, V), L, B, TotalOrder, R>
575 where
576 O: IsOrdered,
577 {
578 Stream::new(
579 self.location.clone(),
580 HydroNode::ObserveNonDet {
581 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
582 trusted: false,
583 metadata: self
584 .location
585 .new_node_metadata(Stream::<(K, V), L, B, TotalOrder, R>::collection_kind()),
586 },
587 )
588 }
589
590 /// Flattens the keyed stream into an unordered stream of only the values.
591 ///
592 /// # Example
593 /// ```rust
594 /// # #[cfg(feature = "deploy")] {
595 /// # use hydro_lang::prelude::*;
596 /// # use futures::StreamExt;
597 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
598 /// process
599 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
600 /// .into_keyed()
601 /// .values()
602 /// # }, |mut stream| async move {
603 /// // 2, 3, 4 in any order
604 /// # let mut results = Vec::new();
605 /// # for _ in 0..3 {
606 /// # results.push(stream.next().await.unwrap());
607 /// # }
608 /// # results.sort();
609 /// # assert_eq!(results, vec![2, 3, 4]);
610 /// # }));
611 /// # }
612 /// ```
613 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
614 self.entries().map(q!(|(_, v)| v))
615 }
616
617 /// Flattens the keyed stream into an unordered stream of just the keys.
618 ///
619 /// # Example
620 /// ```rust
621 /// # #[cfg(feature = "deploy")] {
622 /// # use hydro_lang::prelude::*;
623 /// # use futures::StreamExt;
624 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
625 /// # process
626 /// # .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
627 /// # .into_keyed()
628 /// # .keys()
629 /// # }, |mut stream| async move {
630 /// // 1, 2 in any order
631 /// # let mut results = Vec::new();
632 /// # for _ in 0..2 {
633 /// # results.push(stream.next().await.unwrap());
634 /// # }
635 /// # results.sort();
636 /// # assert_eq!(results, vec![1, 2]);
637 /// # }));
638 /// # }
639 /// ```
640 pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
641 where
642 K: Eq + Hash,
643 {
644 self.entries().map(q!(|(k, _)| k)).unique()
645 }
646
    /// Transforms each value by invoking `f` on each element, with keys staying the same
    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map(q!(|v| v + 1))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [5] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> U + 'a,
    {
        // Defer splicing of the user closure so it can be captured (as `f`) inside
        // the quoted wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Wrap `f` to apply only to the value, threading the key through untouched.
        let map_f = q!({
            let orig = f;
            move |(k, v)| (k, orig(v))
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
698
    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
    /// re-grouped even they are tuples; instead they will be grouped under the original key.
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect_with_key`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map_with_key(q!(|(k, v)| k + v))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [6] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
    /// # }));
    /// # }
    /// ```
    pub fn map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> U + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure so it can be captured (as `f`) inside
        // the quoted wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Clone the key for the user closure so the original key can still be used
        // for grouping the transformed value.
        let map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                (k, out)
            }
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
757
    /// Prepends a new value to the key of each element in the stream, producing a new
    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
    /// occurs and the elements in each group preserve their original order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .prefix_key(q!(|&(k, _)| k % 2))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { (1, 1): [2, 3], (0, 2): [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
    /// # }));
    /// # }
    /// ```
    pub fn prefix_key<K2, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<(K2, K), V, L, B, O, R>
    where
        F: Fn(&(K, V)) -> K2 + 'a,
    {
        // Defer splicing of the user closure (which takes the pair by reference)
        // so it can be captured (as `f`) inside the quoted wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // Compute the new key prefix from a borrow, then rebuild the pair as
        // ((prefix, original_key), value).
        let map_f = q!({
            let orig = f;
            move |kv| {
                let out = orig(&kv);
                ((out, kv.0), kv.1)
            }
        })
        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
            },
        )
    }
813
    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
    /// `f`, preserving the order of the elements within the group.
    ///
    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`KeyedStream::filter_map`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .filter(q!(|&x| x > 2))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&V) -> bool + 'a,
    {
        // Defer splicing of the user predicate so it can be captured (as `f`)
        // inside the quoted wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // Adapt the value-only predicate to the underlying (key, value) pairs.
        let filter_f = q!({
            let orig = f;
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Filter {
                f: filter_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
864
865 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
866 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
867 ///
868 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
869 /// not modify or take ownership of the values. If you need to modify the values while filtering
870 /// use [`KeyedStream::filter_map_with_key`] instead.
871 ///
872 /// # Example
873 /// ```rust
874 /// # #[cfg(feature = "deploy")] {
875 /// # use hydro_lang::prelude::*;
876 /// # use futures::StreamExt;
877 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
878 /// process
879 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
880 /// .into_keyed()
881 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
882 /// # .entries()
883 /// # }, |mut stream| async move {
884 /// // { 1: [3], 2: [4] }
885 /// # let mut results = Vec::new();
886 /// # for _ in 0..2 {
887 /// # results.push(stream.next().await.unwrap());
888 /// # }
889 /// # results.sort();
890 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
891 /// # }));
892 /// # }
893 /// ```
894 pub fn filter_with_key<F>(
895 self,
896 f: impl IntoQuotedMut<'a, F, L> + Copy,
897 ) -> KeyedStream<K, V, L, B, O, R>
898 where
899 F: Fn(&(K, V)) -> bool + 'a,
900 {
901 let filter_f = f
902 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
903 .into();
904
905 KeyedStream::new(
906 self.location.clone(),
907 HydroNode::Filter {
908 f: filter_f,
909 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
910 metadata: self.location.new_node_metadata(Self::collection_kind()),
911 },
912 )
913 }
914
    /// An operator that both filters and maps each value, with keys staying the same.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
    ///     .into_keyed()
    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_map<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> Option<U> + 'a,
    {
        // Defer splicing of the user closure so it can be captured (as `f`) inside
        // the quoted wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Apply `f` to the value only; when it yields Some, re-attach the key.
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).map(|o| (k, o))
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
967
    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
970 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
971 ///
972 /// # Example
973 /// ```rust
974 /// # #[cfg(feature = "deploy")] {
975 /// # use hydro_lang::prelude::*;
976 /// # use futures::StreamExt;
977 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
978 /// process
979 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
980 /// .into_keyed()
981 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
982 /// # .entries()
983 /// # }, |mut stream| async move {
984 /// // { 2: [2] }
985 /// # let mut results = Vec::new();
986 /// # for _ in 0..1 {
987 /// # results.push(stream.next().await.unwrap());
988 /// # }
989 /// # results.sort();
990 /// # assert_eq!(results, vec![(2, 2)]);
991 /// # }));
992 /// # }
993 /// ```
    pub fn filter_map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> Option<U> + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure until a staging context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| {
                // The key is cloned because the user closure consumes a `(K, V)`
                // pair by value, but `k` is still needed to re-key the output.
                let out = orig((Clone::clone(&k), v));
                out.map(|o| (k, o))
            }
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                // Detach the current IR node, leaving a placeholder in its cell.
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
1024
1025 /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
    /// where `x` is the value of `other`, a bounded [`super::singleton::Singleton`] or
1027 /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
1028 ///
1029 /// # Example
1030 /// ```rust
1031 /// # #[cfg(feature = "deploy")] {
1032 /// # use hydro_lang::prelude::*;
1033 /// # use futures::StreamExt;
1034 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1035 /// let tick = process.tick();
1036 /// let batch = process
1037 /// .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
1038 /// .into_keyed()
1039 /// .batch(&tick, nondet!(/** test */));
1040 /// let count = batch.clone().entries().count(); // `count()` returns a singleton
1041 /// batch.cross_singleton(count).all_ticks().entries()
1042 /// # }, |mut stream| async move {
1043 /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
1044 /// # let mut results = Vec::new();
1045 /// # for _ in 0..3 {
1046 /// # results.push(stream.next().await.unwrap());
1047 /// # }
1048 /// # results.sort();
1049 /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
1050 /// # }));
1051 /// # }
1052 /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> KeyedStream<K, (V, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        // Both operands must live at the same location; cross-location joins
        // must go through explicit networking instead.
        check_matching_location(&self.location, &other.location);

        // Build the cross product on the underlying `(K, V)` entry stream, then
        // reshape `((k, v), o2)` into `(k, (v, o2))` to restore the keyed view.
        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
            },
        )
        .map(q!(|((k, v), o2)| (k, (v, o2))))
        .into_keyed()
    }
1076
1077 /// For each value `v` in each group, transform `v` using `f` and then treat the
1078 /// result as an [`Iterator`] to produce values one by one within the same group.
1079 /// The implementation for [`Iterator`] for the output type `I` must produce items
1080 /// in a **deterministic** order.
1081 ///
1082 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
1083 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
1084 ///
1085 /// # Example
1086 /// ```rust
1087 /// # #[cfg(feature = "deploy")] {
1088 /// # use hydro_lang::prelude::*;
1089 /// # use futures::StreamExt;
1090 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1091 /// process
1092 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1093 /// .into_keyed()
1094 /// .flat_map_ordered(q!(|x| x))
1095 /// # .entries()
1096 /// # }, |mut stream| async move {
1097 /// // { 1: [2, 3, 4], 2: [5, 6] }
1098 /// # let mut results = Vec::new();
1099 /// # for _ in 0..5 {
1100 /// # results.push(stream.next().await.unwrap());
1101 /// # }
1102 /// # results.sort();
1103 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1104 /// # }));
1105 /// # }
1106 /// ```
    pub fn flat_map_ordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure until a staging context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        let flat_map_f = q!({
            let orig = f;
            // Each produced item is re-paired with a clone of the group key so
            // the outputs stay in the same group.
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                // Detach the current IR node, leaving a placeholder in its cell.
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
1135
1136 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1137 /// for the output type `I` to produce items in any order.
1138 ///
1139 /// # Example
1140 /// ```rust
1141 /// # #[cfg(feature = "deploy")] {
1142 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1143 /// # use futures::StreamExt;
1144 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1145 /// process
1146 /// .source_iter(q!(vec![
1147 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1148 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1149 /// ]))
1150 /// .into_keyed()
1151 /// .flat_map_unordered(q!(|x| x))
1152 /// # .entries()
1153 /// # }, |mut stream| async move {
1154 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1155 /// # let mut results = Vec::new();
1156 /// # for _ in 0..4 {
1157 /// # results.push(stream.next().await.unwrap());
1158 /// # }
1159 /// # results.sort();
1160 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1161 /// # }));
1162 /// # }
1163 /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure until a staging context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        let flat_map_f = q!({
            let orig = f;
            // Each produced item is re-paired with a clone of the group key so
            // the outputs stay in the same group.
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Output drops to `NoOrder` since `I` may iterate nondeterministically.
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
1192
1193 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1194 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1195 /// items in a **deterministic** order.
1196 ///
1197 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1198 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1199 ///
1200 /// # Example
1201 /// ```rust
1202 /// # #[cfg(feature = "deploy")] {
1203 /// # use hydro_lang::prelude::*;
1204 /// # use futures::StreamExt;
1205 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1206 /// process
1207 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1208 /// .into_keyed()
1209 /// .flatten_ordered()
1210 /// # .entries()
1211 /// # }, |mut stream| async move {
1212 /// // { 1: [2, 3, 4], 2: [5, 6] }
1213 /// # let mut results = Vec::new();
1214 /// # for _ in 0..5 {
1215 /// # results.push(stream.next().await.unwrap());
1216 /// # }
1217 /// # results.sort();
1218 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1219 /// # }));
1220 /// # }
1221 /// ```
1222 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1223 where
1224 V: IntoIterator<Item = U>,
1225 K: Clone,
1226 {
1227 self.flat_map_ordered(q!(|d| d))
1228 }
1229
1230 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1231 /// for the value type `V` to produce items in any order.
1232 ///
1233 /// # Example
1234 /// ```rust
1235 /// # #[cfg(feature = "deploy")] {
1236 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1237 /// # use futures::StreamExt;
1238 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1239 /// process
1240 /// .source_iter(q!(vec![
1241 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1242 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1243 /// ]))
1244 /// .into_keyed()
1245 /// .flatten_unordered()
1246 /// # .entries()
1247 /// # }, |mut stream| async move {
1248 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1249 /// # let mut results = Vec::new();
1250 /// # for _ in 0..4 {
1251 /// # results.push(stream.next().await.unwrap());
1252 /// # }
1253 /// # results.sort();
1254 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1255 /// # }));
1256 /// # }
1257 /// ```
1258 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1259 where
1260 V: IntoIterator<Item = U>,
1261 K: Clone,
1262 {
1263 self.flat_map_unordered(q!(|d| d))
1264 }
1265
1266 /// An operator which allows you to "inspect" each element of a stream without
1267 /// modifying it. The closure `f` is called on a reference to each value. This is
1268 /// mainly useful for debugging, and should not be used to generate side-effects.
1269 ///
1270 /// # Example
1271 /// ```rust
1272 /// # #[cfg(feature = "deploy")] {
1273 /// # use hydro_lang::prelude::*;
1274 /// # use futures::StreamExt;
1275 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1276 /// process
1277 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1278 /// .into_keyed()
1279 /// .inspect(q!(|v| println!("{}", v)))
1280 /// # .entries()
1281 /// # }, |mut stream| async move {
1282 /// # let mut results = Vec::new();
1283 /// # for _ in 0..3 {
1284 /// # results.push(stream.next().await.unwrap());
1285 /// # }
1286 /// # results.sort();
1287 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1288 /// # }));
1289 /// # }
1290 /// ```
    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
    where
        F: Fn(&V) + 'a,
    {
        // Defer splicing of the user closure until a staging context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        let inspect_f = q!({
            let orig = f;
            // Forward only the value component of each `(K, V)` pair.
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f: inspect_f,
                // Detach the current IR node, leaving a placeholder in its cell.
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1312
1313 /// An operator which allows you to "inspect" each element of a stream without
1314 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1315 /// mainly useful for debugging, and should not be used to generate side-effects.
1316 ///
1317 /// # Example
1318 /// ```rust
1319 /// # #[cfg(feature = "deploy")] {
1320 /// # use hydro_lang::prelude::*;
1321 /// # use futures::StreamExt;
1322 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1323 /// process
1324 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1325 /// .into_keyed()
1326 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1327 /// # .entries()
1328 /// # }, |mut stream| async move {
1329 /// # let mut results = Vec::new();
1330 /// # for _ in 0..3 {
1331 /// # results.push(stream.next().await.unwrap());
1332 /// # }
1333 /// # results.sort();
1334 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1335 /// # }));
1336 /// # }
1337 /// ```
    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
    where
        F: Fn(&(K, V)) + 'a,
    {
        // The closure already takes the full `(K, V)` pair, so it can be spliced directly.
        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f: inspect_f,
                // Detach the current IR node, leaving a placeholder in its cell.
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1353
1354 /// An operator which allows you to "name" a `HydroNode`.
1355 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1356 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1357 {
1358 let mut node = self.ir_node.borrow_mut();
1359 let metadata = node.metadata_mut();
1360 metadata.tag = Some(name.to_owned());
1361 }
1362 self
1363 }
1364
1365 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1366 ///
1367 /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1368 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1369 /// early by returning `None`.
1370 ///
1371 /// The function takes a mutable reference to the accumulator and the current element, and returns
1372 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1373 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1374 ///
1375 /// # Example
1376 /// ```rust
1377 /// # #[cfg(feature = "deploy")] {
1378 /// # use hydro_lang::prelude::*;
1379 /// # use futures::StreamExt;
1380 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1381 /// process
1382 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1383 /// .into_keyed()
1384 /// .scan(
1385 /// q!(|| 0),
1386 /// q!(|acc, x| {
1387 /// *acc += x;
1388 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1389 /// }),
1390 /// )
1391 /// # .entries()
1392 /// # }, |mut stream| async move {
1393 /// // Output: { 0: [1], 1: [3, 7] }
1394 /// # let mut results = Vec::new();
1395 /// # for _ in 0..3 {
1396 /// # results.push(stream.next().await.unwrap());
1397 /// # }
1398 /// # results.sort();
1399 /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1400 /// # }));
1401 /// # }
1402 /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Option<U> + 'a,
    {
        // Defer splicing of the user closure until a staging context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Implemented on top of `generator`: `Some(out)` maps to a yield, and
        // `None` terminates the group (break without emitting).
        self.make_totally_ordered().make_exactly_once().generator(
            init,
            q!({
                let orig = f;
                move |state, v| {
                    if let Some(out) = orig(state, v) {
                        Generate::Yield(out)
                    } else {
                        Generate::Break
                    }
                }
            }),
        )
    }
1430
1431 /// Iteratively processes the elements in each group using a state machine that can yield
1432 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1433 /// syntax in Rust, without requiring special syntax.
1434 ///
1435 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1436 /// state for each group. The second argument defines the processing logic, taking in a
1437 /// mutable reference to the group's state and the value to be processed. It emits a
1438 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1439 /// should be processed.
1440 ///
1441 /// # Example
1442 /// ```rust
1443 /// # #[cfg(feature = "deploy")] {
1444 /// # use hydro_lang::prelude::*;
1445 /// # use futures::StreamExt;
1446 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1447 /// process
1448 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1449 /// .into_keyed()
1450 /// .generator(
1451 /// q!(|| 0),
1452 /// q!(|acc, x| {
1453 /// *acc += x;
1454 /// if *acc > 100 {
1455 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
1456 /// "done!".to_owned()
1457 /// )
1458 /// } else if *acc % 2 == 0 {
1459 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
1460 /// "even".to_owned()
1461 /// )
1462 /// } else {
1463 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1464 /// }
1465 /// }),
1466 /// )
1467 /// # .entries()
1468 /// # }, |mut stream| async move {
1469 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1470 /// # let mut results = Vec::new();
1471 /// # for _ in 0..3 {
1472 /// # results.push(stream.next().await.unwrap());
1473 /// # }
1474 /// # results.sort();
1475 /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
1476 /// # }));
1477 /// # }
1478 /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        // Defer splicing of both user closures until a staging context is available.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // Per-key state lives in a `HashMap`; a `None` entry marks a group that
        // terminated (via `Return`/`Break`), whose later inputs are ignored.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
            .into();
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
        .into();

        // The scan emits `Option<(K, U)>`: `Some` when the state machine produced
        // an output for this input, `None` otherwise.
        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten away the `None`s to recover a keyed stream of emitted values.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(this.location.clone(), flatten_node)
    }
1552
1553 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1554 /// in-order across the values in each group. But the aggregation function returns a boolean,
1555 /// which when true indicates that the aggregated result is complete and can be released to
1556 /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1557 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1558 /// normal stream elements.
1559 ///
1560 /// # Example
1561 /// ```rust
1562 /// # #[cfg(feature = "deploy")] {
1563 /// # use hydro_lang::prelude::*;
1564 /// # use futures::StreamExt;
1565 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1566 /// process
1567 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1568 /// .into_keyed()
1569 /// .fold_early_stop(
1570 /// q!(|| 0),
1571 /// q!(|acc, x| {
1572 /// *acc += x;
1573 /// x % 2 == 0
1574 /// }),
1575 /// )
1576 /// # .entries()
1577 /// # }, |mut stream| async move {
1578 /// // Output: { 0: 2, 1: 9 }
1579 /// # let mut results = Vec::new();
1580 /// # for _ in 0..2 {
1581 /// # results.push(stream.next().await.unwrap());
1582 /// # }
1583 /// # results.sort();
1584 /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1585 /// # }));
1586 /// # }
1587 /// ```
    pub fn fold_early_stop<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedSingleton<K, A, L, B::WithBoundedValue>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> bool + 'a,
    {
        // Defer splicing of both user closures until a staging context is available.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Wrap the accumulator in `Option` so the finished value can be moved
        // out of the generator state when the combinator returns `true`.
        let out_without_bound_cast = self.generator(
            q!(move || Some(init())),
            q!(move |key_state, v| {
                if let Some(key_state_value) = key_state.as_mut() {
                    if f(key_state_value, v) {
                        Generate::Return(key_state.take().unwrap())
                    } else {
                        Generate::Continue
                    }
                } else {
                    // Terminated groups never re-enter the generator body.
                    unreachable!()
                }
            }),
        );

        // SAFETY: The generator will only ever return at most one value per key, since once it
        // returns a value for a key it will never process any more values for that key.
        out_without_bound_cast.cast_at_most_one_entry_per_key()
    }
1621
1622 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1623 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1624 /// otherwise the first element would be non-deterministic.
1625 ///
1626 /// # Example
1627 /// ```rust
1628 /// # #[cfg(feature = "deploy")] {
1629 /// # use hydro_lang::prelude::*;
1630 /// # use futures::StreamExt;
1631 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1632 /// process
1633 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1634 /// .into_keyed()
1635 /// .first()
1636 /// # .entries()
1637 /// # }, |mut stream| async move {
1638 /// // Output: { 0: 2, 1: 3 }
1639 /// # let mut results = Vec::new();
1640 /// # for _ in 0..2 {
1641 /// # results.push(stream.next().await.unwrap());
1642 /// # }
1643 /// # results.sort();
1644 /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1645 /// # }));
1646 /// # }
1647 /// ```
1648 pub fn first(self) -> KeyedSingleton<K, V, L, B::WithBoundedValue>
1649 where
1650 O: IsOrdered,
1651 R: IsExactlyOnce,
1652 K: Clone + Eq + Hash,
1653 {
1654 self.fold_early_stop(
1655 q!(|| None),
1656 q!(|acc, v| {
1657 *acc = Some(v);
1658 true
1659 }),
1660 )
1661 .map(q!(|v| v.unwrap()))
1662 }
1663
1664 /// Returns a keyed stream containing at most the first `n` values per key,
1665 /// preserving the original order within each group. Similar to SQL `LIMIT`
1666 /// applied per group.
1667 ///
1668 /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1669 /// retries, since the result depends on the order and cardinality of elements
1670 /// within each group.
1671 ///
1672 /// # Example
1673 /// ```rust
1674 /// # #[cfg(feature = "deploy")] {
1675 /// # use hydro_lang::prelude::*;
1676 /// # use futures::StreamExt;
1677 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1678 /// process
1679 /// .source_iter(q!(vec![(1, 10), (1, 20), (1, 30), (2, 40), (2, 50)]))
1680 /// .into_keyed()
1681 /// .limit(q!(2))
1682 /// # .entries()
1683 /// # }, |mut stream| async move {
1684 /// // { 1: [10, 20], 2: [40, 50] }
1685 /// # let mut results = Vec::new();
1686 /// # for _ in 0..4 {
1687 /// # results.push(stream.next().await.unwrap());
1688 /// # }
1689 /// # results.sort();
1690 /// # assert_eq!(results, vec![(1, 10), (1, 20), (2, 40), (2, 50)]);
1691 /// # }));
1692 /// # }
1693 /// ```
    pub fn limit(
        self,
        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
    {
        self.generator(
            q!(|| 0usize),
            q!(move |count, item| {
                if *count == n {
                    // Also covers `n == 0`: terminate before emitting anything.
                    Generate::Break
                } else {
                    *count += 1;
                    if *count == n {
                        // Emit the n-th item and terminate the group eagerly so
                        // later items in the group are never processed.
                        Generate::Return(item)
                    } else {
                        Generate::Yield(item)
                    }
                }
            }),
        )
    }
1719
1720 /// Assigns a zero-based index to each value within each key group, emitting
1721 /// `(K, (index, V))` tuples with per-key sequential indices.
1722 ///
1723 /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1724 /// This is a streaming operator that processes elements as they arrive.
1725 ///
1726 /// # Example
1727 /// ```rust
1728 /// # #[cfg(feature = "deploy")] {
1729 /// # use hydro_lang::prelude::*;
1730 /// # use futures::StreamExt;
1731 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1732 /// process
1733 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1734 /// .into_keyed()
1735 /// .enumerate()
1736 /// # .entries()
1737 /// # }, |mut stream| async move {
1738 /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1739 /// # let mut results = Vec::new();
1740 /// # for _ in 0..3 {
1741 /// # results.push(stream.next().await.unwrap());
1742 /// # }
1743 /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1744 /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1745 /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1746 /// # assert_eq!(key2, vec![(0, 20)]);
1747 /// # }));
1748 /// # }
1749 /// ```
1750 pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
1751 where
1752 O: IsOrdered,
1753 R: IsExactlyOnce,
1754 K: Eq + Hash + Clone,
1755 {
1756 self.scan(
1757 q!(|| 0),
1758 q!(|acc, next| {
1759 let curr = *acc;
1760 *acc += 1;
1761 Some((curr, next))
1762 }),
1763 )
1764 }
1765
1766 /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1767 ///
1768 /// # Example
1769 /// ```rust
1770 /// # #[cfg(feature = "deploy")] {
1771 /// # use hydro_lang::prelude::*;
1772 /// # use futures::StreamExt;
1773 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1774 /// let tick = process.tick();
1775 /// let numbers = process
1776 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1777 /// .into_keyed();
1778 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1779 /// batch
1780 /// .value_counts()
1781 /// .entries()
1782 /// .all_ticks()
1783 /// # }, |mut stream| async move {
1784 /// // (1, 3), (2, 2)
1785 /// # let mut results = Vec::new();
1786 /// # for _ in 0..2 {
1787 /// # results.push(stream.next().await.unwrap());
1788 /// # }
1789 /// # results.sort();
1790 /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1791 /// # }));
1792 /// # }
1793 /// ```
    pub fn value_counts(
        self,
    ) -> KeyedSingleton<K, usize, L, <B as KeyedSingletonBound>::KeyedStreamToMonotone>
    where
        R: IsExactlyOnce,
        K: Eq + Hash,
    {
        // Counting is insensitive to the order of values within a group, so it
        // is safe to relax the ordering guarantee before folding.
        self.make_exactly_once()
            .assume_ordering_trusted(
                nondet!(/** ordering within each group affects neither result nor intermediates */),
            )
            .fold(
                q!(|| 0),
                q!(
                    |acc, _| *acc += 1,
                    monotone = manual_proof!(/** += 1 is monotonic */)
                ),
            )
    }
1813
1814 /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1815 /// group via the `comb` closure.
1816 ///
1817 /// Depending on the input stream guarantees, the closure may need to be commutative
1818 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1819 ///
1820 /// If the input and output value types are the same and do not require initialization then use
1821 /// [`KeyedStream::reduce`].
1822 ///
1823 /// # Example
1824 /// ```rust
1825 /// # #[cfg(feature = "deploy")] {
1826 /// # use hydro_lang::prelude::*;
1827 /// # use futures::StreamExt;
1828 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1829 /// let tick = process.tick();
1830 /// let numbers = process
1831 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1832 /// .into_keyed();
1833 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1834 /// batch
1835 /// .fold(q!(|| false), q!(|acc, x| *acc |= x))
1836 /// .entries()
1837 /// .all_ticks()
1838 /// # }, |mut stream| async move {
1839 /// // (1, false), (2, true)
1840 /// # let mut results = Vec::new();
1841 /// # for _ in 0..2 {
1842 /// # results.push(stream.next().await.unwrap());
1843 /// # }
1844 /// # results.sort();
1845 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1846 /// # }));
1847 /// # }
1848 /// ```
    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp, M, B2: KeyedSingletonBound>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
    ) -> KeyedSingleton<K, A, L, B2>
    where
        K: Eq + Hash,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
        B: ApplyMonotoneKeyedStream<M, B2>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        // Splice the combinator together with its algebraic-property evidence
        // (commutativity/idempotence/monotonicity) and record the proof.
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // The `ValidCommutativityFor`/`ValidIdempotenceFor` bounds above justify
        // casting away the ordering and retry guarantees here.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::FoldKeyed {
                init,
                acc: comb.into(),
                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
                metadata: ordered
                    .location
                    .new_node_metadata(KeyedSingleton::<K, A, L, B2>::collection_kind()),
            },
        )
    }
1880
    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
    /// group via the `comb` closure.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .reduce(q!(|acc, x| *acc |= x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, false), (2, true)
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true)]);
    /// # }));
    /// # }
    /// ```
    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B>
    where
        K: Eq + Hash,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // Splice the quoted combinator and register the commutativity /
        // idempotence proof carried by its `AggFuncAlgebra` props.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // Justified by the `C` / `Idemp` bounds above: idempotence tolerates
        // duplicates, commutativity tolerates reordering.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        // Build the `ReduceKeyed` IR node; unlike `FoldKeyed` it carries no
        // `init` expression.
        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyed {
                f: f.into(),
                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
                metadata: ordered
                    .location
                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
            },
        )
    }
1942
    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
    /// are automatically deleted.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let watermark = tick.singleton(q!(2));
    /// let numbers = process
    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (2, true)
    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
    /// # }));
    /// # }
    /// ```
    pub fn reduce_watermark<O2, F, C, Idemp>(
        self,
        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B>
    where
        K: Eq + Hash,
        O2: Clone,
        F: Fn(&mut V, V) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // The watermark optional must live at the root of this stream's
        // location hierarchy.
        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
        check_matching_location(&self.location.root(), other.location.outer());
        // Splice the quoted combinator and register its commutativity /
        // idempotence proof.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // Justified by the `C` / `Idemp` bounds above.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        // Build the watermark-aware reduce IR node, detaching both input IR
        // nodes by swapping placeholders into them.
        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyedWatermark {
                f: f.into(),
                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
                watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: ordered
                    .location
                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
            },
        )
    }
2004
    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
    /// whose keys are not in the bounded stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let keyed_stream = process
    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .into_keyed();
    /// let keys_to_remove = process
    ///     .source_iter(q!(vec![1, 2]))
    ///     .batch(&tick, nondet!(/** test */));
    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 3: ['c'], 4: ['d'] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
        self,
        other: Stream<K, L, Bounded, O2, R2>,
    ) -> Self
    where
        K: Eq + Hash,
    {
        // Both inputs must live at the same location.
        check_matching_location(&self.location, &other.location);

        // An anti-join keeps only the entries of the positive (`self`) side
        // whose key does not appear on the negative (`other`) side.
        KeyedStream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
2053
2054 /// Emit a keyed stream containing keys shared between two keyed streams,
2055 /// where each value in the output keyed stream is a tuple of
2056 /// (self's value, other's value).
2057 /// If there are multiple values for the same key, this performs a cross product
2058 /// for each matching key.
2059 ///
2060 /// # Example
2061 /// ```rust
2062 /// # #[cfg(feature = "deploy")] {
2063 /// # use hydro_lang::prelude::*;
2064 /// # use futures::StreamExt;
2065 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2066 /// let tick = process.tick();
2067 /// let keyed_data = process
2068 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2069 /// .into_keyed()
2070 /// .batch(&tick, nondet!(/** test */));
2071 /// let other_data = process
2072 /// .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
2073 /// .into_keyed()
2074 /// .batch(&tick, nondet!(/** test */));
2075 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
2076 /// # }, |mut stream| async move {
2077 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
2078 /// # let mut results = vec![];
2079 /// # for _ in 0..4 {
2080 /// # results.push(stream.next().await.unwrap());
2081 /// # }
2082 /// # results.sort();
2083 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
2084 /// # }));
2085 /// # }
2086 /// ```
2087 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2088 pub fn join_keyed_stream<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2089 self,
2090 other: KeyedStream<K, V2, L, B2, O2, R2>,
2091 ) -> KeyedStream<
2092 K,
2093 (V, V2),
2094 L,
2095 B,
2096 B2::PreserveOrderIfBounded<NoOrder>,
2097 <R as MinRetries<R2>>::Min,
2098 >
2099 where
2100 K: Eq + Hash + Clone,
2101 R: MinRetries<R2>,
2102 V: Clone,
2103 V2: Clone,
2104 {
2105 self.entries().join(other.entries()).into_keyed()
2106 }
2107
2108 /// Deduplicates values within each key group, emitting each unique value per key
2109 /// exactly once.
2110 ///
2111 /// # Example
2112 /// ```rust
2113 /// # #[cfg(feature = "deploy")] {
2114 /// # use hydro_lang::prelude::*;
2115 /// # use futures::StreamExt;
2116 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2117 /// process
2118 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
2119 /// .into_keyed()
2120 /// .unique()
2121 /// # .entries()
2122 /// # }, |mut stream| async move {
2123 /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
2124 /// # let mut results = Vec::new();
2125 /// # for _ in 0..4 {
2126 /// # results.push(stream.next().await.unwrap());
2127 /// # }
2128 /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2129 /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2130 /// # key1.sort();
2131 /// # key2.sort();
2132 /// # assert_eq!(key1, vec![10, 20]);
2133 /// # assert_eq!(key2, vec![20, 30]);
2134 /// # }));
2135 /// # }
2136 /// ```
2137 pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
2138 where
2139 K: Eq + Hash + Clone,
2140 V: Eq + Hash + Clone,
2141 {
2142 self.entries().unique().into_keyed()
2143 }
2144
2145 /// Sorts the values within each key group in ascending order.
2146 ///
2147 /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
2148 /// each group. This operator will block until all elements in the input stream
2149 /// are available, so it requires the input stream to be [`Bounded`].
2150 ///
2151 /// # Example
2152 /// ```rust
2153 /// # #[cfg(feature = "deploy")] {
2154 /// # use hydro_lang::prelude::*;
2155 /// # use futures::StreamExt;
2156 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2157 /// let tick = process.tick();
2158 /// let numbers = process
2159 /// .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2160 /// .into_keyed();
2161 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2162 /// batch.sort().all_ticks()
2163 /// # .entries()
2164 /// # }, |mut stream| async move {
2165 /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2166 /// # let mut results = Vec::new();
2167 /// # for _ in 0..4 {
2168 /// # results.push(stream.next().await.unwrap());
2169 /// # }
2170 /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2171 /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2172 /// # assert_eq!(key1_vals, vec![1, 3]);
2173 /// # assert_eq!(key2_vals, vec![1, 2]);
2174 /// # }));
2175 /// # }
2176 /// ```
2177 pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2178 where
2179 B: IsBounded,
2180 K: Ord,
2181 V: Ord,
2182 {
2183 self.entries().sort().into_keyed()
2184 }
2185
    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
    /// is only present in one of the inputs, its values are passed through as-is). The output has
    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
    ///
    /// Currently, both input streams must be [`Bounded`]. This operator will block
    /// on the first stream until all its elements are available. In a future version,
    /// we will relax the requirement on the `other` stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 0: [2, 1], 1: [4, 3] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn chain<O2: Ordering, R2: Retries>(
        self,
        other: KeyedStream<K, V, L, Bounded, O2, R2>,
    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
    where
        B: IsBounded,
        O: MinOrder<O2>,
        R: MinRetries<R2>,
    {
        // The `B: IsBounded` bound guarantees boundedness; `make_bounded`
        // reifies it into the concrete `Bounded` parameter used below.
        let this = self.make_bounded();
        check_matching_location(&this.location, &other.location);

        // Emit a `Chain` IR node: all of `self` first, then all of `other`.
        // The output keeps the weaker of the two ordering / retry guarantees.
        KeyedStream::new(
            this.location.clone(),
            HydroNode::Chain {
                first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: this.location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    L,
                    Bounded,
                    <O as MinOrder<O2>>::Min,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }
2245
    /// Emit a keyed stream containing keys shared between the keyed stream and the
    /// keyed singleton, where each value in the output keyed stream is a tuple of
    /// (the keyed stream's value, the keyed singleton's value).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let keyed_data = process
    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
    ///     .into_keyed()
    ///     .batch(&tick, nondet!(/** test */));
    /// let singleton_data = process
    ///     .source_iter(q!(vec![(1, 100), (2, 200)]))
    ///     .into_keyed()
    ///     .batch(&tick, nondet!(/** test */))
    ///     .first();
    /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
    /// # let mut results = vec![];
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
    /// # }));
    /// # }
    /// ```
    pub fn join_keyed_singleton<V2: Clone, B2: IsBounded>(
        self,
        other: KeyedSingleton<K, V2, L, B2>,
    ) -> KeyedStream<K, (V, V2), L, B, O, R>
    where
        K: Eq + Hash + Clone,
        V: Clone,
    {
        // NOTE(review): unlike other binary operators in this file, there is no
        // `check_matching_location` call here — confirm whether matching
        // locations are guaranteed elsewhere for `KeyedSingleton` arguments.
        //
        // When the singleton side is statically bounded, a one-sided
        // `JoinHalf` node is emitted; otherwise a full symmetric `Join` is used.
        let ir_node = if B2::BOUNDED {
            HydroNode::JoinHalf {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
            }
        } else {
            HydroNode::Join {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
            }
        };

        KeyedStream::new(self.location.clone(), ir_node)
    }
2306
2307 /// Gets the values associated with a specific key from the keyed stream.
2308 /// Returns an empty stream if the key is `None` or there are no associated values.
2309 ///
2310 /// # Example
2311 /// ```rust
2312 /// # #[cfg(feature = "deploy")] {
2313 /// # use hydro_lang::prelude::*;
2314 /// # use futures::StreamExt;
2315 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2316 /// let tick = process.tick();
2317 /// let keyed_data = process
2318 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2319 /// .into_keyed()
2320 /// .batch(&tick, nondet!(/** test */));
2321 /// let key = tick.singleton(q!(1));
2322 /// keyed_data.get(key).all_ticks()
2323 /// # }, |mut stream| async move {
2324 /// // 10, 11
2325 /// # let mut results = vec![];
2326 /// # for _ in 0..2 {
2327 /// # results.push(stream.next().await.unwrap());
2328 /// # }
2329 /// # results.sort();
2330 /// # assert_eq!(results, vec![10, 11]);
2331 /// # }));
2332 /// # }
2333 /// ```
2334 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, B, O, R>
2335 where
2336 K: Eq + Hash + Clone,
2337 V: Clone,
2338 {
2339 let joined =
2340 self.join_keyed_singleton(key.into().map(q!(|k| (k, ()))).into_keyed_singleton());
2341
2342 if O::ORDERING_KIND == StreamOrder::TotalOrder {
2343 joined
2344 .use_ordering_type::<TotalOrder>()
2345 .cast_at_most_one_key()
2346 .map(q!(|(_, (v, _))| v))
2347 .weaken_ordering()
2348 } else {
2349 joined.values().map(q!(|(v, _)| v)).use_ordering_type()
2350 }
2351 }
2352
2353 /// For each value in `self`, find the matching key in `lookup`.
2354 /// The output is a keyed stream with the key from `self`, and a value
2355 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2356 /// If the key is not present in `lookup`, the option will be [`None`].
2357 ///
2358 /// # Example
2359 /// ```rust
2360 /// # #[cfg(feature = "deploy")] {
2361 /// # use hydro_lang::prelude::*;
2362 /// # use futures::StreamExt;
2363 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2364 /// # let tick = process.tick();
2365 /// let requests = // { 1: [10, 11], 2: 20 }
2366 /// # process
2367 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2368 /// # .into_keyed()
2369 /// # .batch(&tick, nondet!(/** test */));
2370 /// let other_data = // { 10: 100, 11: 110 }
2371 /// # process
2372 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
2373 /// # .into_keyed()
2374 /// # .batch(&tick, nondet!(/** test */))
2375 /// # .first();
2376 /// requests.lookup_keyed_singleton(other_data)
2377 /// # .entries().all_ticks()
2378 /// # }, |mut stream| async move {
2379 /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2380 /// # let mut results = vec![];
2381 /// # for _ in 0..3 {
2382 /// # results.push(stream.next().await.unwrap());
2383 /// # }
2384 /// # results.sort();
2385 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2386 /// # }));
2387 /// # }
2388 /// ```
2389 pub fn lookup_keyed_singleton<V2>(
2390 self,
2391 lookup: KeyedSingleton<V, V2, L, Bounded>,
2392 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2393 where
2394 B: IsBounded,
2395 K: Eq + Hash + Clone,
2396 V: Eq + Hash + Clone,
2397 V2: Clone,
2398 {
2399 self.lookup_keyed_stream(
2400 lookup
2401 .into_keyed_stream()
2402 .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2403 )
2404 }
2405
2406 /// For each value in `self`, find the matching key in `lookup`.
2407 /// The output is a keyed stream with the key from `self`, and a value
2408 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2409 /// If the key is not present in `lookup`, the option will be [`None`].
2410 ///
2411 /// # Example
2412 /// ```rust
2413 /// # #[cfg(feature = "deploy")] {
2414 /// # use hydro_lang::prelude::*;
2415 /// # use futures::StreamExt;
2416 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2417 /// # let tick = process.tick();
2418 /// let requests = // { 1: [10, 11], 2: 20 }
2419 /// # process
2420 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2421 /// # .into_keyed()
2422 /// # .batch(&tick, nondet!(/** test */));
2423 /// let other_data = // { 10: [100, 101], 11: 110 }
2424 /// # process
2425 /// # .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2426 /// # .into_keyed()
2427 /// # .batch(&tick, nondet!(/** test */));
2428 /// requests.lookup_keyed_stream(other_data)
2429 /// # .entries().all_ticks()
2430 /// # }, |mut stream| async move {
2431 /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2432 /// # let mut results = vec![];
2433 /// # for _ in 0..4 {
2434 /// # results.push(stream.next().await.unwrap());
2435 /// # }
2436 /// # results.sort();
2437 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2438 /// # }));
2439 /// # }
2440 /// ```
2441 #[expect(clippy::type_complexity, reason = "retries propagation")]
2442 pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2443 self,
2444 lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2445 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2446 where
2447 B: IsBounded,
2448 K: Eq + Hash + Clone,
2449 V: Eq + Hash + Clone,
2450 V2: Clone,
2451 R: MinRetries<R2>,
2452 {
2453 let inverted = self
2454 .make_bounded()
2455 .entries()
2456 .map(q!(|(key, lookup_value)| (lookup_value, key)))
2457 .into_keyed();
2458 let found = inverted
2459 .clone()
2460 .join_keyed_stream(lookup.clone())
2461 .entries()
2462 .map(q!(|(lookup_value, (key, value))| (
2463 key,
2464 (lookup_value, Some(value))
2465 )))
2466 .into_keyed();
2467 let not_found = inverted
2468 .filter_key_not_in(lookup.keys())
2469 .entries()
2470 .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2471 .into_keyed();
2472
2473 found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2474 }
2475
    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
    ///
    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
    /// processed before an acknowledgement is emitted.
    pub fn atomic(self) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
        // Allocate a fresh clock id for the tick driving this atomic section.
        let id = self.location.flow_state().borrow_mut().next_clock_id();
        let out_location = Atomic {
            tick: Tick {
                id,
                l: self.location.clone(),
            },
        };
        // Wrap the current IR node in a `BeginAtomic` marker, re-homing the
        // stream at the new atomic location.
        KeyedStream::new(
            out_location.clone(),
            HydroNode::BeginAtomic {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: out_location
                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
            },
        )
    }
2498
    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Non-Determinism
    /// The batch boundaries are non-deterministic and may change across executions.
    pub fn batch(
        self,
        tick: &Tick<L>,
        nondet: NonDet,
    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
        // `nondet` is the caller's explicit acknowledgement of the
        // non-determinism; it carries no runtime data.
        let _ = nondet;
        // The tick must belong to the same location as this stream.
        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
        KeyedStream::new(
            tick.clone(),
            HydroNode::Batch {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: tick.new_node_metadata(
                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
                ),
            },
        )
    }
2522}
2523
2524impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2525 KeyedStream<(K1, K2), V, L, B, O, R>
2526{
2527 /// Produces a new keyed stream by dropping the first element of the compound key.
2528 ///
2529 /// Because multiple keys may share the same suffix, this operation results in re-grouping
2530 /// of the values under the new keys. The values across groups with the same new key
2531 /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2532 ///
2533 /// # Example
2534 /// ```rust
2535 /// # #[cfg(feature = "deploy")] {
2536 /// # use hydro_lang::prelude::*;
2537 /// # use futures::StreamExt;
2538 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2539 /// process
2540 /// .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2541 /// .into_keyed()
2542 /// .drop_key_prefix()
2543 /// # .entries()
2544 /// # }, |mut stream| async move {
2545 /// // { 10: [2, 3], 20: [4] }
2546 /// # let mut results = Vec::new();
2547 /// # for _ in 0..3 {
2548 /// # results.push(stream.next().await.unwrap());
2549 /// # }
2550 /// # results.sort();
2551 /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2552 /// # }));
2553 /// # }
2554 /// ```
2555 pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2556 self.entries()
2557 .map(q!(|((_k1, k2), v)| (k2, v)))
2558 .into_keyed()
2559 }
2560}
2561
impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
    KeyedStream<K, V, L, Unbounded, O, R>
{
    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
    ///
    /// Currently, both input streams must be [`Unbounded`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
    /// numbers1.merge_unordered(numbers2)
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn merge_unordered<O2: Ordering, R2: Retries>(
        self,
        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        // NOTE(review): no `check_matching_location` here — presumably the
        // shared `L` parameter enforces it, but confirm against other binary
        // operators in this file which do assert it.
        //
        // A `Chain` IR node over unbounded inputs interleaves whatever
        // elements are available (per the doc above), so the output's ordering
        // guarantee is weakened to `NoOrder`.
        KeyedStream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    L,
                    Unbounded,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
    #[deprecated(note = "use `merge_unordered` instead")]
    pub fn interleave<O2: Ordering, R2: Retries>(
        self,
        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        // Thin compatibility shim over the renamed operator.
        self.merge_unordered(other)
    }
}
2631
impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
where
    L: Location<'a> + NoTick,
{
    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
    /// used to create the atomic section.
    ///
    /// # Non-Determinism
    /// The batch boundaries are non-deterministic and may change across executions.
    pub fn batch_atomic(
        self,
        tick: &Tick<L>,
        nondet: NonDet,
    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
        // `nondet` is the caller's acknowledgement of the non-determinism.
        let _ = nondet;
        // NOTE(review): unlike `KeyedStream::batch`, there is no assertion that
        // `tick` matches this atomic section's tick (`self.location.tick`) —
        // confirm that callers always pass the tick used to create the section.
        KeyedStream::new(
            tick.clone(),
            HydroNode::Batch {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: tick.new_node_metadata(
                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
                ),
            },
        )
    }

    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
    /// See [`KeyedStream::atomic`] for more details.
    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
        // Re-home the stream at the location underlying the atomic section
        // (`self.location.tick.l`) and mark the boundary with `EndAtomic`.
        KeyedStream::new(
            self.location.tick.l.clone(),
            HydroNode::EndAtomic {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .tick
                    .l
                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
            },
        )
    }
}
2676
2677impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2678where
2679 L: Location<'a>,
2680{
    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
    /// each key.
    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
        // `YieldConcat` concatenates per-tick batches; the result lives at the
        // location enclosing this tick and becomes unbounded.
        KeyedStream::new(
            self.location.outer().clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.outer().new_node_metadata(KeyedStream::<
                    K,
                    V,
                    L,
                    Unbounded,
                    O,
                    R,
                >::collection_kind(
                )),
            },
        )
    }
2701
    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
    /// each key.
    ///
    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// stream's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
        // Stay synchronous: the output's atomic context wraps this same tick.
        let out_location = Atomic {
            tick: self.location.clone(),
        };

        // Same `YieldConcat` concatenation as `all_ticks`, but re-homed at the
        // atomic location instead of the tick's outer location.
        KeyedStream::new(
            out_location.clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: out_location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    Atomic<L>,
                    Unbounded,
                    O,
                    R,
                >::collection_kind()),
            },
        )
    }
2729
    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory for each key across ticks rather than resetting across batches of each key.
    ///
    /// This API is particularly useful for stateful computation on batches of data, such as
    /// maintaining an accumulated state that is up to date with the current batch.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #     .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #     .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .defer_tick(); // appears on the second tick
    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
    ///
    /// input.batch(&tick, nondet!(/** test */))
    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
    ///         *sum += new;
    ///     }))).entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
    /// # results.clear();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn across_ticks<Out: BatchAtomic>(
        self,
        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
    ) -> Out::Batched {
        // Lift the batch into an atomic (synchronous) unbounded stream, let the caller build a
        // stateful pipeline on it, then re-batch the result back into the tick context.
        thunk(self.all_ticks_atomic()).batched_atomic()
    }
2784
2785 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2786 /// tick `T` always has the entries of `self` at tick `T - 1`.
2787 ///
2788 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2789 ///
2790 /// This operator enables stateful iterative processing with ticks, by sending data from one
2791 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2792 ///
2793 /// # Example
2794 /// ```rust
2795 /// # #[cfg(feature = "deploy")] {
2796 /// # use hydro_lang::prelude::*;
2797 /// # use futures::StreamExt;
2798 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2799 /// let tick = process.tick();
2800 /// # // ticks are lazy by default, forces the second tick to run
2801 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2802 /// # let batch_first_tick = process
2803 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2804 /// # .batch(&tick, nondet!(/** test */))
2805 /// # .into_keyed();
2806 /// # let batch_second_tick = process
2807 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2808 /// # .batch(&tick, nondet!(/** test */))
2809 /// # .defer_tick()
2810 /// # .into_keyed(); // appears on the second tick
2811 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2812 /// # batch_first_tick.chain(batch_second_tick);
2813 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2814 /// changes_across_ticks // from the current tick
2815 /// )
2816 /// # .entries().all_ticks()
2817 /// # }, |mut stream| async move {
2818 /// // First tick: { 1: [2, 3] }
2819 /// # let mut results = Vec::new();
2820 /// # for _ in 0..2 {
2821 /// # results.push(stream.next().await.unwrap());
2822 /// # }
2823 /// # results.sort();
2824 /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2825 /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2826 /// # results.clear();
2827 /// # for _ in 0..4 {
2828 /// # results.push(stream.next().await.unwrap());
2829 /// # }
2830 /// # results.sort();
2831 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2832 /// // Third tick: { 1: [4], 2: [5] }
2833 /// # results.clear();
2834 /// # for _ in 0..2 {
2835 /// # results.push(stream.next().await.unwrap());
2836 /// # }
2837 /// # results.sort();
2838 /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2839 /// # }));
2840 /// # }
2841 /// ```
2842 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2843 KeyedStream::new(
2844 self.location.clone(),
2845 HydroNode::DeferTick {
2846 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2847 metadata: self.location.new_node_metadata(KeyedStream::<
2848 K,
2849 V,
2850 Tick<L>,
2851 Bounded,
2852 O,
2853 R,
2854 >::collection_kind()),
2855 },
2856 )
2857 }
2858}
2859
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(feature = "sim")]
    use crate::live_collections::stream::{NoOrder, TotalOrder};
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(feature = "sim")]
    use crate::networking::TCP;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;
    #[cfg(feature = "deploy")]
    use crate::properties::manual_proof;

    // `get` on an unbounded keyed stream with a (bounded) singleton key: only the values whose
    // key matches the singleton (key 1 here) are emitted, for every batch of input.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn get_unbounded_keyed_stream_bounded_singleton() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_send, input_stream) =
            node.source_external_bincode::<_, (i32, i32), _, ExactlyOnce>(&external);

        let key = node.singleton(q!(1));

        let out = input_stream
            .into_keyed()
            .get(key)
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut input_send = nodes.connect(input_send).await;
        let mut out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // First batch
        input_send.send((1, 10)).await.unwrap();
        input_send.send((2, 20)).await.unwrap();
        assert_eq!(out.next().await.unwrap(), 10);

        // Second batch
        input_send.send((1, 11)).await.unwrap();
        input_send.send((2, 21)).await.unwrap();
        assert_eq!(out.next().await.unwrap(), 11);
    }

    // `reduce_watermark` with a fixed watermark of 2 on an unbounded keyed source; the snapshot
    // only yields the sum for key 2 (102 + 102 = 204). Presumably keys below the watermark are
    // filtered out of the snapshot — TODO confirm against `reduce_watermark`'s contract.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(2));

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    // Same as `reduce_watermark_filter`, but over a bounded `source_iter` input and without a
    // snapshot: entries are read directly from the (bounded) aggregation result.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_bounded() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(2));

        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .entries()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    // `reduce_watermark` with a watermark that advances every tick (via a cycle), under the
    // default optimizer. A late entry (3, 103), released only after an external trigger, must
    // still be aggregated and observable even though earlier keys have been garbage-collected.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        let (tick_send, tick_trigger) =
            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        // Watermark starts at 2 and increments by 1 on each tick.
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // (3, 103) is only emitted on ticks where the external trigger delivered a message.
        let tick_triggered_input = node_tick
            .singleton(q!((3, 103)))
            .into_stream()
            .filter_if(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first()
                    .is_some(),
            )
            .all_ticks();

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .merge_unordered(tick_triggered_input)
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(
                    |acc, v| {
                        *acc += v;
                    },
                    commutative = manual_proof!(/** integer addition is commutative */)
                ),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_send = nodes.connect(tick_send).await;
        let mut out_recv = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        tick_send.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }

    // Batch sizes are nondeterministic: asserting that all of key 1's values land in a single
    // batch must fail under exhaustive exploration (hence `should_panic`).
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
            .entries()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, vec![1, 2])])
                .await;
        });
    }

    // Batching a totally-ordered keyed stream must preserve per-key order: `fold_early_stop`
    // sees 2 after 1 for key 1 in every explored schedule. The sim explores 8 batchings.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_group_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    *acc >= 2
                }),
            )
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }

    // With ordering weakened to `NoOrder`, batching additionally explores shuffles of values
    // within each key, growing the schedule count from 8 (ordered case above) to 13.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weaken_ordering::<NoOrder>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }

    // `assume_ordering` on an unordered input is unsound: the sim finds a schedule where
    // `first()` is not the intended element, so the assertion fails (`should_panic`).
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .first()
            .entries()
            .sim_output();

        flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            out_recv
                .assert_yields_only_unordered([(1, 1), (2, 1)])
                .await; // fails with assume_ordering
        });
    }

    // Pins the number of schedules explored when `assume_ordering` is applied to a batched
    // unordered keyed input (104; fewer than the non-keyed stream equivalent).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            let _ = out_recv.collect_sorted::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
    }

    // `assume_ordering` applied at the top level (no tick): each key accumulates two values,
    // and the sim explores 24 schedules.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .into_keyed()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();
            // Each key accumulates its values; we get one entry per key
            assert_eq!(out.len(), 2);
        });

        assert_eq!(instance_count, 24)
    }

    // Top-level `assume_ordering` with a feedback cycle (through a remote round-trip): checks
    // that a cycled-back value (0 -> 1) can be observed interleaved after its source, i.e.
    // some schedule yields [0, 1] for key 1.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .entries()
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode())
                .into_keyed(),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
            // We want to see [0, 1] - the cycled back value interleaved
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 6);
    }

    // Feedback cycle that crosses keys: releasing (1, 10) cycles back (2, 100), which must be
    // able to beat key 2's already-pending items — demonstrating per-entry release.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cross_key_cycle() {
        use std::collections::HashMap;

        // This test demonstrates why releasing one entry at a time is important:
        // When one key's observed order cycles back into a different key, we need
        // to be able to interleave the cycled-back entry with pending items for
        // that other key.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
        complete_cycle_back.complete(
            ordered
                .clone()
                .filter(q!(|v| *v == 10))
                .map(q!(|_| 100))
                .entries()
                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode())
                .into_keyed(),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        // We want to see an instance where:
        // - (1, 10) is released first
        // - This causes (2, 100) to be cycled back
        // - (2, 100) is released BEFORE (2, 20) which was already pending
        let mut saw_cross_key_interleave = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // Check if we see the cross-key interleaving:
            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
            if let Some(values) = out.get(&2)
                && values.len() >= 2
                && values[0] == 100
            {
                saw_cross_key_interleave = true;
            }
        });

        assert!(
            saw_cross_key_interleave,
            "did not see an instance where cycled-back 100 was released before pending items for key 2"
        );
        assert_eq!(instance_count, 60);
    }

    // Variant of `sim_top_level_assume_ordering_cycle_back` where the cycled-back path goes
    // through a tick batch; the interleaving [0, 1] must still be reachable.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .entries()
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode())
                .into_keyed(),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 58);
    }

    // `entries_partially_ordered` on a batched, bounded keyed stream: 12 schedules explored.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_entries_partially_ordered_bounded() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, TotalOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .entries_partially_ordered(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            port.send((1, 'a'));
            port.send((1, 'b'));
            port.send((2, 'c'));
            let _: Vec<(i32, char)> = out_recv.collect().await;
        });

        assert_eq!(instance_count, 12);
    }

    // `entries_partially_ordered` at the top level (no batching): only 3 schedules explored.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_entries_partially_ordered_top_level() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

        let out_recv = input
            .into_keyed()
            .entries_partially_ordered(nondet!(/** test */))
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send((1, 'a'));
            in_send.send((1, 'b'));
            in_send.send((2, 'c'));
            let _: Vec<(i32, char)> = out_recv.collect().await;
        });

        assert_eq!(instance_count, 3);
    }

    // `entries_partially_ordered` with a feedback cycle: the cycled-back entry (1, 1) must be
    // able to appear before the already-pending (1, 2) in some schedule.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_entries_partially_ordered_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        let flat = ordered
            .clone()
            .entries_partially_ordered(nondet!(/** test */));

        complete_cycle_back.complete(
            flat.clone()
                .map(q!(|(k, v): (i32, i32)| (k, v + 1)))
                .filter(q!(|(_, v)| *v % 2 == 1))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode())
                .into_keyed(),
        );

        let out_recv = flat.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back as (1, 1).
            // We want to see (1, 1) before (1, 2) - the cycled back value beats the pending one
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let results: Vec<(i32, i32)> = out_recv.collect().await;

            let pos_1 = results.iter().position(|v| *v == (1, 1));
            let pos_2 = results.iter().position(|v| *v == (1, 2));
            if let (Some(p1), Some(p2)) = (pos_1, pos_2)
                && p1 < p2
            {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with (1, 1) before (1, 2)");
        assert_eq!(instance_count, 78);
    }
}