Briefly, this error occurs when an operation, typically a shard recovery, is attempted on a shard that has been closed in Elasticsearch. The closure may be manual or triggered automatically by Elasticsearch’s internal mechanisms. If it was manual, reopen the index that contains the shard, since shards are opened and closed together with their index. If it was automatic, investigate the root cause, such as insufficient resources or a network partition, and address it accordingly. Additionally, ensure that your application handles such errors gracefully to prevent disruption.
This guide will help you check for common problems that cause the log “source shard is closed” to appear. To understand the issues related to this log, read the explanation below about the following Elasticsearch concepts: shard, recovery, source, indices.
Overview
Data in an Elasticsearch index can grow to massive proportions. In order to keep it manageable, it is split into a number of shards. Each Elasticsearch shard is an Apache Lucene index, with each individual Lucene index containing a subset of the documents in the Elasticsearch index. Splitting indices in this way keeps resource usage under control. An Apache Lucene index has a limit of 2,147,483,519 documents.
Examples
The number of shards is set when an index is created, and this number cannot be changed in place later. Changing it requires reindexing the data or creating a new index with the split or shrink APIs. When creating an index, you can set the number of shards and replicas as properties of the index using:
PUT /sensor
{
  "settings": {
    "index": {
      "number_of_shards": 6,
      "number_of_replicas": 2
    }
  }
}
The ideal number of shards should be determined based on the amount of data in an index. Generally, an optimal shard should hold 30-50GB of data. For example, if you expect to accumulate around 300GB of application logs in a day, having around 10 shards in that index would be reasonable.
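The sizing rule above is simple arithmetic, and the following minimal sketch makes it explicit. The function name `estimate_primary_shards` and the 30 GB default target are assumptions made for illustration; they are not part of Elasticsearch.

```python
import math


def estimate_primary_shards(expected_index_size_gb: float,
                            target_shard_size_gb: float = 30.0) -> int:
    """Estimate a primary shard count from expected data volume.

    Hypothetical helper: divides the expected index size by a target
    shard size (30-50 GB per the guideline above) and rounds up.
    """
    if expected_index_size_gb < 0 or target_shard_size_gb <= 0:
        raise ValueError("sizes must be positive")
    # An index always needs at least one primary shard.
    return max(1, math.ceil(expected_index_size_gb / target_shard_size_gb))


# 300 GB of daily logs with 30 GB shards, as in the example above.
print(estimate_primary_shards(300))
# The same volume with the upper end of the guideline (50 GB shards).
print(estimate_primary_shards(300, 50))
```

The result is only a starting point: query patterns, node count, and retention also influence the right number.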
During their lifetime, shards can go through a number of states, including:
- Initializing: An initial state before the shard can be used.
- Started: A state in which the shard is active and can receive requests.
- Relocating: A state that occurs when shards are in the process of being moved to a different node. This may be necessary under certain conditions, such as when the node they are on is running out of disk space.
- Unassigned: The state of a shard that has failed to be assigned. A reason is provided when this happens, for example when the node hosting the shard is no longer in the cluster (NODE_LEFT) or when a snapshot is restored into a closed index (EXISTING_INDEX_RESTORED).
In order to view all shards, their states, and other metadata, use the following request:
GET _cat/shards
To view shards for a specific index, append the name of the index to the URL, for example:
GET _cat/shards/sensor
This command produces output like the following example. By default, the columns shown include the name of the index, the name (i.e. number) of the shard, whether it is a primary shard or a replica, its state, the number of documents, the size on disk, the IP address, and the node name.
sensor 5 p STARTED    0  283b 127.0.0.1 ziap
sensor 5 r UNASSIGNED
sensor 2 p STARTED    1 3.7kb 127.0.0.1 ziap
sensor 2 r UNASSIGNED
sensor 3 p STARTED    3 7.2kb 127.0.0.1 ziap
sensor 3 r UNASSIGNED
sensor 1 p STARTED    1 3.7kb 127.0.0.1 ziap
sensor 1 r UNASSIGNED
sensor 4 p STARTED    2 3.8kb 127.0.0.1 ziap
sensor 4 r UNASSIGNED
sensor 0 p STARTED    0  283b 127.0.0.1 ziap
sensor 0 r UNASSIGNED
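When many shards are listed, it can help to filter this output programmatically. The sketch below parses the plain-text output shown above and picks out the unassigned shards. The names `ShardRow` and `parse_cat_shards` are hypothetical, and the parser assumes whitespace-separated default columns with node names that contain no spaces.

```python
from typing import List, NamedTuple, Optional


class ShardRow(NamedTuple):
    index: str
    shard: int
    prirep: str            # "p" for primary, "r" for replica
    state: str
    docs: Optional[str]    # missing for unassigned shards
    store: Optional[str]
    ip: Optional[str]
    node: Optional[str]


def parse_cat_shards(text: str) -> List[ShardRow]:
    """Parse default `GET _cat/shards` text output into rows."""
    rows = []
    for line in text.splitlines():
        parts = line.split()
        # Skip blank lines and a header row (present when ?v is used).
        if len(parts) < 4 or not parts[1].isdigit():
            continue
        padded = parts + [None] * (8 - len(parts))
        rows.append(ShardRow(padded[0], int(padded[1]), padded[2],
                             padded[3], *padded[4:8]))
    return rows


SAMPLE = """\
sensor 5 p STARTED    0  283b 127.0.0.1 ziap
sensor 5 r UNASSIGNED
sensor 2 p STARTED    1 3.7kb 127.0.0.1 ziap
sensor 2 r UNASSIGNED
"""

unassigned = [r for r in parse_cat_shards(SAMPLE) if r.state == "UNASSIGNED"]
for row in unassigned:
    print(f"{row.index} shard {row.shard} ({row.prirep}) is unassigned")
```

In the sample, every replica is unassigned because the single node already holds the primaries and a replica cannot share a node with its primary.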
Notes and good things to know
- Having shards that are too large is simply inefficient. Lucene merges take longer to complete and require more resources. Moving the shards across nodes for rebalancing also takes longer, and recovery time is extended. Splitting the data and spreading it across a number of machines keeps it in manageable chunks and minimizes these risks.
- Having the right number of shards is important for performance, so it is wise to plan in advance. Queries that run across several shards in parallel execute faster than they would against an index composed of a single shard, but only if each shard is located on a different node and there are sufficient nodes in the cluster. At the same time, shards consume memory and disk space, both in terms of indexed data and cluster metadata. Having too many shards can slow down queries, indexing requests, and management operations, so maintaining the right balance is critical.
Overview
In Elasticsearch, recovery refers to the process of recovering a shard when something goes wrong. Shard recoveries can take place in various circumstances, such as when a node fails and a replica shard needs to be recreated from a primary shard, when the cluster needs to relocate shards to different nodes due to a rebalancing or a change in shard allocation settings, or when restoring an index from an Elasticsearch snapshot. Elasticsearch also performs recoveries automatically, for example when a node restarts or disconnects and reconnects. In summary, recovery can happen in the following scenarios:
- Node startup or failure (local store recovery)
- Replication of primary shards to replica shards
- Relocation of a shard to a different node in the same cluster
- Restoration of a snapshot
Planned node restart
If you are planning to restart a node, there are some actions that you can take to speed up the shard recoveries when the node has restarted. For optimal recovery speed, you should stop any indexing to the shards that are hosted on the node that is about to be restarted. Once you’ve stopped your indexing process, you can perform the following actions:
1. Restrict shard allocation to primaries, which prevents replica shards from being reallocated to other nodes while the node is restarting, using the following command:
PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.enable": "primaries"
  }
}
It is worth noting that, by default, Elasticsearch waits one minute after a node leaves before reallocating its shards, and that delay can be configured with the `index.unassigned.node_left.delayed_timeout` index setting.
2. Once shard allocation is restricted, you need to flush the transaction logs (using the command below), which ensures that all operations currently stored in the transaction log are safely committed to the Lucene index on disk. That will save you time during the restart since no operations will need to be replayed, meaning that the recovery of your shards will be faster.
POST /_flush
Note that prior to ES 8.0, a synced flush (POST /_flush/synced) was used for this step, but it was deprecated in 7.6 and removed in 8.0.
3. At this point, you can restart your node.
4. When the node has properly restarted, you can re-enable shard allocation using the following command:
PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.enable": null
  }
}
If you have several nodes to restart or you are performing a full cluster restart, you can use the same procedure. The key points to remember for speeding up the recovery process are to stop any indexing and to flush your transaction log.
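The four steps above can be sketched as a small script that enforces their order. This is a minimal illustration, not an official procedure: `send` and `restart_node` are hypothetical callables that you would supply (for example, a thin wrapper around your HTTP client and your service manager), while the request paths and bodies are the ones shown in the steps.

```python
import json
from typing import Callable, Optional

# Hypothetical transport: send(method, path, body) issues one request.
Send = Callable[[str, str, Optional[dict]], None]

ALLOCATION_SETTING = "cluster.routing.allocation.enable"


def restart_node_safely(send: Send, restart_node: Callable[[], None]) -> None:
    """Run the planned-restart steps in order."""
    # Step 1: allow only primaries to be allocated during the restart.
    send("PUT", "/_cluster/settings",
         {"persistent": {ALLOCATION_SETTING: "primaries"}})
    # Step 2: flush so no translog operations need replaying on recovery.
    send("POST", "/_flush", None)
    # Step 3: restart the node (supplied by the caller).
    restart_node()
    # Step 4: reset the setting to its default (null) to re-enable allocation.
    send("PUT", "/_cluster/settings",
         {"persistent": {ALLOCATION_SETTING: None}})


def print_request(method: str, path: str, body: Optional[dict]) -> None:
    """Dry-run transport that only prints what would be sent."""
    print(method, path, json.dumps(body) if body is not None else "")


restart_node_safely(print_request, lambda: print("(restart node here)"))
```

Running it with the dry-run transport prints the requests without touching a cluster, which is a convenient way to review the sequence before wiring in a real client.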
While the recovery process is in progress, there are a few API calls that allow us to monitor the status of the shard recoveries:
# Check the recovery status of a specific index
GET /<index>/_recovery
# Check the recovery status of all indexes
GET /_recovery
# Check the recovery status of all indexes (more concise format)
GET _cat/recovery
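The JSON returned by `GET /_recovery` can be long, so a short filter helps to see only what is still in progress. The sketch below assumes the response is keyed by index name, with a `shards` list whose entries carry `id`, `type`, and `stage` fields; the sample data is hypothetical and trimmed to those fields.

```python
from typing import List, Tuple


def unfinished_recoveries(recovery_response: dict) -> List[Tuple[str, int, str, str]]:
    """Return (index, shard id, recovery type, stage) for recoveries not DONE."""
    pending = []
    for index_name, index_info in recovery_response.items():
        for shard in index_info.get("shards", []):
            if shard.get("stage") != "DONE":
                pending.append((index_name, shard.get("id"),
                                shard.get("type"), shard.get("stage")))
    return pending


# Hypothetical, trimmed response used only to exercise the function.
sample_response = {
    "sensor": {
        "shards": [
            {"id": 0, "type": "PEER", "stage": "DONE"},
            {"id": 1, "type": "PEER", "stage": "TRANSLOG"},
        ]
    },
    "api-logs": {
        "shards": [
            {"id": 0, "type": "EXISTING_STORE", "stage": "DONE"},
        ]
    },
}

for index_name, shard_id, kind, stage in unfinished_recoveries(sample_response):
    print(f"{index_name}[{shard_id}] {kind} recovery is in stage {stage}")
```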
Tweaking recovery speed
If you cannot stop your indexing process for whatever reason, you can still perform the same procedure. However, since new data will keep flowing in while the node is restarting, all the indexing operations will need to be replayed, which will slow down the recovery process. There are a few knobs that you can tune to speed this up, provided you have sufficient hardware resources (CPU, RAM, network).
By default, the total inbound and outbound recovery traffic on each hot and warm data node is limited to 40 MB/s (the `40mb` default of `indices.recovery.max_bytes_per_sec`). For dedicated cold and frozen nodes, that limit ranges from 40 MB/s to 250 MB/s depending on the total amount of memory available on those nodes. These default values have been determined empirically based on the assumption that the hardware is composed of standard SSD disks and a network interface with 1 Gbps throughput. If you have superior hardware (e.g., 10 Gbps network and 100K IOPS disks), you can increase the recovery traffic limit to a higher value using the following command:
PUT /_cluster/settings
{
  "transient": {
    "indices.recovery.max_bytes_per_sec": "100mb"
  }
}
You should be very careful when changing this setting as it can harm your cluster performance if the value you set is too high. Also, there are a few other expert settings that you can tweak if you want to optimize the recovery process, but changing the defaults on these expert settings is strongly discouraged unless you know exactly what you’re doing.
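To get a feel for what a change to `indices.recovery.max_bytes_per_sec` buys you, a rough lower bound on copy time can be computed from the shard size and the limit. The sketch below treats the `mb` suffix as megabytes per second and uses 1 GB = 1024 MB; the function name and the 50 GB shard are assumptions for illustration. Real recoveries also replay translog operations and share the limit across concurrent recoveries, so actual times will be longer.

```python
def min_recovery_seconds(shard_size_gb: float, limit_mb_per_sec: float) -> float:
    """Lower bound on the time to copy one shard at the given recovery limit."""
    if shard_size_gb < 0 or limit_mb_per_sec <= 0:
        raise ValueError("size must be non-negative and limit positive")
    return shard_size_gb * 1024 / limit_mb_per_sec


# A hypothetical 50 GB shard at a 40mb limit versus the 100mb limit set above.
for limit in (40, 100):
    seconds = min_recovery_seconds(50, limit)
    print(f"{limit}mb limit: at least {seconds / 60:.1f} minutes")
```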
Conclusion
In this guide, we have explained what the shard recovery process is and under which circumstances it kicks in. We have also reviewed a few techniques to speed up the recovery process and highlighted what you need to pay attention to when you start tweaking the default recovery settings values.
Overview
When a document is sent for indexing, Elasticsearch indexes all the fields in the format of an inverted index, but it also keeps the original JSON document in a special field called _source.
Examples
Disabling source field in the index:
PUT /api-logs?pretty
{
  "mappings": {
    "_source": {
      "enabled": false
    }
  }
}
Store only selected fields as a part of _source field:
PUT api-logs
{
  "mappings": {
    "_source": {
      "includes": [ "*.count", "error_info.*" ],
      "excludes": [ "error_info.traceback_message" ]
    }
  }
}
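To reason about which fields such include and exclude patterns keep, it can help to try them on a sample document. The sketch below is a simplified approximation, not Elasticsearch's actual implementation: it flattens a document to dotted paths and applies shell-style wildcards with Python's `fnmatch`, where excludes win over includes. The function names and the sample document are hypothetical.

```python
from fnmatch import fnmatchcase
from typing import Any, Dict, Sequence


def flatten(doc: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested objects into dotted paths, e.g. error_info.code."""
    flat: Dict[str, Any] = {}
    for key, value in doc.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, path + "."))
        else:
            flat[path] = value
    return flat


def filter_source(doc: Dict[str, Any],
                  includes: Sequence[str] = (),
                  excludes: Sequence[str] = ()) -> Dict[str, Any]:
    """Keep paths matching any include (if given) and no exclude."""
    kept = {}
    for path, value in flatten(doc).items():
        if includes and not any(fnmatchcase(path, p) for p in includes):
            continue
        if any(fnmatchcase(path, p) for p in excludes):
            continue
        kept[path] = value
    return kept


sample_doc = {
    "api_name": "orders",
    "requests": {"count": 12},
    "error_info": {"code": 500, "traceback_message": "stack trace text"},
}

print(filter_source(sample_doc,
                    includes=["*.count", "error_info.*"],
                    excludes=["error_info.traceback_message"]))
```

With the patterns from the mapping above, only `requests.count` and `error_info.code` remain; `api_name` matches no include pattern and the traceback is excluded explicitly.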
Including only selected fields using source filtering:
GET api-logs/_search
{
  "query": {
    "match_all": {}
  },
  "_source": {
    "includes": [ "api_name", "status_code", "*id" ]
  }
}
Notes
The source field brings an overhead of extra storage space but serves special purposes such as:
- It is returned as part of the response when a search query is executed.
- It is used for reindexing and for update and update_by_query operations.
- It is used for highlighting when a field is not stored, that is, when the field’s mapping does not set store to true.
- It allows selecting which fields are returned.
The only concern with the source field is the extra storage it uses on disk. This storage can be reduced by changing the compression level to best_compression, which is set using the index.codec setting.
Log Context
The log “source shard is closed” is emitted from the class PeerRecoveryTargetService.java. We extracted the following from the Elasticsearch source code for those seeking in-depth context:
        reestablishRecovery(request, cause.getMessage(), recoverySettings.retryDelayNetwork());
        return;
    }

    if (cause instanceof AlreadyClosedException) {
        onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, "source shard is closed", cause), false);
        return;
    }

    onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true);
}