Treasury Analytics Core
  • Home
  • API Reference
  • Examples
  • Credentials
  • Global Settings
  1. Examples
  2. Example Projects
  • Overview
    • Treasury Analytics Core
  • Credential Management
    • Secure Credential Management
    • Global Settings
  • Examples
    • Example Projects
  • API Reference
    • Function reference
    • Core Components
      • FinanceDataEngine
      • engine_factory
      • engine
      • database
      • cache
      • processor
      • analyzer
    • Configuration
      • settings
      • settings_factory
      • global_settings_manager
    • Utilities
      • credentials
      • dataframe_utils
      • setup_logging
      • env_checker
      • query_timer
      • setup_helper
      • logging
      • constants
      • types
    • Scripts
      • configure_db
      • create_project
      • manage_settings

On this page

  • EC Data Processing Example
    • Key Features Demonstrated
  • Complete Project Example: Concord Payments
    • Project Structure
    • Key Features Demonstrated
  • Using Global Settings
  • Creating Your Own Project

Other Formats

  • Github (GFM)
  1. Examples
  2. Example Projects

Example Projects

This section demonstrates how to use the Treasury Analytics Core (nova_fde) package in real-world scenarios.

EC Data Processing Example

This example shows how to process Tax Equity Platform (TEP) data using nova_fde. It demonstrates:

  • Automatic authentication
  • SQL query execution
  • Data merging and transformation
  • Performance analysis
# examples/process_ec_data.py
"""Example script for processing EC data using nova_fde.

This script demonstrates how to use the nova_fde package to process
Energy Community (EC) data with proper database authentication,
error handling, and performance analysis.
"""

import argparse
import sys
from pathlib import Path

import pandas as pd

from nova_fde.core.engine_factory import EngineFactory, fixed_create_with_auto_auth
from nova_fde.core.processor import DataProcessor
from nova_fde.utils import setup_logging


def process_ec_data(
    data_frames: dict[str, pd.DataFrame], processor: DataProcessor
) -> pd.DataFrame:
    """
    Process Energy Community data with project-specific logic.

    This function takes data frames retrieved from SQL queries and
    processes them to create a unified dataset of EC systems.

    Parameters
    ----------
    data_frames : dict[str, pd.DataFrame]
        Dictionary containing the data frames retrieved from SQL queries.
        Expected keys: "systems", "snh", "block".
    processor : DataProcessor
        Data processor object providing utility methods for data processing.

    Returns
    -------
    pd.DataFrame
        Processed and merged dataset containing EC system information.

    Notes
    -----
    The processing steps include:
    - Deduplicating columns
    - Filtering for active systems
    - Merging data from multiple sources
    - Converting date columns to datetime format
    """
    # Process systems data
    systems = data_frames["systems"]
    # Use dedicated processor method if available, otherwise apply standard processing
    if hasattr(processor, "process_systems_data"):
        processed_systems = processor.process_systems_data(systems)
    else:
        # Apply standard processing if the specialized method isn't available
        processed_systems = processor.optimize_memory(systems)

    # Deduplicate columns to avoid conflicts
    processed_systems, column_mapping = processor.deduplicate_columns(processed_systems)

    # Filter active systems
    processed_systems = processed_systems[
        processed_systems["Status"].isin(["InService", "Under Construction"])
    ]

    # Process SNH data
    snh_data = data_frames["snh"]
    snh_data = snh_data.drop_duplicates(subset="System Name", keep="first")

    # Process block info
    block_info = data_frames["block"]
    block_info = block_info.drop_duplicates(subset="System Name", keep="first")

    # Merge datasets
    final_systems = processed_systems.merge(
        snh_data[["System Name", "System Type"]], on="System Name", how="left"
    ).merge(block_info, on="System Name", how="left")

    # Remove duplicates from final dataset
    final_systems = final_systems.drop_duplicates(subset="System Name", keep="first")

    # Process dates - convert string dates to datetime objects with proper error handling
    date_columns = [
        "Registration Screenshot Requirement Received Date",
        "Commissioning Package Received Date",
        "Power Producing On",
    ]

    for col in date_columns:
        if col in final_systems.columns:
            final_systems[col] = pd.to_datetime(final_systems[col], errors="coerce")

    return final_systems


def main():
    """
    Execute the EC data processing workflow.

    This function parses command line arguments, initializes the data engine,
    processes the data, and outputs the results. It includes comprehensive
    error handling and logging.

    Command line arguments:
    --project-root: Path to project root directory
    --check-credentials: Check credential availability before processing
    --force-refresh: Force refresh cached data
    --check-password-expiry: Check if database password is nearing expiration
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Process Energy Community data")
    parser.add_argument("--project-root", help="Path to project root directory")
    parser.add_argument(
        "--check-credentials",
        action="store_true",
        help="Check credential availability before processing",
    )
    parser.add_argument(
        "--force-refresh", action="store_true", help="Force refresh cached data"
    )
    parser.add_argument(
        "--check-password-expiry",
        action="store_true",
        help="Check if database password is nearing expiration",
    )
    args = parser.parse_args()

    # Set up logging with rich error handling
    logger, console = setup_logging()

    console.print("[bold cyan]Nova Finance Data Engine - EC Processing[/bold cyan]")

    try:
        # Determine project root
        project_root = args.project_root or Path("./examples/EC")
        project_root = Path(project_root)

        if not project_root.exists():
            console.print(
                f"[yellow]Warning: Project root directory {project_root} does not exist. "
                f"Creating it now.[/yellow]"
            )
            project_root.mkdir(parents=True, exist_ok=True)

        # Create directory structure if it doesn't exist
        for dir_name in ["SQL", "Query Cache", "Completed Output", "Data"]:
            dir_path = project_root / dir_name
            if not dir_path.exists():
                console.print(f"[yellow]Creating directory: {dir_path}[/yellow]")
                dir_path.mkdir(parents=True, exist_ok=True)

        # Store output directory path for later use
        output_dir = project_root / "Completed Output"

        # Check credentials if requested
        if args.check_credentials:
            console.print("[blue]Checking credential availability...[/blue]")
            EngineFactory.check_credentials(console=console)

        # Initialize engine with automatic authentication and password expiration check
        console.print("[blue]Initializing data engine...[/blue]")
        # Option 2: Use the fixed function

        engine = fixed_create_with_auto_auth(
            project_root=project_root,
            console=console,
            check_password_expiry=args.check_password_expiry,
        )

        # Define queries for this specific project
        queries = {
            "systems": "systems.sql",
            "snh": "snh_identification.sql",
            "block": "project_block_info.sql",
        }

        # Verify SQL files exist before processing
        for query_name, sql_file in queries.items():
            # Access SQL directory through settings if available
            if hasattr(engine.settings, "paths") and hasattr(
                engine.settings.paths, "sql_dir"
            ):
                sql_path = engine.settings.paths.sql_dir / sql_file
            else:
                # Fallback to project_root/SQL
                sql_path = project_root / "SQL" / sql_file

            if not sql_path.exists():
                console.print(
                    f"[yellow]Warning: SQL file {sql_file} not found at {sql_path}[/yellow]"
                )
                # Create a placeholder SQL file
                with open(sql_path, "w") as f:
                    f.write(
                        f"-- Placeholder for {sql_file}\n-- Replace with actual query\nSELECT 'Example' AS 'System Name', 'Active' AS 'Status';"
                    )
                console.print(
                    f"[yellow]Created placeholder SQL file at {sql_path}[/yellow]"
                )

        # Process data with project-specific function
        console.print("[blue]Starting data processing...[/blue]")
        results = engine.process_data(
            queries=queries,
            process_func=process_ec_data,
            output_name="EC_All",
            force_refresh=args.force_refresh,
            analyze=True,
        )

        console.print("\n[bold green]✓ Processing completed successfully[/bold green]")
        if isinstance(results, dict):
            console.print(
                f"Processed {results.get('rows_processed', 0):,} rows in "
                f"{results.get('duration', 0):.2f} seconds"
            )

            # Display performance metrics if available
            if results.get("query_duration") is not None:
                console.print(
                    f"Query duration: {results['query_duration']:.2f} seconds"
                )
            if results.get("processing_duration") is not None:
                console.print(
                    f"Processing duration: {results['processing_duration']:.2f} seconds"
                )

        # Save results to Excel if needed
        if output_dir and "output_name" in results:
            output_path = output_dir / f"{results['output_name']}.xlsx"
            console.print(f"[blue]Results saved to: {output_path}[/blue]")

        # Cleanup
        engine.cleanup()

    except Exception as e:
        logger.exception("Process failed")
        console.print(f"\n[bold red]Error processing data: {str(e)}[/bold red]")

        # Provide more detailed error information for troubleshooting
        import traceback

        console.print("[red]Stack trace:[/red]")
        console.print(traceback.format_exc())

        console.print("\n[yellow]Troubleshooting tips:[/yellow]")
        console.print("1. Verify SQL files exist and have correct syntax")
        console.print("2. Check database credentials and connectivity")
        console.print("3. Ensure required directories exist in project structure")
        console.print(
            "4. Verify column names in SQL results match those used in processing"
        )
        console.print(
            "5. Check if the settings structure matches your version of the package"
        )

        sys.exit(1)


if __name__ == "__main__":
    main()

Key Features Demonstrated

  • Using the EngineFactory for easy database connection
  • Processing data from multiple SQL queries
  • Column deduplication and data type conversion
  • Proper error handling with rich console output
  • Performance tracking with the analyze=True parameter

Complete Project Example: Concord Payments

This example demonstrates a complete project structure for processing Concord payment data:

# examples/Concord/concord_payments.py
"""Concord Payments Report

This script generates payment reports for Concord-managed portfolios.
"""

from datetime import date
from pathlib import Path
from typing import List

import pandas as pd

from nova_fde.core.engine_factory import EngineFactory


def run_concord_payments_report(
    portfolios: List[str],
    target_date: str,
    root_path: Path,
    force_refresh: bool = False,
):
    """
    Run Concord payments report for specified portfolios.
    """
    # Create finance data engine with automatic auth
    engine = EngineFactory.create_with_auto_auth(project_root=root_path)

    # Define queries
    queries = {"payments": "concord_payments.sql"}

    # Pass parameters directly
    params = {
        "portfolio_names": portfolios,  # Pass the list directly
        "target_date": target_date,
    }

    print(f"Running query with parameters: {params}")

    # Add debugging to check SQL file
    sql_path = engine.settings.paths.sql_dir / "concord_payments.sql"
    if not sql_path.exists():
        print(f"ERROR: SQL file not found at {sql_path}")
        return None
    else:
        print(f"SQL file found at {sql_path}")
        # Print first 10 lines of the SQL to verify content
        try:
            with open(sql_path, "r") as f:
                sql_content = f.readlines()
                print("SQL file preview (first 10 lines):")
                for i, line in enumerate(sql_content[:10]):
                    print(f"  {i + 1}: {line.strip()}")
        except Exception as e:
            print(f"Could not read SQL file: {str(e)}")

    # Process function (optimize the data)
    def process_payments(data_frames, processor):
        """
        Process payment data frames.
        """
        # Check if we got any data
        payments = data_frames.get("payments")
        if payments is None:
            print("ERROR: No payment data was returned")
            return pd.DataFrame()  # Return empty DataFrame

        print(f"SUCCESS: Received {len(payments)} rows of payment data")
        return processor.optimize_memory(payments)

    # Execute processing with try-except
    try:
        results = engine.process_data(
            queries=queries,
            query_params=params,
            process_func=process_payments,
            output_name=f"ConcordPayments_{target_date.replace('-', '')}",
            force_refresh=force_refresh,
        )

        return results
    except Exception as e:
        import traceback

        print(f"ERROR: Exception during processing: {str(e)}")
        print(traceback.format_exc())
        return pd.DataFrame()  # Return empty DataFrame
    finally:
        try:
            # Always clean up resources
            engine.cleanup()
        except Exception as e:
            print(f"Warning: Cleanup error: {str(e)}")


def main():
    """
    Run the Concord payments report.

    This function sets up default parameters and executes the
    payment report generation process.
    """
    print("Starting Concord Payments Report")

    # Check credential availability first
    from nova_fde.core.engine_factory import EngineFactory

    EngineFactory.check_credentials()

    # Set default portfolios
    target_portfolios = ["Sunnova SAP IV LLC"]

    # Set target month (first day of current month)
    today = date.today()
    target_month = date(today.year, today.month, 1).strftime("%Y-%m-%d")

    # Set root path
    root_path = Path("./examples/Concord")

    # Run the report
    result_df = run_concord_payments_report(
        portfolios=target_portfolios,
        target_date=target_month,
        root_path=root_path,
        force_refresh=True,
    )

    print(
        f"Completed Concord Payments Report with {len(result_df) if result_df is not None else 0} records"
    )

    # Add additional debugging if no results
    if result_df is None or len(result_df) == 0:
        print("\nTroubleshooting tips:")
        print("1. Verify table and column names in the SQL query")
        print(
            "2. Try running a simplified version of the query directly in the database"
        )
        print("3. Check if data exists for the specified date range")


if __name__ == "__main__":
    main()

Project Structure

The Concord example follows the recommended project structure:

Concord/
├── concord_payments.py      # Main processing script
├── run_payments.py          # Entry point script
├── Completed Output/        # Processed output files
│   ├── ConcordPayments_20250201.csv
│   └── ConcordPayments_20250201_2025_02_26.csv  
├── Data/                    # Input data files
├── Query Cache/             # Cached query results
│   ├── payments_2025_02_25.csv
│   └── payments_2025_02_26.csv
└── SQL/                     # SQL query files
    └── concord_payments.sql

Key Features Demonstrated

  • Passing parameters to SQL queries
  • SQL file validation and debugging
  • Proper error handling and cleanup
  • Output file naming with timestamps
  • Usage of the force_refresh parameter to ignore cached results

Using Global Settings

Both example projects can be enhanced with the new global settings functionality:

# Initialize engine with global settings and password expiration check
engine = EngineFactory.create_with_auto_auth(
    project_root=project_root, 
    console=console,
    check_password_expiry=True
)

This modification allows the projects to: - Use centralized database connection settings - Get warnings when database passwords are about to expire - Benefit from consistent configuration across multiple projects

Creating Your Own Project

To create a new project based on these examples:

# Create a project with the default template
python -m nova_fde.scripts.create_project my_new_project

# Or create an analysis-focused project
python -m nova_fde.scripts.create_project --template analysis analysis_project

# Or create a reporting-focused project
python -m nova_fde.scripts.create_project --template reporting reporting_project

Then adapt the example code to your specific needs by replacing the SQL queries and processing logic.

Global Settings
Function reference
 
 
  • Built with [Quarto](https://quarto.org/) and [quartodoc](https://machow.github.io/quartodoc/)