How to use Python helpers to bulk load data into an Elasticsearch index

Introduction

Python helpers do exactly what they say: They help you get things done. One of the most efficient ways to streamline indexing is through the helpers.bulk method. Indexing large datasets without putting them into memory is the key to expediting search results and saving system resources. Learn exactly how to call the bulk method with this step-by-step tutorial about Python helpers bulk load Elasticsearch.

The structure of the helpers.bulk method:

helpers.bulk( {CLIENT_OBJ}, {ACTION_ITERATOR}, {INDEX_NAME}, {DOC_TYPE} )
  • The client instance helpers.bulk( {CLIENT_OBJ} is the first parameter you see in the code

  • The custom iterator {ACTION_ITERATOR} gives the iteration for document bulk indexing of several documents

  • If in the action iterator, the index name, and it’s document type are not declared, they can be passed along as strings

  • Read Elasticsearch documentation for the complete helpers class parameter list

Tip: For API calls, Elasticsearch uses slightly different parameters in two situations to avoid conflicts with Python’s keyword list. It uses doc_type and from_ in place of type and from. The API list contains more details.

Prerequisites

  • Python – Install the latest version for your platform such as MacOS, Windows, Unix and Unix-like (Linux), and more.

  • Python client library (low-level) for Elasticsearch – Install Python 3 because Python 2 will soon be outdated. It expires when the year 2020 arrives. Use the python command for Pyton2.x if you want to use Python 2 until it becomes unavailable. Otherwise, for Python 3.x use python3.

  • The correct client module must be installed or you’ll see the error message ImportError: No module named Elasticsearch or a similar one.

  • Install the client library with pip.
pip install elasticsearch # Python 2 install
pip3 install elasticsearch # Python 3 install
  • Run Elasticsearch. Use the GET request for cURL.
curl -XGET http://localhost:9200/
# or try.. curl -XGET https://{YOUR_DOMAIN}:{YOUR_CUSTOM_PORT}
  • To confirm that Elasticsearch is running, use the requests library from Python.
import requests
res = requests.get('http://localhost:9200')
print(res.content)
  • Documents will be bulked into an Elasticsearch index. Bring up the index list of those documents with the following cURL request.
curl -XGET localhost:9200/_cat/indices?v
  • This example shows the document’s ID as a custom universally unique identifier (UUID). You can do the same thing if you import these three:

  • Python’s UUID module – Supports Python 2.3 or higher.

  • The helper’s module – Python helpers to import Elasticsearch data. The module supports these platforms: Python 2.6+ and Python 3.2+ on Windows in process, Python 3.2+ on Unix Portable Operating System Interface (POSIX). Read the helper documentation to find out additional details about the API’s function.

  • An operating system – The OS interface you have or prefer to use, for example, Windows, MacOS, or Linux/Unix.

  • Here’s an example of the header script:

#!/usr/bin/env python3
#-*- coding: utf-8 -*-
from elasticsearch import Elasticsearch, helpers
import os, uuid

Get the JSON data “actions” object ready

  • The list object that behaves like an iterator is helpers.bulkand the actions paramenter gets passed there.

  • See the example below. Using the for loop method, 100 various documents are created.

actions = [
    {
        "_id" : uuid.uuid4(), # random UUID for _id
        "doc_type" : "person", # document _type
        "doc": { # the body of the document
            "name": "George Peterson",
            "sex": "male",
            "age": 34+doc,
            "years": 10+doc
        }
    }
    for doc in range(100)
]

try:
    # make the bulk call, and get a response
    response = helpers.bulk(elastic, bulk_json_data("people.json", "employees", "people"))

    #response = helpers.bulk(elastic, actions, index='employees', doc_type='people')
    print ("\nRESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)

Two key benefits of helpers of bulk API’s

The beauty of the bulk helpers is that by design, they accept two things:

  1. An iterable which can double as a generator so you can bypass loading huge datasets into memory and still index them fast.

  2. An Elasticsearch class is also what every bulk helper accepts.

JSON file bulk document indexing – use a custom generator

Here’s an example of a JSON file containing several Elasticsearch documents:

{"index":{}}
{"name": "Moghul Hecuba", "age": "48", "sex": "female", "accounts": "moghul_hecuba", "join_date": "2011-07-25"}
{"index":{}}
{"name": "Suffolk McGrath", "age": "36", "sex": "male", "accounts": "suffolk_mcgrath", "join_date": "2015-04-21"}
{"index":{}}
{"name": "McMullen Benzedrine", "age": "41", "sex": "male", "accounts": "mcmullen_benzedrine", "join_date": "2012-01-05"}
{"index":{}}
{"name": "Angus Harley", "age": "55", "sex": "male", "accounts": "angus_harley", "join_date": "2014-01-31"}
{"index":{}}
{"name": "Darlene Corinth", "age": "43", "sex": "male", "accounts": "darlene_corinth", "join_date": "2015-06-02"}
[...]
[...]

Each document has it’s own respective row, and a header row indicating the Elasticsearch index. These example documents don’t specify the Elasticsearch index name, because the index will be passed to the helpers.bulk() method’s API call later on.

Now, get the working path for the Python script by creating a function if the JSON file and the script are in the same directory:

'''
a simple function that gets the working path of
the Python script and returns it
'''

def script_path():
    path = os.path.dirname(os.path.realpath(__file__))
    if os.name == 'posix': # posix is for macOS or Linux
        path = path + "/"
    else:
        path = path + chr(92) # backslash is for Windows
    return path
  • If the JSON file and Python script are in different directories, use the example below. A custom path for the file name is available too.
def get_data_from_file(file_name):
    if "/" in file_name or chr(92) in file_name:
        file = open(file_name, encoding="utf8", errors='ignore')
    else:
        # use the script_path() function to get path if none is passed
        file = open(script_path() + str(file_name), encoding="utf8", errors='ignore')
    data = [line.strip() for line in file]
    file.close()
    return data
  • Iterators, including functions for the action parameter are allowable with the Bulk API. Again, as you can see, generators enable large datasets won’t have to be loaded into memory to slow down the process. It’s the best way for Python helpers to import Elasticsearch data.
'''
generator to push bulk data from a JSON
file into an Elasticsearch index
'''

def bulk_json_data(json_file, _index, doc_type):
    json_list = get_data_from_file(json_file)
    for doc in json_list:
        # use a `yield` generator so that the data
        # isn't loaded into memory

        if '{"index"' not in doc:
            yield {
                "_index": _index,
                "_type": doc_type,
                "_id": uuid.uuid4(),
                "_source": doc
            }

>A Tip: To create a custom generator fast for the bulk load data helper Python technique, use the bulk method and the parameters in the example here below.

helpers.bulk(
    {CLIENT_INSTANCE},
    bulk_json_data(
        {JSON_FILE_NAME},
        {INDEX_NAME},
        {DOC_TYPE}
    )
)
  • Try the bulk call the helpers.bulk method. Pass the whole bulk_json_data() as the actions parameter.
try:
    # make the bulk call, and get a response
    response = helpers.bulk(elastic, bulk_json_data("people.json", "employees", "people"))
    print ("\nRESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)

Conclusion

in this tutorial, you learned how to use the helpers.bulk method. It is an excellent way to index large datasets without putting them into memory. This speeds up the indexing when you need to bulk import Elasticsearch data in Python. You save time by sreamlining processes to complete coding done faster with Python helpers bulk load Elasticsearch.

An example of a successful API call terminal output you might see looks like this.

RESPONSE: (1000, [])

Although the above number in the output example shows “1000” documents, that number is fictitious. The actual document number you’ll see will reflect what’s contained in the JSON file.

Python helpers bulk load elasticsearch: The entire code

#!/usr/bin/env python3
#-*- coding: utf-8 -*-
from elasticsearch import Elasticsearch, helpers
import os, uuid

# create a new instance of the Elasticsearch client class
elastic = Elasticsearch()
# ...or uncomment to use this instead:
#elastic = Elasticsearch("localhost")

'''
a simple function that gets the working path of
the Python script and returns it
'''

def script_path():
    path = os.path.dirname(os.path.realpath(__file__))
    if os.name == 'posix': # posix is for macOS or Linux
        path = path + "/"
    else:
        path = path + chr(92) # backslash is for Windows
    return path


'''
this function opens a file and returns its
contents as a list of strings split by linebreaks
'''

def get_data_from_file(self, path=script_path()):
    file = open(path + str(self), encoding="utf8", errors='ignore')
    data = [line.strip() for line in file]
    file.close()
    return data

'''
generator to push bulk data from a JSON
file into an Elasticsearch index
'''

def bulk_json_data(json_file, _index, doc_type):
    json_list = get_data_from_file(json_file)
    for doc in json_list:
    # use a `yield` generator so that the data
    # isn't loaded into memory
        if '{"index"' not in doc:
            yield {
                "_index": _index,
                "_type": doc_type,
                "_id": uuid.uuid4(),
                "_source": doc
            }

try:
    # make the bulk call, and get a response
    response = helpers.bulk(elastic, bulk_json_data("people.json", "employees", "people"))
    print ("\nbulk_json_data() RESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)

# iterator for a single document
actions = [
    {
        "_id" : uuid.uuid4(), # random UUID for _id
        "doc_type" : "person", # document _type
        "doc": { # the body of the document
            "name": "George Peterson",
            "sex": "male",
            "age": 34,
            "years": 10
        }
    }
]

# iterator for multiple docs
actions = [
    {
        "_id" : uuid.uuid4(), # random UUID for _id
        "doc_type" : "person", # document _type
        "doc": { # the body of the document
            "name": "George Peterson",
            "sex": "male",
            "age": 34+doc,
            "years": 10+doc
        }
    }
    for doc in range(100) # use 'for' loop to insert 100 documents
]

try:
    # make the bulk call using 'actions' and get a response
    response = helpers.bulk(elastic, actions, index='employees', doc_type='people')
    print ("\nactions RESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)

Pilot the ObjectRocket Platform Free!

Try Fully-Managed CockroachDB, Elasticsearch, MongoDB, PostgreSQL (Beta) or Redis.

Get Started

Keep in the know!

Subscribe to our emails and we’ll let you know what’s going on at ObjectRocket. We hate spam and make it easy to unsubscribe.