Source code for kelvin.application.timer
"""Async timer with drift correction.
This module provides a Timer class for executing periodic tasks with
automatic clock drift correction to maintain consistent intervals.
"""
import asyncio
import time
from typing import AsyncIterator
import structlog
MAX_DRIFT_PERCENT = 0.30
MAX_DRIFT_CEILING = 1.2
logger = structlog.get_logger()
[docs]
class Timer:
"""A repeating async timer that corrects clock drift.
The Timer automatically adjusts sleep intervals to compensate for
execution time and system clock drift, ensuring consistent timing
over extended periods.
Attributes:
interval: The target interval between timer fires in seconds.
name: A descriptive name for the timer (used in logging).
max_drift: Maximum drift before logging a warning.
min_interval: Minimum allowed sleep interval.
max_interval: Maximum allowed sleep interval.
iteration: Number of times the timer has fired.
overlaps: Number of times execution exceeded the interval.
Example:
>>> timer = Timer(interval=5.0, name="my-timer")
>>> async for _ in timer:
... await do_periodic_work()
"""
def __init__(self, interval: float, name: str, max_drift_correction: float = 0.1):
self.interval = interval
self.name = name
# Log drift if it exceeds this threshold
self.max_drift = min(self.interval * MAX_DRIFT_PERCENT, MAX_DRIFT_CEILING)
# When drift correction is enabled, enforce minimum/maximum intervals
if self.interval > max_drift_correction:
self.min_interval = self.interval - max_drift_correction
self.max_interval = self.interval + max_drift_correction
else:
self.min_interval = self.max_interval = self.interval
# Initial time references
self.epoch = time.perf_counter()
self.last_wakeup_at = self.epoch
self.last_yield_at = self.epoch
self.iteration = 0
self.overlaps = 0
[docs]
async def __aiter__(self) -> AsyncIterator[float]:
"""An async iterator that yields the actual sleep interval each loop."""
while True:
sleep_time = self._tick()
await asyncio.sleep(sleep_time)
self.last_yield_at = time.perf_counter()
yield sleep_time
def _tick(self) -> float:
"""Calculate how long to sleep next time, correcting for drift."""
now = time.perf_counter()
# First iteration: just return the given interval
if self.last_yield_at == self.epoch:
self.iteration += 1
self.last_wakeup_at = now
return self.interval
time_spent_sleeping = self.last_yield_at - self.last_wakeup_at
time_spent_yielding = now - self.last_wakeup_at - time_spent_sleeping
drift = self.interval - time_spent_sleeping
new_interval = self.interval + drift
if drift > 0:
new_interval = min(new_interval, self.max_interval)
else:
new_interval = max(new_interval, self.min_interval)
logger.debug(
"Timer fired",
name=self.name,
iteration=self.iteration,
sleeptime=round(time_spent_sleeping, 6),
runtime=round(time_spent_yielding, 6),
drift=round(abs(drift), 6),
new_interval=round(new_interval, 6),
since_epoch=round(now - self.epoch, 6),
)
# Warn if the total time running exceeds the interval
if time_spent_yielding > self.interval:
self.overlaps += 1
logger.warning(
"Timer is overlapping",
name=self.name,
interval=round(self.interval, 6),
runtime=round(time_spent_yielding, 6),
)
self.iteration += 1
self.last_wakeup_at = now
self.interval = new_interval
return new_interval