[ClickHouse Series] Why frequent writes easily cause the "Too many parts" problem

Problem symptoms

With a very large backlog of GET_PART entries in the queue, the system.replication_queue table shows messages like the following:

node_name          type         postpone_reason
queue-0000000120   MERGE_PARTS  Not executing log entry queue-0000000120 of type MERGE_PARTS for part xxx_31402_31418_2 because part xxx_31412_31412_0 is not ready yet (log entry for that part is being processed).
queue-0000000102   GET_PART     Not executing log entry queue-0000000102 for part xxx_31414_31414_0 because it is covered by part xxx_31402_31418_2 that is currently executing.
queue-0000000103   GET_PART     Not executing log entry queue-0000000103 for part xxx_31413_31413_0 because it is covered by part xxx_31402_31418_2 that is currently executing.
queue-0000000104   GET_PART     Not executing log entry queue-0000000104 for part xxx_31412_31412_0 because it is covered by part xxx_31402_31418_2 that is currently executing.

Problem analysis

queue-0000000120 is a MERGE_PARTS entry that merges the following parts into the new part xxx_31402_31418_2 (the naming sketch after this list shows why the new part "covers" them):

xxx_31412_31412_0
xxx_31413_31413_0
xxx_31414_31414_0
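
Part names encode partition_minblock_maxblock_level, and the "covered by" messages in the symptom table follow directly from that convention: a merged part whose block range encloses another part's range covers it. Below is a minimal, self-contained sketch of the containment rule; the PartInfo struct and its parse helper are illustrative simplifications, while the real logic lives in ClickHouse's MergeTreePartInfo::contains.

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

/// Simplified view of the part-name convention partition_minblock_maxblock_level.
struct PartInfo
{
    std::string partition;
    int64_t min_block = 0;
    int64_t max_block = 0;
    uint32_t level = 0;

    static PartInfo parse(const std::string & name)
    {
        PartInfo info;
        size_t p1 = name.find('_');
        size_t p2 = name.find('_', p1 + 1);
        size_t p3 = name.find('_', p2 + 1);
        info.partition = name.substr(0, p1);
        info.min_block = std::stoll(name.substr(p1 + 1, p2 - p1 - 1));
        info.max_block = std::stoll(name.substr(p2 + 1, p3 - p2 - 1));
        info.level = static_cast<uint32_t>(std::stoul(name.substr(p3 + 1)));
        return info;
    }

    /// A part covers another part when its block range encloses the other's.
    bool contains(const PartInfo & other) const
    {
        return partition == other.partition
            && min_block <= other.min_block
            && max_block >= other.max_block
            && level >= other.level;
    }
};

int main()
{
    auto merged = PartInfo::parse("xxx_31402_31418_2");
    auto fetched = PartInfo::parse("xxx_31412_31412_0");
    /// This is why the GET_PART entries above are "covered by" the merge result.
    assert(merged.contains(fetched));
}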

On a single ClickHouse node, background_fetches_pool_size is set to 16. Because the node hosts a very large number of tables, few fetch threads are left for any individual table, so many GET_PART entries fail the shouldExecuteLogEntry check (which verifies that a free fetch thread is available for the current entry). For entry queue-0000000102, for example, the check returns the error: Not executing fetch of part xxx_24140_24140_0 because 16 fetches already executing, max 16. This blocks the entry from executing.
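
A minimal sketch of that fetch-pool gate, assuming a simple busy-thread counter; busy_fetches, max_fetches, and the function name are hypothetical stand-ins, while the real check is StorageReplicatedMergeTree::canExecuteFetch driven by the background_fetches_pool_size setting.

#include <atomic>
#include <cstddef>
#include <string>

/// Hypothetical stand-ins for the fetch pool state (16 as in this setup).
static std::atomic<size_t> busy_fetches{16};
static constexpr size_t max_fetches = 16;

bool canExecuteFetchSketch(const std::string & part_name, std::string & out_postpone_reason)
{
    size_t busy = busy_fetches.load();
    if (busy >= max_fetches)
    {
        /// This is the exact shape of the message quoted above.
        out_postpone_reason = "Not executing fetch of part " + part_name
            + " because " + std::to_string(busy) + " fetches already executing, max "
            + std::to_string(max_fetches) + ".";
        return false;   /// the entry is postponed and keeps its position in the queue
    }
    return true;
}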

Merges, however, draw their resources from a different thread pool (sized by background_pool_size), so queue-0000000120 can pass shouldExecuteLogEntry and processQueueEntry is called to execute the merge. At that moment the merge entry is moved to the tail of the queue (an entry that passes shouldExecuteLogEntry is moved to the tail and executed; an entry that fails the check keeps its position in the queue), as the splice demo below illustrates.
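
The tail move is a single std::list::splice, which relinks the node without copying. A runnable illustration, with plain ints standing in for log entries:

#include <cassert>
#include <iterator>
#include <list>

int main()
{
    std::list<int> queue{1, 2, 3, 4};
    auto it = std::next(queue.begin());      /// the "selected" entry: 2
    queue.splice(queue.end(), queue, it);    /// move it to the tail; no copy, iterators stay valid
    assert((queue == std::list<int>{1, 3, 4, 2}));
}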

Because there is more than one scheduling thread in total (sized by background_schedule_pool_size), concurrent execution can lock merge and fetch tasks into short-lived or long-lasting deadlocks.

Scenario 1:

Scheduling thread A picks up the merge entry, while scheduling thread B tries to take another entry from the queue (each scheduling thread must acquire a lock to operate on the queue, but executing any entry takes time, so entries are effectively processed concurrently). The merge entry taken by thread A has been moved to the tail of the queue, so when thread B scans the queue it picks up a fetch entry first. Once an entry passes validation, the part it will produce in the future is inserted into a future_parts set (e.g. xxx_24140_24140_0 for queue-0000000102 and xxx_31402_31418_2 for queue-0000000120). When parts are written at a very high rate, after both the merge entry and the fetch entry have added their parts to future_parts, the subsequent shouldExecuteLogEntry checks fail for both: neither entry can execute, both give up, and they wait to be picked up again in the next scheduling round. This can postpone the merge for some time, so parts are not merged promptly.
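
A toy reenactment of this interleaving, assuming a plain map standing in for future_parts; the part and entry names are taken from the symptom table above, and the checks mimic (rather than reproduce) shouldExecuteLogEntry:

#include <iostream>
#include <map>
#include <string>

int main()
{
    std::map<std::string, std::string> future_parts;   /// future part name -> entry that tagged it

    /// Thread A passed validation for the merge entry and tagged its result part.
    future_parts.emplace("xxx_31402_31418_2", "queue-0000000120");
    /// Thread B passed validation for a fetch entry and tagged its part.
    future_parts.emplace("xxx_31412_31412_0", "queue-0000000104");

    /// On the next round the merge finds its source part tagged by the fetch ...
    if (future_parts.count("xxx_31412_31412_0"))
        std::cout << "merge postponed: source part is not ready yet\n";
    /// ... and new fetch entries find themselves covered by the merge's result part.
    if (future_parts.count("xxx_31402_31418_2"))
        std::cout << "fetch postponed: covered by a part that is currently executing\n";
}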

Scenario 2:

If the part that a fetch entry needs has been lost locally and no other node has it either (the ZooKeeper requests during the part commit all succeeded, but the session expired before the OK response arrived, so the part was lost; the retry mechanism cannot fully guarantee success), the fetch can never succeed. For the merge entry, the source part can never be found locally, so the replica tries to fetch instead; since no other node has the part, the merge cannot succeed either. For example, the merge needs xxx_24140_24140_0, but xxx_24140_24140_0 cannot be found locally, so the replica tries to fetch xxx_31402_31418_2 from another replica. In this concurrent, interleaved execution, errors like "Not executing log entry queue-0000000120 of type MERGE_PARTS for part xxx_31402_31418_2 because part xxx_31412_31412_0 is not ready yet (log entry for that part is being processed)." keep appearing.
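
A toy decision sketch for this scenario, assuming simple sets standing in for the local part store and the union of all replicas' parts; this is not the real merge/fetch code path, only the shape of the dead end it runs into:

#include <iostream>
#include <set>
#include <string>

int main()
{
    std::set<std::string> local_parts;     /// xxx_24140_24140_0 was lost on commit
    std::set<std::string> any_replica;     /// no replica ever had it

    const std::string source_part = "xxx_24140_24140_0";
    const std::string result_part = "xxx_31402_31418_2";

    if (!local_parts.count(source_part))
    {
        /// Merging locally is impossible, so the replica falls back to fetching the result.
        if (!any_replica.count(result_part))
            std::cout << "neither " << source_part << " nor " << result_part
                      << " exists anywhere: the merge entry can never complete\n";
    }
}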

Relevant code

The method call chain is as follows:

DB::IBackgroundJobExecutor::jobExecutingTask()
└── DB::BackgroundJobsExecutor::getBackgroundJob()
    └── DB::StorageReplicatedMergeTree::getDataProcessingJob()
        └── DB::ReplicatedMergeTreeQueue::selectEntryToProcess()
            └── DB::ReplicatedMergeTreeQueue::shouldExecuteLogEntry()

selectEntryToProcess

ReplicatedMergeTreeQueue::SelectedEntryPtr ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data)
{
    LogEntryPtr entry;

    std::lock_guard lock(state_mutex);

    for (auto it = queue.begin(); it != queue.end(); ++it)
    {
        if ((*it)->currently_executing)
            continue;

        /// Check whether the entry can execute. If it can, move it to the tail of the queue;
        /// if it fails the check, record the postpone count, postpone time, and postpone reason.
        if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger_mutator, data, lock))
        {
            entry = *it;
            queue.splice(queue.end(), queue, it);
            break;
        }
        else
        {
            /// num_postponed counts how many times the entry failed the shouldExecuteLogEntry check.
            ++(*it)->num_postponed;
            (*it)->last_postpone_time = time(nullptr);
        }
    }

    /// If some entry passed the check, entry is not null; create a SelectedEntry.
    /// The parts this entry will produce in the future are added to queue.future_parts.
    if (entry)
        /// Construct a CurrentlyExecuting object to mark the entry as executing
        /// and to insert its future parts into future_parts.
        return std::make_shared<SelectedEntry>(entry, std::unique_ptr<CurrentlyExecuting>{ new CurrentlyExecuting(entry, *this) });
    else
        return {};
}

CurrentlyExecuting

ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_)
    : entry(entry_), queue(queue_)
{
    if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE)
    {
        assert(!queue.currently_executing_drop_or_replace_range);
        queue.currently_executing_drop_or_replace_range = true;
    }

    /// Mark the entry as executing; the CurrentlyExecuting destructor resets this to false.
    entry->currently_executing = true;
    /// Count the execution attempts made after the entry passed validation.
    ++entry->num_tries;
    entry->last_attempt_time = time(nullptr);

    for (const String & new_part_name : entry->getBlockingPartNames())
    {
        /// Insert the parts this entry will produce in the future into future_parts.
        if (!queue.future_parts.emplace(new_part_name, entry).second)
            throw Exception("Tagging already tagged future part " + new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
    }
}

shouldExecuteLogEntry

bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
    const LogEntry & entry,
    String & out_postpone_reason,
    MergeTreeDataMergerMutator & merger_mutator,
    MergeTreeData & data,
    std::lock_guard<std::mutex> & state_lock) const
{
    if (entry.type == LogEntry::MERGE_PARTS
        || entry.type == LogEntry::GET_PART
        || entry.type == LogEntry::MUTATE_PART)
    {
        /// getBlockingPartNames() draws on the virtual_parts set. A note on virtual_parts vs current_parts:
        /// virtual_parts = current_parts + queue
        /// current_parts: parts in Committed state
        /// virtual_parts: current_parts plus the parts named by new_part_name of every entry in the queue
        for (const String & new_part_name : entry.getBlockingPartNames())
        {
            /// If a part from virtual_parts appears in queue.future_parts, return false and postpone to the next cycle.
            /// That is, the part to be produced is already in future_parts, meaning the related entry
            /// is already executing, so this entry gives up for now.
            if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
                return false;
        }
    }

    /// For GET_PART entries, check whether the fetch thread pool is full; if so, return false and postpone to the next cycle.
    /// Under heavy load the 16 (default) fetch threads get saturated, so incoming entries are postponed
    /// and many fetches cannot execute.
    if (entry.type == LogEntry::GET_PART && !storage.canExecuteFetch(entry, out_postpone_reason))
    {
        return false;
    }

    if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::MUTATE_PART)
    {
        size_t sum_parts_size_in_bytes = 0;
        for (const auto & name : entry.source_parts)
        {
            /// For MERGE_PARTS or MUTATE_PART, if future_parts contains a source part needed by the merge,
            /// that part may currently be fetching, or it may be the future result of some other merge.
            /// The merge or mutation therefore gives up and is postponed to the next cycle,
            /// giving the needed part some time to appear.
            if (future_parts.count(name))
            {
                const char * format_str = "Not executing log entry {} of type {} for part {} "
                    "because part {} is not ready yet (log entry for that part is being processed).";
                LOG_TRACE(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
                out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
                return false;
            }
            ......
        }
        ......

        /// Setting execute_merges_on_single_replica_time_threshold > 0 makes only one replica
        /// execute the merge while the others fetch the result.
        /// The default is 0: every replica may merge.
        if (merge_strategy_picker.shouldMergeOnSingleReplica(entry))
        {
            auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
            if (replica_to_execute_merge && !merge_strategy_picker.isMergeFinishedByReplica(replica_to_execute_merge.value(), entry))
            {
                String reason = "Not executing merge for the part " + entry.new_part_name
                    + ", waiting for " + replica_to_execute_merge.value() + " to execute merge.";
                out_postpone_reason = reason;
                return false;
            }
        }

        UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS
            ? merger_mutator.getMaxSourcePartsSizeForMerge()
            : merger_mutator.getMaxSourcePartSizeForMutation();
        /** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed),
          * then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size,
          * because it may be ordered by OPTIMIZE or early with different settings.
          * Setting max_bytes_to_merge_at_max_space_in_pool still working for regular merges,
          * because the leader replica does not assign merges of greater size (except OPTIMIZE PARTITION and OPTIMIZE FINAL).
          */
        const auto data_settings = data.getSettings();
        bool ignore_max_size = false;
        if (entry.type == LogEntry::MERGE_PARTS)
        {
            ......
        }
        ......
    }
    ......
    return true;
}
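
For reference, a hedged sketch of the coverage check that produces the GET_PART postpone messages from the symptom table, reusing the PartInfo helper sketched earlier; the real implementation is ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl and also consults virtual_parts.

#include <map>
#include <string>

/// Assumes the PartInfo struct from the earlier sketch is in scope.
bool isNotCoveredByFuturePartsSketch(
    const std::string & new_part_name,
    const std::map<std::string, std::string> & future_parts,  /// part name -> entry name
    std::string & out_reason)
{
    auto candidate = PartInfo::parse(new_part_name);
    for (const auto & [future_name, entry_name] : future_parts)
    {
        if (future_name == new_part_name)
            continue;                               /// an identical tag is handled separately
        if (PartInfo::parse(future_name).contains(candidate))
        {
            out_reason = "Not executing log entry " + entry_name + " for part " + new_part_name
                + " because it is covered by part " + future_name + " that is currently executing.";
            return false;                           /// postpone to the next scheduling round
        }
    }
    return true;
}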

Appendix:

insertUnlocked

Inserts a log entry fetched from the replication log into the queue.

void ReplicatedMergeTreeQueue::insertUnlocked(
    const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
    std::lock_guard<std::mutex> & state_lock)
{
    /// Add the parts this entry will produce in the future to virtual_parts.
    /// Both GET_PART and MERGE_PARTS entries carry new_part_name, the name of the part to be produced.
    for (const String & virtual_part_name : entry->getVirtualPartNames())
    {
        virtual_parts.add(virtual_part_name);
        addPartToMutations(virtual_part_name);
    }

    /// DROP_RANGE entries go to the front of the queue: otherwise the entries ahead of it
    /// would fetch many parts only for this entry to delete them in the end.
    /// All other types are appended to the tail.
    if (entry->type != LogEntry::DROP_RANGE)
        queue.push_back(entry);
    else
        queue.push_front(entry);

    /// For GET_PART entries, record min_unprocessed_insert_time, the earliest enqueue time,
    /// excluding entry processing time. min_unprocessed_insert_time is a timestamp
    /// (the moment the entry was successfully enqueued), not a duration.
    /// It is also stored under the replica's node in ZooKeeper.
    if (entry->type == LogEntry::GET_PART)
    {
        inserts_by_time.insert(entry);

        if (entry->create_time && (!min_unprocessed_insert_time || entry->create_time < min_unprocessed_insert_time))
        {
            min_unprocessed_insert_time = entry->create_time;
            min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
        }
    }

    if (entry->type == LogEntry::ALTER_METADATA)
    {
        LOG_TRACE(log, "Adding alter metadata version {} to the queue", entry->alter_version);
        alter_sequence.addMetadataAlter(entry->alter_version, state_lock);
    }
}

Typical log messages seen when a part is lost, as in scenario 2:

Part 20190607_2_2_0 from own log doesn't exist.

Found parts with the same min block and with the same max block as the missing part 20190607_0_4_2. Hoping that it will eventually appear as a result of a merge.

