Treasury Analytics Core

FinanceDataEngine

core.FinanceDataEngine(
    self,
    settings=None,
    console=None,
    logger=None,
    credential_path=None,
    use_keyring=False,
    keyring_service='nova_fde',
    keyring_username=None,
    interactive_auth=False,
    project_root=None,
)

Main orchestration engine for finance data processing.

Methods

Name                 Description
analyze_performance  Generate performance metrics for processing runs.
check_cache_status   Check if queries exist in the cache and their expiration status.
cleanup              Clean up resources used by the engine.
get_cached_queries   Retrieve query results from cache or execute queries if needed.
get_query_sql        Retrieve and optionally export the raw SQL behind query files.
process_data         Run queries, apply a user-supplied processing function, and return either the processed data or a results dictionary.
save_query_results   Save query results to CSV files without additional processing.

analyze_performance

core.FinanceDataEngine.analyze_performance()

Generate performance metrics for processing runs.

Collects query statistics from the query timer and compiles them into a performance metrics dictionary. Also generates a performance report through the analyzer component.

Returns

Name Type Description
Dict Dictionary containing performance metrics with the following keys:
  - run_count: Number of processing runs (currently always 1).
  - success_rate: Percentage of successful queries (currently 100%).
  - average_duration: Average duration of all queries in seconds.
  - performance_trend: Dict with keys for duration_trend, success_rate, and query_count, each with values indicating whether the metric is 'improving', 'stable', or 'declining'.

Raises

Name Type Description
Exception Any exception that occurs during performance analysis will be logged and re-raised.

Notes

The current implementation is basic and does not track historical performance, so the reported trend values are not derived from comparisons with earlier runs. A future enhancement could compare each run against previous runs to determine actual trends.
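
As a minimal sketch of how the returned dictionary might be consumed, the helper below builds a one-line summary from the keys documented under Returns. The helper name summarize_metrics and the sample values are hypothetical, and the sketch assumes success_rate is reported on a 0-100 scale.

```python
from typing import Any, Dict


def summarize_metrics(metrics: Dict[str, Any]) -> str:
    """Build a one-line summary from a metrics dictionary.

    Only the keys documented under Returns are read here.
    """
    trend = metrics.get("performance_trend", {})
    return (
        f"runs={metrics['run_count']} "
        f"success={metrics['success_rate']}% "
        f"avg={metrics['average_duration']:.2f}s "
        f"duration_trend={trend.get('duration_trend', 'unknown')}"
    )


# Hypothetical sample shaped like the documented return value
# (assumption: success_rate is expressed on a 0-100 scale).
sample_metrics = {
    "run_count": 1,
    "success_rate": 100,
    "average_duration": 2.5,
    "performance_trend": {
        "duration_trend": "stable",
        "success_rate": "stable",
        "query_count": "stable",
    },
}

print(summarize_metrics(sample_metrics))
```

With a real engine, the dictionary would come from engine.analyze_performance() rather than from the sample above.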

check_cache_status

core.FinanceDataEngine.check_cache_status(query_names, check_expiry=True)

Check if queries exist in the cache and their expiration status.

This method provides information about cache status without actually loading the cached data, which can be useful for making decisions about data processing workflows.

Parameters

Name Type Description Default
query_names Union[str, List[str]] Single query name or list of query names to check. required
check_expiry bool Whether to check expiration status, by default True. True

Returns

Name Type Description
Dict[str, Dict[str, Union[bool, datetime, None]]] Dictionary mapping query names to their cache status information:

    {
        'query_name': {
            'exists': bool,            # Whether the cache exists
            'created_date': datetime,  # When the cache was created (or None)
            'expired': bool,           # Whether the cache is expired (or None if not checking)
            'expiry_date': datetime,   # When the cache will expire (or None)
            'file_size_mb': float      # Size of the cache file in MB
        }
    }

Notes

This method only checks cache existence and metadata; it doesn’t load or validate the cached data content.

Examples

>>> engine = FinanceDataEngine(use_keyring=True)
>>> cache_status = engine.check_cache_status(["payments", "systems"])
>>> for query, status in cache_status.items():
...     print(f"{query}: {'Available' if status['exists'] else 'Not in cache'}")
...     if status['exists']:
...         print(f"  Created: {status['created_date']}")
...         print(f"  Expired: {status['expired']}")
...         print(f"  Size: {status['file_size_mb']:.2f} MB")
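
One way the status information could drive a workflow decision is to pick out the queries that are missing or expired before deciding what to refresh. The sketch below is an illustration only: the helper queries_needing_refresh and the sample dictionary are hypothetical, and the values reported for a missing cache entry are assumed rather than documented.

```python
from typing import Any, Dict, List


def queries_needing_refresh(cache_status: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return names of queries that are missing from the cache or expired."""
    stale = []
    for name, status in cache_status.items():
        # 'expired' may be None when check_expiry=False; that is treated as not expired.
        if not status["exists"] or status.get("expired") is True:
            stale.append(name)
    return stale


# Hypothetical status dictionary shaped like the documented return value.
sample_status = {
    "payments": {"exists": True, "created_date": None, "expired": False,
                 "expiry_date": None, "file_size_mb": 1.2},
    "systems": {"exists": True, "created_date": None, "expired": True,
                "expiry_date": None, "file_size_mb": 0.4},
    "ledgers": {"exists": False, "created_date": None, "expired": None,
                "expiry_date": None, "file_size_mb": 0.0},
}

print(queries_needing_refresh(sample_status))
```

With a real engine, the input would be the result of engine.check_cache_status([...]).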

cleanup

core.FinanceDataEngine.cleanup()

Clean up resources used by the engine.

This method performs necessary cleanup operations:

1. Clears expired cache entries via the cache manager.
2. Closes database connections via the database component.

It should be called when the engine is no longer needed to ensure proper resource management and prevent resource leaks.

Returns

Name Type Description
None

Raises

Name Type Description
Exception Any exception that occurs during cleanup will be logged and re-raised. Common exceptions might include file system errors during cache clearance or database errors when closing connections.

Notes

It’s recommended to use this method within a try-finally block or a context manager to ensure resources are always cleaned up, even if an exception occurs during processing.

Examples

>>> engine = FinanceDataEngine(use_keyring=True)
>>> try:
...     result = engine.process_data(...)
... finally:
...     engine.cleanup()
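
The context-manager variant mentioned in the Notes can be sketched with contextlib. This page does not say whether the engine itself supports the with statement, so the wrapper below (the hypothetical name managed) only relies on the documented cleanup() method. StubEngine is a stand-in that lets the sketch run without the library.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def managed(engine: Any) -> Iterator[Any]:
    """Yield the engine and call its cleanup() on exit, even after an error."""
    try:
        yield engine
    finally:
        engine.cleanup()


class StubEngine:
    """Hypothetical stand-in exposing only cleanup(), so the sketch runs alone."""

    def __init__(self) -> None:
        self.cleaned_up = False

    def cleanup(self) -> None:
        self.cleaned_up = True


stub = StubEngine()
try:
    with managed(stub) as engine:
        raise ValueError("simulated processing failure")
except ValueError:
    pass

print(stub.cleaned_up)
```

The same wrapper could be applied to a real instance, for example managed(FinanceDataEngine(use_keyring=True)).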

get_cached_queries

core.FinanceDataEngine.get_cached_queries(
    queries,
    force_refresh=False,
    query_params=None,
    cache_expiry_days=None,
)

Retrieve query results from cache or execute queries if needed.

This method provides direct access to query results without additional processing. It checks the cache for each query and either returns the cached result or executes the query and caches the new result.

Parameters

Name Type Description Default
queries Dict[str, str] Dictionary mapping query names to SQL file names. required
force_refresh bool Whether to force refresh of cached data, by default False. False
query_params Optional[Dict] Parameters to pass to SQL queries, by default None. None
cache_expiry_days Optional[int] Number of days before cache expires, by default None. None

Returns

Name Type Description
Dict[str, pd.DataFrame] Dictionary mapping query names to their respective DataFrames.

Raises

Name Type Description
ConnectionError If a valid database connection cannot be established.
RuntimeError If any query execution fails.

Notes

This method is useful when you need to access cached query results without going through the full data processing pipeline.

Examples

>>> engine = FinanceDataEngine(use_keyring=True)
>>> queries = {
...     "systems": "systems.sql",
...     "payments": "payments.sql"
... }
>>> data_frames = engine.get_cached_queries(queries)
>>> systems_df = data_frames["systems"]
>>> payments_df = data_frames["payments"]

get_query_sql

core.FinanceDataEngine.get_query_sql(
    query_names,
    export_path=None,
    query_params=None,
    render_parameters=True,
)

Retrieve and optionally export the raw SQL behind query files.

Parameters

Name Type Description Default
query_names Union[str, List[str]] Single query name or list of query names to retrieve SQL for. The names should match the SQL file names without the .sql extension. required
export_path Optional[Union[str, Path]] Path to export SQL files to, by default None (no export). None
query_params Optional[Dict] Parameters to render in the SQL queries, by default None. None
render_parameters bool Whether to render parameters in the SQL, by default True. True

Returns

Name Type Description
Dict[str, str] Dictionary mapping query names to their raw SQL content.

Raises

Name Type Description
FileNotFoundError If any of the SQL files cannot be found.

Notes

If render_parameters is True and query_params is provided, the returned SQL has its placeholders replaced with the supplied parameter values. Otherwise, the raw SQL is returned with its parameter placeholders intact.

Examples

>>> engine = FinanceDataEngine(use_keyring=True)
>>> sql_dict = engine.get_query_sql("payments", render_parameters=True,
...                                 query_params={"target_date": "2023-01-01"})
>>> print(sql_dict["payments"])
SELECT * FROM payments WHERE payment_date >= '2023-01-01'

process_data

core.FinanceDataEngine.process_data(
    queries,
    process_func,
    output_name,
    force_refresh=False,
    override_folder=None,
    analyze=True,
    query_params=None,
    cache_expiry_days=None,
    save_raw_results=False,
    return_results_dict=False,
)

Run the given queries, apply a user-supplied processing function to the results, and return either the processed data or a results dictionary with metadata.

Parameters

Name Type Description Default
queries Dict[str, str] Dictionary mapping query names to SQL file names. required
process_func Callable Function that processes the data. It is called with the dictionary of DataFrames and the processor. required
output_name str Base name for output files. required
force_refresh bool Whether to force refresh of cached data, by default False. False
override_folder Optional[str] Optional subfolder for output, by default None. None
analyze bool Whether to run data analysis, by default True. True
query_params Optional[Dict] Parameters to pass to SQL queries, by default None. None
cache_expiry_days Optional[int] Number of days before cache expires, by default None. None
save_raw_results bool Whether to save the raw query results before processing, by default False. False
return_results_dict bool Whether to return a dictionary that includes the result_df, by default False. False

Returns

Name Type Description
Union[Dict, pd.DataFrame] The processed data, or a results dictionary that also carries metadata when return_results_dict is True.

Notes

If return_results_dict is True, a dictionary will be returned with the following keys:
  - status: 'success' or 'error'
  - result_df: The processed data (DataFrame or dict of DataFrames)
  - duration: Processing time in seconds
  - rows_processed: Number of rows processed
  - query_stats: Statistics from query execution

If return_results_dict is False, the processed data will be returned directly.
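
Because the return shape depends on return_results_dict, calling code may want a single way to get at the processed data. The following is a minimal sketch, not part of the library: unwrap_result is a hypothetical helper that recognises the results dictionary by its documented status and result_df keys. It would misfire if a processing function returned a dict of DataFrames that happened to use those two names.

```python
from typing import Any


def unwrap_result(result: Any) -> Any:
    """Return the processed data from either documented return shape."""
    # With return_results_dict=True, the documented dictionary carries
    # both 'status' and 'result_df'.
    if isinstance(result, dict) and "status" in result and "result_df" in result:
        if result["status"] != "success":
            raise RuntimeError("processing reported status 'error'")
        return result["result_df"]
    # Otherwise the processed data was returned directly.
    return result


# Hypothetical results dictionary; a list stands in for a DataFrame.
sample = {
    "status": "success",
    "result_df": ["row"],
    "duration": 0.1,
    "rows_processed": 1,
    "query_stats": {},
}

print(unwrap_result(sample))
```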

save_query_results

core.FinanceDataEngine.save_query_results(
    data_frames,
    base_name,
    override_folder=None,
    include_date=True,
)

Save query results to CSV files without additional processing.

Parameters

Name Type Description Default
data_frames Dict[str, pd.DataFrame] Dictionary of DataFrames to save, typically from get_cached_queries(). required
base_name str Base name for the output files. required
override_folder Optional[str] Subfolder within the output directory to save files to, by default None. None
include_date bool Whether to include date in filenames, by default True. True

Returns

Name Type Description
Dict[str, str] Dictionary mapping query names to their saved file paths.

Raises

Name Type Description
TypeError If any item in data_frames is not a DataFrame.

Notes

This method creates files with names in the format: {base_name}_{query_name}[_{date}].csv
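
To make the naming pattern concrete, the sketch below composes a file name in that format. It is illustrative only: expected_filename is a hypothetical helper, and the ISO date stamp is an assumption, since the date format the engine actually uses is not stated here.

```python
from datetime import date
from typing import Optional


def expected_filename(base_name: str, query_name: str, on: Optional[date] = None) -> str:
    """Compose a name following {base_name}_{query_name}[_{date}].csv."""
    # Assumption: ISO date stamp; the engine's actual date format is not documented here.
    suffix = f"_{on.isoformat()}" if on is not None else ""
    return f"{base_name}_{query_name}{suffix}.csv"


print(expected_filename("raw_data", "payments"))
print(expected_filename("raw_data", "payments", date(2023, 1, 1)))
```

The first call corresponds to include_date=False and the second to include_date=True, under the stated date-format assumption.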

Examples

>>> engine = FinanceDataEngine(use_keyring=True)
>>> queries = {
...     "systems": "systems.sql",
...     "payments": "payments.sql"
... }
>>> data_frames = engine.get_cached_queries(queries)
>>> saved_files = engine.save_query_results(data_frames, "raw_data")
>>> for query_name, file_path in saved_files.items():
...     print(f"{query_name} saved to {file_path}")