Briefly, this log appears when Elasticsearch cannot write new segments to disk fast enough to keep up with a high indexing load. The cause is usually insufficient disk I/O capacity or a system that is under heavy load overall. To resolve the issue, consider the following: 1) Increase the disk I/O capacity; 2) Optimize your indexing operations by reducing the number of shards, using bulk requests, or increasing the refresh interval; 3) Monitor and manage the system load to ensure it stays within the capacity of your Elasticsearch cluster.
This guide will help you check for common problems that cause this log to appear. To understand the issues behind it, start with the general overview of common issues and tips related to the Elasticsearch concepts involved: indexing, indices, memory and shards.
Quick overview
This log means that Elasticsearch is applying back-pressure to the indexing process. It is not an error message, but it is essential to act on it to ensure Elasticsearch doesn't crash. Make sure that:
- Elasticsearch can cope with your indexing requirements.
- Search stays near real-time: delays in indexing keep new documents from becoming searchable, which leads to a poor search and user experience.
Here are some important terms to help you understand this log message:
Shard
Each Elasticsearch index is made up of one or more shards, and each shard is an Apache Lucene index. Elasticsearch internally uses Lucene to index and search.
Segments
Every shard is made of multiple segments. Segments are immutable, which improves performance: they can be cached and safely shared in a multithreaded environment.
Indexing buffer
Each index request (a document that needs to be indexed) is first written to an in-memory area called the indexing buffer (every shard has its own indexing buffer). From the buffer, documents are written to segments, and writing a segment means writing to disk.
Log message deep-dive
By default, Elasticsearch reserves 10% of the total heap for all indexing activity. This budget is shared across all the indexing shards on the node.
indices.memory.index_buffer_size controls the above.
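As a rough illustration of what the default means in practice, the following sketch computes the node-wide indexing buffer for a few heap sizes. The helper name `index_buffer_bytes` and the sample heap sizes are assumptions made for this example; only the 10% default comes from the text above.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def index_buffer_bytes(heap_bytes, percent=10):
    """Node-wide indexing buffer budget for a given JVM heap size.

    percent mirrors indices.memory.index_buffer_size (10% by default).
    """
    return heap_bytes * percent // 100


# Hypothetical heap sizes, chosen only for illustration.
for heap_gib in (4, 16, 31):
    budget = index_buffer_bytes(heap_gib * GIB)
    print(f"{heap_gib} GiB heap -> {budget / MIB:.0f} MiB shared by all indexing shards")
```

The budget is shared rather than split evenly, so a single busy shard can hold most of it at any given moment.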
Elasticsearch also maintains a set of throttled shards: shards whose segment writes cannot keep up with incoming documents.
When a particular shard holds too much indexing buffer memory while its segments are still being written, Elasticsearch adds that shard to the throttled set and throttles (slows down) indexing for that shard. The event is logged with the following message:
logger.info("now throttling indexing for shard [{}]: segment writing can't keep up", largest.shard.shardId());
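The behavior described above can be modeled with a small simulation. This is a simplified sketch, not the Elasticsearch implementation: the `ShardUsage` type, the `relieve_pressure` function and the `throttle_factor` of 1.5 are assumptions of this example, and the real controller also accounts for bytes that are still being written to disk.

```python
from dataclasses import dataclass


@dataclass
class ShardUsage:
    shard_id: str
    bytes_used: int  # heap currently held by this shard's indexing buffer


def relieve_pressure(shards, budget_bytes, throttle_factor=1.5):
    """Write out the largest buffers first until usage fits the budget.

    Returns (flushed, throttled) lists of shard IDs. throttle_factor is an
    assumption of this sketch: shards are throttled only when total usage
    exceeds the budget by that factor.
    """
    total = sum(s.bytes_used for s in shards)
    do_throttle = total > throttle_factor * budget_bytes
    flushed, throttled = [], []
    for shard in sorted(shards, key=lambda s: s.bytes_used, reverse=True):
        if total <= budget_bytes:
            break  # back under budget, nothing more to do
        flushed.append(shard.shard_id)  # models writing the buffer to a segment
        total -= shard.bytes_used
        if do_throttle:
            throttled.append(shard.shard_id)  # models the logged throttling
    return flushed, throttled


shards = [ShardUsage("a", 120), ShardUsage("b", 40), ShardUsage("c", 10)]
print(relieve_pressure(shards, budget_bytes=100))
```

In this model the largest consumer is always handled first, which matches the `largest.shard` naming in the log statement.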
Troubleshooting steps
- Figure out which shard and index are being throttled: the shard ID appears in the log message, and with the _cat/shards API you can locate the problematic index, shard and data node.
- Once you have identified the problematic index, fine-tune its indexing settings based on your requirements. For example, during a bulk migration, disable refresh by setting refresh_interval to -1, and restore it when the migration is done.
- If yours is a write-heavy system with far less search traffic, you can allocate more heap to indexing by raising indices.memory.index_buffer_size above its default of 10%.
- You can add RAM to the data nodes and increase the Elasticsearch JVM heap size, up to a maximum of about 31GB for optimal performance.
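The first step can be partly automated. The sketch below extracts the index name and shard number from a log line and prints the matching _cat/shards request. The sample log line, the `throttled_shard` helper, and the assumption that the shard ID is rendered as `[index][number]` are illustrative; adjust the pattern to what your logs actually contain.

```python
import re

# Assumes the shard ID is printed as "[index_name][shard_number]" inside the
# message's own square brackets.
LOG_PATTERN = re.compile(
    r"now throttling indexing for shard \[\[(?P<index>[^\]]+)\]\[(?P<shard>\d+)\]\]"
)


def throttled_shard(log_line):
    """Return (index_name, shard_number) or None if the line does not match."""
    match = LOG_PATTERN.search(log_line)
    if match is None:
        return None
    return match.group("index"), int(match.group("shard"))


# Hypothetical log line, written for this example.
sample = (
    "[INFO ][o.e.i.IndexingMemoryController] now throttling indexing "
    "for shard [[sensor][2]]: segment writing can't keep up"
)
index_name, shard_number = throttled_shard(sample)
print(f"GET _cat/shards/{index_name}?v")
print(f"look for shard {shard_number} in the output")
```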
Note: For detailed troubleshooting, you can contact Opster’s community support team, as this process can require more details and live debugging.
Overview
Indexing is the process of adding documents to and updating documents on an Elasticsearch index.
Examples
In its simplest form, you can index a document like this:
POST /test/_doc
{
  "message": "Opster Rocks Elasticsearch Management"
}
This will create the index “test” (if it doesn’t already exist) and add a document with the source equal to the body of the POST call. In this case, the ID will be created automatically. If you repeat this command, a second document will be created with an identical source but a different ID.
Alternatively, you can do this:
PUT /test/_doc/1
{
  "message": "Opster Elasticsearch Management and Troubleshooting"
}
This is almost the same, but in this case, the call sets the ID of the document to 1. If you repeat the command modifying the message, you will modify the original document, replacing the previous source with the latest source.
However, note that this is NOT the same as an UPDATE operation, which is a different API that lets you modify certain fields of the document while leaving others unchanged.
Notes and good things to know
You can set your own ID if necessary (especially if you later need to update the same ID) but this comes at a performance penalty. If you don’t need to update documents, then let Elasticsearch set its own ID automatically.
If you need to index many documents at once, it is much more efficient to use the BULK API to carry out these operations with a single call.
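As a minimal sketch of what a bulk request looks like, the helper below builds the newline-delimited JSON body that the Bulk API expects: one action line followed by one source line per document, with a trailing newline. The function name `bulk_body` and the sample documents are assumptions for illustration.

```python
import json


def bulk_body(index_name, documents):
    """Build an NDJSON body for POST /_bulk that indexes each document."""
    lines = []
    for doc in documents:
        lines.append(json.dumps({"index": {"_index": index_name}}))  # action line
        lines.append(json.dumps(doc))                                # source line
    return "\n".join(lines) + "\n"  # the body must end with a newline


docs = [{"message": "first"}, {"message": "second"}]
body = bulk_body("test", docs)
print(body)
```

The resulting string would be sent as the body of `POST /_bulk` with the `Content-Type: application/x-ndjson` header.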
Indexing is not instantaneous: documents are not available for search until the index has been refreshed. By default, a refresh happens every 1 second. Increasing this interval reduces the indexing burden on the cluster and so increases indexing speed. You can change the refresh interval in the index settings.
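For example, the settings bodies below slow refreshes down, disable them, and restore the default. This is a sketch that only builds the JSON; the helper name `refresh_settings`, the index name `test` and the 30s value are assumptions chosen for illustration.

```python
import json


def refresh_settings(interval):
    """Body for PUT /<index>/_settings that changes the refresh interval."""
    return json.dumps({"index": {"refresh_interval": interval}})


print("PUT /test/_settings", refresh_settings("30s"))  # refresh less often
print("PUT /test/_settings", refresh_settings("-1"))   # disable refresh during a bulk load
print("PUT /test/_settings", refresh_settings("1s"))   # back to the default
```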
You can apply version control by setting the version parameter (?version=3) and indicating version_type=external. Elasticsearch will then reject any index request whose version is not greater than the current version of the document. This can be useful when running distributed processes where you cannot guarantee that updated documents arrive in the correct order.
PUT test/_doc/1?version=20&version_type=external
{
  "message": "using external version the document will be modified only if version is greater than previous!"
}
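The effect of external versioning can be illustrated with a toy in-memory model. The `ToyIndex` class and `VersionConflict` exception are hypothetical names for this sketch; it only mimics the acceptance rule described above and is not how Elasticsearch stores documents.

```python
class VersionConflict(Exception):
    """Raised when an incoming version is not newer than the stored one."""


class ToyIndex:
    def __init__(self):
        self._docs = {}  # doc_id -> (version, source)

    def index_external(self, doc_id, version, source):
        current = self._docs.get(doc_id)
        # External versioning: accept only strictly greater versions.
        if current is not None and version <= current[0]:
            raise VersionConflict(f"version {version} <= current {current[0]}")
        self._docs[doc_id] = (version, source)

    def get(self, doc_id):
        return self._docs[doc_id]


index = ToyIndex()
index.index_external("1", 20, {"message": "v20"})
try:
    index.index_external("1", 19, {"message": "late arrival"})
except VersionConflict as err:
    print("rejected:", err)
index.index_external("1", 21, {"message": "v21"})
print(index.get("1"))
```

A stale write that arrives late is rejected, so the newest version wins regardless of arrival order.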
The process of indexing is as follows
The index request is sent to the primary shard. Once the primary shard is updated, the request is relayed to the replica shards. The command will not return until the primary shard (at least) has been updated. For greater resilience, you can require a minimum number of active shard copies (the primary counts as one) before the operation proceeds by using the parameter ?wait_for_active_shards=2
You can also control which shard the index operation is sent to by using the “routing” parameter. There are 2 reasons that this might be done:
- Certain Elasticsearch features (such as parent-child documents) require that the parent and child documents be held on the same shard.
- It may be possible to increase search speeds and reduce load on Elasticsearch by storing similar documents together on the same shard and then specifying the routing for both indexing and searching. Although this can be done explicitly during indexing, it is not recommended. It would be preferable to set this up using the index mapping, so that the routing is determined by an ID value on the source document.
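The idea behind routing can be sketched as hashing the routing value and taking it modulo the number of shards. The example below uses `zlib.crc32` purely as a stand-in; Elasticsearch uses its own hash function and shard-number formula, and `pick_shard` is a hypothetical helper.

```python
import zlib


def pick_shard(routing_value, number_of_shards):
    """Map a routing value to a shard number.

    zlib.crc32 is a stand-in hash for illustration only.
    """
    return zlib.crc32(routing_value.encode("utf-8")) % number_of_shards


# Documents that share a routing value always land on the same shard.
for routing in ("user-1", "user-2", "user-1"):
    print(routing, "->", pick_shard(routing, 6))
```

Because the mapping depends on the shard count, the same routing value must be supplied at search time to reach the shard that holds the documents.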
Overview
Data in an Elasticsearch index can grow to massive proportions. In order to keep it manageable, it is split into a number of shards. Each Elasticsearch shard is an Apache Lucene index, with each individual Lucene index containing a subset of the documents in the Elasticsearch index. Splitting indices in this way keeps resource usage under control. An Apache Lucene index has a limit of 2,147,483,519 documents.
Examples
The number of shards is set when an index is created, and this number cannot be changed later without reindexing the data. When creating an index, you can set the number of shards and replicas as properties of the index using:
PUT /sensor
{
  "settings": {
    "index": {
      "number_of_shards": 6,
      "number_of_replicas": 2
    }
  }
}
The ideal number of shards should be determined based on the amount of data in an index. Generally, an optimal shard should hold 30-50GB of data. For example, if you expect to accumulate around 300GB of application logs in a day, having around 10 shards in that index would be reasonable.
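The arithmetic behind that estimate is simple enough to write down. This sketch assumes the 30-50GB guideline above; the function name `shard_count_range` is hypothetical.

```python
import math


def shard_count_range(index_size_gb, min_shard_gb=30, max_shard_gb=50):
    """Return (fewest, most) primary shards that keep each shard in range."""
    fewest = math.ceil(index_size_gb / max_shard_gb)  # largest allowed shards
    most = math.ceil(index_size_gb / min_shard_gb)    # smallest allowed shards
    return fewest, most


print(shard_count_range(300))  # (6, 10)
```

For 300GB, anything between 6 and 10 primary shards keeps each shard within the guideline, which is why around 10 shards is a reasonable choice.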
During their lifetime, shards can go through a number of states, including:
- Initializing: An initial state before the shard can be used.
- Started: A state in which the shard is active and can receive requests.
- Relocating: A state that occurs when shards are in the process of being moved to a different node. This may be necessary under certain conditions, such as when the node they are on is running out of disk space.
- Unassigned: The state of a shard that has failed to be assigned. A reason is provided when this happens. For example, if the node hosting the shard is no longer in the cluster (NODE_LEFT) or due to restoring into a closed index (EXISTING_INDEX_RESTORED).
In order to view all shards, their states, and other metadata, use the following request:
GET _cat/shards
To view shards for a specific index, append the name of the index to the URL, for example, sensor:
GET _cat/shards/sensor
This command produces output such as the following example. By default, the columns shown include the name of the index, the name (i.e. number) of the shard, whether it is a primary shard or a replica, its state, the number of documents, the size on disk, the IP address, and the node name.
sensor 5 p STARTED    0 283b  127.0.0.1 ziap
sensor 5 r UNASSIGNED
sensor 2 p STARTED    1 3.7kb 127.0.0.1 ziap
sensor 2 r UNASSIGNED
sensor 3 p STARTED    3 7.2kb 127.0.0.1 ziap
sensor 3 r UNASSIGNED
sensor 1 p STARTED    1 3.7kb 127.0.0.1 ziap
sensor 1 r UNASSIGNED
sensor 4 p STARTED    2 3.8kb 127.0.0.1 ziap
sensor 4 r UNASSIGNED
sensor 0 p STARTED    0 283b  127.0.0.1 ziap
sensor 0 r UNASSIGNED
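Output like this is easy to post-process. The sketch below parses a few of the rows above and lists the unassigned shards; the `parse_cat_shards` helper is a hypothetical name, and it assumes the default column order described earlier.

```python
CAT_OUTPUT = """\
sensor 5 p STARTED    0 283b  127.0.0.1 ziap
sensor 5 r UNASSIGNED
sensor 2 p STARTED    1 3.7kb 127.0.0.1 ziap
sensor 2 r UNASSIGNED
"""


def parse_cat_shards(text):
    """Parse whitespace-separated _cat/shards rows into dictionaries."""
    rows = []
    for line in text.splitlines():
        fields = line.split()
        if len(fields) < 4:
            continue  # skip blank or malformed lines
        row = {
            "index": fields[0],
            "shard": int(fields[1]),
            "prirep": fields[2],  # p = primary, r = replica
            "state": fields[3],
        }
        if len(fields) >= 8:  # unassigned shards have no docs/store/ip/node
            row.update(docs=int(fields[4]), store=fields[5], ip=fields[6], node=fields[7])
        rows.append(row)
    return rows


rows = parse_cat_shards(CAT_OUTPUT)
unassigned = [(r["index"], r["shard"], r["prirep"]) for r in rows if r["state"] == "UNASSIGNED"]
print(unassigned)
```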
Notes and good things to know
- Having shards that are too large is simply inefficient. Moving huge indices across machines is both a time- and labor-intensive process. Lucene merges take longer to complete and require greater resources. Moving shards across nodes for rebalancing also takes longer, and recovery time is extended. By splitting the data and spreading it across a number of machines, you keep it in manageable chunks and minimize these risks.
- Having the right number of shards is important for performance, so it is wise to plan in advance. Queries that run across several shards in parallel execute faster than queries against an index composed of a single shard, but only if each shard is located on a different node and there are sufficient nodes in the cluster. At the same time, shards consume memory and disk space, both in terms of indexed data and cluster metadata. Having too many shards can slow down queries, indexing requests, and management operations, so maintaining the right balance is critical.
Log Context
The log “now throttling indexing for shard [{}]: segment writing can’t keep up” is emitted by the class in IndexingMemoryController.java.
We extracted the following from the Elasticsearch source code for those seeking in-depth context:
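            // (the excerpt begins mid-statement)
            ByteSizeValue.ofBytes(largest.bytesUsed)
        );
        // Write the largest shard's indexing buffer to disk and deduct it from the total
        writeIndexingBufferAsync(largest.shard);
        totalBytesUsed -= largest.bytesUsed;
        // Throttle the shard only once, and only when throttling is called for
        if (doThrottle && throttled.contains(largest.shard) == false) {
            logger.info("now throttling indexing for shard [{}]: segment writing can't keep up", largest.shard.shardId());
            throttled.add(largest.shard);
            activateThrottling(largest.shard);
        }
    }
}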