How to Bulk Index Elasticsearch Documents From A JSON File Using Python

Introduction

When you’re managing data in Elasticsearch, it’s important to know how to bulk index documents in an efficient manner. This article will explain how to bulk index Elasticsearch documents from a JSON file using Python. The Python script will index the data as Elasticsearch documents with the help of the Python client library and Python’s built-in json library. This process is a simple and efficient one because Python has native JSON support built into its language. The Python dict object can easily be converted to a JSON string using its dumps() method.

Prerequisites

Before we dive into the code, let’s review a few prerequisites that must be in place in order to bulk index Elasticsearch documents from a JSON file using Python:

  • First, the Elasticsearch service must be running on the same machine or server where the JSON file is stored.

  • It’s recommended that you use Python 3 instead of Python 2.7 since Python 2 is now deprecated.

  • You will also need the PIP3 package manager for Python 3 if the elasticsearch client library for Python 3 is not installed. You can type pip3 list to get a list of all of the packages currently installed on your machine.

  • If it’s not installed already, you can install the low-level client Elasticsearch distribution for Python 3 using PIP3:

pip3 install elasticsearch

Create a JSON file with documents to be indexed to Elasticsearch

Now that we’ve discussed the prerequisites, we can go ahead and create a new JSON file with the .json file extension. We’ll be adding some documents with fields to be indexed in Elasticsearch. You can use the touch command in a terminal window to create your JSON file:

touch docs.json

Let’s edit the file and make sure that all of the documents have the same field types; this will prevent Elasticsearch from raising a _mapping exception. For this tutorial, we’ll use the following documents as examples:

{"str field": "some string", "int field": 12345, "bool field": True}
{"str field": "another string", "int field": 42, "bool field": False}
{"str field": "random string", "int field": 3856452, "bool field": True}
{"str field": "string value", "int field": 11111, "bool field": False}
{"str field": "last string", "int field": 54321, "bool field": True}

NOTE: The JSON file must be in the same directory as the Python script that’s making the Bulk API call to Elasticsearch. If it’s in a different directory, you must specify the relative path to the JSON file when using the open() command in Python.

Import the Python package libraries for the Elasticsearch Bulk API call

Next, we’ll create a new Python script in the same directory as our JSON file using the command touch bulk_index.py. Let’s make sure to import the package libraries for JSON, as well as the Elasticsearch and helpers method libraries, at the beginning of the script:

# import Python's JSON library for its loads() method
import json

# import time for its sleep method
from time import sleep

# import the datetime libraries datetime.now() method
from datetime import datetime

# use the elasticsearch client's helpers class for _bulk API
from elasticsearch import Elasticsearch, helpers

NOTE: In this script, we’ll be using the datetime.now() method call to create a timestamp for the Elasticsearch documents.

Declare a client instance of the Elasticsearch low-level library

The next step is to pass a string to the Elasticsearch() method call. When you follow along with this example, be sure to replace the default values with your cluster’s host name and the port that Elasticsearch is running on:

# declare a client instance of the Python Elasticsearch library
client = Elasticsearch("localhost:9200")

NOTE: The default port for Elasticsearch is 9200, but this can be configured in your server’s elasticsearch.yml file.

Open the JSON data and return a list of Elasticsearch documents

In this next bit of code, we declare a function that will open the JSON file’s data and return a list of JSON object strings that can be indexed as Elasticsearch documents:

# define a function that will load a text file
def get_data_from_text_file(self):
# the function will return a list of docs
return [l.strip() for l in open(str(self), encoding="utf8", errors='ignore')]

# call the function to get the string data containing docs
docs = get_data_from_text_file("data.json")

# print the length of the documents in the string
print ("String docs length:", len(docs))

NOTE: Be sure to pass the relative path to the .json file in the string argument as well if the file is not located in the same directory as the Python script.

Iterate over the list of JSON document strings and create Elasticsearch dictionary objects

Let’s create an empty list object ([]) that will hold the dict documents created from the JSON strings in the .json file. After the list declaration, we use enumerate() and iterate over the list of strings returned from the JSON file:

# define an empty list for the Elasticsearch docs
doc_list = []

# use Python's enumerate() function to iterate over list of doc strings
for num, doc in enumerate(docs):

# catch any JSON loads() errors
try:

Avoid getting a ‘JSONDecodeError’ by making Boolean values lowercase

It’s important to make any Python Boolean values lowercase before the string is passed to the json.loads() method call; otherwise, Python will return a JSONDecodeError exception.

This can be done using the Python string’s built-in replace() method call:

# prevent JSONDecodeError resulting from Python uppercase boolean
doc = doc.replace("True", "true")
doc = doc.replace("False", "false")

Python JSON library returning a JSONDecodeError in IDLE because of title cased Boolean values

Pass the JSON string to the ‘json.loads()’ method call

In this section, we’ll pass the fixed JSON string to Python’s json.loads() method. It should return a dict object that will represent the Elasticsearch document:

# convert the string to a dict object
dict_doc = json.loads(doc)

Add more fields to the Elasticsearch Python dict document

It’s possible to add more fields to the document by assigning a value to a new key in the dictionary object. The following example assigns the documents a new field called "timestamp", and it uses the datetime.now() method call to generate the value for this field:

# add a new field to the Elasticsearch doc
dict_doc["timestamp"] = datetime.now()

Create a custom _id for the Elasticsearch Python document

You don’t have to assign an "_id" value to the Elasticsearch document; however, Elasticsearch will generate a unique alpha-numeric _id for you if this value is omitted:

# add a dict key called "_id" if you'd like to specify an ID for the doc
dict_doc["_id"] = num

The enumerate() iterator function returns an integer, representing a tally of each iteration and the value. The following example uses that num value to create a number _id for each document.

Screenshot of Elasticsearch documents with IDs created by the user in Python at the time of indexing

Append the Elasticsearch ‘dict’ object to the list

We use the += Python operator to append the dict JSON object to the list that will eventually be passed to the helpers.bulk() method:

# append the dict object to the list []
doc_list += [dict_doc]

except json.decoder.JSONDecodeError as err:
# print the errors
print ("ERROR for num:", num, "-- JSONDecodeError:", err, "for doc:", doc)

print ("Dict docs length:", len(doc_list))

NOTE: If there are any JSONDecodeError exceptions raised, then those documents will not be added to the list. You can print the length of the list object using len() to see how many documents were added.

Pass the list of Elasticsearch documents to the client’s helpers.bulk() method

In this section, we’ll pass the doc_list of Elasticsearch documents objects to the helpers.bulk() method. Make sure to pass the client instance and specify an index name when you call the method:

# attempt to index the dictionary entries using the helpers.bulk() method
try:
print ("\nAttempting to index the list of docs using helpers.bulk()")

# use the helpers library's Bulk API to index list of Elasticsearch docs
resp = helpers.bulk(
client,
doc_list,
index = "some_index",
doc_type = "_doc"
)

NOTE: The document’s doc_type parameter has been deprecated since Elasticsearch v6.0. Pass the "_doc" string as an argument instead if you’re using a newer version of Elasticsearch.

Print the API response returned by the Elasticsearch cluster

The try-catch indentation will exit the script if any errors occur. Otherwise, the method call should return a response indicating how many documents were indexed:

# print the response returned by Elasticsearch
print ("helpers.bulk() RESPONSE:", resp)
print ("helpers.bulk() RESPONSE:", json.dumps(resp, indent=4))

except Exception as err:

# print any errors returned while making the helpers.bulk() API call
print("Elasticsearch helpers.bulk() ERROR:", err)
quit()

Make a search request to get all of the Elasticsearch index’s documents

In the next bit of code, we’ll create a query filter that will return all of the documents in the Elasticsearch index:

# get all of docs for the index
query_all = {
'size' : 10000,
'query': {
'match_all' : {}
}
}

NOTE: The default number of documents returned by the search() method is 10, and the maximum integer value you can pass to the size parameter is 10_000. Any value larger than 10,000 will return an error saying "Result window is too large, from + size must be less than or equal to: [10000]".

Make the script pause before searching for the documents

We can use Python’s time library to have our script “sleep” for a few seconds. This ensures that the Elasticsearch cluster has time to index the documents before another API call is made to return them with search():

print ("\nSleeping for a few seconds to wait for indexing request to finish.")
sleep(2)

Verify that the JSON string objects were indexed into Elasticsearch as documents

Let’s call the Elasticsearch Python client instance’s search() method and have it return all of the documents:

# pass the query_all dict to search() method
resp = client.search(
index = "some_index",
body = query_all
)

print ("search() response:", json.dumps(resp, indent=4))

# print the number of docs in index
print ("Length of docs returned by search():", len(resp['hits']['hits']))

NOTE: The documents will be nested inside the response object’s ['hits']['hits'] keys.

Conclusion

In this article, we’ve explained how to bulk index Elasticsearch documents from a JSON file using Python. We’ve completed our example script; now, all we need to do is run it. We can run the Python script using the python3 command:

python3 bulk_index.py

The following output should be returned at the end of the script:

Length of docs returned by search(): 5

Terminal screenshot of Python using the helpers.bulk() method to index JSON documents into Elasticsearch

Use Kibana to get the Elasticsearch documents indexed using helpers.bulk()

You can also make an HTTP GET request within Kibana’s Console UI to get all of the documents you just indexed:

GET some_index/_search
{
"query": {
"match_all": {}
}
}

Screenshot of Kibana getting the Elasticsearch documents indexed in Python

Just the Code

Throughout our tutorial, we looked at the example code one section at a time. In this section, we’ve included the Python script in its entirety:

#!/usr/bin/env python3
#-*- coding: utf-8 -*-

# import Python's JSON library for its loads() method
import json

# import time for its sleep method
from time import sleep

# import the datetime libraries datetime.now() method
from datetime import datetime

# use the Elasticsearch client's helpers class for _bulk API
from elasticsearch import Elasticsearch, helpers

# declare a client instance of the Python Elasticsearch library
client = Elasticsearch("localhost:9200")

"""
JSON DATA IN FILE:
{"str field": "some string", "int field": 12345, "bool field": True}
{"str field": "another string", "int field": 42, "bool field": False}
{"str field": "random string", "int field": 3856452, "bool field": True}
{"str field": "string value", "int field": 11111, "bool field": False}
{"str field": "last string", "int field": 54321, "bool field": True}
"""


# define a function that will load a text file
def get_data_from_text_file(self):
# the function will return a list of docs
return [l.strip() for l in open(str(self), encoding="utf8", errors='ignore')]

# call the function to get the string data containing docs
docs = get_data_from_text_file("data.json")

# print the length of the documents in the string
print ("String docs length:", len(docs))

# define an empty list for the Elasticsearch docs
doc_list = []

# use Python's enumerate() function to iterate over list of doc strings
for num, doc in enumerate(docs):

# catch any JSON loads() errors
try:

# prevent JSONDecodeError resulting from Python uppercase boolean
doc = doc.replace("True", "true")
doc = doc.replace("False", "false")

# convert the string to a dict object
dict_doc = json.loads(doc)

# add a new field to the Elasticsearch doc
dict_doc["timestamp"] = datetime.now()

# add a dict key called "_id" if you'd like to specify an ID for the doc
dict_doc["_id"] = num

# append the dict object to the list []
doc_list += [dict_doc]

except json.decoder.JSONDecodeError as err:
# print the errors
print ("ERROR for num:", num, "-- JSONDecodeError:", err, "for doc:", doc)

print ("Dict docs length:", len(doc_list))

# attempt to index the dictionary entries using the helpers.bulk() method
try:
print ("\nAttempting to index the list of docs using helpers.bulk()")

# use the helpers library's Bulk API to index list of Elasticsearch docs
resp = helpers.bulk(
client,
doc_list,
index = "some_index",
doc_type = "_doc"
)

# print the response returned by Elasticsearch
print ("helpers.bulk() RESPONSE:", resp)
print ("helpers.bulk() RESPONSE:", json.dumps(resp, indent=4))

except Exception as err:

# print any errors returned w
## Prerequisiteshile making the helpers.bulk() API call
print("Elasticsearch helpers.bulk() ERROR:", err)
quit()

# get all of docs for the index
# Result window is too large, from + size must be less than or equal to: [10000]
query_all = {
'size' : 10_000,
'query': {
'match_all' : {}
}
}

print ("\nSleeping for a few seconds to wait for indexing request to finish.")
sleep(2)

# pass the query_all dict to search() method
resp = client.search(
index = "some_index",
body = query_all
)

print ("search() response:", json.dumps(resp, indent=4))

# print the number of docs in index
print ("Length of docs returned by search():", len(resp['hits']['hits']))

"""
Length of docs returned by search(): 5
"""

Pilot the ObjectRocket Platform Free!

Try Fully-Managed CockroachDB, Elasticsearch, MongoDB, PostgreSQL (Beta) or Redis.

Get Started

Keep in the know!

Subscribe to our emails and we’ll let you know what’s going on at ObjectRocket. We hate spam and make it easy to unsubscribe.