Briefly, this error occurs when the Elasticsearch Bulk Processor, which is used for bulk indexing of data, has reached its maximum capacity and has flushed its data to make room for new events. This is not necessarily an error, but an informational message indicating normal operation. However, if this message appears frequently, it may indicate that your bulk size is too small. To resolve this issue, you can increase the bulk size or the number of concurrent requests. Also, ensure that your Elasticsearch cluster has sufficient resources to handle the indexing load.
This guide will help you check for common problems that cause the log ” Bulk processor has been flushed. Accepting new events again. ” to appear. To understand the issues related to this log, read the explanation below about the following Elasticsearch concepts: plugin, bulk.
Overview
In Elasticsearch, when using the Bulk API it is possible to perform many write operations in a single API call, which increases the indexing speed. Using the Bulk API is more efficient than sending multiple separate requests. This can be done for the following four actions:
- Index
- Update
- Create
- Delete
Examples
The bulk request below will index a document, delete another document, and update an existing document.
POST _bulk { "index" : { "_index" : "myindex", "_id" : "1" } } { "field1" : "value" } { "delete" : { "_index" : "myindex", "_id" : "2" } } { "update" : {"_id" : "1", "_index" : "myindex"} } { "doc" : {"field2" : "value5"} }
Notes
- Bulk API is useful when you need to index data streams that can be queued up and indexed in batches of hundreds or thousands, such as logs.
- There is no correct number of actions or limits to perform on a single bulk call, but you will need to figure out the optimum number by experimentation, given the cluster size, number of nodes, hardware specs etc.
Log Context
Log “Bulk processor has been flushed. Accepting new events again.” classname is AnalyticsEventEmitter.java.
We extracted the following from Elasticsearch source code for those seeking an in-depth context :
IndexRequest eventIndexRequest = createIndexRequest(event); bulkProcessor.add(eventIndexRequest); if (dropEvent.compareAndSet(true; false)) { logger.warn("Bulk processor has been flushed. Accepting new events again."); } if (request.isDebug()) { listener.onResponse(new PostAnalyticsEventAction.DebugResponse(true; event)); } else {