Provide next epoch hints on reader health endpoint

Matt Gibson
2026-01-22 16:08:52 -08:00
parent 45ba4968b9
commit 82776f78a6
5 changed files with 216 additions and 8 deletions

View File

@@ -139,7 +139,6 @@ impl ApplicationConfig {
        self.reader.validate()?;
        Ok(())
    }
}

impl PublisherSettings {
@@ -197,6 +196,7 @@ impl From<&ApplicationConfig> for reader::ApplicationConfig {
            installation_id: config.installation_id,
            max_batch_lookup_size: config.reader.max_batch_lookup_size,
            azks_poll_interval_ms: config.reader.azks_poll_interval_ms,
            expected_epoch_duration_ms: config.publisher.epoch_duration_ms,
        }
    }
}

View File

@@ -18,6 +18,10 @@ pub struct ApplicationConfig {
    /// Optional polling interval for AZKS storage in milliseconds. Should be significantly less than the epoch interval. Defaults to 100 ms.
    #[serde(default = "default_azks_poll_interval_ms")]
    pub azks_poll_interval_ms: u64,
    /// Expected duration between epoch publishes in milliseconds.
    /// This value should match the publisher's epoch_duration_ms configuration.
    /// Used to predict when the next epoch will be published.
    pub expected_epoch_duration_ms: u64,
}

fn default_web_server_bind_address() -> String {
@@ -71,6 +75,13 @@ impl ApplicationConfig {
        self.storage
            .validate()
            .map_err(|e| ConfigError::Message(format!("{e}")))?;

        if self.expected_epoch_duration_ms == 0 {
            return Err(ConfigError::Message(
                "expected_epoch_duration_ms must be greater than 0".to_string(),
            ));
        }

        Ok(())
    }

View File

@@ -0,0 +1,126 @@
use chrono::{DateTime, Utc};
use std::sync::Arc;
use tokio::sync::RwLock;

/// Tracks epoch publishes and provides prediction based on expected duration
#[derive(Debug, Clone)]
pub(crate) struct EpochTracker {
    inner: Arc<RwLock<EpochTrackerInner>>,
}

#[derive(Debug)]
struct EpochTrackerInner {
    last_publish_time: Option<DateTime<Utc>>,
    expected_epoch_duration_ms: u64,
}

impl EpochTracker {
    pub(crate) fn new(expected_epoch_duration_ms: u64) -> Self {
        Self {
            inner: Arc::new(RwLock::new(EpochTrackerInner {
                last_publish_time: None,
                expected_epoch_duration_ms,
            })),
        }
    }

    /// Record a new epoch publish
    pub(crate) async fn record_publish(&self, published_at: DateTime<Utc>) {
        let mut inner = self.inner.write().await;
        inner.last_publish_time = Some(published_at);
    }

    /// Predict the next epoch publish time using modulus calculation
    /// Returns (seconds_until_next, next_epoch_datetime) or None if no publish has been recorded yet
    pub(crate) async fn predict_next_epoch(
        &self,
        now: DateTime<Utc>,
    ) -> Option<(f64, DateTime<Utc>)> {
        let inner = self.inner.read().await;
        let last_publish = inner.last_publish_time?;

        // Calculate time since last publish
        let duration_since_publish = now - last_publish;
        let ms_since_publish = duration_since_publish.num_milliseconds();

        // Use modulus to find time until next epoch
        let ms_until_next = inner.expected_epoch_duration_ms as i64
            - (ms_since_publish % inner.expected_epoch_duration_ms as i64);

        // Calculate predicted next epoch time
        let next_epoch_time = now + chrono::Duration::milliseconds(ms_until_next);

        // Convert to seconds with tenths precision
        let seconds_until = ms_until_next as f64 / 1000.0;
        let rounded = (seconds_until * 10.0).ceil() / 10.0;

        Some((rounded, next_epoch_time))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_tracker_no_prediction_before_publish() {
        let tracker = EpochTracker::new(30000);
        let now = Utc::now();
        assert!(tracker.predict_next_epoch(now).await.is_none());
    }

    #[tokio::test]
    async fn test_tracker_predicts_after_publish() {
        let tracker = EpochTracker::new(30000); // 30 second epochs
        let now = Utc::now();
        let publish_time = now - chrono::Duration::seconds(10); // 10 seconds ago
        tracker.record_publish(publish_time).await;

        let prediction = tracker.predict_next_epoch(now).await;
        assert!(prediction.is_some());

        let (seconds_until, next_time) = prediction.unwrap();
        // Should predict ~20 seconds until next (30 - 10)
        assert!((seconds_until - 20.0).abs() < 0.2);

        // Next epoch should be approximately 20 seconds from now
        let expected = now + chrono::Duration::seconds(20);
        let diff = (next_time - expected).num_seconds().abs();
        assert!(diff < 1);
    }

    #[tokio::test]
    async fn test_tracker_handles_epoch_skip() {
        let tracker = EpochTracker::new(30000); // 30 second epochs
        let now = Utc::now();
        let publish_time = now - chrono::Duration::seconds(75); // 75 seconds ago (2.5 epochs)
        tracker.record_publish(publish_time).await;

        let prediction = tracker.predict_next_epoch(now).await;
        assert!(prediction.is_some());

        let (seconds_until, _) = prediction.unwrap();
        // Should predict ~15 seconds until next (75 % 30 = 15, 30 - 15 = 15)
        assert!((seconds_until - 15.0).abs() < 0.2);
    }

    #[tokio::test]
    async fn test_tracker_updates_publish() {
        let tracker = EpochTracker::new(30000);
        let now = Utc::now();
        let t1 = now - chrono::Duration::seconds(60);
        let t2 = now - chrono::Duration::seconds(10);
        tracker.record_publish(t1).await;
        tracker.record_publish(t2).await;

        let (seconds_until, _) = tracker.predict_next_epoch(now).await.unwrap();
        // Should use the newer publish time (t2)
        assert!((seconds_until - 20.0).abs() < 0.2);
    }
}
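
The prediction is a single modulus step: reduce the elapsed time since the last observed publish modulo the expected epoch duration, then subtract from the duration to get the time remaining. A minimal standalone sketch of that arithmetic, with made-up numbers; the helper name ms_until_next_epoch is illustrative and not part of the crate:

#[test]
fn modulus_prediction_examples() {
    // Same formula as predict_next_epoch, isolated for clarity.
    fn ms_until_next_epoch(ms_since_publish: i64, epoch_duration_ms: i64) -> i64 {
        epoch_duration_ms - (ms_since_publish % epoch_duration_ms)
    }

    // 10 s after a publish with 30 s epochs: 30_000 - (10_000 % 30_000) = 20_000 ms.
    assert_eq!(ms_until_next_epoch(10_000, 30_000), 20_000);
    // 75 s after a publish (2.5 epochs elapsed): 30_000 - (75_000 % 30_000) = 15_000 ms.
    assert_eq!(ms_until_next_epoch(75_000, 30_000), 15_000);
    // Exactly on a boundary the formula yields a full interval rather than 0.
    assert_eq!(ms_until_next_epoch(60_000, 30_000), 30_000);
}

One consequence of the formula is that the hint never reports zero seconds remaining; at an exact boundary it rolls over to a full interval.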

View File

@@ -7,9 +7,12 @@ use tokio::{net::TcpListener, sync::broadcast::Receiver};
use tracing::{info, instrument};
mod config;
mod epoch_tracker;
pub mod error;
mod routes;
use epoch_tracker::EpochTracker;
pub use crate::config::ApplicationConfig;
pub use error::{ErrorCode, ErrorResponse, ReaderError};
pub use routes::response_types;
@@ -21,6 +24,7 @@ struct AppState {
    // TODO: use this to allow for unique failures for lookup and key history requests that have pending updates
    // publish_queue: ReadOnlyPublishQueueType,
    max_batch_lookup_size: usize,
    epoch_tracker: EpochTracker,
}
#[instrument(skip_all, name = "reader_start")]
@@ -37,11 +41,13 @@ pub async fn start(
    let mut shutdown_rx = shutdown_rx.resubscribe();
    let max_batch_lookup_size = config.max_batch_lookup_size;
    let epoch_tracker = EpochTracker::new(config.expected_epoch_duration_ms);

    let axum_handle = tokio::spawn(async move {
        let app_state = AppState {
            directory: directory.clone(),
            // publish_queue: publish_queue,
            max_batch_lookup_size,
            epoch_tracker: epoch_tracker.clone(),
        };

        let app = Router::new()
@@ -58,16 +64,61 @@ pub async fn start(
        );

        // polls azks storage for epoch changes. This is necessary to pick up newly published updates.
        let epoch_tracker_for_poll = epoch_tracker.clone();
        let directory_for_poll = directory.clone();
        let poll_interval = config.azks_poll_interval_ms;
        let _poll_handle = tokio::spawn(async move {
            let (change_tx, mut change_rx) = tokio::sync::mpsc::channel::<()>(100);

            // Detector task: listens for changes and records to tracker
            let detector_handle = tokio::spawn(async move {
                let mut last_epoch = match directory_for_poll.get_epoch_hash().await {
                    Ok(epoch_hash) => {
                        tracing::info!(epoch = epoch_hash.0, "Initial epoch detected");
                        epoch_hash.0
                    }
                    Err(e) => {
                        tracing::error!("Failed to get initial epoch: {:?}", e);
                        0
                    }
                };

                while change_rx.recv().await.is_some() {
                    match directory_for_poll.get_epoch_hash().await {
                        Ok(epoch_hash) => {
                            let current_epoch = epoch_hash.0;
                            if current_epoch != last_epoch {
                                let published_at = chrono::Utc::now();
                                tracing::info!(
                                    previous_epoch = last_epoch,
                                    new_epoch = current_epoch,
                                    published_at = %published_at.to_rfc3339(),
                                    "Epoch publish detected"
                                );
                                epoch_tracker_for_poll.record_publish(published_at).await;
                                last_epoch = current_epoch;
                            }
                        }
                        Err(e) => {
                            tracing::error!("Failed to get epoch hash: {:?}", e);
                        }
                    }
                }
            });

            let result = directory
                .poll_for_azks_changes(
                    tokio::time::Duration::from_millis(config.azks_poll_interval_ms),
                    None,
                    tokio::time::Duration::from_millis(poll_interval),
                    Some(change_tx),
                )
                .await;
            if let Err(e) = result {
                tracing::error!("Error polling for AZKS changes: {:?}", e);
            }

            detector_handle.abort();
        });

        axum::serve(listener, app.into_make_service())
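
The polling side only signals that something changed; the detector task re-reads the epoch hash, decides whether it actually advanced, and timestamps it. A minimal sketch of that channel handoff in isolation, assuming tokio and chrono as above; the simulated sends and all names here are illustrative, only the pattern mirrors the code:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (change_tx, mut change_rx) = mpsc::channel::<()>(100);

    // Detector: timestamps each notification, as the reader does before
    // handing the time to EpochTracker::record_publish.
    let detector = tokio::spawn(async move {
        while change_rx.recv().await.is_some() {
            let published_at = chrono::Utc::now();
            println!("change observed at {}", published_at.to_rfc3339());
        }
    });

    // Simulate two AZKS changes being reported by the poller; in the real
    // code poll_for_azks_changes owns change_tx and sends one unit per change.
    change_tx.send(()).await.unwrap();
    change_tx.send(()).await.unwrap();

    // Dropping the sender closes the channel and lets the detector loop end.
    drop(change_tx);
    detector.await.unwrap();
}

Because the channel is buffered (capacity 100), a slow get_epoch_hash call in the detector does not block the poller.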

View File

@@ -1,19 +1,39 @@
use axum::{http::StatusCode, Json};
use axum::{extract::State, http::StatusCode, Json};
use serde::{Deserialize, Serialize};
use tracing::{info, instrument};

use crate::routes::Response;
use crate::{routes::Response, AppState};

#[derive(Debug, Serialize, Deserialize)]
pub struct HealthData {
    time: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    predicted_next_epoch_datetime: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    predicted_seconds_until_next_epoch: Option<f64>,
}

#[instrument(skip_all)]
pub async fn health_handler() -> (StatusCode, Json<Response<HealthData>>) {
pub async fn health_handler(
    State(AppState { epoch_tracker, .. }): State<AppState>,
) -> (StatusCode, Json<Response<HealthData>>) {
    info!("Handling server health request");
    let time = chrono::Utc::now().to_rfc3339();
    let now = chrono::Utc::now();
    let time = now.to_rfc3339();

    (StatusCode::OK, Json(Response::success(HealthData { time })))
    let (predicted_seconds_until_next_epoch, predicted_next_epoch_datetime) = epoch_tracker
        .predict_next_epoch(now)
        .await
        .map(|(seconds, datetime)| (Some(seconds), Some(datetime.to_rfc3339())))
        .unwrap_or((None, None));

    (
        StatusCode::OK,
        Json(Response::success(HealthData {
            time,
            predicted_next_epoch_datetime,
            predicted_seconds_until_next_epoch,
        })),
    )
}
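
With the two optional fields and their skip_serializing_if attributes, the health payload only carries the hints once a publish has been observed. A rough sketch of the HealthData portion of the serialized body, assuming serde_json is available as a dev-dependency; the values and the test module are illustrative, and the real response is still wrapped in the crate's Response envelope:

#[cfg(test)]
mod health_shape_tests {
    use super::*;

    #[test]
    fn health_data_shape_example() {
        // Values are made up; they only illustrate the serialized shape.
        let data = HealthData {
            time: "2026-01-22T16:08:52+00:00".to_string(),
            predicted_next_epoch_datetime: Some("2026-01-22T16:09:12+00:00".to_string()),
            predicted_seconds_until_next_epoch: Some(19.8),
        };
        let json = serde_json::to_value(&data).unwrap();
        assert_eq!(json["predicted_seconds_until_next_epoch"], 19.8);
        assert_eq!(
            json["predicted_next_epoch_datetime"],
            "2026-01-22T16:09:12+00:00"
        );

        // Before any publish is recorded, both predictions are None and the
        // skip_serializing_if attributes drop the keys from the JSON entirely.
        let empty = HealthData {
            time: "2026-01-22T16:08:52+00:00".to_string(),
            predicted_next_epoch_datetime: None,
            predicted_seconds_until_next_epoch: None,
        };
        let json = serde_json::to_value(&empty).unwrap();
        assert!(json.get("predicted_next_epoch_datetime").is_none());
        assert!(json.get("predicted_seconds_until_next_epoch").is_none());
    }
}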