Skip to content

Data Quality Output

Publishing Data Quality Messages

Kelvin SmartApps™ can perform custom validation checks on Data Stream data and publish the results as Data Quality messages back to the Kelvin Platform.

You can read all about Data Quality concept here and the app.yaml settings here.

app.yaml setup

Note

Custom Data Quality Applications must declare their outputs in the data_quality section.

The platform's built-in Data Quality monitoring (Data Availability, Timestamp Anomaly, Out-of-Range Detection, etc.) is configured through the Kelvin UI or API and automatically generates data quality messages.

Custom applications extend these capabilities by implementing specialized validation logic and publishing custom quality scores.

For publishing Data Quality information, the app.yaml needs to declare the Data Streams and validation types in the data quality outputs section.

app.yaml Example
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
...

# ------------------------------------------------------------------------------
# Data Streams Definitions
# ------------------------------------------------------------------------------
data_streams:
  inputs:
    - name: casing_pressure
      data_type: number
    - name: temperature
      data_type: number
    - name: speed
      data_type: number
  outputs: []

# ------------------------------------------------------------------------------
# Data Quality Definitions
# ------------------------------------------------------------------------------
data_quality:
  inputs: []
  outputs:
    - name: custom_anomaly_detection
      data_type: number
      data_streams:
        - casing_pressure
        - temperature
        - speed
    - name: custom_quality_score
      data_type: number
      data_streams:
        - casing_pressure
        - temperature
        - speed

...

Once configured, your application can calculate validation metrics and publish them as Data Quality messages for consumption by other applications or the Kelvin UI.

Publishing Quality Scores

The primary method for publishing Data Quality scores is to use the publish method with the appropriate Data Quality resource identifier.

Note

Data Quality scores are typically percentage values (0-100) representing the quality level of the data stream.

The platform aggregates these scores at both the datastream and asset levels.

Publishing Data Quality Scores
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from kelvin.application import KelvinApp
from kelvin.message import AssetDataMessage, Number, Message, KMessageTypeData
from kelvin.krn import KRNAssetDataStreamDataQuality

app = KelvinApp()

@app.stream()
async def handle(msg: AssetDataMessage):
    # Perform custom quality calculation
    quality_score = calculate_quality_score(msg.payload)

    # Publish the quality score
    await app.publish(
        Message(
            type=KMessageTypeData("number"),
            resource=KRNAssetDataStreamDataQuality(
                asset=msg.resource.asset,
                data_stream=msg.resource.data_stream,
                data_quality="custom_quality_score"
            ),
            payload=quality_score,
            timestamp=msg.timestamp
        )
    )

def calculate_quality_score(value: float) -> float:
    # Custom quality calculation logic
    # Return score between 0-100
    if value >= 90:
        return 100.0
    elif value >= 50:
        return 75.0
    else:
        return 50.0

app.run()

Publishing Validation Results

For binary validation results (pass/fail, anomaly detected/not detected), publish boolean or numeric indicators.

Publishing Anomaly Detection Results
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from kelvin.application import KelvinApp
from kelvin.message import AssetDataMessage, Message, KMessageTypeData
from kelvin.krn import KRNAssetDataStreamDataQuality
import statistics

app = KelvinApp()

# Store recent values for anomaly detection
recent_values = {}

@app.stream()
async def handle(msg: AssetDataMessage):
    stream_key = f"{msg.resource.asset}/{msg.resource.data_stream}"

    # Initialize storage for this stream
    if stream_key not in recent_values:
        recent_values[stream_key] = []

    # Add current value
    recent_values[stream_key].append(msg.payload)

    # Keep only last 100 values
    if len(recent_values[stream_key]) > 100:
        recent_values[stream_key].pop(0)

    # Detect anomalies using z-score
    if len(recent_values[stream_key]) >= 10:
        anomaly_detected = detect_anomaly(
            msg.payload, 
            recent_values[stream_key]
        )

        # Publish anomaly detection result (1 = anomaly, 0 = normal)
        await app.publish(
            Message(
                type=KMessageTypeData("number"),
                resource=KRNAssetDataStreamDataQuality(
                    asset=msg.resource.asset,
                    data_stream=msg.resource.data_stream,
                    data_quality="custom_anomaly_detection"
                ),
                payload=1 if anomaly_detected else 0,
                timestamp=msg.timestamp
            )
        )

def detect_anomaly(value: float, historical: list) -> bool:
    """Detect anomaly using simple z-score method"""
    if len(historical) < 10:
        return False

    mean = statistics.mean(historical)
    stdev = statistics.stdev(historical)

    if stdev == 0:
        return False

    z_score = abs((value - mean) / stdev)
    return z_score > 3.0  # Anomaly if z-score > 3

app.run()

Advanced: Multi-Stream Quality Assessment

For complex quality assessments involving multiple data streams, aggregate the analysis and publish comprehensive quality scores.

Multi-Stream Quality Assessment
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
import asyncio
from typing import AsyncGenerator
from datetime import datetime, timedelta
from kelvin.application import KelvinApp, filters
from kelvin.message import AssetDataMessage, Message, KMessageTypeData
from kelvin.krn import KRNAssetDataQuality

async def main() -> None:
    app = KelvinApp()
    await app.connect()

    # Track data quality metrics per asset
    asset_metrics = {}

    # Stream all incoming data
    data_stream: AsyncGenerator[AssetDataMessage, None] = app.stream_filter(
        filters.is_data_message
    )

    async for msg in data_stream:
        asset_name = msg.resource.asset
        stream_name = msg.resource.data_stream

        # Initialize asset tracking
        if asset_name not in asset_metrics:
            asset_metrics[asset_name] = {
                'streams': {},
                'last_calculation': None
            }

        # Track stream data
        if stream_name not in asset_metrics[asset_name]['streams']:
            asset_metrics[asset_name]['streams'][stream_name] = {
                'count': 0,
                'last_value': None,
                'last_timestamp': None
            }

        # Update metrics
        asset_metrics[asset_name]['streams'][stream_name]['count'] += 1
        asset_metrics[asset_name]['streams'][stream_name]['last_value'] = msg.payload
        asset_metrics[asset_name]['streams'][stream_name]['last_timestamp'] = msg.timestamp

        # Calculate and publish quality score every 5 minutes
        now = datetime.now()
        last_calc = asset_metrics[asset_name]['last_calculation']

        if last_calc is None or (now - last_calc) > timedelta(minutes=5):
            # Calculate overall asset quality score
            quality_score = calculate_asset_quality(
                asset_metrics[asset_name]['streams']
            )

            # Publish asset-level quality score
            await app.publish(
                Message(
                    type=KMessageTypeData("number"),
                    resource=KRNAssetDataQuality(
                        asset=asset_name,
                        data_quality="custom_quality_score"
                    ),
                    payload=quality_score,
                    timestamp=now
                )
            )

            asset_metrics[asset_name]['last_calculation'] = now

            print(f"Published quality score {quality_score}% for asset {asset_name}")

def calculate_asset_quality(streams: dict) -> float:
    """Calculate overall asset quality based on all streams"""
    if not streams:
        return 0.0

    total_score = 0.0
    stream_count = len(streams)

    for stream_name, metrics in streams.items():
        # Example: Score based on data freshness and availability
        if metrics['last_timestamp']:
            age_seconds = (datetime.now() - metrics['last_timestamp']).total_seconds()

            # Full score if data is less than 1 minute old
            if age_seconds < 60:
                stream_score = 100.0
            # Degraded score if data is 1-5 minutes old
            elif age_seconds < 300:
                stream_score = 75.0
            # Poor score if data is 5-15 minutes old
            elif age_seconds < 900:
                stream_score = 50.0
            # Zero score if data is older than 15 minutes
            else:
                stream_score = 0.0
        else:
            stream_score = 0.0

        total_score += stream_score

    return total_score / stream_count

if __name__ == "__main__":
    asyncio.run(main())

When deployed and running, if you look at the logs of the workload, you will see this output:

Quality Score Publishing Output
1
2
3
2025-10-14T06:00:00.002169184Z Published quality score 100.0% for asset pcp_02
2025-10-14T06:05:00.004776951Z Published quality score 95.5% for asset pcp_02
2025-10-14T06:10:00.007234123Z Published quality score 87.3% for asset pcp_02

Publishing Data Quality Types

The Kelvin Platform supports several standard Data Quality message types that can be published by custom applications.

Standard Data Quality Types

Type Resource Pattern Data Type Description
Data Availability krn:dqad:kelvin_data_availability:asset/stream number Percentage of expected data points received
Timestamp Anomaly krn:dqad:kelvin_timestamp_anomaly:asset/stream number Count of timestamp anomalies detected
Out of Range krn:dqad:kelvin_out_of_range_detection:asset/stream number Count of out-of-range values detected
Outlier Detection krn:dqad:kelvin_outlier_detection:asset/stream number Count of statistical outliers detected
Duplicate Detection krn:dqad:kelvin_duplicate_detection:asset/stream number Duplicate data detection indicator

Example: Publishing Data Availability

Publishing Data Availability Score
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from kelvin.application import KelvinApp
from kelvin.message import Message, KMessageTypeData
from kelvin.krn import KRNAssetDataStreamDataQuality
from datetime import datetime

app = KelvinApp()

async def publish_availability_score(
    asset: str,
    stream: str,
    expected_count: int,
    actual_count: int,
    timestamp: datetime
):
    """Calculate and publish data availability percentage"""

    # Calculate availability percentage
    availability = (actual_count / expected_count * 100) if expected_count > 0 else 0

    # Publish availability score
    await app.publish(
        Message(
            type=KMessageTypeData("number"),
            resource=KRNAssetDataStreamDataQuality(
                asset=asset,
                data_stream=stream,
                data_quality="kelvin_data_availability"
            ),
            payload=availability,
            timestamp=timestamp
        )
    )

    # Also publish the count
    await app.publish(
        Message(
            type=KMessageTypeData("number"),
            resource=KRNAssetDataStreamDataQuality(
                asset=asset,
                data_stream=stream,
                data_quality="kelvin_data_availability_count"
            ),
            payload=actual_count,
            timestamp=timestamp
        )
    )

    # And the rate
    rate = actual_count / expected_count if expected_count > 0 else 0
    await app.publish(
        Message(
            type=KMessageTypeData("number"),
            resource=KRNAssetDataStreamDataQuality(
                asset=asset,
                data_stream=stream,
                data_quality="kelvin_data_availability_rate"
            ),
            payload=rate,
            timestamp=timestamp
        )
    )

# Usage in stream handler
@app.stream()
async def handle(msg):
    # Every 5 minutes, calculate and publish availability
    await publish_availability_score(
        asset="pcp_01",
        stream="temperature",
        expected_count=5,
        actual_count=5,
        timestamp=datetime.now()
    )

app.run()

Example: Publishing Aggregated Asset Scores

Asset-level quality scores aggregate data from all monitored data streams for that asset.

Publishing Asset-Level Quality Score
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from kelvin.application import KelvinApp
from kelvin.message import Message, KMessageTypeData
from kelvin.krn import KRNAssetDataQuality
from datetime import datetime

app = KelvinApp()

async def publish_asset_quality_score(
    asset: str,
    datastream_scores: dict,
    timestamp: datetime
):
    """
    Calculate and publish asset-level quality score

    Args:
        asset: Asset name
        datastream_scores: Dictionary of {stream_name: quality_score}
        timestamp: Timestamp for the score
    """

    # Calculate average quality across all streams
    if datastream_scores:
        asset_score = sum(datastream_scores.values()) / len(datastream_scores)
    else:
        asset_score = 0.0

    # Publish asset-level quality score
    await app.publish(
        Message(
            type=KMessageTypeData("number"),
            resource=KRNAssetDataQuality(
                asset=asset,
                data_quality="custom_data_quality_score"
            ),
            payload=asset_score,
            timestamp=timestamp
        )
    )

    print(f"Published asset quality score: {asset_score}% for {asset}")

# Example usage
@app.stream()
async def handle(msg):
    # Collect scores from multiple streams
    scores = {
        'temperature': 98.5,
        'pressure': 95.2,
        'speed': 100.0
    }

    await publish_asset_quality_score(
        asset="pcp_01",
        datastream_scores=scores,
        timestamp=datetime.now()
    )

app.run()

Best Practices

Timestamp Consistency

Warning

Always use the original data timestamp when publishing quality scores to maintain temporal consistency.

Correct Timestamp Usage
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from kelvin.message import Message, KMessageTypeData
from kelvin.krn import KRNAssetDataStreamDataQuality

@app.stream()
async def handle(msg: AssetDataMessage):
    quality_score = analyze_quality(msg.payload)

    # Use the original message timestamp
    await app.publish(
        Message(
            type=KMessageTypeData("number"),
            resource=KRNAssetDataStreamDataQuality(
                asset=msg.resource.asset,
                data_stream=msg.resource.data_stream,
                data_quality="custom_quality_score"
            ),
            payload=quality_score
        )
    )

Score Normalization

Tip

Normalize all quality scores to a 0-100 scale for consistency with platform standards.

Score Normalization
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def normalize_score(value: float, min_val: float, max_val: float) -> float:
    """Normalize any value to 0-100 scale"""
    if max_val == min_val:
        return 100.0

    normalized = ((value - min_val) / (max_val - min_val)) * 100
    return max(0.0, min(100.0, normalized))  # Clamp to 0-100

# Usage
raw_score = 0.847  # Some metric between 0-1
quality_score = normalize_score(raw_score, 0.0, 1.0)
# Result: 84.7

Error Handling

Warning

Always handle edge cases and errors gracefully when calculating quality scores.

Robust Quality Calculation
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from kelvin.application import KelvinApp
from kelvin.message import AssetDataMessage, Message, KMessageTypeData
from kelvin.krn import KRNAssetDataStreamDataQuality

app = KelvinApp()

@app.stream()
async def handle(msg: AssetDataMessage):
    try:
        # Validate payload
        if msg.payload is None:
            quality_score = 0.0
        elif not isinstance(msg.payload, (int, float)):
            quality_score = 0.0
        else:
            quality_score = calculate_quality_score(msg.payload)

        # Ensure score is in valid range
        quality_score = max(0.0, min(100.0, quality_score))

        await app.publish(
            Message(
                type=KMessageTypeData("number"),
                resource=KRNAssetDataStreamDataQuality(
                    asset=msg.resource.asset,
                    data_stream=msg.resource.data_stream,
                    data_quality="custom_quality_score"
                ),
                payload=quality_score,
                timestamp=msg.timestamp
            )
        )

    except Exception as e:
        # Log error but don't crash the application
        print(f"Error calculating quality score: {e}")

        # Optionally publish a zero score to indicate calculation failure
        await app.publish(
            Message(
                type=KMessageTypeData("number"),
                resource=KRNAssetDataStreamDataQuality(
                    asset=msg.resource.asset,
                    data_stream=msg.resource.data_stream,
                    data_quality="custom_quality_score"
                ),
                payload=0.0,
                timestamp=msg.timestamp
            )
        )

app.run()

Batching for Performance

Tip

For high-frequency data streams, consider batching quality calculations to reduce computational load.

Batched Quality Publishing
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from kelvin.application import KelvinApp
from kelvin.message import AssetDataMessage, Message, KMessageTypeData
from kelvin.krn import KRNAssetDataStreamDataQuality
from datetime import datetime, timedelta, timezone
from collections import defaultdict

app = KelvinApp()

# Buffer for batching
message_buffer = defaultdict(list)
last_publish = {}

@app.stream()
async def handle(msg: AssetDataMessage):
    stream_key = f"{msg.resource.asset}/{msg.resource.data_stream}"

    # Add to buffer
    message_buffer[stream_key].append(msg)

    # Publish quality score every 1 minute or every 100 messages
    now = datetime.now(timezone.utc)
    should_publish = (
        stream_key not in last_publish or
        (now - last_publish[stream_key]) > timedelta(minutes=1) or
        len(message_buffer[stream_key]) >= 100
    )

    if should_publish:
        # Calculate quality from buffered messages
        quality_score = calculate_batch_quality(message_buffer[stream_key])

        # Publish
        await app.publish(
            Message(
                type=KMessageTypeData("number"),
                resource=KRNAssetDataStreamDataQuality(
                    asset=msg.resource.asset,
                    data_stream=msg.resource.data_stream,
                    data_quality="custom_quality_score"
                ),
                payload=quality_score,
                timestamp=msg.timestamp
            )
        )

        # Reset buffer and timestamp
        message_buffer[stream_key] = []
        last_publish[stream_key] = now

def calculate_batch_quality(messages: list) -> float:
    """Calculate quality score from a batch of messages"""
    if not messages:
        return 0.0

    # Example: Check for timestamp gaps
    timestamps = sorted([m.timestamp for m in messages])
    gaps = []

    for i in range(1, len(timestamps)):
        gap = (timestamps[i] - timestamps[i-1]).total_seconds()
        gaps.append(gap)

    # Quality degrades with larger gaps
    avg_gap = sum(gaps) / len(gaps) if gaps else 0

    if avg_gap <= 60:  # Expected 1 minute or less
        return 100.0
    elif avg_gap <= 300:  # Up to 5 minutes
        return 75.0
    elif avg_gap <= 900:  # Up to 15 minutes
        return 50.0
    else:
        return 25.0

app.run()

Monitoring Published Scores

Once your application publishes Data Quality scores, they can be consumed by:

  • Kelvin UI: Dashboard visualizations and alerts
  • Other SmartApps: Downstream processing and decision-making
  • API Clients: Programmatic access for reporting and analysis
  • Asset Insights: Aggregated quality metrics per asset

For information on consuming Data Quality messages, see the Data Quality Data documentation.