How To Bulk Index Elasticsearch Documents Using Golang

Introduction

If you’re storing documents in Elasticsearch, it’s important to know how to index large numbers of documents at a time. Fortunately, it’s easy to accomplish this task using the Bulk() API method to index Elasticsearch documents with the help of the Olivere Golang driver. In this article, we’ll provide step-by-step instructions for performing a bulk index in Elasticsearch with Golang.

Prerequisites for bulk indexing Elasticsearch documents in Golang

Before we check out the code we’ll need to perform our bulk index, let’s take a moment to review the essential prerequisites for this task:

  • First, Golang needs to be installed, and both the $GOPATH and $GOROOT need to be exported in your bash profile. You can use the go version and go env commands to make sure Golang is installed and the correct paths are set.

  • Elasticsearch also needs to be installed on the node that will be running and compiling the Golang scripts. You can use the Kibana UI or the following cURL request to verify that your Elasticsearch cluster is running:

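curl -XGET "localhost:9200"

  • Finally, the Olivere Golang driver for Elasticsearch needs to be installed. Use the go get command with the same versioned path that the script imports:

go get github.com/olivere/elastic/v7

If the Olivere package is not installed yet, Go will try to find it and download it, and you’ll see a terminal response like the following: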

go: finding github.com/olivere/elastic/v7 v7.0.4
go: downloading github.com/olivere/elastic/v7 v7.0.4

Create a Go script and import the packages necessary for bulk indexing

Now that we’ve covered all the necessary prerequisites, let’s go ahead and create a new Golang script with a .go extension. We’ll import the necessary packages for the API calls to Elasticsearch and declare package main at the top of the script:

package main

import (
	"context"
	"fmt"
	"log"     // log errors
	"reflect" // inspect types at runtime
	"strconv" // convert the integer ID to a string
	"time"    // health check interval and document timestamps

	// Import the Olivere Golang driver for Elasticsearch 7
	"github.com/olivere/elastic/v7"
)

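The /v7 suffix in the import path selects version 7 of the Olivere driver. Pick the major version of the driver that matches the major version of your Elasticsearch cluster. The unversioned path ( "github.com/olivere/elastic" ) also works, but with Go modules it resolves to an older release of the driver.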

Declare a struct type collection of fields for the Elasticsearch documents

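Next, let’s create a new struct type that will represent the document fields of the Elasticsearch index. Please note that the struct field names used in this example become the field names of the indexed documents. By default, Elasticsearch adds any field that is missing from the index’s _mapping schema through dynamic mapping: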

// Declare a struct for Elasticsearch fields
type ElasticDocs struct {
	SomeStr   string
	SomeInt   int
	SomeBool  bool
	Timestamp int64
}
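
To see which field names end up in the indexed documents, it can help to serialize the struct yourself. The sketch below assumes the driver encodes documents with Go's standard encoding/json package, which we believe is its default behavior. TaggedDocs and toJSON are hypothetical names used only for this illustration; the tags show one way to get snake_case field names without renaming the Go fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ElasticDocs mirrors the struct from this tutorial. It has no tags,
// so the JSON keys match the Go field names exactly.
type ElasticDocs struct {
	SomeStr   string
	SomeInt   int
	SomeBool  bool
	Timestamp int64
}

// TaggedDocs is a hypothetical variant that uses json tags to choose
// snake_case field names for the documents.
type TaggedDocs struct {
	SomeStr   string `json:"some_str"`
	SomeInt   int    `json:"some_int"`
	SomeBool  bool   `json:"some_bool"`
	Timestamp int64  `json:"timestamp"`
}

// toJSON marshals any value and returns the JSON text.
func toJSON(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		return "marshal error: " + err.Error()
	}
	return string(b)
}

func main() {
	// Prints: {"SomeStr":"Hello, world!","SomeInt":42,"SomeBool":true,"Timestamp":0}
	fmt.Println(toJSON(ElasticDocs{SomeStr: "Hello, world!", SomeInt: 42, SomeBool: true}))

	// Prints: {"some_str":"Hello, world!","some_int":42,"some_bool":true,"timestamp":0}
	fmt.Println(toJSON(TaggedDocs{SomeStr: "Hello, world!", SomeInt: 42, SomeBool: true}))
}
```

Whichever naming you choose, keep it consistent with any explicit mapping that already exists on the index.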

Declare the main() function and connect to Elasticsearch

We’ll need to declare the main() function for all of the code and API calls to Elasticsearch, and we’ll use the Olivere package to declare a new client instance for the Bulk API calls that are used to index documents:

func main() {

	// Allow for custom formatting of log output
	log.SetFlags(0)

	// Print the version of the Olivere driver package
	fmt.Println("Version:", elastic.Version)

	// Create a context object for the API calls
	ctx := context.Background()

	// Declare a client instance of the Olivere driver
	client, err := elastic.NewClient(
		elastic.SetSniff(true),
		elastic.SetURL("http://localhost:9200"),
		elastic.SetHealthcheckInterval(5*time.Second), // check node health every 5 seconds
	)

Check if Elasticsearch returned any HTTP errors while Golang was connecting

It’s important to use the log.Fatalf() function call in our code; this function will quit the script if any errors were returned by Elasticsearch while attempting to connect and instantiate a client:

	// Check and see if olivere's NewClient() method returned an error
	if err != nil {
		fmt.Println("elastic.NewClient() ERROR:", err)
		log.Fatalf("quitting connection..")
	} else {
		// Print client information
		fmt.Println("client:", client)
		fmt.Println("client TYPE:", reflect.TypeOf(client))
	}

Declare an Elasticsearch index and check if it exists on the cluster

If the connection to Elasticsearch was successful, we can declare the index’s name as a string and pass it to a string slice ( []string ). This will allow us to call the NewIndicesExistsService() method’s Do() method to find out if the index name exists:

	// Index name for the bulk Elasticsearch documents goes here
	indexName := "some_index"

	// Declare a string slice with the index name in it
	indices := []string{indexName}

	// Instantiate a new *elastic.IndicesExistsService
	existService := elastic.NewIndicesExistsService(client)

	// Pass the slice with the index name to the IndicesExistsService.Index() method
	existService.Index(indices)

	// Have Do() return an API response by passing the Context object to the method call
	exist, err := existService.Do(ctx)

The IndicesExistsService returns a boolean indicating if the Elasticsearch index exists

If any errors were returned, or if the API response returned a value of false , then the script will not proceed to perform the bulk index in Elasticsearch:

	// Check if the IndicesExistsService.Do() method returned any errors
	if err != nil {
		log.Fatalf("IndicesExistsService.Do() ERROR: %v", err)

	} else if !exist {
		fmt.Println("\nOh no! The index", indexName, "doesn't exist.")
		fmt.Println("Create the index, and then run the Go script again")

Use cURL to create the index if needed

Creating the index, if it doesn’t already exist, is a simple task. Shown below is an example cURL request that can be used for creating an index. Just paste this request into a terminal window, making sure to change the domain, port, index name, and "settings" values as needed:

curl -XPUT 'http://localhost:9200/some_index' -H 'Content-Type: application/json' -d '
{
  "settings" : {
    "number_of_shards" : 3,
    "number_of_replicas" : 2
  }
}'

Create some Elasticsearch document struct instances

If the index in question does exist, however, you can go ahead and declare an empty slice that will contain the Elasticsearch document struct instances. You can also create some documents by passing values to the struct’s fields during declaration:

		// Bulk index the Elasticsearch documents if index exists
	} else {
		fmt.Println("Index name:", indexName, " exists!")

		// Declare an empty slice for the Elasticsearch document struct objects
		docs := []ElasticDocs{}

		// Get the type of the 'docs' struct slice
		fmt.Println("docs TYPE:", reflect.TypeOf(docs))

		// New ElasticDocs struct instances (the timestamp is set later, in the loop)
		newDoc1 := ElasticDocs{SomeStr: "Hello, world!", SomeInt: 42, SomeBool: true, Timestamp: 0}
		newDoc2 := ElasticDocs{SomeStr: "你好，世界！", SomeInt: 7654, SomeBool: false, Timestamp: 0}
		newDoc3 := ElasticDocs{SomeStr: "Kumusta, mundo!", SomeInt: 1234, SomeBool: true, Timestamp: 0}

Append the Elasticsearch document structs to the Golang slice

We’ll use the append() function to add the Elasticsearch struct documents to our docs slice:

		// Append the new Elasticsearch document struct objects to the slice
		docs = append(docs, newDoc1)
		docs = append(docs, newDoc2)
		docs = append(docs, newDoc3)

Instantiate an Elasticsearch client bulk object for the Golang Index API calls

The Elasticsearch API call will be made using the service object returned by the client’s Bulk() method. This object collects the individual index requests and sends them to Elasticsearch in a single bulk request:

		// Declare a new Bulk() object using the client instance
		bulk := client.Bulk()

Declare an integer value for the Elasticsearch document’s _id

If you’d like to specify an _id for the Elasticsearch documents, then you’ll need to pass a string value to the NewBulkIndexRequest() instance’s Id() method later on. Without this value, Elasticsearch will automatically generate an alphanumeric ID for each document:

		// Elasticsearch _id counter starts at 0
		docID := 0

Iterate over the Golang struct documents and create a timestamp

Next, we’ll use Golang’s for keyword with range to iterate over the struct documents stored in the slice. In each iteration, we’ll increment the ID counter, convert it to a string, and set the document’s timestamp:

		// Iterate over the slice of Elasticsearch documents
		for _, doc := range docs {

			// Incrementally change the _id number in each iteration
			docID++

			// Convert the _id integer into a string
			idStr := strconv.Itoa(docID)

			// Set the doc timestamp to the current Unix time (int64, in seconds)
			doc.Timestamp = time.Now().Unix()
			fmt.Println("\ntime.Now().Unix():", doc.Timestamp)
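
One detail of this loop is worth a closer look: range hands us a copy of each slice element, so assigning to doc.Timestamp changes the copy that is later passed to the request, not the element stored in docs. The following sketch illustrates the difference with a trimmed-down struct. The helper names stampCopies and stampInPlace are hypothetical and exist only for this example.

```go
package main

import "fmt"

// ElasticDocs is a trimmed-down version of the tutorial's struct.
type ElasticDocs struct {
	SomeStr   string
	Timestamp int64
}

// stampCopies sets Timestamp on the range variable, which is a copy of
// each element, and returns the stamped copies. The input is unchanged.
func stampCopies(docs []ElasticDocs, ts int64) []ElasticDocs {
	stamped := make([]ElasticDocs, 0, len(docs))
	for _, doc := range docs {
		doc.Timestamp = ts // changes the copy, not docs[i]
		stamped = append(stamped, doc)
	}
	return stamped
}

// stampInPlace writes through the index, so the slice elements change.
func stampInPlace(docs []ElasticDocs, ts int64) {
	for i := range docs {
		docs[i].Timestamp = ts
	}
}

func main() {
	docs := []ElasticDocs{{SomeStr: "a"}, {SomeStr: "b"}}

	stamped := stampCopies(docs, 1700000000)
	fmt.Println(docs[0].Timestamp, stamped[0].Timestamp) // prints: 0 1700000000

	stampInPlace(docs, 1700000000)
	fmt.Println(docs[0].Timestamp) // prints: 1700000000
}
```

For this tutorial the copy is all we need, because the stamped copy is what gets indexed. Index into the slice instead if the rest of your program needs to see the timestamps as well.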

Make the bulk request to index Elasticsearch documents

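Instantiate a NewBulkIndexRequest() object and pass the document to the object’s Doc() method. Setting the OpType() to "index" creates the document, or replaces an existing document that has the same _id. The results of these "index" operations are what the response’s Indexed() method returns later on: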

			// Declare a new NewBulkIndexRequest() instance
			req := elastic.NewBulkIndexRequest()

			// Assign custom values to the NewBulkIndexRequest() based on the Elasticsearch
			// index and the request type
			req.OpType("index") // set type to "index" document
			req.Index(indexName)
			//req.Type("_doc") // Doc types are deprecated (default now _doc)
			req.Id(idStr)
			req.Doc(doc)

Add the Golang index request to the bulk API object

Use the Add() method to add the request to the bulk object:

			// Print information about the NewBulkIndexRequest object
			fmt.Println("req:", req)
			fmt.Println("req TYPE:", reflect.TypeOf(req))

			// Add the new NewBulkIndexRequest() to the client.Bulk() instance
			bulk = bulk.Add(req)
			fmt.Println("bulk.NumberOfActions():", bulk.NumberOfActions())
		}
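
It can help to picture what the queued requests turn into on the wire. The Elasticsearch Bulk API expects newline-delimited JSON: an action line naming the operation, index, and _id, followed by a line with the document itself. The driver builds this body for you; the sketch below only approximates it with the standard library, and the names bulkMeta, bulkTarget, and buildBulkBody are hypothetical helpers for this illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strconv"
)

// ElasticDocs mirrors the struct from this tutorial.
type ElasticDocs struct {
	SomeStr   string
	SomeInt   int
	SomeBool  bool
	Timestamp int64
}

// bulkTarget names the index and _id that an action applies to.
type bulkTarget struct {
	Index string `json:"_index"`
	ID    string `json:"_id"`
}

// bulkMeta is the action line for an "index" operation.
type bulkMeta struct {
	Index bulkTarget `json:"index"`
}

// buildBulkBody writes one action line and one source line per document,
// numbering the _id values from 1 as the tutorial's loop does.
func buildBulkBody(indexName string, docs []ElasticDocs) (string, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf) // Encode appends a newline after each value
	for i, doc := range docs {
		meta := bulkMeta{Index: bulkTarget{Index: indexName, ID: strconv.Itoa(i + 1)}}
		if err := enc.Encode(meta); err != nil {
			return "", err
		}
		if err := enc.Encode(doc); err != nil {
			return "", err
		}
	}
	return buf.String(), nil
}

func main() {
	docs := []ElasticDocs{
		{SomeStr: "Hello, world!", SomeInt: 42, SomeBool: true},
		{SomeStr: "Kumusta, mundo!", SomeInt: 1234, SomeBool: true},
	}
	body, err := buildBulkBody("some_index", docs)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Print(body)
}
```

Each document costs two lines in the body, which is why a single bulk request is so much cheaper than one HTTP round trip per document.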

Make the bulk request in Golang and have it return a response

At this point, we’re ready to perform our bulk index operation. Pass a context object while calling the Bulk object’s Do() method and check if it returned any errors:

		// Do() sends the bulk requests to Elasticsearch
		bulkResp, err := bulk.Do(ctx)

		// Check if the Do() method returned any errors
		if err != nil {
			log.Fatalf("bulk.Do(ctx) ERROR: %v", err)
		} else {

Iterate over the response returned by the Elasticsearch Bulk API

Now we’ll iterate over the response that was returned from the API call:

			// If there is no error then get the Elasticsearch API response
			indexed := bulkResp.Indexed()
			fmt.Println("\nbulkResp.Indexed():", indexed)
			fmt.Println("bulkResp.Indexed() TYPE:", reflect.TypeOf(indexed))

			// Get the type of the bulkResp.Indexed() value returned from bulk.go
			t := reflect.TypeOf(indexed)
			fmt.Println("\nt:", t)
			fmt.Println("bulk.NumberOfActions():", bulk.NumberOfActions())

			// List any methods defined on that type (a plain slice type has none)
			for i := 0; i < t.NumMethod(); i++ {
				method := t.Method(i)
				fmt.Println("\nbulkResp.Indexed() METHOD NAME:", i, method.Name)
				fmt.Println("bulkResp.Indexed() method:", method)
			}

			// Return data on the documents indexed
			fmt.Println("\nBulk response Index:", indexed)
			for _, info := range indexed {
				fmt.Println("\nBulk response Index:", info)
				//fmt.Println("\nBulk response Index:", info.Index)
			}
		}
	}
}
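
Keep in mind that a bulk request can succeed as a whole while individual documents are rejected, so a nil error from Do() doesn’t guarantee that every document was indexed. The Olivere response object offers helpers for this check (we believe Failed() is the relevant one), but the idea can be sketched with the standard library alone by decoding a response body and collecting the items whose status is outside the 2xx range. The sample body below is hand-written and hypothetical, and failedIDs is an illustrative helper, not part of the driver.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkItemResult holds the fields of one Bulk API response item that
// this sketch cares about.
type bulkItemResult struct {
	ID     string `json:"_id"`
	Status int    `json:"status"`
	Error  *struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
	} `json:"error"`
}

// bulkResponse mirrors the top level of a Bulk API response body.
// Each entry in Items is keyed by the action name, such as "index".
type bulkResponse struct {
	Errors bool                        `json:"errors"`
	Items  []map[string]bulkItemResult `json:"items"`
}

// failedIDs returns the _id of every item whose status is outside 2xx.
func failedIDs(raw []byte) ([]string, error) {
	var resp bulkResponse
	if err := json.Unmarshal(raw, &resp); err != nil {
		return nil, err
	}
	var failed []string
	for _, item := range resp.Items {
		for _, result := range item {
			if result.Status < 200 || result.Status > 299 {
				failed = append(failed, result.ID)
			}
		}
	}
	return failed, nil
}

// sampleResponse is a hand-written, hypothetical response body in which
// the second document was rejected.
const sampleResponse = `{
  "took": 30,
  "errors": true,
  "items": [
    {"index": {"_index": "some_index", "_id": "1", "status": 201, "result": "created"}},
    {"index": {"_index": "some_index", "_id": "2", "status": 400,
      "error": {"type": "mapper_parsing_exception", "reason": "failed to parse field"}}}
  ]
}`

func main() {
	failed, err := failedIDs([]byte(sampleResponse))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println("failed document IDs:", failed) // prints: failed document IDs: [2]
}
```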

Run the Golang script to bulk index the Elasticsearch documents

We’ve finished creating our Golang script, and now it’s time to run it. Navigate to the directory with the Golang script in a terminal window and use the command go run SCRIPT_NAME.go . The script should return a response similar to the following:

Bulk response Index: [0xc00014c380 0xc00014c400 0xc00014c480]

Bulk response Index: &{some_index _doc 1 1 created 0xc000145080 0 1 201 false <nil> <nil>}

Bulk response Index: &{some_index _doc 2 1 created 0xc0001450e0 0 1 201 false <nil> <nil>}

Bulk response Index: &{some_index _doc 3 1 created 0xc000145140 1 1 201 false <nil> <nil>}

Terminal screenshot of Elasticsearch documents indexed in Golang with Olivere

Terminal screenshot of the elastic client's BulkResponseItem in Golang

Conclusion

Being able to bulk index documents is an important facet of managing data in Elasticsearch. With the help of the Olivere driver, it’s easy to perform a bulk index in Elasticsearch with Golang. Using the instructions provided throughout this tutorial, you’ll be prepared to write your own script to bulk index documents for your Elasticsearch implementation.

Use Kibana to verify that the Golang script successfully bulk indexed Elasticsearch documents

Once you’ve run your script, it’s a good idea to verify that the documents were successfully indexed. Simply navigate to the Kibana port of your domain in a browser tab, then go to the Dev Tools section and click on Console to make the following HTTP request:

GET some_index/_search

Screenshot of Kibana getting Elasticsearch documents Bulk indexed in Golang

Just the Code

Shown below is the bulk-indexing script in its entirety:

package main

import (
	"context"
	"fmt"
	"log"     // log errors
	"reflect" // inspect types at runtime
	"strconv" // convert the integer ID to a string
	"time"    // health check interval and document timestamps

	// Import the Olivere Golang driver for Elasticsearch 7
	"github.com/olivere/elastic/v7"
)

// Declare a struct for Elasticsearch fields
type ElasticDocs struct {
	SomeStr   string
	SomeInt   int
	SomeBool  bool
	Timestamp int64
}

func main() {

	// Allow for custom formatting of log output
	log.SetFlags(0)

	// Print the version of the Olivere driver package
	fmt.Println("Version:", elastic.Version)

	// Create a context object for the API calls
	ctx := context.Background()

	// Declare a client instance of the Olivere driver
	client, err := elastic.NewClient(
		elastic.SetSniff(true),
		elastic.SetURL("http://localhost:9200"),
		elastic.SetHealthcheckInterval(5*time.Second), // check node health every 5 seconds
	)

	// Check and see if olivere's NewClient() method returned an error
	if err != nil {
		fmt.Println("elastic.NewClient() ERROR:", err)
		log.Fatalf("quitting connection..")
	} else {
		// Print client information
		fmt.Println("client:", client)
		fmt.Println("client TYPE:", reflect.TypeOf(client))
	}

	// Index name for the bulk Elasticsearch documents goes here
	indexName := "some_index"

	// Declare a string slice with the index name in it
	indices := []string{indexName}

	// Instantiate a new *elastic.IndicesExistsService
	existService := elastic.NewIndicesExistsService(client)

	// Pass the slice with the index name to the IndicesExistsService.Index() method
	existService.Index(indices)

	// Have Do() return an API response by passing the Context object to the method call
	exist, err := existService.Do(ctx)

	// Check if the IndicesExistsService.Do() method returned any errors
	if err != nil {
		log.Fatalf("IndicesExistsService.Do() ERROR: %v", err)

	} else if !exist {
		fmt.Println("\nOh no! The index", indexName, "doesn't exist.")
		fmt.Println("Create the index, and then run the Go script again")

		/*
			curl -XPUT 'http://localhost:9200/some_index' -H 'Content-Type: application/json' -d '
			{
			  "settings" : {
			    "number_of_shards" : 3,
			    "number_of_replicas" : 2
			  }
			}'
		*/

		// Bulk index the Elasticsearch documents if index exists
	} else {
		fmt.Println("Index name:", indexName, " exists!")

		// Declare an empty slice for the Elasticsearch document struct objects
		docs := []ElasticDocs{}

		// Get the type of the 'docs' struct slice
		fmt.Println("docs TYPE:", reflect.TypeOf(docs))

		// New ElasticDocs struct instances (the timestamp is set later, in the loop)
		newDoc1 := ElasticDocs{SomeStr: "Hello, world!", SomeInt: 42, SomeBool: true, Timestamp: 0}
		newDoc2 := ElasticDocs{SomeStr: "你好，世界！", SomeInt: 7654, SomeBool: false, Timestamp: 0}
		newDoc3 := ElasticDocs{SomeStr: "Kumusta, mundo!", SomeInt: 1234, SomeBool: true, Timestamp: 0}

		// Append the new Elasticsearch document struct objects to the slice
		docs = append(docs, newDoc1)
		docs = append(docs, newDoc2)
		docs = append(docs, newDoc3)

		// Declare a new Bulk() object using the client instance
		bulk := client.Bulk()

		// Elasticsearch _id counter starts at 0
		docID := 0

		// Iterate over the slice of Elasticsearch documents
		for _, doc := range docs {

			// Incrementally change the _id number in each iteration
			docID++

			// Convert the _id integer into a string
			idStr := strconv.Itoa(docID)

			// Set the doc timestamp to the current Unix time (int64, in seconds)
			doc.Timestamp = time.Now().Unix()
			fmt.Println("\ntime.Now().Unix():", doc.Timestamp)

			// Declare a new NewBulkIndexRequest() instance
			req := elastic.NewBulkIndexRequest()

			// Assign custom values to the NewBulkIndexRequest() based on the Elasticsearch
			// index and the request type
			req.OpType("index") // set type to "index" document
			req.Index(indexName)
			//req.Type("_doc") // Doc types are deprecated (default now _doc)
			req.Id(idStr)
			req.Doc(doc)

			// Print information about the NewBulkIndexRequest object
			fmt.Println("req:", req)
			fmt.Println("req TYPE:", reflect.TypeOf(req))

			// Add the new NewBulkIndexRequest() to the client.Bulk() instance
			bulk = bulk.Add(req)
			fmt.Println("bulk.NumberOfActions():", bulk.NumberOfActions())
		}

		// Do() sends the bulk requests to Elasticsearch
		bulkResp, err := bulk.Do(ctx)

		// Check if the Do() method returned any errors
		if err != nil {
			log.Fatalf("bulk.Do(ctx) ERROR: %v", err)
		} else {

			// If there is no error then get the Elasticsearch API response
			indexed := bulkResp.Indexed()
			fmt.Println("\nbulkResp.Indexed():", indexed)
			fmt.Println("bulkResp.Indexed() TYPE:", reflect.TypeOf(indexed))

			// Get the type of the bulkResp.Indexed() value returned from bulk.go
			t := reflect.TypeOf(indexed)
			fmt.Println("\nt:", t)
			fmt.Println("bulk.NumberOfActions():", bulk.NumberOfActions())

			// List any methods defined on that type (a plain slice type has none)
			for i := 0; i < t.NumMethod(); i++ {
				method := t.Method(i)
				fmt.Println("\nbulkResp.Indexed() METHOD NAME:", i, method.Name)
				fmt.Println("bulkResp.Indexed() method:", method)
			}

			// Return data on the documents indexed
			fmt.Println("\nBulk response Index:", indexed)
			for _, info := range indexed {
				fmt.Println("\nBulk response Index:", info)
				//fmt.Println("\nBulk response Index:", info.Index)
			}
		}
	}
}
