How to use Python helpers to bulk load data into an Elasticsearch index
Introduction
Python helpers do exactly what they say: they help you get things done. One of the most efficient ways to streamline indexing is the helpers.bulk method. Indexing large datasets without loading them into memory is the key to expediting search results and saving system resources. Learn exactly how to call the bulk method with this step-by-step tutorial on using Python helpers to bulk load data into Elasticsearch.
The structure of the helpers.bulk method:

```python
helpers.bulk( {CLIENT_OBJ}, {ACTION_ITERATOR}, {INDEX_NAME}, {DOC_TYPE} )
```
- The client instance {CLIENT_OBJ} is the first parameter you see in the code.
- The custom iterator {ACTION_ITERATOR} provides the iteration for bulk indexing several documents.
- If the index name and document type are not declared in the action iterator, they can be passed along as strings.
- Read the Elasticsearch documentation for the complete helpers class parameter list. A filled-in example follows below.
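Here's a minimal sketch with the placeholders filled in. The index name "employees" and doc type "people" are illustrative, and elastic and actions stand for the client instance and action iterator built later in this tutorial:

```python
# a hypothetical filled-in call: 'elastic' is a client instance and
# 'actions' an iterator of documents, both created later in this tutorial
response = helpers.bulk(elastic, actions, index="employees", doc_type="people")
```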
Tip: For API calls, Elasticsearch uses slightly different parameter names in two situations to avoid conflicts with Python's keyword list. It uses doc_type and from_ in place of type and from. The API list contains more details.
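For example, a search call might use the renamed parameters like this sketch does (the index and type names are placeholders):

```python
# 'doc_type' replaces 'type', and 'from_' replaces Python's reserved word 'from'
results = elastic.search(
    index="employees",
    doc_type="people",
    from_=0,   # offset of the first hit to return
    size=10    # number of hits to return
)
```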
Prerequisites
Python – Install the latest version for your platform, such as macOS, Windows, or Unix and Unix-like systems (Linux).
Python client library (low-level) for Elasticsearch – Install the library for Python 3, because Python 2 will soon be outdated; it reaches end of life when the year 2020 arrives. Use the python command for Python 2.x if you want to use Python 2 until it becomes unavailable; otherwise, for Python 3.x, use python3. The correct client module must be installed or you'll see the error message ImportError: No module named elasticsearch or a similar one.
- Install the client library with pip.
```bash
pip install elasticsearch    # Python 2 install
pip3 install elasticsearch   # Python 3 install
```
- Run Elasticsearch. Use a cURL GET request to verify that it responds.
```bash
curl -XGET http://localhost:9200/
# or try..
curl -XGET https://{YOUR_DOMAIN}:{YOUR_CUSTOM_PORT}
```
- To confirm that Elasticsearch is running, use the requests library from Python.
```python
import requests

res = requests.get('http://localhost:9200')
print(res.content)
```
- The documents will be bulked into an Elasticsearch index. Bring up the list of existing indices with the following cURL request.
```bash
curl -XGET localhost:9200/_cat/indices?v
```
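The same check can be made from Python through the client's cat API. A quick sketch, assuming Elasticsearch is listening on the default localhost:9200:

```python
from elasticsearch import Elasticsearch

# assumes Elasticsearch is on the default localhost:9200
elastic = Elasticsearch()

# prints the same table as the cURL request above
print(elastic.cat.indices(v=True))
```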
This example uses a custom universally unique identifier (UUID) as each document's ID. You can do the same thing if you import these three modules:
- Python's uuid module – Supports Python 2.3 or higher.
- The helpers module – Python helpers to import Elasticsearch data. The module supports Python 2.6+ and Python 3.2+ on Windows (in process) and Python 3.2+ on Unix/POSIX systems. Read the helpers documentation to find out additional details about the API's functions.
- The os module – Python's interface to the operating system you have or prefer to use, for example, Windows, macOS, or Linux/Unix.
Here’s an example of the header script:
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch, helpers
import os, uuid
```
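The snippets that follow also assume a client instance named elastic, created the same way as in the complete script at the end of this tutorial:

```python
# create a new instance of the Elasticsearch client class
elastic = Elasticsearch()
# ...or uncomment to use this instead:
#elastic = Elasticsearch("localhost")
```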
Get the JSON data “actions” object ready
The actions parameter that gets passed to helpers.bulk is a list object that behaves like an iterator. See the example below: using a list comprehension with a for loop, 100 different documents are created.
```python
actions = [
    {
        "_id": uuid.uuid4(),   # random UUID for _id
        "doc_type": "person",  # document _type
        "doc": {               # the body of the document
            "name": "George Peterson",
            "sex": "male",
            "age": 34 + doc,
            "years": 10 + doc
        }
    }
    for doc in range(100)
]

try:
    # make the bulk call using the 'actions' list, and get a response
    response = helpers.bulk(elastic, actions, index='employees', doc_type='people')
    # the JSON-file alternative covered later in this tutorial:
    #response = helpers.bulk(elastic, bulk_json_data("people.json", "employees", "people"))
    print("\nRESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)
```
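One way to verify the call worked is to count the documents in the target index afterward. A quick sketch, assuming the 'employees' index used above:

```python
# refresh the index so the new documents are visible to counts and searches
elastic.indices.refresh(index="employees")

# count the documents now in the index
total = elastic.count(index="employees")
print("documents in 'employees':", total["count"])
```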
Two key benefits of the bulk helpers API
The beauty of the bulk helpers is that, by design, they accept two things:
- An iterable, which can double as a generator, so you can bypass loading huge datasets into memory and still index them fast (see the sketch after this list).
- An instance of the Elasticsearch client class.
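To illustrate the first point, the actions argument can just as well be a generator expression, so documents are produced one at a time instead of being collected in a list first. A minimal sketch reusing the document shape from the earlier actions example:

```python
# a generator expression: each action is produced on demand,
# so the full set is never held in memory at once
doc_stream = (
    {
        "_id": uuid.uuid4(),
        "doc": {
            "name": "George Peterson",
            "sex": "male",
            "age": 34 + doc,
            "years": 10 + doc
        }
    }
    for doc in range(100)
)

response = helpers.bulk(elastic, doc_stream, index='employees', doc_type='people')
print("RESPONSE:", response)
```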
JSON file bulk document indexing – use a custom generator
Here’s an example of a JSON file containing several Elasticsearch documents:
```
{"index":{}}
{"name": "Moghul Hecuba", "age": "48", "sex": "female", "accounts": "moghul_hecuba", "join_date": "2011-07-25"}
{"index":{}}
{"name": "Suffolk McGrath", "age": "36", "sex": "male", "accounts": "suffolk_mcgrath", "join_date": "2015-04-21"}
{"index":{}}
{"name": "McMullen Benzedrine", "age": "41", "sex": "male", "accounts": "mcmullen_benzedrine", "join_date": "2012-01-05"}
{"index":{}}
{"name": "Angus Harley", "age": "55", "sex": "male", "accounts": "angus_harley", "join_date": "2014-01-31"}
{"index":{}}
{"name": "Darlene Corinth", "age": "43", "sex": "male", "accounts": "darlene_corinth", "join_date": "2015-06-02"}
[...]
[...]
```
Each document has its own row, preceded by a header row indicating the Elasticsearch index. These example documents don't specify the Elasticsearch index name, because the index will be passed to the helpers.bulk() method's API call later on.
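If you don't have a people.json on hand, a short sketch like this can generate a small file in the same format (the records are just the sample data shown above):

```python
import json

# write a small sample file in the same alternating-row format
sample_docs = [
    {"name": "Moghul Hecuba", "age": "48", "sex": "female",
     "accounts": "moghul_hecuba", "join_date": "2011-07-25"},
    {"name": "Suffolk McGrath", "age": "36", "sex": "male",
     "accounts": "suffolk_mcgrath", "join_date": "2015-04-21"},
]
with open("people.json", "w", encoding="utf8") as f:
    for doc in sample_docs:
        f.write('{"index":{}}\n')        # header row for each document
        f.write(json.dumps(doc) + "\n")  # the document itself
```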
Now create a function that gets the working path of the Python script, for the case where the JSON file and the script are in the same directory:
```python
'''
a simple function that gets the working path of
the Python script and returns it
'''
def script_path():
    path = os.path.dirname(os.path.realpath(__file__))
    if os.name == 'posix':  # posix is for macOS or Linux
        path = path + "/"
    else:
        path = path + chr(92)  # backslash is for Windows
    return path
```
- If the JSON file and the Python script are in different directories, use the example below, which also accepts a custom path as the file name.
```python
def get_data_from_file(file_name):
    if "/" in file_name or chr(92) in file_name:
        file = open(file_name, encoding="utf8", errors='ignore')
    else:
        # use the script_path() function to get path if none is passed
        file = open(script_path() + str(file_name), encoding="utf8", errors='ignore')
    data = [line.strip() for line in file]
    file.close()
    return data
```
- Iterators, including generator functions, are allowed for the actions parameter of the Bulk API. Again, generators mean large datasets don't have to be loaded into memory, which would slow down the process. It's the best way for Python helpers to import Elasticsearch data.
```python
'''
generator to push bulk data from a JSON
file into an Elasticsearch index
'''
def bulk_json_data(json_file, _index, doc_type):
    json_list = get_data_from_file(json_file)
    for doc in json_list:
        # use a `yield` generator so that the data
        # isn't loaded into memory
        if '{"index"' not in doc:
            yield {
                "_index": _index,
                "_type": doc_type,
                "_id": uuid.uuid4(),
                "_source": doc
            }
```
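Note that bulk_json_data() still reads the whole file into a list via get_data_from_file(). If the file itself is huge, a hypothetical variant like this one streams the file line by line as well (same parameters, same action shape):

```python
def bulk_json_data_streaming(json_file, _index, doc_type):
    # iterate over the file object itself, one line at a time,
    # so neither the file contents nor the actions sit in memory
    with open(script_path() + json_file, encoding="utf8", errors="ignore") as f:
        for line in f:
            doc = line.strip()
            if doc and '{"index"' not in doc:
                yield {
                    "_index": _index,
                    "_type": doc_type,
                    "_id": uuid.uuid4(),
                    "_source": doc
                }
```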
Tip: To quickly create a custom generator for the bulk load data helper Python technique, call the bulk method with the parameters in the example below.
```python
helpers.bulk(
    {CLIENT_INSTANCE},
    bulk_json_data(
        {JSON_FILE_NAME},
        {INDEX_NAME},
        {DOC_TYPE}
    )
)
```
- Make the bulk call with the helpers.bulk method. Pass the whole bulk_json_data() generator as the actions parameter.
```python
try:
    # make the bulk call, and get a response
    response = helpers.bulk(elastic, bulk_json_data("people.json", "employees", "people"))
    print("\nRESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)
```
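By default, helpers.bulk raises a BulkIndexError when some documents fail to index; catching it specifically lets you inspect the individual failures. A sketch:

```python
from elasticsearch.helpers import BulkIndexError

try:
    response = helpers.bulk(elastic, bulk_json_data("people.json", "employees", "people"))
    print("\nRESPONSE:", response)
except BulkIndexError as e:
    # e.errors is the list of actions that failed
    print("\n%d document(s) failed to index" % len(e.errors))
    for error in e.errors[:5]:  # show the first few failures
        print(error)
```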
Conclusion
In this tutorial, you learned how to use the helpers.bulk method. It is an excellent way to index large datasets without loading them into memory, which speeds up indexing when you need to bulk import Elasticsearch data in Python. You save time by streamlining processes to get coding done faster with the Python helpers bulk load for Elasticsearch.
A successful API call prints terminal output like this:
```
RESPONSE: (1000, [])
```
Although the number in this example output shows "1000" documents, that figure is illustrative. The actual document count you see will reflect what's contained in your JSON file.
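To spot-check what was indexed, you can also pull back a document from the index (again assuming the 'employees' index name used throughout):

```python
# fetch a single document from the index to verify the data
result = elastic.search(index="employees", size=1)
for hit in result["hits"]["hits"]:
    print(hit["_id"], hit["_source"])
```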
Python helpers bulk load Elasticsearch: The entire code
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch, helpers
import os, uuid

# create a new instance of the Elasticsearch client class
elastic = Elasticsearch()
# ...or uncomment to use this instead:
#elastic = Elasticsearch("localhost")

'''
a simple function that gets the working path of
the Python script and returns it
'''
def script_path():
    path = os.path.dirname(os.path.realpath(__file__))
    if os.name == 'posix':  # posix is for macOS or Linux
        path = path + "/"
    else:
        path = path + chr(92)  # backslash is for Windows
    return path

'''
this function opens a file and returns its
contents as a list of strings split by linebreaks
'''
def get_data_from_file(file_name):
    if "/" in file_name or chr(92) in file_name:
        file = open(file_name, encoding="utf8", errors='ignore')
    else:
        # use the script_path() function to get path if none is passed
        file = open(script_path() + str(file_name), encoding="utf8", errors='ignore')
    data = [line.strip() for line in file]
    file.close()
    return data

'''
generator to push bulk data from a JSON
file into an Elasticsearch index
'''
def bulk_json_data(json_file, _index, doc_type):
    json_list = get_data_from_file(json_file)
    for doc in json_list:
        # use a `yield` generator so that the data
        # isn't loaded into memory
        if '{"index"' not in doc:
            yield {
                "_index": _index,
                "_type": doc_type,
                "_id": uuid.uuid4(),
                "_source": doc
            }

try:
    # make the bulk call, and get a response
    response = helpers.bulk(elastic, bulk_json_data("people.json", "employees", "people"))
    print("\nbulk_json_data() RESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)

# iterator for a single document (replaced by the list below)
actions = [
    {
        "_id": uuid.uuid4(),   # random UUID for _id
        "doc_type": "person",  # document _type
        "doc": {               # the body of the document
            "name": "George Peterson",
            "sex": "male",
            "age": 34,
            "years": 10
        }
    }
]

# iterator for multiple docs
actions = [
    {
        "_id": uuid.uuid4(),   # random UUID for _id
        "doc_type": "person",  # document _type
        "doc": {               # the body of the document
            "name": "George Peterson",
            "sex": "male",
            "age": 34 + doc,
            "years": 10 + doc
        }
    }
    for doc in range(100)  # use 'for' loop to insert 100 documents
]

try:
    # make the bulk call using 'actions' and get a response
    response = helpers.bulk(elastic, actions, index='employees', doc_type='people')
    print("\nactions RESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)
```