432 lines
14 KiB
Python
432 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
import argparse
import os
import re
import sys
from pathlib import Path
from typing import Optional, Dict, List

import pandas as pd
|
||
|
||
|
||
class PGEDataProcessor:
    """
    Class for processing PGE energy meter reading CSV reports.
    Converts wide-format hourly data into time-aggregated outputs
    (hourly, daily, weekly, monthly).
    """

    # H01..H24; H01 maps to 00:00, H24 to 23:00 (see process_data).
    HOUR_COLUMNS = [f"H{str(i).zfill(2)}" for i in range(1, 25)]

    def __init__(self, input_file: str, separator: str = ";", encoding: str = "utf-8"):
        """
        Initialize the PGE data processor.

        Args:
            input_file: Path to the input CSV file
            separator: CSV separator (default: ';')
            encoding: File encoding (default: 'utf-8', use 'cp1250' for Polish chars)

        Raises:
            FileNotFoundError: If input_file does not exist.
        """
        self.input_file = Path(input_file)
        self.separator = separator
        self.encoding = encoding
        self.raw_data: Optional[pd.DataFrame] = None
        self.processed_data: Optional[pd.DataFrame] = None
        self.energy_types: Optional[List[str]] = None

        if not self.input_file.exists():
            raise FileNotFoundError(f"Input file not found: {input_file}")

    @staticmethod
    def fix_value_string(val) -> str:
        """
        Fix malformed kWh strings like ',123', '-,123', '-,' etc.
        Returns a clean string ready for float conversion.

        NaN, empty strings, and bare sign/separator fragments all map to "0".
        """
        if pd.isna(val):
            return "0"

        # Drop whitespace and normalize the decimal separator to '.'.
        # Every check below therefore only needs to handle the dot form.
        s = str(val).strip().replace(" ", "").replace(",", ".")

        # Missing leading zero: '.123' -> '0.123'
        if re.match(r"^\.\d+", s):
            s = "0" + s

        # Negative value missing its zero: '-.123' -> '-0.123'
        if re.match(r"^-\.\d+", s):
            s = s.replace("-.", "-0.")

        # Leftover sign/separator fragments ('-,', '-', ',', '.', or an
        # empty cell) are malformed zeros.
        if s in ("", ".", "-", "-."):
            return "0"

        return s

    def load_data(self) -> pd.DataFrame:
        """
        Load and validate the input CSV data.

        Falls back to cp1250 when the configured encoding fails to decode,
        and records the fallback in self.encoding so outputs are written
        with the same encoding.

        Returns:
            Raw DataFrame from the CSV file

        Raises:
            ValueError: If hour columns or required id columns are missing.
        """
        print(f"Loading data from: {self.input_file}")

        try:
            self.raw_data = pd.read_csv(
                self.input_file,
                sep=self.separator,
                encoding=self.encoding,
            )
        except UnicodeDecodeError:
            # PGE exports frequently use Windows-1250 for Polish characters.
            print(f"Warning: {self.encoding} encoding failed, trying cp1250...")
            self.raw_data = pd.read_csv(
                self.input_file,
                sep=self.separator,
                encoding="cp1250",
            )
            self.encoding = "cp1250"

        # Validate required columns.
        missing_hours = [c for c in self.HOUR_COLUMNS if c not in self.raw_data.columns]
        if missing_hours:
            raise ValueError(f"Missing hour columns in input: {missing_hours}")

        required_cols = ["DataOdczytu", "Kierunek"]
        missing_required = [c for c in required_cols if c not in self.raw_data.columns]
        if missing_required:
            raise ValueError(f"Missing required columns: {missing_required}")

        print(f"Loaded {len(self.raw_data)} rows with {len(self.raw_data.columns)} columns")

        # Detect unique energy types from the Kierunek (direction) column.
        self.energy_types = sorted(self.raw_data["Kierunek"].unique().tolist())
        print(f"Detected energy types: {self.energy_types}")

        return self.raw_data

    def process_data(self) -> pd.DataFrame:
        """
        Process the raw data: fix malformed values, convert to long format, add timestamps.

        Returns:
            Processed DataFrame in long format with timestamps
        """
        if self.raw_data is None:
            self.load_data()

        print("Processing data...")
        df = self.raw_data.copy()

        # Fix malformed values in hour columns before numeric conversion.
        for col in self.HOUR_COLUMNS:
            df[col] = df[col].apply(self.fix_value_string)
            df[col] = df[col].astype(float)

        # Convert to long format: one row per day + type + hour.
        df_long = df.melt(
            id_vars=["DataOdczytu", "Kierunek"],
            value_vars=self.HOUR_COLUMNS,
            var_name="HourCol",
            value_name="Value_kWh",
        )

        # Extract hour number (1-24) from the 'Hnn' column name.
        df_long["Hour"] = df_long["HourCol"].str[1:].astype(int)

        # Convert date from YYYYMMDD format.
        df_long["Date"] = pd.to_datetime(
            df_long["DataOdczytu"].astype(str),
            format="%Y%m%d",
        )

        # Create full timestamp: H01 -> 00:00, H24 -> 23:00.
        df_long["Timestamp"] = df_long["Date"] + pd.to_timedelta(
            df_long["Hour"] - 1, unit="h"
        )

        self.processed_data = df_long
        print(f"Processed data: {len(df_long)} rows")
        return df_long

    def _order_columns(self, df: pd.DataFrame, first_col: str) -> pd.DataFrame:
        """Return df with first_col first, then the detected energy types in sorted order."""
        available_types = [t for t in self.energy_types if t in df.columns]
        return df[[first_col] + available_types]

    def generate_hourly_data(self) -> pd.DataFrame:
        """
        Generate hourly aggregated data with energy types as columns.

        Returns:
            DataFrame with hourly data (one row per hour, columns for energy types)
        """
        if self.processed_data is None:
            self.process_data()

        print("Generating hourly data...")

        # Pivot to wide format with timestamp index.
        df_hourly = self.processed_data.pivot_table(
            index="Timestamp",
            columns="Kierunek",
            values="Value_kWh",
            aggfunc="sum",
        ).sort_index()

        # Reset index and format timestamp as a string for output.
        df_hourly_out = df_hourly.reset_index()
        df_hourly_out["Timestamp"] = df_hourly_out["Timestamp"].dt.strftime("%Y-%m-%d %H:%M")

        return self._order_columns(df_hourly_out, "Timestamp")

    def generate_daily_data(self) -> pd.DataFrame:
        """
        Generate daily aggregated data.

        Returns:
            DataFrame with daily totals
        """
        if self.processed_data is None:
            self.process_data()

        print("Generating daily data...")

        # Group by date and energy type, sum the values.
        df_daily = (
            self.processed_data
            .groupby(["Date", "Kierunek"])["Value_kWh"]
            .sum()
            .reset_index()
        )

        # Format date and pivot to wide format.
        df_daily["Date_str"] = df_daily["Date"].dt.strftime("%Y-%m-%d")

        df_daily_pivot = df_daily.pivot_table(
            index="Date_str",
            columns="Kierunek",
            values="Value_kWh",
            aggfunc="sum",
        ).sort_index()

        df_daily_out = df_daily_pivot.reset_index().rename(columns={"Date_str": "Date"})

        return self._order_columns(df_daily_out, "Date")

    def generate_weekly_data(self) -> pd.DataFrame:
        """
        Generate weekly aggregated data (weeks starting Monday).

        Returns:
            DataFrame with weekly totals
        """
        if self.processed_data is None:
            self.process_data()

        print("Generating weekly data...")

        # Set timestamp as index for resampling.
        df_ts = self.processed_data.set_index("Timestamp")

        # W-MON with closed/label 'left' bins [Monday, next Monday),
        # labelled by the week's starting Monday.
        df_weekly = (
            df_ts
            .groupby("Kierunek")
            .resample("W-MON", label="left", closed="left")["Value_kWh"]
            .sum()
            .reset_index()
        )

        # Format week start date.
        df_weekly["WeekStart"] = df_weekly["Timestamp"].dt.strftime("%Y-%m-%d")

        # Pivot to wide format.
        df_weekly_pivot = df_weekly.pivot_table(
            index="WeekStart",
            columns="Kierunek",
            values="Value_kWh",
            aggfunc="sum",
        ).sort_index()

        df_weekly_out = df_weekly_pivot.reset_index()

        return self._order_columns(df_weekly_out, "WeekStart")

    def generate_monthly_data(self) -> pd.DataFrame:
        """
        Generate monthly aggregated data.

        Returns:
            DataFrame with monthly totals
        """
        if self.processed_data is None:
            self.process_data()

        print("Generating monthly data...")

        # Set timestamp as index for resampling.
        df_ts = self.processed_data.set_index("Timestamp")

        # Resample by month start (MS = Month Start).
        df_monthly = (
            df_ts
            .groupby("Kierunek")
            .resample("MS")["Value_kWh"]
            .sum()
            .reset_index()
        )

        # Format month as YYYY-MM.
        df_monthly["Month"] = df_monthly["Timestamp"].dt.strftime("%Y-%m")

        # Pivot to wide format.
        df_monthly_pivot = df_monthly.pivot_table(
            index="Month",
            columns="Kierunek",
            values="Value_kWh",
            aggfunc="sum",
        ).sort_index()

        df_monthly_out = df_monthly_pivot.reset_index()

        return self._order_columns(df_monthly_out, "Month")

    def save_data(self, data: pd.DataFrame, output_file: str) -> None:
        """
        Save DataFrame to CSV with consistent formatting.

        Args:
            data: DataFrame to save
            output_file: Output file path
        """
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        data.to_csv(
            output_path,
            sep=self.separator,
            encoding=self.encoding,
            index=False,
            float_format="%.3f",
        )
        print(f"Saved: {output_path}")

    def get_output_filename(self, suffix: str) -> str:
        """
        Generate output filename based on input filename and suffix.

        Args:
            suffix: Suffix to append (e.g., '-hourly', '-daily')

        Returns:
            Output filename (in the input file's directory)
        """
        stem = self.input_file.stem
        extension = self.input_file.suffix
        return str(self.input_file.parent / f"{stem}{suffix}{extension}")
|
||
|
||
|
||
def main():
    """
    Command-line entry point: parse arguments, process the input file, and
    write hourly/daily/weekly/monthly CSV outputs.

    Returns:
        0 on success, 1 on any processing error.
    """
    parser = argparse.ArgumentParser(
        description="Convert PGE energy meter CSV reports to time-aggregated formats"
    )
    parser.add_argument(
        "input_file",
        help="Path to the input PGE CSV file"
    )
    parser.add_argument(
        "--separator", "-s",
        default=";",
        help="CSV separator (default: ';')"
    )
    parser.add_argument(
        "--encoding", "-e",
        default="utf-8",
        help="File encoding (default: 'utf-8', use 'cp1250' for Polish characters)"
    )
    parser.add_argument(
        "--output-dir", "-o",
        help="Output directory (default: same as input file)"
    )

    args = parser.parse_args()

    try:
        # Initialize processor.
        processor = PGEDataProcessor(
            input_file=args.input_file,
            separator=args.separator,
            encoding=args.encoding,
        )

        # Process data once; all aggregations reuse the processed frame.
        processor.load_data()
        processor.process_data()

        # Generate and save each aggregation level.
        outputs = [
            (processor.generate_hourly_data, "-hourly"),
            (processor.generate_daily_data, "-daily"),
            (processor.generate_weekly_data, "-weekly"),
            (processor.generate_monthly_data, "-monthly"),
        ]
        for generate, suffix in outputs:
            data = generate()
            output_file = processor.get_output_filename(suffix)
            if args.output_dir:
                # Redirect to the requested directory, keeping the filename.
                output_file = os.path.join(args.output_dir, os.path.basename(output_file))
            processor.save_data(data, output_file)

        print("\nConversion completed successfully!")

    except Exception as e:
        # Top-level boundary: report and signal failure via exit code.
        print(f"Error: {e}")
        return 1

    return 0
|
||
|
||
|
||
if __name__ == "__main__":
    # sys.exit (not the site-module exit() helper, which may be absent under
    # python -S) propagates main()'s return code as the process exit status.
    sys.exit(main())