Migrate Elasticsearch NoSQL to Postgres with Python

Introduction

In this tutorial, we will migrate some Elasticsearch NoSQL data to Postgres with Python scripting. We’ll hold your hand each step of the way, beginning with the libraries you will need to install and reference in your Python code in order to open an Elasticsearch database and read from it, as well as the libraries needed to open and write to your PostgreSQL database.

Prerequisites and Assumptions

We’ll assume you have a basic understanding of the following:

  • Python scripting.
  • Elasticsearch and NoSQL. It’s okay if you are a total noob with Elasticsearch and NoSQL. If you follow along below, you will learn enough to at least pull some data from it.
  • PostgreSQL.
  • SQL.
  • Optional: a tutorial on naming conventions that explains why we prefix our variables, column names, table names, etc. (all objects) as you see us doing here. For example, naming “tphrase_original” with the leading “t” to delineate it as a “text” object, and putting “tbl_” before table names to clearly mark those objects as tables.

In case you are new to Elasticsearch: Elasticsearch is all about documents, meaning that it stores data as JSON documents and indexes that data automatically so it is easily and quickly searchable. With Elasticsearch you can index, search, sort, and filter documents.
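To make that concrete, here is a minimal sketch that indexes one document with the elasticsearch-py client. The host, index name, id, and document values are all illustrative:

from elasticsearch import Elasticsearch

# An indexing sketch: store one JSON document in the "noSQL_db" index.
#   All values here are illustrative, assuming a local single-node cluster.
client = Elasticsearch(["localhost"], port=9200)
client.index(index="noSQL_db", id="1", body={"kind": "user", "name": "Ada Lovelace", "age": "36"})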

Some Elasticsearch terminology is important to know because it can differ from what we learned in the RDB (Relational Database) / SQL model: in ES, an index is like a database. It is a place to store related documents. To retrieve data we need three pieces of information (a retrieval sketch follows the list below):

  • Index: Database.
  • Datatype: Type of the document.
  • Id: ID of the document.
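For example, once you know the index and the id, a single GET call returns the document. A minimal sketch using elasticsearch-py; the host and document id are illustrative:

from elasticsearch import Elasticsearch

# A retrieval sketch: fetch one document by index and id.
#   The id below is illustrative, not from a real index.
client = Elasticsearch(["localhost"], port=9200)
doc = client.get(index="noSQL_db", id="1a354b3c12f92b28b8d65f")
print(doc["_source"]) # the document body as a Python dict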

First, let’s look at the test database in Elasticsearch that we want to migrate into PostgreSQL.

Elasticsearch structure and data

Here are some records (known in NoSQL as “documents”) representing users (what an RDB would call rows in a users table) from our Elasticsearch index named “noSQL_db”.

"_id" : ObjectId("1a354b3c12f92b28b8d65f"),
"kind" : "user",
"name" : "Slim Chance",
"phone" : "512-215-3228",
"age" : "35"

"_id" : ObjectId("93d83a8a8ab6558b8d3b5"),
"kind" : "user",
"name" : "Spartan Stoic",
"phone" : "512-525-6127",
"age" : "57"

"_id" : ObjectId("3f744a2c21965e8b8d64f"),
"kind" : "user",
"name" : "Curt Briefton",
"phone" : "512-441-6813",
"age" : "19"

IMPORTANT NOTE: For your ease, we have given the above Elasticsearch JSON documents a consistent structure, which you cannot count on in a NoSQL database.
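Because of that, it pays to read fields defensively. A small sketch, assuming each document body arrives as a Python dict; the fallback values are our own choice:

# A defensive-read sketch: .get() returns a default instead of raising
#   when a field is missing from a semi-structured document.
doc = {"kind": "user", "name": "Slim Chance"} # note: no "age" field
name = doc.get("name", "unknown")
age = doc.get("age") # None when the field is absent
if age is None:
    print("Skipping document with no age:", name)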

Start writing the Python script

For our code to access that data from Python, we’ll need to reference the elasticsearch (elasticsearch-py) and elasticsearch-dsl libraries.

from flask import Flask
from flask import render_template # used later to send user to error page.
from flask import redirect # used later to redirect the user after a successful migration.
import psycopg2 # for Postgres database connection
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

NOTE: we are using elasticsearch-py and elasticsearch-dsl, both of which can be found through https://pypi.org.

Next, let’s set up the Elasticsearch database login, credentials, etc. and set up a connection to the database.

t_host = "localhost"
t_port = "9200"
t_dbname_ES = "noSQL_db" # "index" in Elasticsearch terminology
t_user = "Elasticsearchadmin"
t_pw = "[Elasticsearch database user password here]"
client_ES = Elasticsearch
    (
    [t_host],
    http_auth=(t_user, t_pw),
    port=t_port
    )
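Before going further, it is worth checking that the connection actually works. A quick sanity-check sketch, using elasticsearch-py’s ping(), which returns True when the cluster is reachable with these credentials:

# A sanity-check sketch: ping() returns True if the cluster responds.
if not client_ES.ping():
    raise RuntimeError("Could not connect to Elasticsearch at " + t_host)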

Analysis

Looking above, you can see we created an object called “client_ES” that we will use later to send a data request to Elasticsearch.

Below, we set up PostgreSQL credentials and create a connection to the SQL database we will write to later. (We reuse the t_* variable names; that is safe because client_ES already holds its own copies of the Elasticsearch values.)

t_host = "[database address here]"
t_port = "5432" # Default postgres port
t_dbname = "[database name here]"
t_user = "[database user name here]"
t_pw = "[database user password here]"
db_conn = psycopg2.connect(host=t_host, port=t_port, dbname=t_dbname, user=t_user, password=t_pw)
db_cursor = db_conn.cursor()
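Note that we assume the target table already exists in Postgres. If it does not, a one-time setup sketch like the following creates it; the column types match the “t_” (text) and “i_” (integer) naming convention described earlier:

# A one-time setup sketch, assuming "tbl_users" does not exist yet.
db_cursor.execute("""
    CREATE TABLE IF NOT EXISTS tbl_users (
        id SERIAL PRIMARY KEY,
        t_name TEXT,
        i_age INTEGER
    )
""")
db_conn.commit()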

Analysis

We created the “db_cursor” object for later use in executing an SQL command.

Next we’ll set up two arrays (Python lists), one for user names and one for user ages.

array_users_name = []
array_users_age = []

Then we get all documents (records) from the index where the “kind” field’s value is “user”.

    s = Search(index=t_dbname_ES).using(client_ES).query("match", kind="user")
    Elasticsearch_results = s.execute()

Next: read “name” and “age” from all three of those returned documents into the Python arrays we prepared.

    for record in Elasticsearch_results:
        array_users_name.append(record.name)
        array_users_age.append(record.age)

Analysis

  • “for…”: This is a loop to cycle through the documents in the “Elasticsearch_results” response.
  • “append(record.name)”: Adds the current document’s name to the user names array for later insertion into our Postgres database.
  • “append(record.age)”: Adds the current document’s age to the ages array for insertion into our PostgreSQL users table.
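One caveat: by default, execute() returns only the first page of hits (ten, unless you raise the query size). That is fine for our three test documents, but for a real migration you would iterate with elasticsearch-dsl’s scan(), which streams every matching document. A sketch:

    # A sketch for larger result sets: scan() pages through every hit
    #   instead of returning only the first page.
    for record in s.scan():
        array_users_name.append(record.name)
        array_users_age.append(record.age)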

Next, we build the SQL we’ll later execute on our Postgres database using the cursor we created earlier. Note the %s placeholders: instead of pasting values directly into the string, we let psycopg2 fill them in safely at execution time.

    s = ""
    s += "INSERT INTO tbl_users "
    s += "("
    s += "t_name"
    s += ", i_age"
    s += ")"
    s += " VALUES"
    s += "("
    s += "'" + array_users_name + "'"
    s += "," + array_users_age
    s += ")"

Analysis

  • “INSERT INTO tbl_users…”: This adds a row to “tbl_users” for each user.
  • “t_name” and “i_age”: We’re telling PostgreSQL which two columns will receive the data.
  • “VALUES (%s, %s)”: Placeholders that psycopg2 fills in with properly escaped values when the statement runs, which also protects us from SQL injection.
  • “zip(array_users_name, array_users_age)”: Pairs each name with its age, producing one (name, age) tuple per row to insert. These are the actual values we filled from the Elasticsearch source earlier.
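If you are curious what will actually be sent to Postgres, psycopg2’s mogrify() fills in the placeholders without executing anything. A preview sketch; the output shown in the comment is illustrative:

    # mogrify() returns the exact SQL bytes psycopg2 would execute.
    print(db_cursor.mogrify(s, rows[0]))
    # e.g. b"INSERT INTO tbl_users (t_name, i_age) VALUES ('Slim Chance', '35')"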

Finally, code execution, error checking, object/connection housekeeping, and redirect.

    # Here we TRY to execute our SQL and commit, catching and displaying
    #   any errors that might occur.
    try:
        db_cursor.executemany(s, rows)
        db_conn.commit()
    except psycopg2.Error as e:
        # Build a message to send to the user with potentially useful information, including
        #    the error message and the query that caused the error.
        t_message = "Database error: " + str(e) + "\nSQL: " + s
        # Notice: you may wish to build a template page called "error.html" for dealing with errors.
        #   We are also sending a parameter value along with our call to the page.
        return render_template("error.html", message=t_message)

    # They got this far, meaning success.
    # Clean up our Postgres database connection and cursor object.
    db_cursor.close()
    db_conn.close()

    # Redirect the user to the rest of your application
    return redirect("http://your-URL-here", code=302)
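As a side note, psycopg2 connections and cursors can also be used as context managers. A sketch of that alternative housekeeping style, using the same placeholder credentials; note that the “with” block commits on success and rolls back on an exception, but does not close the connection itself:

# An alternative housekeeping sketch using context managers.
with psycopg2.connect(host=t_host, port=t_port, dbname=t_dbname,
                      user=t_user, password=t_pw) as conn:
    with conn.cursor() as cur: # the cursor closes when this block exits
        cur.executemany(s, rows)
conn.close() # the connection must still be closed explicitly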

The full code listing

Here’s the final version of your Python code! IMPORTANT: When we say “study the code below,” this is important for more reasons than one. For example, you may need to install some new Python libraries, namely “flask”, “psycopg2”, “elasticsearch”, and “elasticsearch-dsl”.

from flask import Flask
from flask import render_template # used later to send user to error page.
from flask import redirect # used later to redirect the user after a successful migration.
import psycopg2 # for Postgres database connection
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

app = Flask(__name__)

# Elasticsearch Database credentials
t_host = "localhost"
t_port = "9200"
t_dbname_ES = "noSQL_db" # "index" in Elasticsearch terminology
t_user = "Elasticsearchadmin"
t_pw = "[Elasticsearch database user password here]"
client_ES = Elasticsearch
    (
    [t_host],
    http_auth=(t_user, t_pw),
    port=t_port
    )

# Postgres Database credentials
t_host = "[database host address here]"
t_port = "5432" # Default postgres port
t_dbname = "[database name here]"
t_user = "[database user name here]"
t_pw = "[database user password here]"
db_conn = psycopg2.connect(host=t_host, port=t_port, dbname=t_dbname, user=t_user, password=t_pw)
db_cursor = db_conn.cursor()

@app.route("/migrate")
def migrate():
    # Build the name and age arrays from the Elasticsearch users documents
    array_users_name = []
    array_users_age = []

    # Pull all documents (records) whose "kind" field is "user"
    s = Search(index=t_dbname_ES).using(client_ES).query("match", kind="user")
    Elasticsearch_results = s.execute()

    for record in Elasticsearch_results:
        array_users_name.append(record.name)
        array_users_age.append(record.age)

    # Data validation could go here.

    # No explicit cleanup is needed for the Elasticsearch response object.

    # Send the data in our arrays to the Postgres database.
    # IMPORTANT: we use %s placeholders instead of concatenating values
    #   into the string. psycopg2 escapes the values for us, which blocks
    #   a hacker from inserting potentially damaging code, commonly known
    #   as "SQL injection".
    s = "INSERT INTO tbl_users (t_name, i_age) VALUES (%s, %s)"
    rows = list(zip(array_users_name, array_users_age))

    # Here we TRY to execute our SQL and commit, catching and displaying
    #   any errors that occur.
    try:
        db_cursor.executemany(s, rows)
        db_conn.commit()
    except psycopg2.Error as e:
        # Create a message to send to the user with valuable information, including
        #    the error message and the SQL that caused the error.
        t_message = "Database error: " + str(e) + "\nSQL: " + s
        # Notice: you may want to create a template page called "error.html" for handling errors.
        #   We are also sending a parameter along with our call to the page.
        return render_template("error.html", message=t_message)

    # They got this far, meaning success.
    # Clean up our Postgres database connection and cursor object.
    db_cursor.close()
    db_conn.close()

    # Redirect user to the rest of your application
    return redirect("http://your-URL-here", code=302)

if __name__ == "__migrate__":
    migrate()

Miscellaneous

To keep this article as simple as possible, we chose to leave out data validation, meaning we recommend you check the data pulled from Elasticsearch before attempting to insert it into your Postgres database. This is especially important when dealing with “semi-structured” Elasticsearch NoSQL databases where you cannot 100% count on consistency.
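A sketch of one approach, assuming names should be non-empty strings and ages should parse as integers; rows that fail either check are skipped rather than inserted:

# A validation sketch: keep only rows with a usable name and integer age.
clean_rows = []
for name, age in zip(array_users_name, array_users_age):
    try:
        if str(name).strip():
            clean_rows.append((str(name).strip(), int(age)))
    except (TypeError, ValueError):
        print("Skipping document with bad age:", name, age)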

Conclusion

In this article, we explored how to migrate Elasticsearch NoSQL data to Postgres using Python. We showed code samples along the way and explained how and why to do each step. We even took a quick look at some JSON. As part of the process, we used Postgres’ INSERT statement, Python arrays (lists), and the for loop.
