Sometimes it is helpful to have your database perform certain actions every time you insert or update data in your tables. Trigger functions are one of the most common ways of doing this. One action I often find useful is to automatically update an ‘update_date’ column in a table, every time a row in the table is updated. Here is an example of such a trigger.

I have a table that stores the names of New York City streets, conveniently called ‘street’. Actually, it’s called ‘crash.street’, as it’s in the ‘crash’ schema. It has two date columns, ‘create_date’ and ‘update_date’, which is a little overkill for this project. Both dates are populated by default when a new row is inserted. However, I want the ‘update_date’ column to be set to the current timestamp whenever that row is updated.

DROP TABLE IF EXISTS crash.street;

CREATE TABLE crash.street (
    id           INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    name         TEXT        NOT NULL,
    borough_code VARCHAR (2) NOT NULL,
    zip_code     VARCHAR (5),
    create_date  timestamp with time zone DEFAULT now(),
    update_date  timestamp with time zone DEFAULT now(),
CONSTRAINT  s_nbz_u UNIQUE (name, borough_code, zip_code)
);

First, create a trigger function called ‘update_street_update_date_func’.

It checks if an ‘update’ is being performed. Then if the ‘update_date’ is NULL, or if certain fields are being changed, the ‘update_date’ date is populated with the current timestamp. Without the checks for changing data, the ‘update_date’ column would be re-populated for every update attempt, regardless of whether some fields were altered or not.

DROP FUNCTION IF EXISTS crash.update_street_update_date_func() CASCADE;
CREATE FUNCTION crash.update_street_update_date_func()
    RETURNS trigger
    LANGUAGE plpgsql
    VOLATILE 
    CALLED ON NULL INPUT
    SECURITY INVOKER
    COST 100
    AS $$

BEGIN
   IF (TG_OP = 'UPDATE') THEN
      IF (NEW.update_date IS NULL) THEN
        NEW.update_date = now();
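      -- IS DISTINCT FROM is NULL-safe, so a change to or from a NULL zip_code is detected too
      ELSIF (NEW.name IS DISTINCT FROM OLD.name OR NEW.borough_code IS DISTINCT FROM OLD.borough_code OR NEW.zip_code IS DISTINCT FROM OLD.zip_code) THEN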
        NEW.update_date = now();
      END IF;
   END IF;
   RETURN NEW;
END;
$$;
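
Because ‘zip_code’ is nullable, it matters how a change check like the one in this function treats NULL. In SQL, a plain `!=` yields NULL (unknown) when either side is NULL, and an IF branch only fires on true, whereas `IS DISTINCT FROM` treats NULL as an ordinary comparable value. Here is a rough Python model of the two behaviours, with None standing in for NULL. The helper names are made up for illustration and are not part of the database code.

```python
def sql_not_equal(a, b):
    """Model SQL `a != b`: unknown (None) when either side is NULL."""
    if a is None or b is None:
        return None
    return a != b


def sql_is_distinct_from(a, b):
    """Model SQL `a IS DISTINCT FROM b`: NULL compares like an ordinary value."""
    if a is None or b is None:
        return not (a is None and b is None)
    return a != b


def row_changed(old, new, cols, compare):
    # An IF branch fires only when the condition is true, so an
    # unknown (None) result counts the same as false here.
    return any(compare(new[c], old[c]) is True for c in cols)


cols = ["name", "borough_code", "zip_code"]
old_row = {"name": "main st", "borough_code": "q", "zip_code": None}
new_row = {"name": "main st", "borough_code": "q", "zip_code": "11111"}

print(row_changed(old_row, new_row, cols, sql_not_equal))         # False
print(row_changed(old_row, new_row, cols, sql_is_distinct_from))  # True
```

So a row whose zip code goes from NULL to ‘11111’ only counts as changed under the NULL-safe comparison.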

Then create the trigger called ‘street_update_date_trigger’, which will invoke our trigger function ‘update_street_update_date_func()’.

This could have been ‘BEFORE INSERT OR UPDATE’ or ‘BEFORE UPDATE’, but as the table defaults already set both dates to the current timestamp on insert, we only need the ‘BEFORE UPDATE’ instruction.

DROP TRIGGER IF EXISTS street_update_date_trigger ON crash.street CASCADE;
CREATE TRIGGER street_update_date_trigger
BEFORE UPDATE
    ON crash.street
    FOR EACH ROW
    EXECUTE PROCEDURE crash.update_street_update_date_func();

Next up is a Python interface to use this database table. I will use the ‘psycopg2’ library.

pip install psycopg2

I created a module of functions called ‘collision_db.py‘, which can be imported and used by any Python script. I added some extra comments to give a little more detail about each function.

import psycopg2
import psycopg2.extras
import os, sys

# Simple log function. Prints this module name and 
# the line where the function was called from.
def log_this(msg):
    print(f'{os.path.basename(__file__)}: {sys._getframe().f_back.f_lineno}> {msg}')


def get_db_connection():
    c = None
    my_schema = 'crash'
    # To avoid having to specify the schema name
    # when referencing 'crash' schema tables
    search_path = f'-c search_path=pg_catalog,public,{my_schema}'

    try:
        c = psycopg2.connect(
            database='nyc_data',
            user='postgres',
            password='xyz',
            options=search_path, )
    except psycopg2.DatabaseError as e:
        log_this(f'Error {e}')
        sys.exit(99)
    # No need to issue a 'commit' after transactions.
    c.autocommit = True
    return c


# Inserts into street if not existing already. The street
# id is returned for both new and existing streets.
def insert_into_street(*, con, street):
    street_id = None
    cur = con.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute(
        "SELECT id FROM street WHERE name=%(name)s AND borough_code=%(borough_code)s AND zip_code=%(zip_code)s",
        street)
    log_this(f'Cursor status: {cur.statusmessage}')
    log_this(f'Cursor desc: {cur.description}')

    if cur.statusmessage == 'SELECT 0':
        log_this(f"Inserting new street {street}")
        cur.execute(
            "INSERT INTO street (name, borough_code, zip_code) VALUES ( %(name)s, %(borough_code)s, %(zip_code)s) RETURNING id",
            street)

    street_id = cur.fetchone()[0]
    return street_id

# Update a street. Pass in a 'street' dictionary.
# Return a count of affected rows. Should be 1. 
def update_street_name(*, con, street):
    sql = "UPDATE street SET name = %(new_name)s WHERE name=%(name)s AND borough_code=%(borough_code)s AND zip_code=%(zip_code)s"
    cur = con.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute(sql, street)
    return cur.rowcount

# Return a street that matches a given set of values.
def find_one_from_street(*, con, street):
    cursor = con.cursor(cursor_factory=psycopg2.extras.DictCursor)
    new_street = None
    with (cursor) as cur:
        cur.execute(
        "SELECT id FROM street WHERE name=%(name)s AND borough_code=%(borough_code)s AND zip_code=%(zip_code)s", street)
        new_street = cur.fetchone()
    return new_street

#--------------------------------------------------------
#   Generic Functions
#--------------------------------------------------------

# Find from any given table that has an 'id' column.
def find_from_table(*, con, table_name, cols,  id):
    found = None
    sql = f"SELECT {', '.join(cols)} FROM {table_name} WHERE id = %s"
    cursor = con.cursor(cursor_factory=psycopg2.extras.DictCursor)
    with (cursor) as cur:
        cur.execute(sql, (id,))
        found = cur.fetchone()
    return found

# Delete from any table that has an 'id' column
# Return a count of affected rows. Should be 1
# Warning!! Check the arguments being passed in here
def delete_from_table(*,  con, table_name,  id):
    cur = con.cursor()
    sql = f"DELETE FROM {table_name} WHERE id = %s"
    cur.execute(sql, (id,))
    return cur.rowcount
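
The warning on the generic functions is worth taking seriously: ‘table_name’ and ‘cols’ are interpolated straight into the SQL string, so they must never come from untrusted input. One possible guard is to validate identifiers before building the statement. This is only a minimal sketch: ‘safe_identifier’, ‘build_delete_sql’ and the ‘ALLOWED_TABLES’ allow-list are hypothetical names, not part of ‘collision_db.py’.

```python
import re

# Hypothetical allow-list: the tables this module is expected to touch.
ALLOWED_TABLES = {"street"}

# Plain lower-case identifiers only: letters, digits and underscores.
IDENTIFIER_RE = re.compile(r"[a-z_][a-z0-9_]*")


def safe_identifier(name, allowed=None):
    """Return `name` unchanged if it is a plain identifier, else raise ValueError."""
    if not isinstance(name, str) or not IDENTIFIER_RE.fullmatch(name):
        raise ValueError(f"Not a plain SQL identifier: {name!r}")
    if allowed is not None and name not in allowed:
        raise ValueError(f"Identifier not in allow-list: {name!r}")
    return name


def build_delete_sql(table_name):
    # Only the validated table name is interpolated; the id stays a bound parameter.
    return f"DELETE FROM {safe_identifier(table_name, ALLOWED_TABLES)} WHERE id = %s"


print(build_delete_sql("street"))  # DELETE FROM street WHERE id = %s
```

psycopg2 also ships a ‘psycopg2.sql’ module (‘sql.SQL’ and ‘sql.Identifier’) for composing statements with properly quoted identifiers, which may be a better fit than hand-rolled validation.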


OK, in my last post, Insert Data With Python and PostgreSQL, I created a test to verify that the insert function would return the street id even if the street table row existed already. I’ll create another test to verify that the ‘street_update_date_trigger’ trigger updates the date when a row is updated. Ideally, it should also test that the date is not updated unless an actual change is made to the table row.

test_collision_db_1.py

from datetime import datetime, date, timezone, tzinfo
import pytz as tz
import os
import sys
import psycopg2
import psycopg2.extras
import pytest
import time
BIN_DIR = os.path.abspath(os.path.dirname(__file__))
LIB_DIR = os.path.join(BIN_DIR, '..', 'lib')
sys.path.append(LIB_DIR)
from collision_db import delete_from_table, get_db_connection, find_from_table, insert_into_street,  update_street_name
CON = None
#--------------------------------------------------------
#  Utility Functions
#--------------------------------------------------------

def log_this(msg):
    print(f'{os.path.basename(__file__)}: {sys._getframe().f_back.f_lineno}> {msg}')

def setup_function():
    global CON
    CON = get_db_connection()
    log_this("In setup!")

def teardown_function():
    global CON
    CON.close()
    log_this("In teardown!")

def is_con_open():
    global CON
    return not CON.closed

def got_correct_values(expect_dict, order_of_vals, got_touple):
    if (expect_dict is None) or (got_touple is None):
        log_this(f"Test 'got_correct_values' Not comparing None")
        return False
    if len(expect_dict.values()) != len(got_touple):
        log_this(f"Test 'got_correct_values' Order of cols has incorrect size: {len(got_touple)}")
        return False
    expect_ct = len(order_of_vals)

    for ct in range(expect_ct):
        key_name = order_of_vals[ct]
        expect_val = expect_dict[order_of_vals[ct]]
        got_val = got_touple[ct]
        if expect_val != got_val:
            log_this(f"Test 'got_correct_values' For key {order_of_vals[ct]}, expected: {expect_val}, got: {got_val}")
            return False

    return True

def compare_dates(expect_datetime, got_datetime):
    expect_ymd = expect_datetime.strftime("%Y-%m-%d")
    got_ymd    = got_datetime.strftime("%Y-%m-%d")

    if expect_ymd != got_ymd:
        log_this(f"'compare_dates' Expected Ymd {expect_ymd} not eq {got_ymd}")
        return False

    expect_hm = expect_datetime.strftime("%H:%M")
    got_hm    = got_datetime.strftime("%H:%M")

    if expect_hm != got_hm:
        log_this(f"'compare_dates' Expected H:M {expect_hm} not eq {got_hm}")
        return False

    return True
#-----------------------------------------
#  Test functions
#-----------------------------------------
...

def test_street_update_trigger():
    global CON
    expect_street = {
        'name': 'trigger_test_st',
        'borough_code': 'q',
        'zip_code': '11111'}
    ny_tz = tz.timezone('America/New_York')
    expect_dt = datetime.now(tz=ny_tz)

    got_id = insert_into_street(con=CON, street=expect_street)

    cols = ['id', 'name', 'borough_code','zip_code',  'create_date', 'update_date']
    expect_street['id'] = got_id
    got_street = find_from_table(con=CON, table_name='street', cols=cols, id=got_id )

    log_this(f"Got Street: {got_street}")
    # Only need to compare id, name, borough_code, zip_code
    assert got_correct_values(expect_street, cols[0:4], got_street[0:4])

    got_create_date_1 = got_street['create_date']
    got_update_date_1 = got_street['update_date']
    assert got_create_date_1 == got_update_date_1
    assert compare_dates(expect_dt, got_create_date_1)

    log_this( f"Got C date: {got_create_date_1}")
    expect_street['new_name'] = 'new_trigger_test_st'
    # Force a new update time
    time.sleep(1)
    row_ct = update_street_name(con=CON, street=expect_street)
    assert row_ct == 1
    expect_street['name'] = expect_street['new_name']
    expect_street.pop('new_name')

    got_street = find_from_table(con=CON, table_name='street', cols=cols, id=got_id)
    assert got_correct_values(expect_street, cols[0:4], got_street[0:4])
    got_create_date_2 = got_street['create_date']
    got_update_date_2 = got_street['update_date']
    # The create date should remain the same
    assert got_create_date_1 == got_create_date_2
    # The update date should change
    assert got_update_date_1 != got_update_date_2
    row_ct = delete_from_table(con=CON, table_name='street', id=got_street['id'])
    assert row_ct == 1


‘setup_function’ and ‘teardown_function’ are special function names in Pytest. When defined at module level, the former is called before and the latter after each test function in the module. I use these to open and close the database connection.

In this test I will insert a sample street into the crash.street table which returns the street id. I select the street from the same table using this returned id. I verify that it returns the correct street.

I get the ‘create_date’ and the ‘update_date’ and verify that they are the same. I also verify that these dates are correct by loosely comparing with the current time. I say loosely as the ‘compare_dates’ function compares to the nearest minute only, which is good enough for this test.
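
Comparing formatted ‘%H:%M’ strings can fail when the two timestamps straddle a minute boundary, or when they describe the same instant in different time zones. A possible alternative is to compare timezone-aware datetimes with a tolerance. This is a minimal sketch using only the standard library; ‘dates_close’ is a hypothetical name and the 60-second tolerance is an arbitrary choice.

```python
from datetime import datetime, timedelta, timezone


def dates_close(expect_dt, got_dt, tolerance=timedelta(seconds=60)):
    """Return True when two timezone-aware datetimes are within `tolerance`."""
    return abs(expect_dt - got_dt) <= tolerance


# The same moment seen from two zones: UTC on the client, UTC-4 from the database.
client_now = datetime(2020, 5, 30, 22, 26, 7, tzinfo=timezone.utc)
db_value = datetime(2020, 5, 30, 18, 26, 59, tzinfo=timezone(timedelta(hours=-4)))

print(dates_close(client_now, db_value))                         # True, 52 seconds apart
print(dates_close(client_now, db_value + timedelta(minutes=5)))  # False
```

Subtracting aware datetimes compares the underlying instants, so the differing offsets do not matter.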

Before updating the same crash.street row, I force the script to sleep for a second to ensure that the update time will be different from the insert time.

Then I update the row, changing the street name. I use the ‘find_from_table’ function to find the same row again. This time I verify that the ‘update_date’ has changed. Finally, I delete this row, as it’s no longer needed.

I run the test, first in quiet mode.

λ pytest test_collision_db_1.py::test_street_update_trigger
==================================================================== test session starts ====================================================================
platform win32 -- Python 3.7.2, pytest-5.4.2, py-1.8.1, pluggy-0.13.1
rootdir: C:\Users\ak1\Apps\Python\collision\tests
collected 1 item

test_collision_db_1.py .                                                                                                                               [100%]

===================================================================== 1 passed in 1.26s =====================================================================

Yeaaaaah! It passed. How convenient.

And next in noisy mode, to give a little more detail of what’s happening behind the scenes.

λ pytest test_collision_db_1.py::test_street_update_trigger -v -s
==================================================================== test session starts ====================================================================
platform win32 -- Python 3.7.2, pytest-5.4.2, py-1.8.1, pluggy-0.13.1 -- c:\python37\python.exe
cachedir: .pytest_cache
rootdir: C:\Users\ak1\Apps\Python\collision\tests
collected 1 item

test_collision_db_1.py::test_street_update_trigger test_collision_db_1.py: 25> In setup!
collision_db.py: 45> Cursor status: SELECT 0
collision_db.py: 46> Cursor desc: (Column(name='id', type_code=23),)
collision_db.py: 49> Inserting new street {'name': 'trigger_test_st', 'borough_code': 'q', 'zip_code': '11111'}
test_collision_db_1.py: 119> Got Street: [36, 'trigger_test_st', 'q', '11111', datetime.datetime(2020, 5, 30, 18, 26, 7, 487330, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-240, name=None)), datetime.datetime(2020, 5, 30, 18, 26, 7, 487330, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-240, name=None))]
test_collision_db_1.py: 128> Got C date: 2020-05-30 18:26:07.487330-04:00
PASSED
test_collision_db_1.py: 30> In teardown!


===================================================================== 1 passed in 1.31s =====================================================================

I could of course add more tests to verify that the ‘update_date’ is correct, and verify that it doesn’t change unless the row is actually updated with new data. But that can wait for now.
