Source: pge-raport-generator/raport_convert.py (432 lines, 14 KiB, Python).
Note: the hosting site flagged the original file as containing ambiguous
Unicode characters (characters that might be confused with other characters);
if intentional, the warning can be safely ignored.
#!/usr/bin/env python3
import pandas as pd
import re
import argparse
import os
from pathlib import Path
from typing import Optional, Dict, List
class PGEDataProcessor:
    """
    Process PGE energy meter reading CSV reports.

    Loads a wide-format report (one row per day/direction with hour columns
    H01..H24) and produces time-aggregated outputs: hourly, daily, weekly
    and monthly totals, one column per energy type ("Kierunek").
    """

    # Wide-format hour columns H01..H24; H01 maps to 00:00, H24 to 23:00.
    HOUR_COLUMNS = [f"H{i:02d}" for i in range(1, 25)]

    def __init__(self, input_file: str, separator: str = ";", encoding: str = "utf-8"):
        """
        Initialize the PGE data processor.

        Args:
            input_file: Path to the input CSV file.
            separator: CSV separator (default: ';').
            encoding: File encoding (default: 'utf-8', use 'cp1250' for Polish chars).

        Raises:
            FileNotFoundError: If ``input_file`` does not exist.
        """
        self.input_file = Path(input_file)
        self.separator = separator
        self.encoding = encoding
        self.raw_data: Optional[pd.DataFrame] = None
        self.processed_data: Optional[pd.DataFrame] = None
        self.energy_types: Optional[List[str]] = None
        if not self.input_file.exists():
            raise FileNotFoundError(f"Input file not found: {input_file}")

    @staticmethod
    def fix_value_string(val) -> str:
        """
        Fix malformed kWh strings like ',123', '-,123', '-,' etc.

        Returns a clean string ready for float conversion. NaN, empty cells
        and degenerate sign/separator fragments all normalize to "0".
        """
        if pd.isna(val):
            return "0"
        s = str(val).strip().replace(" ", "")
        # Normalize the comma decimal separator to a dot.
        s = s.replace(",", ".")
        # Empty cell after cleanup is treated as zero (a bare "" would
        # otherwise crash the later float conversion).
        if not s:
            return "0"
        # '.123' -> '0.123' (missing leading zero)
        if re.match(r"^\.\d+", s):
            s = "0" + s
        # '-.123' -> '-0.123' (negative with missing leading zero)
        elif re.match(r"^-\.\d+", s):
            s = "-0" + s[1:]
        # Lone sign/separator fragments (originally '-', ',', '-,' ...)
        # are malformed zeros. Comma variants are already dots here.
        if s in {"-", ".", "-."}:
            return "0"
        return s

    def load_data(self) -> pd.DataFrame:
        """
        Load and validate the input CSV data.

        Falls back to cp1250 when the configured encoding cannot decode
        the file (common for Polish-language exports).

        Returns:
            Raw DataFrame from the CSV file.

        Raises:
            ValueError: If required hour or metadata columns are missing.
        """
        print(f"Loading data from: {self.input_file}")
        try:
            self.raw_data = pd.read_csv(
                self.input_file,
                sep=self.separator,
                encoding=self.encoding
            )
        except UnicodeDecodeError:
            # Report the encoding that actually failed, not a hard-coded one.
            print(f"Warning: {self.encoding} encoding failed, trying cp1250...")
            self.raw_data = pd.read_csv(
                self.input_file,
                sep=self.separator,
                encoding="cp1250"
            )
            self.encoding = "cp1250"
        # Validate required columns before any processing.
        missing_hours = [c for c in self.HOUR_COLUMNS if c not in self.raw_data.columns]
        if missing_hours:
            raise ValueError(f"Missing hour columns in input: {missing_hours}")
        required_cols = ["DataOdczytu", "Kierunek"]
        missing_required = [c for c in required_cols if c not in self.raw_data.columns]
        if missing_required:
            raise ValueError(f"Missing required columns: {missing_required}")
        print(f"Loaded {len(self.raw_data)} rows with {len(self.raw_data.columns)} columns")
        # Detect unique energy types from the Kierunek (direction) column.
        self.energy_types = sorted(self.raw_data["Kierunek"].unique().tolist())
        print(f"Detected energy types: {self.energy_types}")
        return self.raw_data

    def process_data(self) -> pd.DataFrame:
        """
        Process the raw data: fix malformed values, convert to long format,
        add timestamps.

        Returns:
            Processed DataFrame in long format with one row per
            (day, energy type, hour) and a full hourly Timestamp column.
        """
        if self.raw_data is None:
            self.load_data()
        print("Processing data...")
        df = self.raw_data.copy()
        # Repair malformed strings, then convert hour columns to floats.
        for col in self.HOUR_COLUMNS:
            df[col] = df[col].apply(self.fix_value_string)
            df[col] = df[col].astype(float)
        # Convert to long format: one row per day + type + hour.
        df_long = df.melt(
            id_vars=["DataOdczytu", "Kierunek"],
            value_vars=self.HOUR_COLUMNS,
            var_name="HourCol",
            value_name="Value_kWh"
        )
        # Extract hour number (1-24) from the "Hxx" column label.
        df_long["Hour"] = df_long["HourCol"].str[1:].astype(int)
        # Convert date from YYYYMMDD format.
        df_long["Date"] = pd.to_datetime(
            df_long["DataOdczytu"].astype(str),
            format="%Y%m%d"
        )
        # Create full timestamp: H01 -> 00:00, H24 -> 23:00.
        df_long["Timestamp"] = df_long["Date"] + pd.to_timedelta(
            df_long["Hour"] - 1, unit="h"
        )
        self.processed_data = df_long
        print(f"Processed data: {len(df_long)} rows")
        return df_long

    def generate_hourly_data(self) -> pd.DataFrame:
        """
        Generate hourly aggregated data with energy types as columns.

        Returns:
            DataFrame with one row per hour and one column per energy type.
        """
        if self.processed_data is None:
            self.process_data()
        print("Generating hourly data...")
        # Pivot to wide format with timestamp index.
        df_hourly = self.processed_data.pivot_table(
            index="Timestamp",
            columns="Kierunek",
            values="Value_kWh",
            aggfunc="sum"
        ).sort_index()
        # Reset index and format timestamp for CSV output.
        df_hourly_out = df_hourly.reset_index()
        df_hourly_out["Timestamp"] = df_hourly_out["Timestamp"].dt.strftime("%Y-%m-%d %H:%M")
        # Ensure consistent column ordering across all outputs.
        available_types = [t for t in self.energy_types if t in df_hourly_out.columns]
        df_hourly_out = df_hourly_out[["Timestamp"] + available_types]
        return df_hourly_out

    def generate_daily_data(self) -> pd.DataFrame:
        """
        Generate daily aggregated data.

        Returns:
            DataFrame with one row per day and one column per energy type.
        """
        if self.processed_data is None:
            self.process_data()
        print("Generating daily data...")
        # Group by date and energy type, sum the hourly values.
        df_daily = (
            self.processed_data
            .groupby(["Date", "Kierunek"])["Value_kWh"]
            .sum()
            .reset_index()
        )
        # Format date and pivot to wide format.
        df_daily["Date_str"] = df_daily["Date"].dt.strftime("%Y-%m-%d")
        df_daily_pivot = df_daily.pivot_table(
            index="Date_str",
            columns="Kierunek",
            values="Value_kWh",
            aggfunc="sum"
        ).sort_index()
        df_daily_out = df_daily_pivot.reset_index().rename(columns={"Date_str": "Date"})
        # Ensure consistent column ordering across all outputs.
        available_types = [t for t in self.energy_types if t in df_daily_out.columns]
        df_daily_out = df_daily_out[["Date"] + available_types]
        return df_daily_out

    def generate_weekly_data(self) -> pd.DataFrame:
        """
        Generate weekly aggregated data (weeks starting Monday).

        Returns:
            DataFrame with one row per week (labelled by its start date)
            and one column per energy type.
        """
        if self.processed_data is None:
            self.process_data()
        print("Generating weekly data...")
        # Set timestamp as index for resampling.
        df_ts = self.processed_data.set_index("Timestamp")
        # Resample per energy type into Monday-anchored weekly bins.
        df_weekly = (
            df_ts
            .groupby("Kierunek")
            .resample("W-MON", label="left", closed="left")["Value_kWh"]
            .sum()
            .reset_index()
        )
        # Format week start date.
        df_weekly["WeekStart"] = df_weekly["Timestamp"].dt.strftime("%Y-%m-%d")
        # Pivot to wide format.
        df_weekly_pivot = df_weekly.pivot_table(
            index="WeekStart",
            columns="Kierunek",
            values="Value_kWh",
            aggfunc="sum"
        ).sort_index()
        # (The original no-op rename WeekStart -> WeekStart was dropped.)
        df_weekly_out = df_weekly_pivot.reset_index()
        # Ensure consistent column ordering across all outputs.
        available_types = [t for t in self.energy_types if t in df_weekly_out.columns]
        df_weekly_out = df_weekly_out[["WeekStart"] + available_types]
        return df_weekly_out

    def generate_monthly_data(self) -> pd.DataFrame:
        """
        Generate monthly aggregated data.

        Returns:
            DataFrame with one row per month ("YYYY-MM") and one column
            per energy type.
        """
        if self.processed_data is None:
            self.process_data()
        print("Generating monthly data...")
        # Set timestamp as index for resampling.
        df_ts = self.processed_data.set_index("Timestamp")
        # Resample per energy type into month-start bins.
        df_monthly = (
            df_ts
            .groupby("Kierunek")
            .resample("MS")["Value_kWh"]  # MS = Month Start
            .sum()
            .reset_index()
        )
        # Format month label.
        df_monthly["Month"] = df_monthly["Timestamp"].dt.strftime("%Y-%m")
        # Pivot to wide format.
        df_monthly_pivot = df_monthly.pivot_table(
            index="Month",
            columns="Kierunek",
            values="Value_kWh",
            aggfunc="sum"
        ).sort_index()
        df_monthly_out = df_monthly_pivot.reset_index()
        # Ensure consistent column ordering across all outputs.
        available_types = [t for t in self.energy_types if t in df_monthly_out.columns]
        df_monthly_out = df_monthly_out[["Month"] + available_types]
        return df_monthly_out

    def save_data(self, data: pd.DataFrame, output_file: str) -> None:
        """
        Save a DataFrame to CSV with consistent formatting.

        Args:
            data: DataFrame to save.
            output_file: Output file path; parent directories are created
                as needed.
        """
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        data.to_csv(
            output_path,
            sep=self.separator,
            encoding=self.encoding,
            index=False,
            float_format="%.3f"
        )
        print(f"Saved: {output_path}")

    def get_output_filename(self, suffix: str) -> str:
        """
        Generate an output filename based on the input filename and suffix.

        Args:
            suffix: Suffix to append (e.g., '-hourly', '-daily').

        Returns:
            Output path string in the input file's directory.
        """
        stem = self.input_file.stem
        extension = self.input_file.suffix
        return str(self.input_file.parent / f"{stem}{suffix}{extension}")
def main():
    """
    CLI entry point.

    Parses arguments, runs the processor on the input file and writes the
    hourly/daily/weekly/monthly CSV outputs next to the input file (or into
    --output-dir when given).

    Returns:
        0 on success, 1 on any processing error.
    """
    parser = argparse.ArgumentParser(
        description="Convert PGE energy meter CSV reports to time-aggregated formats"
    )
    parser.add_argument(
        "input_file",
        help="Path to the input PGE CSV file"
    )
    parser.add_argument(
        "--separator", "-s",
        default=";",
        help="CSV separator (default: ';')"
    )
    parser.add_argument(
        "--encoding", "-e",
        default="utf-8",
        help="File encoding (default: 'utf-8', use 'cp1250' for Polish characters)"
    )
    parser.add_argument(
        "--output-dir", "-o",
        help="Output directory (default: same as input file)"
    )
    args = parser.parse_args()
    try:
        processor = PGEDataProcessor(
            input_file=args.input_file,
            separator=args.separator,
            encoding=args.encoding
        )
        processor.load_data()
        processor.process_data()
        # One (suffix, generator) pair per aggregation level; this replaces
        # four copy-pasted generate/save sections with identical behavior
        # and ordering.
        outputs = [
            ("-hourly", processor.generate_hourly_data),
            ("-daily", processor.generate_daily_data),
            ("-weekly", processor.generate_weekly_data),
            ("-monthly", processor.generate_monthly_data),
        ]
        for suffix, generate in outputs:
            data = generate()
            out_path = processor.get_output_filename(suffix)
            if args.output_dir:
                out_path = os.path.join(args.output_dir, os.path.basename(out_path))
            processor.save_data(data, out_path)
        print("\nConversion completed successfully!")
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code.
        print(f"Error: {e}")
        return 1
    return 0
if __name__ == "__main__":
    # Raise SystemExit directly: the bare exit() builtin is injected by the
    # site module and is not guaranteed to exist in every run mode.
    raise SystemExit(main())