Briefly, this error occurs when tasks in Elasticsearch’s queue are not being processed quickly enough, causing a backlog. This could be due to high load, insufficient resources, or slow queries. To resolve this, you can optimize your queries, increase your cluster’s resources, or adjust the thread pool settings. Also, consider checking for any hardware issues that might be slowing down task processing.
This guide will help you check for common problems that cause the log ” pending task queue has been nonempty for [{}/{}ms] which is longer than the warn threshold of [{}ms]; ” to appear. To understand the issues related to this log, read the explanation below about the following Elasticsearch concepts: cluster, threshold, task, queue.
Overview
An Elasticsearch cluster consists of a number of servers (nodes) working together as one. Clustering is a technology which enables Elasticsearch to scale up to hundreds of nodes that together are able to store many terabytes of data and respond coherently to large numbers of requests at the same time.
Search or indexing requests will usually be load-balanced across the Elasticsearch data nodes, and the node that receives the request will relay requests to other nodes as necessary and coordinate the response back to the user.
Notes and good things to know
The key elements to clustering are:
Cluster State – Refers to information about which indices are in the cluster, their data mappings and other information that must be shared between all the nodes to ensure that all operations across the cluster are coherent.
Master Node – Each cluster must elect a single master node responsible for coordinating the cluster and ensuring that each node contains an up-to-date copy of the cluster state.
Cluster Formation – Elasticsearch requires a set of configurations to determine how the cluster is formed, which nodes can join the cluster, and how the nodes collectively elect a master node responsible for controlling the cluster state. These configurations are usually held in the elasticsearch.yml config file, environment variables on the node, or within the cluster state.
Node Roles – In small clusters it is common for all nodes to fill all roles; all nodes can store data, become master nodes or process ingestion pipelines. However as the cluster grows, it is common to allocate specific roles to specific nodes in order to simplify configuration and to make operation more efficient. In particular, it is common to define a limited number of dedicated master nodes.
Replication – Data may be replicated across a number of data nodes. This means that if one node goes down, data is not lost. It also means that a search request can be dealt with by more than one node.
Common problems
Many Elasticsearch problems are caused by operations which place an excessive burden on the cluster because they require an excessive amount of information to be held and transmitted between the nodes as part of the cluster state. For example:
- Shards too small
- Too many fields (field explosion)
Problems may also be caused by inadequate configurations causing situations where the Elasticsearch cluster is unable to safely elect a Master node. This situation is discussed further in:
Backups
Because Elasticsearch is a clustered technology, it is not sufficient to have backups of each node’s data directory. This is because the backups will have been made at different times and so there may not be complete coherency between them. As such, the only way to backup an Elasticsearch cluster is through the use of snapshots, which contain the full picture of an index at any one time.
Cluster resilience
When designing an Elasticsearch cluster, it is important to think about cluster resilience. In particular – what happens when a single node goes down? And for larger clusters where several nodes may share common services such as a network or power supply – what happens if that network or power supply goes down? This is where it is useful to ensure that the master eligible nodes are spread across availability zones, and to use shard allocation awareness to ensure that shards are spread across different racks or availability zones in your data center.
Overview
Elasticsearch uses several parameters to enable it to manage hard disk storage across the cluster.
What it’s used for
- Elasticsearch will actively try to relocate shards away from nodes which exceed the disk watermark high threshold.
- Elasticsearch will NOT locate new shards or relocate shards on to nodes which exceed the disk watermark low threshold.
- Elasticsearch will prevent all writes to an index which has any shard on a node that exceeds the disk.watermark.flood_stage threshold.
- The info update interval is the time it will take Elasticsearch to re-check the disk usage.
Examples
PUT _cluster/settings { "transient": { "cluster.routing.allocation.disk.watermark.low": "85%", "cluster.routing.allocation.disk.watermark.high": "90%", "cluster.routing.allocation.disk.watermark.flood_stage": "95%", "cluster.info.update.interval": "1m" } }
Notes and good things to know
- You can use absolute values (100gb) or percentages (90%), but you cannot mix the two on the same cluster.
- In general, it is recommended to use percentages, since this will work in case the disks are resized.
- You can put the cluster settings on the elasticsearch.yml of each node, but it is recommended to use the PUT _cluster/settings API because it is easier to manage, and ensures that the settings are coherent across the cluster.
- Elasticsearch comes with sensible defaults for these settings, so think twice before modifying them. If you find you are spending a lot of time fine-tuning these settings, then it is probably time to invest in new disk space.
- In the event of the flood_stage threshold being exceeded, once you delete data, Elasticsearch should detect automatically that the block can be released (bearing in mind the update interval which could be, for instance, a minute). However if you want to accelerate this process, you can unblock an index manually, with the following call:
PUT /my_index/_settings { "index.blocks.read_only_allow_delete": null }
Common problems
Inappropriate cluster settings (if the disk watermark.low is too low) can make it impossible for Elasticsearch to allocate shards on the cluster. In particular, bear in mind that these parameters work in combination with other cluster settings (for example shard allocation awareness) which cause further restraints on how Elasticsearch can allocate shards.
Overview
A task is an Elasticsearch operation, which can be any request performed on an Elasticsearch cluster, such as a delete by query request, a search request and so on. Elasticsearch provides a dedicated Task API for the task management which includes various actions, from retrieving the status of current running tasks to canceling any long running task.
Examples
Get all currently running tasks on all nodes of the cluster
Apart from other information, the response of the below request contains task IDs of all the tasks which can be used to get detailed information about the particular task in question.
GET _tasks
Get detailed information of a particular task
Where clQFAL_VRrmnlRyPsu_p8A:1132678759 is the ID of the task in below request
GET _tasks/clQFAL_VRrmnlRyPsu_p8A:1132678759
Get all the current tasks running on particular nodes
GET _tasks?nodes=nodeId1,nodeId2
Cancel a task
Where clQFAL_VRrmnlRyPsu_p8A:1132678759 is the ID of the task in the below request
POST /_tasks/clQFAL_VRrmnlRyPsu_p8A:1132678759/_cancel?pretty
Notes
- The Task API will be most useful when you want to investigate the spike of resource utilization in the cluster or want to cancel an operation.
Log Context
Log “pending task queue has been nonempty for [{}/{}ms] which is longer than the warn threshold of [{}ms];” classname is MasterService.java.
We extracted the following from Elasticsearch source code for those seeking an in-depth context :
nonemptyDurationMillis = nowMillis - nonemptySinceMillis; } final PrioritizedEsThreadPoolExecutor threadPoolExecutor = threadPoolExecutorSupplier.get(); final TimeValue maxTaskWaitTime = threadPoolExecutor.getMaxTaskWaitTime(); logger.warn("pending task queue has been nonempty for [{}/{}ms] which is longer than the warn threshold of [{}ms];" + " there are currently [{}] pending tasks; the oldest of which has age [{}/{}ms]"; TimeValue.timeValueMillis(nonemptyDurationMillis); nonemptyDurationMillis; warnThreshold; threadPoolExecutor.getNumberOfPendingTasks();