
TEP Datatape

Comprehensive datatape for Tax Equity Partnership systems, including system tracking, financial metrics, and operational data.

Overview

Generation Frequency: Weekly
Development Status: Ongoing

Key Features:

  • Tax equity analytics
  • NovaFDE framework
  • Treasury enhancement tracking
  • Partner data integration

Key Stakeholders:

  • Treasury Tranching Team
  • Tax Equity Partners
  • FPNA

Output Columns

The following columns are included in the final datatape output (filtered from SQL queries based on configuration):

Column Analysis Summary

  • Total columns found in SQL queries: 114
  • Columns specified in output_columns config: 65
  • Columns found in SQL and included in output: 6
  • Missing columns (in config but not found in SQL): 62

Note: Only columns that exist in SQL queries are included in the final output.
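The filtering rule in the note above amounts to checking each configured column against the set of columns the SQL queries return. The following is a minimal sketch of that idea, not the pipeline's actual code: the function name `select_output_columns` and the sample column lists are assumptions made for illustration.

```python
def select_output_columns(configured, available):
    """Split configured columns into those found in SQL and those missing.

    `configured` is the ordered list from the output_columns config;
    `available` is the collection of column names returned by the SQL queries.
    """
    available_set = set(available)
    included = [c for c in configured if c in available_set]
    missing = [c for c in configured if c not in available_set]
    return included, missing


# Hypothetical sample data, for illustration only.
configured = ["Solar Rate", "Installation Partner Account", "ITC Percentage"]
available = ["Solar Rate", "System Name", "Installation Partner Account"]

included, missing = select_output_columns(configured, available)
print(included)  # ['Solar Rate', 'Installation Partner Account']
print(missing)   # ['ITC Percentage']
```

Keeping the configured order in `included` means the output column order follows the configuration rather than the SQL select list.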

| Column Name | Required | Source Query | Source Table | Source Field | Derived | Transformations |
| --- | --- | --- | --- | --- | --- | --- |
| Installation Partner Account | Yes | wip_systems | sfdc_staging.vwx_account | Installation Partner Account | No | |
| Installation Partner Account | Yes | current_systems | sfdc_staging.vwx_account | Installation Partner Account | No | |
| Quote: Utility Account | Yes | wip_systems | sfdc_staging.vwx_account | Quote: Utility Account | No | |
| Quote: Utility Account | Yes | current_systems | sfdc_staging.vwx_account | Quote: Utility Account | No | |
| Solar Rate | Yes | wip_systems | sfdc_staging.vwx_quote__c | Solar Rate | No | |
| Solar Rate | Yes | current_systems | sfdc_staging.vwx_quote__c | Solar Rate | No | |

Note: Columns marked as 'Derived' are calculated or transformed rather than directly selected from the source table.

Data Sources

The configuration pulls data from the following sources:

| Source Name | Type | Query/File | Description |
| --- | --- | --- | --- |
| installs | database | installs.sql | SQL query for installs data |
| wip_systems | database | wip_systems.sql | SQL query for wip_systems data |
| grid_services | database | grid_services.sql | SQL query for grid_services data |
| current_systems | database | current_systems.sql | SQL query for current_systems data |
| equipment | database | equipment.sql | SQL query for equipment data |

Configuration

Portfolio Groups

tep_portfolios:

  • Sunnova TEP 6-C LLC
  • Sunnova TEP 6-E LLC
  • Sunnova TEP 7-C LLC
  • Sunnova TEP 7-D LLC
  • Sunnova TEP 7-E LLC
  • Sunnova TEP 7-F LLC
  • Sunnova TEP 8-A LLC
  • Sunnova TEP 8-B LLC
  • Sunnova TEP 8-C LLC
  • Sunnova TEP 8-D LLC
  • Sunnova TEP 8-E LLC
  • Sunnova TEP 8-F LLC
  • Sunnova TEP 8-G LLC
  • Sunnova TEP 8-H LLC
  • Sunnova TEP 8-I LLC

wip_portfolios:

  • Sunnova TEP Developer LLC
  • Sunnova SAP IV LLC
  • Sunnova Energy Puerto Rico LLC

partner_portfolios:

  • Sunnova TEP Developer LLC
  • Sunnova Energy Puerto Rico LLC
  • Sunnova Sol V Holdings LLC

Database Configuration

  • dbcreds_environment: fusionods

Paths

  • sql_dir: ./sql
  • cache_dir: ./query_cache
  • output_dir: ./completed_output
  • log_dir: ./logs
  • static_files: ${TEP_STATIC_FILES_PATH:-C:/Users/thandolwethu.dlamini/OneDrive - Sunnova Energy International/Documents - FinOps/Treasury/Tax-Equity/Static Files/}
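The `static_files` entry uses shell-style `${NAME:-default}` syntax: use the environment variable if it is set and non-empty, otherwise fall back to the default path. How the datatape's config loader expands this is not shown here, so the following is only a sketch assuming shell-like semantics. The helper name `expand_with_default` and the sample paths are hypothetical.

```python
import os
import re

# Matches ${NAME:-default}; the default part may be empty.
_PLACEHOLDER = re.compile(
    r"\$\{(?P<name>[A-Za-z_][A-Za-z0-9_]*):-(?P<default>[^}]*)\}"
)


def expand_with_default(value, env=None):
    """Replace ${NAME:-default} with env[NAME] if set and non-empty, else default."""
    env = os.environ if env is None else env

    def _sub(match):
        return env.get(match.group("name")) or match.group("default")

    return _PLACEHOLDER.sub(_sub, value)


# Hypothetical values, for illustration only.
setting = "${TEP_STATIC_FILES_PATH:-./static_files/}"
print(expand_with_default(setting, env={}))
# ./static_files/
print(expand_with_default(setting, env={"TEP_STATIC_FILES_PATH": "/data/static/"}))
# /data/static/
```

Setting `TEP_STATIC_FILES_PATH` per machine avoids depending on the user-specific default path baked into the configuration.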

SQL Queries

-- installs.sql
SELECT 
    sp."Asset Portfolio - Partner", 
    sp."Sunnova System ID",
    pb."Date of First Submission",
    pb."Date of Approval",
    pb."Project Block Description",
    pb."Status"
FROM 
    sfdc_staging.vwx_system_project__c sp
JOIN 
    sfdc_staging.vwx_order__c o ON o."Record ID" = sp."Order"
JOIN 
    sfdc_staging.vwx_project_block__c pb ON pb."Order" = sp."Order"
WHERE
    EXISTS (
        SELECT 1
        FROM sfdc_staging.vw_system__c s
        WHERE s."Record ID"::text = sp."System"::text
        AND s."Asset Portfolio - Customer" IN :portfolio_names
    )
    /* Filter for commissioning and assessment data */
    AND (
        (pb."Project Block Description" = 'Commissioning Package' AND pb."Status" IN ('Approved', 'Submitted', 'Rejected'))
        OR
        (pb."Project Block Description" = 'Final Design & Assessment' AND pb."Status" = 'Incomplete')
    )

-- wip_systems.sql
SELECT 
    s."Asset Portfolio - Customer",
    s."System Name",
    sp."Installation State",
    q."Installation Address Zip Code",
    sp."Contract Type",
    q."Solar Rate (1)" AS "Solar Rate",
    q."Recurring Payment",
    q."Payment Escalator",
    sp."FICO High",
    ipa."Account Name" AS "Installation Partner Account",
    q."Expected Production - Orig Simulation",
    q."Home Effective Shading",
    sp."System Size",
    sp."Committed Capital",
    sp."Stage",
    sp."Tranche A Approved Date",
    sp."Tranche B Approved Date",
    sp."Cancelled Tranche A Date",
    q."Change Order",
    sp."NTP Stage Date",
    sp."Substantial Stage Date",
    sp."Final Stage Date",
    sp."PTO Received Date",
    sp."Completed Stage Date",
    sp."InService Date",
    sp."Cancelled Stage Date",
    sp."Status",
    sp."NTP Stage Payment",
    q."Advance Payment",
    ua."Account Name" AS "Quote: Utility Account",
    s."Primary Customer Name",
    s."Installation Address",
    s."Installation Address City",
    s."PBI Rate ($ / kWh)",
    s."PBI Term (Months)",
    sp."Contract Stage Date",
    sp."Contract Stage Payment",
    sp."Substantial Stage Payment",
    sp."Final Stage Payment",
    sp."Contract Stage Payment Percentage",
    sp."NTP Stage Payment Percentage",
    sp."Substantial Stage Payment Percentage",
    sp."Final Stage Payment Percentage",
    sp."Rebate Reservation Number",
    s."Payment Date",
    s."UCC Submission Date",
    s."Record ID"
FROM 
    sfdc_staging.vw_system_project__c sp
    JOIN sfdc_staging.vwx_quote__c q ON q."Record ID"::text = sp."Quote"::text
    JOIN sfdc_staging.vw_system__c s ON s."Record ID"::text = sp."System"::text
    LEFT JOIN sfdc_staging.vwx_account ipa ON ipa."Account ID (2)"::text = q."Installation Partner Account"::text
    LEFT JOIN sfdc_staging.vwx_account ua ON ua."Account ID (2)"::text = q."Utility Account"::text
WHERE 
    s."Asset Portfolio - Customer" IN :wip_portfolio_names
    AND sp."PTO Received Date" IS NULL
    AND sp."Contract Type" = ANY(ARRAY['PPA', 'PPA-EZ', 'Lease', 'Lease Storage'])
    AND sp."Stage" NOT IN ('Completed', 'Cancellation in Progress', 'Cancelled')

-- grid_services.sql
SELECT 
    "System Name",
    "Program Name",
    "Enrollment Status",
    "Program Created Date"
FROM 
    ods.vw_grid_services_enrollments
WHERE
    "Program Name" NOT IN ('Forward Capacity Auction', 'Retail Referral')
    AND "Enrollment Status" = ANY(ARRAY[
        'Active', 'Inactive', 'Pending - Customer Action', 
        'Pending - Partner Action', 'Pending - Sunnova Action'
    ])
    AND "Contract Type" = ANY(ARRAY['PPA', 'PPA-EZ', 'Lease', 'Lease Storage'])
    /* Filter by systems in the specified portfolios */
    AND "System Name" IN (
        SELECT s."System Name"
        FROM sfdc_staging.vw_system__c s
        WHERE s."Asset Portfolio - Customer" IN :portfolio_names
    )

-- current_systems.sql
SELECT 
    s."Asset Portfolio - Customer",
    s."System Name",
    sp."Installation State",
    q."Installation Address Zip Code",
    sp."Contract Type",
    q."Solar Rate (1)" AS "Solar Rate",
    q."Recurring Payment",
    q."Payment Escalator",
    sp."FICO High",
    ipa."Account Name" AS "Installation Partner Account",
    q."Expected Production - Orig Simulation",
    q."Home Effective Shading",
    sp."System Size",
    sp."Committed Capital",
    sp."Stage",
    sp."Tranche A Approved Date",
    sp."Tranche B Approved Date",
    sp."Cancelled Tranche A Date",
    q."Change Order",
    sp."NTP Stage Date",
    sp."Substantial Stage Date",
    sp."Final Stage Date",
    sp."PTO Received Date",
    sp."Completed Stage Date",
    sp."InService Date",
    sp."Cancelled Stage Date",
    sp."Status",
    sp."NTP Stage Payment",
    q."Advance Payment",
    ua."Account Name" AS "Quote: Utility Account",
    s."Primary Customer Name",
    s."Installation Address",
    s."Installation Address City",
    s."PBI Rate ($ / kWh)",
    s."PBI Term (Months)",
    sp."Contract Stage Date",
    sp."Contract Stage Payment",
    sp."Substantial Stage Payment",
    sp."Final Stage Payment",
    sp."Contract Stage Payment Percentage",
    sp."NTP Stage Payment Percentage",
    sp."Substantial Stage Payment Percentage",
    sp."Final Stage Payment Percentage",
    sp."Rebate Reservation Number",
    s."Payment Date",
    s."UCC Submission Date",
    s."Record ID"
FROM 
    sfdc_staging.vw_system_project__c sp
    JOIN sfdc_staging.vw_system__c s ON s."Record ID"::text = sp."System"::text
    JOIN sfdc_staging.vwx_quote__c q ON s."Record ID"::text = q."System"::text AND q."Status" = 'Executed'
    LEFT JOIN sfdc_staging.vwx_account ipa ON ipa."Account ID (2)"::text = q."Installation Partner Account"::text
    LEFT JOIN sfdc_staging.vwx_account ua ON ua."Account ID (2)"::text = q."Utility Account"::text
WHERE 
    s."Asset Portfolio - Customer" IN :portfolio_names

-- equipment.sql
SELECT 
    s."System Name",
    acc."Account Name",
    a."Model Number",
    rt."Name",    
    a."Quantity",
    a."Operational End Date",
    s."Asset Portfolio - Partner"
FROM 
    sfdc_staging.vwx_system__c s
JOIN 
    sfdc_staging.vwx_asset a ON s."Record ID"::text = a."System"::text
JOIN
    sfdc_staging.vw_recordtype rt ON rt."Record Type ID" = a."Record Type ID"
JOIN 
    sfdc_staging.vwx_account acc ON acc."Account ID (2)"::text = a."Account ID"::text
WHERE
    rt."Name" IN ('Equipment - Inverter', 'Equipment - Module', 'Equipment - Battery', 'Equipment - Racking')
    AND a."Operational End Date" IS NULL 
    AND a."Status"::text <> 'Archived'::text
    AND s."Asset Portfolio - Partner" IN ('Sunnova TEP Developer LLC', 'Sunnova Energy Puerto Rico LLC', 'Sunnova Sol V Holdings LLC')
    /* Restrict to systems in the specified customer portfolios (EXISTS semi-join) */
    AND EXISTS (
        SELECT 1 
        FROM sfdc_staging.vw_system__c sys
        WHERE sys."Record ID"::text = s."Record ID"::text
        AND sys."Asset Portfolio - Customer" IN :portfolio_names
    )
    /* Optional date filter - uncomment if required */
    -- AND (a."Created Date" <= :target_date::date)
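The queries above filter with `IN :portfolio_names` (and `IN :wip_portfolio_names`), so each named parameter must be bound to a list of values. How the datatape framework performs that binding is not shown in this page, and database drivers differ in how they accept list parameters. The sketch below shows one generic approach: expand the single named parameter into one placeholder per value. The helper `expand_in_param`, the demo table, and the portfolio values are hypothetical, and `sqlite3` is used only so the example runs without a database server.

```python
import sqlite3


def expand_in_param(sql, name, values):
    """Rewrite `IN :name` as `IN (:name_0, :name_1, ...)` and build the params dict."""
    if not values:
        raise ValueError(f"{name} must contain at least one value")
    keys = [f"{name}_{i}" for i in range(len(values))]
    placeholders = ", ".join(f":{k}" for k in keys)
    expanded = sql.replace(f"IN :{name}", f"IN ({placeholders})")
    return expanded, dict(zip(keys, values))


# In-memory demo table standing in for the real system view.
conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE systems ("System Name" TEXT, "Asset Portfolio - Customer" TEXT)'
)
conn.executemany(
    "INSERT INTO systems VALUES (?, ?)",
    [("SYS-1", "Portfolio A"), ("SYS-2", "Portfolio B"), ("SYS-3", "Portfolio C")],
)

sql = (
    'SELECT "System Name" FROM systems '
    'WHERE "Asset Portfolio - Customer" IN :portfolio_names ORDER BY 1'
)
expanded_sql, params = expand_in_param(
    sql, "portfolio_names", ["Portfolio A", "Portfolio C"]
)
rows = [r[0] for r in conn.execute(expanded_sql, params)]
print(rows)  # ['SYS-1', 'SYS-3']
```

Rejecting an empty list up front avoids generating `IN ()`, which is a syntax error in most SQL dialects.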

Data Processing

Data processing is handled by: datatapes/tep/run_tep.py

Processing Steps

The data processing includes:

  1. SQL query execution
  2. Data transformation and cleaning
  3. Column mapping and validation
  4. Output file generation

Python Modules

This datatape includes additional Python modules with business logic:

tep_processing_logic

TEP Processing Logic Module.

Contains all custom processing functions for the TEP datatape generation. These functions handle equipment data, installation data, grid services, FMV/ITC calculations, Class A investments, and system labeling.

calculate_class_a_investments(df: pd.DataFrame, static_files_dir: str, console: Optional[Console] = None) -> pd.DataFrame

Calculate Class A investment percentages and total share.

Parameters

  • df (pd.DataFrame): Input dataframe with system and FMV data.
  • static_files_dir (str): Path to directory containing static files.
  • console (Optional[Console], optional): Rich console for output, by default None.

Returns

  • pd.DataFrame: DataFrame with calculated Class A investment values.

Source code in datatapes/tep/tep_processing_logic.py
def calculate_class_a_investments(
    df: pd.DataFrame, static_files_dir: str, console: Optional[Console] = None
) -> pd.DataFrame:
    """
    Calculate Class A investment percentages and total share.

    Parameters
    ----------
    df : pd.DataFrame
        Input dataframe with system and FMV data.
    static_files_dir : str
        Path to directory containing static files.
    console : Optional[Console], optional
        Rich console for output, by default None

    Returns
    -------
    pd.DataFrame
        DataFrame with calculated Class A investment values.
    """
    console = console or Console()
    df_out = df.copy()

    req_cols_main = [
        "Asset Portfolio - Customer",
        "SH Compliant",
        "Project Purchase Price",
    ]
    if not all(c in df_out.columns for c in req_cols_main):
        missing = [c for c in req_cols_main if c not in df_out.columns]
        console.print(
            f"[red]Error: Missing required columns for Class A calc: {missing}. Skipping.[/red]"
        )
        df_out["Adjusted Class A Member Investment Percentage"] = 0.0
        df_out["Class A Total Share"] = 0.0
        return df_out

    tep_metadata = _load_static_data(
        TEP_METADATA_FILE, static_files_dir, console=console
    )
    if tep_metadata.empty:
        console.print(
            "[red]Error: TEP Metadata empty or not loaded. Cannot calc Class A. Defaulting values.[/red]"
        )
        df_out["Adjusted Class A Member Investment Percentage"] = 0.0
        df_out["Class A Total Share"] = 0.0
        return df_out

    meta_req_cols = ["Asset Portfolio - Customer", "FMV % (26)", "FMV % (30)"]
    if not all(c in tep_metadata.columns for c in meta_req_cols):
        missing_meta = [c for c in meta_req_cols if c not in tep_metadata.columns]
        console.print(
            f"[red]Error: TEP Metadata missing cols: {missing_meta}. Cannot calc Class A. Defaulting values.[/red]"
        )
        df_out["Adjusted Class A Member Investment Percentage"] = 0.0
        df_out["Class A Total Share"] = 0.0
        return df_out

    tep_metadata.loc[:, "FMV % (26)"] = pd.to_numeric(
        tep_metadata["FMV % (26)"], errors="coerce"
    ).fillna(0.0)
    tep_metadata.loc[:, "FMV % (30)"] = pd.to_numeric(
        tep_metadata["FMV % (30)"], errors="coerce"
    ).fillna(0.0)

    df_out = pd.merge(
        df_out, tep_metadata[meta_req_cols], how="left", on="Asset Portfolio - Customer"
    )

    df_out.loc[:, "SH Compliant"] = df_out["SH Compliant"].fillna(False).astype(bool)
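    # np.where returns a NumPy array, which has no .fillna; wrap it in a Series first.
    df_out.loc[:, "Adjusted Class A Member Investment Percentage"] = pd.Series(
        np.where(df_out["SH Compliant"], df_out["FMV % (30)"], df_out["FMV % (26)"]),
        index=df_out.index,
    ).fillna(0.0)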

    df_out.loc[:, "Project Purchase Price"] = pd.to_numeric(
        df_out["Project Purchase Price"], errors="coerce"
    ).fillna(0.0)
    df_out.loc[:, "Class A Total Share"] = (
        df_out["Project Purchase Price"]
        * df_out["Adjusted Class A Member Investment Percentage"]
    ).fillna(0.0)

    df_out.drop(columns=["FMV % (26)", "FMV % (30)"], inplace=True, errors="ignore")
    return df_out
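
The per-system rule in `calculate_class_a_investments` reduces to two steps: choose `FMV % (30)` when the system is safe-harbor compliant and `FMV % (26)` otherwise, then multiply by the project purchase price. The sketch below restates that arithmetic for a single system in plain Python. The function name `class_a_share` and the sample numbers are hypothetical and do not come from the TEP metadata file.

```python
def class_a_share(purchase_price, sh_compliant, fmv_pct_26, fmv_pct_30):
    """Return (investment percentage, Class A total share) for one system.

    A missing compliance flag (None) is treated as not compliant, mirroring
    the fillna(False) step in the function above.
    """
    pct = fmv_pct_30 if bool(sh_compliant) else fmv_pct_26
    return pct, purchase_price * pct


# Hypothetical values, for illustration only.
print(class_a_share(40_000.0, True, fmv_pct_26=0.25, fmv_pct_30=0.5))
# (0.5, 20000.0)
print(class_a_share(40_000.0, None, fmv_pct_26=0.25, fmv_pct_30=0.5))
# (0.25, 10000.0)
```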

calculate_fmv_itc_values(df: pd.DataFrame, static_files_dir: str, fmv_override: Optional[str] = None, console: Optional[Console] = None) -> pd.DataFrame

Calculate FMV (Fair Market Value) and ITC (Investment Tax Credit) values.

This is a complex calculation that involves multiple steps including safe harbor compliance, ITC percentage determination, and developer profit.
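In the function's source, the initial safe harbor flag depends on two dates. A system is flagged compliant if it was installed before 2020-01-01, went in service on or after 2022-01-01, or has no in-service date yet. Compliant systems get an ITC percentage of 0.30 and all others 0.26. The sketch below restates that rule for a single system using plain `datetime.date` values instead of pandas. The function name `itc_percentage` and the sample dates are hypothetical.

```python
from datetime import date

SH_INSTALL_CUTOFF = date(2020, 1, 1)   # installed strictly before this date
SH_INSERVICE_START = date(2022, 1, 1)  # in service on or after this date


def itc_percentage(installed, in_service):
    """Return 0.30 for safe-harbor-compliant systems, else 0.26.

    Either argument may be None when the date is not known.
    """
    early_install = installed is not None and installed < SH_INSTALL_CUTOFF
    late_service = in_service is not None and in_service >= SH_INSERVICE_START
    no_service = in_service is None
    sh_compliant = early_install or late_service or no_service
    return 0.30 if sh_compliant else 0.26


# Hypothetical dates, for illustration only.
print(itc_percentage(date(2019, 6, 1), date(2021, 3, 1)))  # 0.3
print(itc_percentage(date(2020, 6, 1), date(2021, 3, 1)))  # 0.26
print(itc_percentage(date(2020, 6, 1), None))              # 0.3
```

The function's source later applies a further "SH Qualified" test to this flag, which this sketch does not cover.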

Parameters

  • df (pd.DataFrame): Input dataframe with system data.
  • static_files_dir (str): Path to directory containing static files.
  • fmv_override (Optional[str], optional): Override FMV file name, by default None.
  • console (Optional[Console], optional): Rich console for output, by default None.

Returns

  • pd.DataFrame: DataFrame with calculated FMV and ITC values.
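
One step inside this function picks the appraisal number ("FMV Input") for each system. It uses the appraisal with the latest valuation date that is on or before the system's Tranche A approved date, and defaults to appraisal 1 when the date is missing or precedes every valuation. The function's source does this with a reverse linear scan. The sketch below swaps in a binary search from the standard library's `bisect` module, which gives the same result on sorted dates. The function name `appraisal_for` and the sample appraisal table are hypothetical.

```python
from bisect import bisect_right
from datetime import date


def appraisal_for(tranche_a_date, appraisals_by_date, default=1):
    """Return the appraisal number in effect on `tranche_a_date`.

    `appraisals_by_date` maps valuation date -> appraisal number.
    """
    if tranche_a_date is None:
        return default
    dates = sorted(appraisals_by_date)
    # Index of the first valuation date strictly after the tranche A date.
    idx = bisect_right(dates, tranche_a_date)
    return appraisals_by_date[dates[idx - 1]] if idx else default


# Hypothetical appraisal table, for illustration only.
appraisals = {date(2021, 1, 15): 1, date(2022, 3, 1): 2, date(2023, 6, 30): 3}

print(appraisal_for(date(2022, 3, 1), appraisals))  # 2
print(appraisal_for(date(2020, 1, 1), appraisals))  # 1
print(appraisal_for(date(2024, 1, 1), appraisals))  # 3
```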

Source code in datatapes/tep/tep_processing_logic.py
def calculate_fmv_itc_values(
    df: pd.DataFrame,
    static_files_dir: str,
    fmv_override: Optional[str] = None,
    console: Optional[Console] = None,
) -> pd.DataFrame:
    """
    Calculate FMV (Fair Market Value) and ITC (Investment Tax Credit) values.

    This is a complex calculation that involves multiple steps including
    safe harbor compliance, ITC percentage determination, and developer profit.

    Parameters
    ----------
    df : pd.DataFrame
        Input dataframe with system data.
    static_files_dir : str
        Path to directory containing static files.
    fmv_override : Optional[str], optional
        Override FMV file name, by default None
    console : Optional[Console], optional
        Rich console for output, by default None

    Returns
    -------
    pd.DataFrame
        DataFrame with calculated FMV and ITC values.
    """
    console = console or Console()
    df_original_input = df.copy()  # Keep pristine copy
    df_out = df.copy()

    try:
        # --- State Column ---
        state_col_final_name = "State"
        possible_state_cols = [
            "State",
            "Installation State",
            "Installation Address State",
        ]
        original_state_col_name = None
        current_state_col = None

        for col_name_variant in possible_state_cols:
            if col_name_variant in df_out.columns:
                current_state_col = col_name_variant
                original_state_col_name = col_name_variant
                break

        if current_state_col is None:
            console.print(
                "[red]Error: No state column found. Cannot calculate FMV. Returning original DataFrame.[/red]"
            )
            return df_original_input

        # Standardize to "State" for internal processing
        if current_state_col != state_col_final_name:
            if (
                state_col_final_name in df_out.columns
                and current_state_col != state_col_final_name
            ):  # 'State' already exists, but we found another one
                console.print(
                    f"[yellow]Warning: Found state column '{current_state_col}' but '{state_col_final_name}' also exists. Using existing '{state_col_final_name}'.[/yellow]"
                )
                current_state_col = state_col_final_name  # Use the one named "State"
            else:  # 'State' does not exist, so rename the one we found
                df_out.rename(
                    columns={current_state_col: state_col_final_name}, inplace=True
                )
                current_state_col = state_col_final_name  # Update to use "State"

        # --- SH Equipment Value ---
        sh_equip_val_col = "Value SH Equipment"
        if sh_equip_val_col not in df_out.columns:
            df_out.loc[:, sh_equip_val_col] = calculate_safe_harbor_equipment_value(
                df_out, console=console
            )
        else:
            df_out.loc[:, sh_equip_val_col] = pd.to_numeric(
                df_out[sh_equip_val_col], errors="coerce"
            ).fillna(0.0)

        sh_value_cols = ["Value SH Inverter", "Value SH Batteries", sh_equip_val_col]
        for col in sh_value_cols:
            if col not in df_out.columns:
                df_out.loc[:, col] = 0.0
            else:
                df_out.loc[:, col] = pd.to_numeric(df_out[col], errors="coerce").fillna(
                    0.0
                )
        df_out.loc[:, "Value SH Total"] = df_out[sh_value_cols].sum(axis=1)

        # --- Load FMV Data ---
        fmv_file_to_load = fmv_override or FMV_FILE_DEFAULT
        fmv_data = _load_static_data(
            fmv_file_to_load, static_files_dir, console=console
        )
        if fmv_data.empty:
            console.print(
                f"[red]Error: FMV data ('{fmv_file_to_load}') is empty or not loaded. Cannot calc FMV. Returning original DataFrame.[/red]"
            )
            if (
                original_state_col_name
                and current_state_col == state_col_final_name
                and original_state_col_name != state_col_final_name
            ):
                df_out.rename(
                    columns={state_col_final_name: original_state_col_name},
                    inplace=True,
                )
            return df_original_input

        # --- Validate and Process FMV Data ---
        valuation_date_col_fmv = "Valuation Date"
        appraisal_num_col_fmv = "Appraisal  #"  # Note double space
        required_fmv_cols = [
            valuation_date_col_fmv,
            appraisal_num_col_fmv,
            "State",
            "Type",
            "FMV Appraisal",
            "Appraisal %",
        ]

        missing_fmv_cols = [c for c in required_fmv_cols if c not in fmv_data.columns]
        if missing_fmv_cols:
            console.print(
                f"[red]Error: FMV data missing required columns: {missing_fmv_cols}. Returning original DataFrame.[/red]"
            )
            if (
                original_state_col_name
                and current_state_col == state_col_final_name
                and original_state_col_name != state_col_final_name
            ):
                df_out.rename(
                    columns={state_col_final_name: original_state_col_name},
                    inplace=True,
                )
            return df_original_input

        fmv_data.loc[:, valuation_date_col_fmv] = pd.to_datetime(
            fmv_data[valuation_date_col_fmv], errors="coerce"
        ).dt.normalize()
        fmv_data.dropna(subset=[valuation_date_col_fmv], inplace=True)
        fmv_data.loc[:, appraisal_num_col_fmv] = pd.to_numeric(
            fmv_data[appraisal_num_col_fmv], errors="coerce"
        )
        fmv_data.dropna(subset=[appraisal_num_col_fmv], inplace=True)
        fmv_data.loc[:, appraisal_num_col_fmv] = fmv_data[appraisal_num_col_fmv].astype(
            int
        )
        fmv_data.loc[:, "Appraisal %"] = pd.to_numeric(
            fmv_data["Appraisal %"], errors="coerce"
        )
        fmv_data.loc[:, "FMV Appraisal"] = pd.to_numeric(
            fmv_data["FMV Appraisal"], errors="coerce"
        )
        if "FMV Battery Appraisal" in fmv_data.columns:
            fmv_data.loc[:, "FMV Battery Appraisal"] = pd.to_numeric(
                fmv_data["FMV Battery Appraisal"], errors="coerce"
            )
        else:
            fmv_data.loc[:, "FMV Battery Appraisal"] = (
                0.0  # Add if missing for consistency
            )
        fmv_data.loc[:, "State"] = fmv_data["State"].astype(str)
        fmv_data.loc[:, "Type"] = fmv_data["Type"].astype(str)

        # --- FMV Input (Appraisal # to use) ---
        fmv_input_col = "FMV Input"
        fmv_input_helper_dict = (
            fmv_data.groupby(valuation_date_col_fmv)[appraisal_num_col_fmv]
            .max()
            .astype(int)
            .to_dict()
        )

        if fmv_override:  # If override, implies using latest for that appraisal file
            latest_appraisal_num = fmv_data[appraisal_num_col_fmv].max()
            df_out.loc[:, fmv_input_col] = (
                latest_appraisal_num if pd.notna(latest_appraisal_num) else 1
            )
        else:
            tranche_a_date_col = "Tranche A Approved Date"
            if (
                tranche_a_date_col not in df_out.columns
                or not pd.api.types.is_datetime64_any_dtype(df_out[tranche_a_date_col])
            ):
                console.print(
                    f"[yellow]Warning: '{tranche_a_date_col}' not found or not datetime. Using default FMV Input 1.[/yellow]"
                )
                df_out.loc[:, fmv_input_col] = 1
            else:
                valid_appraisal_timestamps_fmv = sorted(
                    [ts for ts in fmv_input_helper_dict.keys() if pd.notna(ts)]
                )

                def find_appraisal_num_for_row(row_tranche_a_date):
                    """Find applicable appraisal number based on tranche A date."""
                    if pd.isna(row_tranche_a_date):
                        return 1
                    # Ensure row_tranche_a_date is naive for comparison if valid_appraisal_timestamps_fmv are naive
                    row_tranche_a_date_comp = (
                        row_tranche_a_date.tz_localize(None)
                        if row_tranche_a_date.tz is not None
                        else row_tranche_a_date
                    )

                    applicable_appraisal = 1
                    for appraisal_ts in reversed(
                        valid_appraisal_timestamps_fmv
                    ):  # Iterating from latest to earliest
                        appraisal_ts_comp = (
                            appraisal_ts.tz_localize(None)
                            if appraisal_ts.tz is not None
                            else appraisal_ts
                        )
                        if appraisal_ts_comp <= row_tranche_a_date_comp:
                            applicable_appraisal = fmv_input_helper_dict[appraisal_ts]
                            break
                    return applicable_appraisal

                df_out.loc[:, fmv_input_col] = df_out[tranche_a_date_col].apply(
                    find_appraisal_num_for_row
                )
        df_out.loc[:, fmv_input_col] = (
            pd.to_numeric(df_out[fmv_input_col], errors="coerce").fillna(1).astype(int)
        )

        # SH Compliant and ITC Percentage
        installed_date_col = "Installed Date"  # Assumed to be in df_out
        inservice_date_col = "InService Date"  # Assumed to be in df_out
        sh_compliant_col = "SH Compliant"
        itc_percentage_col = "ITC Percentage"

        date_2020_start_comp = pd.Timestamp(
            "2020-01-01"
        ).normalize()  # Naive for comparison
        date_2022_start_comp = pd.Timestamp(
            "2022-01-01"
        ).normalize()  # Naive for comparison

        df_out.loc[:, sh_compliant_col] = False  # Initialize

        # Ensure date columns are present and are datetime for comparison
        for date_col_check in [installed_date_col, inservice_date_col]:
            if (
                date_col_check not in df_out.columns
                or not pd.api.types.is_datetime64_any_dtype(df_out[date_col_check])
            ):
                console.print(
                    f"[yellow]Warning: Date column '{date_col_check}' missing or not datetime for SH compliance. Results may be inaccurate.[/yellow]"
                )
                # Add empty datetime series if missing to prevent errors
                if date_col_check not in df_out.columns:
                    df_out[date_col_check] = pd.Series(
                        pd.NaT, index=df_out.index, dtype="datetime64[ns]"
                    )
                elif not pd.api.types.is_datetime64_any_dtype(
                    df_out[date_col_check]
                ):  # Try to convert
                    df_out[date_col_check] = pd.to_datetime(
                        df_out[date_col_check], errors="coerce"
                    ).dt.normalize()

        installed_dates_comp = (
            df_out[installed_date_col].dt.tz_localize(None)
            if df_out[installed_date_col].dt.tz is not None
            else df_out[installed_date_col]
        )
        inservice_dates_comp = (
            df_out[inservice_date_col].dt.tz_localize(None)
            if df_out[inservice_date_col].dt.tz is not None
            else df_out[inservice_date_col]
        )

        early_install_mask = (installed_dates_comp.notna()) & (
            installed_dates_comp < date_2020_start_comp
        )
        late_service_mask = (inservice_dates_comp.notna()) & (
            inservice_dates_comp >= date_2022_start_comp
        )
        no_service_mask = inservice_dates_comp.isna()

        df_out.loc[:, sh_compliant_col] = (
            early_install_mask | late_service_mask | no_service_mask
        )
        df_out.loc[:, sh_compliant_col] = df_out[sh_compliant_col].fillna(
            False
        )  # Ensure boolean

        df_out.loc[:, itc_percentage_col] = np.where(
            df_out[sh_compliant_col], 0.30, 0.26
        )

        # Merge FMV values based on State, Contract Type, FMV Input (Appraisal #), and ITC Percentage
        contract_type_col = "Contract Type"  # Assumed to be in df_out
        if contract_type_col not in df_out.columns:
            console.print(
                f"[yellow]Warning: '{contract_type_col}' missing. FMV merge might be inaccurate. Defaulting to 'PPA'.[/yellow]"
            )
            df_out[contract_type_col] = "PPA"
        df_out.loc[:, contract_type_col] = df_out[contract_type_col].astype(str)

        fmv_merge_cols_from_static = [
            current_state_col,
            "Type",
            appraisal_num_col_fmv,
            "Appraisal %",
            "FMV Appraisal",
            "FMV Battery Appraisal",
        ]
        fmv_to_merge = (
            fmv_data[[c for c in fmv_merge_cols_from_static if c in fmv_data.columns]]
            .drop_duplicates(
                subset=[
                    current_state_col,
                    "Type",
                    appraisal_num_col_fmv,
                    "Appraisal %",
                ],
                keep="first",
            )
            .rename(
                columns={
                    appraisal_num_col_fmv: fmv_input_col,  # Match df_out's appraisal # column
                    "Type": contract_type_col,  # Match df_out's contract type column
                    "Appraisal %": itc_percentage_col,  # Match df_out's ITC % column
                    "FMV Appraisal": "FMV_Appraisal_Value_temp",
                    "FMV Battery Appraisal": "FMV_Battery_Appraisal_Value_temp",
                }
            )
        )

        df_out = pd.merge(
            df_out,
            fmv_to_merge,
            on=[
                current_state_col,
                contract_type_col,
                fmv_input_col,
                itc_percentage_col,
            ],
            how="left",
        )

        df_out.rename(
            columns={
                "FMV_Appraisal_Value_temp": "FMV Appraisal Value",
                "FMV_Battery_Appraisal_Value_temp": "FMV Battery Appraisal Value",
            },
            inplace=True,
        )

        for fmv_val_col in ["FMV Appraisal Value", "FMV Battery Appraisal Value"]:
            if fmv_val_col not in df_out.columns:
                df_out[fmv_val_col] = 0.0
            df_out.loc[:, fmv_val_col] = df_out[fmv_val_col].fillna(0.0)

        # Project Purchase Price
        ppp_col = "Project Purchase Price"
        sys_size_col = "System Size"
        bat_cap_col = "Battery Capacity (kwh)"  # Assumed from process_equipment_data

        if sys_size_col not in df_out.columns:
            console.print(
                f"[red]Error: '{sys_size_col}' missing. Cannot calculate '{ppp_col}'.[/red]"
            )
            df_out.loc[:, ppp_col] = 0.0
        else:
            df_out.loc[:, sys_size_col] = pd.to_numeric(
                df_out[sys_size_col], errors="coerce"
            ).fillna(0.0)
            df_out.loc[:, ppp_col] = (
                df_out["FMV Appraisal Value"] * df_out[sys_size_col] * 1000
            ).fillna(0.0)
            if (
                bat_cap_col in df_out.columns
                and "FMV Battery Appraisal Value" in df_out.columns
            ):
                df_out.loc[:, bat_cap_col] = pd.to_numeric(
                    df_out[bat_cap_col], errors="coerce"
                ).fillna(0.0)
                battery_ppp_component = (
                    df_out["FMV Battery Appraisal Value"] * df_out[bat_cap_col]
                ).fillna(0.0)
                df_out.loc[:, ppp_col] += battery_ppp_component

        # Final SH Qualified (5% Test) & Recalculate ITC
        sh_qualified_col = "SH Qualified"  # From legacy
        df_out.loc[:, sh_qualified_col] = False  # Initialize
        value_sh_total_col = "Value SH Total"  # Calculated earlier

        mask_ppp_gt_0 = (df_out[ppp_col] > 0) & (df_out[value_sh_total_col] > 0)
        if mask_ppp_gt_0.any():
            sh_ratio = (
                df_out.loc[mask_ppp_gt_0, value_sh_total_col]
                / df_out.loc[mask_ppp_gt_0, ppp_col]
            )
            df_out.loc[mask_ppp_gt_0 & (sh_ratio > 0.05), sh_qualified_col] = True

        df_out.loc[:, sh_compliant_col] = (
            df_out[sh_compliant_col] | df_out[sh_qualified_col]
        )  # Combine SH reasons
        df_out.loc[:, itc_percentage_col] = np.where(
            df_out[sh_compliant_col], 0.30, 0.26
        )

        projected_itc_col = "Projected ITC"
        df_out.loc[:, projected_itc_col] = (
            df_out[ppp_col] * df_out[itc_percentage_col]
        ).fillna(0.0)

        # Calculate Latest Appraisal PPP for reference
        latest_appraisal_fmv_data = _load_static_data(
            FMV_FILE_DEFAULT, static_files_dir, console=console
        )  # Always use default for "latest" context
        if not latest_appraisal_fmv_data.empty:
            # Simplified "latest appraisal" logic: find max appraisal number from the default FMV file
            max_appraisal_num = latest_appraisal_fmv_data[appraisal_num_col_fmv].max()
            if pd.notna(max_appraisal_num):
                df_temp_latest = (
                    df_out.copy()
                )  # Use a fresh copy of current df_out state
                df_temp_latest[fmv_input_col] = int(
                    max_appraisal_num
                )  # Force use of latest appraisal

                # Re-merge FMV values using this forced latest appraisal
                df_temp_latest = pd.merge(
                    df_temp_latest.drop(
                        columns=["FMV Appraisal Value", "FMV Battery Appraisal Value"],
                        errors="ignore",
                    ),  # Drop previous FMV merge results
                    fmv_to_merge,  # Use the same fmv_to_merge structure from above
                    on=[
                        current_state_col,
                        contract_type_col,
                        fmv_input_col,
                        itc_percentage_col,
                    ],  # itc_percentage might need re-eval
                    how="left",
                )
                df_temp_latest.rename(
                    columns={
                        "FMV_Appraisal_Value_temp": "FMV Appraisal Value",
                        "FMV_Battery_Appraisal_Value_temp": "FMV Battery Appraisal Value",
                    },
                    inplace=True,
                )

                for fmv_val_col_latest in [
                    "FMV Appraisal Value",
                    "FMV Battery Appraisal Value",
                ]:
                    if fmv_val_col_latest not in df_temp_latest.columns:
                        df_temp_latest[fmv_val_col_latest] = 0.0
                    df_temp_latest.loc[:, fmv_val_col_latest] = df_temp_latest[
                        fmv_val_col_latest
                    ].fillna(0.0)

                df_temp_latest.loc[:, ppp_col] = (
                    df_temp_latest["FMV Appraisal Value"]
                    * df_temp_latest[sys_size_col]
                    * 1000
                ).fillna(0.0)
                if (
                    bat_cap_col in df_temp_latest.columns
                    and "FMV Battery Appraisal Value" in df_temp_latest.columns
                ):
                    latest_battery_ppp = (
                        df_temp_latest["FMV Battery Appraisal Value"]
                        * df_temp_latest[bat_cap_col]
                    ).fillna(0.0)
                    df_temp_latest.loc[:, ppp_col] += latest_battery_ppp

                df_out["Latest Appraisal Project Purchase Price"] = df_temp_latest[
                    ppp_col
                ]
            else:
                df_out["Latest Appraisal Project Purchase Price"] = 0.0
                console.print(
                    "[yellow]Warning: Could not determine max appraisal number for 'Latest Appraisal PPP'.[/yellow]"
                )
        else:
            df_out["Latest Appraisal Project Purchase Price"] = 0.0
            console.print(
                "[yellow]Warning: Default FMV file for 'Latest Appraisal PPP' not loaded. Setting to 0.[/yellow]"
            )

        # Developer Profit
        dev_profit_col = "Developer Profit"
        dev_profit_x_size_col = "Developer Profit x Size"
        base_cost_col = "Base Cost"  # Expected from SQL
        base_cost_battery_col = "Base Cost Battery"  # Expected from SQL

        profit_req_cols = [
            ppp_col,
            sys_size_col,
            bat_cap_col,
            base_cost_col,
            base_cost_battery_col,
        ]
        all_profit_cols_present = all(c in df_out.columns for c in profit_req_cols)

        if all_profit_cols_present:
            for col_p in profit_req_cols:  # Ensure numeric
                df_out.loc[:, col_p] = pd.to_numeric(
                    df_out[col_p], errors="coerce"
                ).fillna(0.0)

            total_base_cost = (df_out[base_cost_col] * df_out[sys_size_col] * 1000) + (
                df_out[base_cost_battery_col] * df_out[bat_cap_col]
            )
            profit_dollars = df_out[ppp_col] - total_base_cost

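            # np.where() returns a NumPy array, which has no .fillna(); mask the
            # denominator instead so rows with a non-positive price yield 0.0.
            df_out.loc[:, dev_profit_col] = (
                profit_dollars / df_out[ppp_col].where(df_out[ppp_col] > 0)
            ).fillna(0.0)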
            df_out.loc[:, dev_profit_x_size_col] = (
                profit_dollars * df_out[sys_size_col]
            ).fillna(0.0)
        else:
            missing_p_cols = [c for c in profit_req_cols if c not in df_out.columns]
            console.print(
                f"[yellow]Warning: Missing columns for dev profit: {missing_p_cols}. Setting to 0.[/yellow]"
            )
            df_out.loc[:, dev_profit_col] = 0.0
            df_out.loc[:, dev_profit_x_size_col] = 0.0

        # Final cleanup of temporary state column name if it was changed
        if (
            original_state_col_name
            and current_state_col == state_col_final_name
            and original_state_col_name != state_col_final_name
        ):
            df_out.rename(
                columns={state_col_final_name: original_state_col_name}, inplace=True
            )

        console.print(
            "[green]:heavy_check_mark: FMV and ITC calculations complete.[/green]"
        )
        return df_out

    except Exception as main_error:
        console.print(
            f"[bold red]!!! Critical Error in calculate_fmv_itc_values: {main_error} !!![/bold red]"
        )
        console.print(traceback.format_exc())
        console.print(
            "[bold yellow]Returning the original DataFrame due to error.[/bold yellow]"
        )
        # Restore original state column name on the original dataframe if it was temp renamed on df_out
        if (
            original_state_col_name
            and current_state_col == state_col_final_name
            and original_state_col_name != state_col_final_name
        ):
            if (
                state_col_final_name in df_original_input.columns
            ):  # Check if rename happened on df_out then copied
                df_original_input.rename(
                    columns={state_col_final_name: original_state_col_name},
                    inplace=True,
                )
            elif (
                current_state_col in df_original_input.columns
                and current_state_col != original_state_col_name
            ):  # Original has the renamed col
                df_original_input.rename(
                    columns={current_state_col: original_state_col_name}, inplace=True
                )

        return df_original_input
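
The arithmetic in the listing above can be traced for a single system with plain numbers. This is only a sketch: every input value is made up, and the units ($/W for the solar FMV, kW for system size, $/kWh for the battery FMV) are an assumption inferred from the `* 1000` factor rather than something the source states.

```python
# All inputs are illustrative values, not real appraisal data.
fmv_per_watt = 4.00          # "FMV Appraisal Value" (assumed $/W)
system_size_kw = 8.0         # "System Size" (assumed kW)
fmv_battery_per_kwh = 900.0  # "FMV Battery Appraisal Value" (assumed $/kWh)
battery_capacity_kwh = 13.5  # "Battery Capacity (kwh)"
value_sh_total = 2500.0      # "Value SH Total"
base_cost_per_watt = 3.00    # "Base Cost"
base_cost_battery = 700.0    # "Base Cost Battery"
already_sh_compliant = False  # earlier safe harbor reasons, if any

# Project Purchase Price: solar component (kW -> W via * 1000) plus battery.
ppp = (
    fmv_per_watt * system_size_kw * 1000
    + fmv_battery_per_kwh * battery_capacity_kwh
)

# 5% test: qualified when the safe harbor value exceeds 5% of the price.
sh_qualified = ppp > 0 and value_sh_total > 0 and value_sh_total / ppp > 0.05

# Either safe harbor reason selects the 30% rate; otherwise 26% applies.
itc_rate = 0.30 if (already_sh_compliant or sh_qualified) else 0.26
projected_itc = ppp * itc_rate

# Developer profit as a share of the purchase price.
total_base_cost = (
    base_cost_per_watt * system_size_kw * 1000
    + base_cost_battery * battery_capacity_kwh
)
profit_dollars = ppp - total_base_cost
developer_profit = profit_dollars / ppp if ppp > 0 else 0.0
```

With these inputs the purchase price is 44,150, the safe harbor ratio is about 5.7%, and the system therefore takes the 30% rate.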

calculate_safe_harbor_equipment_value(df: pd.DataFrame, console: Optional[Console] = None) -> pd.Series

Calculate safe harbor equipment value based on PO and module quantity.

Parameters

df : pd.DataFrame
    DataFrame containing 'Module Quantity' and 'PO' columns.
console : Optional[Console], optional
    Rich console for output, by default None

Returns

pd.Series
    Series containing calculated safe harbor equipment values.
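
The per-PO formulas can be checked by hand with a scalar restatement. The sketch below is not the function itself: `sh_equipment_value` and `PO_FORMULAS` are hypothetical names, and only the coefficients are copied from the source listing.

```python
# (fixed amount, amount per module) for each recognized PO value.
PO_FORMULAS = {
    "PO 255": (199.40 + 369.99, 7.50),
    "PO 256": (0.0, 0.0),
    "PO-000014": (577.50, 6.82),
    "PO-000011": (156.0159553, 32.80783189),
    "PO-000012": (156.0159553, 32.80783189),
    "PO-000013": (156.0159553, 32.80783189),
}


def sh_equipment_value(po, module_qty):
    """Hypothetical scalar helper mirroring the vectorized selection."""
    # Normalize the same way the function does: trim and upper-case.
    key = "" if po is None else str(po).strip().upper()
    if key not in PO_FORMULAS:
        return 0.0  # empty, missing, or unrecognized PO
    fixed, per_module = PO_FORMULAS[key]
    return fixed + per_module * module_qty


value_255 = sh_equipment_value(" po 255 ", 20)   # 569.39 + 7.50 * 20
value_014 = sh_equipment_value("PO-000014", 20)  # 577.50 + 6.82 * 20
value_unknown = sh_equipment_value("PO-999", 20)
```

Unrecognized, empty, and missing PO values all fall through to 0.0, matching the `default=0.0` in the source.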

Source code in datatapes/tep/tep_processing_logic.py
def calculate_safe_harbor_equipment_value(
    df: pd.DataFrame, console: Optional[Console] = None
) -> pd.Series:
    """
    Calculate safe harbor equipment value based on PO and module quantity.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing 'Module Quantity' and 'PO' columns.
    console : Optional[Console], optional
        Rich console for output, by default None

    Returns
    -------
    pd.Series
        Series containing calculated safe harbor equipment values.
    """
    console = console or Console()
    # Ensure Module Quantity exists and is numeric
    if "Module Quantity" not in df.columns:
        console.print(
            "[yellow][Warning] 'Module Quantity' not found for SH Equipment calc. Returning zeros.[/yellow]"
        )
        return pd.Series(0.0, index=df.index)

    # Work on a copy to avoid modifying original df passed to function
    df_copy = df.copy()
    df_copy.loc[:, "Module Quantity"] = pd.to_numeric(
        df_copy["Module Quantity"], errors="coerce"
    ).fillna(0.0)

    po_col = "PO"
    if po_col not in df_copy.columns:
        console.print(
            f"[yellow][Warning] '{po_col}' column not found for SH Equipment calc. Returning zeros.[/yellow]"
        )
        return pd.Series(0.0, index=df_copy.index)

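    # Fill missing values before casting: astype(str) would turn NaN into "nan",
    # leaving nothing for fillna() to replace.
    df_copy.loc[:, po_col] = (
        df_copy[po_col].fillna("").astype(str).str.strip().str.upper()
    )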

    module_qty_series = df_copy["Module Quantity"]
    po_series = df_copy[po_col]

    conditions = [
        po_series.isin(["", "NAN"]),  # Handle NaN or empty strings explicitly
        po_series == "PO 255",
        po_series == "PO 256",
        po_series == "PO-000014",
        po_series.isin(["PO-000011", "PO-000012", "PO-000013"]),
    ]

    formulas = [
        0.0,
        199.40 + 369.99 + 7.50 * module_qty_series,
        0.0,
        577.50 + 6.82 * module_qty_series,
        156.0159553 + 32.80783189 * module_qty_series,
    ]
    return pd.Series(np.select(conditions, formulas, default=0.0), index=df.index)

clear_static_data_cache()

Clear the static data cache.

Should be called at the beginning of each run to ensure fresh data.
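
Why the cache needs clearing is easiest to see with a toy version of the pattern. The sketch below assumes the real loader consults a module-level dictionary keyed by file name; `_cache`, `load_static`, and `clear_cache` are hypothetical stand-ins, not the module's actual names.

```python
_cache = {}
load_count = 0


def load_static(name):
    """Return cached data for `name`, loading it only on the first request."""
    global load_count
    if name not in _cache:
        load_count += 1  # stands in for an expensive file read
        _cache[name] = {"file": name, "version": load_count}
    return _cache[name]


def clear_cache():
    """Drop every cached entry so the next load re-reads its source."""
    _cache.clear()  # mutates the dict in place


first = load_static("fmv.csv")
second = load_static("fmv.csv")  # served from the cache
clear_cache()
third = load_static("fmv.csv")   # reloaded after the cache was cleared
```

Without the clear step, a long-lived process would keep serving the first copy even after the underlying file changed.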

Source code in datatapes/tep/tep_processing_logic.py
def clear_static_data_cache():
    """
    Clear the static data cache.

    Should be called at the beginning of each run to ensure fresh data.
    """
    global _static_data_cache
    _static_data_cache.clear()

label_untranchable_systems(df: pd.DataFrame, console: Optional[Console] = None) -> pd.DataFrame

Label systems as difficult/untranchable based on various criteria.

Parameters

df : pd.DataFrame
    Input dataframe with system data.
console : Optional[Console], optional
    Rich console for output, by default None

Returns

pd.DataFrame
    DataFrame with 'Difficult Systems' column added.
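
The labels depend on the order of the criteria, because `np.select` takes the first condition that is true for each row. A minimal pure-Python sketch of that first-match behavior follows; it uses only a subset of the criteria (the battery rules are omitted), and `RULES` and `label` are hypothetical names.

```python
# Ordered (predicate, label) pairs, in the same relative order as the source.
RULES = [
    (lambda s: "builder" in s["partner"].lower(), "Builder"),
    (lambda s: s["state"] in {"PR", "GU", "MP"}, "Island"),
    (lambda s: s["fico"] == 0, "Zero FICO"),
    (lambda s: s["fico"] < 630, "FICO Below 630"),
    (lambda s: s["fico"] < 650, "FICO 630-649"),
    (lambda s: s["fico"] < 700, "FICO 650-699"),
    (lambda s: s["contract"] in {"PPA", "PPA-EZ"}, "High FICO Mainland PPA"),
]


def label(system):
    """Return the label of the first rule that matches, like np.select."""
    for predicate, name in RULES:
        if predicate(system):
            return name
    return "Other/Check"


island = label({"partner": "Acme", "state": "PR", "fico": 600, "contract": "PPA"})
mid_fico = label({"partner": "Acme", "state": "TX", "fico": 640, "contract": "PPA"})
high_fico = label({"partner": "Acme", "state": "TX", "fico": 720, "contract": "PPA"})
```

A Puerto Rico system with a FICO of 600 is labeled "Island" rather than "FICO Below 630", and the `< 650` rule only ever sees scores of 630 and above because lower scores were claimed by the rule before it.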

Source code in datatapes/tep/tep_processing_logic.py
def label_untranchable_systems(
    df: pd.DataFrame, console: Optional[Console] = None
) -> pd.DataFrame:
    """
    Label systems as difficult/untranchable based on various criteria.

    Parameters
    ----------
    df : pd.DataFrame
        Input dataframe with system data.
    console : Optional[Console], optional
        Rich console for output, by default None

    Returns
    -------
    pd.DataFrame
        DataFrame with 'Difficult Systems' column added.
    """
    console = console or Console()
    df_out = df.copy()
    output_col = "Difficult Systems"

    # Ensure required columns exist and have correct types for comparison
    required_cols_defaults = {
        "Installation Partner Account": "Unknown",
        "Battery Manufacturer": "TBD",
        "FICO High": 0,
        "State": "Unknown",
        "Contract Type": "Unknown",
    }
    for col, default_val in required_cols_defaults.items():
        if col not in df_out.columns:
            df_out[col] = default_val
            console.print(
                f"[yellow]Warning: Column '{col}' for labeling not found. Using default '{default_val}'.[/yellow]"
            )

    # Fill missing values before casting: astype(str) turns NaN into the string
    # "nan", after which fillna() has nothing left to replace.
    df_out.loc[:, "Installation Partner Account"] = (
        df_out["Installation Partner Account"].fillna("Unknown").astype(str)
    )
    df_out.loc[:, "Battery Manufacturer"] = (
        df_out["Battery Manufacturer"].fillna("TBD").astype(str)
    )
    df_out.loc[:, "FICO High"] = pd.to_numeric(
        df_out["FICO High"], errors="coerce"
    ).fillna(0)
    df_out.loc[:, "State"] = df_out["State"].fillna("Unknown").astype(str)
    df_out.loc[:, "Contract Type"] = (
        df_out["Contract Type"].fillna("Unknown").astype(str)
    )

    label_criteria = [
        df_out["Installation Partner Account"]
        .str.lower()
        .str.contains("builder", na=False),
        df_out["State"].isin(["PR", "GU", "MP"]),
        df_out["Battery Manufacturer"] == "LG Chem",
        df_out["Battery Manufacturer"] == "Generac",
        (df_out["State"] == "CA")
        & (df_out["Battery Manufacturer"] == "Enphase Energy Inc"),
        df_out["Battery Manufacturer"] == "TBD",
        df_out["FICO High"] == 0,
        df_out["FICO High"] < 630,
        df_out["FICO High"] < 650,
        df_out["FICO High"] < 700,
        df_out["Contract Type"].isin(["Lease", "Lease Storage"]),
        df_out["Contract Type"].isin(["PPA", "PPA-EZ"]),
    ]
    label_choices = [
        "Builder",
        "Island",
        "LG Chem",
        "Generac",
        "CA Enphase",
        "Unknown Battery Type",
        "Zero FICO",
        "FICO Below 630",
        "FICO 630-649",
        "FICO 650-699",
        "High FICO Mainland Lease",
        "High FICO Mainland PPA",
    ]
    df_out.loc[:, output_col] = np.select(
        label_criteria, label_choices, default="Other/Check"
    )
    return df_out

preprocess_systems_data(df: pd.DataFrame, console: Optional[Console] = None) -> pd.DataFrame

Preprocess the systems data: Convert dates, handle specific cases, calculate Days Since Purchase.

Ensures a 'State' column exists, falling back to 'Installation State' or 'Installation Address State' when it is missing.

Parameters

df : pd.DataFrame
    Input DataFrame containing system data.
console : Optional[Console], optional
    Rich console for output, by default None

Returns

pd.DataFrame
    Preprocessed DataFrame with standardized columns and calculated fields.
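
The 'Days Since Purchase' rule (coerce invalid dates, treat missing dates as 0 days, never go negative) can be sketched in isolation. `days_since_purchase` is a hypothetical helper, and the reference date is passed in explicitly here so the result is reproducible; the function itself uses the current date.

```python
import pandas as pd


def days_since_purchase(dates: pd.Series, today: pd.Timestamp) -> pd.Series:
    """Hypothetical helper: whole days from each date to `today`, floored at 0."""
    # Invalid or missing entries become NaT instead of raising.
    parsed = pd.to_datetime(dates, errors="coerce").dt.normalize()
    delta = today.normalize() - parsed  # NaT propagates through the subtraction
    # NaT -> 0 days; future dates (negative deltas) are clipped to 0.
    return delta.dt.days.fillna(0).astype(int).clip(lower=0)


raw = pd.Series(["2024-01-01", "not a date", None, "2024-03-01"])
result = days_since_purchase(raw, today=pd.Timestamp("2024-02-01"))
```

Only the first entry produces a positive count; the invalid, missing, and future dates all end up as 0.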

Source code in datatapes/tep/tep_processing_logic.py
def preprocess_systems_data(
    df: pd.DataFrame, console: Optional[Console] = None
) -> pd.DataFrame:
    """
    Preprocess the systems data: Convert dates, handle specific cases, calculate Days Since Purchase.

    Ensures a 'State' column exists, falling back to 'Installation State' or
    'Installation Address State' when it is missing.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing system data.
    console : Optional[Console], optional
        Rich console for output, by default None

    Returns
    -------
    pd.DataFrame
        Preprocessed DataFrame with standardized columns and calculated fields.
    """
    console = console or Console()
    df_out = df.copy()

    # Standardize State column
    # Keep an existing "State" column; otherwise fall back to "Installation State",
    # then "Installation Address State", then a default of "Unknown".
    if "State" not in df_out.columns:
        if "Installation State" in df_out.columns:
            df_out.rename(columns={"Installation State": "State"}, inplace=True)
        elif "Installation Address State" in df_out.columns:
            df_out.rename(columns={"Installation Address State": "State"}, inplace=True)
        else:
            console.print(
                "[yellow]Warning: No standard 'State' column found. FMV/ITC calcs might be affected.[/yellow]"
            )
            df_out["State"] = "Unknown"  # Add a default if missing

    # Date Conversion
    date_columns = [
        "Tranche A Approved Date",
        "Tranche B Approved Date",
        "Cancelled Tranche A Date",
        "NTP Stage Date",
        "Substantial Stage Date",
        "Final Stage Date",
        "PTO Received Date",
        "Completed Stage Date",
        "InService Date",
        "Cancelled Stage Date",
    ]

    console.print("[cyan]Preprocessing Systems: Converting date columns...[/cyan]")
    for col in date_columns:
        if col in df_out.columns:
            if not pd.api.types.is_datetime64_any_dtype(df_out[col]):
                original_non_null = df_out[col].notna().sum()
                df_out[col] = pd.to_datetime(df_out[col], errors="coerce")
                converted_non_null = df_out[col].notna().sum()
                if original_non_null > converted_non_null:
                    console.print(
                        f"[yellow]  Warning: Coerced {original_non_null - converted_non_null} invalid date entries in '{col}'.[/yellow]"
                    )
            # Always normalize if it's a datetime column
            if pd.api.types.is_datetime64_any_dtype(df_out[col]):
                df_out[col] = df_out[col].dt.normalize()

    # Special Handling for PR and Developer LLC (Tranche A Date to NaT)
    tranche_a_col = "Tranche A Approved Date"
    asset_portfolio_col = "Asset Portfolio - Customer"
    if tranche_a_col in df_out.columns and asset_portfolio_col in df_out.columns:
        # Ensure asset_portfolio_col is string for .isin()
        df_out[asset_portfolio_col] = df_out[asset_portfolio_col].astype(str)
        pr_dev_mask = df_out[asset_portfolio_col].isin(
            ["Sunnova Energy Puerto Rico LLC", "Sunnova TEP Developer LLC"]
        )
        df_out.loc[pr_dev_mask, tranche_a_col] = pd.NaT

    # Calculate Days Since Purchase
    days_since_col = "Days Since Purchase"
    if tranche_a_col in df_out.columns and pd.api.types.is_datetime64_any_dtype(
        df_out[tranche_a_col]
    ):
        tranche_a_dates = df_out[tranche_a_col]
        # Use pandas Timestamp for current date for robust comparison
        current_date = pd.Timestamp.now().normalize()  # Naive by default

        # If tranche_a_dates is timezone-aware, make current_date aware (UTC is common)
        # Or, make tranche_a_dates naive. For simplicity, let's make tranche_a_dates naive.
        if tranche_a_dates.dt.tz is not None:
            tranche_a_dates_comp = tranche_a_dates.dt.tz_localize(None)
        else:
            tranche_a_dates_comp = tranche_a_dates

        delta = current_date - tranche_a_dates_comp
        df_out[days_since_col] = delta.dt.days
        df_out[days_since_col] = (
            df_out[days_since_col].fillna(0).astype(int).clip(lower=0)
        )
    elif tranche_a_col not in df_out.columns:
        console.print(
            f"[yellow]Warning: '{tranche_a_col}' not found. Cannot calculate '{days_since_col}'. Defaulting to 0.[/yellow]"
        )
        df_out[days_since_col] = 0
    else:  # Column exists but is not datetime
        console.print(
            f"[yellow]Warning: '{tranche_a_col}' is not datetime type. Cannot calculate '{days_since_col}'. Defaulting to 0.[/yellow]"
        )
        df_out[days_since_col] = 0

    return df_out

process_equipment_data(df: pd.DataFrame, equipment_df: pd.DataFrame, static_files_dir: str, console: Optional[Console] = None) -> pd.DataFrame

Process equipment data including modules, inverters, batteries, and racking.

Parameters

df : pd.DataFrame
    Main dataframe to merge equipment data into.
equipment_df : pd.DataFrame
    Equipment data dataframe.
static_files_dir : str
    Path to directory containing static files.
console : Optional[Console], optional
    Rich console for output, by default None

Returns

pd.DataFrame
    DataFrame with equipment data processed and merged.
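
The core pattern for each equipment type is an aggregation per system followed by a left merge with defaults for systems that have no such equipment. Here is a minimal sketch for modules only, using made-up rows; the column names follow the source, everything else is illustrative.

```python
import pandas as pd

systems = pd.DataFrame({"System Name": ["S1", "S2"]})
equipment = pd.DataFrame(
    {
        "System Name": ["S1", "S1", "S1"],
        "Name": ["Equipment - Module", "Equipment - Module", "Equipment - Inverter"],
        "Quantity": [10, 12, 1],
        "Account Name": ["Acme Solar", "Acme Solar", "InvCo"],
        "Model Number": ["M-400", "M-400", "INV-7"],
    }
)

# Sum quantities per system and keep the first manufacturer and model seen.
modules = (
    equipment[equipment["Name"] == "Equipment - Module"]
    .groupby("System Name")
    .agg(
        **{
            "Module Quantity": ("Quantity", "sum"),
            "Module Manufacturer": ("Account Name", "first"),
            "Module Model": ("Model Number", "first"),
        }
    )
    .reset_index()
)

# Left merge keeps every system; unmatched systems get the defaults.
merged = systems.merge(modules, on="System Name", how="left")
merged["Module Quantity"] = merged["Module Quantity"].fillna(0.0)
merged["Module Manufacturer"] = merged["Module Manufacturer"].fillna("TBD")
merged["Module Model"] = merged["Module Model"].fillna("TBD")
```

System S1 ends up with its two module rows summed, while S2, which has no equipment rows at all, receives a quantity of 0.0 and "TBD" for manufacturer and model.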

Source code in datatapes/tep/tep_processing_logic.py
def process_equipment_data(
    df: pd.DataFrame,
    equipment_df: pd.DataFrame,
    static_files_dir: str,
    console: Optional[Console] = None,
) -> pd.DataFrame:
    """
    Process equipment data including modules, inverters, batteries, and racking.

    Parameters
    ----------
    df : pd.DataFrame
        Main dataframe to merge equipment data into.
    equipment_df : pd.DataFrame
        Equipment data dataframe.
    static_files_dir : str
        Path to directory containing static files.
    console : Optional[Console], optional
        Rich console for output, by default None

    Returns
    -------
    pd.DataFrame
        DataFrame with equipment data processed and merged.
    """
    console = console or Console()
    df_out = df.copy()
    key_col = "System Name"

    default_equip_cols = {
        "Module Quantity": 0.0,
        "Module Manufacturer": "TBD",
        "Module Model": "TBD",
        "Inverter Quantity": 0.0,
        "Inverter Manufacturer": "TBD",
        "Inverter Model": "TBD",
        "Racking Quantity": 0.0,
        "Racking Manufacturer": "TBD",
        "Racking Model": "TBD",
        "Battery Quantity": 0.0,
        "Battery Manufacturer": "TBD",
        "Battery Model": "TBD",
        "Battery Capacity (kwh)": 0.0,
        "Value SH Inverter": 0.0,
        "Value SH Batteries": 0.0,
        "PO": "",
    }

    if equipment_df.empty:
        console.print("[yellow]Equipment data is empty. Applying defaults.[/yellow]")
        for col_name, def_val in default_equip_cols.items():
            if col_name not in df_out.columns:
                df_out[col_name] = def_val
        return df_out

    required_equip_cols = [key_col, "Name", "Quantity", "Account Name", "Model Number"]
    missing_cols = [c for c in required_equip_cols if c not in equipment_df.columns]
    if missing_cols:
        console.print(
            f"[red]Error: Equipment data missing required columns: {missing_cols}. Applying defaults.[/red]"
        )
        for col_name, def_val in default_equip_cols.items():
            if col_name not in df_out.columns:
                df_out[col_name] = def_val
        return df_out

    equipment_df.loc[:, "Quantity"] = pd.to_numeric(
        equipment_df["Quantity"], errors="coerce"
    ).fillna(0)

    # TBD Battery Handling
    tbd_battery_lookup = _load_static_data(
        TBD_BATTERY_LOOKUP_FILE, static_files_dir, console=console
    )
    if not tbd_battery_lookup.empty and key_col in tbd_battery_lookup.columns:
        battery_systems_in_equip = equipment_df.loc[
            equipment_df["Name"] == "Equipment - Battery", key_col
        ].unique()
        tbd_systems_to_add = tbd_battery_lookup[
            ~tbd_battery_lookup[key_col].isin(battery_systems_in_equip)
        ].copy()

        if not tbd_systems_to_add.empty:
            tbd_systems_to_add.loc[:, "Name"] = "Equipment - Battery"
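            # Use index-aligned fallbacks: an empty fallback Series would be
            # assigned as NaN, so the intended default would never be applied.
            tbd_idx = tbd_systems_to_add.index
            tbd_systems_to_add.loc[:, "Account Name"] = tbd_systems_to_add.get(
                "Account Name", pd.Series("TBD", index=tbd_idx)
            ).fillna("TBD")
            tbd_systems_to_add.loc[:, "Model Number"] = tbd_systems_to_add.get(
                "Model Number", pd.Series("TBD", index=tbd_idx)
            ).fillna("TBD")
            tbd_systems_to_add.loc[:, "Quantity"] = tbd_systems_to_add.get(
                "Quantity", pd.Series(1.0, index=tbd_idx)
            ).fillna(1.0)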

            cols_to_concat = [
                key_col,
                "Name",
                "Quantity",
                "Account Name",
                "Model Number",
            ]
            equipment_df = pd.concat(
                [
                    equipment_df[
                        [c for c in cols_to_concat if c in equipment_df.columns]
                    ],
                    tbd_systems_to_add[
                        [c for c in cols_to_concat if c in tbd_systems_to_add.columns]
                    ],
                ],
                ignore_index=True,
            )

    def group_equipment_agg(eq_df, eq_type_name, k_col):
        """Aggregate equipment by type."""
        eq_filtered = eq_df[eq_df["Name"] == eq_type_name].copy()
        if eq_filtered.empty:
            return pd.DataFrame(
                columns=[k_col, "Quantity", "Account_Name", "Model_Number"]
            ).set_index(k_col)
        return eq_filtered.groupby(k_col).agg(
            Quantity=("Quantity", "sum"),
            Account_Name=("Account Name", "first"),
            Model_Number=("Model Number", "first"),
        )

    module_grouped = group_equipment_agg(equipment_df, "Equipment - Module", key_col)
    inverter_grouped = group_equipment_agg(
        equipment_df, "Equipment - Inverter", key_col
    )
    battery_grouped = group_equipment_agg(equipment_df, "Equipment - Battery", key_col)
    racking_grouped = group_equipment_agg(equipment_df, "Equipment - Racking", key_col)

    def merge_equipment_group_onto_main(main_df, eq_grouped_df, prefix_str, k_col):
        """Merge equipment group data onto main dataframe."""
        if eq_grouped_df.empty:
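            # Fall back to index-aligned defaults: assigning an empty Series
            # would leave a new column filled with NaN instead of the default.
            main_df[f"{prefix_str} Quantity"] = main_df.get(
                f"{prefix_str} Quantity", pd.Series(0.0, index=main_df.index)
            ).fillna(0.0)
            main_df[f"{prefix_str} Manufacturer"] = main_df.get(
                f"{prefix_str} Manufacturer", pd.Series("TBD", index=main_df.index)
            ).fillna("TBD")
            main_df[f"{prefix_str} Model"] = main_df.get(
                f"{prefix_str} Model", pd.Series("TBD", index=main_df.index)
            ).fillna("TBD")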
            return main_df

        eq_grouped_df = eq_grouped_df.rename(
            columns={
                "Quantity": f"{prefix_str} Quantity",
                "Account_Name": f"{prefix_str} Manufacturer",
                "Model_Number": f"{prefix_str} Model",
            }
        )
        merged = pd.merge(main_df, eq_grouped_df.reset_index(), on=k_col, how="left")

        merged[f"{prefix_str} Quantity"] = merged[f"{prefix_str} Quantity"].fillna(0.0)
        merged[f"{prefix_str} Manufacturer"] = merged[
            f"{prefix_str} Manufacturer"
        ].fillna("TBD")
        merged[f"{prefix_str} Model"] = merged[f"{prefix_str} Model"].fillna("TBD")
        return merged

    df_out = merge_equipment_group_onto_main(df_out, module_grouped, "Module", key_col)
    df_out = merge_equipment_group_onto_main(
        df_out, inverter_grouped, "Inverter", key_col
    )
    df_out = merge_equipment_group_onto_main(
        df_out, racking_grouped, "Racking", key_col
    )
    df_out = merge_equipment_group_onto_main(
        df_out, battery_grouped, "Battery", key_col
    )

    # Battery Specific Logic
    if "Contract Type" in df_out.columns:
        lease_storage_mask = df_out["Contract Type"].astype(str) == "Lease Storage"
        df_out.loc[~lease_storage_mask, "Battery Manufacturer"] = "TBD"
        df_out.loc[~lease_storage_mask, "Battery Model"] = "TBD"
        df_out.loc[~lease_storage_mask, "Battery Quantity"] = 0.0

    battery_lookup = _load_static_data(
        BATTERY_LOOKUP_FILE, static_files_dir, console=console
    )
    battery_capacity_col = "Battery Capacity (kwh)"
    if (
        not battery_lookup.empty
        and "Model Number" in battery_lookup.columns
        and battery_capacity_col in battery_lookup.columns
    ):
        battery_lookup.loc[:, battery_capacity_col] = pd.to_numeric(
            battery_lookup[battery_capacity_col], errors="coerce"
        )
        df_out = pd.merge(
            df_out,
            battery_lookup[["Model Number", battery_capacity_col]],
            how="left",
            left_on="Battery Model",
            right_on="Model Number",
            suffixes=("", "_lookup"),
        )
        if f"{battery_capacity_col}_lookup" in df_out.columns:
            df_out.loc[:, "Battery Quantity"] = pd.to_numeric(
                df_out["Battery Quantity"], errors="coerce"
            ).fillna(0.0)
            df_out.loc[:, battery_capacity_col] = (
                df_out[f"{battery_capacity_col}_lookup"] * df_out["Battery Quantity"]
            )
            df_out.drop(
                columns=["Model Number", f"{battery_capacity_col}_lookup"],
                inplace=True,
                errors="ignore",
            )
        else:
            if battery_capacity_col not in df_out.columns:
                df_out[battery_capacity_col] = 0.0  # Ensure column exists
        df_out.loc[:, battery_capacity_col] = df_out[battery_capacity_col].fillna(0.0)
    else:
        if battery_capacity_col not in df_out.columns:
            df_out[battery_capacity_col] = 0.0
        console.print(
            f"[yellow]Warning: Battery lookup for '{battery_capacity_col}' failed or file incomplete.[/yellow]"
        )

    # SafeHarbor Data Processing
    sh_inverter_df = _load_static_data(
        SH_INVERTER_FILE, static_files_dir, console=console
    )
    sh_batteries_df = _load_static_data(
        SH_BATTERIES_FILE, static_files_dir, console=console
    )

    value_sh_inverter_col = "Value SH Inverter"
    po_col = "PO"
    if (
        not sh_inverter_df.empty
        and key_col in sh_inverter_df.columns
        and "Cost/Unit" in sh_inverter_df.columns
    ):
        sh_inverter_df.loc[:, "Cost/Unit"] = pd.to_numeric(
            sh_inverter_df["Cost/Unit"], errors="coerce"
        ).fillna(0.0)
        agg_dict_sh_inv = {"_Value_SH_Inverter_temp": ("Cost/Unit", "sum")}
        if po_col in sh_inverter_df.columns:
            agg_dict_sh_inv[po_col] = (po_col, "first")  # PO is string

        sh_inverter_grouped = (
            sh_inverter_df.groupby(key_col).agg(**agg_dict_sh_inv).reset_index()
        )
        sh_inverter_grouped.rename(
            columns={"_Value_SH_Inverter_temp": value_sh_inverter_col}, inplace=True
        )
        df_out = pd.merge(df_out, sh_inverter_grouped, on=key_col, how="left")
    else:
        if value_sh_inverter_col not in df_out.columns:
            df_out[value_sh_inverter_col] = 0.0
        if po_col not in df_out.columns:
            df_out[po_col] = ""  # Default PO to empty string

    value_sh_batteries_col = "Value SH Batteries"
    sh_eligible_cost_col = "SH Eligible Cost"
    if (
        not sh_batteries_df.empty
        and key_col in sh_batteries_df.columns
        and sh_eligible_cost_col in sh_batteries_df.columns
    ):
        sh_batteries_df.loc[:, sh_eligible_cost_col] = pd.to_numeric(
            sh_batteries_df[sh_eligible_cost_col], errors="coerce"
        ).fillna(0.0)
        sh_batteries_grouped = (
            sh_batteries_df.groupby(key_col)[sh_eligible_cost_col].sum().reset_index()
        )
        sh_batteries_grouped.rename(
            columns={sh_eligible_cost_col: value_sh_batteries_col}, inplace=True
        )
        df_out = pd.merge(df_out, sh_batteries_grouped, on=key_col, how="left")
    else:
        if value_sh_batteries_col not in df_out.columns:
            df_out[value_sh_batteries_col] = 0.0

    # Fill NaNs for SH values and PO
    df_out.loc[:, value_sh_inverter_col] = df_out.get(
        value_sh_inverter_col, pd.Series(dtype=float)
    ).fillna(0.0)
    df_out.loc[:, value_sh_batteries_col] = df_out.get(
        value_sh_batteries_col, pd.Series(dtype=float)
    ).fillna(0.0)
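    # Fill before casting: astype(str) can turn NaN into the literal "nan",
    # which a later fillna("") would not catch.
    if po_col not in df_out.columns:
        df_out[po_col] = ""
    df_out.loc[:, po_col] = df_out[po_col].fillna("").astype(str)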

    return df_out
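
The inverter branch above sums `Cost/Unit` per system, keeps the first `PO`, and left-merges the result onto the main frame. The following is a minimal sketch of that pattern on made-up data, not the module's own code: the sample values are invented, and the output column is named directly in the aggregation instead of through a temporary name.

```python
import pandas as pd

# Hypothetical sample data; only the column names come from the code above.
main = pd.DataFrame({"System Name": ["A", "B", "C"]})
sh_inverters = pd.DataFrame(
    {
        "System Name": ["A", "A", "B"],
        "Cost/Unit": ["100.5", "50", "n/a"],  # raw strings, one unparseable
        "PO": ["PO-1", "PO-2", "PO-9"],
    }
)

# Coerce costs to numbers; unparseable entries become 0.0.
sh_inverters["Cost/Unit"] = pd.to_numeric(
    sh_inverters["Cost/Unit"], errors="coerce"
).fillna(0.0)

# Named aggregation: output column -> (input column, aggregation function).
# Dict unpacking allows output names that contain spaces.
agg_spec = {"Value SH Inverter": ("Cost/Unit", "sum"), "PO": ("PO", "first")}
grouped = sh_inverters.groupby("System Name").agg(**agg_spec).reset_index()

# A left merge keeps every main row; systems without inverter rows get NaN.
merged = main.merge(grouped, on="System Name", how="left")
merged["Value SH Inverter"] = merged["Value SH Inverter"].fillna(0.0)
merged["PO"] = merged["PO"].fillna("").astype(str)  # fill first, then cast

print(merged)
```

System `C` has no inverter rows, so it ends up with a value of `0.0` and an empty `PO`.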

process_grid_services(df: pd.DataFrame, grid_df: pd.DataFrame, key_col: str = 'System Name', console: Optional[Console] = None) -> pd.DataFrame

Process grid services data and merge with main dataframe.

Parameters

df : pd.DataFrame
    Main dataframe to merge grid services data into.
grid_df : pd.DataFrame
    Grid services data dataframe.
key_col : str, optional
    Primary key column for merging, by default "System Name"
console : Optional[Console], optional
    Rich console for output, by default None

Returns

pd.DataFrame
    DataFrame with grid services data merged.

Source code in datatapes/tep/tep_processing_logic.py
def process_grid_services(
    df: pd.DataFrame,
    grid_df: pd.DataFrame,
    key_col: str = "System Name",
    console: Optional[Console] = None,
) -> pd.DataFrame:
    """
    Process grid services data and merge with main dataframe.

    Parameters
    ----------
    df : pd.DataFrame
        Main dataframe to merge grid services data into.
    grid_df : pd.DataFrame
        Grid services data dataframe.
    key_col : str, optional
        Primary key column for merging, by default "System Name"
    console : Optional[Console], optional
        Rich console for output, by default None

    Returns
    -------
    pd.DataFrame
        DataFrame with grid services data merged.
    """
    console = console or Console()
    df_out = df.copy()
    program_name_col = "Program Name"  # Expected output column

    if grid_df.empty:
        console.print(
            "[yellow]Grid services data is empty. Applying defaults.[/yellow]"
        )
        if program_name_col not in df_out.columns:
            df_out[program_name_col] = "None"
        return df_out

    if key_col not in grid_df.columns:
        console.print(
            f"[red]Error: Key column '{key_col}' not found in grid services data. Applying defaults.[/red]"
        )
        if program_name_col not in df_out.columns:
            df_out[program_name_col] = "None"
        return df_out

    # Select relevant columns and deduplicate
    grid_cols_to_keep = [key_col]
    if program_name_col in grid_df.columns:  # Check if 'Program Name' exists in source
        grid_cols_to_keep.append(program_name_col)

    if len(grid_cols_to_keep) > 1:  # If 'Program Name' or other data columns are found
        grid_df_agg = grid_df[grid_cols_to_keep].drop_duplicates(
            subset=[key_col], keep="first"
        )
        df_out = pd.merge(df_out, grid_df_agg, how="left", on=key_col)
        if program_name_col in df_out.columns:
            df_out.loc[:, program_name_col] = df_out[program_name_col].fillna("None")
        else:  # Should not happen if merge was successful and col was in grid_df_agg
            df_out[program_name_col] = "None"
            console.print(
                f"[yellow]Warning: '{program_name_col}' was not in merged result. Defaulting to 'None'.[/yellow]"
            )
    else:
        console.print(
            f"[yellow]Warning: No column named '{program_name_col}' (or other data) found in grid_services data. Defaulting grid program to 'None'.[/yellow]"
        )
        if program_name_col not in df_out.columns:
            df_out[program_name_col] = "None"

    return df_out
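
To illustrate the merge step of `process_grid_services`, here is a small sketch on invented data. The helper `attach_program_name` is a hypothetical stand-in that reproduces only the deduplicate, left-merge, and fill steps; it omits the empty-input and missing-column guards of the real function.

```python
import pandas as pd


def attach_program_name(
    df: pd.DataFrame, grid_df: pd.DataFrame, key_col: str = "System Name"
) -> pd.DataFrame:
    """Hypothetical stand-in for the merge step shown above."""
    # Keep one row per system so the left merge cannot multiply main rows.
    first_rows = grid_df[[key_col, "Program Name"]].drop_duplicates(
        subset=[key_col], keep="first"
    )
    out = df.merge(first_rows, how="left", on=key_col)
    # The default is the string "None", not Python's None.
    out["Program Name"] = out["Program Name"].fillna("None")
    return out


# Invented sample data: S1 appears twice, S2 is absent, S3 has no program.
systems = pd.DataFrame({"System Name": ["S1", "S2", "S3"]})
grid = pd.DataFrame(
    {
        "System Name": ["S1", "S1", "S3"],
        "Program Name": ["VPP East", "VPP West", None],
    }
)

result = attach_program_name(systems, grid)
print(result)
```

Because duplicates are dropped with `keep="first"`, the program that wins for a system depends on the row order of the grid services data.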

process_installation_data(df: pd.DataFrame, installs_df: pd.DataFrame, key_col: str = 'System Name', console: Optional[Console] = None) -> pd.DataFrame

Process installation data and merge with main dataframe.

Parameters

df : pd.DataFrame
    Main dataframe to merge installation data into.
installs_df : pd.DataFrame
    Installation data dataframe.
key_col : str, optional
    Primary key column for merging, by default "System Name"
console : Optional[Console], optional
    Rich console for output, by default None

Returns

pd.DataFrame
    DataFrame with installation data merged and processed.

Source code in datatapes/tep/tep_processing_logic.py
def process_installation_data(
    df: pd.DataFrame,
    installs_df: pd.DataFrame,
    key_col: str = "System Name",
    console: Optional[Console] = None,
) -> pd.DataFrame:
    """
    Process installation data and merge with main dataframe.

    Parameters
    ----------
    df : pd.DataFrame
        Main dataframe to merge installation data into.
    installs_df : pd.DataFrame
        Installation data dataframe.
    key_col : str, optional
        Primary key column for merging, by default "System Name"
    console : Optional[Console], optional
        Rich console for output, by default None

    Returns
    -------
    pd.DataFrame
        DataFrame with installation data merged and processed.
    """
    console = console or Console()
    df_out = df.copy()

    # Default columns to ensure they exist if installs_df is empty or problematic
    default_install_cols = {
        "Installed Date": pd.NaT,
        "Partial Install": False,
        "SH Dealer": False,
        "SH Dealer Date": pd.NaT,  # Also ensure this exists if logic depends on it
    }

    if installs_df.empty:
        console.print("[yellow]Installation data is empty. Applying defaults.[/yellow]")
        for col_name, def_val in default_install_cols.items():
            if col_name not in df_out.columns:
                df_out[col_name] = def_val
        return df_out

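    # Work on a copy so the columns added below do not mutate the caller's frame
    installs_df = installs_df.copy()

    # Rename key column if necessary from "Sunnova System ID"
    if "Sunnova System ID" in installs_df.columns and key_col != "Sunnova System ID":
        installs_df = installs_df.rename(columns={"Sunnova System ID": key_col})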

    if key_col not in installs_df.columns:
        console.print(
            f"[red]Error: Key column '{key_col}' not found in installation data. Applying defaults.[/red]"
        )
        for col_name, def_val in default_install_cols.items():
            if col_name not in df_out.columns:
                df_out[col_name] = def_val
        return df_out

    # Date Handling for 'Installed Date'
    date_col_1 = "Date of First Submission"
    date_col_2 = "Date of Approval"
    temp_installed_date_col = "_temp_Installed_Date"

    if date_col_1 in installs_df.columns:
        installs_df[date_col_1] = pd.to_datetime(
            installs_df[date_col_1], errors="coerce"
        )
    if date_col_2 in installs_df.columns:
        installs_df[date_col_2] = pd.to_datetime(
            installs_df[date_col_2], errors="coerce"
        )

    if date_col_1 in installs_df.columns and date_col_2 in installs_df.columns:
        installs_df[temp_installed_date_col] = installs_df[date_col_1].combine_first(
            installs_df[date_col_2]
        )
    elif date_col_1 in installs_df.columns:
        installs_df[temp_installed_date_col] = installs_df[date_col_1]
    elif date_col_2 in installs_df.columns:
        installs_df[temp_installed_date_col] = installs_df[date_col_2]
    else:
        installs_df[temp_installed_date_col] = pd.NaT
        console.print(
            f"[yellow]Warning: Missing '{date_col_1}' and '{date_col_2}' in installs. Cannot determine 'Installed Date'.[/yellow]"
        )

    # Handle 'Partial Install'
    partial_install_col = "Partial Install"
    if partial_install_col not in installs_df.columns:
        installs_df[partial_install_col] = False  # Default if missing
        console.print(
            f"[yellow]'{partial_install_col}' not found in installs data. Defaulting to False.[/yellow]"
        )
    else:
        installs_df[partial_install_col] = (
            installs_df[partial_install_col].fillna(False).astype(bool)
        )

    # Merge relevant columns from installs_df
    merge_cols_installs = [key_col, temp_installed_date_col, partial_install_col]

    installs_to_merge = installs_df[
        [col for col in merge_cols_installs if col in installs_df.columns]
    ].drop_duplicates(subset=[key_col], keep="first")

    df_out = pd.merge(df_out, installs_to_merge, how="left", on=key_col)
    df_out.rename(columns={temp_installed_date_col: "Installed Date"}, inplace=True)

    # Normalize 'Installed Date' and ensure it exists
    if "Installed Date" in df_out.columns and pd.api.types.is_datetime64_any_dtype(
        df_out["Installed Date"]
    ):
        df_out["Installed Date"] = df_out["Installed Date"].dt.normalize()
    elif "Installed Date" not in df_out.columns:
        df_out["Installed Date"] = pd.NaT

    # Ensure 'Partial Install' exists and is boolean
    if partial_install_col not in df_out.columns:
        df_out[partial_install_col] = False
    df_out[partial_install_col] = df_out[partial_install_col].fillna(False).astype(bool)

    # Process SH Dealer logic
    sh_dealer_col = "SH Dealer"
    sh_dealer_date_col_main = "SH Dealer Date"
    installed_date_col_main = "Installed Date"

    # Ensure 'SH Dealer' column exists, default to False
    if sh_dealer_col not in df_out.columns:
        df_out[sh_dealer_col] = False
    else:  # If it exists from a previous merge (e.g. sh_dealer.csv), ensure it's boolean
        df_out[sh_dealer_col] = df_out[sh_dealer_col].fillna(False).astype(bool)

    if (
        sh_dealer_date_col_main in df_out.columns
        and installed_date_col_main in df_out.columns
    ):
        # Ensure dates are datetime and normalized
        if not pd.api.types.is_datetime64_any_dtype(df_out[sh_dealer_date_col_main]):
            df_out[sh_dealer_date_col_main] = pd.to_datetime(
                df_out[sh_dealer_date_col_main], errors="coerce"
            )
        if pd.api.types.is_datetime64_any_dtype(
            df_out[sh_dealer_date_col_main]
        ):  # Check again after conversion
            df_out[sh_dealer_date_col_main] = df_out[
                sh_dealer_date_col_main
            ].dt.normalize()

        # Create comparison series (handle NaT safely)
        sh_dates_valid = df_out[sh_dealer_date_col_main].notna()
        installed_dates_valid = df_out[installed_date_col_main].notna()

        cond1 = (
            sh_dates_valid & ~installed_dates_valid
        )  # SH date exists, Installed Date is NaT

        # For cond2, compare only where both dates are valid
        sh_dates_comp = (
            df_out[sh_dealer_date_col_main].dt.tz_localize(None)
            if df_out[sh_dealer_date_col_main].dt.tz is not None
            else df_out[sh_dealer_date_col_main]
        )
        installed_dates_comp = (
            df_out[installed_date_col_main].dt.tz_localize(None)
            if df_out[installed_date_col_main].dt.tz is not None
            else df_out[installed_date_col_main]
        )

        cond2 = (
            sh_dates_valid
            & installed_dates_valid
            & (sh_dates_comp < installed_dates_comp)
        )

        df_out.loc[cond1 | cond2, sh_dealer_col] = True
        # Ensure all other non-True are False
        df_out[sh_dealer_col] = df_out[sh_dealer_col].fillna(False).astype(bool)
    elif sh_dealer_date_col_main not in df_out.columns:
        console.print(
            f"[yellow]Warning: '{sh_dealer_date_col_main}' not found. Cannot accurately set '{sh_dealer_col}'. Defaulting to False.[/yellow]"
        )
        df_out[sh_dealer_col] = False

    # Ensure 'SH Dealer Date' column exists if it wasn't there from a prior merge
    if sh_dealer_date_col_main not in df_out.columns:
        df_out[sh_dealer_date_col_main] = pd.NaT

    return df_out
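
Two rules in `process_installation_data` are easier to follow on a small example: `Installed Date` prefers `Date of First Submission` and falls back to `Date of Approval`, and `SH Dealer` becomes true when an `SH Dealer Date` exists and either no install date is known or the SH date is strictly earlier. The sketch below applies both rules to invented data. It assumes timezone-naive dates and skips the merge and default handling of the real function.

```python
import pandas as pd

# Invented sample data; only the column names come from the code above.
installs = pd.DataFrame(
    {
        "System Name": ["S1", "S2", "S3"],
        "Date of First Submission": ["2024-03-01 14:30:00", None, "not a date"],
        "Date of Approval": ["2024-03-05 09:00:00", "2024-04-10 16:45:00", None],
    }
)

# Unparseable or missing values become NaT.
for col in ("Date of First Submission", "Date of Approval"):
    installs[col] = pd.to_datetime(installs[col], errors="coerce")

# Prefer the first submission date, fall back to the approval date,
# then drop the time of day.
installed = (
    installs["Date of First Submission"]
    .combine_first(installs["Date of Approval"])
    .dt.normalize()
)

# Hypothetical SH Dealer dates, aligned row by row with the systems above.
sh_dealer_date = pd.to_datetime(
    pd.Series(["2024-02-15", "2024-05-01", "2024-01-20"])
)

# Rule 1: an SH date exists but no install date is known.
no_install_yet = sh_dealer_date.notna() & installed.isna()
# Rule 2: both dates exist and the SH date is strictly earlier.
sh_before_install = (
    sh_dealer_date.notna() & installed.notna() & (sh_dealer_date < installed)
)
sh_dealer = no_install_yet | sh_before_install

print(pd.DataFrame({"Installed Date": installed, "SH Dealer": sh_dealer}))
```

Here `S1` is flagged because its SH date precedes the install date, `S2` is not flagged because its SH date is later, and `S3` is flagged because it has an SH date but no usable install date.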