# examples/process_ec_data.py
"""Example script for processing EC data using nova_fde.
This script demonstrates how to use the nova_fde package to process
Energy Community (EC) data with proper database authentication,
error handling, and performance analysis.
"""
import argparse
import sys
from pathlib import Path
import pandas as pd
from nova_fde.core.engine_factory import EngineFactory, fixed_create_with_auto_auth
from nova_fde.core.processor import DataProcessor
from nova_fde.utils import setup_logging
def process_ec_data(
data_frames: dict[str, pd.DataFrame], processor: DataProcessor
) -> pd.DataFrame:
"""
Process Energy Community data with project-specific logic.
This function takes data frames retrieved from SQL queries and
processes them to create a unified dataset of EC systems.
Parameters
----------
data_frames : dict[str, pd.DataFrame]
Dictionary containing the data frames retrieved from SQL queries.
Expected keys: "systems", "snh", "block".
processor : DataProcessor
Data processor object providing utility methods for data processing.
Returns
-------
pd.DataFrame
Processed and merged dataset containing EC system information.
Notes
-----
The processing steps include:
- Deduplicating columns
- Filtering for active systems
- Merging data from multiple sources
- Converting date columns to datetime format
"""
# Process systems data
systems = data_frames["systems"]
# Use dedicated processor method if available, otherwise apply standard processing
if hasattr(processor, "process_systems_data"):
processed_systems = processor.process_systems_data(systems)
else:
# Apply standard processing if the specialized method isn't available
processed_systems = processor.optimize_memory(systems)
# Deduplicate columns to avoid conflicts
processed_systems, column_mapping = processor.deduplicate_columns(processed_systems)
# Filter active systems
processed_systems = processed_systems[
processed_systems["Status"].isin(["InService", "Under Construction"])
]
# Process SNH data
snh_data = data_frames["snh"]
snh_data = snh_data.drop_duplicates(subset="System Name", keep="first")
# Process block info
block_info = data_frames["block"]
block_info = block_info.drop_duplicates(subset="System Name", keep="first")
# Merge datasets
final_systems = processed_systems.merge(
snh_data[["System Name", "System Type"]], on="System Name", how="left"
).merge(block_info, on="System Name", how="left")
# Remove duplicates from final dataset
final_systems = final_systems.drop_duplicates(subset="System Name", keep="first")
# Process dates - convert string dates to datetime objects with proper error handling
date_columns = [
"Registration Screenshot Requirement Received Date",
"Commissioning Package Received Date",
"Power Producing On",
]
for col in date_columns:
if col in final_systems.columns:
final_systems[col] = pd.to_datetime(final_systems[col], errors="coerce")
return final_systems
def main():
"""
Execute the EC data processing workflow.
This function parses command line arguments, initializes the data engine,
processes the data, and outputs the results. It includes comprehensive
error handling and logging.
Command line arguments:
--project-root: Path to project root directory
--check-credentials: Check credential availability before processing
--force-refresh: Force refresh cached data
--check-password-expiry: Check if database password is nearing expiration
"""
# Parse command line arguments
parser = argparse.ArgumentParser(description="Process Energy Community data")
parser.add_argument("--project-root", help="Path to project root directory")
parser.add_argument(
"--check-credentials",
action="store_true",
help="Check credential availability before processing",
)
parser.add_argument(
"--force-refresh", action="store_true", help="Force refresh cached data"
)
parser.add_argument(
"--check-password-expiry",
action="store_true",
help="Check if database password is nearing expiration",
)
args = parser.parse_args()
# Set up logging with rich error handling
logger, console = setup_logging()
console.print("[bold cyan]Nova Finance Data Engine - EC Processing[/bold cyan]")
try:
# Determine project root
project_root = args.project_root or Path("./examples/EC")
project_root = Path(project_root)
if not project_root.exists():
console.print(
f"[yellow]Warning: Project root directory {project_root} does not exist. "
f"Creating it now.[/yellow]"
)
project_root.mkdir(parents=True, exist_ok=True)
# Create directory structure if it doesn't exist
for dir_name in ["SQL", "Query Cache", "Completed Output", "Data"]:
dir_path = project_root / dir_name
if not dir_path.exists():
console.print(f"[yellow]Creating directory: {dir_path}[/yellow]")
dir_path.mkdir(parents=True, exist_ok=True)
# Store output directory path for later use
output_dir = project_root / "Completed Output"
# Check credentials if requested
if args.check_credentials:
console.print("[blue]Checking credential availability...[/blue]")
EngineFactory.check_credentials(console=console)
# Initialize engine with automatic authentication and password expiration check
console.print("[blue]Initializing data engine...[/blue]")
# Option 2: Use the fixed function
engine = fixed_create_with_auto_auth(
project_root=project_root,
console=console,
check_password_expiry=args.check_password_expiry,
)
# Define queries for this specific project
queries = {
"systems": "systems.sql",
"snh": "snh_identification.sql",
"block": "project_block_info.sql",
}
# Verify SQL files exist before processing
for query_name, sql_file in queries.items():
# Access SQL directory through settings if available
if hasattr(engine.settings, "paths") and hasattr(
engine.settings.paths, "sql_dir"
):
sql_path = engine.settings.paths.sql_dir / sql_file
else:
# Fallback to project_root/SQL
sql_path = project_root / "SQL" / sql_file
if not sql_path.exists():
console.print(
f"[yellow]Warning: SQL file {sql_file} not found at {sql_path}[/yellow]"
)
# Create a placeholder SQL file
with open(sql_path, "w") as f:
f.write(
f"-- Placeholder for {sql_file}\n-- Replace with actual query\nSELECT 'Example' AS 'System Name', 'Active' AS 'Status';"
)
console.print(
f"[yellow]Created placeholder SQL file at {sql_path}[/yellow]"
)
# Process data with project-specific function
console.print("[blue]Starting data processing...[/blue]")
results = engine.process_data(
queries=queries,
process_func=process_ec_data,
output_name="EC_All",
force_refresh=args.force_refresh,
analyze=True,
)
console.print("\n[bold green]✓ Processing completed successfully[/bold green]")
if isinstance(results, dict):
console.print(
f"Processed {results.get('rows_processed', 0):,} rows in "
f"{results.get('duration', 0):.2f} seconds"
)
# Display performance metrics if available
if results.get("query_duration") is not None:
console.print(
f"Query duration: {results['query_duration']:.2f} seconds"
)
if results.get("processing_duration") is not None:
console.print(
f"Processing duration: {results['processing_duration']:.2f} seconds"
)
# Save results to Excel if needed
if output_dir and "output_name" in results:
output_path = output_dir / f"{results['output_name']}.xlsx"
console.print(f"[blue]Results saved to: {output_path}[/blue]")
# Cleanup
engine.cleanup()
except Exception as e:
logger.exception("Process failed")
console.print(f"\n[bold red]Error processing data: {str(e)}[/bold red]")
# Provide more detailed error information for troubleshooting
import traceback
console.print("[red]Stack trace:[/red]")
console.print(traceback.format_exc())
console.print("\n[yellow]Troubleshooting tips:[/yellow]")
console.print("1. Verify SQL files exist and have correct syntax")
console.print("2. Check database credentials and connectivity")
console.print("3. Ensure required directories exist in project structure")
console.print(
"4. Verify column names in SQL results match those used in processing"
)
console.print(
"5. Check if the settings structure matches your version of the package"
)
sys.exit(1)
if __name__ == "__main__":
main()Treasury Analytics Core
Treasury Analytics Core (nova_fde) was built to standardize database operations for Sunnova’s finance data analytics projects. After repeatedly writing similar code for different projects over the past two years, we created this package to stop duplicating effort.
The package manages database connections, handles credentials securely, processes data consistently, and helps teams follow the same patterns for financial data analytics.
We built nova_fde to solve specific problems: - Reduce duplicate code across projects - Handle database credentials securely - Standardize error handling and retry logic - Track query performance - Manage data caching efficiently - Document processes consistently
With this package, new finance data projects require less startup time. Database connections, credential management, and basic data processing are already handled, letting analysts focus on the actual financial analysis instead of infrastructure setup.
Key Features
- Database Connection: Efficient database connection management with connection pooling and retry logic
- Query Execution: Execute SQL queries with performance tracking and visual progress display
- Cache Management: Automatic caching of query results with expiration management
- Data Processing: Generic and specialized data processing functions
- Performance Analysis: Track and report on query and processing performance
- Secure Credential Management: Multiple secure options for handling database credentials
- Centralized Settings: Global database configuration shared across projects
- Password Expiration Tracking: Reminders when database passwords need rotation
Installation
For Development
# Clone the repository
git clone https://github.com/Sunnova/TreasuryAnalyticsCore.git
cd TreasuryAnalyticsCore
# Create and activate virtual environment using uv
uv venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install in development mode with dev dependencies
uv pip install -e ".[dev]"For Use in Existing Projects
Add to your project’s requirements:
git+https://github.com/Sunnova/TreasuryAnalyticsCore.gitOr install directly using uv:
uv pip install git+https://github.com/Sunnova/TreasuryAnalyticsCore.gitProject Setup
Creating a New Project
The easiest way to get started is to use the project creation tool:
# Create a basic project
python -m nova_fde.scripts.create_project my_project
# Create a data analysis project with virtual environment
python -m nova_fde.scripts.create_project --template analysis --venv data_analysis_project
# Create a reporting project
python -m nova_fde.scripts.create_project --template reporting reporting_projectThis will create a project with the necessary directory structure and configuration files:
- SQL/: Directory for SQL query files
- Data/: Directory for data files
- Query Cache/: Directory for cached query results
- Completed Output/: Directory for processed output files
- Additional template-specific directories (e.g., notebooks, reports)
Manual Project Setup
For manual setup, create the following directory structure:
my_project/
├── SQL/ # SQL query files
├── Data/ # Data files
├── Query Cache/ # Cached query results
├── Completed Output/ # Processed output files
└── .env # Environment configuration
Create a .env file with your project configuration:
# Project Paths
NOVA_PROJECT_ROOT=./
NOVA_SQL_DIR=${NOVA_PROJECT_ROOT}/SQL
# Processing Directories
NOVA_CACHE_DIR=${NOVA_PROJECT_ROOT}/Query Cache
NOVA_OUTPUT_DIR=${NOVA_PROJECT_ROOT}/Completed Output
NOVA_DATA_DIR=${NOVA_PROJECT_ROOT}/Data
NOVA_BATCH_SIZE=10000
# Logging Settings
NOVA_LOG_LEVEL=INFO
NOVA_LOG_FILE=finance_engine.log
# Use global settings for database configuration
NOVA_USE_GLOBAL_SETTINGS=true
Credential Management
Centralized Database Settings
Treasury Analytics Core now supports centralized database settings that can be shared across multiple projects. This eliminates the need to duplicate database connection details in each project.
Setting Up Global Settings
To create global settings with your database configuration:
python -m nova_fde.scripts.manage_settings --create --db-host=your-server.example.com --db-name=your_databaseThis will create a settings file in ~/.nova_fde/settings.yaml with your database connection parameters.
Checking Global Settings
To view your current global settings:
python -m nova_fde.scripts.manage_settings --showPassword Expiration Tracking
The package includes password expiration tracking to ensure database passwords are rotated regularly:
Recording a Password Update
When you update your database password, record it to start tracking expiration:
python -m nova_fde.scripts.manage_settings --update-passwordChecking Password Expiration
Check the status of your password expiration:
python -m nova_fde.scripts.manage_settings --checkCreating a Calendar Reminder
Create a calendar reminder file for password rotation:
python -m nova_fde.scripts.manage_settings --create-reminderThis will create an .ics file that you can import into your calendar application.
Secure Credential Options
Treasury Analytics Core supports several methods for managing database credentials securely:
Engine Factory with Automatic Authentication (Recommended)
The simplest and most reliable method is to use the EngineFactory with automatic authentication, which now includes support for global settings and password expiration checks:
from nova_fde.core.engine_factory import EngineFactory
# Create engine with automatic credential handling and password expiration checks
engine = EngineFactory.create_with_auto_auth(
project_root="./my_project",
check_password_expiry=True
)This approach will automatically:
- Check for global database settings
- Check for password expiration and issue warnings when needed
- Look for credentials in environment variables
- Check the system keyring (Keychain, Credential Manager, SecretService)
- Search for stored credentials in the user’s home directory
- Fall back to prompting the user if needed
- Save entered credentials securely for future use
System Keyring
For direct access to the system’s secure credential store:
from nova_fde import FinanceDataEngine
from nova_fde.utils import setup_logging
# Set up logging
logger, console = setup_logging()
# Initialize with credentials from keyring
engine = FinanceDataEngine(
console=console,
use_keyring=True,
keyring_service="nova_fde"
)Private Credential Directories
You can store credentials in a private directory:
from nova_fde import FinanceDataEngine
# Initialize with credentials from a private directory
engine = FinanceDataEngine(
credential_path="C:/Users/username/private/credentials"
)Settings Factory
For more advanced configuration:
from nova_fde import FinanceDataEngine
from nova_fde.config.settings_factory import SettingsFactory
# Create settings from keyring
settings = SettingsFactory.from_keyring()
# Initialize engine with settings
engine = FinanceDataEngine(settings=settings)Basic Usage
Connecting to the Database
Here’s a basic example of connecting to the database and executing a simple query:
import pandas as pd
from nova_fde.core.engine_factory import EngineFactory
from nova_fde.utils import setup_logging
# Set up logging
logger, console = setup_logging()
# Initialize engine with automatic authentication and password expiration check
engine = EngineFactory.create_with_auto_auth(
project_root="./my_project",
console=console,
check_password_expiry=True
)
# Now you can use the engine to execute queries
with engine.connect() as conn:
df = pd.read_sql("SELECT * FROM my_table LIMIT 10", conn)
print(df)
# Clean up when done
engine.cleanup()Processing Data
Here’s a simple example of a data processing function:
# Define a data processing function
def process_my_data(data_frames, processor):
# Process each dataset
df1 = data_frames["query1"]
df2 = data_frames["query2"]
# Apply common processing
df1 = processor.optimize_memory(df1)
# Merge datasets
result = df1.merge(df2, on="id_column", how="left")
return result
# Define queries to execute
queries = {
"query1": "query1.sql",
"query2": "query2.sql"
}
# Process data
results = engine.process_data(
queries=queries,
process_func=process_my_data,
output_name="MyResults",
force_refresh=False
)Comprehensive Examples
For more complex data processing examples, see the EC Data Processing Example and Concord Payments Example in the Examples section.
Here’s an example showing how to use the nova_fde package in an existing project for processing EC (Energy Community Data) data:
For a complete project example, here’s how to use nova_fde for Concord payment processing:
# examples/Concord/concord_payments.py
"""Concord Payments Report
This script generates payment reports for Concord-managed portfolios.
"""
from datetime import date
from pathlib import Path
from typing import List
import pandas as pd
from nova_fde.core.engine_factory import EngineFactory
def run_concord_payments_report(
portfolios: List[str],
target_date: str,
root_path: Path,
force_refresh: bool = False,
):
"""
Run Concord payments report for specified portfolios.
"""
# Create finance data engine with automatic auth
engine = EngineFactory.create_with_auto_auth(project_root=root_path)
# Define queries
queries = {"payments": "concord_payments.sql"}
# Pass parameters directly
params = {
"portfolio_names": portfolios, # Pass the list directly
"target_date": target_date,
}
print(f"Running query with parameters: {params}")
# Add debugging to check SQL file
sql_path = engine.settings.paths.sql_dir / "concord_payments.sql"
if not sql_path.exists():
print(f"ERROR: SQL file not found at {sql_path}")
return None
else:
print(f"SQL file found at {sql_path}")
# Print first 10 lines of the SQL to verify content
try:
with open(sql_path, "r") as f:
sql_content = f.readlines()
print("SQL file preview (first 10 lines):")
for i, line in enumerate(sql_content[:10]):
print(f" {i + 1}: {line.strip()}")
except Exception as e:
print(f"Could not read SQL file: {str(e)}")
# Process function (optimize the data)
def process_payments(data_frames, processor):
"""
Process payment data frames.
"""
# Check if we got any data
payments = data_frames.get("payments")
if payments is None:
print("ERROR: No payment data was returned")
return pd.DataFrame() # Return empty DataFrame
print(f"SUCCESS: Received {len(payments)} rows of payment data")
return processor.optimize_memory(payments)
# Execute processing with try-except
try:
results = engine.process_data(
queries=queries,
query_params=params,
process_func=process_payments,
output_name=f"ConcordPayments_{target_date.replace('-', '')}",
force_refresh=force_refresh,
)
return results
except Exception as e:
import traceback
print(f"ERROR: Exception during processing: {str(e)}")
print(traceback.format_exc())
return pd.DataFrame() # Return empty DataFrame
finally:
try:
# Always clean up resources
engine.cleanup()
except Exception as e:
print(f"Warning: Cleanup error: {str(e)}")
def main():
"""
Run the Concord payments report.
This function sets up default parameters and executes the
payment report generation process.
"""
print("Starting Concord Payments Report")
# Check credential availability first
from nova_fde.core.engine_factory import EngineFactory
EngineFactory.check_credentials()
# Set default portfolios
target_portfolios = ["Sunnova SAP IV LLC"]
# Set target month (first day of current month)
today = date.today()
target_month = date(today.year, today.month, 1).strftime("%Y-%m-%d")
# Set root path
root_path = Path("./examples/Concord")
# Run the report
result_df = run_concord_payments_report(
portfolios=target_portfolios,
target_date=target_month,
root_path=root_path,
force_refresh=True,
)
print(
f"Completed Concord Payments Report with {len(result_df) if result_df is not None else 0} records"
)
# Add additional debugging if no results
if result_df is None or len(result_df) == 0:
print("\nTroubleshooting tips:")
print("1. Verify table and column names in the SQL query")
print(
"2. Try running a simplified version of the query directly in the database"
)
print("3. Check if data exists for the specified date range")
if __name__ == "__main__":
main()Advanced Features
Query Caching
The package automatically caches query results to improve performance. You can control this behavior:
# Force refresh all queries
results = engine.process_data(
queries=queries,
process_func=process_func,
output_name="MyResults",
force_refresh=True # Ignore cache and re-execute all queries
)
# Or set specific cache expiration
results = engine.process_data(
queries=queries,
process_func=process_func,
output_name="MyResults",
cache_expiry_days=7 # Cache results for 7 days
)Performance Analysis
Track query and processing performance:
# Enable performance analysis
results = engine.process_data(
queries=queries,
process_func=process_func,
output_name="MyResults",
analyze=True # Enable performance analysis
)
# Access performance metrics
print(f"Total duration: {results['duration']:.2f} seconds")
print(f"Query duration: {results['query_duration']:.2f} seconds")
print(f"Processing duration: {results['processing_duration']:.2f} seconds")Data Processor Utilities
The DataProcessor object passed to your processing function provides several utility methods:
def process_data(data_frames, processor):
# Optimize memory usage
df = processor.optimize_memory(data_frames["my_query"])
# Deduplicate columns with the same name
df, column_mapping = processor.deduplicate_columns(df)
# Convert columns to appropriate data types
df = processor.convert_types(df, {
"id": "int",
"amount": "float",
"date": "datetime"
})
return dfConclusion
Treasury Analytics Core (nova_fde) provides a comprehensive solution for finance data processing with secure credential management, centralized database settings, and password expiration tracking. By using the package’s automatic authentication and global settings features, you can create standardized, secure data processing workflows that simplify database access across multiple projects.
For more detailed examples and API documentation, visit the official documentation.