Source code for kelvin.config.parser

from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path

import yaml
from yaml.parser import ParserError
from yaml.scanner import ScannerError

from .appyaml import AppYaml
from .common import AppTypes, ConfigError
from .exporter import ExporterConfig
from .external import ExternalConfig
from .importer import ImporterConfig
from .manifest import AppManifest
from .smart_app import SmartAppConfig


[docs] def missing_config_error(file_path: str) -> str: return f"Config file {file_path} does not exist."
[docs] def invalid_yaml_error(file_path: str) -> str: return f"Invalid YAML in config file {file_path}."
def _validate_file_path(file_path: str) -> Path: """ Validates and resolves a file path to prevent path traversal attacks. Args: file_path (str): The file path to validate. Returns: Path: The validated absolute path. Raises: ConfigError: If the path is invalid or suspicious. """ try: # Convert to Path object and resolve to absolute path path = Path(file_path).resolve() # Check if file exists (moved from parse_config_file) if not path.exists(): raise ConfigError(missing_config_error(file_path)) # Ensure it's a file, not a directory if not path.is_file(): raise ConfigError(f"Path {file_path} is not a file.") return path except (OSError, RuntimeError) as e: raise ConfigError(f"Invalid file path {file_path}: {e}") from e
[docs] @dataclass class AppConfigObj: name: str version: str type: AppTypes config: ExporterConfig | ImporterConfig | SmartAppConfig | ExternalConfig | AppYaml
[docs] def is_legacy(self) -> bool: return self.type in [AppTypes.kelvin_app, AppTypes.bridge, AppTypes.legacy_docker]
[docs] def to_app_manifest(self, read_schemas: bool = True, workdir: Path = Path(".")) -> AppManifest: return self.config.to_manifest(read_schemas=read_schemas, workdir=workdir)
[docs] def parse_config_file(file_path: str) -> AppConfigObj: """ Parses a YAML configuration file and returns an AppConfigObj. Args: file_path (str): The path to the configuration file. Raises: ConfigError: If the file does not exist, path is invalid, or contains invalid YAML. Returns: AppConfigObj: The parsed configuration object. """ # Validate and resolve the file path validated_path = _validate_file_path(file_path) try: with open(validated_path, encoding="utf-8") as file: config = yaml.safe_load(file) except (ParserError, ScannerError) as e: raise ConfigError(invalid_yaml_error(file_path)) from e return parse_config(config)
[docs] def parse_config(config: dict) -> AppConfigObj: app_type = config.get("type") if app_type is None and config.get("app", {}).get("type"): app_type = config["app"]["type"] if app_type == AppTypes.docker: # Convert type to legacy docker app_type = AppTypes.legacy_docker if app_type == AppTypes.app: smart_app_conf = SmartAppConfig.model_validate(config) return AppConfigObj( name=smart_app_conf.name, version=smart_app_conf.version, type=smart_app_conf.type, config=smart_app_conf ) elif app_type == AppTypes.importer: importer_conf = ImporterConfig.model_validate(config) return AppConfigObj( name=importer_conf.name, version=importer_conf.version, type=importer_conf.type, config=importer_conf ) elif app_type == AppTypes.exporter: exporter_conf = ExporterConfig.model_validate(config) return AppConfigObj( name=exporter_conf.name, version=exporter_conf.version, type=exporter_conf.type, config=exporter_conf ) elif app_type == AppTypes.docker: external_conf = ExternalConfig.model_validate(config) return AppConfigObj( name=external_conf.name, version=external_conf.version, type=external_conf.type, config=external_conf ) elif app_type in [AppTypes.kelvin_app, AppTypes.bridge, AppTypes.legacy_docker]: app_type_enum = AppTypes(app_type) app_conf = AppYaml.model_validate(config) return AppConfigObj( name=app_conf.info.name, version=app_conf.info.version, type=app_type_enum, config=app_conf, ) else: raise ConfigError(f"Unknown app type: {app_type}")