Use Python To Index Files Into Elasticsearch - Index All Files in a Directory (Part 2)

Have a Database Problem? Speak with an Expert for Free
Get Started >>

Introduction

Part one of this series demonstrated how you can use Python to crawl for files in a directory, and then use the os library’s open() method to open the files and return their content as string data.

This article will show you how to use the helpers module, in the low-level Python client, to index the data from the files into Elasticsearch. The example code in this article will also show you how to put the name and meta data from the file into Elasticsearch fields for the documents as well.

Now let’s jump into Part 2 of how to use Python to index files into Elasticsearch, specifically indexing all files in a directory.

Prerequisites

Python 2 is now deprecated and losing support, and the code in this article has only been tested with Python 3, so it’s recommended that you use Python 3.4 or newer when executing this code. Use the following commands to check if Python 3, and its PIP package manager, are installed:

1
2
python3 -V
pip3 -V

You can use the PIP3 package manager to install the elasticsearch low-level Python client with the following command:

1
pip3 install elasticsearch

You can use the following cURL request to check if the Elasticsearch cluster is running on yourlocalhost server:

1
curl -XGET localhost:9200

Screenshot of terminal getting PIP3 response and cURL request for Elasticsearch

NOTE: The default port for Elasticsearch is 9200, but you can change this in your installation’s elasticsearch.yml file.

Make sure to import the helpers library for the Elasticsearch client. The following Python code demonstrates how to do this:

1
2
# use the elasticsearch client's helpers class for _bulk API
from elasticsearch import Elasticsearch, helpers

Iterate all of the file names returned by Python

The last article for this series demonstrated how to get all of the files in a directory (with a relative path to the Python script) using the following function:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# function that returns the full path for the script's current directory ($PWD)
def current_path():
    return os.path.dirname(os.path.realpath( __file__ ))

# default path is the script's current dir
def get_files_in_dir(self=current_path()):

    # declare empty list for files
    file_list = []

    # put a slash in dir name if needed
    if self[-1] != slash:
        self = self + slash

    # iterate the files in dir using glob
    for filename in glob.glob(self + '*.*'):

        # add each file to the list
        file_list += [filename]

    # return the list of filenames
    return file_list

NOTE: The default parameter for the function’s directory uses the current_path() function above to get the script’s current working directory if nothing is passed to it.

Here’s how you can pass a directory name to the function call to have it return all of the file names:

1
2
3
4
5
# pass a directory (relative path) to function call
all_files = get_files_in_dir("test-folder")

# total number of files to index
print ("TOTAL FILES:", len( all_files ))

Define a function that yields Elasticsearch documents from the files

The following function will yield Elasticsearch documents when it’s passed to the Elasticsearch client’s helpers.bulk() method call. At the beginning of the function you need to use an iterator, like enumerate(), to iterate over the list of file names:

1
2
3
4
5
# define a function that yields an Elasticsearch document from file data
def yield_docs(all_files):

    # iterate over the list of files
    for _id, _file in enumerate(all_files):

The code in this article uses the _id integer counter, returned by the enuerate() function iterator, for the documents’ IDs.

Get the file’s name and meta data for the Elasticsearch document fields

This next section of code inside of the function will parse the file’s data, within each iteration, to get the create and modify timestamps for the file using the os.stat() method call:

1
2
3
4
5
6
7
8
9
        # use 'rfind()' to get last occurence of slash
        file_name = _file[ _file.rfind(slash)+1:]

        # get the file's statistics
        stats = os.stat( _file )

        # timestamps for the file
        create_time = datetime.fromtimestamp( stats.st_birthtime )
        modify_time = datetime.fromtimestamp( stats.st_mtime )

Get the file’s data for the Elasticsearch document _source data

The following code nests another function call (get_data_from_text_file()) to get each file iterations respective list of string data:

1
2
3
4
5
        # get the data inside the file
        data = get_data_from_text_file( _file )

        # join the list of data into one string using return
        data = "".join( data )

The join() function will put the list of strings together into one, longer string.

Defining the Python function that uses ‘open()’ to get the file’s data

Here’s the code for the get_data_from_text_file() function from part 1 of this series:

1
2
3
4
5
6
7
8
9
10
11
12
13
def get_data_from_text_file(file):

    # declare an empty list for the data
    data = []

    # get the data line-by-line using os.open()
    for line in open(file, encoding="utf8", errors='ignore'):

        # append each line of data to the list
        data += [ str(line) ]

    # return the list of data
    return data

If goes through the file, line-by-line, to get all of the string data and puts them in a list, and then returns the list object after the iteration is complete.

Assemble the Elasticsearch document’s ‘_source’ data using a Python dictionary

Python’s dict object type is a key-value data type that can map an Elasticsearch JSON document perfectly. Here’s some code that will take a file’s meta data and put it into a Python dictionary that can later be used for the Elasticsearch document’s _source field data:

1
2
3
4
5
6
7
        # create the _source data for the Elasticsearch doc
        doc_source = {
            "file_name": file_name,
            "create_time": create_time,
            "modify_time": modify_time,
            "data": data
        }

Use a Python ‘yield’ generator to put the Elasticsearch document together

Here’s some code that passes the doc_source object to the "_source" field in a yield{} generator for each Elasticsearch document being indexed:

1
2
3
4
5
6
7
        # use a yield generator so that the doc data isn't loaded into memory
        yield {
            "_index": "my_files",
            "_type": "some_type",
            "_id": _id + 1, # number _id for each iteration
            "_source": doc_source
        }

NOTE The _id field is optional, because Elasticsearch will dynamically create an alpha-numeric ID for the document if your code doesn’t explicitly provide one. The parameter for the _id field will register the ID as a string, regardless of whether an integer, float, or string is passed to it.

Pass the file data to the Elasticsearch helpers library’s bulk() method call

The last step is to pass the yield_docs() function defined above to the client’s helpers.bulk() API method call. The following code passes the client object instance and the entire function call to the bulk() method:

1
2
3
4
5
6
7
8
9
10
try:
    # make the bulk call using 'actions' and get a response
    resp = helpers.bulk(
        client,
        yield_docs( all_files )
    )
    print ("\nhelpers.bulk() RESPONSE:", resp)
    print ("RESPONSE TYPE:", type(resp))
except Exception as err:
    print("\nhelpers.bulk() ERROR:", err)

It should return a tuple response object if the API call was successful.

Caveat about indexing images to Elasticsearch

It’s not recommended that you attempt to index image files using the code in this article. Check out the article we wrote on uploading images to an Elasticsearch index for more information on how to do that.

Conclusion

Run the Python script using the python3 command. You should get a response that resembles the following:

1
2
3
4
TOTAL FILES: 5

helpers.bulk() RESPONSE: (5, [])
RESPONSE TYPE: <class 'tuple'>

Use Kibana to verify that the files were indexed using Python

If you have Kibana installed and running on your Elasticsearch cluster you can the Kibana Console UI (found in the Dev Tools section) to make the following HTTP request to verify that the files were uploaded and indexed to the cluster:

1
2
3
4
5
6
GET my_files/_search
{
  "query": {
    "match_all": {}
  }
}

Screenshot of Kibana searching for documents in an Elasticsearch index

This concludes the series explaining how you can use Python’s built-in libraries to grep for files in a directory in order to index their data into Elasticsearch as document data. The complete code from both of the articles can be found below.

Just the Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python3
#-*- coding: utf-8 -*-

# import Datetime for the document's timestamp
from datetime import datetime

# import glob and os
import os, glob

# use the elasticsearch client's helpers class for _bulk API
from elasticsearch import Elasticsearch, helpers

# declare a client instance of the Python Elasticsearch library
client = Elasticsearch("http://localhost:9200")

# posix uses "/", and Windows uses ""
if os.name == 'posix':
    slash = "/" # for Linux and macOS
else:
    slash = chr(92) # '\' for Windows

def current_path():
    return os.path.dirname(os.path.realpath( __file__ ))

# default path is the script's current dir
def get_files_in_dir(self=current_path()):

    # declare empty list for files
    file_list = []

    # put a slash in dir name if needed
    if self[-1] != slash:
        self = self + slash

    # iterate the files in dir using glob
    for filename in glob.glob(self + '*.*'):

        # add each file to the list
        file_list += [filename]

    # return the list of filenames
    return file_list

def get_data_from_text_file(file):

    # declare an empty list for the data
    data = []

    # get the data line-by-line using os.open()
    for line in open(file, encoding="utf8", errors='ignore'):

        # append each line of data to the list
        data += [ str(line) ]

    # return the list of data
    return data

# pass a directory (relative path) to function call
all_files = get_files_in_dir("test-folder")

# total number of files to index
print ("TOTAL FILES:", len( all_files ))

"""
PART 2 STARTS HERE
"""

# define a function that yields an Elasticsearch document from file data
def yield_docs(all_files):

    # iterate over the list of files
    for _id, _file in enumerate(all_files):

        # use 'rfind()' to get last occurence of slash
        file_name = _file[ _file.rfind(slash)+1:]

        # get the file's statistics
        stats = os.stat( _file )

        # timestamps for the file
        create_time = datetime.fromtimestamp( stats.st_birthtime )
        modify_time = datetime.fromtimestamp( stats.st_mtime )

        # get the data inside the file
        data = get_data_from_text_file( _file )

        # join the list of data into one string using return
        data = "".join( data )

        # create the _source data for the Elasticsearch doc
        doc_source = {
            "file_name": file_name,
            "create_time": create_time,
            "modify_time": modify_time,
            "data": data
        }

        # use a yield generator so that the doc data isn't loaded into memory
        yield {
            "_index": "my_files",
            "_type": "some_type",
            "_id": _id + 1, # number _id for each iteration
            "_source": doc_source
        }

try:
    # make the bulk call using 'actions' and get a response
    resp = helpers.bulk(
        client,
        yield_docs( all_files )
    )
    print ("\nhelpers.bulk() RESPONSE:", resp)
    print ("RESPONSE TYPE:", type(resp))
except Exception as err:
    print("\nhelpers.bulk() ERROR:", err)

Pilot the ObjectRocket Platform Free!

Try Fully-Managed CockroachDB, Elasticsearch, MongoDB, PostgreSQL (Beta) or Redis.

Get Started

Keep in the know!

Subscribe to our emails and we’ll let you know what’s going on at ObjectRocket. We hate spam and make it easy to unsubscribe.