# examples/process_ec_data.py
"""Example script for processing EC data using nova_fde.
This script demonstrates how to use the nova_fde package to process
Energy Community (EC) data with proper database authentication,
error handling, and performance analysis.
"""
import argparse
import sys
from pathlib import Path
import pandas as pd
from nova_fde.core.engine_factory import EngineFactory, fixed_create_with_auto_auth
from nova_fde.core.processor import DataProcessor
from nova_fde.utils import setup_logging
def process_ec_data(
    data_frames: dict[str, pd.DataFrame], processor: DataProcessor
) -> pd.DataFrame:
    """
    Build a unified Energy Community (EC) systems dataset.

    Takes the data frames retrieved from the project's SQL queries, prepares
    the systems frame, and left-joins the SNH and block information onto it
    by "System Name".

    Parameters
    ----------
    data_frames : dict[str, pd.DataFrame]
        Data frames retrieved from SQL queries.
        Required keys: "systems", "snh", "block".
    processor : DataProcessor
        Object providing the data-processing helpers. If it exposes
        ``process_systems_data`` that method prepares the systems frame;
        otherwise ``optimize_memory`` and ``deduplicate_columns`` are used.

    Returns
    -------
    pd.DataFrame
        One row per "System Name" with the systems columns, the SNH
        "System Type" column and the block-info columns. Known date columns
        that are present are converted to datetime (unparseable values
        become ``NaT``).

    Raises
    ------
    KeyError
        If a required key is missing from ``data_frames`` or a required
        column ("System Name", "System Type", "Status") is missing from a
        frame.
    """
    systems = data_frames["systems"]

    # NOTE(review): the published listing lost its indentation, so the extent
    # of this fallback branch is reconstructed: optimise, deduplicate columns
    # and filter on status only when the processor has no dedicated
    # ``process_systems_data`` method. Confirm against the original script.
    if hasattr(processor, "process_systems_data"):
        processed_systems = processor.process_systems_data(systems)
    else:
        processed_systems = processor.optimize_memory(systems)

        # deduplicate_columns also returns a column mapping; it is not needed here.
        processed_systems, _ = processor.deduplicate_columns(processed_systems)

        # Keep only systems that are in service or being built.
        is_active = processed_systems["Status"].isin(
            ["InService", "Under Construction"]
        )
        processed_systems = processed_systems[is_active]

    # Keep a single row per system on the right-hand sides so the left joins
    # below cannot multiply rows.
    snh_data = data_frames["snh"].drop_duplicates(subset="System Name", keep="first")
    block_info = data_frames["block"].drop_duplicates(
        subset="System Name", keep="first"
    )

    final_systems = (
        processed_systems.merge(
            snh_data[["System Name", "System Type"]], on="System Name", how="left"
        )
        .merge(block_info, on="System Name", how="left")
        # The systems frame itself may still contain repeated system names.
        .drop_duplicates(subset="System Name", keep="first")
    )

    # Convert string dates to datetime; errors="coerce" turns bad values into NaT
    # instead of raising.
    date_columns = [
        "Registration Screenshot Requirement Received Date",
        "Commissioning Package Received Date",
        "Power Producing On",
    ]
    for col in date_columns:
        if col in final_systems.columns:
            final_systems[col] = pd.to_datetime(final_systems[col], errors="coerce")

    return final_systems
def main() -> None:
    """
    Execute the EC data processing workflow.

    Parses command line arguments, makes sure the project directory layout
    and the SQL files exist (creating placeholders where they do not),
    initializes the data engine, runs ``process_ec_data`` through
    ``engine.process_data`` and prints a summary of the results.

    Any exception raised during the workflow is logged, reported on the
    console together with troubleshooting tips, and the process exits with
    status 1. The engine is cleaned up whether or not processing succeeded.

    Command line arguments:
    --project-root: Path to project root directory
    --check-credentials: Check credential availability before processing
    --force-refresh: Force refresh cached data
    --check-password-expiry: Check if database password is nearing expiration
    """
    parser = argparse.ArgumentParser(description="Process Energy Community data")
    parser.add_argument("--project-root", help="Path to project root directory")
    parser.add_argument(
        "--check-credentials",
        action="store_true",
        help="Check credential availability before processing",
    )
    parser.add_argument(
        "--force-refresh", action="store_true", help="Force refresh cached data"
    )
    parser.add_argument(
        "--check-password-expiry",
        action="store_true",
        help="Check if database password is nearing expiration",
    )
    args = parser.parse_args()

    logger, console = setup_logging()
    console.print("[bold cyan]Nova Finance Data Engine - EC Processing[/bold cyan]")

    # Bound before the try so the finally clause can tell whether an engine
    # was ever created.
    engine = None
    try:
        # Fall back to the bundled example project when no root is given.
        project_root = Path(args.project_root or "./examples/EC")
        if not project_root.exists():
            console.print(
                f"[yellow]Warning: Project root directory {project_root} does not exist. "
                f"Creating it now.[/yellow]"
            )
            project_root.mkdir(parents=True, exist_ok=True)

        # Create the expected directory structure if it doesn't exist.
        for dir_name in ["SQL", "Query Cache", "Completed Output", "Data"]:
            dir_path = project_root / dir_name
            if not dir_path.exists():
                console.print(f"[yellow]Creating directory: {dir_path}[/yellow]")
                dir_path.mkdir(parents=True, exist_ok=True)

        output_dir = project_root / "Completed Output"

        if args.check_credentials:
            console.print("[blue]Checking credential availability...[/blue]")
            EngineFactory.check_credentials(console=console)

        # Initialize engine with automatic authentication and, on request,
        # the password expiration check.
        console.print("[blue]Initializing data engine...[/blue]")
        engine = fixed_create_with_auto_auth(
            project_root=project_root,
            console=console,
            check_password_expiry=args.check_password_expiry,
        )

        # Queries for this specific project: result name -> SQL file name.
        queries = {
            "systems": "systems.sql",
            "snh": "snh_identification.sql",
            "block": "project_block_info.sql",
        }

        # Resolve the SQL directory once: prefer the engine settings when they
        # expose it, otherwise fall back to <project_root>/SQL.
        if hasattr(engine.settings, "paths") and hasattr(
            engine.settings.paths, "sql_dir"
        ):
            sql_dir = engine.settings.paths.sql_dir
        else:
            sql_dir = project_root / "SQL"

        # Verify SQL files exist before processing; write a placeholder for
        # each missing one.
        for sql_file in queries.values():
            sql_path = sql_dir / sql_file
            if sql_path.exists():
                continue
            console.print(
                f"[yellow]Warning: SQL file {sql_file} not found at {sql_path}[/yellow]"
            )
            # The settings-provided directory is not among those created
            # above, so make sure it exists before writing into it.
            sql_path.parent.mkdir(parents=True, exist_ok=True)
            sql_path.write_text(
                f"-- Placeholder for {sql_file}\n-- Replace with actual query\nSELECT 'Example' AS 'System Name', 'Active' AS 'Status';"
            )
            console.print(
                f"[yellow]Created placeholder SQL file at {sql_path}[/yellow]"
            )

        console.print("[blue]Starting data processing...[/blue]")
        results = engine.process_data(
            queries=queries,
            process_func=process_ec_data,
            output_name="EC_All",
            force_refresh=args.force_refresh,
            analyze=True,
        )

        console.print("\n[bold green]✓ Processing completed successfully[/bold green]")
        if isinstance(results, dict):
            console.print(
                f"Processed {results.get('rows_processed', 0):,} rows in "
                f"{results.get('duration', 0):.2f} seconds"
            )
            # Performance metrics are optional in the results.
            if results.get("query_duration") is not None:
                console.print(
                    f"Query duration: {results['query_duration']:.2f} seconds"
                )
            if results.get("processing_duration") is not None:
                console.print(
                    f"Processing duration: {results['processing_duration']:.2f} seconds"
                )
            # Only reports the expected location; nothing is written here.
            if "output_name" in results:
                output_path = output_dir / f"{results['output_name']}.xlsx"
                console.print(f"[blue]Results saved to: {output_path}[/blue]")
    except Exception as e:
        logger.exception("Process failed")
        console.print(f"\n[bold red]Error processing data: {str(e)}[/bold red]")

        # Provide more detailed error information for troubleshooting.
        import traceback

        console.print("[red]Stack trace:[/red]")
        console.print(traceback.format_exc())
        console.print("\n[yellow]Troubleshooting tips:[/yellow]")
        console.print("1. Verify SQL files exist and have correct syntax")
        console.print("2. Check database credentials and connectivity")
        console.print("3. Ensure required directories exist in project structure")
        console.print(
            "4. Verify column names in SQL results match those used in processing"
        )
        console.print(
            "5. Check if the settings structure matches your version of the package"
        )
        sys.exit(1)
    finally:
        # Always release engine resources, including on the failure path
        # (the finally clause still runs while sys.exit() unwinds).
        if engine is not None:
            try:
                engine.cleanup()
            except Exception as cleanup_error:
                console.print(
                    f"[yellow]Warning: Cleanup error: {cleanup_error}[/yellow]"
                )
if __name__ == "__main__":
    main()

Example Projects
This section demonstrates how to use the Nova Finance Data Engine (nova_fde) package in real-world scenarios.
EC Data Processing Example
This example shows how to process Energy Community (EC) data using nova_fde. It demonstrates:
- Automatic authentication
- SQL query execution
- Data merging and transformation
- Performance analysis
Key Features Demonstrated
- Using the `EngineFactory` for easy database connection
- Processing data from multiple SQL queries
- Column deduplication and data type conversion
- Proper error handling with rich console output
- Performance tracking with the `analyze=True` parameter
Complete Project Example: Concord Payments
This example demonstrates a complete project structure for processing Concord payment data:
# examples/Concord/concord_payments.py
"""Concord Payments Report
This script generates payment reports for Concord-managed portfolios.
"""
from datetime import date
from pathlib import Path
from typing import List
import pandas as pd
from nova_fde.core.engine_factory import EngineFactory
def run_concord_payments_report(
    portfolios: List[str],
    target_date: str,
    root_path: Path,
    force_refresh: bool = False,
):
    """
    Run Concord payments report for specified portfolios.

    Parameters
    ----------
    portfolios : List[str]
        Portfolio names, passed to the query as the ``portfolio_names``
        parameter.
    target_date : str
        Target date passed to the query as ``target_date``; its dashes are
        removed to build the output name.
    root_path : Path
        Project root handed to the engine factory.
    force_refresh : bool, optional
        Forwarded to ``engine.process_data``.

    Returns
    -------
    object
        Whatever ``engine.process_data`` returns on success, ``None`` when
        the SQL file is missing, or an empty ``pd.DataFrame`` when
        processing raised an exception.

    Notes
    -----
    The engine is cleaned up on every exit path, including the early return
    for a missing SQL file.
    """

    def process_payments(data_frames, processor):
        """
        Process payment data frames.

        Returns the memory-optimized "payments" frame, or an empty
        DataFrame when no payment data was returned.
        """
        payments = data_frames.get("payments")
        if payments is None:
            print("ERROR: No payment data was returned")
            return pd.DataFrame()  # Return empty DataFrame
        print(f"SUCCESS: Received {len(payments)} rows of payment data")
        return processor.optimize_memory(payments)

    # Create finance data engine with automatic auth
    engine = EngineFactory.create_with_auto_auth(project_root=root_path)
    try:
        queries = {"payments": "concord_payments.sql"}

        # Parameters are passed through as-is; the portfolio list is not
        # flattened into a string here.
        params = {
            "portfolio_names": portfolios,
            "target_date": target_date,
        }
        print(f"Running query with parameters: {params}")

        # Debugging aid: confirm the SQL file is where the engine expects it.
        sql_path = engine.settings.paths.sql_dir / "concord_payments.sql"
        if not sql_path.exists():
            print(f"ERROR: SQL file not found at {sql_path}")
            return None
        print(f"SQL file found at {sql_path}")

        # Print the first 10 lines of the SQL to verify content. Only those
        # lines are read, not the whole file.
        try:
            with open(sql_path, "r") as f:
                preview = [line for _, line in zip(range(10), f)]
            print("SQL file preview (first 10 lines):")
            for line_number, line in enumerate(preview, start=1):
                print(f" {line_number}: {line.strip()}")
        except Exception as e:
            # The preview is best-effort; a read failure must not stop the report.
            print(f"Could not read SQL file: {str(e)}")

        try:
            return engine.process_data(
                queries=queries,
                query_params=params,
                process_func=process_payments,
                output_name=f"ConcordPayments_{target_date.replace('-', '')}",
                force_refresh=force_refresh,
            )
        except Exception as e:
            import traceback

            print(f"ERROR: Exception during processing: {str(e)}")
            print(traceback.format_exc())
            return pd.DataFrame()  # Return empty DataFrame
    finally:
        try:
            # Always clean up resources
            engine.cleanup()
        except Exception as e:
            print(f"Warning: Cleanup error: {str(e)}")
def main() -> None:
    """
    Run the Concord payments report.

    Checks credential availability, then runs the report for the default
    portfolio, using the first day of the current month as the target date
    and ``./examples/Concord`` as the project root, with cached results
    ignored. Prints the number of records and, when there are none,
    troubleshooting tips.
    """
    print("Starting Concord Payments Report")

    # Check credential availability first (EngineFactory is imported at
    # module level).
    EngineFactory.check_credentials()

    target_portfolios = ["Sunnova SAP IV LLC"]

    # Target month: first day of the current month, formatted YYYY-MM-DD.
    target_month = date.today().replace(day=1).strftime("%Y-%m-%d")

    root_path = Path("./examples/Concord")

    result_df = run_concord_payments_report(
        portfolios=target_portfolios,
        target_date=target_month,
        root_path=root_path,
        force_refresh=True,
    )

    # The report returns None when the SQL file is missing.
    record_count = len(result_df) if result_df is not None else 0
    print(f"Completed Concord Payments Report with {record_count} records")

    # Add additional debugging if no results
    if record_count == 0:
        print("\nTroubleshooting tips:")
        print("1. Verify table and column names in the SQL query")
        print(
            "2. Try running a simplified version of the query directly in the database"
        )
        print("3. Check if data exists for the specified date range")
if __name__ == "__main__":
    main()

Project Structure
The Concord example follows the recommended project structure:
Concord/
├── concord_payments.py # Main processing script
├── run_payments.py # Entry point script
├── Completed Output/ # Processed output files
│ ├── ConcordPayments_20250201.csv
│ └── ConcordPayments_20250201_2025_02_26.csv
├── Data/ # Input data files
├── Query Cache/ # Cached query results
│ ├── payments_2025_02_25.csv
│ └── payments_2025_02_26.csv
└── SQL/ # SQL query files
└── concord_payments.sql
Key Features Demonstrated
- Passing parameters to SQL queries
- SQL file validation and debugging
- Proper error handling and cleanup
- Output file naming with timestamps
- Usage of the `force_refresh` parameter to ignore cached results
Using Global Settings
Both example projects can be enhanced with the new global settings functionality:
# Initialize engine with global settings and password expiration check
engine = EngineFactory.create_with_auto_auth(
project_root=project_root,
console=console,
check_password_expiry=True
)
This modification allows the projects to:
- Use centralized database connection settings
- Get warnings when database passwords are about to expire
- Benefit from consistent configuration across multiple projects
Creating Your Own Project
To create a new project based on these examples:
# Create a project with the default template
python -m nova_fde.scripts.create_project my_new_project
# Or create an analysis-focused project
python -m nova_fde.scripts.create_project --template analysis analysis_project
# Or create a reporting-focused project
python -m nova_fde.scripts.create_project --template reporting reporting_project
Then adapt the example code to your specific needs by replacing the SQL queries and processing logic.