#!/usr/bin/env python3
"""Convert PGE energy meter CSV reports into time-aggregated outputs."""

import argparse
import os
import re
import sys
from pathlib import Path
from typing import List, Optional

import pandas as pd


class PGEDataProcessor:
    """
    Process PGE energy meter reading CSV reports.

    Converts wide-format hourly data (columns H01..H24, one row per day and
    energy direction) into long-format timestamped data, then produces
    hourly/daily/weekly/monthly aggregations with one column per energy
    type (the "Kierunek" column of the input).
    """

    # Wide-format hour columns: H01 .. H24.
    HOUR_COLUMNS = [f"H{i:02d}" for i in range(1, 25)]

    def __init__(self, input_file: str, separator: str = ";", encoding: str = "utf-8"):
        """
        Initialize the PGE data processor.

        Args:
            input_file: Path to the input CSV file.
            separator: CSV separator (default: ';').
            encoding: File encoding (default: 'utf-8', use 'cp1250' for
                Polish characters).

        Raises:
            FileNotFoundError: If the input file does not exist.
        """
        self.input_file = Path(input_file)
        self.separator = separator
        self.encoding = encoding
        self.raw_data: Optional[pd.DataFrame] = None
        self.processed_data: Optional[pd.DataFrame] = None
        self.energy_types: Optional[List[str]] = None

        if not self.input_file.exists():
            raise FileNotFoundError(f"Input file not found: {input_file}")

    @staticmethod
    def fix_value_string(val) -> str:
        """
        Fix malformed kWh strings like ',123', '-,123', '-,' etc.

        Returns a clean string ready for float conversion; missing, empty,
        or degenerate values (NaN, '', '-', '.', '-.') become '0'.
        """
        if pd.isna(val):
            return "0"

        s = str(val).strip().replace(" ", "")
        # Comma is the Polish decimal separator; normalize to dot first so
        # all later checks only have to deal with '.'.
        s = s.replace(",", ".")

        # Empty cell: float("") would raise, so treat it as zero.
        if not s:
            return "0"

        # Missing leading zero: '.123' -> '0.123'.
        if re.match(r"^\.\d+", s):
            s = "0" + s
        # Negative with missing zero: '-.123' -> '-0.123'.
        elif re.match(r"^-\.\d+", s):
            s = "-0" + s[1:]

        # Degenerate leftovers such as '-', '.', '-.' (originally '-,' or
        # ',') are malformed zeros.
        if s in {"-", ".", "-."}:
            return "0"

        return s

    def load_data(self) -> pd.DataFrame:
        """
        Load and validate the input CSV data.

        Falls back to cp1250 if the configured encoding cannot decode the
        file (common for Polish-language exports).

        Returns:
            Raw DataFrame from the CSV file.

        Raises:
            ValueError: If required or hour columns are missing.
        """
        print(f"Loading data from: {self.input_file}")
        try:
            self.raw_data = pd.read_csv(
                self.input_file, sep=self.separator, encoding=self.encoding
            )
        except UnicodeDecodeError:
            print(f"Warning: {self.encoding} encoding failed, trying cp1250...")
            self.raw_data = pd.read_csv(
                self.input_file, sep=self.separator, encoding="cp1250"
            )
            # Remember the working encoding so outputs are written with it.
            self.encoding = "cp1250"

        missing_hours = [c for c in self.HOUR_COLUMNS if c not in self.raw_data.columns]
        if missing_hours:
            raise ValueError(f"Missing hour columns in input: {missing_hours}")

        required_cols = ["DataOdczytu", "Kierunek"]
        missing_required = [c for c in required_cols if c not in self.raw_data.columns]
        if missing_required:
            raise ValueError(f"Missing required columns: {missing_required}")

        print(f"Loaded {len(self.raw_data)} rows with {len(self.raw_data.columns)} columns")

        # Detect unique energy types from the Kierunek column; sorted so
        # output column order is deterministic.
        self.energy_types = sorted(self.raw_data["Kierunek"].unique().tolist())
        print(f"Detected energy types: {self.energy_types}")

        return self.raw_data

    def process_data(self) -> pd.DataFrame:
        """
        Process the raw data: fix malformed values, convert to long format,
        add timestamps.

        Returns:
            Processed DataFrame in long format with timestamps.
        """
        if self.raw_data is None:
            self.load_data()

        print("Processing data...")
        df = self.raw_data.copy()

        # Fix malformed values in hour columns before numeric conversion.
        for col in self.HOUR_COLUMNS:
            df[col] = df[col].apply(self.fix_value_string).astype(float)

        # Convert to long format: one row per day + type + hour.
        df_long = df.melt(
            id_vars=["DataOdczytu", "Kierunek"],
            value_vars=self.HOUR_COLUMNS,
            var_name="HourCol",
            value_name="Value_kWh",
        )

        # Extract hour number (1-24) from 'H01'..'H24'.
        df_long["Hour"] = df_long["HourCol"].str[1:].astype(int)

        # Convert date from YYYYMMDD format.
        df_long["Date"] = pd.to_datetime(
            df_long["DataOdczytu"].astype(str), format="%Y%m%d"
        )

        # Create full timestamp: H01 -> 00:00, H24 -> 23:00.
        df_long["Timestamp"] = df_long["Date"] + pd.to_timedelta(
            df_long["Hour"] - 1, unit="h"
        )

        self.processed_data = df_long
        print(f"Processed data: {len(df_long)} rows")
        return df_long

    def _pivot_wide(self, data: pd.DataFrame, index_col: str) -> pd.DataFrame:
        """
        Pivot long-format data to wide format: one row per index value,
        one column per energy type, in the detected (sorted) type order.
        """
        pivot = data.pivot_table(
            index=index_col, columns="Kierunek", values="Value_kWh", aggfunc="sum"
        ).sort_index()
        out = pivot.reset_index()
        available_types = [t for t in self.energy_types if t in out.columns]
        return out[[index_col] + available_types]

    def generate_hourly_data(self) -> pd.DataFrame:
        """
        Generate hourly aggregated data with energy types as columns.

        Returns:
            DataFrame with hourly data (one row per hour, columns for
            energy types).
        """
        if self.processed_data is None:
            self.process_data()

        print("Generating hourly data...")
        df = self.processed_data.copy()
        # 'YYYY-MM-DD HH:MM' sorts lexicographically in chronological order.
        df["Timestamp"] = df["Timestamp"].dt.strftime("%Y-%m-%d %H:%M")
        return self._pivot_wide(df, "Timestamp")

    def generate_daily_data(self) -> pd.DataFrame:
        """
        Generate daily aggregated data.

        Returns:
            DataFrame with daily totals.
        """
        if self.processed_data is None:
            self.process_data()

        print("Generating daily data...")
        df = self.processed_data.copy()
        df["Date"] = df["Date"].dt.strftime("%Y-%m-%d")
        return self._pivot_wide(df, "Date")

    def generate_weekly_data(self) -> pd.DataFrame:
        """
        Generate weekly aggregated data (weeks starting Monday).

        Returns:
            DataFrame with weekly totals.
        """
        if self.processed_data is None:
            self.process_data()

        print("Generating weekly data...")
        df_ts = self.processed_data.set_index("Timestamp")

        # Resample per energy type; label="left"/closed="left" makes each
        # bin [Monday, next Monday) labeled by its starting Monday.
        df_weekly = (
            df_ts
            .groupby("Kierunek")
            .resample("W-MON", label="left", closed="left")["Value_kWh"]
            .sum()
            .reset_index()
        )
        df_weekly["WeekStart"] = df_weekly["Timestamp"].dt.strftime("%Y-%m-%d")
        return self._pivot_wide(df_weekly, "WeekStart")

    def generate_monthly_data(self) -> pd.DataFrame:
        """
        Generate monthly aggregated data.

        Returns:
            DataFrame with monthly totals.
        """
        if self.processed_data is None:
            self.process_data()

        print("Generating monthly data...")
        df_ts = self.processed_data.set_index("Timestamp")

        df_monthly = (
            df_ts
            .groupby("Kierunek")
            .resample("MS")["Value_kWh"]  # MS = Month Start
            .sum()
            .reset_index()
        )
        df_monthly["Month"] = df_monthly["Timestamp"].dt.strftime("%Y-%m")
        return self._pivot_wide(df_monthly, "Month")

    def save_data(self, data: pd.DataFrame, output_file: str) -> None:
        """
        Save a DataFrame to CSV with consistent formatting.

        Args:
            data: DataFrame to save.
            output_file: Output file path (parent dirs are created).
        """
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        data.to_csv(
            output_path,
            sep=self.separator,
            encoding=self.encoding,
            index=False,
            float_format="%.3f",
        )
        print(f"Saved: {output_path}")

    def get_output_filename(self, suffix: str) -> str:
        """
        Generate an output filename based on the input filename and suffix.

        Args:
            suffix: Suffix to append (e.g., '-hourly', '-daily').

        Returns:
            Output filename (same directory and extension as the input).
        """
        stem = self.input_file.stem
        extension = self.input_file.suffix
        return str(self.input_file.parent / f"{stem}{suffix}{extension}")


def _resolve_output(processor: PGEDataProcessor, suffix: str,
                    output_dir: Optional[str]) -> str:
    """Compute the output path for a suffix, honoring an optional output dir."""
    path = processor.get_output_filename(suffix)
    if output_dir:
        path = os.path.join(output_dir, os.path.basename(path))
    return path


def main() -> int:
    """CLI entry point: parse args, run all aggregations, save results."""
    parser = argparse.ArgumentParser(
        description="Convert PGE energy meter CSV reports to time-aggregated formats"
    )
    parser.add_argument(
        "input_file",
        help="Path to the input PGE CSV file"
    )
    parser.add_argument(
        "--separator", "-s",
        default=";",
        help="CSV separator (default: ';')"
    )
    parser.add_argument(
        "--encoding", "-e",
        default="utf-8",
        help="File encoding (default: 'utf-8', use 'cp1250' for Polish characters)"
    )
    parser.add_argument(
        "--output-dir", "-o",
        help="Output directory (default: same as input file)"
    )
    args = parser.parse_args()

    try:
        processor = PGEDataProcessor(
            input_file=args.input_file,
            separator=args.separator,
            encoding=args.encoding,
        )
        processor.load_data()
        processor.process_data()

        # Generate and save every aggregation level.
        generators = [
            ("-hourly", processor.generate_hourly_data),
            ("-daily", processor.generate_daily_data),
            ("-weekly", processor.generate_weekly_data),
            ("-monthly", processor.generate_monthly_data),
        ]
        for suffix, generate in generators:
            data = generate()
            processor.save_data(data, _resolve_output(processor, suffix, args.output_dir))

        print("\nConversion completed successfully!")
    except Exception as e:
        # Top-level boundary: report the error and signal failure to the shell.
        print(f"Error: {e}")
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())