1use core::{fmt, panic};
4use std::cell::{Cell, RefCell};
5use std::collections::{HashMap, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::context::DfirErased;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::mpsc::UnboundedSender;
22use tokio::sync::{Mutex, Notify};
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimClusterReceiver, SimClusterSender, SimReceiver, SimSender};
27use crate::compile::builder::ExternalPortId;
28use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
29use crate::location::dynamic::LocationId;
30use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
31use crate::sim::runtime::SimHook;
32
/// Shared flag and wake-up primitives used to coordinate the simulation scheduler with
/// the code that sends to / reads from the simulation's external ports.
struct QuiescenceState {
    /// Set to `true` by `wait_for_resume` while the scheduler is parked, and reset to
    /// `false` by `resume` (and again when `wait_for_resume` returns).
    quiescent: Cell<bool>,
    /// Signalled with `notify_waiters` when the scheduler enters quiescence.
    quiescence_notify: Notify,
    /// Signalled with `notify_waiters` by `resume`; the parked scheduler awaits it.
    resume_notify: Notify,
}
41
42impl QuiescenceState {
43 fn resume(&self) {
45 self.quiescent.set(false);
46 self.resume_notify.notify_waiters();
47 }
48
49 fn is_quiescent(&self) -> bool {
51 self.quiescent.get()
52 }
53
54 fn notified(&self) -> tokio::sync::futures::Notified<'_> {
56 self.quiescence_notify.notified()
57 }
58
59 async fn wait_for_resume(&self) {
61 self.quiescent.set(true);
62 self.quiescence_notify.notify_waiters();
63 self.resume_notify.notified().await;
64 self.quiescent.set(false);
65 }
66}
67
/// The per-simulation set of channels to and from the running dataflow, plus the shared
/// quiescence state. Stored in the `CURRENT_SIM_CONNECTIONS` task-local.
struct SimConnections {
    /// Senders for non-cluster external inputs, keyed by simulator port.
    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
    /// Receivers for non-cluster external outputs, keyed by simulator port. Each is
    /// wrapped in an async `Mutex` and locked while a reader consumes the stream.
    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    /// Per-port list of senders for cluster inputs; indexed by cluster member id.
    cluster_input_senders: HashMap<SimExternalPort, Vec<Rc<UnboundedSender<Bytes>>>>,
    /// Per-port list of receivers for cluster outputs; indexed by cluster member id.
    cluster_output_receivers:
        HashMap<SimExternalPort, Vec<Rc<Mutex<UnboundedReceiverStream<Bytes>>>>>,
    /// Maps the external port id held by a sender/receiver handle (`self.0`) to the
    /// simulator port used as the key of the maps above.
    external_registered: HashMap<ExternalPortId, SimExternalPort>,
    /// Quiescence state shared with the scheduler.
    quiescence: Rc<QuiescenceState>,
}
77
// Task-local holding the connections of the simulation instance that is currently
// running. It is set with `.scope(...)` in `CompiledSimInstance::run_without_launching`
// and read by the `Sim*Sender` / `Sim*Receiver` helpers and by the scheduler setup.
tokio::task_local! {
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
81
/// A compiled simulation: a loaded dynamic library plus the metadata needed to create
/// [`CompiledSimInstance`]s from it.
pub struct CompiledSim {
    // Kept alive for as long as the simulation exists.
    // NOTE(review): presumably the temporary file backing `lib` — confirm at the
    // construction site.
    pub(super) _path: TempPath,
    /// The loaded library; the `__hydro_runtime` symbol is looked up in it.
    pub(super) lib: Library,
    /// Registry of external ports; cloned into every instance.
    pub(super) externals_port_registry: SimExternalPortRegistry,
    /// Number of iterations used by `fuzz` when it runs with random inputs (no
    /// `BOLERO_FUZZER` environment variable and no stored reproducer).
    pub(super) unit_test_fuzz_iterations: usize,
}
89
/// A factory for [`CompiledSimInstance`]s: any `Fn() -> CompiledSimInstance<'a>` that is
/// also [`RefUnwindSafe`]. The trait is sealed, so it cannot be implemented outside of
/// this crate.
#[sealed::sealed]
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
// Blanket impl: every closure/function with the right shape is an `Instantiator`.
#[sealed::sealed]
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
97
/// Output handler that discards the formatted message; used when logging is disabled.
fn null_handler(_: fmt::Arguments<'_>) {}
99
/// Output handler that writes the formatted message, followed by a newline, to stdout.
fn println_handler(args: fmt::Arguments<'_>) {
    println!("{args}");
}
103
/// Output handler that writes the formatted message, followed by a newline, to stderr.
fn eprintln_handler(args: fmt::Arguments<'_>) {
    eprintln!("{args}");
}
107
/// The entry point loaded from the compiled simulation library (symbol
/// `__hydro_runtime`).
///
/// The callee fills the four maps it is handed with the channel endpoints for the
/// external ports (keyed by the port's `usize` id) and receives the two handlers to use
/// for its stdout-/stderr-style output. The returned tuple has the same shape as
/// `DylibResult`, which is where callers store it; the two must be kept in sync.
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        should_color: bool,
        external_out: &mut HashMap<usize, UnboundedReceiverStream<Bytes>>,
        external_in: &mut HashMap<usize, UnboundedSender<Bytes>>,
        cluster_external_out: &mut HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>>,
        cluster_external_in: &mut HashMap<usize, Vec<UnboundedSender<Bytes>>>,
        println_handler: fn(fmt::Arguments<'_>),
        eprintln_handler: fn(fmt::Arguments<'_>),
    ) -> (
        Vec<(&'static str, Option<u32>, DfirErased)>,
        Vec<(&'static str, Option<u32>, DfirErased)>,
        Hooks<&'static str>,
        InlineHooks<&'static str>,
    ),
>;
130
131impl CompiledSim {
132 pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
134 self.with_instantiator(|instantiator| thunk(instantiator()), true)
135 }
136
137 pub fn with_instantiator<T>(
145 &self,
146 thunk: impl FnOnce(&dyn Instantiator) -> T,
147 always_log: bool,
148 ) -> T {
149 let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
150 let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
151 thunk(
152 &(|| CompiledSimInstance {
153 func: func.clone(),
154 externals_port_registry: self.externals_port_registry.clone(),
155 dylib_result: None,
156 log,
157 }),
158 )
159 }
160
161 pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
172 let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
173 .elements()
174 .into_iter()
175 .find(|e| {
176 !e.fn_name.starts_with("hydro_lang::sim::compiled")
177 && !e.fn_name.starts_with("hydro_lang::sim::flow")
178 && !e.fn_name.starts_with("fuzz<")
179 && !e.fn_name.starts_with("<hydro_lang::sim")
180 })
181 .unwrap();
182
183 let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
184 let repro_folder = caller_path.parent().unwrap().join("sim-failures");
185
186 let caller_fuzz_repro_path = repro_folder
187 .join(caller_fn.fn_name.replace("::", "__"))
188 .with_extension("bin");
189
190 if std::env::var("BOLERO_FUZZER").is_ok() {
191 let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
192 std::fs::create_dir_all(&corpus_dir).unwrap();
193 let libfuzzer_args = format!(
194 "{} {} -artifact_prefix={}/ -handle_abrt=0",
195 corpus_dir.to_str().unwrap(),
196 corpus_dir.to_str().unwrap(),
197 corpus_dir.to_str().unwrap(),
198 );
199
200 std::fs::create_dir_all(&repro_folder).unwrap();
201
202 if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
203 unsafe {
204 std::env::set_var(
205 "BOLERO_FAILURE_OUTPUT",
206 caller_fuzz_repro_path.to_str().unwrap(),
207 );
208 }
209 }
210
211 unsafe {
212 std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
213 }
214
215 self.with_instantiator(
216 |instantiator| {
217 bolero::test(bolero::TargetLocation {
218 package_name: "",
219 manifest_dir: "",
220 module_path: "",
221 file: "",
222 line: 0,
223 item_path: "<unknown>::__bolero_item_path__",
224 test_name: None,
225 })
226 .run_with_replay(move |is_replay| {
227 let mut instance = instantiator();
228
229 if instance.log {
230 eprintln!(
231 "{}",
232 "\n==== New Simulation Instance ===="
233 .color(colored::Color::Cyan)
234 .bold()
235 );
236 }
237
238 if is_replay {
239 instance.log = true;
240 }
241
242 tokio::runtime::Builder::new_current_thread()
243 .build()
244 .unwrap()
245 .block_on(async { instance.run(&mut thunk).await })
246 })
247 },
248 false,
249 );
250 } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
251 self.fuzz_repro(existing_bytes, async |compiled| {
252 compiled.launch();
253 thunk().await
254 });
255 } else {
256 eprintln!(
257 "Running a fuzz test without `cargo sim` and no reproducer found at {}, using {} iterations with random inputs.",
258 caller_fuzz_repro_path.display(),
259 self.unit_test_fuzz_iterations,
260 );
261 self.with_instantiator(
262 |instantiator| {
263 bolero::test(bolero::TargetLocation {
264 package_name: "",
265 manifest_dir: "",
266 module_path: "",
267 file: ".",
268 line: 0,
269 item_path: "<unknown>::__bolero_item_path__",
270 test_name: None,
271 })
272 .with_iterations(self.unit_test_fuzz_iterations)
273 .run(move || {
274 let instance = instantiator();
275 tokio::runtime::Builder::new_current_thread()
276 .build()
277 .unwrap()
278 .block_on(async { instance.run(&mut thunk).await })
279 })
280 },
281 false,
282 );
283 }
284 }
285
286 pub fn fuzz_repro<'a>(
290 &'a self,
291 bytes: Vec<u8>,
292 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
293 ) {
294 self.with_instance(|instance| {
295 bolero::bolero_engine::any::scope::with(
296 Box::new(bolero::bolero_engine::driver::object::Object(
297 bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
298 )),
299 || {
300 tokio::runtime::Builder::new_current_thread()
301 .build()
302 .unwrap()
303 .block_on(async { instance.run_without_launching(thunk).await })
304 },
305 )
306 });
307 }
308
309 pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
320 if std::env::var("BOLERO_FUZZER").is_ok() {
321 eprintln!(
322 "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
323 );
324 std::process::abort();
325 }
326
327 let mut count = 0;
328 let count_mut = &mut count;
329
330 self.with_instantiator(
331 |instantiator| {
332 bolero::test(bolero::TargetLocation {
333 package_name: "",
334 manifest_dir: "",
335 module_path: "",
336 file: "",
337 line: 0,
338 item_path: "<unknown>::__bolero_item_path__",
339 test_name: None,
340 })
341 .exhaustive()
342 .run_with_replay(move |is_replay| {
343 *count_mut += 1;
344
345 let mut instance = instantiator();
346 if instance.log {
347 eprintln!(
348 "{}",
349 "\n==== New Simulation Instance ===="
350 .color(colored::Color::Cyan)
351 .bold()
352 );
353 }
354
355 if is_replay {
356 instance.log = true;
357 }
358
359 tokio::runtime::Builder::new_current_thread()
360 .build()
361 .unwrap()
362 .block_on(async { instance.run(&mut thunk).await })
363 })
364 },
365 false,
366 );
367
368 count
369 }
370}
371
/// The value produced by calling the `SimLoaded` entry point, as stored in
/// `CompiledSimInstance::dylib_result`. In order: the async dataflows, the tick
/// dataflows (each tagged with a JSON-encoded location and an optional cluster member
/// id), the hooks, and the inline hooks.
type DylibResult = (
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Hooks<&'static str>,
    InlineHooks<&'static str>,
);
379
/// A single run of a compiled simulation. The lifetime `'a` ties the instance to the
/// loaded library that `func` was resolved from.
pub struct CompiledSimInstance<'a> {
    /// The simulation entry point resolved from the library.
    func: SimLoaded<'a>,
    /// Registry of the external ports available to this instance.
    externals_port_registry: SimExternalPortRegistry,
    /// Result of calling `func`; `None` until `run_without_launching` has invoked it,
    /// and taken again when the scheduler is created.
    dylib_result: Option<DylibResult>,
    /// Whether simulation output and scheduler decisions are logged.
    log: bool,
}
388
389impl<'a> CompiledSimInstance<'a> {
390 async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
391 self.run_without_launching(async |instance| {
392 instance.launch();
393 thunk().await;
394 })
395 .await;
396 }
397
398 async fn run_without_launching(
399 mut self,
400 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
401 ) {
402 let mut external_out: HashMap<usize, UnboundedReceiverStream<Bytes>> = HashMap::new();
403 let mut external_in: HashMap<usize, UnboundedSender<Bytes>> = HashMap::new();
404 let mut cluster_external_out: HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>> =
405 HashMap::new();
406 let mut cluster_external_in: HashMap<usize, Vec<UnboundedSender<Bytes>>> = HashMap::new();
407
408 let dylib_result = unsafe {
409 (self.func)(
410 colored::control::SHOULD_COLORIZE.should_colorize(),
411 &mut external_out,
412 &mut external_in,
413 &mut cluster_external_out,
414 &mut cluster_external_in,
415 if self.log {
416 println_handler
417 } else {
418 null_handler
419 },
420 if self.log {
421 eprintln_handler
422 } else {
423 null_handler
424 },
425 )
426 };
427
428 let registered = &self.externals_port_registry.registered;
429
430 let quiescence = Rc::new(QuiescenceState {
431 quiescent: Cell::new(false),
432 quiescence_notify: Notify::new(),
433 resume_notify: Notify::new(),
434 });
435
436 let mut input_senders = HashMap::new();
437 let mut output_receivers = HashMap::new();
438 let mut cluster_input_senders = HashMap::new();
439 let mut cluster_output_receivers = HashMap::new();
440
441 #[expect(
442 clippy::disallowed_methods,
443 reason = "inserts into maps also unordered"
444 )]
445 for sim_port in registered.values() {
446 let usize_key = sim_port.into_inner();
447 if let Some(sender) = external_in.remove(&usize_key) {
448 input_senders.insert(*sim_port, Rc::new(sender));
449 }
450 if let Some(receiver) = external_out.remove(&usize_key) {
451 output_receivers.insert(*sim_port, Rc::new(Mutex::new(receiver)));
452 }
453 if let Some(senders) = cluster_external_in.remove(&usize_key) {
454 cluster_input_senders.insert(*sim_port, senders.into_iter().map(Rc::new).collect());
455 }
456 if let Some(receivers) = cluster_external_out.remove(&usize_key) {
457 cluster_output_receivers.insert(
458 *sim_port,
459 receivers
460 .into_iter()
461 .map(|r| Rc::new(Mutex::new(r)))
462 .collect(),
463 );
464 }
465 }
466
467 self.dylib_result = Some(dylib_result);
468
469 let local_set = tokio::task::LocalSet::new();
470 local_set
471 .run_until(CURRENT_SIM_CONNECTIONS.scope(
472 RefCell::new(SimConnections {
473 input_senders,
474 output_receivers,
475 cluster_input_senders,
476 cluster_output_receivers,
477 external_registered: self.externals_port_registry.registered.clone(),
478 quiescence: quiescence.clone(),
479 }),
480 async move {
481 thunk(self).await;
482 },
483 ))
484 .await;
485 }
486
487 fn launch(self) {
490 tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
491 }
492
493 pub fn schedule_with_logger<W: std::io::Write>(
496 self,
497 log_writer: W,
498 ) -> impl use<W> + Future<Output = ()> {
499 self.schedule_with_maybe_logger(Some(log_writer))
500 }
501
502 fn schedule_with_maybe_logger<W: std::io::Write>(
503 mut self,
504 log_override: Option<W>,
505 ) -> impl use<W> + Future<Output = ()> {
506 let (async_dfirs, tick_dfirs, hooks, inline_hooks) = self.dylib_result.take().unwrap();
507
508 let not_ready_observation = async_dfirs
509 .iter()
510 .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
511 .collect();
512
513 let quiescence = CURRENT_SIM_CONNECTIONS.with(|connections| {
514 let connections = connections.borrow();
515 connections.quiescence.clone()
516 });
517
518 let mut launched = LaunchedSim {
519 async_dfirs: async_dfirs
520 .into_iter()
521 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
522 .collect(),
523 possibly_ready_ticks: vec![],
524 not_ready_ticks: tick_dfirs
525 .into_iter()
526 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
527 .collect(),
528 possibly_ready_observation: vec![],
529 not_ready_observation,
530 hooks: hooks
531 .into_iter()
532 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
533 .collect(),
534 inline_hooks: inline_hooks
535 .into_iter()
536 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
537 .collect(),
538 log: if self.log {
539 if let Some(w) = log_override {
540 LogKind::Custom(w)
541 } else {
542 LogKind::Stderr
543 }
544 } else {
545 LogKind::Null
546 },
547 quiescence,
548 };
549
550 async move { launched.scheduler().await }
551 }
552}
553
// `Clone` is written by hand with only the bounds the type itself needs. The copy in
// the body is valid because `SimReceiver` is `Copy` (see the impl below).
// NOTE(review): presumably hand-written so that no `Clone` bounds are required on
// `T`, `O` or `R` — confirm.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
    fn clone(&self) -> Self {
        *self
    }
}

// Receiver handles can be freely copied.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
561
562impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
563 async fn with_stream<Out>(
564 &self,
565 thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
566 ) -> Out {
567 let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
568 let connections = connections.borrow();
569 let port = connections.external_registered.get(&self.0).unwrap();
570 (
571 connections.output_receivers.get(port).unwrap().clone(),
572 connections.quiescence.clone(),
573 )
574 });
575
576 let mut receiver_stream = receiver.lock().await;
577 let mut notified_fut = pin!(quiescence.notified());
578 let mut quiescence_aware = futures::stream::poll_fn(|cx| {
579 use std::task::Poll;
580 match receiver_stream.poll_next_unpin(cx) {
581 Poll::Ready(Some(bytes)) => {
582 return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
583 }
584 Poll::Ready(None) => return Poll::Ready(None),
585 Poll::Pending => {}
586 }
587 if quiescence.is_quiescent() {
588 return Poll::Ready(None);
589 }
590 let () = ready!(notified_fut.as_mut().poll(cx));
591 notified_fut.set(quiescence.notified());
592 Poll::Ready(None)
593 });
594 thunk(&mut pin!(&mut quiescence_aware)).await
595 }
596
597 pub fn assert_no_more(self) -> impl Future<Output = ()>
599 where
600 T: Debug,
601 {
602 FutureTrackingCaller {
603 future: async move {
604 self.with_stream(async |stream| {
605 if let Some(next) = stream.next().await {
606 return Err(format!(
607 "Stream yielded unexpected message: {:?}, expected termination",
608 next
609 ));
610 }
611 Ok(())
612 })
613 .await
614 },
615 }
616 }
617}
618
619impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
620 pub async fn next(&self) -> Option<T> {
623 self.with_stream(async |stream| stream.next().await).await
624 }
625
626 pub async fn collect<C: Default + Extend<T>>(self) -> C {
629 self.with_stream(async |stream| stream.collect().await)
630 .await
631 }
632
633 pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
636 &self,
637 expected: I,
638 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
639 where
640 T: Debug + PartialEq<T2>,
641 {
642 FutureTrackingCaller {
643 future: async {
644 let mut expected: VecDeque<T2> = expected.into_iter().collect();
645
646 while !expected.is_empty() {
647 if let Some(next) = self.next().await {
648 let next_expected = expected.pop_front().unwrap();
649 if next != next_expected {
650 return Err(format!(
651 "Stream yielded unexpected message: {:?}, expected: {:?}",
652 next, next_expected
653 ));
654 }
655 } else {
656 return Err(format!(
657 "Stream ended early, still expected: {:?}",
658 expected
659 ));
660 }
661 }
662
663 Ok(())
664 },
665 }
666 }
667
668 pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
671 &self,
672 expected: I,
673 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
674 where
675 T: Debug + PartialEq<T2>,
676 {
677 ChainedFuture {
678 first: self.assert_yields(expected),
679 second: self.assert_no_more(),
680 first_done: false,
681 }
682 }
683}
684
// Wraps a future that resolves to `Result<(), String>`. Its `Future` impl turns an
// `Err(message)` into a panic carrying that message and resolves to `()` otherwise.
pin_project_lite::pin_project! {
    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
        // The wrapped future; structurally pinned so it can be polled in place.
        #[pin]
        future: F,
    }
}
700
701impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
702 type Output = ();
703
704 #[track_caller]
705 fn poll(
706 mut self: Pin<&mut Self>,
707 cx: &mut std::task::Context<'_>,
708 ) -> std::task::Poll<Self::Output> {
709 match ready!(self.as_mut().project().future.poll(cx)) {
710 Ok(()) => std::task::Poll::Ready(()),
711 Err(e) => panic!("{}", e),
712 }
713 }
714}
715
// Runs two unit futures one after the other: `first` is polled to completion, then
// `second`.
pin_project_lite::pin_project! {
    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
        // Polled until it completes.
        #[pin]
        first: F1,
        // Polled only after `first` has completed.
        #[pin]
        second: F2,
        // Records that `first` has completed so it is never polled again.
        first_done: bool,
    }
}
728
729impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
730 type Output = ();
731
732 #[track_caller]
733 fn poll(
734 mut self: Pin<&mut Self>,
735 cx: &mut std::task::Context<'_>,
736 ) -> std::task::Poll<Self::Output> {
737 if !self.first_done {
738 ready!(self.as_mut().project().first.poll(cx));
739 *self.as_mut().project().first_done = true;
740 }
741
742 self.as_mut().project().second.poll(cx)
743 }
744}
745
746impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
747 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
750 where
751 T: Ord,
752 {
753 self.with_stream(async |stream| {
754 let mut collected: C = stream.collect().await;
755 collected.as_mut().sort();
756 collected
757 })
758 .await
759 }
760
761 pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
764 &self,
765 expected: I,
766 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
767 where
768 T: Debug + PartialEq<T2>,
769 {
770 FutureTrackingCaller {
771 future: async {
772 self.with_stream(async |stream| {
773 let mut expected: Vec<T2> = expected.into_iter().collect();
774
775 while !expected.is_empty() {
776 if let Some(next) = stream.next().await {
777 let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
778 if let Some((i, _)) = idx {
779 expected.swap_remove(i);
780 } else {
781 return Err(format!(
782 "Stream yielded unexpected message: {:?}",
783 next
784 ));
785 }
786 } else {
787 return Err(format!(
788 "Stream ended early, still expected: {:?}",
789 expected
790 ));
791 }
792 }
793
794 Ok(())
795 })
796 .await
797 },
798 }
799 }
800
801 pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
804 &self,
805 expected: I,
806 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
807 where
808 T: Debug + PartialEq<T2>,
809 {
810 ChainedFuture {
811 first: self.assert_yields_unordered(expected),
812 second: self.assert_no_more(),
813 first_done: false,
814 }
815 }
816}
817
818impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
819 fn with_sink<Out>(
820 &self,
821 thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
822 ) -> Out {
823 let (sender, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
824 let connections = connections.borrow();
825 (
826 connections
827 .input_senders
828 .get(connections.external_registered.get(&self.0).unwrap())
829 .unwrap()
830 .clone(),
831 connections.quiescence.clone(),
832 )
833 });
834
835 thunk(&move |t| {
836 let res = sender.send(bincode::serialize(&t).unwrap().into());
837 quiescence.resume();
838 res
839 })
840 }
841}
842
843impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
844 pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
847 self.with_sink(|send| {
848 for t in iter {
849 send(t).unwrap();
850 }
851 })
852 }
853}
854
855impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
856 pub fn send(&self, t: T) {
859 self.with_sink(|send| send(t)).unwrap();
860 }
861
862 pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
865 self.with_sink(|send| {
866 for t in iter {
867 send(t).unwrap();
868 }
869 })
870 }
871}
872
// `Clone` is written by hand with only the bounds the type itself needs. The copy in
// the body is valid because `SimClusterReceiver` is `Copy` (see the impl below).
// NOTE(review): presumably hand-written so that no `Clone` bounds are required on
// `T`, `O` or `R` — confirm.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone
    for SimClusterReceiver<T, O, R>
{
    fn clone(&self) -> Self {
        *self
    }
}

// Cluster receiver handles can be freely copied.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy
    for SimClusterReceiver<T, O, R>
{
}
885
886impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterReceiver<T, O, R> {
887 async fn with_member_stream<Out>(
888 &self,
889 member_id: u32,
890 thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
891 ) -> Out {
892 let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
893 let connections = connections.borrow();
894 let port = connections.external_registered.get(&self.0).unwrap();
895 let receivers = connections.cluster_output_receivers.get(port).unwrap();
896 (
897 receivers[member_id as usize].clone(),
898 connections.quiescence.clone(),
899 )
900 });
901
902 let mut lock = receiver.lock().await;
903 let mut notified_fut = pin!(quiescence.notified());
904 let mut quiescence_aware = futures::stream::poll_fn(|cx| {
905 use std::task::Poll;
906 match lock.poll_next_unpin(cx) {
907 Poll::Ready(Some(bytes)) => {
908 return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
909 }
910 Poll::Ready(None) => return Poll::Ready(None),
911 Poll::Pending => {}
912 }
913 if quiescence.is_quiescent() {
914 return Poll::Ready(None);
915 }
916 let () = ready!(notified_fut.as_mut().poll(cx));
917 notified_fut.set(quiescence.notified());
918 Poll::Ready(None)
919 });
920 thunk(&mut pin!(&mut quiescence_aware)).await
921 }
922}
923
924impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, TotalOrder, ExactlyOnce> {
925 pub async fn next(&self, member_id: u32) -> Option<T> {
927 self.with_member_stream(member_id, async |stream| stream.next().await)
928 .await
929 }
930
931 pub async fn collect<C: Default + Extend<T>>(self, member_id: u32) -> C {
933 self.with_member_stream(member_id, async |stream| stream.collect().await)
934 .await
935 }
936}
937
938impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, NoOrder, ExactlyOnce> {
939 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self, member_id: u32) -> C
941 where
942 T: Ord,
943 {
944 self.with_member_stream(member_id, async |stream| {
945 let mut collected: C = stream.collect().await;
946 collected.as_mut().sort();
947 collected
948 })
949 .await
950 }
951}
952
953impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterSender<T, O, R> {
954 fn with_sink<Out>(
955 &self,
956 thunk: impl FnOnce(
957 &dyn Fn(u32, T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>,
958 ) -> Out,
959 ) -> Out {
960 let (senders, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
961 let connections = connections.borrow();
962 (
963 connections
964 .cluster_input_senders
965 .get(connections.external_registered.get(&self.0).unwrap())
966 .unwrap()
967 .clone(),
968 connections.quiescence.clone(),
969 )
970 });
971
972 thunk(&move |member_id: u32, t: T| {
973 let payload = bincode::serialize(&t).unwrap();
974 let res = senders[member_id as usize].send(Bytes::from(payload));
975 quiescence.resume();
976 res
977 })
978 }
979}
980
981impl<T: Serialize + DeserializeOwned> SimClusterSender<T, TotalOrder, ExactlyOnce> {
982 pub fn send(&self, member_id: u32, t: T) {
984 self.with_sink(|send| send(member_id, t)).unwrap();
985 }
986
987 pub fn send_many<I: IntoIterator<Item = (u32, T)>>(&self, iter: I) {
989 self.with_sink(|send| {
990 for (member_id, t) in iter {
991 send(member_id, t).unwrap();
992 }
993 })
994 }
995}
996
/// Destination for the scheduler's log output.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Print to the process's stderr.
    Stderr,
    /// Write the bytes to a caller-supplied writer.
    Custom(W),
}

/// Lets a `LogKind` be used as the target of `write!`-style formatting.
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    /// Routes `s` to the selected destination. An I/O failure of a custom writer is
    /// reported as `std::fmt::Error`; the other destinations always succeed.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        match self {
            LogKind::Custom(writer) => writer.write_all(s.as_bytes()).or(Err(std::fmt::Error)),
            LogKind::Stderr => {
                eprint!("{s}");
                Ok(())
            }
            LogKind::Null => Ok(()),
        }
    }
}
1016
/// The state of a running simulation, driven by `LaunchedSim::scheduler`.
///
/// In the tuples below, the `Option<u32>` is the cluster member id, when there is one.
struct LaunchedSim<W: std::io::Write> {
    /// Dataflows that the scheduler runs on every iteration of its loop.
    async_dfirs: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Tick dataflows that are candidates to be run next.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Tick dataflows that are currently not candidates; they move to
    /// `possibly_ready_ticks` when the async dataflow of their enclosing location makes
    /// progress.
    not_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Locations whose hooks are candidates to be run without running a tick.
    possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Locations that are currently not observation candidates.
    not_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Hooks looked up by `(location, cluster member id)`.
    hooks: Hooks<LocationId>,
    /// Inline hooks looked up by `(location, cluster member id)`; serviced while a tick
    /// is running.
    inline_hooks: InlineHooks<LocationId>,
    /// Where scheduler log output goes.
    log: LogKind<W>,
    /// Quiescence state shared with the senders and receivers.
    quiescence: Rc<QuiescenceState>,
}
1054
impl<W: std::io::Write> LaunchedSim<W> {
    /// The simulation's scheduling loop. It never returns on its own.
    ///
    /// Each iteration:
    /// 1. yields to other tasks, then runs every async dataflow once;
    /// 2. if any of them made progress, promotes the matching ticks / observations to
    ///    "possibly ready" and starts over;
    /// 3. otherwise narrows the candidates down to those with a hook that has, or can
    ///    make, a non-trivial decision;
    /// 4. if no candidate is left, parks in `wait_for_resume`; otherwise picks one
    ///    candidate via the bolero generator and runs it.
    async fn scheduler(&mut self) {
        loop {
            // Give other local tasks (e.g. the test body) a chance to run first.
            tokio::task::yield_now().await;
            let mut any_made_progress = false;
            for (loc, c_id, dfir) in &mut self.async_dfirs {
                if dfir.run_tick().await {
                    any_made_progress = true;
                    // Ticks nested directly in this location (same cluster member)
                    // become candidates.
                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
                        .not_ready_ticks
                        .drain(..)
                        .partition(|(tick_loc, tick_c_id, _)| {
                            let LocationId::Tick(_, outer) = tick_loc else {
                                unreachable!()
                            };
                            outer.as_ref() == loc && tick_c_id == c_id
                        });

                    self.possibly_ready_ticks.extend(now_ready);
                    self.not_ready_ticks.extend(still_not_ready);

                    // The same promotion for observations of this exact location.
                    let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
                        .not_ready_observation
                        .drain(..)
                        .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);

                    self.possibly_ready_observation.extend(now_ready_obs);
                    self.not_ready_observation.extend(still_not_ready_obs);
                }
            }

            if any_made_progress {
                // Keep running the async dataflows until none of them makes progress.
                continue;
            } else {
                use bolero::generator::*;

                // Keep only ticks with a hook that already holds a non-trivial decision
                // or can make one; the rest go back to the not-ready list.
                let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
                    .possibly_ready_ticks
                    .drain(..)
                    .partition(|(name, cid, _)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .unwrap()
                            .iter()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_ticks = ready_tick;
                self.not_ready_ticks.append(&mut not_ready_tick);

                // Same filter for observations; unlike ticks, a location without any
                // registered hooks is tolerated here and simply counts as not ready.
                let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
                    .possibly_ready_observation
                    .drain(..)
                    .partition(|(name, cid)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .into_iter()
                            .flatten()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_observation = ready_obs;
                self.not_ready_observation.append(&mut not_ready_obs);

                if self.possibly_ready_ticks.is_empty()
                    && self.possibly_ready_observation.is_empty()
                {
                    // Nothing can run: announce quiescence and wait to be resumed.
                    self.quiescence.wait_for_resume().await;
                } else {
                    // Pick one candidate; indices below the number of ticks select a
                    // tick, the remaining indices select an observation.
                    let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
                        + self.possibly_ready_observation.len()))
                        .any();

                    if next_tick_or_obs < self.possibly_ready_ticks.len() {
                        let next_tick = next_tick_or_obs;
                        let mut removed = self.possibly_ready_ticks.remove(next_tick);

                        // Note: only the stderr variant mentions the cluster member.
                        match &mut self.log {
                            LogKind::Null => {}
                            LogKind::Stderr => {
                                if let Some(cid) = &removed.1 {
                                    eprintln!(
                                        "\n{}",
                                        format!("Running Tick (Cluster Member {})", cid)
                                            .color(colored::Color::Magenta)
                                            .bold()
                                    )
                                } else {
                                    eprintln!(
                                        "\n{}",
                                        "Running Tick".color(colored::Color::Magenta).bold()
                                    )
                                }
                            }
                            LogKind::Custom(writer) => {
                                writeln!(
                                    writer,
                                    "\n{}",
                                    "Running Tick".color(colored::Color::Magenta).bold()
                                )
                                .unwrap();
                            }
                        }

                        // Decisions logged for this tick are prefixed with "* ".
                        let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
                            write.write_str(&"*".color(colored::Color::Magenta).bold())?;
                            write.write_str(" ")
                        };

                        let mut tick_decision_writer = indenter::indented(&mut self.log)
                            .with_format(indenter::Format::Custom {
                                inserter: &mut asterisk_indenter,
                            });

                        let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
                        run_hooks(&mut tick_decision_writer, hooks);

                        let run_tick_future = removed.2.run_tick();
                        if let Some(inline_hooks) =
                            self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
                        {
                            let mut run_tick_future_pinned = pin!(run_tick_future);

                            loop {
                                tokio::select! {
                                    // `biased` polls the tick first; the always-ready
                                    // second branch therefore only runs while the tick
                                    // is still pending.
                                    biased;
                                    r = &mut run_tick_future_pinned => {
                                        assert!(r);
                                        break;
                                    }
                                    _ = async {} => {
                                        // Make and release any decision the tick is
                                        // waiting on, then poll the tick again.
                                        bolero_generator::any::scope::borrow_with(|driver| {
                                            for hook in inline_hooks.iter_mut() {
                                                if hook.pending_decision() {
                                                    if !hook.has_decision() {
                                                        hook.autonomous_decision(driver);
                                                    }

                                                    hook.release_decision(&mut tick_decision_writer);
                                                }
                                            }
                                        });
                                    }
                                }
                            }
                        } else {
                            assert!(run_tick_future.await);
                        }

                        // The tick stays a candidate until the filter above removes it.
                        self.possibly_ready_ticks.push(removed);
                    } else {
                        // Observation: run the location's hooks without running a tick.
                        // The entry is left in `possibly_ready_observation`.
                        let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
                        let mut default_hooks = vec![];
                        let hooks = self
                            .hooks
                            .get_mut(&self.possibly_ready_observation[next_obs])
                            .unwrap_or(&mut default_hooks);

                        run_hooks(&mut self.log, hooks);
                    }
                }
            }
        }
    }
}
1227
1228fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
1229 let mut remaining_decision_count = hooks.len();
1230 let mut made_nontrivial_decision = false;
1231
1232 bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
1233 hooks.iter_mut().for_each(|hook| {
1235 if let Some(is_nontrivial) = hook.current_decision() {
1236 made_nontrivial_decision |= is_nontrivial;
1237 remaining_decision_count -= 1;
1238 } else if !hook.can_make_nontrivial_decision() {
1239 hook.autonomous_decision(driver, false);
1243 remaining_decision_count -= 1;
1244 }
1245 });
1246
1247 hooks.iter_mut().for_each(|hook| {
1248 if hook.current_decision().is_none() {
1249 made_nontrivial_decision |= hook.autonomous_decision(
1250 driver,
1251 !made_nontrivial_decision && remaining_decision_count == 1,
1252 );
1253 remaining_decision_count -= 1;
1254 }
1255
1256 hook.release_decision(tick_decision_writer);
1257 });
1258 });
1259}