Windowing
Introduction
This document explains how to implement and use the windowing classes in the Kelvin SDK to process streaming data. Windowing groups time-series data from various sources into batches, defined either by time or by message count, so that each batch can be managed and analyzed as a unit.
The Kelvin SDK provides three windowing methods:
- Tumbling Window: This method uses fixed-size, non-overlapping windows that reset after each window period, making it suitable for isolated data analysis in each segment.
- Hopping Window: These windows are also fixed-size but can overlap based on a specified hop size, allowing for more frequent data analysis intervals.
- Rolling Window: Unlike the other types, this method creates fixed-size windows based on the count of messages, not on time intervals.
This guide elaborates on the specific mechanisms of each windowing method, providing usage examples and explaining how to configure their parameters effectively.
Clock and Message Timestamp
In the Kelvin SDK, windows are built from message timestamps, so precise time handling is essential. Each input message processed by an application carries a timestamp: a Python datetime object that records the moment Kelvin acquired the data from its source. The SDK uses this timestamp, rather than the order in which messages arrive, to place each message in the correct window.
Info
Data within each window is always ordered by timestamp, so temporal sequencing is preserved across all processed data.
Lateness and Out-of-Order Events
- Lateness in Data: Lateness occurs when a message arrives after its expected time, often because of network delays, which can affect the timeliness and accuracy of analytics. If the window the message belongs to is still open, Kelvin adds the late message to it. If that window has already closed, the message is discarded and not processed.
- Out-of-Order Events: Out-of-order events occur when messages arrive in a different order from their timestamps. Regardless of arrival order, the windowing methods place each message in the correct window based on its timestamp.
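The two rules can be illustrated with a small event-time simulation of 10-second tumbling windows. This is a hypothetical sketch, not the Kelvin SDK implementation: the `TumblingBuffer` class is invented for illustration, and it assumes a window closes as soon as a message with a timestamp at or past the window's end has been seen. The SDK's actual closing criterion is not described here.

```python
from datetime import datetime, timedelta


class TumblingBuffer:
    """Hypothetical event-time tumbling windows; NOT the Kelvin SDK implementation.

    Assumption: a window [start, start + size) closes once a message with a
    timestamp at or past its end has been seen. Messages for a window that has
    already closed are dropped, mirroring the lateness rule described above.
    """

    def __init__(self, size: timedelta, origin: datetime):
        self.size = size
        self.origin = origin
        self.watermark = origin  # latest timestamp seen so far
        self.open = {}           # window start -> list of (timestamp, value)
        self.closed = {}         # window start -> list sorted by timestamp
        self.dropped = []        # late messages whose window had closed

    def add(self, ts: datetime, value) -> None:
        # Assign the message to a window by its own timestamp, not arrival order
        start = self.origin + ((ts - self.origin) // self.size) * self.size
        if start + self.size <= self.watermark:
            self.dropped.append((ts, value))  # window already closed: discard
            return
        self.open.setdefault(start, []).append((ts, value))
        self.watermark = max(self.watermark, ts)
        # Close every open window whose end is at or before the watermark
        for s in sorted(self.open):
            if s + self.size <= self.watermark:
                self.closed[s] = sorted(self.open.pop(s))  # order by timestamp


origin = datetime(2024, 1, 1)
buf = TumblingBuffer(timedelta(seconds=10), origin)
# (seconds after origin, value) in arrival order: the 4 s reading arrives after
# the 7 s one (out of order), and the 9 s reading arrives after [0, 10) closed (late)
for sec, val in [(0, 25), (7, 27), (4, 26), (12, 28), (9, 24), (18, 26), (22, 29)]:
    buf.add(origin + timedelta(seconds=sec), val)

for start, msgs in buf.closed.items():
    print(start.time(), [v for _, v in msgs])
print("dropped:", [v for _, v in buf.dropped])
```

The out-of-order 4 s reading still lands in the first window in timestamp order, while the 9 s reading is dropped because its window closed when the 12 s reading arrived.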
Inclusivity and Exclusivity of Time Windows
Time-based windows are inclusive on the left and exclusive on the right: a data point timestamped exactly at the start of a window is included in it, while a data point timestamped exactly at its end is excluded. For tumbling windows, where each window ends exactly where the next one begins, this guarantees that a boundary data point is counted in exactly one window rather than duplicated.
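The half-open rule reduces to a single chained comparison. The `in_window` helper below is hypothetical, written only to illustrate the boundary behavior; it is not an SDK function.

```python
from datetime import datetime, timedelta


def in_window(ts: datetime, start: datetime, size: timedelta) -> bool:
    """Half-open membership test: start <= ts < start + size."""
    return start <= ts < start + size


start = datetime(2024, 1, 1, 0, 0, 0)
size = timedelta(seconds=10)
boundary = start + size  # 00:00:10, the shared edge of two adjacent windows

print(in_window(start, start, size))        # True: left edge is included
print(in_window(boundary, start, size))     # False: right edge is excluded
print(in_window(boundary, boundary, size))  # True: it opens the next window
```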
Tumbling Window
A tumbling window represents a sequence of fixed-size, non-overlapping, and contiguous time intervals. This concept is essential for dividing data into distinct, independent sections, each of which is processed separately. Tumbling windows are particularly suited to tasks requiring isolated, consecutive computations on data segments that do not intersect.
Example of temperature readings with a tumbling window of 10 seconds.
Original Data:
Time: [00:00] 25°C
Time: [00:04] 26°C
Time: [00:07] 27°C
Time: [00:12] 28°C
Time: [00:18] 26°C
Time: [00:22] 29°C
Time: [00:27] 30°C
Time: [00:31] 27°C
Time: [00:35] 25°C
Windows:
Time: [00:00, 00:10): [25°C (00:00), 26°C (00:04), 27°C (00:07)]
Time: [00:10, 00:20): [28°C (00:12), 26°C (00:18)]
Time: [00:20, 00:30): [29°C (00:22), 30°C (00:27)]
Time: [00:30, 00:40): [27°C (00:31), 25°C (00:35)]
Each window processes only the data that falls within its own 10-second interval, and no data point appears in more than one window.
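The grouping above can be reproduced with a few lines of plain Python. This sketch uses integer seconds in place of datetime objects for brevity, and the `tumbling` function is a hypothetical illustration of the assignment rule, not how the SDK buffers data internally.

```python
from collections import defaultdict

# Readings from the example above: (seconds since 00:00, temperature in °C)
readings = [(0, 25), (4, 26), (7, 27), (12, 28), (18, 26),
            (22, 29), (27, 30), (31, 27), (35, 25)]


def tumbling(readings, size):
    """Group readings into non-overlapping windows [k*size, (k+1)*size)."""
    windows = defaultdict(list)
    for t, value in sorted(readings):
        start = (t // size) * size  # each reading maps to exactly one window
        windows[start].append(value)
    return dict(windows)


for start, values in tumbling(readings, 10).items():
    print(f"[{start}, {start + 10}): {values}")
```

Floor division is what makes the windows non-overlapping: every timestamp has exactly one window start.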
Usage Example:
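import asyncio
from datetime import timedelta
from kelvin.application import KelvinApp

async def main():
    app = KelvinApp()
    await app.connect()
    # Streaming data in 5-minute tumbling windows
    async for asset_name, df in app.tumbling_window(window_size=timedelta(minutes=5)).stream():
        print(asset_name, df)

asyncio.run(main())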
This automatically processes data in 5-minute intervals for all data streams defined in your app.yaml and for all assets your app is running on. Each iteration of the loop yields:
- asset_name: the unique identifier of the asset.
- df: a pandas DataFrame containing the data for that asset within the 5-minute window. The DataFrame's index is timestamp-based and sorted in monotonically increasing order.
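A typical loop body reduces each window to a summary. The sketch below is hypothetical: `summarize_window` is an invented helper, and the stand-in DataFrame assumes one column per data stream (named `temperature` and `pressure` here) with a timestamp index, as described above.

```python
import pandas as pd


def summarize_window(asset_name: str, df: pd.DataFrame) -> dict:
    """Hypothetical per-window handler: mean of each data stream column."""
    # The windowed DataFrame is described as having an increasing timestamp index
    if not df.index.is_monotonic_increasing:
        raise ValueError("expected a timestamp index in increasing order")
    return {"asset": asset_name, **df.mean().round(2).to_dict()}


# Stand-in for one window yielded by the SDK; column names are assumptions
index = pd.to_datetime(["2024-01-01 00:00:00", "2024-01-01 00:01:00",
                        "2024-01-01 00:02:00"])
df = pd.DataFrame({"temperature": [25.0, 26.0, 27.0],
                   "pressure": [1.01, 1.02, 1.03]}, index=index)
print(summarize_window("asset1", df))
```

Inside the streaming loop, the same call would be `print(summarize_window(asset_name, df))` for each yielded window.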
Filtering Data Streams:
To narrow down the processing to specific data streams within a window, you can specify which streams to include:
Info
The order of data streams in the list specifies the order of the columns within the DataFrame produced by the window.
async for asset_name, df in app.tumbling_window(
        window_size=timedelta(minutes=5),
        datastreams=['temperature', 'pressure']).stream():
    print(asset_name, df)
Filtering Assets:
Similarly, you can limit the data processing to certain assets by specifying them explicitly:
async for asset_name, df in app.tumbling_window(
        window_size=timedelta(minutes=5),
        assets=['asset1', 'asset2']).stream():
    print(asset_name, df)
Aligning Data:
The round_to parameter aligns message timestamps to the nearest specified time unit, such as a second, minute, or hour. This is useful for synchronizing the rows of the DataFrame, for example so that readings from different data streams taken a few seconds apart share the same timestamp, which keeps analysis and reporting consistent.
# Round message timestamps to the nearest minute
async for asset_name, df in app.tumbling_window(
        window_size=timedelta(minutes=5),
        round_to=timedelta(minutes=1)).stream():
    print(asset_name, df)
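The effect of rounding on row alignment can be approximated with pandas. This sketch assumes round-to-nearest semantics as described above; pandas `Series.dt.round` stands in for whatever rounding the SDK performs internally, and the messages and the `to_frame` helper are hypothetical.

```python
import pandas as pd

# Hypothetical messages from two data streams whose timestamps differ by a few seconds
messages = pd.DataFrame({
    "timestamp": pd.to_datetime(["2024-01-01 00:00:58", "2024-01-01 00:01:03",
                                 "2024-01-01 00:01:57", "2024-01-01 00:02:02"]),
    "stream": ["temperature", "pressure", "temperature", "pressure"],
    "value": [25.0, 1.01, 26.0, 1.02],
})


def to_frame(msgs, round_to=None):
    """Pivot messages to one column per stream, optionally rounding timestamps first."""
    ts = msgs["timestamp"].dt.round(round_to) if round_to else msgs["timestamp"]
    wide = msgs.assign(timestamp=ts).pivot(index="timestamp", columns="stream", values="value")
    return wide[["temperature", "pressure"]]  # fix the column order explicitly


print(to_frame(messages))         # 4 rows, each with a gap (NaN) in one column
print(to_frame(messages, "min"))  # 2 rows, both columns filled
```

Without rounding, every message gets its own row; rounding to the minute collapses each temperature and pressure pair onto a shared timestamp.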
Hopping Window
A hopping window, also known as a sliding window, is a series of fixed-size, overlapping intervals in which each window “hops” forward by a specified time interval (the hop size). It is suited to applications where data points need to be included in multiple intervals for continuous calculations.
Example of temperature readings with a hopping window size of 10 seconds and a hop size of 5 seconds:
Original Data:
Time: [00:00] 25°C
Time: [00:04] 26°C
Time: [00:07] 27°C
Time: [00:12] 28°C
Time: [00:18] 26°C
Time: [00:22] 29°C
Time: [00:27] 30°C
Time: [00:31] 27°C
Time: [00:35] 25°C
Windows:
Time: [00:00, 00:10): [25°C (00:00), 26°C (00:04), 27°C (00:07)]
Time: [00:05, 00:15): [27°C (00:07), 28°C (00:12)]
Time: [00:10, 00:20): [28°C (00:12), 26°C (00:18)]
Time: [00:15, 00:25): [26°C (00:18), 29°C (00:22)]
Time: [00:20, 00:30): [29°C (00:22), 30°C (00:27)]
Time: [00:25, 00:35): [30°C (00:27), 27°C (00:31)]
Time: [00:30, 00:40): [27°C (00:31), 25°C (00:35)]
Here, a new window starts every 5 seconds, so consecutive windows overlap (e.g., 28°C (00:12) and 29°C (00:22) each appear in two windows).
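The same half-open rule produces the overlapping windows. In this hypothetical sketch, `hopping` uses integer seconds and a `horizon` argument (an assumption, used only to stop at windows that end by 00:40). A simple scan of every reading per window is chosen for clarity over efficiency. Note that the [00:05, 00:15) window holds only the 00:07 and 00:12 readings, because 00:04 falls before its start.

```python
# Readings from the example above: (seconds since 00:00, temperature in °C)
readings = [(0, 25), (4, 26), (7, 27), (12, 28), (18, 26),
            (22, 29), (27, 30), (31, 27), (35, 25)]


def hopping(readings, size, hop, horizon):
    """Windows [start, start + size) opening every `hop` seconds.

    Only windows that end at or before `horizon` are returned.
    """
    windows = {}
    for start in range(0, horizon - size + 1, hop):
        # Half-open test: a reading can fall into several overlapping windows
        windows[start] = [v for t, v in sorted(readings) if start <= t < start + size]
    return windows


for start, values in hopping(readings, size=10, hop=5, horizon=40).items():
    print(f"[{start}, {start + 10}): {values}")
```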
Usage Example:
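import asyncio
from datetime import timedelta
from kelvin.application import KelvinApp

async def main():
    app = KelvinApp()
    await app.connect()
    # Processing data using a 5-minute hopping window with a 2-minute hop size
    async for asset_name, df in app.hopping_window(
            window_size=timedelta(minutes=5), hop_size=timedelta(minutes=2)).stream():
        print(asset_name, df)

asyncio.run(main())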
This code streams data from all configured data streams and assets in 5-minute windows, each starting 2 minutes after the previous one, so consecutive windows overlap by 3 minutes.
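The window boundaries for a 5-minute size and 2-minute hop can be computed with timedelta arithmetic. This sketch assumes windows are aligned to a hypothetical origin of midnight; where the SDK actually anchors its first window is not stated here. Because the window size is 2.5 times the hop size, every data point (after the first few minutes) falls into either two or three windows.

```python
from datetime import datetime, timedelta

window_size = timedelta(minutes=5)
hop_size = timedelta(minutes=2)
origin = datetime(2024, 1, 1)  # assumed alignment point for window starts

# The first five windows; each starts one hop after the previous one
starts = [origin + i * hop_size for i in range(5)]
for s in starts:
    print(f"[{s:%H:%M}, {s + window_size:%H:%M})")


def containing(ts: datetime) -> list[str]:
    """Start labels of the listed windows whose half-open range holds ts."""
    return [f"{s:%H:%M}" for s in starts if s <= ts < s + window_size]


print(containing(origin + timedelta(minutes=4, seconds=30)))  # 3 windows
print(containing(origin + timedelta(minutes=5, seconds=30)))  # 2 windows
```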
Filtering Data Streams:
To narrow down the processing to specific data streams within a window, you can specify which streams to include:
Info
The order of data streams in the list specifies the order of the columns within the DataFrame produced by the window.
# Processing data using a 5-minute hopping window with a 2-minute hop size
async for asset_name, df in app.hopping_window(
        window_size=timedelta(minutes=5),
        hop_size=timedelta(minutes=2),
        datastreams=['temperature', 'pressure']).stream():
    print(asset_name, df)
Filtering Assets:
Similarly, you can limit the data processing to certain assets by specifying them explicitly:
# Processing data using a 5-minute hopping window with a 2-minute hop size
async for asset_name, df in app.hopping_window(
        window_size=timedelta(minutes=5),
        hop_size=timedelta(minutes=2),
        assets=['asset1', 'asset2']).stream():
    print(asset_name, df)
Aligning Data:
The round_to parameter is designed to align message timestamps to the nearest specified time unit, such as a second, minute, or hour. This functionality is particularly beneficial for ensuring that rows within a DataFrame are synchronized, which is crucial for consistent data analysis and reporting.
# 5-minute hopping window with a 2-minute hop size, timestamps rounded to the nearest minute
async for asset_name, df in app.hopping_window(
        window_size=timedelta(minutes=5),
        hop_size=timedelta(minutes=2),
        round_to=timedelta(minutes=1)).stream():
    print(asset_name, df)
Rolling Window
A rolling window, also known as a moving window, spans a fixed number of data points (the count size) rather than a fixed time interval. Each time a new data point arrives, the window advances by one observation and drops the oldest one. This makes it well suited to smoothing or averaging data, such as computing a moving average.
Example of temperature readings with a rolling window of 3 elements:
Original Data:
Time: [00:00] 25°C
Time: [00:04] 26°C
Time: [00:07] 27°C
Time: [00:12] 28°C
Time: [00:18] 26°C
Time: [00:22] 29°C
Time: [00:27] 30°C
Time: [00:31] 27°C
Time: [00:35] 25°C
Windows:
Window 1: [25°C (00:00), 26°C (00:04), 27°C (00:07)]
Window 2: [26°C (00:04), 27°C (00:07), 28°C (00:12)]
Window 3: [27°C (00:07), 28°C (00:12), 26°C (00:18)]
Window 4: [28°C (00:12), 26°C (00:18), 29°C (00:22)]
Window 5: [26°C (00:18), 29°C (00:22), 30°C (00:27)]
Window 6: [29°C (00:22), 30°C (00:27), 27°C (00:31)]
Window 7: [30°C (00:27), 27°C (00:31), 25°C (00:35)]
Each window contains 3 events, and as new data arrives, the window shifts forward, always keeping the latest 3 events in focus.
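A count-based window maps naturally onto `collections.deque` with a `maxlen`, which discards the oldest item automatically. This hypothetical `rolling` generator assumes a window is emitted only once it holds `count_size` values, which matches the listing above; whether the SDK emits partial windows before the first `count_size` messages arrive is not stated here. Each window's mean shows the smoothing use case.

```python
from collections import deque

# Temperatures from the example above, in timestamp order
temperatures = [25, 26, 27, 28, 26, 29, 30, 27, 25]


def rolling(values, count_size):
    """Yield each full window of the latest `count_size` values."""
    window = deque(maxlen=count_size)  # the oldest value drops out automatically
    for value in values:
        window.append(value)
        if len(window) == count_size:  # emit only once the window is full
            yield list(window)


for i, w in enumerate(rolling(temperatures, 3), start=1):
    print(f"Window {i}: {w}, mean {sum(w) / len(w):.2f}")
```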
Usage Example:
import asyncio
from kelvin.application import KelvinApp

async def main():
    app = KelvinApp()
    await app.connect()
    async for asset_name, df in app.rolling_window(count_size=5).stream():
        print(asset_name, df)

asyncio.run(main())
Filtering Data Streams:
To narrow down the processing to specific data streams within a window, you can specify which streams to include:
Info
The order of data streams in the list specifies the order of the columns within the DataFrame produced by the window.
async for asset_name, df in app.rolling_window(
        count_size=5,
        datastreams=['temperature', 'pressure']).stream():
    print(asset_name, df)
Filtering Assets:
Similarly, you can limit the data processing to certain assets by specifying them explicitly:
async for asset_name, df in app.rolling_window(
        count_size=5,
        assets=['asset1', 'asset2']).stream():
    print(asset_name, df)
Aligning Data:
The round_to parameter is designed to align message timestamps to the nearest specified time unit, such as a second, minute, or hour. This functionality is particularly beneficial for ensuring that rows within a DataFrame are synchronized, which is crucial for consistent data analysis and reporting.
async for asset_name, df in app.rolling_window(
        count_size=5,
        round_to=timedelta(minutes=1)).stream():
    print(asset_name, df)