Get The Status Of A Transaction with the Psycopg2 Python Adapter For PostgreSQL
Introduction
The psycopg2
Python adapter for PostgreSQL has a library called extensions
has polling and status attributes to help you make your PostgreSQL application more efficient by better monitoring and managing the transactions taking place. This article will provide a brief overview of how to get the status of a transaction with the psycopg2 adapter for PostgreSQL.
Prerequisites to using the psycopg2 Python adapter for PostgreSQL
Python 3 and its PIP package manager need to be installed on the machine making SQL transactions to a PostgreSQL server. Use the command
pip3 -V
andpython3 -V
commands to get the version numbers for the PIP3 and Python 3 installations respectively.Install the
psycopg2
library using thepip3
package manager with the following command:
1 | pip3 install psycopg2 |
Create a new Python script for the example code in this article. You can do this, using the touch
command, by executing the following command in a terminal window:
1 | touch get_status.py |
Import the Python libraries to check PostgreSQL transactions
Import the necessary libraries from the psycopg2
package, at the beginning of your Python script, with the following code:
1 2 3 4 5 6 7 8 | # import the psycopg2 database adapter for PostgreSQL from psycopg2 import extensions, connect, InterfaceError # import the psycopg2 errors library import psycopg2.errors # import time library import time |
NOTE: The built-in Python time
library will be used to keep track of the time that has elapsed while the script was running.
Create a epoch timestamp at the beginning of the script
The following code will better help us to keep track of how long the transactions take to complete:
1 2 | # create a timestamp for the start of the script start_time = time.time() |
Use psycopg2 in Python to connect to a PostgreSQL database
Declare a new PostgreSQL connection instance for psycopg2
by passing your database and server parameters to the connect()
method:
1 2 3 4 5 6 7 | # declare a new PostgreSQL connection object conn = connect( dbname = "python_test", user = "objectrocket", host = "localhost", password = "mypass" ) |
NOTE: Make sure to change all of the above parameters so that they match your PostgreSQL settings, or psycopg2
will raise a Python exception.
Use the PostgreSQL connection instance to create a psycopg2 cursor object
Use the PostgreSQL connection to declare a new cursor object that will be used to perform SQL transactions:
1 2 | # create a cursor object from the connection cursor = conn.cursor() |
Define a function that will poll the PostgreSQL connection
Now let’s define a Python function that will return the poll status of the PostgreSQL connection:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | # define a function that parses the connection's poll() response def check_poll_status(): """ extensions.POLL_OK == 0 extensions.POLL_READ == 1 extensions.POLL_WRITE == 2 """ if conn.poll() == extensions.POLL_OK: print ("POLL: POLL_OK") if conn.poll() == extensions.POLL_READ: print ("POLL: POLL_READ") if conn.poll() == extensions.POLL_WRITE: print ("POLL: POLL_WRITE") return conn.poll() |
The above function will poll the status of the database, and the current state it’s in, regarding transactions. This is a great way to use a try-except block to check for any errors in the connection.
Define a function that will return the status of the PostgreSQL transactions
If you want to be updated on the status of a transaction, in regards to the connection, you can use the extension library’s status
attribute. The following function will return the connection status of the PostgreSQL transactions (if any).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | # define a function that returns the PostgreSQL connection status def get_transaction_status(): # print the connection status print ("\nconn.status:", conn.status) # evaluate the status for the PostgreSQL connection if conn.status == extensions.STATUS_READY: print ("psycopg2 status #1: Connection is ready for a transaction.") elif conn.status == extensions.STATUS_BEGIN: print ("psycopg2 status #2: An open transaction is in process.") elif conn.status == extensions.STATUS_IN_TRANSACTION: print ("psycopg2 status #3: An exception has occured.") print ("Use tpc_commit() or tpc_rollback() to end transaction") elif conn.status == extensions.STATUS_PREPARED: print ("psycopg2 status #4:A transcation is in the 2nd phase of the process.") return conn.status |
The above function will help to keep track of the progress for the asynchronous PostgreSQL transactions.
Use the status and poll functions on a PostgreSQL transaction
Now that function are defined you can use them to track the progress of your PostgreSQL transactions by calling them before and after a SQL statement is executed.
Get the PostgreSQL connection status before any transactions are made
The transaction status function should return 1
and the poll status should return a state of Connection is ready for a transaction.
Here’s how to call the functions:
1 2 3 4 5 | # get transaction status BEFORE get_transaction_status() # get the poll status BEFORE check_poll_status() |
Use a try-except indentation block to execute a bad SQL statement
The following SQL statement will return a syntax error when executed:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | try: cursor.execute("THIS WILL RAISE AN EXCEPTION") except: print ("TIME:", time.time() - start_time) # get the poll status again check_poll_status() # get transaction status AFTER get_transaction_status(conn) # rollback the previous transaction in order to make another conn.rollback() # make another SQL request to sleep cursor.execute("select pg_sleep(4)") # get the poll status again check_poll_status() |
In the case of an exception the above code will execute the following SQL string in order to “sleep” for 4 seconds, but the previous execution must be rolled back first, or else it will raise another exception:
1 | "select pg_sleep(4)" |
Close the PostgreSQL connection and cursor
The following code closes the cursor and connection objects and then and then psycopg2 will raise an InterfaceError
when the poll status function is called since the connection has been broken:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | # close the cursor() object to avoid memory leaks cursor.close() # close the connection to avoid memory leaks conn.close() # polling the connection will now raise an error try: # get the poll one last time check_poll_status() except InterfaceError as error: print ("psycopg2 ERROR:", error) # print the elapsed time since the beginning of the script print ("\nEND TIME:", time.time() - start_time) |
Conclusion
Run the Python script with the python3
command to execute the PostgreSQL requests using the psycopg2 adapter:
1 | python3 get_status.py |
Use try-except indentation blocks in Python, in conjunction with the psycopg2 library’s polling and status methods, in order to keep track of transactions and how they affect the connection to PostgreSQL.
Just the Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 | #!/usr/bin/python3 # -*- coding: utf-8 -*- # import the psycopg2 database adapter for PostgreSQL from psycopg2 import extensions, connect, InterfaceError # import the psycopg2 errors library import psycopg2.errors # import time library import time # create a timestamp for the start of the script start_time = time.time() # declare a new PostgreSQL connection object conn = connect( dbname = "python_test", user = "objectrocket", host = "localhost", password = "mypass" ) # create a cursor object from the connection cursor = conn.cursor() # define a function that parses the connection's poll() response def check_poll_status(): """ extensions.POLL_OK == 0 extensions.POLL_READ == 1 extensions.POLL_WRITE == 2 """ if conn.poll() == extensions.POLL_OK: print ("POLL: POLL_OK") if conn.poll() == extensions.POLL_READ: print ("POLL: POLL_READ") if conn.poll() == extensions.POLL_WRITE: print ("POLL: POLL_WRITE") return conn.poll() # define a function that returns the PostgreSQL connection status def get_transaction_status(): # print the connection status print ("\nconn.status:", conn.status) # evaluate the status for the PostgreSQL connection if conn.status == extensions.STATUS_READY: print ("psycopg2 status #1: Connection is ready for a transaction.") elif conn.status == extensions.STATUS_BEGIN: print ("psycopg2 status #2: An open transaction is in process.") elif conn.status == extensions.STATUS_IN_TRANSACTION: print ("psycopg2 status #3: An exception has occured.") print ("Use tpc_commit() or tpc_rollback() to end transaction") elif conn.status == extensions.STATUS_PREPARED: print ("psycopg2 status #4:A transcation is in the 2nd phase of the process.") return conn.status # get transaction status BEFORE get_transaction_status() # get the poll status BEFORE check_poll_status() try: cursor.execute("THIS WILL RAISE AN EXCEPTION") except Exception as err: print ("cursor.execute() ERROR:", err) print ("TIME:", time.time() - start_time) # get the poll status again check_poll_status() # get transaction status AFTER get_transaction_status() # rollback the previous transaction in order to make another conn.rollback() # make another SQL request to sleep cursor.execute("select pg_sleep(4)") # get the poll status again check_poll_status() # get transaction status LAST TIME get_transaction_status() # close the cursor() object to avoid memory leaks cursor.close() # close the connection to avoid memory leaks conn.close() # polling the connection will now raise an error try: # get the poll one last time check_poll_status() except InterfaceError as error: print ("psycopg2 ERROR:", error) # print the elapsed time since the beginning of the script print ("\nEND TIME:", time.time() - start_time) |
Pilot the ObjectRocket Platform Free!
Try Fully-Managed CockroachDB, Elasticsearch, MongoDB, PostgreSQL (Beta) or Redis.
Get Started