How to Bulk Index Elasticsearch Documents From A JSON File Using Python
Introduction
When you’re managing data in Elasticsearch, it’s important to know how to bulk index documents in an efficient manner. This article will explain how to bulk index Elasticsearch documents from a JSON file using Python. The Python script will index the data as Elasticsearch documents with the help of the Python client library and Python’s built-in json library. This process is simple and efficient because Python has native JSON support built into the language: a Python dict object can easily be converted to a JSON string using the json library’s dumps() method.
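For instance, here’s a quick example of that conversion using only the standard library:
import json
# convert a Python dict to a JSON string
doc = {"str field": "some string", "int field": 12345}
print(json.dumps(doc))  # {"str field": "some string", "int field": 12345}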
Prerequisites
Before we dive into the code, let’s review a few prerequisites that must be in place in order to bulk index Elasticsearch documents from a JSON file using Python:
First, the Elasticsearch service must be running and reachable from the machine where the JSON file is stored and the Python script will run; in this tutorial, everything runs on localhost.
It’s recommended that you use Python 3 instead of Python 2.7, since Python 2 is now deprecated.
You will also need the PIP3 package manager for Python 3 if the elasticsearch client library for Python 3 is not installed. You can type pip3 list to get a list of all of the packages currently installed on your machine. If it’s not installed already, you can install the low-level Elasticsearch client distribution for Python 3 using PIP3:
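pip3 install elasticsearch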
Create a JSON file with documents to be indexed to Elasticsearch
Now that we’ve discussed the prerequisites, we can go ahead and create a new JSON file with the .json file extension. We’ll be adding some documents with fields to be indexed in Elasticsearch. You can use the touch command in a terminal window to create your JSON file:
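touch data.json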
Let’s edit the file and make sure that all of the documents have the same field types; this will prevent Elasticsearch from raising a mapping exception. For this tutorial, we’ll use the following documents as examples:
{"str field": "some string", "int field": 12345, "bool field": True}
{"str field": "another string", "int field": 42, "bool field": False}
{"str field": "random string", "int field": 3856452, "bool field": True}
{"str field": "string value", "int field": 11111, "bool field": False}
{"str field": "last string", "int field": 54321, "bool field": True}
NOTE: The JSON file must be in the same directory as the Python script that’s making the Bulk API call to Elasticsearch. If it’s in a different directory, you must specify the relative path to the JSON file when calling Python’s open() function.
Import the Python package libraries for the Elasticsearch Bulk API call
Next, we’ll create a new Python script in the same directory as our JSON file using the command touch bulk_index.py. Let’s make sure to import the json package library, as well as the Elasticsearch class and the helpers module from the client library, at the beginning of the script:
import json
# import time for its sleep method
from time import sleep
# import the datetime library's datetime.now() method
from datetime import datetime
# use the elasticsearch client's helpers class for _bulk API
from elasticsearch import Elasticsearch, helpers
NOTE: In this script, we’ll be using the datetime.now() method call to create a timestamp for the Elasticsearch documents.
Declare a client instance of the Elasticsearch low-level library
The next step is to pass the host string to the Elasticsearch() constructor. When you follow along with this example, be sure to replace the default values with your cluster’s host name and the port that Elasticsearch is running on:
client = Elasticsearch("localhost:9200")
NOTE: The default port for Elasticsearch is 9200, but this can be configured in your server’s elasticsearch.yml file. Also note that newer versions of the Python client require a full URL that includes the scheme, e.g. Elasticsearch("http://localhost:9200").
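For reference, a minimal sketch of those settings in elasticsearch.yml might look like this (the values shown are placeholders, not required settings):
# elasticsearch.yml -- example network settings (placeholder values)
network.host: localhost
http.port: 9200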
Open the JSON data and return a list of Elasticsearch documents
In this next bit of code, we declare a function that will open the JSON file’s data and return a list of JSON object strings that can be indexed as Elasticsearch documents:
def get_data_from_text_file(file_name):
    # the function will return a list of docs
    with open(file_name, encoding="utf8", errors="ignore") as file:
        return [line.strip() for line in file]
# call the function to get the string data containing docs
docs = get_data_from_text_file("data.json")
# print the length of the documents in the string
print ("String docs length:", len(docs))
NOTE: Be sure to pass the relative path to the .json file in the string argument if the file is not located in the same directory as the Python script.
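For example, assuming a hypothetical data/ subdirectory relative to the script, the call would look like this:
# "data/" is a hypothetical subdirectory, used here only for illustration
docs = get_data_from_text_file("data/data.json")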
Iterate over the list of JSON document strings and create Elasticsearch dictionary objects
Let’s create an empty list object ([]) that will hold the dict documents created from the JSON strings in the .json file. After the list declaration, we use enumerate() to iterate over the list of strings returned from the JSON file:
doc_list = []
# use Python's enumerate() function to iterate over list of doc strings
for num, doc in enumerate(docs):
    # catch any JSON loads() errors
    try:
Avoid getting a ‘JSONDecodeError’ by making Boolean values lowercase
It’s important to make any Python Boolean values lowercase before the string is passed to the json.loads() method call; otherwise, Python will raise a JSONDecodeError exception. This can be done using the Python string’s built-in replace() method call (note that this simple approach will also alter any string values that happen to contain "True" or "False", so it’s only suitable for simple data like our example):
        doc = doc.replace("True", "true")
        doc = doc.replace("False", "false")
Pass the JSON string to the ‘json.loads()’ method call
In this section, we’ll pass the fixed JSON string to Python’s json.loads() method. It should return a dict object that will represent the Elasticsearch document:
        dict_doc = json.loads(doc)
Add more fields to the Elasticsearch Python dict document
It’s possible to add more fields to the document by assigning a value to a new key in the dictionary object. The following example assigns the documents a new field called "timestamp", and it uses the datetime.now() method call to generate the value for this field:
        dict_doc["timestamp"] = datetime.now()
Create a custom _id for the Elasticsearch Python document
You don’t have to assign an "_id" value to the Elasticsearch document; if this value is omitted, Elasticsearch will generate a unique alpha-numeric _id for you. In this example, we’ll set one explicitly:
        dict_doc["_id"] = num
The enumerate() function yields an integer counter alongside each value in the list. The example above uses that num value as a numeric _id for each document.
Append the Elasticsearch ‘dict’ object to the list
We use the += Python operator to append the dict JSON object to the list that will eventually be passed to the helpers.bulk() method:
        doc_list += [dict_doc]
    except json.decoder.JSONDecodeError as err:
        # print the errors
        print ("ERROR for num:", num, "-- JSONDecodeError:", err, "for doc:", doc)
print ("Dict docs length:", len(doc_list))
NOTE: If there are any JSONDecodeError exceptions raised, then those documents will not be added to the list. You can print the length of the list object using len() to see how many documents were added.
Pass the list of Elasticsearch documents to the client’s helpers.bulk() method
In this section, we’ll pass the doc_list of Elasticsearch document objects to the helpers.bulk() method. Make sure to pass the client instance and specify an index name when you call the method:
try:
    print ("\nAttempting to index the list of docs using helpers.bulk()")
    # use the helpers library's Bulk API to index list of Elasticsearch docs
    resp = helpers.bulk(
        client,
        doc_list,
        index = "some_index",
        doc_type = "_doc"
    )
NOTE: Mapping types were deprecated in Elasticsearch v6.0 and removed in later releases. If you’re running Elasticsearch 7.x, pass the "_doc" string for this parameter; on newer versions of Elasticsearch and its Python client, omit the doc_type parameter entirely.
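If you’re on one of those newer versions, a minimal sketch of the same call without doc_type might look like this:
# assumes a cluster where mapping types have been removed entirely
resp = helpers.bulk(
    client,
    doc_list,
    index = "some_index"
)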
Print the API response returned by the Elasticsearch cluster
The try-except structure will exit the script if any errors occur. Otherwise, the method call should return a response indicating how many documents were indexed:
print ("helpers.bulk() RESPONSE:", resp)
print ("helpers.bulk() RESPONSE:", json.dumps(resp, indent=4))
except Exception as err:
# print any errors returned while making the helpers.bulk() API call
print("Elasticsearch helpers.bulk() ERROR:", err)
quit()
Make a search request to get all of the Elasticsearch index’s documents
In the next bit of code, we’ll create a query filter that will return all of the documents in the Elasticsearch index:
query_all = {
'size' : 10000,
'query': {
'match_all' : {}
}
}
NOTE: The default number of documents returned by the search() method is 10, and the maximum value you can pass to the size parameter is 10,000 (the index’s default result-window limit). Any value larger than 10,000 will return an error saying "Result window is too large, from + size must be less than or equal to: [10000]".
Make the script pause before searching for the documents
We can use Python’s time library to have our script “sleep” for a few seconds. This gives the Elasticsearch cluster time to index the documents before another API call is made to return them with search():
sleep(2)
Verify that the JSON string objects were indexed into Elasticsearch as documents
Let’s call the Elasticsearch Python client instance’s search() method and have it return all of the documents:
resp = client.search(
index = "some_index",
body = query_all
)
print ("search() response:", json.dumps(resp, indent=4))
# print the number of docs in index
print ("Length of docs returned by search():", len(resp['hits']['hits']))
NOTE: The documents will be nested inside the response object’s ['hits']['hits'] keys.
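For example, you can iterate over those nested hits to print each document’s ID and its fields:
# loop over the nested hits and print each doc's ID and source fields
for doc in resp['hits']['hits']:
    print (doc['_id'], doc['_source'])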
Conclusion
In this article, we’ve explained how to bulk index Elasticsearch documents from a JSON file using Python. We’ve completed our example script; now, all we need to do is run it. We can run the Python script using the python3 command:
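python3 bulk_index.py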
The following output should be returned at the end of the script:
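Length of docs returned by search(): 5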
Use Kibana to get the Elasticsearch documents indexed using helpers.bulk()
You can also make an HTTP GET request within Kibana’s Console UI to get all of the documents you just indexed:
GET some_index/_search
{
  "query": {
    "match_all": {}
  }
}
Just the Code
Throughout our tutorial, we looked at the example code one section at a time. In this section, we’ve included the Python script in its entirety:
#-*- coding: utf-8 -*-
# import Python's JSON library for its loads() method
import json
# import time for its sleep method
from time import sleep
# import the datetime library's datetime.now() method
from datetime import datetime
# use the Elasticsearch client's helpers class for _bulk API
from elasticsearch import Elasticsearch, helpers
# declare a client instance of the Python Elasticsearch library
client = Elasticsearch("localhost:9200")
"""
JSON DATA IN FILE:
{"str field": "some string", "int field": 12345, "bool field": True}
{"str field": "another string", "int field": 42, "bool field": False}
{"str field": "random string", "int field": 3856452, "bool field": True}
{"str field": "string value", "int field": 11111, "bool field": False}
{"str field": "last string", "int field": 54321, "bool field": True}
"""
# define a function that will load a text file
def get_data_from_text_file(file_name):
    # the function will return a list of docs
    with open(file_name, encoding="utf8", errors="ignore") as file:
        return [line.strip() for line in file]
# call the function to get the string data containing docs
docs = get_data_from_text_file("data.json")
# print the length of the documents in the string
print ("String docs length:", len(docs))
# define an empty list for the Elasticsearch docs
doc_list = []
# use Python's enumerate() function to iterate over list of doc strings
for num, doc in enumerate(docs):
    # catch any JSON loads() errors
    try:
        # prevent JSONDecodeError resulting from Python uppercase boolean
        doc = doc.replace("True", "true")
        doc = doc.replace("False", "false")
        # convert the string to a dict object
        dict_doc = json.loads(doc)
        # add a new field to the Elasticsearch doc
        dict_doc["timestamp"] = datetime.now()
        # add a dict key called "_id" if you'd like to specify an ID for the doc
        dict_doc["_id"] = num
        # append the dict object to the list []
        doc_list += [dict_doc]
    except json.decoder.JSONDecodeError as err:
        # print the errors
        print ("ERROR for num:", num, "-- JSONDecodeError:", err, "for doc:", doc)
print ("Dict docs length:", len(doc_list))
# attempt to index the dictionary entries using the helpers.bulk() method
try:
    print ("\nAttempting to index the list of docs using helpers.bulk()")
    # use the helpers library's Bulk API to index list of Elasticsearch docs
    resp = helpers.bulk(
        client,
        doc_list,
        index = "some_index",
        doc_type = "_doc"
    )
    # print the response returned by Elasticsearch
    print ("helpers.bulk() RESPONSE:", resp)
    print ("helpers.bulk() RESPONSE:", json.dumps(resp, indent=4))
except Exception as err:
    # print any errors returned while making the helpers.bulk() API call
    print("Elasticsearch helpers.bulk() ERROR:", err)
    quit()
# get all of docs for the index
# Result window is too large, from + size must be less than or equal to: [10000]
query_all = {
'size' : 10_000,
'query': {
'match_all' : {}
}
}
print ("\nSleeping for a few seconds to wait for indexing request to finish.")
sleep(2)
# pass the query_all dict to search() method
resp = client.search(
index = "some_index",
body = query_all
)
print ("search() response:", json.dumps(resp, indent=4))
# print the number of docs in index
print ("Length of docs returned by search():", len(resp['hits']['hits']))
"""
Length of docs returned by search(): 5
"""