Data Windows
Data windows aggregate incoming data over time or count, returning pandas DataFrames for analysis. All window operations require the ai optional dependency.
Note
The examples given here are just the basics.
For more detailed and specific examples on windowing in Kelvin, you can read about them in the Develop SmartApps section here.
Tumbling Window
Non-overlapping, fixed-size time windows:
| Basic Tumbling Window Python Example |
|---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | import asyncio
from datetime import datetime, timedelta
from kelvin.application import KelvinApp
app = KelvinApp()
async def main():
await app.connect()
# Process data in 10-second non-overlapping windows
window_start = datetime.now()
async for asset_name, df in app.tumbling_window(
window_size=timedelta(seconds=10)
).stream(window_start):
print(f"Asset: {asset_name}")
print(df) # pandas DataFrame with all data in window
if __name__ == "__main__":
asyncio.run(main())
|
Use case: Aggregate data every N seconds for batch processing (e.g., computing averages every 10 seconds).
Hopping Window
Overlapping, fixed-size windows with a configurable hop interval:
| Basic Hopping Window Python Example |
|---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 | import asyncio
from datetime import datetime, timedelta
from kelvin.application import KelvinApp
app = KelvinApp()
async def main():
await app.connect()
# 10-second windows, moving forward by 5 seconds each time
window_start = datetime.now()
async for asset_name, df in app.hopping_window(
window_size=timedelta(seconds=10),
hop_size=timedelta(seconds=5)
).stream(window_start=window_start):
print(f"Asset: {asset_name}")
print(df) # pandas DataFrame with overlapping data
if __name__ == "__main__":
asyncio.run(main())
|
Use case: Sliding window analysis where you want overlapping data (e.g., moving averages with 50% overlap).
Rolling Window
Count-based windows that slide with each new message:
| Basic Rolling Window Python Example |
|---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 | import asyncio
from kelvin.application import KelvinApp
app = KelvinApp()
async def main():
await app.connect()
# Window of last 5 messages, slides by 2 messages
async for asset_name, df in app.rolling_window(
count_size=5,
slide=2
).stream():
print(f"Asset: {asset_name}")
print(df) # pandas DataFrame with last 5 messages
if __name__ == "__main__":
asyncio.run(main())
|
Use case: Process the last N data points (e.g., calculate trend over the last 10 readings).