Skip to main content

hydro_lang/telemetry/
emf.rs

1//! AWS CloudWatch embedded metric format (EMF).
2//!
3//! <https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html>
4#[cfg(feature = "runtime_support")]
5use std::marker::Unpin;
6#[cfg(feature = "runtime_support")]
7use std::panic::AssertUnwindSafe;
8use std::time::Duration;
9#[cfg(feature = "runtime_support")]
10use std::time::SystemTime;
11
12#[cfg(feature = "runtime_support")]
13use dfir_rs::Never;
14#[cfg(feature = "runtime_support")]
15use dfir_rs::scheduled::metrics::{DfirMetrics, DfirMetricsIntervals};
16#[cfg(feature = "runtime_support")]
17use futures::FutureExt;
18use quote::quote;
19#[cfg(feature = "runtime_support")]
20use serde_json::json;
21use syn::parse_quote;
22#[cfg(feature = "runtime_support")]
23use tokio::io::{AsyncWrite, AsyncWriteExt};
24#[cfg(feature = "runtime_support")]
25use tokio_metrics::RuntimeMetrics;
26
27use crate::location::{LocationKey, LocationType};
28use crate::staging_util::get_this_crate;
29use crate::telemetry::Sidecar;
30
/// Log file path that [`RecordMetricsSidecar`] falls back to when none is configured.
pub const DEFAULT_FILE_PATH: &str = "/var/log/hydro/metrics.log";
/// Reporting period (30 seconds) that [`RecordMetricsSidecar`] falls back to when none is
/// configured.
pub const DEFAULT_INTERVAL: Duration = Duration::from_secs(30);
35
/// A sidecar which records metrics to a file via EMF.
///
/// Create one with [`RecordMetricsSidecar::new`]; any option left as `None` falls back to
/// [`DEFAULT_FILE_PATH`] or [`DEFAULT_INTERVAL`].
#[derive(Clone, Debug)]
pub struct RecordMetricsSidecar {
    /// Path of the log file that EMF records are appended to.
    file_path: String,
    /// Time to wait between consecutive metric reports.
    interval: Duration,
}
41
42#[buildstructor::buildstructor]
43impl RecordMetricsSidecar {
44    /// Build an instance. Any `None` will be replaced with the default value.
45    #[builder]
46    pub fn new(file_path: Option<String>, interval: Option<Duration>) -> Self {
47        Self {
48            file_path: file_path.unwrap_or_else(|| DEFAULT_FILE_PATH.to_owned()),
49            interval: interval.unwrap_or(DEFAULT_INTERVAL),
50        }
51    }
52}
53
54impl Sidecar for RecordMetricsSidecar {
55    fn to_expr(
56        &self,
57        flow_name: &str,
58        _location_key: LocationKey,
59        _location_type: LocationType,
60        location_name: &str,
61        dfir_ident: &syn::Ident,
62    ) -> syn::Expr {
63        let Self {
64            file_path,
65            interval,
66        } = self;
67
68        let root = get_this_crate();
69        let namespace = flow_name.replace(char::is_whitespace, "_");
70        let interval: proc_macro2::TokenStream = {
71            let secs = interval.as_secs();
72            let nanos = interval.subsec_nanos();
73            quote!(::std::time::Duration::new(#secs, #nanos))
74        };
75
76        parse_quote! {
77            #root::telemetry::emf::record_metrics_sidecar(#dfir_ident.metrics_intervals(), #namespace, #location_name, #file_path, #interval)
78        }
79    }
80}
81
82/// Record both Dfir and Tokio metrics, at the given interval, forever.
83#[cfg(feature = "runtime_support")]
84#[doc(hidden)]
85pub fn record_metrics_sidecar(
86    mut dfir_intervals: DfirMetricsIntervals,
87    namespace: &'static str,
88    location_name: &'static str,
89    file_path: &'static str,
90    interval: Duration,
91) -> impl 'static + Future<Output = Never> {
92    assert!(!namespace.contains(char::is_whitespace));
93
94    async move {
95        // Attempt to create log file parent dir.
96        if let Some(parent_dir) = std::path::Path::new(file_path).parent()
97            && let Err(e) = tokio::fs::create_dir_all(parent_dir).await
98        {
99            // TODO(minwgei): use `tracing` once deployments set up tracing logging (setup moved out of stdout)
100            eprintln!("Failed to create log file directory for EMF metrics: {}", e);
101        }
102
103        // Only attempt to get Tokio runtime within async to be safe.
104        let rt_monitor = tokio_metrics::RuntimeMonitor::new(&tokio::runtime::Handle::current());
105        let mut rt_intervals = rt_monitor.intervals();
106
107        loop {
108            let _ = tokio::time::sleep(interval).await;
109
110            let dfir_metrics = dfir_intervals.take_interval();
111            let rt_metrics = rt_intervals.next().unwrap();
112
113            let unwind_result = AssertUnwindSafe(async {
114                let timestamp = SystemTime::now();
115
116                let file = tokio::fs::OpenOptions::new()
117                    .write(true)
118                    .create(true)
119                    .truncate(false)
120                    .append(true)
121                    .open(file_path)
122                    .await
123                    .expect("Failed to open log file for EMF metrics.");
124                let mut writer = tokio::io::BufWriter::new(file);
125
126                record_metrics_dfir(
127                    namespace,
128                    location_name,
129                    timestamp,
130                    dfir_metrics,
131                    &mut writer,
132                )
133                .await
134                .unwrap();
135
136                record_metrics_tokio(namespace, location_name, timestamp, rt_metrics, &mut writer)
137                    .await
138                    .unwrap();
139
140                writer.shutdown().await.unwrap();
141            })
142            .catch_unwind()
143            .await;
144
145            if let Err(panic_reason) = unwind_result {
146                // TODO(minwgei): use `tracing` once deployments set up tracing logging (setup coordination moved out of stdout)
147                eprintln!("Panic in metrics sidecar: {panic_reason:?}");
148            }
149        }
150    }
151}
152
#[cfg(feature = "runtime_support")]
/// Writes the DFIR metrics to `writer` as newline-delimited EMF JSON records.
///
/// One record is written per handoff, followed by one record per subgraph. Every record is
/// stamped with `timestamp` (milliseconds since the Unix epoch) and tagged with
/// `location_name`; the first write error is returned to the caller.
///
/// # Panics
///
/// Panics if `timestamp` is earlier than the Unix epoch.
async fn record_metrics_dfir<W>(
    namespace: &str,
    location_name: &str,
    timestamp: SystemTime,
    metrics: DfirMetrics,
    writer: &mut W,
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    /// Serializes `record` and writes it to `out`, followed by a newline.
    async fn write_line<Out>(out: &mut Out, record: serde_json::Value) -> Result<(), std::io::Error>
    where
        Out: AsyncWrite + Unpin,
    {
        let line = record.to_string();
        out.write_all(line.as_bytes()).await?;
        out.write_u8(b'\n').await?;
        Ok(())
    }

    let since_epoch = timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    let ts_millis = since_epoch.as_millis();

    // Handoffs: the metric directive is the same for every handoff, so build it once.
    let handoff_directive = json!({
        "Namespace": namespace,
        "Dimensions": [["LocationName"], ["LocationName", "HandoffId"]],
        "Metrics": [
            {"Name": "CurrItemsCount", "Unit": Unit::Count},
            {"Name": "TotalItemsCount", "Unit": Unit::Count},
        ]
    });
    for (handoff_id, handoff) in metrics.handoffs.iter() {
        let record = json!({
            "_aws": {
                "Timestamp": ts_millis,
                "CloudWatchMetrics": [handoff_directive]
            },
            "LocationName": location_name,
            "HandoffId": format!("{handoff_id:?}"),
            "CurrItemsCount": handoff.curr_items_count(),
            "TotalItemsCount": handoff.total_items_count(),
        });
        write_line(&mut *writer, record).await?;
    }

    // Subgraphs: likewise, one shared directive and one record per subgraph.
    let subgraph_directive = json!({
        "Namespace": namespace,
        "Dimensions": [["LocationName"], ["LocationName", "SubgraphId"]],
        "Metrics": [
            {"Name": "TotalRunCount", "Unit": Unit::Count},
            {"Name": "TotalPollDuration", "Unit": Unit::Microseconds},
            {"Name": "TotalPollCount", "Unit": Unit::Count},
            {"Name": "TotalIdleDuration", "Unit": Unit::Microseconds},
            {"Name": "TotalIdleCount", "Unit": Unit::Count},
        ]
    });
    for (subgraph_id, subgraph) in metrics.subgraphs.iter() {
        let record = json!({
            "_aws": {
                "Timestamp": ts_millis,
                "CloudWatchMetrics": [subgraph_directive]
            },
            "LocationName": location_name,
            "SubgraphId": format!("{subgraph_id:?}"),
            "TotalRunCount": subgraph.total_run_count(),
            "TotalPollDuration": subgraph.total_poll_duration().as_micros(),
            "TotalPollCount": subgraph.total_poll_count(),
            "TotalIdleDuration": subgraph.total_idle_duration().as_micros(),
            "TotalIdleCount": subgraph.total_idle_count(),
        });
        write_line(&mut *writer, record).await?;
    }

    Ok(())
}
230
#[cfg(feature = "runtime_support")]
/// Writes the Tokio runtime metrics to `writer` as a single EMF JSON record plus a newline.
///
/// The record is stamped with `timestamp` (milliseconds since the Unix epoch), tagged with
/// `location_name`, and reports `TotalBusyDuration` (microseconds) and `GlobalQueueDepth`.
/// The first write error is returned to the caller.
///
/// # Panics
///
/// Panics if `timestamp` is earlier than the Unix epoch.
async fn record_metrics_tokio<W>(
    namespace: &str,
    location_name: &str,
    timestamp: SystemTime,
    rt_metrics: RuntimeMetrics,
    writer: &mut W,
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    let since_epoch = timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    let ts_millis = since_epoch.as_millis();

    // Metric directive for the Tokio `RuntimeMetrics` fields reported below.
    // `LiveTasksCount` is left out for now, see
    // https://github.com/tokio-rs/tokio-metrics/pull/108
    let directive = json!({
        "Namespace": namespace,
        "Dimensions": [["LocationName"]],
        "Metrics": [
            {"Name": "TotalBusyDuration", "Unit": Unit::Microseconds},
            {"Name": "GlobalQueueDepth", "Unit": Unit::Count},
        ]
    });

    // The rest of the tokio runtime metrics are `cfg(tokio_unstable)`.
    let record = json!({
        "_aws": {
            "Timestamp": ts_millis,
            "CloudWatchMetrics": [directive]
        },
        "LocationName": location_name,
        "TotalBusyDuration": rt_metrics.total_busy_duration.as_micros(),
        "GlobalQueueDepth": rt_metrics.global_queue_depth,
    });

    let line = record.to_string();
    writer.write_all(line.as_bytes()).await?;
    writer.write_u8(b'\n').await?;

    Ok(())
}
276
277/// AWS CloudWatch EMF units.
278///
279/// <https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html#ACW-Type-MetricDatum-Unit>
280#[expect(missing_docs, reason = "self-explanatory")]
281#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
282pub enum Unit {
283    /// None
284    #[default]
285    None,
286    Seconds,
287    Microseconds,
288    Milliseconds,
289    Bytes,
290    Kilobytes,
291    Megabytes,
292    Gigabytes,
293    Terabytes,
294    Bits,
295    Kilobits,
296    Megabits,
297    Gigabits,
298    Terabits,
299    Percent,
300    Count,
301    #[serde(rename = "Bytes/Second")]
302    BytesPerSecond,
303    #[serde(rename = "Kilobytes/Second")]
304    KilobytesPerSecond,
305    #[serde(rename = "Megabytes/Second")]
306    MegabytesPerSecond,
307    #[serde(rename = "Gigabytes/Second")]
308    GigabytesPerSecond,
309    #[serde(rename = "Terabytes/Second")]
310    TerabytesPerSecond,
311    #[serde(rename = "Bits/Second")]
312    BitsPerSecond,
313    #[serde(rename = "Kilobits/Second")]
314    KilobitsPerSecond,
315    #[serde(rename = "Megabits/Second")]
316    MegabitsPerSecond,
317    #[serde(rename = "Gigabits/Second")]
318    GigabitsPerSecond,
319    #[serde(rename = "Terabits/Second")]
320    TerabitsPerSecond,
321    #[serde(rename = "Count/Second")]
322    CountPerSecond,
323}