Skip to content

Modules

CustomLogger

Class to create custom loggers using the Loguru package.

This class should not be instantiated directly. Use the get_logger class method to retrieve the logger instance.

Raises:

Type Description
RuntimeError

Raised if the class constructor is called directly.

Source code in src/monzo_api_wrapper/utils/custom_logger.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class CustomLogger:
    """Class to create custom loggers using the Loguru package.

    This class should not be instantiated directly. Use the `get_logger` class method
    to retrieve the logger instance.

    Raises:
        RuntimeError: Raised if the class constructor is called directly.

    """

    logger = None

    def __init__(self) -> None:
        """Initialize the custom logger instance.

        Raises:
            RuntimeError: Raised if the class constructor is called directly.

        """
        raise RuntimeError

    @classmethod
    def get_logger(cls) -> loguru.Logger:
        """Retrieve an instance of the logger with customised settings.

        The logger is configured to output logs to stderr with a specified format and
        log level set by the environment variable `LOG_LEVEL`, defaults to INFO.

        Returns:
            Logger: An instance of the customised Loguru logger.

        """
        if not cls.logger:
            cls.logger = loguru_base_logger
            cls.logger.remove()
            cls.logger = cls.logger.patch(cls.logger_patch)
            cls.logger.add(
                sys.stderr, format="{extra[serialized]}", level=os.getenv("LOG_LEVEL", "INFO")
            )

        return cls.logger

    @classmethod
    def logger_patch(cls, record: loguru.Record) -> None:
        """Customises the log record format for the Loguru logger.

        This method is used to patch the logger and serialize the log record data into JSON format.

        Args:
            record (dict[str, Any]): Dictionary containing log record data.

        """
        record["extra"]["serialized"] = json.dumps({
            "timestamp": str(record["time"]),
            "module": record["name"],
            "function": record["function"],
            "line_number": record["line"],
            "level": record["level"].name,
            "message": record["message"],
            "extra": record["extra"],
        })

__init__()

Initialize the custom logger instance.

Raises:

Type Description
RuntimeError

Raised if the class constructor is called directly.

Source code in src/monzo_api_wrapper/utils/custom_logger.py
28
29
30
31
32
33
34
35
def __init__(self) -> None:
    """Initialize the custom logger instance.

    Raises:
        RuntimeError: Raised if the class constructor is called directly.

    """
    raise RuntimeError

get_logger() classmethod

Retrieve an instance of the logger with customised settings.

The logger is configured to output logs to stderr with a specified format and log level set by the environment variable LOG_LEVEL, defaults to INFO.

Returns:

Name Type Description
Logger Logger

An instance of the customised Loguru logger.

Source code in src/monzo_api_wrapper/utils/custom_logger.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@classmethod
def get_logger(cls) -> loguru.Logger:
    """Retrieve an instance of the logger with customised settings.

    The logger is configured to output logs to stderr with a specified format and
    log level set by the environment variable `LOG_LEVEL`, defaults to INFO.

    Returns:
        Logger: An instance of the customised Loguru logger.

    """
    if not cls.logger:
        cls.logger = loguru_base_logger
        cls.logger.remove()
        cls.logger = cls.logger.patch(cls.logger_patch)
        cls.logger.add(
            sys.stderr, format="{extra[serialized]}", level=os.getenv("LOG_LEVEL", "INFO")
        )

    return cls.logger

logger_patch(record) classmethod

Customises the log record format for the Loguru logger.

This method is used to patch the logger and serialize the log record data into JSON format.

Parameters:

Name Type Description Default
record dict[str, Any]

Dictionary containing log record data.

required
Source code in src/monzo_api_wrapper/utils/custom_logger.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@classmethod
def logger_patch(cls, record: loguru.Record) -> None:
    """Customises the log record format for the Loguru logger.

    This method is used to patch the logger and serialize the log record data into JSON format.

    Args:
        record (dict[str, Any]): Dictionary containing log record data.

    """
    record["extra"]["serialized"] = json.dumps({
        "timestamp": str(record["time"]),
        "module": record["name"],
        "function": record["function"],
        "line_number": record["line"],
        "level": record["level"].name,
        "message": record["message"],
        "extra": record["extra"],
    })

get_aws_context_value(list_of_args, key)

Retrieve a specific value from the AWS context argument based on the provided key.

Parameters:

Name Type Description Default
list_of_args list[tuple[str, Any]]

List of arguments from the function.

required
key str

The key to retrieve from the context argument.

required

Returns:

Type Description
str | None

Optional[str]: The value associated with the key in the AWS context, or None if not found.

Source code in src/monzo_api_wrapper/utils/custom_logger.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def get_aws_context_value(list_of_args: list[tuple[str, Any]], key: str) -> str | None:
    """Retrieve a specific value from the AWS context argument based on the provided
    key.

    Args:
        list_of_args (list[tuple[str, Any]]): List of arguments from the function.
        key (str): The key to retrieve from the context argument.

    Returns:
        Optional[str]: The value associated with the key in the AWS context, or None if not found.

    """
    for arg_tuple in list_of_args:
        if "context" in arg_tuple:
            context_arg = arg_tuple[1]
            value = getattr(context_arg, key, None)
            return value if isinstance(value, str) else str(value) if value is not None else None
    return None

get_func_signature(list_of_args)

Generate a string representation of the function's signature, with parameters filtered and masked as needed.

Parameters:

Name Type Description Default
list_of_args list[tuple[str, Any]]

List of arguments passed to the function.

required

Returns:

Name Type Description
str str

String representation of the function's signature.

Source code in src/monzo_api_wrapper/utils/custom_logger.py
142
143
144
145
146
147
148
149
150
151
152
153
154
def get_func_signature(list_of_args: list[tuple[str, Any]]) -> str:
    """Generate a string representation of the function's signature, with parameters
    filtered and masked as needed.

    Args:
        list_of_args (list[tuple[str, Any]]): List of arguments passed to the function.

    Returns:
        str: String representation of the function's signature.

    """
    args_repr = [f"{a[0]}={a[1]!r}" for a in list_of_args]
    return ", ".join(args_repr)

is_primitive(obj)

Check if an object is an instance of a primitive type.

Parameters:

Name Type Description Default
obj Any

Standard Python object.

required

Returns:

Name Type Description
bool bool

True if the object is a primitive type, False otherwise.

Source code in src/monzo_api_wrapper/utils/custom_logger.py
108
109
110
111
112
113
114
115
116
117
118
119
def is_primitive(obj: object) -> bool:
    """Check if an object is an instance of a primitive type.

    Args:
        obj (Any): Standard Python object.

    Returns:
        bool: True if the object is a primitive type, False otherwise.

    """
    primitives = (bool, str, int, float, type(None))
    return isinstance(obj, primitives)

loggable(_func=None, *, log_params=True, log_primitive_params_only=True, log_response=False, params_to_mask=None)

Log function execution details.

Includies start/end time,parameters, responses, and execution time. By default, only primitive parameters (bool, str, int, float, None) are logged and response values are suppressed unless specified.

Parameters:

Name Type Description Default
_func Callable

Function to wrap with the decorator. Defaults to None.

None
log_params bool

Whether to log function parameters. Defaults to True.

True
log_primitive_params_only bool

Whether to log only primitive parameters. Defaults to True.

True
log_response bool

Whether to log function responses. Defaults to False.

False
params_to_mask Optional[List[str]]

List of parameter names to mask in logs. Defaults to None.

None

Returns:

Name Type Description
Callable Callable

A wrapped function with logging functionality.

Source code in src/monzo_api_wrapper/utils/custom_logger.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def loggable(
    _func: Callable | None = None,
    *,
    log_params: bool = True,
    log_primitive_params_only: bool = True,
    log_response: bool = False,
    params_to_mask: list[str] | None = None,
) -> Callable:
    """Log function execution details.

    Includies start/end time,parameters, responses, and execution time. By default, only primitive
    parameters (bool, str, int, float, None) are logged and response values
    are suppressed unless specified.

    Args:
        _func (Callable, optional): Function to wrap with the decorator. Defaults to None.
        log_params (bool, optional): Whether to log function parameters. Defaults to True.
        log_primitive_params_only (bool, optional): Whether to log only primitive parameters. Defaults to True.
        log_response (bool, optional): Whether to log function responses. Defaults to False.
        params_to_mask (Optional[List[str]], optional): List of parameter names to mask in logs. Defaults to None.

    Returns:
        Callable: A wrapped function with logging functionality.

    """
    if params_to_mask is None:
        params_to_mask = []

    def decorator_log(func: Callable) -> Callable:
        """Wrap the target function and adds logging.

        Args:
            func (Callable): The function to be wrapped.

        Returns:
            Callable: The wrapped function with logging added.

        """

        @functools.wraps(func)
        def wrapper(*args: tuple[object, ...], **kwargs: dict[str, object]) -> object:
            """Add logging before and after the decorated function's execution.

            Args:
                *args (Any): Positional arguments passed to the function.
                **kwargs (Any): Keyword arguments passed to the function.

            Returns:
                Any: The result of the decorated function.

            """
            logger = CustomLogger.get_logger()

            # Prepare function signature details
            sig_keys = inspect.signature(func).parameters.keys()
            kw_vals = tuple(kwargs[k] for k in sig_keys if kwargs.get(k) is not None)
            list_of_args = list(zip(sig_keys, args + kw_vals))

            # AWS-specific logging setup if environment variables are available
            if os.getenv("AWS_EXECUTION_ENV"):
                aws_request_id = (
                    os.getenv("AWS_REQUEST_ID")
                    if func.__name__ not in ("handler", "lambda_handler")
                    else get_aws_context_value(list_of_args, "aws_request_id")
                )
                aws_log_stream_name = (
                    os.getenv("AWS_LAMBDA_LOG_STREAM_NAME")
                    if func.__name__ not in ("handler", "lambda_handler")
                    else get_aws_context_value(list_of_args, "log_stream_name")
                )

                os.environ["AWS_REQUEST_ID"] = (
                    aws_request_id
                    if aws_request_id is not None
                    and func.__name__ not in ("handler", "lambda_handler")
                    else os.getenv("AWS_REQUEST_ID", "")
                )

                logger.configure(
                    extra={
                        "aws_request_id": aws_request_id,
                        "aws_log_stream_name": aws_log_stream_name,
                    }
                )

            # Logging function parameters
            if log_params:
                list_of_args = suppress_args_to_mask(list_of_args, params_to_mask)
                if log_primitive_params_only:
                    list_of_filtered_args = suppress_non_primitive_args(list_of_args)
                    signature = get_func_signature(list_of_filtered_args)
                else:
                    signature = get_func_signature(list_of_args)

                start_msg = f"{func.__name__} [{signature}]"
            else:
                start_msg = f"{func.__name__}"

            logger.info(start_msg + " : start")
            try:
                # Measure execution time
                start = time.perf_counter()
                result = func(*args, **kwargs)
                end = time.perf_counter()

                # Log response and execution time
                base_end_msg = f"{start_msg} : end : time taken [{round(end - start, 5)}]s : response : [{type(result)}="
                end_msg = (
                    f"{base_end_msg}<suppressed>]"
                    if not log_response
                    else f"{base_end_msg}<{result}>]"
                )

                logger.info(end_msg)
            except Exception:
                logger.exception(f"Exception raised in function {func.__name__}.")
                raise
            else:
                return result

        return wrapper

    if _func is None:
        return decorator_log
    return decorator_log(_func)

suppress_args_to_mask(list_of_args, params_to_mask)

Filter out arguments to be masked, replacing them with 'suppressed'.

Parameters:

Name Type Description Default
list_of_args list[tuple[str, Any]]

list of function arguments.

required
params_to_mask list[str]

List of string representations of params to mask.

required

Returns:

Type Description
list[tuple[str, Any]]

list[tuple[str, Any]]: List of arguments with non-primitives suppressed.

Source code in src/monzo_api_wrapper/utils/custom_logger.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def suppress_args_to_mask(
    list_of_args: list[tuple[str, Any]], params_to_mask: list[str]
) -> list[tuple[str, Any]]:
    """Filter out arguments to be masked, replacing them with 'suppressed'.

    Args:
        list_of_args (list[tuple[str, Any]]): list of function arguments.
        params_to_mask (list[str]): List of string representations of params to mask.

    Returns:
        list[tuple[str, Any]]: List of arguments with non-primitives suppressed.

    """
    return [x if x[0] not in params_to_mask else (x[0], "suppressed") for x in list_of_args]

suppress_non_primitive_args(list_of_args)

Filter out non-primitive arguments, replacing them with 'suppressed'.

Parameters:

Name Type Description Default
list_of_args list[tuple[str, Any]]

List of function arguments.

required

Returns:

Type Description
list[tuple[str, Any]]

list[tuple[str, Any]]: List of arguments with non-primitives suppressed.

Source code in src/monzo_api_wrapper/utils/custom_logger.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
def suppress_non_primitive_args(list_of_args: list[tuple[str, Any]]) -> list[tuple[str, Any]]:
    """Filter out non-primitive arguments, replacing them with 'suppressed'.

    Args:
        list_of_args (list[tuple[str, Any]]): List of function arguments.

    Returns:
        list[tuple[str, Any]]: List of arguments with non-primitives suppressed.

    """
    return [x if is_primitive(x[1]) else (x[0], "suppressed") for x in list_of_args]

Db

Class to manage connection to a database and perform SQL operations.

Source code in src/monzo_api_wrapper/utils/db.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class Db:
    """Class to manage connection to a database and perform SQL operations."""

    def __init__(self) -> None:
        """Initialize the Db class and set up a database engine.

        Initializes the SQLAlchemy engine and logs the connection status.

        """
        self.engine = self.create_db_engine()
        logger.debug(f"Connected to database: {os.getenv('DB_NAME')}")

    def create_db_engine(self) -> Engine:
        """Create an SQLAlchemy engine for the database connection.

        Returns:
            Engine: An SQLAlchemy Engine instance configured with database connection details.

        """
        username = os.getenv("DB_USER")
        password = os.getenv("DB_PASS")
        host = os.getenv("DB_HOST")
        database_type = os.getenv("DB_TYPE")
        database_name = os.getenv("DB_NAME")
        port = os.getenv("DB_PORT")
        sql_string = f"{database_type}://{username}:{password}@{host}:{port}/{database_name}?gssencmode=disable"
        return create_engine(sql_string)

    def query(self, sql: str, return_data: bool = True) -> Optional[pd.DataFrame]:
        """Execute an SQL query against the database.

        Args:
            sql (str): The SQL query to be executed.
            return_data (bool, optional): If True, returns query data as a DataFrame. Defaults to True.

        Returns:
            Optional[pd.DataFrame]: DataFrame with query results if return_data=True, else None.

        """
        if return_data:
            return pd.read_sql_query(sql, self.engine)
        with self.engine.begin() as conn:
            conn.execute(sqlalchemy.text(sql))
        return None

    def insert(
        self, table: str, df: Optional[pd.DataFrame] = None, sql: Optional[str] = None
    ) -> None:
        """Insert data into a table in the database.

        Args:
            table (str): The name of the target database table.
            df (Optional[pd.DataFrame]): DataFrame containing data to insert. Defaults to None.
            sql (Optional[str]): Custom SQL insert statement. Defaults to None.

        Raises:
            InsertArgumentError: If both `df` and `sql` are None.

        """
        if df is None and not sql:
            raise InsertArgumentError()

        if sql:
            insert_sql = f"INSERT INTO {table} (\n{sql}\n);"
            self.query(insert_sql, return_data=False)
            logger.debug(f"Data inserted into {table}")
        else:
            if df is not None:
                rows = len(df)
                chunksize = 20000 if rows > 20000 else None
                schema, table_name = table.split(".")
                with self.engine.begin() as conn:
                    df.to_sql(
                        schema=schema,
                        name=table_name,
                        index=False,
                        con=conn,
                        if_exists="append",
                        method="multi",
                        chunksize=chunksize,
                    )
                logger.debug(f"{rows} rows inserted into {schema}.{table_name}")

    def delete(self, table: str, data: str) -> None:
        """Delete data from a table in the database.

        Args:
            table (str): The name of the database table from which data will be deleted.
            data (str): Condition to specify which rows to delete, formatted as a SQL condition.

        """
        sql_delete = sql_templates.delete.format(table=table, data=data)
        logger.info(f"Running delete statement: {sql_delete}")
        self.query(sql=sql_delete, return_data=False)

__init__()

Initialize the Db class and set up a database engine.

Initializes the SQLAlchemy engine and logs the connection status.

Source code in src/monzo_api_wrapper/utils/db.py
42
43
44
45
46
47
48
49
def __init__(self) -> None:
    """Initialize the Db class and set up a database engine.

    Initializes the SQLAlchemy engine and logs the connection status.

    """
    self.engine = self.create_db_engine()
    logger.debug(f"Connected to database: {os.getenv('DB_NAME')}")

create_db_engine()

Create an SQLAlchemy engine for the database connection.

Returns:

Name Type Description
Engine Engine

An SQLAlchemy Engine instance configured with database connection details.

Source code in src/monzo_api_wrapper/utils/db.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def create_db_engine(self) -> Engine:
    """Create an SQLAlchemy engine for the database connection.

    Returns:
        Engine: An SQLAlchemy Engine instance configured with database connection details.

    """
    username = os.getenv("DB_USER")
    password = os.getenv("DB_PASS")
    host = os.getenv("DB_HOST")
    database_type = os.getenv("DB_TYPE")
    database_name = os.getenv("DB_NAME")
    port = os.getenv("DB_PORT")
    sql_string = f"{database_type}://{username}:{password}@{host}:{port}/{database_name}?gssencmode=disable"
    return create_engine(sql_string)

delete(table, data)

Delete data from a table in the database.

Parameters:

Name Type Description Default
table str

The name of the database table from which data will be deleted.

required
data str

Condition to specify which rows to delete, formatted as a SQL condition.

required
Source code in src/monzo_api_wrapper/utils/db.py
122
123
124
125
126
127
128
129
130
131
132
def delete(self, table: str, data: str) -> None:
    """Delete data from a table in the database.

    Args:
        table (str): The name of the database table from which data will be deleted.
        data (str): Condition to specify which rows to delete, formatted as a SQL condition.

    """
    sql_delete = sql_templates.delete.format(table=table, data=data)
    logger.info(f"Running delete statement: {sql_delete}")
    self.query(sql=sql_delete, return_data=False)

insert(table, df=None, sql=None)

Insert data into a table in the database.

Parameters:

Name Type Description Default
table str

The name of the target database table.

required
df Optional[DataFrame]

DataFrame containing data to insert. Defaults to None.

None
sql Optional[str]

Custom SQL insert statement. Defaults to None.

None

Raises:

Type Description
InsertArgumentError

If both df and sql are None.

Source code in src/monzo_api_wrapper/utils/db.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def insert(
    self, table: str, df: Optional[pd.DataFrame] = None, sql: Optional[str] = None
) -> None:
    """Insert data into a table in the database.

    Args:
        table (str): The name of the target database table.
        df (Optional[pd.DataFrame]): DataFrame containing data to insert. Defaults to None.
        sql (Optional[str]): Custom SQL insert statement. Defaults to None.

    Raises:
        InsertArgumentError: If both `df` and `sql` are None.

    """
    if df is None and not sql:
        raise InsertArgumentError()

    if sql:
        insert_sql = f"INSERT INTO {table} (\n{sql}\n);"
        self.query(insert_sql, return_data=False)
        logger.debug(f"Data inserted into {table}")
    else:
        if df is not None:
            rows = len(df)
            chunksize = 20000 if rows > 20000 else None
            schema, table_name = table.split(".")
            with self.engine.begin() as conn:
                df.to_sql(
                    schema=schema,
                    name=table_name,
                    index=False,
                    con=conn,
                    if_exists="append",
                    method="multi",
                    chunksize=chunksize,
                )
            logger.debug(f"{rows} rows inserted into {schema}.{table_name}")

query(sql, return_data=True)

Execute an SQL query against the database.

Parameters:

Name Type Description Default
sql str

The SQL query to be executed.

required
return_data bool

If True, returns query data as a DataFrame. Defaults to True.

True

Returns:

Type Description
Optional[DataFrame]

Optional[pd.DataFrame]: DataFrame with query results if return_data=True, else None.

Source code in src/monzo_api_wrapper/utils/db.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def query(self, sql: str, return_data: bool = True) -> Optional[pd.DataFrame]:
    """Execute an SQL query against the database.

    Args:
        sql (str): The SQL query to be executed.
        return_data (bool, optional): If True, returns query data as a DataFrame. Defaults to True.

    Returns:
        Optional[pd.DataFrame]: DataFrame with query results if return_data=True, else None.

    """
    if return_data:
        return pd.read_sql_query(sql, self.engine)
    with self.engine.begin() as conn:
        conn.execute(sqlalchemy.text(sql))
    return None

InsertArgumentError

Bases: ValueError

Custom exception raised when both DataFrame and SQL string arguments are missing for an insert operation.

This exception is used to ensure that at least one valid data source (a DataFrame or an SQL string) is provided when attempting to insert data into the database.

Source code in src/monzo_api_wrapper/utils/db.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class InsertArgumentError(ValueError):
    """Custom exception raised when both DataFrame and SQL string arguments are missing
    for an insert operation.

    This exception is used to ensure that at least one valid data source (a
    DataFrame or an SQL string) is provided when attempting to insert data into
    the database.

    """

    def __init__(self) -> None:
        """Initialize the InsertArgumentError exception with a specific error message.

        This constructor sets the error message that indicates that either a
        DataFrame or an SQL string must be provided for an insert operation.

        The error message is passed to the base ValueError class.

        """
        super().__init__(
            "Either a DataFrame or SQL string must be provided for the insert operation."
        )

__init__()

Initialize the InsertArgumentError exception with a specific error message.

This constructor sets the error message that indicates that either a DataFrame or an SQL string must be provided for an insert operation.

The error message is passed to the base ValueError class.

Source code in src/monzo_api_wrapper/utils/db.py
25
26
27
28
29
30
31
32
33
34
35
36
def __init__(self) -> None:
    """Initialize the InsertArgumentError exception with a specific error message.

    This constructor sets the error message that indicates that either a
    DataFrame or an SQL string must be provided for an insert operation.

    The error message is passed to the base ValueError class.

    """
    super().__init__(
        "Either a DataFrame or SQL string must be provided for the insert operation."
    )

get_transactions_df(monzo_auth, account_id, account_name, days_lookback=30)

Fetch recent transactions from Monzo API.

Parameters:

Name Type Description Default
monzo_auth Authentication

Monzo authentication object.

required
account_id str

Monzo account ID.

required
account_name str

Monzo account name.

required
days_lookback int

Number of days to look back, defaults to 30.

30

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame of fetched transactions.

Source code in src/monzo_api_wrapper/get_transactions.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@loggable
def get_transactions_df(
    monzo_auth: Authentication, account_id: str, account_name: str, days_lookback: int = 30
) -> pd.DataFrame:
    """Fetch recent transactions from Monzo API.

    Args:
        monzo_auth (Authentication): Monzo authentication object.
        account_id (str): Monzo account ID.
        account_name (str): Monzo account name.
        days_lookback (int): Number of days to look back, defaults to 30.

    Returns:
        pd.DataFrame: DataFrame of fetched transactions.

    """
    try:
        since_date = datetime.today() - timedelta(days=days_lookback)
        fetched_transactions_list = Transaction.fetch(
            auth=monzo_auth,
            account_id=account_id,
            since=since_date,
            expand=["merchant"],
        )
        logger.debug(f"Fetched {len(fetched_transactions_list)} transactions from {account_name}")

        if not fetched_transactions_list:
            logger.debug(
                f"No transactions found for {account_name} over the last {days_lookback} days."
            )
            return pd.DataFrame()

        return pd.DataFrame([
            {
                "id": trn.transaction_id,
                "date": trn.created,
                "description": trn.description,
                "amount": trn.amount,
                "category": trn.category,
                "decline_reason": trn.decline_reason,
                "meta": trn.metadata,
                "merchant": trn.merchant,
                "currency": trn.currency,
                "local_currency": trn.local_currency,
                "local_amount": trn.local_amount,
                "source": account_name,
            }
            for trn in fetched_transactions_list
        ])

    except Exception:
        logger.exception(f"Error fetching transactions for {account_name}")
        raise

get_balances(monzo_auth, source_account)

Fetch and process balances from Monzo pots.

Parameters:

Name Type Description Default
monzo_auth object

Monzo authentication object.

required
source_account str

Monzo source account ID.

required

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the balances and details of pots.

Source code in src/monzo_api_wrapper/get_balances.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@loggable
def get_balances(monzo_auth: Authentication, source_account: str) -> pd.DataFrame:
    """Fetch and process balances from Monzo pots.

    Args:
        monzo_auth (object): Monzo authentication object.
        source_account (str): Monzo source account ID.

    Returns:
        pd.DataFrame: DataFrame containing the balances and details of pots.

    """
    fetched_pots = Pot.fetch(auth=monzo_auth, account_id=source_account)
    pots_data = [
        (pot.pot_id, pot.name, pot.style, pot.balance / 100, pot.currency, pot.deleted)
        for pot in fetched_pots
    ]

    columns = ["id", "name", "style", "balance", "currency", "deleted"]
    return pd.DataFrame(pots_data, columns=columns)

NoTransactionsFoundError

Bases: Exception

Exception raised when no transactions are found in the database.

Source code in src/monzo_api_wrapper/upload_transactions.py
10
11
12
13
14
15
16
17
18
19
20
21
22
class NoTransactionsFoundError(Exception):
    """Exception raised when no transactions are found in the database."""

    def __init__(self, message: str = "No transactions found in database.") -> None:
        """Initializes the NoTransactionsFoundError exception.

        Args:
            message (str): A custom error message to describe the exception.
                           Defaults to "No transactions found in database."

        """
        self.message = message
        super().__init__(self.message)

__init__(message='No transactions found in database.')

Initializes the NoTransactionsFoundError exception.

Parameters:

Name Type Description Default
message str

A custom error message to describe the exception. Defaults to "No transactions found in database."

'No transactions found in database.'
Source code in src/monzo_api_wrapper/upload_transactions.py
13
14
15
16
17
18
19
20
21
22
def __init__(self, message: str = "No transactions found in database.") -> None:
    """Initializes the NoTransactionsFoundError exception.

    Args:
        message (str): A custom error message to describe the exception.
                       Defaults to "No transactions found in database."

    """
    self.message = message
    super().__init__(self.message)

get_changed_transaction_ids(db, table, fetched_transactions)

Identify transactions that have changed based on a set of columns.

Parameters:

Name Type Description Default
db Db

Database connection object.

required
table str

Name of the table to check transactions against.

required
fetched_transactions DataFrame

DataFrame containing fetched transactions.

required

Returns:

Type Description
list[str]

Optional[List[str]]: List of transactions IDs that have changed, or None if no changes are found.

Raises:

Type Description
NoTransactionsFoundError

If no transactions are found in the database.

Source code in src/monzo_api_wrapper/upload_transactions.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@loggable
def get_changed_transaction_ids(
    db: Db, table: str, fetched_transactions: pd.DataFrame
) -> list[str]:
    """Identify transactions that have changed based on a set of columns.

    Args:
        db (Db): Database connection object.
        table (str): Name of the table to check transactions against.
        fetched_transactions (pd.DataFrame): DataFrame containing fetched transactions.

    Returns:
       Optional[List[str]]: List of transactions IDs that have changed, or None if no changes are found.

    Raises:
        NoTransactionsFoundError: If no transactions are found in the database.

    """
    compare_transaction_cols = ["id", "description", "amount", "category", "notes", "timestamp"]

    logger.debug("Getting existing transactions from database")
    db_transactions = get_db_transactions(db, table)
    if db_transactions.empty:
        raise NoTransactionsFoundError()

    logger.debug("Standardizing columns for comparison")
    db_transactions["date"] = pd.to_datetime(db_transactions["date"])
    fetched_transactions["date"] = pd.to_datetime(fetched_transactions["date"])
    db_transactions["amount"] = db_transactions["amount"].round(2)

    logger.debug("Creating comparable subsets of database and fetched transactions")
    fetched_transactions_subset = (
        fetched_transactions[fetched_transactions["id"].isin(db_transactions["id"])][
            compare_transaction_cols
        ]
        .sort_values("id")
        .reset_index(drop=True)
    )  # Exclude any new transactions not in database
    fetched_transactions_subset.fillna("", inplace=True)
    db_transactions_subset = (
        db_transactions[db_transactions["id"].isin(fetched_transactions["id"])][
            compare_transaction_cols
        ]
        .sort_values("id")
        .reset_index(drop=True)
    )
    db_transactions_subset.fillna("", inplace=True)

    logger.debug("Comparing transactions to identify differences")

    differences = (
        fetched_transactions_subset[compare_transaction_cols[1:]]  # Exclude 'id' column
        .ne(db_transactions_subset[compare_transaction_cols[1:]])
        .any(axis=1)
    )
    return fetched_transactions_subset.loc[differences, "id"].tolist()

get_db_transactions(db, table)

Fetch transactions from the database.

Parameters:

Name Type Description Default
db Db

Database connection object.

required
table str

Name of the table to fetch transactions from.

required

Returns:

Type Description
DataFrame | None

pd.DataFrame | None: DataFrame containing the transactions fetched from the database.

Source code in src/monzo_api_wrapper/upload_transactions.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@loggable
def get_db_transactions(db: Db, table: str) -> pd.DataFrame | None:
    """Fetch transactions from the database.

    Args:
        db (Db): Database connection object.
        table (str): Name of the table to fetch transactions from.

    Returns:
        pd.DataFrame | None: DataFrame containing the transactions fetched from the database.

    """
    return db.query(
        sql=sql_templates.exists.format(table=table),
        return_data=True,
    )

get_new_transactions(db, table, fetched_transactions)

Identify new transactions that are not present in the database.

Parameters:

Name Type Description Default
db Db

Database connection object.

required
table str

Name of the table to check transactions against.

required
fetched_transactions DataFrame

DataFrame containing fetched transactions.

required

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing new transactions to be uploaded.

Source code in src/monzo_api_wrapper/upload_transactions.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@loggable
def get_new_transactions(db: Db, table: str, fetched_transactions: pd.DataFrame) -> pd.DataFrame:
    """Identify new transactions that are not present in the database.

    Args:
        db (Db): Database connection object.
        table (str): Name of the table to check transactions against.
        fetched_transactions (pd.DataFrame): DataFrame containing fetched transactions.

    Returns:
        pd.DataFrame: DataFrame containing new transactions to be uploaded.

    """
    db_transactions = get_db_transactions(db, table)
    db_ids_lst = db_transactions["id"].tolist() if db_transactions is not None else []

    new_transaction_ids = [
        item for item in fetched_transactions["id"].tolist() if item not in db_ids_lst
    ]

    return fetched_transactions[fetched_transactions["id"].isin(new_transaction_ids)].reset_index(
        drop=True
    )

get_changed_balances(database, table_name, current_balances)

Identify balances that have changed compared to the ones in the database.

Parameters:

Name Type Description Default
database Db

Database connection object.

required
table_name str

Name of the table to check for existing balances.

required
current_balances DataFrame

DataFrame containing the current balances.

required

Returns:

Type Description
DataFrame | Any

pd.DataFrame: DataFrame containing the balances that have changed.

Source code in src/monzo_api_wrapper/upload_balances.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@loggable
def get_changed_balances(
    database: Db, table_name: str, current_balances: pd.DataFrame
) -> pd.DataFrame | Any:
    """Identify balances that have changed compared to the ones in the database.

    Args:
        database (Db): Database connection object.
        table_name (str): Name of the table to check for existing balances.
        current_balances (pd.DataFrame): DataFrame containing the current balances.

    Returns:
        pd.DataFrame: DataFrame containing the balances that have changed.

    """
    existing_balances = _get_uploaded_balances(database, table_name)
    if existing_balances is None:
        return pd.DataFrame()

    existing_balance_ids = set(existing_balances["id"])
    matched_balances = _prepare_balances(
        current_balances[current_balances["id"].isin(existing_balance_ids)]
    )
    matched_existing_balances = _prepare_balances(
        existing_balances[existing_balances["id"].isin(matched_balances["id"])]
    )

    # Return rows where any column value has changed
    return matched_balances[matched_existing_balances.ne(matched_balances).any(axis=1)]

get_new_balances(database, table_name, current_balances)

Identify new balances that are not yet uploaded to the database.

Parameters:

Name Type Description Default
database Db

Database connection object.

required
table_name str

Name of the table to check for existing balances.

required
current_balances DataFrame

DataFrame containing the current balances.

required

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the new balances to be uploaded.

Source code in src/monzo_api_wrapper/upload_balances.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@loggable
def get_new_balances(database: Db, table_name: str, current_balances: pd.DataFrame) -> pd.DataFrame:
    """Identify new balances that are not yet uploaded to the database.

    Args:
        database (Db): Database connection object.
        table_name (str): Name of the table to check for existing balances.
        current_balances (pd.DataFrame): DataFrame containing the current balances.

    Returns:
        pd.DataFrame: DataFrame containing the new balances to be uploaded.

    """
    existing_balances = _get_uploaded_balances(database, table_name)
    if existing_balances is not None:
        existing_balance_ids = set(existing_balances["id"])
        new_balances = current_balances[~current_balances["id"].isin(existing_balance_ids)]
        return new_balances.reset_index(drop=True)
    return current_balances

update_changed_balances(database, table_name, current_balances, updated_balances)

Update the balances in the database by deleting and reinserting the changed balances.

Parameters:

Name Type Description Default
database Db

Database connection object.

required
table_name str

Name of the table to update balances.

required
current_balances DataFrame

DataFrame containing the current balances.

required
updated_balances DataFrame

DataFrame containing the balances that have changed.

required
Source code in src/monzo_api_wrapper/upload_balances.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@loggable
def update_changed_balances(
    database: Db, table_name: str, current_balances: pd.DataFrame, updated_balances: pd.DataFrame
) -> None:
    """Update the balances in the database by deleting and reinserting the changed
    balances.

    Args:
        database (Db): Database connection object.
        table_name (str): Name of the table to update balances.
        current_balances (pd.DataFrame): DataFrame containing the current balances.
        updated_balances (pd.DataFrame): DataFrame containing the balances that have changed.

    """
    if updated_balances.empty:
        return

    ids_to_delete = ",".join(map(str, updated_balances["id"]))
    database.delete(table_name, ids_to_delete)

    reinserted_balances = current_balances[current_balances["id"].isin(updated_balances["id"])]
    database.insert(table_name, reinserted_balances)