How to Parse Lines in a Text File and Index as Elasticsearch Documents using Python

Introduction

This tutorial will explain how to parse lines in a text file and index them as Elasticsearch Documents using the Python programming language. To legally parse a line-by-line text file index for Elasticsearch documents with Python bulk, the user must have the rights to use to the content being inserting into a MongoDB collection. Alternatively, an opensource/public domain text document should be used. To demonstrate how open, iterate and parse the data in a text file, this tutorial will use the opensource eBook titled “Webster’s Unabridged Dictionary by Various.” This text can be found on the Gutenberg website.

Prerequisites to parse a line-by-line text file index for Elasticsearch documents with Python bulk

  • The text file with the data being indexed into Elasticsearch should be stored on the same machine or server running the Elasticsearch cluster. Use the following cURL request to verify that Elasticsearch is running:
curl -XGET "localhost:9200"

NOTE: Make sure to replace the "localhost" domain and its port with the settings that match the Elasticsearch cluster.

  • Have enough free space on the machine or server to download a copy of Webster’s Dictionary.

  • Installation of Python 3 and the Elasticsearch client for Python 3 to make the API calls to Elasticsearch. Note that Python 2 is now deprecated. Execute the following command to install the low-level client using the PIP3 package manager:

pip3 install elasticsearch

How to Import the Necessary Python and Elasticsearch Libraries

Execute the below script to import the built-in pickle, time and json Python libraries, shown at the start of the script. Then import the Elasticsearch and helpers libraries from the Python low-level client for Elasticsearch shown at the end of the following script:

# import Python's pickle library to serialize the dictionary
import pickle

# import Python's time and JSON library
import time, json

# use the elasticsearch client's helpers class for _bulk API
from elasticsearch import Elasticsearch, helpers

How to use the Python’s time library to store the epoch time of when the script starts

The API call to Elasticsearch may take some time to execute, depending on the size of the text file and the number of documents being parsed. Use the time.time() method call to return a float number representing the epoch time and store it in order to check it at the end of the operation to see how many seconds have passed since the start of the script. Execute the following code:

# keep track of how many seconds it takes
start_time = time.time()

How to Define a Python Function that will Load, Iterate and Parse Data from a Text File

The following script show how to use Python’s def keyword to define a function that will open the text file and parse its data, line-by-line, or to order parsing out dictionary entries:

# define a function that will parse the text file
def get_webster_entries(filename):

How to open the text file and read its contents

The following script shows how to use Python’s with keyword command to open the text file and have it return an _io.BufferedReader object that can be decoded as a bytes object:

# use the open() function to open the text file
with open(filename, 'rb') as raw:
# read() method will return a bytes string
data = raw.read()

The following screenshot shows how opening the text file in an indention using with allows Python’s garbage collector to free up the system resources after the BufferedReader object is read.

Screenshot of Python's IDLE printing the data type returned by the open and read methods

How to decode the bytes string of the text file using the UTF-8 encoding

Execute the following script to call the bytes object’s decode() method as a UTF-8 encoded string and ignore any encoding errors:

# decode the file as UTF-8 and ignore errors
data = data.decode("utf-8", errors='ignore')

# split the dictionary file into list by: "rn"
data = data.split("rn")

The above script shows how to have the string object’s split() method return its data inside list object ([]) that uses the return key and newline characters ("rn") as its delimiter.

How to make an empty dict for the Webster’s Dictionary entries and definitions

Execute the following script to declare an empty dict object ({}) that will have the dictionary entries added to it and then iterate over the list of string lines from the text file using the enumerate() function:

# create an empty Python dict for the entries
dict_data = {}

# iterate over the list of dictionary terms
for num, line in enumerate(data):

How to parse the line’s string to look for a new dictionary entry

The copy of Webster’s Dictionary used in this tutorial uses an uppercase string to signal a new dictionary entry. Evaluate the string to look for any lines that are completely uppercase so that Python knows to add a new dictionary entry, as shown here:

try:
# entry titles in Webster's dict are uppercase
if len(line) < 40 and line.isupper() == True:
# reset the definition count
def_count = 1

# new entry for the dictionary
current = line.title()
dict_data[current] = {}

How to parse the line’s string and evaluate it for dictionary definitions

As shown below, be sure to increment the def_count integer object each time a new definition for the entry is found:

# add a new definition by looking for "Defn"
if "Defn:" in line:

# concatenate strings for the definitions
def_title = "Defn " + str(def_count)
def_content = line.replace("Defn: ", "")

# add the definition to the defn title key
dict_data[current][def_title] = def_content

# add to the definition count
def_count += 1

Two ways of looking for definitions in the copy of Webster’s Dictionary used in this tutorial

This public domain version of Webster’s Dictionary has two ways it delimits new definitions for a term. The first is it will have "Defn:" at the beginning of the line; the second is it will have a number and a period, e.g. "4.", in the string. Here are some examples:

# add definition by number and period
elif "." in line[:2] and line[0].isdigit():

# concatenate strings for the definitions
def_title = "Defn " + str(def_count)
def_content = line = line[line.find(".")+2:]

# make sure content for definition has some length
if len(def_content) >= 10:

# add the definition to the defn title key
dict_data[current][def_title] = def_content

# add to the definition count
def_count += 1

How to catch errors, set the definition count for each entry and return the dict object

As shown in the following examples, the definition for each count’s value must be updated for each entry after the try-catch indentation end:

except Exception as error:
# errors while iterating with enumerate()
print ("nenumerate() text file ERROR:", error)
print ("line number:", num)

try:
# update the number of definitions
dict_data[current]["definitions"] = def_count-1

except UnboundLocalError:
# ignore errors saying the dict entry hasn't been declared yet
pass

return dict_data

NOTE: Make sure to return the complete dict object, containing all of the dictionary entries and definitions, at the end of the function.

How to Call the get_webster_entries() Function to Get all of the Dictionary Entries

Executing the following script will call the function outside of the function indentation:

# call the function and return Webster's dict as a Python dict
dictionary = get_webster_entries("websters_dictionary.txt")

NOTE: Make sure to pass a string of the text file’s name and directory path, if it’s not in the same directory as the Python script, to its function call.

How to Create Elasticsearch Document Python Objects for the Dictionary Entries

The function in the above section should have returned a dict object containing all of the text files parsed string data. The next step is to create a Python dict object for each dictionary entry in order to make JSON document objects for the Elasticsearch Bulk API index call.

Execute the following command to set the _id counter and instantiate an empty Python list for the Elasticsearch documents:

# variable for the Elasticsearch document _id
id_num = 0

# empty list for the Elasticsearch documents
doc_list = []

How to use the dict object’s items( ) method to iterate over its key-value pairs

Each entry’s list of definitions is nested inside the key for the entry’s title. Execute the following script to begin another items() iteration for the nested key-value pairs that contain the entry’s definitions:

# iterate over the dictionary entries using items()
for entry, values in dictionary.items():

# dict object for the document's _source data
doc_source = {"entry": str(entry)}

# iterate over the dictionary values and put them into _source dict
for key, item in values.items():

The new dict object (doc_source) created inside of the outer key-value iterator is for the Elasticsearch document’s _source data. Be sure to create a new field key, for each definition, in the doc_source object inside the iterator for those definitions.

How to typecast each field’s value so Elasticsearch won’t return any ‘_mapping’ errors

It’s not necessary, nor recommended, to create a _mapping schema for the index beforehand. This is because it would require creating enough definition “fields” for all of the entries. However, the data type must be uniform for each field, in every document. The best way to accomplish this in Python is to explicitly typecast the key’s values, using the str() and int() functions respectively, with the following script:

# make sure to typecast data as string
if key != "definitions":
doc_source[str(key)] = str(item)

# typecast the "definitions" value as an integer
else:
doc_source["definitions"] = int(item)

NOTE: Do not create a JSON dictionary object for entries without definitions:

# do NOT index the Elasticsearch doc if it has no content
if doc_source["definitions"] != 0:

Make sure to increment the id_num integer counter for the document’s _id, but only if it’s an entry that has some content, as shown here:

# incremete the doc _id integer value
id_num += 1

How to nest the document’s _source data inside of another Python dict

As shown below, the final dict object for the Elasticsearch document will have its _source data nested inside of it with an _id key so Elasticsearch will know what document ID to assign:

# Elasticsearch document structure as a Python dict
doc = {
# pass the integer value for _id to outer dict
"_id": id_num,

# nest the _source data inside the outer dict object
"_source": doc_source,
}

NOTE: The Python dictionary object’s _id key can be omitted and Elasticsearch will dynamically generate an alpha-numeric _id for each document.

How to append the nested Python dict Elasticsearch document to the list

Execute the following code to append the nested dict object, representing an Elasticsearch document, to the list object instantiated earlier:

# append the nested doc dict values to list
doc_list += [ doc_source ]

This list object will be passed to the Elasticsearch library’s bulk() method after the iteration is complete so it can index all of the documents.

How to Index the Webster’s Dictionary Entries as Elasticsearch Documents using the Python Client

Execute the following script to declare a new client instance of the low-level Elasticsearch library, taking care to pass the correct values to its string parameter for the cluster:

# declare a client instance of the Python Elasticsearch client library
client = Elasticsearch("http://localhost:9200")

How to call the Elasticsearch “helpers” library’s bulk( ) method to index the list of Python documents

Execute the following script to pass the client instance, and the Python list of Elasticsearch documents, to the helpers.bulk() method call and have it return a response:

# attempt to index the dictionary entries using the helpers.bulk() method
try:
print ("nAttempting to index the dictionary entries using helpers.bulk()")

# use the helpers library's Bulk API to index list of Elasticsearch docs
response = helpers.bulk(client, doc_list, index='websters_dict', doc_type='_doc')

NOTE: The Elasticsearch document type has been deprecated since version 6.0 of Elasticsearch. Just pass the string "_doc" to the doc_type parameter to get around this issue. Also, be sure to pass a string value to the index parameter so that Elasticsearch knows the index name these documents will be added to.

How to use Python’s JSON Library to Print the API Response Returned to Elasticsearch

Execute the following script to use the dumps() method call in Python’s built-in json library. This will print the JSON response, returned by the API call to the Elasticsearch cluster, with some indentation to make it more readable:

# print the response returned by Elasticsearch
print ("helpers.bulk() RESPONSE:", json.dumps(response, indent=4))

Except `Exception` as err:

# print any errors returned by the Elasticsearch cluster
print("helpers.bulk() ERROR:", err)

Make certain to get the time difference between the end and the start of the script, to see how many seconds have elapsed, by adding the following code to the end of the script:

# print the time that has elapsed since the start of the script
print ("time elapsed:", time.time()-start_time)

How to Run the Python Script and Wait for a Response to the Bulk API to Elasticsearch

Make sure to save any changes in an IDE or text editor and then run the Python script in a terminal or command prompt window, from the script’s directory, by executing the following command:

python3 bulk_index_text.py

The entire execution run-time for the Python script, including the API response from the Elasticsearch, may take around 30 seconds. However, it may take longer depending on how large the text file is and how many documents are being indexed with the bulk() method call. An example with the run time is as follows:

Attempting to index the dictionary entries using helpers.bulk()
helpers.bulk() RESPONSE: [
97843,
[]
]
time elapsed: 26.252054929733276

As shown in the above example, the script ran for over 26 seconds as it attempted to upload and index almost 100k documents to an Elasticsearch index. Following is a screenshot:

Screenshot of Python returning a response using the Bulk API call to index Elasticsearch documents

How to use Kibana to verify that the Webster Dictionary documents successfully indexed

If the Kibana service is installed and running on the Elasticsearch cluster, it can make an HTTP request that can query different entries in the dictionary index.

The following is a GET request that looks for all of the Webster Dictionary’s entries that have three definitions:

GET websters_dict/_search?size=50
{
"query": {
"match" : {
"definitions": 3
}
}
}

Following is a screenshot:

Screenshot of the Kibana Console UI querying dictionary entries with 3 definitions

NOTE: By default, Kibana and Elasticsearch will only return up to 10 document "hits" in its JSON response to _search queries. When necessary, use the ?size= parameter in the REST header to instruct Elasticsearch to return more than 10 documents.

Indexing dictionary entries to an Elasticsearch index, using queries to look up definitions for terms, is a great way to build a database for a dictionary web, mobile or desktop GUI application.

Conclusion

This tutorial demonstrated how to open a text file in Python and iterate the data to parse and create Elasticsearch documents. The article specifically explained how to import the necessary Python and Elasticsearch libraries, use Python’s time library and define a Python Function to load, iterate and parse data. The tutorial also explained how to decode the bytes string of the text file, how to make an empty dict for the Webster’s Dictionary entries and definitions, how to catch errors and set the definition count for each entry and return the dict object. Finally, the tutorial covered how to index the Webster’s Dictionary entries as Elasticsearch documents using the Python client, print the API response returned to Elasticsearch and use Kibana to verify the Webster Dictionary documents were successfully indexed. Remember that to legally parse line-by-line in a text file index for Elasticsearch documents with Python bulk, the rights to the content being inserting into a MongoDB collection must be owned by the user or inserted with permission from the owner.

Just the Code

#!/usr/bin/env python3
#-*- coding: utf-8 -*-

# import Python's pickle library to serialize the dictionary
import pickle

# import Python's time and JSON library
import time, json

# use the elasticsearch client's helpers class for _bulk API
from elasticsearch import Elasticsearch, helpers

# keep track of how many seconds it takes
start_time = time.time()

# define a function that will parse the text file
def get_webster_entries(filename):

# use the open() function to open the text file
with open(filename, 'rb') as raw:
# read() method will return a bytes string
data = raw.read()

# decode the file as UTF-8 and ignore errors
data = data.decode("utf-8", errors='ignore')

# split the dictionary file into list by: "rn"
data = data.split("rn")

# create an empty Python dict for the entries
dict_data = {}

# iterate over the list of dictionary terms
for num, line in enumerate(data):

try:
# entry titles in Webster's dict are uppercase
if len(line) < 40 and line.isupper() == True:
# reset the definition count
def_count = 1

# new entry for the dictionary
current = line.title()
dict_data[current] = {}


# add a new definition by looking for "Defn"
if "Defn:" in line:

# concatenate strings for the definitions
def_title = "Defn " + str(def_count)
def_content = line.replace("Defn: ", "")

# add the definition to the defn title key
dict_data[current][def_title] = def_content

# add to the definition count
def_count += 1

# add definition by number and period
elif "." in line[:2] and line[0].isdigit():

# concatenate strings for the definitions
def_title = "Defn " + str(def_count)
def_content = line = line[line.find(".")+2:]

# make sure content for definition has some length
if len(def_content) >= 10:

# add the definition to the defn title key
dict_data[current][def_title] = def_content

# add to the definition count
def_count += 1

except Exception as error:
# errors while iterating with enumerate()
print ("nenumerate() text file ERROR:", error)
print ("line number:", num)

try:
# update the number of definitions
dict_data[current]["definitions"] = def_count-1

except UnboundLocalError:
# ignore errors saying the dict entry hasn't been declared yet
pass

return dict_data

# call the function and return Webster's dict as a Python dict
dictionary = get_webster_entries("websters_dictionary.txt")

# variable for the Elasticsearch document _id
id_num = 0

# empty list for the Elasticsearch documents
doc_list = []

# iterate over the dictionary entries using items()
for entry, values in dictionary.items():

# dict object for the document's _source data
doc_source = {"entry": str(entry)}

# iterate over the dictionary values and put them into _source dict
for key, item in values.items():

# make sure to typecast data as string
if key != "definitions":
doc_source[str(key)] = str(item)

# typecast the "definitions" value as an integer
else:
doc_source["definitions"] = int(item)

# do NOT index the Elasticsearch doc if it has no content
if doc_source["definitions"] != 0:

# incremete the doc _id integer value
id_num += 1

# Elasticsearch document structure as a Python dict
doc = {
# pass the integer value for _id to outer dict
"_id": id_num,

# nest the _source data inside the outer dict object
"_source": doc_source,
}

# append the nested doc dict values to list
doc_list += [ doc_source ]

# declare a client instance of the Python Elasticsearch client library
client = Elasticsearch("http://localhost:9200")

# attempt to index the dictionary entries using the helpers.bulk() method
try:
print ("nAttempting to index the dictionary entries using helpers.bulk()")

# use the helpers library's Bulk API to index list of Elasticsearch docs
response = helpers.bulk(client, doc_list, index='websters_dict', doc_type='_doc')

# print the response returned by Elasticsearch
print ("helpers.bulk() RESPONSE:", json.dumps(response, indent=4))

except Exception as err:

# print any errors returned by the Elasticsearch cluster
print("helpers.bulk() ERROR:", err)

# print the time that has elapsed since the start of the script
print ("time elapsed:", time.time()-start_time)

Pilot the ObjectRocket Platform Free!

Try Fully-Managed CockroachDB, Elasticsearch, MongoDB, PostgreSQL (Beta) or Redis.

Get Started

Keep in the know!

Subscribe to our emails and we’ll let you know what’s going on at ObjectRocket. We hate spam and make it easy to unsubscribe.