Shard recovery in ES (PEER)
When shard recovery is triggered
A user creates a new index -> coordinating node -> master node updates and publishes the clusterState -> target node applies the cluster state change -> IndicesService.createShard
@Override
public IndexShard createShard(
        final ShardRouting shardRouting,
        final RecoveryState recoveryState,
        final PeerRecoveryTargetService recoveryTargetService,
        final PeerRecoveryTargetService.RecoveryListener recoveryListener,
        final RepositoriesService repositoriesService,
        final Consumer<IndexShard.ShardFailure> onShardFailure,
        final Consumer<ShardId> globalCheckpointSyncer,
        final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException {
    Objects.requireNonNull(retentionLeaseSyncer);
    ensureChangesAllowed();
    IndexService indexService = indexService(shardRouting.index());
    IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
    indexShard.addShardFailureCallback(onShardFailure);
    indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,
        (type, mapping) -> {
            assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS :
                "mapping update consumer only required by local shards recovery";
            client.admin().indices().preparePutMapping()
                .setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid
                .setType(type)
                .setSource(mapping.source().string(), XContentType.JSON)
                .get();
        }, this);
    return indexShard;
}
At this point indexShard.startRecovery is called to perform the recovery.
There are five recovery types:
public enum Type {
    EMPTY_STORE,
    EXISTING_STORE,
    PEER,
    SNAPSHOT,
    LOCAL_SHARDS
}
This post covers PEER.
1. startRecovery: send a START_RECOVERY message to the peer (the source node)
public static final String START_RECOVERY = "internal:index/shard/recovery/start_recovery";
public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
    // create a new recovery status, and process...
    final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
    // we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
    // assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool.
    threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
    new TransportResponseHandler<RecoveryResponse>() {
        @Override
        public void handleResponse(RecoveryResponse recoveryResponse) {
            final TimeValue recoveryTime = new TimeValue(timer.time());
            // do this through ongoing recoveries to remove it from the collection
            onGoingRecoveries.markRecoveryAsDone(recoveryId);
            if (logger.isTraceEnabled()) {
                StringBuilder sb = new StringBuilder();
                sb.append('[').append(request.shardId().getIndex().getName()).append(']')
                    .append('[').append(request.shardId().id()).append("] ");
                sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime)
                    .append("]\n");
                sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]")
                    .append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
                    .append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [")
                    .append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']').append("\n");
                sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size())
                    .append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize))
                    .append("]\n");
                sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
                sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]")
                    .append(" transaction log operations")
                    .append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
                    .append("\n");
                logger.trace("{}", sb);
            } else {
                logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(), recoveryTime);
            }
        }

        @Override
        public void handleException(TransportException e) {
            handleException.accept(e);
        }

        @Override
        public String executor() {
            // we do some heavy work like refreshes in the response so fork off to the generic threadpool
            return ThreadPool.Names.GENERIC;
        }

        @Override
        public RecoveryResponse read(StreamInput in) throws IOException {
            return new RecoveryResponse(in);
        }
    })
2. The peer (source node) receives the message
2.1 Look up the corresponding shard
private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
    final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
    final IndexShard shard = indexService.getShard(request.shardId().id());

    final ShardRouting routingEntry = shard.routingEntry();

    if (routingEntry.primary() == false || routingEntry.active() == false) {
        throw new DelayRecoveryException("source shard [" + routingEntry + "] is not an active primary");
    }

    if (request.isPrimaryRelocation() && (routingEntry.relocating() == false ||
        routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) {
        logger.debug("delaying recovery of {} as source shard is not marked yet as relocating to {}",
            request.shardId(), request.targetNode());
        throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]");
    }

    RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
    logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(),
        request.targetNode());
    handler.recoverToTarget(ActionListener.runAfter(listener, () -> ongoingRecoveries.remove(shard, handler)));
}
2.2 Run the recovery flow on the peer (source) side
/**
 * performs the recovery from the local engine to the target
 */
public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
    final Closeable releaseResources = () -> IOUtils.close(resources);
    final ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
    try {
        cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
            final RuntimeException e;
            if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
                e = new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]");
            } else {
                e = new CancellableThreads.ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
            }
            if (beforeCancelEx != null) {
                e.addSuppressed(beforeCancelEx);
            }
            IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
            throw e;
        });
        final Consumer<Exception> onFailure = e -> {
            assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[onFailure]");
            IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
        };

        final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled();
        final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();

        runUnderPrimaryPermit(() -> {
            final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
            ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
            if (targetShardRouting == null) {
                logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(),
                    request.targetNode());
                throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
            }
            assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
            retentionLeaseRef.set(
                shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));
        }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
            shard, cancellableThreads, logger);

        final Engine.HistorySource historySource;
        if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) {
            historySource = Engine.HistorySource.INDEX;
        } else {
            historySource = Engine.HistorySource.TRANSLOG;
        }
        final Closeable retentionLock = shard.acquireHistoryRetentionLock(historySource);
        resources.add(retentionLock);
        final long startingSeqNo;
        final boolean isSequenceNumberBasedRecovery
            = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
            && isTargetSameHistory()
            && shard.hasCompleteHistoryOperations("peer-recovery", historySource, request.startingSeqNo())
            && (historySource == Engine.HistorySource.TRANSLOG ||
                (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
        // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease,
        // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's
        // possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold.
        // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
        // without having a complete history.

        if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) {
            // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
            retentionLock.close();
            logger.trace("history is retained by {}", retentionLeaseRef.get());
        } else {
            // all the history we need is retained by the retention lock, obtained before calling shard.hasCompleteHistoryOperations()
            // and before acquiring the safe commit we'll be using, so we can be certain that all operations after the safe commit's
            // local checkpoint will be retained for the duration of this recovery.
            logger.trace("history is retained by retention lock");
        }

        final StepListener<SendFileResult> sendFileStep = new StepListener<>();
        final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
        final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
        final StepListener<Void> finalizeStep = new StepListener<>();

        if (isSequenceNumberBasedRecovery) {
            logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
            startingSeqNo = request.startingSeqNo();
            if (retentionLeaseRef.get() == null) {
                createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY));
            } else {
                sendFileStep.onResponse(SendFileResult.EMPTY);
            }
        } else {
            final Engine.IndexCommitRef safeCommitRef;
            try {
                safeCommitRef = shard.acquireSafeIndexCommit();
                resources.add(safeCommitRef);
            } catch (final Exception e) {
                throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
            }

            // Try and copy enough operations to the recovering peer so that if it is promoted to primary then it has a chance of being
            // able to recover other replicas using operations-based recoveries. If we are not using retention leases then we
            // conservatively copy all available operations. If we are using retention leases then "enough operations" is just the
            // operations from the local checkpoint of the safe commit onwards, because when using soft deletes the safe commit retains
            // at least as much history as anything else. The safe commit will often contain all the history retained by the current set
            // of retention leases, but this is not guaranteed: an earlier peer recovery from a different primary might have created a
            // retention lease for some history that this primary already discarded, since we discard history when the global checkpoint
            // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can
            // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled
            // down.
            startingSeqNo = softDeletesEnabled
                ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L
                : 0;
            logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);

            try {
                final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo);
                final Releasable releaseStore = acquireStore(shard.store());
                resources.add(releaseStore);
                sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
                    try {
                        IOUtils.close(safeCommitRef, releaseStore);
                    } catch (final IOException ex) {
                        logger.warn("releasing snapshot caused exception", ex);
                    }
                });

                final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
                runUnderPrimaryPermit(() -> {
                        try {
                            // If the target previously had a copy of this shard then a file-based recovery might move its global
                            // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
                            // new one later on in the recovery.
                            shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),
                                new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,
                                    deleteRetentionLeaseStep, false));
                        } catch (RetentionLeaseNotFoundException e) {
                            logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
                            deleteRetentionLeaseStep.onResponse(null);
                        }
                    }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]",
                    shard, cancellableThreads, logger);

                deleteRetentionLeaseStep.whenComplete(ignored -> {
                    assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
                    phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep);
                }, onFailure);

            } catch (final Exception e) {
                throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
            }
        }
        assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;

        sendFileStep.whenComplete(r -> {
            assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
            // For a sequence based recovery, the target can keep its local translog
            prepareTargetForTranslog(
                shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo), prepareEngineStep);
        }, onFailure);

        prepareEngineStep.whenComplete(prepareEngineTime -> {
            assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]");
            /*
             * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
             * This means that any document indexed into the primary after this will be replicated to this replica as well
             * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
             * all documents up to maxSeqNo in phase2.
             */
            runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
                shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);

            final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
            logger.trace("snapshot translog for recovery; current size is [{}]",
                shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo));
            final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo);
            resources.add(phase2Snapshot);
            retentionLock.close();

            // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
            // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
            final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
            final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
            final RetentionLeases retentionLeases = shard.getRetentionLeases();
            final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetaData().getMappingVersion();
            phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
                retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);
            sendSnapshotStep.whenComplete(
                r -> IOUtils.close(phase2Snapshot),
                e -> {
                    IOUtils.closeWhileHandlingException(phase2Snapshot);
                    onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e));
                });

        }, onFailure);

        // Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2
        final long trimAboveSeqNo = startingSeqNo - 1;
        sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);

        finalizeStep.whenComplete(r -> {
            final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
            final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
            final SendFileResult sendFileResult = sendFileStep.result();
            final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
                prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
            try {
                wrappedListener.onResponse(response);
            } finally {
                IOUtils.close(resources);
            }
        }, onFailure);
    } catch (Exception e) {
        IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
    }
}
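Before walking through the individual steps, note the overall shape of the method: the phases are chained through StepListeners (sendFileStep -> prepareEngineStep -> sendSnapshotStep -> finalizeStep), and each phase only starts once the previous one completes. Below is a rough, ES-independent sketch of that chaining pattern, using CompletableFuture purely for illustration (the class and step names are made up):

import java.util.concurrent.CompletableFuture;

// ES-independent sketch of the step chaining in recoverToTarget: each phase runs only after
// the previous step completes, and a failure in any step would abort the whole chain.
class RecoveryStepsDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> sendFileStep = CompletableFuture.runAsync(
            () -> System.out.println("phase1: send segment files (skipped for a sequence-number-based recovery)"));
        CompletableFuture<Void> prepareEngineStep = sendFileStep.thenRun(
            () -> System.out.println("prepare the target engine for translog replay"));
        CompletableFuture<Void> sendSnapshotStep = prepareEngineStep.thenRun(
            () -> System.out.println("phase2: replay history operations up to the current max seq_no"));
        CompletableFuture<Void> finalizeStep = sendSnapshotStep.thenRun(
            () -> System.out.println("finalize: mark the target in sync and update checkpoints"));
        finalizeStep.join(); // block for the demo; the real code uses listeners and never blocks here
    }
}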
The main flow is as follows:
1. Set up the retentionLease / retention lock. It keeps index files and operation history from being deleted during recovery (e.g. by merges).
if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) {
    historySource = Engine.HistorySource.INDEX;
} else {
    historySource = Engine.HistorySource.TRANSLOG;
}
final Closeable retentionLock = shard.acquireHistoryRetentionLock(historySource);
With the INDEX history source, retentionLockCount is incremented.
With the TRANSLOG history source, translogRefCounts is incremented.
/**
 * Acquires a lock on soft-deleted documents to prevent them from cleaning up in merge processes. This is necessary to
 * make sure that all operations that are being retained will be retained until the lock is released.
 * This is a analogy to the translog's retention lock; see {@link Translog#acquireRetentionLock()}
 */
synchronized Releasable acquireRetentionLock() {
    assert retentionLockCount >= 0 : "Invalid number of retention locks [" + retentionLockCount + "]";
    retentionLockCount++;
    final AtomicBoolean released = new AtomicBoolean();
    return () -> {
        if (released.compareAndSet(false, true)) {
            releaseRetentionLock();
        }
    };
}
It returns a lambda (a Releasable) that decrements the reference count when released.
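To make that acquire/release pattern concrete, here is a minimal standalone sketch (not ES code; the class and method names are made up) of a counted lock whose acquire() hands back a one-shot release closure:

import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the pattern used above: acquire() bumps a counter and returns a closure
// that decrements it exactly once, no matter how many times close() is called.
class RetentionLockDemo {
    private int lockCount = 0; // guarded by the synchronized methods below

    synchronized AutoCloseable acquire() {
        lockCount++;
        final AtomicBoolean released = new AtomicBoolean();
        return () -> {                                  // the returned lambda is the "release"
            if (released.compareAndSet(false, true)) {
                release();                              // decrement only on the first close()
            }
        };
    }

    private synchronized void release() {
        assert lockCount > 0;
        lockCount--;
    }

    synchronized boolean isHeld() {
        return lockCount > 0;
    }

    public static void main(String[] args) throws Exception {
        RetentionLockDemo locks = new RetentionLockDemo();
        try (AutoCloseable lock = locks.acquire()) {
            System.out.println("held while recovering: " + locks.isHeld()); // true
        }
        System.out.println("held after close: " + locks.isHeld());          // false
    }
}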
2. The isSequenceNumberBasedRecovery check
final boolean isSequenceNumberBasedRecovery
    = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
    && isTargetSameHistory()
    && shard.hasCompleteHistoryOperations("peer-recovery", historySource, request.startingSeqNo())
    && (historySource == Engine.HistorySource.TRANSLOG ||
        (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
1) request.startingSeqNo must be set (i.e. not UNASSIGNED_SEQ_NO)
2) The source shard and the shard being recovered share the same history (isTargetSameHistory)
The history UUID may be changed here:
/**
 * Marks an existing lucene index with a new history uuid and sets the given local checkpoint
 * as well as the maximum sequence number.
 * This is used to make sure no existing shard will recover from this index using ops based recovery.
 * @see SequenceNumbers#LOCAL_CHECKPOINT_KEY
 * @see SequenceNumbers#MAX_SEQ_NO
 */
public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException {
    metadataLock.writeLock().lock();
    try (IndexWriter writer = newAppendingIndexWriter(directory, null)) {
        final Map<String, String> map = new HashMap<>();
        map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
        map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
        map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
        updateCommitData(writer, map);
    } finally {
        metadataLock.writeLock().unlock();
    }
}
3) shard.hasCompleteHistoryOperations checks whether we have a complete history of operations since the given starting seqno (the whole condition is restated in the sketch below)
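Putting these conditions together, a hypothetical standalone restatement of the check (the parameter names are illustrative, not ES API; they stand in for the calls on request, shard and retentionLeaseRef):

// Hypothetical restatement of the isSequenceNumberBasedRecovery condition shown above.
class SeqNoRecoveryCheckDemo {
    static final long UNASSIGNED_SEQ_NO = -2L; // same sentinel value as SequenceNumbers.UNASSIGNED_SEQ_NO

    static boolean isSequenceNumberBasedRecovery(long requestStartingSeqNo,
                                                 boolean targetSameHistory,
                                                 boolean hasCompleteHistorySinceStartingSeqNo,
                                                 boolean historyFromTranslog,
                                                 Long leaseRetainingSeqNo) { // null = no retention lease
        return requestStartingSeqNo != UNASSIGNED_SEQ_NO     // the target proposed a starting seq_no
            && targetSameHistory                             // same history UUID on both copies
            && hasCompleteHistorySinceStartingSeqNo          // every op >= startingSeqNo is still retained
            && (historyFromTranslog                          // translog history needs no lease check
                || (leaseRetainingSeqNo != null && leaseRetainingSeqNo <= requestStartingSeqNo));
    }

    public static void main(String[] args) {
        // a fresh replica with no usable seq_no falls back to file-based recovery
        System.out.println(isSequenceNumberBasedRecovery(UNASSIGNED_SEQ_NO, true, true, true, null)); // false
        // an existing replica whose history is fully retained can recover by replaying operations only
        System.out.println(isSequenceNumberBasedRecovery(42L, true, true, false, 10L));               // true
    }
}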
3. If isSequenceNumberBasedRecovery is true, the RetentionLease mechanism is used. If it is false, Lucene's snapshot facility (a safe index commit) is used instead.
/**
 * Snapshots the most recent safe index commit from the currently running engine.
 * All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
 */
public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
    final IndexShardState state = this.state; // one time volatile read
    // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
    if (state == IndexShardState.STARTED || state == IndexShardState.CLOSED) {
        return getEngine().acquireSafeIndexCommit();
    } else {
        throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
    }
}
List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
phase1Files.addAll(diff.different);
phase1Files.addAll(diff.missing);
for (StoreFileMetaData md : phase1Files) {
    if (request.metadataSnapshot().asMap().containsKey(md.name())) {
        logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
            md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
    } else {
        logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name());
    }
    phase1FileNames.add(md.name());
    phase1FileSizes.add(md.length());
    totalSizeInBytes += md.length();
}
The metadata of the files that are different from, or missing on, the target is collected here.
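For context, the diff used above compares the source's file metadata with the snapshot the target sent in the StartRecoveryRequest. A simplified, ES-independent sketch of that comparison (FileMeta and the method names are made up; ES uses Store.MetadataSnapshot#recoveryDiff and also compares checksums and hashes):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Files whose name, length and checksum match on both sides can be reused on the target;
// everything else has to be resent in phase1.
class RecoveryDiffDemo {

    static class FileMeta {
        final String name;
        final long length;
        final String checksum;

        FileMeta(String name, long length, String checksum) {
            this.name = name;
            this.length = length;
            this.checksum = checksum;
        }
    }

    static void diff(Map<String, FileMeta> source, Map<String, FileMeta> target,
                     List<FileMeta> identical, List<FileMeta> different, List<FileMeta> missing) {
        for (FileMeta md : source.values()) {
            FileMeta other = target.get(md.name);
            if (other == null) {
                missing.add(md);                                                   // target never had this file
            } else if (other.length == md.length && other.checksum.equals(md.checksum)) {
                identical.add(md);                                                 // can be reused on the target
            } else {
                different.add(md);                                                 // same name, different content
            }
        }
    }

    public static void main(String[] args) {
        Map<String, FileMeta> source = new HashMap<>();
        source.put("_0.cfs", new FileMeta("_0.cfs", 100, "aaa"));
        source.put("_1.cfs", new FileMeta("_1.cfs", 200, "bbb"));
        Map<String, FileMeta> target = new HashMap<>();
        target.put("_0.cfs", new FileMeta("_0.cfs", 100, "aaa"));
        target.put("_1.cfs", new FileMeta("_1.cfs", 150, "ccc"));

        List<FileMeta> identical = new ArrayList<>();
        List<FileMeta> different = new ArrayList<>();
        List<FileMeta> missing = new ArrayList<>();
        diff(source, target, identical, different, missing);
        // phase1Files in the ES code corresponds to different + missing
        System.out.println("reuse=" + identical.size() + ", resend=" + (different.size() + missing.size()));
    }
}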
A message is then sent to the recovering node describing the files that are about to be transferred:
recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
    phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep);
public static final String FILES_INFO = "internal:index/shard/recovery/filesInfo";
Then the file contents are sent:
void sendFiles(Store store, StoreFileMetaData[] files, IntSupplier translogOps, ActionListener<Void> listener) {
    ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first

    final ThreadContext threadContext = threadPool.getThreadContext();
    final MultiFileTransfer<FileChunk> multiFileSender =
        new MultiFileTransfer<FileChunk>(logger, threadContext, listener, maxConcurrentFileChunks, Arrays.asList(files)) {

            final byte[] buffer = new byte[chunkSizeInBytes];
            InputStreamIndexInput currentInput = null;
            long offset = 0;

            @Override
            protected void onNewFile(StoreFileMetaData md) throws IOException {
                offset = 0;
                IOUtils.close(currentInput, () -> currentInput = null);
                final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
                currentInput = new InputStreamIndexInput(indexInput, md.length()) {
                    @Override
                    public void close() throws IOException {
                        IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop
                    }
                };
            }

            @Override
            protected FileChunk nextChunkRequest(StoreFileMetaData md) throws IOException {
                assert Transports.assertNotTransportThread("read file chunk");
                cancellableThreads.checkForCancel();
                final int bytesRead = currentInput.read(buffer);
                if (bytesRead == -1) {
                    throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name());
                }
                final boolean lastChunk = offset + bytesRead == md.length();
                final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), offset, lastChunk);
                offset += bytesRead;
                return chunk;
            }

            @Override
            protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
                cancellableThreads.checkForCancel();
                recoveryTarget.writeFileChunk(
                    request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener);
            }

            @Override
            protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
                handleErrorOnSendFiles(store, e, new StoreFileMetaData[]{md});
            }

            @Override
            public void close() throws IOException {
                IOUtils.close(currentInput, () -> currentInput = null);
            }
        };
    resources.add(multiFileSender);
    multiFileSender.start();
}
public static final String FILE_CHUNK = "internal:index/shard/recovery/file_chunk";
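The chunking loop in sendFiles boils down to reading each file in fixed-size pieces and flagging the last one. A standalone illustration (not ES code) using plain java.io streams:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Read a file in fixed-size chunks and mark the last chunk so the receiver knows the file is complete.
class FileChunkDemo {
    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("chunk-demo", ".bin");
        try (FileOutputStream out = new FileOutputStream(f)) {
            out.write(new byte[10_000]);                   // stand-in for a Lucene segment file
        }
        final byte[] buffer = new byte[4096];              // ES uses a configurable chunkSizeInBytes
        long offset = 0;
        try (InputStream in = new FileInputStream(f)) {
            int bytesRead;
            while ((bytesRead = in.read(buffer)) != -1) {
                boolean lastChunk = offset + bytesRead == f.length();
                // in ES each chunk becomes one FILE_CHUNK request via recoveryTarget.writeFileChunk(...)
                System.out.println("chunk at offset " + offset + ", size " + bytesRead + ", last=" + lastChunk);
                offset += bytesRead;
            }
        }
        f.delete();
    }
}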
4. Collect the update operations generated between the start and the end of the recovery
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo);
These are sent to the peer in batches:
sendBatch(
    readNextBatch,
    true,
    SequenceNumbers.UNASSIGNED_SEQ_NO,
    snapshot.totalOperations(),
    maxSeenAutoIdTimestamp,
    maxSeqNoOfUpdatesOrDeletes,
    retentionLeases,
    mappingVersion,
    batchedListener);
public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops";
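A simplified sketch of the batching idea above (not the ES implementation, which limits batches by byte size and sends them via the TRANSLOG_OPS action):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Drain a history snapshot into size-limited batches; each batch corresponds to one request to the target.
class TranslogBatchDemo {
    public static void main(String[] args) {
        Iterator<String> snapshot = Arrays.asList("op1", "op2", "op3", "op4", "op5").iterator();
        final int maxBatchSize = 2;                 // illustrative limit; ES limits by byte size
        int batchNo = 0;
        while (snapshot.hasNext()) {
            List<String> batch = new ArrayList<>();
            while (snapshot.hasNext() && batch.size() < maxBatchSize) {
                batch.add(snapshot.next());         // read the next operation from the snapshot
            }
            // in ES this is roughly where sendBatch(...) hands the batch to the recovery target
            System.out.println("sending batch " + (++batchNo) + ": " + batch);
        }
    }
}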
At this point the recovery is essentially complete.
