
DEALER_TRANSACTIONS Datatape

Transaction-level detail for Dealer Payables/Credits/Disbursements/Receipts Activity including payment processing, vendor management, and financial transaction tracking.

Overview

Generation Frequency: Ad hoc

Development Status: Stable

Key Features:

  • Dealer payment tracking
  • Transaction-level analysis
  • Payables/receivables management
  • Vendor relationship analytics

Key Stakeholders:

  • Alvarez & Marsal

Output Columns

The following columns are included in the final datatape output (filtered from SQL queries based on configuration):

Column Analysis Summary

  • Total columns found in SQL queries: 154
  • Columns specified in output_columns config: 17
  • Columns found in SQL and included in output: 21
  • Missing columns (in config but not found in SQL): 4

Note: Only columns that exist in SQL queries are included in the final output.
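
The filtering rule in the note above can be sketched as follows. This is a minimal illustration, not the datatape's actual implementation: the function name `split_output_columns` and the sample column lists are hypothetical, and the real lists come from the `output_columns` config and the SQL files.

```python
def split_output_columns(configured, sql_columns):
    """Partition configured output columns into those found in SQL and those missing.

    Hypothetical helper: preserves the order of the configured list.
    """
    available = set(sql_columns)
    included = [col for col in configured if col in available]
    missing = [col for col in configured if col not in available]
    return included, missing


# Hypothetical inputs for illustration only.
configured = ["Dealer Name", "Transaction Date", "Credit Limit"]
sql_columns = ["Dealer Name", "Transaction Date", "Transaction Amount"]

included, missing = split_output_columns(configured, sql_columns)
print("Included in output:", included)
print("In config but not in SQL:", missing)
```

Columns that land in the second list correspond to the "Missing columns" count in the summary; they are reported rather than written to the output.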

| Column Name | Required | Source Query | Source Table | Source Field | Derived | Transformations |
| Dealer Name | Yes | dealer_transactions | sfdc_staging.vwx_account | Dealer Name | Yes | |
| Hold Status | Yes | ap_payments_view | | Hold Status | Yes | |
| Invoice Status | Yes | ap_payments_view | Credit Notes | Invoice Status | No | |
| Invoice Status | Yes | dealer_transactions | sfdc_staging.vw_c2g__codapurchaseinvoice__c | Invoice Status | No | |
| Invoice Total2 | Yes | ap_payments_view | Credit Notes | Invoice Total2 | No | |
| Invoice Total2 | Yes | ap_payments_view | Payable Invoice | Invoice Total2 | No | |
| Payment Status | Yes | ap_payments_view | Credit Notes, transaction_c | Payment Status | Yes | |
| Payment Status | Yes | dealer_transactions | pi, tr | Payment Status | Yes | |
| Payment Status | Yes | ap_payments_view | Payable Invoice, transaction_c | Payment Status | Yes | |
| Period | Yes | ap_payments_view | Coda Period | Period | No | |
| Period | Yes | ap_payments_view | Coda Period | Period | No | |
| Portfolio | Yes | dealer_base_systems | | Portfolio | No | |
| Project Stage | Yes | ap_payments_view | Payable Invoice | Project Stage | No | |
| Project Stage | Yes | ap_payments_view | Credit Notes | Project Stage | No | |
| Sunnova System ID | Yes | dealer_transactions | sfdc_staging.vw_c2g__codapurchaseinvoice__c | Reference 1 | No | |
| Sunnova System ID | Yes | dealer_base_systems | | Sunnova System ID | No | |
| Transaction Amount | Yes | dealer_transactions | sfdc_staging.vw_c2g__codapurchaseinvoice__c | Transaction Amount | Yes | |
| Transaction Date | Yes | dealer_transactions | sfdc_staging.vw_c2g__codapurchaseinvoice__c | Transaction Date | Yes | |
| Unique Transaction ID | Yes | dealer_transactions | sfdc_staging.vw_c2g__codapurchaseinvoice__c | Payable Invoice Number | No | |
| VIN | Yes | ap_payments_view | Credit Notes | VIN | No | |
| VIN | Yes | ap_payments_view | Payable Invoice | VIN | No | |

Note: Columns marked as 'Derived' are calculated or transformed rather than directly selected from the source table.

Data Sources

The configuration pulls data from the following sources:

| Source Name | Type | Query/File | Description |
| dealer_transactions | database | dealer_transactions.sql | SQL query for dealer_transactions data |
| ap_payments_view | database | ap_payments_view.sql | SQL query for ap_payments_view data |
| dealer_base_systems | database | dealer_base_systems.sql | SQL query for dealer_base_systems data |
| snh_eistein_data | database | snh_eistein_data.sql | SQL query for snh_eistein_data data |

Configuration

Portfolio Groups

tep_portfolios:

sla_portfolios:

  • Sunnova Asset Portfolio 7 Holdings LLC
  • Sunnova AP6 Warehouse II LLC
  • Sunnova EZ-Own Portfolio LLC
  • Sunnova Asset Portfolio 8 LLC
  • Sunnova Asset Portfolio 9 Holdings LLC
  • Sunnova Asset Portfolio 9 LLC

Database Configuration

  • host: ${NOVA_DB_HOST}
  • name: ${NOVA_DB_NAME}
  • port: 5432
  • pool_size: 5
  • max_retries: 3

Paths

  • sql_dir: ./sql
  • cache_dir: ./Query Cache
  • output_dir: ./Completed Output

SQL Queries

-- dealer_transactions.sql
-- Transaction-level detail for Dealer Payables/Credits/Disbursements/Receipts Activity
-- Will be filtered by portfolio_names parameter passed from config

SELECT 
    pi."Payable Invoice Number" AS "Unique Transaction ID",
    pi."Reference 1" AS "Sunnova System ID",
    COALESCE(pi."Invoice Date", pi."Created Date") AS "Transaction Date",
    -pi."Outstanding Value" AS "Transaction Amount",
    acc."Account Name" AS "Dealer Name",
    pi."Reference 2" AS "Transaction Type",  -- Use Project Stage directly instead of deriving transaction type
    pi."Invoice Status",
    CASE
        WHEN pi."Invoice Status"::text <> 'Complete'::text THEN 'Not Applicable'::text
        WHEN tr."Document Outstanding Total" = 0::numeric THEN 'Paid'::text
        WHEN tr."Document Total" = tr."Document Outstanding Total" THEN 'Unpaid'::text
        ELSE 'Part Paid'::text
    END AS "Payment Status",
    CASE
        WHEN pi."Invoice Status"::text <> 'Complete'::text THEN 'Not Applicable'::text
        WHEN tr."Document Outstanding Total" = 0::numeric AND -pi."Outstanding Value" < 0 THEN 'Cash Disbursement actually made to Dealer'
        WHEN tr."Document Outstanding Total" = 0::numeric AND -pi."Outstanding Value" > 0 THEN 'Cash Received from Dealer'
        WHEN tr."Document Outstanding Total" = tr."Document Total" THEN 'Recording an amount payable'
        ELSE 'Recording an amount payable'
    END AS "Application",
    sp."Primary Customer Name" AS "Customer Name",
    acc."Account Type (1)" AS "Dealer Type",
    pi."Vendor Invoice Number" AS "VIN",
    pi."Invoice Total" AS "Invoice Total2",
    cp."Period ID" AS "Period",
    pi."Hold Status" AS "Hold Status",
    comp."Name" AS "Portfolio",
    'Invoice' AS "Document Type"
FROM 
    sfdc_staging.vw_c2g__codapurchaseinvoice__c pi
JOIN 
    sfdc_staging.vw_c2g__codatransaction__c tr ON pi."Record ID"::text = tr."Payable Invoice"::text
JOIN 
    sfdc_staging.vw_c2g__codacompany__c comp ON pi."Company"::text = comp."Record ID"::text
JOIN 
    sfdc_staging.vw_c2g__codaperiod__c cp ON pi."Period"::text = cp."Record ID"::text
LEFT JOIN 
    sfdc_staging.vwx_account acc ON pi."Account"::text = acc."Account ID (2)"::text
LEFT JOIN 
    (SELECT s."System Name" AS "System", s."Primary Customer Name" FROM sfdc_staging.vw_system__c s) sp 
    ON pi."Reference 1"::text = sp."System"::text
WHERE
    -- Filter by either TEP or SLA portfolios
    (comp."Name" IN :tep_portfolios OR comp."Name" IN :sla_portfolios)
    -- Filter by date range
    -- AND pi."Created Date" >= :start_date
    -- AND pi."Created Date" <= :end_date
    -- Additional filters similar to AP payments view
    AND pi."Invoice Status"::text <> ALL (ARRAY ['Discarded'::text, 'In Progress'::text])
    AND acc."Payment Method (1)"::text <> 'Check'::text
    AND pi."Reference 1" IS NOT NULL

UNION ALL

-- Add credit notes with similar structure
SELECT 
    cn."Credit Note Number" AS "Unique Transaction ID",
    cn."Reference 1" AS "Sunnova System ID",
    COALESCE(cn."Credit Note Date", cn."Created Date") AS "Transaction Date",
    -cn."Outstanding Value" AS "Transaction Amount",
    acc."Account Name" AS "Dealer Name",
    cn."Reference 2" AS "Project Stage",  -- Use Project Stage directly
    cn."Credit Note Status" AS "Invoice Status",
    CASE
        WHEN cn."Credit Note Status"::text <> 'Complete'::text THEN 'Not Applicable'::text
        WHEN tr."Document Outstanding Total" = 0::numeric THEN 'Paid'::text
        WHEN tr."Document Total" = tr."Document Outstanding Total" THEN 'Unpaid'::text
        ELSE 'Part Paid'::text
    END AS "Payment Status",
    'Applying a credit' AS "Application",  -- Credit notes are always applying a credit
    sp."Primary Customer Name" AS "Customer Name",
    acc."Account Type (1)" AS "Dealer Type",
    cn."Vendor Credit Note Number" AS "VIN",
    cn."Credit Note Total" AS "Invoice Total2",
    cp."Period ID" AS "Period",
    NULL::text AS "Hold Status",
    comp."Name" AS "Portfolio",
    'Credit Note' AS "Document Type"
FROM 
    sfdc_staging.vw_c2g__codapurchasecreditnote__c cn
JOIN 
    sfdc_staging.vw_c2g__codatransaction__c tr ON cn."Record ID"::text = tr."Payable Credit Note"::text
JOIN 
    sfdc_staging.vw_c2g__codacompany__c comp ON cn."Company"::text = comp."Record ID"::text
JOIN 
    sfdc_staging.vw_c2g__codaperiod__c cp ON cn."Period"::text = cp."Record ID"::text
LEFT JOIN 
    sfdc_staging.vwx_account acc ON cn."Account"::text = acc."Account ID (2)"::text
LEFT JOIN 
    (SELECT s."System Name" AS "System", s."Primary Customer Name" FROM sfdc_staging.vw_system__c s) sp 
    ON cn."Reference 1"::text = sp."System"::text
WHERE
    -- Filter by either TEP or SLA portfolios
    (comp."Name" IN :tep_portfolios OR comp."Name" IN :sla_portfolios)
    -- Filter by date range
    -- AND cn."Created Date" >= :start_date
    -- AND cn."Created Date" <= :end_date
    -- Additional filters similar to AP payments view
    AND cn."Credit Note Status"::text <> ALL (ARRAY ['Discarded'::text, 'In Progress'::text])
    AND acc."Payment Method (1)"::text <> 'Check'::text
    AND cn."Reference 1" IS NOT NULL

ORDER BY 
    "Sunnova System ID", 
    "Transaction Date";

-- ap_payments_view.sql
create or replace view vw_ap_payments
            ("PIN", "Account", "Company", "Due Date", "Invoice Date", "Invoice Status", "Invoice Total",
             "Payment Status", "Period", "System Project", "Project Stage", "Payment Method", "VIN", "Homeowner Name",
             "Created Date", "Invoice Total2", "Hold Status", "Dealer/Vendor", "Invoice Type")
as
WITH transaction_c AS (SELECT vw_c2g__codatransaction__c."Document Total",
                              vw_c2g__codatransaction__c."Document Outstanding Total",
                              vw_c2g__codatransaction__c."Payable Invoice",
                              vw_c2g__codatransaction__c."Payable Credit Note"
                       FROM sfdc_staging.vw_c2g__codatransaction__c
                       WHERE vw_c2g__codatransaction__c."Payable Invoice" IS NOT NULL
                          OR vw_c2g__codatransaction__c."Payable Credit Note" IS NOT NULL
                       ORDER BY vw_c2g__codatransaction__c."Payable Invoice",
                                vw_c2g__codatransaction__c."Payable Credit Note"),
     system_project_c AS (SELECT primary_customer_name__c.primary_customer_name__c AS "Primary Customer Name",
                                 sp.system__c                                      AS "System"
                          FROM sfdc_staging.system_project__c sp
                                   LEFT JOIN sfdc_staging.vw_33_22465 primary_customer_name__c
                                             ON primary_customer_name__c.id::text = sp.id::text),
     included_accounts AS (SELECT vwx_account."Account Name",
                                  vwx_account."Payment Method (1)",
                                  vwx_account."Account Type (1)",
                                  vwx_account."Account ID (2)"
                           FROM sfdc_staging.vwx_account
                           WHERE vwx_account."Payment Method (1)"::text <> 'Check'::text),
     payable_invoices AS (SELECT "Payable Invoice"."Payable Invoice Number" AS "PIN",
                                 "Accounts"."Account Name"                  AS "Account",
                                 "Companies"."Name"                         AS "Company",
                                 "Payable Invoice"."Due Date",
                                 "Payable Invoice"."Invoice Date",
                                 "Payable Invoice"."Invoice Status",
                                 - "Payable Invoice"."Outstanding Value"    AS "Invoice Total",
                                 CASE
                                     WHEN "Payable Invoice"."Invoice Status"::text <> 'Complete'::text
                                         THEN 'Not Applicable'::text
                                     WHEN transaction_c."Document Outstanding Total" = 0::numeric THEN 'Paid'::text
                                     WHEN transaction_c."Document Total" = transaction_c."Document Outstanding Total"
                                         THEN 'Unpaid'::text
                                     ELSE 'Part Paid'::text
                                     END                                    AS "Payment Status",
                                 "Coda Period"."Period ID"                  AS "Period",
                                 "Payable Invoice"."Reference 1"            AS "System Project",
                                 "Payable Invoice"."Reference 2"            AS "Project Stage",
                                 "Accounts"."Payment Method (1)"            AS "Payment Method",
                                 "Payable Invoice"."Vendor Invoice Number"  AS "VIN",
                                 system_project_c."Primary Customer Name"   AS "Homeowner Name",
                                 "Payable Invoice"."Created Date",
                                 "Payable Invoice"."Invoice Total"          AS "Invoice Total2",
                                 "Payable Invoice"."Hold Status",
                                 "Accounts"."Account Type (1)"              AS "Dealer/Vendor",
                                 'Payable Invoice'::text                    AS "Invoice Type"
                          FROM sfdc_staging.vw_c2g__codapurchaseinvoice__c "Payable Invoice"
                                   JOIN transaction_c
                                        ON "Payable Invoice"."Record ID"::text = transaction_c."Payable Invoice"::text
                                   JOIN sfdc_staging.vw_c2g__codacompany__c "Companies"
                                        ON "Payable Invoice"."Company"::text = "Companies"."Record ID"::text
                                   JOIN sfdc_staging.vw_c2g__codaperiod__c "Coda Period"
                                        ON "Payable Invoice"."Period"::text = "Coda Period"."Record ID"::text
                                   LEFT JOIN system_project_c
                                             ON "Payable Invoice"."System"::text = system_project_c."System"::text
                                   JOIN included_accounts "Accounts"
                                        ON "Payable Invoice"."Account"::text = "Accounts"."Account ID (2)"::text AND
                                           ("Payable Invoice"."Invoice Status"::text <> ALL
                                            (ARRAY ['Discarded'::character varying::text, 'In Progress'::character varying::text])) AND
                                           "Payable Invoice"."Hold Status" IS NULL
                          WHERE CASE
                                    WHEN "Payable Invoice"."Invoice Status"::text <> 'Complete'::text
                                        THEN 'Not Applicable'::text
                                    WHEN transaction_c."Document Outstanding Total" = 0::numeric THEN 'Paid'::text
                                    WHEN transaction_c."Document Total" = transaction_c."Document Outstanding Total"
                                        THEN 'Unpaid'::text
                                    ELSE 'Part Paid'::text
                                    END = ANY (ARRAY ['Unpaid'::text, 'Not Applicable'::text, 'Part Paid'::text])),
     credit_notes AS (SELECT "Credit Notes"."Credit Note Number"        AS "PIN",
                             "Accounts"."Account Name"                  AS "Account",
                             "Companies"."Name"                         AS "Company",
                             "Credit Notes"."Due Date",
                             "Credit Notes"."Credit Note Date"          AS "Invoice Date",
                             "Credit Notes"."Credit Note Status"        AS "Invoice Status",
                             - "Credit Notes"."Outstanding Value"       AS "Invoice Total",
                             CASE
                                 WHEN "Credit Notes"."Credit Note Status"::text <> 'Complete'::text
                                     THEN 'Not Applicable'::text
                                 WHEN transaction_c."Document Outstanding Total" = 0::numeric THEN 'Paid'::text
                                 WHEN transaction_c."Document Total" = transaction_c."Document Outstanding Total"
                                     THEN 'Unpaid'::text
                                 ELSE 'Part Paid'::text
                                 END                                    AS "Payment Status",
                             "Coda Period"."Period ID"                  AS "Period",
                             "Credit Notes"."Reference 1"               AS "System Project",
                             "Credit Notes"."Reference 2"               AS "Project Stage",
                             "Accounts"."Payment Method (1)"            AS "Payment Method",
                             "Credit Notes"."Vendor Credit Note Number" AS "VIN",
                             system_project_c."Primary Customer Name"   AS "Homeowner Name",
                             "Credit Notes"."Created Date",
                             "Credit Notes"."Credit Note Total"         AS "Invoice Total2",
                             NULL::text                                 AS "Hold Status",
                             "Accounts"."Account Type (1)"              AS "Dealer/Vendor",
                             'Credit Note'::text                        AS "Invoice Type"
                      FROM sfdc_staging.vw_c2g__codapurchasecreditnote__c "Credit Notes"
                               JOIN transaction_c
                                    ON "Credit Notes"."Record ID"::text = transaction_c."Payable Credit Note"::text
                               JOIN sfdc_staging.vw_c2g__codacompany__c "Companies"
                                    ON "Credit Notes"."Company"::text = "Companies"."Record ID"::text
                               JOIN sfdc_staging.vw_c2g__codaperiod__c "Coda Period"
                                    ON "Credit Notes"."Period"::text = "Coda Period"."Record ID"::text
                               LEFT JOIN system_project_c ON "Credit Notes"."System"::text = system_project_c."System"::text
                               JOIN included_accounts "Accounts"
                                    ON "Credit Notes"."Account"::text = "Accounts"."Account ID (2)"::text AND
                                       ("Credit Notes"."Credit Note Status"::text <> ALL
                                        (ARRAY ['Discarded'::character varying::text, 'In Progress'::character varying::text]))
                      WHERE CASE
                                WHEN "Credit Notes"."Credit Note Status"::text <> 'Complete'::text THEN 'Not Applicable'::text
                                WHEN transaction_c."Document Outstanding Total" = 0::numeric THEN 'Paid'::text
                                WHEN transaction_c."Document Total" = transaction_c."Document Outstanding Total"
                                    THEN 'Unpaid'::text
                                ELSE 'Part Paid'::text
                                END = ANY (ARRAY ['Unpaid'::text, 'Not Applicable'::text, 'Part Paid'::text]))
SELECT payable_invoices."PIN",
       payable_invoices."Account",
       payable_invoices."Company",
       payable_invoices."Due Date",
       payable_invoices."Invoice Date",
       payable_invoices."Invoice Status",
       payable_invoices."Invoice Total",
       payable_invoices."Payment Status",
       payable_invoices."Period",
       payable_invoices."System Project",
       payable_invoices."Project Stage",
       payable_invoices."Payment Method",
       payable_invoices."VIN",
       payable_invoices."Homeowner Name",
       payable_invoices."Created Date",
       payable_invoices."Invoice Total2",
       payable_invoices."Hold Status",
       payable_invoices."Dealer/Vendor",
       payable_invoices."Invoice Type"
FROM payable_invoices
UNION ALL
SELECT credit_notes."PIN",
       credit_notes."Account",
       credit_notes."Company",
       credit_notes."Due Date",
       credit_notes."Invoice Date",
       credit_notes."Invoice Status",
       credit_notes."Invoice Total",
       credit_notes."Payment Status",
       credit_notes."Period",
       credit_notes."System Project",
       credit_notes."Project Stage",
       credit_notes."Payment Method",
       credit_notes."VIN",
       credit_notes."Homeowner Name",
       credit_notes."Created Date",
       credit_notes."Invoice Total2",
       credit_notes."Hold Status",
       credit_notes."Dealer/Vendor",
       credit_notes."Invoice Type"
FROM credit_notes;

alter table vw_ap_payments
    owner to fusionods_master;

grant delete, insert, references, select, trigger, truncate, update on vw_ap_payments to group_admin;

grant select on vw_ap_payments to group_poweruser;

grant select on vw_ap_payments to group_accounts_payable;
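
The "Payment Status" CASE expression appears in both dealer_transactions.sql and the vw_ap_payments view, and dealer_transactions.sql adds an "Application" label for invoices. The branch order can be read more easily as plain functions. The sketch below is an illustrative Python restatement, not code from the datatape. The function names are hypothetical, and SQL NULL semantics are not reproduced: a `None` status is treated here as "not Complete".

```python
from decimal import Decimal


def payment_status(invoice_status, document_total, outstanding_total):
    """Mirror the branch order of the SQL "Payment Status" CASE expression."""
    if invoice_status != "Complete":
        return "Not Applicable"
    if outstanding_total == 0:
        return "Paid"
    if document_total == outstanding_total:
        return "Unpaid"
    return "Part Paid"


def application(invoice_status, outstanding_total, transaction_amount):
    """Mirror the invoice-side "Application" CASE in dealer_transactions.sql.

    transaction_amount corresponds to -pi."Outstanding Value" in the query.
    """
    if invoice_status != "Complete":
        return "Not Applicable"
    if outstanding_total == 0 and transaction_amount < 0:
        return "Cash Disbursement actually made to Dealer"
    if outstanding_total == 0 and transaction_amount > 0:
        return "Cash Received from Dealer"
    # Both remaining SQL branches return the same label.
    return "Recording an amount payable"


print(payment_status("Complete", Decimal("100"), Decimal("0")))
print(payment_status("Complete", Decimal("100"), Decimal("100")))
print(payment_status("Complete", Decimal("100"), Decimal("40")))
print(application("Complete", Decimal("100"), Decimal("-100")))
```

Note that the view keeps only rows whose status is Unpaid, Not Applicable, or Part Paid, while dealer_transactions.sql applies no such filter and therefore also returns Paid documents.
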

-- dealer_base_systems.sql
WITH portfolio_systems AS (
    -- Systems from the TEP Backleverage Borrowing Base as of 3/19/25
    SELECT DISTINCT "System Project" AS system_id, 'TEP 3/19/25' AS portfolio
    FROM vw_ap_payments
    WHERE "System Project" IN (
        SELECT system_id 
        FROM :borrowing_base_table_tep_mar
    )
    UNION ALL
    -- Systems from the TEP Backleverage Borrowing Base as of 4/16/25
    SELECT DISTINCT "System Project" AS system_id, 'TEP 4/16/25' AS portfolio  
    FROM vw_ap_payments
    WHERE "System Project" IN (
        SELECT system_id 
        FROM :borrowing_base_table_tep_apr
    )
    UNION ALL
    -- Systems from the SLA as of 4/11/25
    SELECT DISTINCT "System Project" AS system_id, 'SLA 4/11/25' AS portfolio
    FROM vw_ap_payments
    WHERE "System Project" IN (
        SELECT system_id 
        FROM :borrowing_base_table_sla
    )
)

SELECT 
    system_id AS "Sunnova System ID",
    portfolio AS "Portfolio"
FROM portfolio_systems
WHERE system_id IS NOT NULL
ORDER BY portfolio, system_id;

-- ods.vw_sunstreet_einstein_data source

CREATE OR REPLACE VIEW ods.vw_sunstreet_einstein_data
AS SELECT a."Builder Cost Center",
    a."Project Number",
    a."Account Details Name" AS "Account Name",
    a."Inception Date" AS "Inception Date (Year - Month - Day)",
    a."System" AS "System ID",
    s."System Name",
    a."First Close Date",
    a."Home Status",
    s."Asset Portfolio - Customer",
    a."ITC Owner",
    p."Number of Modules" AS "Number of Panels",
    p."Panel Size (W)" AS "Panel Size",
    p."System Size (W)" AS "System Size",
    r."Comp Cost, Material" AS "Material Cost - Component",
    r."Comp Cost, Labor" AS "Labor Cost - Component",
    r."Comp Cost, Material Ohd" AS "Overhead Material Cost - Component",
    r."Total Comp Cost" AS "Total Cost - Component",
    r."Operation Cost, Machine Ohd" AS "Overhead Machine Cost - Operations",
    r."Total Operation Cost" AS "Total Cost - Operations",
    q."Total System Cost",
    q."Total Sunnova Purchased Equipment Cost",
    b."Sum of Sales Invoice Price" AS "Sales Invoice Price",
    wo.design_total_price AS "design_work_order.TotalPrice",
    wo.rough_install_total_price AS "rough_install_work_order.TotalPrice",
    wo.pv_install_total_price AS "pv_install_work_order.TotalPrice",
    wo.commissioning_total_price AS "Commissioning_work_order.TotalPrice",
    wo.adder_total_price AS "adder_work_order.TotalPrice",
    wo.cf6r_total_price AS "cf6r_work_order.TotalPrice",
    wo.interconnection_fees_total_price AS "interconnection_fee_work_order.TotalPrice",
    wo.permit_fees_total_price AS "permit_fees__work_order.TotalPrice",
    wo.pre_wire_total_price AS "pre_wire_work_order.TotalPrice",
    wo.fees_total_price AS "fees_work_order.TotalPrice"
   FROM sfdc_staging.vw_account_details__c a
     LEFT JOIN sfdc_staging.vw_system__c s ON s."Record ID"::text = a."System"::text
     LEFT JOIN sfdc_staging.vw_pvsyst_2__c p ON a."PVSyst"::text = p."Record ID"::text
     LEFT JOIN sfdc_staging.vw_rstk__wocst__c r ON r."Record ID"::text = a."Material Work Order"::text
     LEFT JOIN sfdc_staging.vw_quote__c q ON q."System"::text = a."System"::text
     LEFT JOIN ( SELECT i."Account",
            sum(i."Invoice Total") AS "Sum of Sales Invoice Price"
           FROM sfdc_staging.vw_c2g__codainvoice__c i
          GROUP BY i."Account") b ON b."Account"::text = a."Account"::text
     LEFT JOIN ( SELECT ct."Account Details",
            sum(
                CASE
                    WHEN ct."Order Type"::text = ANY (ARRAY['PV Design'::character varying, 'PV Design Plus'::character varying, 'Revision'::character varying, 'Consumption Study'::character varying, 'Electrical Engineering'::character varying, 'Master Set'::character varying, 'Shade Analysis'::character varying, 'Structural Engineering'::character varying]::text[]) THEN ct.total_price
                    ELSE NULL::numeric
                END) AS design_total_price,
            sum(
                CASE
                    WHEN ct."Order Type"::text = 'Rough Install'::text THEN ct.total_price
                    ELSE NULL::numeric
                END) AS rough_install_total_price,
            sum(
                CASE
                    WHEN ct."Order Type"::text = 'PV Install'::text THEN ct.total_price
                    ELSE NULL::numeric
                END) AS pv_install_total_price,
            sum(
                CASE
                    WHEN ct."Order Type"::text = 'Adder'::text THEN ct.total_price
                    ELSE NULL::numeric
                END) AS adder_total_price,
            sum(
                CASE
                    WHEN ct."Order Type"::text = 'CF6R'::text THEN ct.total_price
                    ELSE NULL::numeric
                END) AS cf6r_total_price,
            sum(
                CASE
                    WHEN ct."Order Type"::text = 'Interconnection Fees'::text THEN ct.total_price
                    ELSE NULL::numeric
                END) AS interconnection_fees_total_price,
            sum(
                CASE
                    WHEN ct."Order Type"::text = 'Permit Fees'::text THEN ct.total_price
                    ELSE NULL::numeric
                END) AS permit_fees_total_price,
            sum(
                CASE
                    WHEN ct."Order Type"::text = 'Pre-Wire'::text THEN ct.total_price
                    ELSE NULL::numeric
                END) AS pre_wire_total_price,
            sum(
                CASE
                    WHEN ct."Order Type"::text = 'Fees'::text THEN ct.total_price
                    ELSE NULL::numeric
                END) AS fees_total_price,
            sum(
                CASE
                    WHEN ct."Order Type"::text = 'Commissioning'::text THEN ct.total_price
                    ELSE NULL::numeric
                END) AS commissioning_total_price
           FROM ( SELECT t."Account Details",
                    t."Order Type",
                    sum(t."Total Price") AS total_price
                   FROM sfdc_staging.vw_workorder t
                  WHERE t."Status"::text <> 'Canceled'::text AND (t."Order Type"::text = ANY (ARRAY['PV Design'::character varying, 'PV Design Plus'::character varying, 'Revision'::character varying, 'Rough Install'::character varying, 'PV Install'::character varying, 'Adder'::character varying, 'CF6R'::character varying, 'Interconnection Fees'::character varying, 'Permit Fees'::character varying, 'Pre-Wire'::character varying, 'Fees'::character varying, 'Commissioning'::character varying, 'Consumption Study'::character varying, 'Electrical Engineering'::character varying, 'Master Set'::character varying, 'Shade Analysis'::character varying, 'Structural Engineering'::character varying]::text[]))
                  GROUP BY t."Account Details", t."Order Type") ct
          GROUP BY ct."Account Details") wo ON wo."Account Details"::text = a."Record ID (1)"::text
  WHERE a."First Close Date" IS NOT NULL AND a."First Close Date" >= (CURRENT_DATE - '2 years'::interval);
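
The `wo` subquery in the view above is a conditional aggregation: work orders are summed per account-details record and order type, then pivoted into one total per category. A small Python sketch of the same idea follows. The function name `pivot_work_orders` and the sample rows are hypothetical. Unlike the SQL, which yields NULL for a category with no rows, the sketch simply omits that key.

```python
from collections import defaultdict
from decimal import Decimal

# Order types that the view rolls up into design_total_price.
DESIGN_TYPES = {
    "PV Design", "PV Design Plus", "Revision", "Consumption Study",
    "Electrical Engineering", "Master Set", "Shade Analysis",
    "Structural Engineering",
}

# Map each included order type to its output column in the wo subquery.
BUCKET_BY_ORDER_TYPE = {t: "design_total_price" for t in DESIGN_TYPES}
BUCKET_BY_ORDER_TYPE.update({
    "Rough Install": "rough_install_total_price",
    "PV Install": "pv_install_total_price",
    "Adder": "adder_total_price",
    "CF6R": "cf6r_total_price",
    "Interconnection Fees": "interconnection_fees_total_price",
    "Permit Fees": "permit_fees_total_price",
    "Pre-Wire": "pre_wire_total_price",
    "Fees": "fees_total_price",
    "Commissioning": "commissioning_total_price",
})


def pivot_work_orders(rows):
    """Sum total_price per (account_details, bucket).

    rows: iterable of (account_details, order_type, status, total_price).
    Rows that are canceled, have no status, or have an unlisted order type
    are skipped, matching the WHERE clause of the inner query.
    """
    totals = defaultdict(lambda: defaultdict(Decimal))
    for account_details, order_type, status, total_price in rows:
        bucket = BUCKET_BY_ORDER_TYPE.get(order_type)
        if status is None or status == "Canceled" or bucket is None:
            continue
        if total_price is None:
            continue  # SQL sum() ignores NULL prices
        totals[account_details][bucket] += total_price
    return {acct: dict(buckets) for acct, buckets in totals.items()}


# Hypothetical sample rows for illustration.
sample_rows = [
    ("AD-1", "PV Design", "Completed", Decimal("100")),
    ("AD-1", "Revision", "Completed", Decimal("25")),
    ("AD-1", "PV Install", "Canceled", Decimal("900")),
    ("AD-1", "Adder", "Completed", Decimal("40")),
    ("AD-2", "Roofing", "Completed", Decimal("10")),
]
pivoted = pivot_work_orders(sample_rows)
print(pivoted)
```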

Python Modules

This datatape includes additional Python modules with business logic:

dealer_transactions_explorer

assemble_dashboard(create_summary_tab, create_distributions_tab, create_trends_tab, display_ui_and_error, load_data_and_populate_filters, mo)

Assembles the final dashboard layout.

Source code in datatapes/dealer_transactions/dealer_transactions_explorer.py
@app.cell
def assemble_dashboard(
    create_summary_tab,
    create_distributions_tab,
    create_trends_tab,
    display_ui_and_error,
    load_data_and_populate_filters,
    mo,
):
    """Assembles the final dashboard layout."""
    ui_elements, dashboard_placeholder, _, _ = display_ui_and_error
    # Get load_error status by position: the other cells unpack 14 values from this
    # tuple, so a fixed-length unpack here would fail if the lengths disagree
    load_error = load_data_and_populate_filters[2]

    if load_error:
        dashboard = ui_elements  # Show only UI and error
    else:
        # Create tabs using the content generated in other cells
        tabs = mo.ui.tabs(
            {
                "📊 Summary & Quality": create_summary_tab,
                "📈 Distributions": create_distributions_tab,
                "📉 Trends": create_trends_tab,
            }
        )
        dashboard = mo.vstack([ui_elements, tabs])

    # Add Title at the very top
    final_layout = mo.vstack([mo.md("# Dealer Transactions Dashboard"), dashboard])

    return (final_layout,)  # Return as a tuple with trailing comma

build_filters(load_data_and_populate_filters, pd, datetime, DATE_COL, PORTFOLIO_GROUP_COL, DEALER_COL, STAGE_COL, APPLICATION_COL, PAYMENT_STATUS_COL)

Builds the SQL WHERE clause based on filter values.

Source code in datatapes/dealer_transactions/dealer_transactions_explorer.py
@app.cell
def build_filters(
    load_data_and_populate_filters,
    pd,
    datetime,
    DATE_COL,
    PORTFOLIO_GROUP_COL,
    DEALER_COL,
    STAGE_COL,
    APPLICATION_COL,
    PAYMENT_STATUS_COL,
):
    """Builds the SQL WHERE clause based on filter values."""
    (
        con,
        df,
        load_error,
        date_filter,
        pg_filter,
        dealer_filter,
        stage_filter,
        app_filter,
        ps_filter,
        _,
        _,
        _,
        _,
        _,
    ) = load_data_and_populate_filters

    if load_error or con is None:
        return "WHERE 1=0", []

    conditions = ["1=1"]
    params = []

    # Date Filter
    if date_filter.value and date_filter.value[0] and date_filter.value[1]:
        try:
            start_date = pd.to_datetime(date_filter.value[0]).date()
            end_date = pd.to_datetime(date_filter.value[1]).date()
            conditions.append(f'"{DATE_COL}"::DATE BETWEEN ? AND ?')
            params.extend([start_date, end_date])
        except ValueError:
            print("Warning: Invalid date format in date filter.")
            conditions.append("1=0")

    # Multi-select Filters Helper
    def add_multi_filter(col_name, filter_element):
        if (
            hasattr(filter_element, "value")
            and isinstance(filter_element.value, list)
            and filter_element.value
        ):
            if df is not None and col_name in df.columns:
                placeholders = ",".join("?" * len(filter_element.value))
                conditions.append(f'"{col_name}" IN ({placeholders})')
                params.extend(filter_element.value)
            else:
                print(
                    f"Warning: Filter column '{col_name}' not found in DataFrame. Skipping filter."
                )

    add_multi_filter(PORTFOLIO_GROUP_COL, pg_filter)
    add_multi_filter(DEALER_COL, dealer_filter)
    add_multi_filter(STAGE_COL, stage_filter)
    add_multi_filter(APPLICATION_COL, app_filter)
    add_multi_filter(PAYMENT_STATUS_COL, ps_filter)

    where_clause = "WHERE " + " AND ".join(conditions)
    return where_clause, params
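
The placeholder technique used by `add_multi_filter` can be tried in isolation. The sketch below is a simplified stand-in, not the module's code: it uses the standard-library `sqlite3` in place of the dashboard's own connection, and the `build_where` helper, table contents, and filter values are hypothetical. Only the multi-select part is shown, because the `::DATE` cast in the date filter is not SQLite syntax.

```python
import sqlite3


def build_where(filters):
    """Build a parameterized WHERE clause from {column: [selected values]}.

    Values are bound through "?" placeholders. Column names are interpolated
    into the SQL text, so they must come from trusted constants, never from
    user input.
    """
    conditions = ["1=1"]
    params = []
    for col_name, values in filters.items():
        if values:  # an empty selection means "no filter on this column"
            placeholders = ",".join("?" * len(values))
            conditions.append(f'"{col_name}" IN ({placeholders})')
            params.extend(values)
    return "WHERE " + " AND ".join(conditions), params


# In-memory stand-in for the "datatape" table queried by the dashboard.
con = sqlite3.connect(":memory:")
con.execute('CREATE TABLE datatape ("Dealer Name" TEXT, "Payment Status" TEXT)')
con.executemany(
    "INSERT INTO datatape VALUES (?, ?)",
    [("Dealer A", "Paid"), ("Dealer B", "Unpaid"), ("Dealer A", "Unpaid")],
)

where_clause, params = build_where(
    {"Dealer Name": ["Dealer A"], "Payment Status": ["Unpaid", "Part Paid"]}
)
rows = con.execute(f"SELECT * FROM datatape {where_clause}", params).fetchall()
print(where_clause)
print(rows)
```

Starting the condition list with `1=1` keeps the join logic uniform: every further condition is appended with `AND`, and an empty filter set still yields a valid clause.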

create_distributions_tab(build_filters, load_data_and_populate_filters, mo, px)

Generates the content for the Distributions tab.

Source code in datatapes/dealer_transactions/dealer_transactions_explorer.py
@app.cell
def create_distributions_tab(build_filters, load_data_and_populate_filters, mo, px):
    """Generates the content for the Distributions tab."""
    where_clause, params = build_filters
    (
        con,
        df,
        load_error,
        _,
        _,
        _,
        _,
        _,
        _,
        num_col_select,
        cat_col_select,
        cat_measure_select,
        _,
        _,
    ) = load_data_and_populate_filters

    if load_error or con is None:
        return mo.md("Data not loaded.")

    # --- Numeric Distribution Plot ---
    numeric_plot_area = mo.md("Select a numeric column to view its distribution.")
    if num_col_select.value:
        selected_num_col = num_col_select.value
        if selected_num_col not in df.columns:  # Check column exists
            numeric_plot_area = mo.md(
                f"Selected numeric column '{selected_num_col}' not found."
            ).callout(kind="warn")
        else:
            try:
                query = f'SELECT "{selected_num_col}" FROM datatape {where_clause} AND "{selected_num_col}" IS NOT NULL AND isfinite("{selected_num_col}")'
                num_data = con.execute(query, params).df()
                if not num_data.empty:
                    fig_hist = px.histogram(
                        num_data,
                        x=selected_num_col,
                        title=f"Distribution of {selected_num_col}",
                        marginal="box",
                    )
                    numeric_plot_area = fig_hist
                else:
                    numeric_plot_area = mo.md(
                        f"No valid numeric data available for '{selected_num_col}' with current filters."
                    )
            except Exception as e:
                numeric_plot_area = mo.md(
                    f"**Error plotting {selected_num_col}:** {e}"
                ).callout(kind="danger")

    # --- Categorical Distribution Plot ---
    category_plot_area = mo.md(
        "Select a category column and measure to view distributions."
    )
    if cat_col_select.value and cat_measure_select.value:
        selected_cat_col = cat_col_select.value
        selected_measure_label = cat_measure_select.value  # This is the label/key
        # Get the SQL aggregation string from the options dictionary
        selected_measure_agg = cat_measure_select.options.get(selected_measure_label)

        if selected_cat_col not in df.columns:  # Check column exists
            category_plot_area = mo.md(
                f"Selected category column '{selected_cat_col}' not found."
            ).callout(kind="warn")
        elif not selected_measure_agg:  # Check if lookup failed
            category_plot_area = mo.md(
                f"Invalid measure selected: {selected_measure_label}"
            ).callout(kind="danger")
        else:
            try:
                cat_query = f"""
                    SELECT "{selected_cat_col}", {selected_measure_agg} AS "MeasureValue"
                    FROM datatape
                    {where_clause} AND "{selected_cat_col}" IS NOT NULL
                    GROUP BY 1
                    ORDER BY "MeasureValue" DESC
                    LIMIT 50
                """
                cat_data = con.execute(cat_query, params).df()

                if not cat_data.empty:
                    fig_bar = px.bar(
                        cat_data,
                        x=selected_cat_col,
                        y="MeasureValue",
                        title=f"{selected_measure_label} by {selected_cat_col} (Top 50)",
                        text_auto=True,
                    )
                    fig_bar.update_layout(
                        xaxis={"categoryorder": "total descending"},
                        yaxis_title=selected_measure_label,
                    )
                    category_plot_area = fig_bar
                else:
                    category_plot_area = mo.md(
                        f"No data available for '{selected_cat_col}' with current filters."
                    )
            except Exception as e:
                category_plot_area = mo.md(
                    f"**Error plotting {selected_cat_col}:** {e}"
                ).callout(kind="danger")

    # --- Layout ---
    distributions_tab = mo.vstack(
        [
            mo.md("### Explore Distributions"),
            mo.hstack(
                [num_col_select, cat_col_select, cat_measure_select], justify="start"
            ),
            mo.md("---"),
            numeric_plot_area,
            mo.md("---"),
            category_plot_area,
        ]
    )
    return distributions_tab
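
The category plot above looks up the selected label in the dropdown's options dictionary to get a SQL aggregation, then builds a top-N query around it. The sketch below illustrates that lookup in isolation. It is not the notebook's code: SQLite stands in for DuckDB because it ships with Python, and the helper `category_query`, the `Amount` column, and the sample rows are hypothetical.

```python
import sqlite3

# Hypothetical amount column; the notebook reads the real name from AMOUNT_COL.
AMOUNT_COL = "Amount"

# Display label -> SQL aggregation, as in the measure dropdown options.
measure_options = {
    "Count": "COUNT(*)",
    f"Sum {AMOUNT_COL}": f'SUM("{AMOUNT_COL}")',
}


def category_query(cat_col, measure_label, limit=50):
    """Return a top-N query for a category column, or None for an unknown label."""
    measure_agg = measure_options.get(measure_label)
    if not measure_agg:
        return None  # the notebook shows an "Invalid measure" callout here
    return (
        f'SELECT "{cat_col}", {measure_agg} AS "MeasureValue" '
        f'FROM datatape WHERE "{cat_col}" IS NOT NULL '
        f'GROUP BY "{cat_col}" ORDER BY "MeasureValue" DESC LIMIT {int(limit)}'
    )


# Illustrative data only; SQLite stands in for the DuckDB connection.
con = sqlite3.connect(":memory:")
con.execute('CREATE TABLE datatape ("Project Stage" TEXT, "Amount" REAL)')
con.executemany(
    "INSERT INTO datatape VALUES (?, ?)",
    [("Install", 100.0), ("Install", 50.0), ("Permit", 500.0), (None, 9.0)],
)

by_count = con.execute(category_query("Project Stage", "Count")).fetchall()
by_sum = con.execute(category_query("Project Stage", "Sum Amount")).fetchall()
print(by_count)
print(by_sum)
```

Because only the fixed strings in the options dictionary are interpolated into the query, the aggregation text never comes directly from user input.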

create_summary_tab(build_filters, load_data_and_populate_filters, mo, px, go, KEY_COL, AMOUNT_COL, DEALER_COL, SYSTEM_ID_COL)

Generates the content for the Summary & Quality tab.

Source code in datatapes/dealer_transactions/dealer_transactions_explorer.py
@app.cell
def create_summary_tab(
    build_filters,
    load_data_and_populate_filters,
    mo,
    px,
    go,
    KEY_COL,
    AMOUNT_COL,
    DEALER_COL,
    SYSTEM_ID_COL,
):
    """Generates the content for the Summary & Quality tab."""
    where_clause, params = build_filters
    # load_data_and_populate_filters returns 14 values; only the first three are needed here
    con, df, load_error, *_ = load_data_and_populate_filters

    if load_error or con is None:
        return mo.md("Data not loaded.")

    # --- KPIs ---
    kpi_display = mo.md("Loading KPIs...")
    try:
        # Ensure columns exist before querying
        kpi_cols = [KEY_COL, AMOUNT_COL, DEALER_COL, SYSTEM_ID_COL]
        if not all(c in df.columns for c in kpi_cols):
            raise ValueError(f"One or more KPI columns missing: {kpi_cols}")

        kpi_query = f"""
            SELECT
                COUNT(DISTINCT "{KEY_COL}") as total_transactions,
                SUM("{AMOUNT_COL}") as total_amount,
                COUNT(DISTINCT "{DEALER_COL}") as unique_dealers,
                COUNT(DISTINCT "{SYSTEM_ID_COL}") as unique_systems
            FROM datatape {where_clause}
        """
        kpis = con.execute(kpi_query, params).fetchone()
        if kpis:
            kpi_display = mo.hstack(
                [
                    mo.stat("Total Transactions", f"{kpis[0]:,}").center(),
                    mo.stat(
                        "Total Transaction Amount",
                        f"${kpis[1]:,.2f}" if kpis[1] is not None else "$0.00",
                    ).center(),
                    mo.stat("Unique Dealers", f"{kpis[2]:,}").center(),
                    mo.stat("Unique Systems", f"{kpis[3]:,}").center(),
                ],
                justify="space-around",
            )
        else:
            kpi_display = mo.md("No KPI data found for current filters.")
    except Exception as e:
        kpi_display = mo.md(f"**Error fetching KPIs:** {e}").callout(kind="danger")

    # --- Missing Values ---
    missing_plot = mo.md("Loading missing value analysis...")
    try:
        all_columns = df.columns.tolist()  # Use columns from the actual loaded df
        if all_columns:
            placeholders = ", ".join(
                [
                    f'SUM(CASE WHEN "{col}" IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) as "{col}"'
                    for col in all_columns
                ]
            )
            missing_query = f"SELECT {placeholders} FROM datatape {where_clause}"
            missing_data = con.execute(missing_query, params).df()

            if not missing_data.empty:
                missing_series = missing_data.iloc[0]
                missing_series = missing_series[missing_series > 0].sort_values(
                    ascending=False
                )
                if not missing_series.empty:
                    fig_missing = px.bar(
                        missing_series.head(20),
                        orientation="h",
                        title="Missing Values (%) by Column (Top 20)",
                        labels={"index": "Column", "value": "Missing %"},
                        text_auto=".2f",
                    )
                    fig_missing.update_layout(
                        yaxis={"categoryorder": "total ascending"},
                        height=max(400, len(missing_series.head(20)) * 20),
                    )
                    missing_plot = fig_missing
                else:
                    missing_plot = mo.md(
                        "**No missing values found in filtered data.**"
                    ).callout(kind="success")
            else:
                missing_plot = mo.md("Could not calculate missing values.")
        else:
            missing_plot = mo.md(
                "Could not retrieve column names for missing value analysis."
            )

    except Exception as e:
        missing_plot = mo.md(f"**Error calculating missing values:** {e}").callout(
            kind="danger"
        )

    # --- Duplicate Check ---
    duplicate_md = mo.md("Loading duplicate check...")
    try:
        if KEY_COL not in df.columns:
            raise ValueError(f"Key column '{KEY_COL}' not found for duplicate check.")
        dupe_query = f'SELECT COUNT(*) FROM (SELECT COUNT(*) FROM datatape {where_clause} GROUP BY "{KEY_COL}" HAVING COUNT(*) > 1)'
        dupe_groups_count = con.execute(dupe_query, params).fetchone()[0]
        if dupe_groups_count > 0:
            total_rows_query = f"SELECT COUNT(*) FROM datatape {where_clause}"
            unique_keys_query = (
                f'SELECT COUNT(DISTINCT "{KEY_COL}") FROM datatape {where_clause}'
            )
            total_rows = con.execute(total_rows_query, params).fetchone()[0]
            unique_keys = con.execute(unique_keys_query, params).fetchone()[0]
            dupe_rows_count = total_rows - unique_keys
            duplicate_md = mo.md(
                f"**Duplicate Transactions:** Found {dupe_groups_count} transaction IDs with {dupe_rows_count} duplicate rows."
            ).callout(kind="danger")
        else:
            duplicate_md = mo.md(
                f'**Duplicate Transactions:** No duplicates found based on "{KEY_COL}".'
            ).callout(kind="success")
    except Exception as e:
        duplicate_md = mo.md(f"**Error checking duplicates:** {e}").callout(
            kind="danger"
        )

    # --- Layout ---
    summary_quality_tab = mo.vstack(
        [
            mo.md("### Key Performance Indicators"),
            kpi_display,
            mo.md("---"),
            mo.md("### Data Quality"),
            duplicate_md,
            missing_plot,
        ]
    )
    return summary_quality_tab
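
The duplicate check above reports two numbers: how many keys occur more than once, and how many surplus rows those keys account for (total rows minus distinct keys). The sketch below reproduces that arithmetic on a tiny table. SQLite is used only as a stand-in for DuckDB, and the `Transaction ID` key name and the sample rows are hypothetical.

```python
import sqlite3

KEY_COL = "Transaction ID"  # hypothetical key column name

# Illustrative data only: T1 appears three times, T3 twice, T2 once.
con = sqlite3.connect(":memory:")
con.execute(f'CREATE TABLE datatape ("{KEY_COL}" TEXT)')
con.executemany(
    "INSERT INTO datatape VALUES (?)",
    [("T1",), ("T1",), ("T1",), ("T2",), ("T3",), ("T3",)],
)

# Number of keys that occur more than once.
dupe_groups = con.execute(
    f'SELECT COUNT(*) FROM '
    f'(SELECT 1 FROM datatape GROUP BY "{KEY_COL}" HAVING COUNT(*) > 1)'
).fetchone()[0]

# Surplus rows = total rows minus distinct keys.
total_rows = con.execute("SELECT COUNT(*) FROM datatape").fetchone()[0]
unique_keys = con.execute(
    f'SELECT COUNT(DISTINCT "{KEY_COL}") FROM datatape'
).fetchone()[0]
dupe_rows = total_rows - unique_keys

print(dupe_groups, dupe_rows)
```

One caveat applies to this approach: `COUNT(DISTINCT ...)` ignores NULL keys while `COUNT(*)` does not, so rows with a missing key would be counted as surplus rows.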

create_trends_tab(build_filters, load_data_and_populate_filters, mo, px, pd, DATE_COL)

Generates the content for the Trends tab.

Source code in datatapes/dealer_transactions/dealer_transactions_explorer.py
@app.cell
def create_trends_tab(
    build_filters, load_data_and_populate_filters, mo, px, pd, DATE_COL
):
    """Generates the content for the Trends tab."""
    where_clause, params = build_filters
    (
        con,
        df,
        load_error,
        _,
        _,
        _,
        _,
        _,
        _,
        _,
        _,
        _,
        trend_granularity,
        trend_measure_select,
    ) = load_data_and_populate_filters

    if load_error or con is None:
        return mo.md("Data not loaded.")

    # --- Trend Plot ---
    trend_plot_area = mo.md("Configure trend options.")
    if trend_granularity.value and trend_measure_select.value:
        granularity = trend_granularity.value
        selected_measure_label = trend_measure_select.value  # This is the label/key
        # Get the SQL aggregation string from the options dictionary
        measure_agg = trend_measure_select.options.get(selected_measure_label)
        date_col = DATE_COL

        if not measure_agg:  # Check if lookup failed
            trend_plot_area = mo.md(
                f"Invalid measure selected: {selected_measure_label}"
            ).callout(kind="danger")
        else:
            # Determine date truncation based on granularity
            if granularity == "Daily":
                date_part_sql = f"strftime(\"{date_col}\", '%Y-%m-%d')"
            elif granularity == "Weekly":
                date_part_sql = f"date_trunc('week', \"{date_col}\"::DATE)"
            elif granularity == "Monthly":
                date_part_sql = f"strftime(\"{date_col}\", '%Y-%m')"
            else:
                date_part_sql = f"strftime(\"{date_col}\", '%Y-%m')"  # Default Monthly

            try:
                trend_query = f"""
                    SELECT
                        {date_part_sql} AS TimePeriod,
                        {measure_agg} AS "MeasureValue"
                    FROM datatape
                    {where_clause} AND "{date_col}" IS NOT NULL
                    GROUP BY TimePeriod
                    ORDER BY TimePeriod
                """
                trend_data = con.execute(trend_query, params).df()

                if not trend_data.empty:
                    trend_data["TimePeriod"] = pd.to_datetime(trend_data["TimePeriod"])
                    fig_trend = px.line(
                        trend_data,
                        x="TimePeriod",
                        y="MeasureValue",
                        title=f"{selected_measure_label} Trend ({granularity})",  # Use label for title
                        markers=True,
                    )
                    fig_trend.update_layout(
                        xaxis_title="Time Period", yaxis_title=selected_measure_label
                    )  # Use label for axis
                    trend_plot_area = fig_trend
                else:
                    trend_plot_area = mo.md(
                        "No trend data available for the selected filters and options."
                    )

            except Exception as e:
                trend_plot_area = mo.md(
                    f"**Error generating trend data:** {e}"
                ).callout(kind="danger")

    # --- Layout ---
    trends_tab = mo.vstack(
        [
            mo.md("### Transaction Trends"),
            mo.hstack([trend_granularity, trend_measure_select], justify="start"),
            mo.md("---"),
            trend_plot_area,
        ]
    )
    return trends_tab
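
The trend query above groups rows by a period derived from the selected granularity, falling back to monthly for any unrecognized value. The sketch below shows the same bucketing idea in plain Python. It is an illustration only: the `bucket` helper and the sample dates are hypothetical, and weeks are assumed to start on Monday, which is worth confirming against DuckDB's `date_trunc('week', ...)` behavior.

```python
from collections import Counter
from datetime import date, timedelta


def bucket(d, granularity):
    """Map a date to the start of its period (monthly is the fallback)."""
    if granularity == "Daily":
        return d
    if granularity == "Weekly":
        # Assumption: weeks start on Monday.
        return d - timedelta(days=d.weekday())
    return d.replace(day=1)  # "Monthly" and any unrecognized value


# Illustrative transaction dates; 2024-01-01 is a Monday.
dates = [date(2024, 1, 1), date(2024, 1, 3), date(2024, 1, 8), date(2024, 2, 1)]

# Equivalent of COUNT(*) grouped by period.
monthly = Counter(bucket(d, "Monthly") for d in dates)
weekly = Counter(bucket(d, "Weekly") for d in dates)

print(sorted(monthly.items()))
print(sorted(weekly.items()))
```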

create_ui_elements(mo, pd, datetime, timedelta, AMOUNT_COL)

Define UI elements, populated later reactively.

Source code in datatapes/dealer_transactions/dealer_transactions_explorer.py
@app.cell
def create_ui_elements(mo, pd, datetime, timedelta, AMOUNT_COL):
    """Define UI elements, populated later reactively."""
    file_path_input = mo.ui.text(
        label="Enter Datatape Path (CSV):",
        # Default to a plausible path based on the config
        value="./Completed Output/Dealer_Transactions_Report_YYYYMMDD.csv",  # Example default - user needs to change YYYYMMDD
    )
    # Set reasonable default date range
    today_str = datetime.now().strftime("%Y-%m-%d")
    one_year_ago_str = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
    date_filter = mo.ui.date_range(
        value=(one_year_ago_str, today_str), label="Transaction Date Range:"
    )
    # Placeholders for filters - populated after data load
    portfolio_group_filter = mo.ui.multiselect([], label="Portfolio Group:")
    dealer_filter = mo.ui.multiselect([], label="Dealer Name (Top 50):")
    stage_filter = mo.ui.multiselect([], label="Project Stage:")
    application_filter = mo.ui.multiselect([], label="Application:")
    payment_status_filter = mo.ui.multiselect([], label="Payment Status:")

    # Selectors for distribution plots
    numeric_dist_select = mo.ui.dropdown([], label="Numeric Column:")
    category_dist_select = mo.ui.dropdown([], label="Category Column:")
    # Use labels as keys, values as actual SQL aggregations
    category_measure_options = {
        "Count": "COUNT(*)",
        f"Sum {AMOUNT_COL}": f'SUM("{AMOUNT_COL}")',
    }
    category_measure_select = mo.ui.dropdown(
        options=category_measure_options,
        value="Count",  # Default value is the display label/key
        label="Category Measure:",
    )

    # Selectors for trend plots
    trend_granularity = mo.ui.radio(
        options=["Monthly", "Weekly", "Daily"],
        value="Monthly",
        label="Trend Granularity:",
    )
    # Use labels as keys, values as actual SQL aggregations
    trend_measure_options = {
        "Count": "COUNT(*)",
        f"Sum {AMOUNT_COL}": f'SUM("{AMOUNT_COL}")',
    }
    trend_measure_select = mo.ui.dropdown(
        options=trend_measure_options,
        value="Count",  # Default value is the display label/key
        label="Trend Measure:",
    )

    return (
        file_path_input,
        date_filter,
        portfolio_group_filter,
        dealer_filter,
        stage_filter,
        application_filter,
        payment_status_filter,
        numeric_dist_select,
        category_dist_select,
        category_measure_select,
        trend_granularity,
        trend_measure_select,
    )

display_ui_and_error(load_data_and_populate_filters, create_ui_elements, mo)

Displays file input and filters, or an error message.

Source code in datatapes/dealer_transactions/dealer_transactions_explorer.py
@app.cell
def display_ui_and_error(load_data_and_populate_filters, create_ui_elements, mo):
    """Displays file input and filters, or an error message."""
    (
        con,
        df,
        load_error,
        date_filter,
        pg_filter,
        dealer_filter,
        stage_filter,
        app_filter,
        ps_filter,
        num_dist_sel,
        cat_dist_sel,
        cat_measure_sel,
        trend_gran_sel,
        trend_measure_sel,
    ) = load_data_and_populate_filters
    file_path_input, _, _, _, _, _, _, _, _, _, _, _ = create_ui_elements

    file_input_section = mo.vstack([file_path_input])

    if load_error:
        ui_elements = mo.vstack(
            [
                file_input_section,
                mo.md(f"**Status:** {load_error}").callout(kind="warn"),
            ]
        )
        dashboard_content = mo.md("")
    else:
        ui_elements = mo.vstack(
            [
                file_input_section,
                mo.md("---"),
                mo.md("### Filters"),
                mo.hstack([date_filter, pg_filter, dealer_filter], justify="start"),
                mo.hstack([stage_filter, app_filter, ps_filter], justify="start"),
                mo.md("---"),
            ]
        )
        dashboard_content = mo.md("Dashboard content will load here...")  # Placeholder

    return ui_elements, dashboard_content, con, df

import_libs()

Import libraries and return them for use in other cells.

Source code in datatapes/dealer_transactions/dealer_transactions_explorer.py
@app.cell
def import_libs():
    """Import libraries and return them for use in other cells."""
    # This cell ensures all imports are available to other cells via Marimo's dataflow
    import io
    from datetime import datetime, timedelta
    from pathlib import Path

    import duckdb
    import marimo as mo
    import numpy as np
    import pandas as pd
    import plotly.express as px
    import plotly.graph_objects as go

    return mo, pd, np, duckdb, px, go, Path, io, datetime, timedelta

load_data_and_populate_filters(create_ui_elements, pd, duckdb, io, Path, mo, np, datetime, EXPECTED_COLS, DATE_COL, NUMERIC_COLS_FOR_DIST, CATEGORY_COLS_FOR_DIST, PORTFOLIO_GROUP_COL, DEALER_COL, STAGE_COL, APPLICATION_COL, PAYMENT_STATUS_COL, KEY_COL, SYSTEM_ID_COL, AMOUNT_COL)

Loads data, registers with DuckDB, and populates filter options.

Source code in datatapes/dealer_transactions/dealer_transactions_explorer.py
@app.cell
def load_data_and_populate_filters(
    create_ui_elements,
    pd,
    duckdb,
    io,
    Path,
    mo,
    np,
    datetime,
    EXPECTED_COLS,
    DATE_COL,
    NUMERIC_COLS_FOR_DIST,
    CATEGORY_COLS_FOR_DIST,
    PORTFOLIO_GROUP_COL,
    DEALER_COL,
    STAGE_COL,
    APPLICATION_COL,
    PAYMENT_STATUS_COL,
    KEY_COL,
    SYSTEM_ID_COL,
    AMOUNT_COL,
):
    """Loads data, registers with DuckDB, and populates filter options."""
    (
        file_path_input,
        date_filter_ph,
        pg_filter_ph,
        dealer_filter_ph,
        stage_filter_ph,
        app_filter_ph,
        ps_filter_ph,
        num_dist_ph,
        cat_dist_ph,
        cat_measure_ph,
        trend_gran_ph,
        trend_measure_ph,
    ) = create_ui_elements

    con = None
    df = None
    load_error = "Enter a valid file path and press Enter."
    min_date, max_date = None, None
    portfolio_groups, dealers, stages, applications, payment_statuses = (
        [],
        [],
        [],
        [],
        [],
    )
    numeric_cols, category_cols = [], []

    file_path_str = file_path_input.value
    if file_path_str:
        file_path = Path(file_path_str)
        if file_path.exists() and file_path.is_file():
            try:
                print(f"Loading data from: {file_path}")
                if file_path.suffix.lower() == ".csv":
                    df = pd.read_csv(file_path, low_memory=False)
                elif file_path.suffix.lower() == ".parquet":
                    df = pd.read_parquet(file_path)
                else:
                    raise ValueError(
                        "Unsupported file type. Please provide CSV or Parquet."
                    )

                print(f"Loaded {len(df)} rows. Initial columns: {df.columns.tolist()}")

                # --- Data Validation/Cleaning ---
                # Check for expected columns and add if missing
                found_cols = []
                for col in EXPECTED_COLS:
                    if col not in df.columns:
                        print(
                            f"Warning: Expected column '{col}' missing. Adding as empty."
                        )
                        df[col] = None
                    else:
                        found_cols.append(col)
                # Optional: Select only expected columns? df = df[found_cols]

                # Convert Date Column
                if DATE_COL in df.columns:
                    df[DATE_COL] = pd.to_datetime(df[DATE_COL], errors="coerce")
                    initial_rows = len(df)
                    df.dropna(subset=[DATE_COL], inplace=True)
                    dropped_rows = initial_rows - len(df)
                    if dropped_rows > 0:
                        print(
                            f"Dropped {dropped_rows} rows with invalid dates in '{DATE_COL}'."
                        )
                    print(
                        f"Converted '{DATE_COL}'. Shape after dropping NaT dates: {df.shape}"
                    )
                else:
                    # If date column is missing after check, we can't proceed
                    raise ValueError(
                        f"Critical date column '{DATE_COL}' not found or added as None."
                    )

                # Convert Numeric Columns
                for num_col in NUMERIC_COLS_FOR_DIST:
                    if num_col in df.columns:
                        df[num_col] = pd.to_numeric(df[num_col], errors="coerce")
                    else:
                        # This case should be handled by the EXPECTED_COLS check above
                        print(
                            f"Warning: Numeric column '{num_col}' not found during conversion."
                        )
                        df[num_col] = np.nan

                # Connect to DuckDB and register DataFrame
                con = duckdb.connect(":memory:")
                con.register("datatape", df)
                print("Registered DataFrame with DuckDB.")
                load_error = None  # Clear error on success

                # --- Populate Filters ---
                min_date, max_date = con.execute(
                    f'SELECT MIN("{DATE_COL}"), MAX("{DATE_COL}") FROM datatape'
                ).fetchone()
                print(f"Date range: {min_date} to {max_date}")

                def get_options(col_name):
                    if col_name not in df.columns:
                        return []
                    query = f'SELECT DISTINCT "{col_name}" FROM datatape WHERE "{col_name}" IS NOT NULL ORDER BY 1'
                    try:
                        result = con.execute(query).fetchall()
                        return sorted([str(item[0]) for item in result])
                    except Exception as e:
                        print(f"Warning: Could not get options for {col_name}: {e}")
                        return []

                portfolio_groups = get_options(PORTFOLIO_GROUP_COL)
                dealers_all = get_options(DEALER_COL)
                dealers = dealers_all[:50]  # first 50 alphabetically, not ranked by volume
                if len(dealers_all) > 50:
                    print(f"Showing first 50 dealers (alphabetical) out of {len(dealers_all)}")
                stages = get_options(STAGE_COL)
                applications = get_options(APPLICATION_COL)
                payment_statuses = get_options(PAYMENT_STATUS_COL)

                # Get numeric/category cols for selectors based on actual columns present
                numeric_cols = df.select_dtypes(include=np.number).columns.tolist()
                exclude_from_cats = [
                    KEY_COL,
                    SYSTEM_ID_COL,
                    DEALER_COL,
                    "Customer Name",
                    "Portfolio",
                    "VIN",
                ]
                # Filter based on CATEGORY_COLS_FOR_DIST and existence in df
                category_cols = [
                    c
                    for c in CATEGORY_COLS_FOR_DIST
                    if c in df.columns and c not in exclude_from_cats
                ]

            except Exception as e:
                load_error = f"Error loading or processing data: {e}"
                print(f"Error details: {e}")
                con = None
                df = None
        else:
            load_error = f"File not found or path is invalid: {file_path_str}"

    # --- Update UI Elements ---
    # Rebuild the date range from the data only when both bounds are known;
    # otherwise keep the placeholder created in create_ui_elements.
    date_filter = (
        mo.ui.date_range(
            start=str(min_date.date()),
            stop=str(max_date.date()),
            value=(str(min_date.date()), str(max_date.date())),
            label="Transaction Date Range:",
        )
        if min_date and max_date
        else date_filter_ph
    )

    portfolio_group_filter = mo.ui.multiselect(
        options=portfolio_groups, label="Portfolio Group:", value=portfolio_groups
    )
    dealer_filter = mo.ui.multiselect(
        options=dealers, label="Dealer Name (Top 50):", value=[]
    )
    stage_filter = mo.ui.multiselect(
        options=stages, label="Project Stage:", value=stages
    )
    application_filter = mo.ui.multiselect(
        options=applications, label="Application:", value=applications
    )
    payment_status_filter = mo.ui.multiselect(
        options=payment_statuses, label="Payment Status:", value=payment_statuses
    )

    numeric_dist_select = mo.ui.dropdown(
        options=sorted(numeric_cols), label="Numeric Column:"
    )
    # Ensure CATEGORY_COLS_FOR_DIST is used and columns exist
    valid_cat_cols = (
        [c for c in CATEGORY_COLS_FOR_DIST if c in df.columns] if df is not None else []
    )
    category_dist_select = mo.ui.dropdown(
        options=sorted(valid_cat_cols), label="Category Column:"
    )
    category_measure_select = cat_measure_ph
    trend_granularity = trend_gran_ph
    trend_measure_select = trend_measure_ph

    return (
        con,
        df,
        load_error,
        date_filter,
        portfolio_group_filter,
        dealer_filter,
        stage_filter,
        application_filter,
        payment_status_filter,
        numeric_dist_select,
        category_dist_select,
        category_measure_select,
        trend_granularity,
        trend_measure_select,
    )

dealer_explore

build_where_clause(update_filters, load_data, pd, datetime, DATE_COL, PORTFOLIO_GROUP_COL, DEALER_COL, STAGE_COL, APPLICATION_COL, PAYMENT_STATUS_COL)

Builds the SQL WHERE clause based on potentially updated filter values.

Source code in datatapes/dealer_transactions/dealer_explore.py
@app.function
def build_where_clause(update_filters, load_data, pd, datetime, DATE_COL, PORTFOLIO_GROUP_COL, DEALER_COL, STAGE_COL, APPLICATION_COL, PAYMENT_STATUS_COL):
    """Builds the SQL WHERE clause based on potentially updated filter values."""
    # Get the potentially updated filter values
    (
        date_filter, pg_filter, dealer_filter, stage_filter,
        app_filter, ps_filter, _, _, _, _, _
    ) = update_filters
    # Get connection and df status
    con, df, load_error = load_data

    if load_error or con is None or df is None:
        return "WHERE 1=0", [] # Return clause that yields no results

    conditions = ["1=1"]
    params = []

    # Date Filter
    if date_filter.value and date_filter.value[0] and date_filter.value[1]:
        try:
            start_date = pd.to_datetime(date_filter.value[0]).date()
            end_date = pd.to_datetime(date_filter.value[1]).date()
            conditions.append(f'"{DATE_COL}"::DATE BETWEEN ? AND ?')
            params.extend([start_date, end_date])
        except ValueError:
            print("Warning: Invalid date format in date filter.")
            conditions.append("1=0")

    # Multi-select Filters Helper
    def add_multi_filter(col_name, filter_element):
        if (
            hasattr(filter_element, "value")
            and isinstance(filter_element.value, list)
            and filter_element.value
        ):
            if col_name in df.columns:
                placeholders = ",".join("?" * len(filter_element.value))
                conditions.append(f'"{col_name}" IN ({placeholders})')
                params.extend(filter_element.value)
            else:
                print(
                    f"Warning: Filter column '{col_name}' not found in DataFrame. Skipping filter."
                )

    add_multi_filter(PORTFOLIO_GROUP_COL, pg_filter)
    add_multi_filter(DEALER_COL, dealer_filter)
    add_multi_filter(STAGE_COL, stage_filter)
    add_multi_filter(APPLICATION_COL, app_filter)
    add_multi_filter(PAYMENT_STATUS_COL, ps_filter)

    where_clause = "WHERE " + " AND ".join(conditions)
    return where_clause, params
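
As a rough illustration of the placeholder pattern used by build_where_clause, the sketch below builds the same kind of clause from a plain dictionary and runs it against an in-memory table. SQLite is only a stand-in here, chosen because it ships with Python and also accepts `?` placeholders; the notebook itself uses DuckDB. The helper name `build_in_clause`, the table contents, and the filter values are hypothetical.

```python
import sqlite3


def build_in_clause(filters):
    """Build a WHERE clause and parameter list from {column: [values]}.

    Hypothetical helper mirroring the add_multi_filter pattern: empty
    selections are skipped and every value is bound through a placeholder.
    """
    conditions = ["1=1"]
    params = []
    for col_name, values in filters.items():
        if values:  # an empty selection adds no condition
            placeholders = ",".join("?" * len(values))
            conditions.append(f'"{col_name}" IN ({placeholders})')
            params.extend(values)
    return "WHERE " + " AND ".join(conditions), params


# Illustrative data only; SQLite stands in for the DuckDB connection.
con = sqlite3.connect(":memory:")
con.execute('CREATE TABLE datatape ("Dealer Name" TEXT, "Payment Status" TEXT)')
con.executemany(
    "INSERT INTO datatape VALUES (?, ?)",
    [("Dealer A", "Paid"), ("Dealer B", "Unpaid"), ("Dealer C", "Paid")],
)

where_clause, params = build_in_clause(
    {"Dealer Name": ["Dealer A", "Dealer B"], "Payment Status": []}
)
rows = con.execute(
    f'SELECT "Dealer Name" FROM datatape {where_clause} ORDER BY 1', params
).fetchall()
print(where_clause)
print(rows)
```

Only the filter values go through placeholders. Column names are interpolated into the SQL text, so they should come from trusted constants such as DEALER_COL rather than from user input.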

display_ui(create_initial_ui, update_filters, load_data, mo)

Displays the file input and dynamically updated filters.

Source code in datatapes/dealer_transactions/dealer_explore.py
@app.function
def display_ui(create_initial_ui, update_filters, load_data, mo):
    """Displays the file input and dynamically updated filters."""
    # Get initial file input and load button
    file_path_input, load_button, _, _, _, _, _, _, _, _, _, _, _ = create_initial_ui
    # Get potentially updated filters
    (
        date_filter, pg_filter, dealer_filter, stage_filter,
        app_filter, ps_filter, _, _, _, _, _
    ) = update_filters
    # Get loading status
    _, _, load_error = load_data

    file_input_section = mo.vstack([
        mo.hstack([file_path_input, load_button], justify="start"),
        mo.md(f"**Status:** {load_error}").callout(kind="warn") if load_error else mo.md("")
    ])

    # Only show filters if data is loaded successfully
    if not load_error:
        filter_section = mo.vstack([
            mo.md("---"),
            mo.md("### Filters"),
            mo.hstack([date_filter, pg_filter, dealer_filter], justify="start"),
            mo.hstack([stage_filter, app_filter, ps_filter], justify="start"),
            mo.md("---"),
        ])
        main_ui = mo.vstack([file_input_section, filter_section])
    else:
        main_ui = file_input_section  # Show only the file load section on error

    return main_ui

load_data(create_initial_ui, Path, pd, DATE_COL, duckdb, EXPECTED_COLS, NUMERIC_COLS_FOR_DIST, np)

Load data when the button is clicked.

Source code in datatapes/dealer_transactions/dealer_explore.py
@app.function
def load_data(create_initial_ui, Path, pd, DATE_COL, duckdb, EXPECTED_COLS, NUMERIC_COLS_FOR_DIST, np):
    """Load data when the button is clicked."""
    # Get button and file path from the UI definition cell
    file_path_input, load_button, _, _, _, _, _, _, _, _, _, _, _ = create_initial_ui

    # Only run if button has been clicked (value > 0)
    if not load_button.value:
        # Return initial state before button click
        return None, None, "Please click 'Load Data'."

    file_path_str = file_path_input.value
    if not file_path_str:
        return None, None, "Please enter a file path."

    file_path = Path(file_path_str)
    if not file_path.exists() or not file_path.is_file():
        return None, None, f"File not found or invalid path: {file_path_str}"

    try:
        print(f"Loading data from: {file_path}")
        if file_path.suffix.lower() == ".csv":
            df = pd.read_csv(file_path, low_memory=False)
        elif file_path.suffix.lower() == ".parquet":
            df = pd.read_parquet(file_path)
        else:
            raise ValueError("Unsupported file type. Please provide CSV or Parquet.")

        print(f"Loaded {len(df)} rows. Initial columns: {df.columns.tolist()}")

        # --- Data Validation/Cleaning ---
        missing_expected = [c for c in EXPECTED_COLS if c not in df.columns]
        if missing_expected:
            print(f"Warning: Expected columns missing: {missing_expected}. Adding as empty.")
            for col in missing_expected: df[col] = None

        if DATE_COL in df.columns:
            df[DATE_COL] = pd.to_datetime(df[DATE_COL], errors='coerce')
            initial_rows = len(df)
            df.dropna(subset=[DATE_COL], inplace=True)
            dropped_rows = initial_rows - len(df)
            if dropped_rows > 0: print(f"Dropped {dropped_rows} rows with invalid dates in '{DATE_COL}'.")
            print(f"Converted '{DATE_COL}'. Shape after dropping NaT dates: {df.shape}")
        else: raise ValueError(f"Critical date column '{DATE_COL}' not found.")

        for num_col in NUMERIC_COLS_FOR_DIST:
            if num_col in df.columns:
                df[num_col] = pd.to_numeric(df[num_col], errors='coerce')
            else:
                print(f"Warning: Numeric column '{num_col}' not found.")
                df[num_col] = np.nan

        con = duckdb.connect(':memory:')
        con.register('datatape', df)
        print("Registered DataFrame with DuckDB.")
        return con, df, None # No error

    except Exception as e:
        load_error_msg = f"Error loading or processing data: {e}"
        print(f"Error details: {e}")
        import traceback
        traceback.print_exc()
        return None, None, load_error_msg
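
`load_data` reports problems through the third element of its return value instead of raising. A minimal sketch of that early-return validation, using only `pathlib`, is shown below. `check_input_path` is a hypothetical helper, not part of the module, and its messages only roughly mirror those in the source above (there, the unsupported-type message is wrapped by the generic error handler).

```python
import tempfile
from pathlib import Path
from typing import Optional

SUPPORTED_SUFFIXES = {".csv", ".parquet"}


def check_input_path(file_path_str: str) -> Optional[str]:
    """Return an error message for an unusable path, or None if it looks loadable."""
    if not file_path_str:
        return "Please enter a file path."
    file_path = Path(file_path_str)
    if not file_path.is_file():  # also False when the path does not exist
        return f"File not found or invalid path: {file_path_str}"
    if file_path.suffix.lower() not in SUPPORTED_SUFFIXES:
        return "Unsupported file type. Please provide CSV or Parquet."
    return None


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "tape.CSV"  # suffix check is case-insensitive
    good.write_text("date,amount\n2025-03-19,100\n")
    bad = Path(tmp) / "tape.xlsx"
    bad.write_text("")
    results = [
        check_input_path(""),
        check_input_path(str(Path(tmp) / "missing.csv")),
        check_input_path(str(bad)),
        check_input_path(str(good)),
    ]
```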

update_filters(load_data, create_initial_ui, mo, pd, datetime, np, DATE_COL, PORTFOLIO_GROUP_COL, DEALER_COL, STAGE_COL, APPLICATION_COL, PAYMENT_STATUS_COL, KEY_COL, SYSTEM_ID_COL, CATEGORY_COLS_FOR_DIST, NUMERIC_COLS_FOR_DIST)

Create updated filters based on loaded data.

Source code in datatapes/dealer_transactions/dealer_explore.py
@app.function
def update_filters(
    load_data, create_initial_ui, mo, pd, datetime, np,
    DATE_COL, PORTFOLIO_GROUP_COL, DEALER_COL, STAGE_COL,
    APPLICATION_COL, PAYMENT_STATUS_COL, KEY_COL, SYSTEM_ID_COL,
    CATEGORY_COLS_FOR_DIST, NUMERIC_COLS_FOR_DIST # Pass constants
):
    """Create updated filters based on loaded data."""
    con, df, load_error = load_data
    # Get the initial placeholder elements to update or return
    (
        file_path_input_ph, load_button_ph, date_filter_ph, pg_filter_ph,
        dealer_filter_ph, stage_filter_ph, app_filter_ph, ps_filter_ph,
        num_dist_ph, cat_dist_ph, cat_measure_ph,
        trend_gran_ph, trend_measure_ph
    ) = create_initial_ui

    # If data isn't loaded or there was an error, return placeholders
    if load_error or df is None or con is None:
        print("update_filters: No data loaded, returning placeholders.")
        # Return placeholders in the correct order expected by downstream cells
        return (
            date_filter_ph, pg_filter_ph, dealer_filter_ph, stage_filter_ph,
            app_filter_ph, ps_filter_ph, num_dist_ph, cat_dist_ph,
            cat_measure_ph, trend_gran_ph, trend_measure_ph # Include all placeholders
        )

    print("update_filters: Data loaded, creating updated filters.")
    # --- Helper to get options ---
    def get_options(col_name):
        if col_name not in df.columns:
            return []
        query = f'SELECT DISTINCT "{col_name}" FROM datatape WHERE "{col_name}" IS NOT NULL ORDER BY 1'
        try:
            result = con.execute(query).fetchall()
            return sorted(str(item[0]) for item in result)
        except Exception as e:
            print(f"Warning: Could not get options for {col_name}: {e}")
            return []

    # --- Update Date Filter ---
    try:
        min_date, max_date = con.execute(f'SELECT MIN("{DATE_COL}"), MAX("{DATE_COL}") FROM datatape').fetchone()
        if min_date and max_date:
            updated_date_filter = mo.ui.date_range(
                start=str(min_date.date()), stop=str(max_date.date()),
                value=(str(min_date.date()), str(max_date.date())),
                label="Transaction Date Range:")
            print(f"Updated date filter range: {min_date.date()} to {max_date.date()}")
        else: updated_date_filter = date_filter_ph
    except Exception as e: print(f"Error updating date filter: {e}"); updated_date_filter = date_filter_ph

    # --- Update Categorical Filters ---
    portfolio_groups = get_options(PORTFOLIO_GROUP_COL)
    updated_portfolio_filter = mo.ui.multiselect(options=portfolio_groups, value=portfolio_groups, label="Portfolio Group:")
    dealers_all = get_options(DEALER_COL)
    dealers = dealers_all[:50]  # first 50 names alphabetically, not ranked by volume
    updated_dealer_filter = mo.ui.multiselect(options=dealers, value=[], label="Dealer Name (first 50):")
    stages = get_options(STAGE_COL)
    updated_stage_filter = mo.ui.multiselect(options=stages, value=stages, label="Project Stage:")
    applications = get_options(APPLICATION_COL)
    updated_application_filter = mo.ui.multiselect(options=applications, value=applications, label="Application:")
    payment_statuses = get_options(PAYMENT_STATUS_COL)
    updated_payment_status_filter = mo.ui.multiselect(options=payment_statuses, value=payment_statuses, label="Payment Status:")

    # --- Update Distribution Selectors ---
    numeric_cols = sorted([c for c in df.select_dtypes(include=np.number).columns if c in NUMERIC_COLS_FOR_DIST]) # Filter based on constant
    updated_numeric_dist_select = mo.ui.dropdown(options=numeric_cols, label="Numeric Column:")
    valid_cat_cols = sorted([c for c in CATEGORY_COLS_FOR_DIST if c in df.columns])
    updated_category_dist_select = mo.ui.dropdown(options=valid_cat_cols, label="Category Column:")

    # Return the *updated* filter/selector elements
    # Keep measure/granularity selectors as they don't depend on data
    return (
        updated_date_filter, updated_portfolio_filter, updated_dealer_filter,
        updated_stage_filter, updated_application_filter, updated_payment_status_filter,
        updated_numeric_dist_select, updated_category_dist_select,
        cat_measure_ph, trend_gran_ph, trend_measure_ph # Return placeholders for non-updated ones
    )
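
The dealer multiselect is limited to 50 options, taken from the head of the alphabetically sorted list of distinct names. If the intent is instead the 50 dealers with the most transactions, the options could be ranked by frequency. A minimal sketch with `collections.Counter` follows; `top_dealers` and the sample names are hypothetical.

```python
from collections import Counter
from typing import Iterable, List, Optional


def top_dealers(dealer_names: Iterable[Optional[str]], limit: int = 50) -> List[str]:
    """Return up to `limit` dealer names, most frequent first (ties alphabetical)."""
    counts = Counter(name for name in dealer_names if name is not None)
    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return [name for name, _ in ranked[:limit]]


# One entry per transaction row; None stands for a missing dealer name
sample = ["Dealer B", "Dealer A", "Dealer B", None, "Dealer C", "Dealer C", "Dealer C"]
options = top_dealers(sample, limit=2)
```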

dealer_transactions_report

Dealer Transactions Report Generator

This script generates a comprehensive report of dealer transactions, including payables, credits, disbursements, and receipts, for systems in three borrowing bases:

1. TEP Backleverage Borrowing Base as of 3/19/25
2. TEP Backleverage Borrowing Base as of 4/16/25
3. SLA Borrowing Base as of 4/11/25

The report includes transaction details such as transaction identifier, system ID, date, amount, dealer information, transaction type, payment status, and more.

add_custom_args(parser: argparse.ArgumentParser) -> None

Add custom command line arguments to the parser.

Source code in datatapes/dealer_transactions/dealer_transactions_report.py
def add_custom_args(parser: argparse.ArgumentParser) -> None:
    """Add custom command line arguments to the parser."""
    parser.add_argument(
        "--portfolio-group",
        choices=["TEP", "SLA", "ALL"],
        default="ALL",
        help="Specify which portfolio group to process (default: ALL)",
    )
    parser.add_argument(
        "--report-date",
        type=str,
        help="Reference date for the report (YYYY-MM-DD, default: today)",
    )
    parser.add_argument(
        "--start-date",
        type=str,
        default=(datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d"),
        help="Start date for transactions (YYYY-MM-DD, default: 1 year ago)",
    )
    parser.add_argument(
        "--end-date",
        type=str,
        default=datetime.now().strftime("%Y-%m-%d"),
        help="End date for transactions (YYYY-MM-DD, default: today)",
    )
    parser.add_argument(
        "--filter-mar",
        action="store_true",
        help="Filter TEP systems to March borrowing base dated 3/19/25",
    )
    parser.add_argument(
        "--filter-apr",
        action="store_true",
        help="Filter TEP systems to April borrowing base dated 4/16/25",
    )
    parser.add_argument(
        "--filter-sla-apr",
        action="store_true",
        help="Filter SLA systems to April borrowing base dated 4/11/25",
    )
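
To illustrate how these flags surface on the parsed namespace, here is a minimal sketch. It rebuilds a reduced parser with three of the arguments above instead of importing the module, so `build_demo_parser` is a hypothetical stand-in; the flag names and choices are copied from the source.

```python
import argparse


def build_demo_parser() -> argparse.ArgumentParser:
    """Reduced copy of the report's custom arguments, for illustration only."""
    parser = argparse.ArgumentParser(description="Dealer Transactions Report Generator")
    parser.add_argument("--portfolio-group", choices=["TEP", "SLA", "ALL"], default="ALL")
    parser.add_argument("--filter-mar", action="store_true")
    parser.add_argument("--filter-sla-apr", action="store_true")
    return parser


args = build_demo_parser().parse_args(["--portfolio-group", "TEP", "--filter-mar"])
# Dashes in flag names become underscores on the namespace.
selected = (args.portfolio_group, args.filter_mar, args.filter_sla_apr)
```

The `store_true` flags default to `False`, which is why `create_query_params` can test `args.filter_mar` and its siblings directly.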

create_query_params(args: argparse.Namespace, config: NovaConfig) -> Dict[str, Any]

Create query parameters based on command line arguments.

Source code in datatapes/dealer_transactions/dealer_transactions_report.py
def create_query_params(args: argparse.Namespace, config: NovaConfig) -> Dict[str, Any]:
    """Create query parameters based on command line arguments."""
    # Get portfolio groups from config
    portfolio_groups = (
        config.portfolio_groups if hasattr(config, "portfolio_groups") else {}
    )

    tep_portfolios_list = portfolio_groups.get("tep_portfolios", [])
    sla_portfolios_list = portfolio_groups.get("sla_portfolios", [])

    # Set default parameters
    params = {
        "start_date": args.start_date,
        "end_date": args.end_date,
        "tep_portfolios": (
            tuple(tep_portfolios_list) if tep_portfolios_list else (None,)
        ),
        "sla_portfolios": (
            tuple(sla_portfolios_list) if sla_portfolios_list else (None,)
        ),
    }

    # Apply portfolio group filter if specified
    if args.portfolio_group == "TEP":
        params["sla_portfolios"] = (None,)  # Use (None,) for empty SLA list
    elif args.portfolio_group == "SLA":
        params["tep_portfolios"] = (None,)  # Use (None,) for empty TEP list

    # Set target date (for output file naming)
    if args.report_date:
        params["target_date"] = args.report_date
    else:
        params["target_date"] = datetime.now().strftime("%Y-%m-%d")

    # Store filtering options to be used in the processor
    requested_bases = []
    if args.filter_mar:
        requested_bases.append("TEP_MAR")
    if args.filter_apr:
        requested_bases.append("TEP_APR")
    if args.filter_sla_apr:
        requested_bases.append("SLA")

    # Attach these to the config for use in the processor
    if requested_bases:
        config.requested_bases = requested_bases

    return params
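
`create_query_params` substitutes `(None,)` for an empty or deselected portfolio list. Assuming the SQL consumes these tuples in an `IN (...)` predicate (the queries are not shown here), a bound `NULL` makes the predicate match no rows, because a comparison with `NULL` is never true. A minimal sketch using the standard-library `sqlite3` follows; the table, the helper `portfolios_in`, and the portfolio names are made up for illustration.

```python
import sqlite3
from typing import List, Optional, Tuple


def portfolios_in(
    con: sqlite3.Connection, portfolios: Tuple[Optional[str], ...]
) -> List[str]:
    """Select portfolios matching an IN list built from a parameter tuple."""
    placeholders = ", ".join("?" for _ in portfolios)
    query = (
        f"SELECT portfolio FROM systems WHERE portfolio IN ({placeholders}) ORDER BY 1"
    )
    return [row[0] for row in con.execute(query, portfolios)]


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE systems (portfolio TEXT)")
con.executemany("INSERT INTO systems VALUES (?)", [("TEP-1",), ("SLA-1",), (None,)])

matched = portfolios_in(con, ("TEP-1",))
# IN (NULL) is never true, so nothing matches, not even rows whose value is NULL
none_matched = portfolios_in(con, (None,))
```

Using `(None,)` instead of an empty tuple also avoids generating `IN ()`, which many SQL dialects reject.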

main() -> int

Main entry point for the script.

Source code in datatapes/dealer_transactions/dealer_transactions_report.py
def main() -> int:
    """Main entry point for the script."""
    return run_datatape_from_cli(
        custom_processor_func=process_dealer_transactions,
        output_filename="Dealer_Transactions_Report",
        description="Dealer Transactions Report Generator",
        default_config="./dealer_transactions_config.yaml",
        extra_args_func=add_custom_args,
        query_params_func=create_query_params,
    )

process_dealer_transactions(data_frames: Dict[str, pd.DataFrame], config: NovaConfig, console: Optional[Console] = None) -> pd.DataFrame

Process dealer transaction data with specialized filtering and formatting.

Parameters

data_frames : Dict[str, pd.DataFrame]
    Dictionary containing the data frames retrieved from SQL queries. Expected key: "transactions"
config : NovaConfig
    Configuration object containing processing settings
console : Optional[Console]
    Rich console for output

Returns

pd.DataFrame
    Processed DataFrame containing dealer transaction information

Source code in datatapes/dealer_transactions/dealer_transactions_report.py
def process_dealer_transactions(
    data_frames: Dict[str, pd.DataFrame],
    config: NovaConfig,
    console: Optional[Console] = None,
) -> pd.DataFrame:
    """
    Process dealer transaction data with specialized filtering and formatting.

    Parameters
    ----------
    data_frames : Dict[str, pd.DataFrame]
        Dictionary containing the data frames retrieved from SQL queries.
        Expected key: "transactions"
    config : NovaConfig
        Configuration object containing processing settings
    console : Optional[Console]
        Rich console for output

    Returns
    -------
    pd.DataFrame
        Processed DataFrame containing dealer transaction information
    """
    console = console or Console()

    # --- Validate Input DataFrames ---
    console.print(
        "[bold blue]--- Starting Dealer Transactions Processing ---[/bold blue]"
    )

    if "transactions" not in data_frames or data_frames["transactions"].empty:
        console.print(
            "[red]Error: Transaction data is missing or empty. Cannot proceed.[/red]"
        )
        return pd.DataFrame()

    # Get transaction data
    transactions = data_frames["transactions"].copy()
    console.print(f"Loaded {len(transactions):,} transaction records")

    # Add report date information
    transactions["Report Date"] = datetime.now().strftime("%Y-%m-%d")

    # Create Portfolio Group field based on Portfolio name
    if "Portfolio" in transactions.columns:
        # Get portfolio groups from config
        portfolio_groups = (
            config.portfolio_groups if hasattr(config, "portfolio_groups") else {}
        )
        tep_portfolios = set(portfolio_groups.get("tep_portfolios", []))
        sla_portfolios = set(portfolio_groups.get("sla_portfolios", []))

        # Create a new column for portfolio group
        def get_portfolio_group(portfolio):
            if portfolio in tep_portfolios:
                return "TEP Backleverage"
            elif portfolio in sla_portfolios:
                return "SLA"
            else:
                return "Other"

        # Add the portfolio group column
        transactions["Portfolio Group"] = transactions["Portfolio"].apply(
            get_portfolio_group
        )
        console.print(
            "[blue]Added Portfolio Group classification based on Portfolio names[/blue]"
        )
    else:
        console.print(
            "[yellow]Warning: 'Portfolio' column not found. Cannot create Portfolio Group.[/yellow]"
        )
        # Add empty Portfolio Group column to avoid errors
        transactions["Portfolio Group"] = "Unknown"

    # Data type conversions for better analysis
    console.print("[blue]Converting data types...[/blue]")

    # Format dates
    date_cols = [col for col in transactions.columns if "date" in col.lower()]
    for col in date_cols:
        if col in transactions.columns:
            transactions[col] = pd.to_datetime(transactions[col], errors="coerce")

    # Ensure numeric columns are properly formatted
    numeric_cols = ["Transaction Amount", "Invoice Total2"]
    for col in numeric_cols:
        if col in transactions.columns:
            transactions[col] = pd.to_numeric(transactions[col], errors="coerce")

    # Create summary statistics
    console.print("[blue]Creating summary statistics...[/blue]")

    # Count by portfolio group
    if "Portfolio Group" in transactions.columns:
        portfolio_group_counts = transactions.groupby("Portfolio Group").size()
        console.print("[green]Transaction counts by portfolio group:[/green]")
        for group, count in portfolio_group_counts.items():
            console.print(f"  {group}: {count:,} transactions")
    else:
        console.print(
            "[yellow]Cannot show portfolio group statistics as column not found[/yellow]"
        )

    # Count by project stage
    if "Project Stage" in transactions.columns:
        stage_counts = transactions.groupby("Project Stage").size()
        console.print("[green]Transaction counts by Project Stage:[/green]")
        for stage, count in stage_counts.nlargest(10).items():
            console.print(f"  {stage}: {count:,} transactions")

    # Calculate total transaction amounts by application type
    if (
        "Application" in transactions.columns
        and "Transaction Amount" in transactions.columns
    ):
        console.print("[green]Transaction amounts by application:[/green]")
        for app_type in transactions["Application"].unique():
            amount = transactions[transactions["Application"] == app_type][
                "Transaction Amount"
            ].sum()
            console.print(f"  {app_type}: ${abs(amount):,.2f}")

    # Count unique systems
    if (
        "Sunnova System ID" in transactions.columns
        and "Portfolio Group" in transactions.columns
    ):
        system_counts = transactions.groupby(["Portfolio Group"])[
            "Sunnova System ID"
        ].nunique()
        console.print("[green]Unique system counts by portfolio group:[/green]")
        for group, count in system_counts.items():
            console.print(f"  {group}: {count:,} systems")

    # Check for DHB transactions
    if "Project Stage" in transactions.columns:
        dhb_txs = transactions[
            transactions["Project Stage"].str.contains("DHB", na=False)
        ]
        console.print(f"[green]DHB transactions: {len(dhb_txs):,}[/green]")

    # Check for unearned transactions; "VIN" is tested separately so that a
    # missing column cannot raise a KeyError
    if "VIN" in transactions.columns:
        unearned_txs = transactions[
            transactions["VIN"].str.contains("unearned", na=False, case=False)
        ]
        console.print(
            f"[green]Unearned transactions (by VIN): {len(unearned_txs):,}[/green]"
        )

    console.print(
        f"[bold green]--- Processing completed with {len(transactions):,} transactions ---[/bold green]"
    )
    return transactions
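
The Portfolio Group step maps each portfolio name to one of three labels using the `tep_portfolios` and `sla_portfolios` lists on the config. A minimal sketch of the same lookup in plain Python follows; the portfolio names are placeholders, since the real lists are not shown here, and `classify` is a hypothetical standalone version of the nested `get_portfolio_group` helper.

```python
from typing import Dict, List, Optional

# Placeholder config content; the real portfolio names are an assumption
portfolio_groups: Dict[str, List[str]] = {
    "tep_portfolios": ["TEP Example 1"],
    "sla_portfolios": ["SLA Example 1"],
}
tep_portfolios = set(portfolio_groups.get("tep_portfolios", []))
sla_portfolios = set(portfolio_groups.get("sla_portfolios", []))


def classify(portfolio: Optional[str]) -> str:
    """Return the portfolio group label for a single portfolio name."""
    if portfolio in tep_portfolios:
        return "TEP Backleverage"
    if portfolio in sla_portfolios:
        return "SLA"
    return "Other"  # unknown or missing names fall through to "Other"


labels = [
    classify(name)
    for name in ["TEP Example 1", "SLA Example 1", "Something Else", None]
]
```

Converting the lists to sets keeps each membership test constant-time, which matters when the lookup runs once per transaction row.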