OpenGauss线程管理-日志写线程-WALWriter

OpenGauss线程管理-日志写线程-WAlWriter

主要功能:周期性的把日志缓冲区的内容同步到磁盘上。
WAL writer 线程试图让常规后端不必写出(和 fsync)WAL 页面。 此外,它保证在提交时未立即同步到磁盘的事务提交记录(即“异步提交”)将在可知的时间内到达磁盘——碰巧,最多是 wal_writer_delay 的三倍 周期。
请注意,与共享缓冲区的 bgwriter 一样,当 walwriter 跟不上时,常规后端仍然有权发出 WAL 写入和 fsync。 这意味着 WALWriter 不是必不可少的进程,并且可以在请求时快速关闭。
因为 walwriter 的周期与保证提交异步提交事务之前的最大延迟直接相关,所以在其上加载额外的功能可能是不明智的。

术语解释

  • fsync:同步内存中所有已修改的文件数据到储存设备。
  • 信号掩码:每个进程都有一个信号掩码(signal mask),也称为信号屏蔽字,它规定了当前要屏蔽或要阻塞递送到该进程的信号集。简单地说,信号掩码是一个“位图”,其中每一位都对应着一种信号。如果位图中的某一位为1,就表示在执行当前信号集的处理程序期间相应的信号暂时被“屏蔽”或“阻塞”,使得在执行的过程中不会嵌套地响应那个信号。

WALWriter线程

路径:openGauss-server/src/gausskernel/process/postmaster/walwriter.cpp
walwriter 由 postmaster 在启动子进程完成后立即启动。 它一直保持活动状态,直到postmaster命令它终止。 正常终止是由 SIGTERM 指示的,它指示 walwriter 退出(0)。 通过 SIGQUIT 紧急终止; 像任何后端一样,walwriter 将简单地中止并在 SIGQUIT 上退出。
如果 walwriter 意外退出,postmaster 会将其视为后端崩溃:共享内存可能已损坏,因此剩余的后端应由 SIGQUIT 杀死,然后开始恢复循环。

在主线程中的使用

在GaussDbAuxiliaryThreadMain中通过thread_role进入WALWriter线程
在这里插入图片描述

代码理解

WalWriterMain

walwriter 进程的主要入口点,这是从 AuxiliaryProcessMain 调用的,它已经创建了基本执行环境,但尚未启用信号。

void WalWriterMain(void)
{sigjmp_buf local_sigjmp_buf;MemoryContext walwriter_context;sigset_t old_sig_mask;bool wrote_something = true;long times_wrote_nothing = 0;struct timespec time_to_wait;int sleep_times_counter = 0;int time_out_counter = 0;load_server_mode();if (t_thrd.xlog_cxt.server_mode == PRIMARY_MODE ||t_thrd.xlog_cxt.server_mode == NORMAL_MODE) {/** 与 WalWriterPID 不同,isWalWriterUp 用于表示 WAL 写入器线程不仅被创建,* 还被创建用于将 WAL 从 WAL 缓冲区写入磁盘。* * 我们在线程的地方需要 isWalWriterUp 而不是 WalWriterPID 来查看是否有 * WAL writer 正在运行并刷新 WAL 缓冲区。 在重做期间我们不从 WAL 缓冲区* 写入 WAL 的备用数据库中,WAL 写入器可能会被重新使用,在这种情况下,* WalWriterPID 检查是不充分的*/g_instance.wal_cxt.isWalWriterUp = true;}ereport(LOG, (errmsg("walwriter started")));if (g_instance.attr.attr_storage.walwriter_cpu_bind >= 0) {cpu_set_t walWriterSet;CPU_ZERO(&walWriterSet);CPU_SET(g_instance.attr.attr_storage.walwriter_cpu_bind, &walWriterSet);int rc = sched_setaffinity(0, sizeof(cpu_set_t), &walWriterSet);if (rc == -1) {ereport(FATAL, (errcode(ERRCODE_OPERATE_INVALID_PARAM), errmsg("Invalid attribute for thread pool."),errdetail("Current thread num %d is out of range.", g_instance.attr.attr_storage.walwriter_cpu_bind)));}}/** 正确接受或忽略postmaster可能发送给我们的信号。 * 目前我们对 SIGINT 没有特别的用途,但将其视为 SIGTERM 似乎是合理的。** 重置一些postmaster接受但不在这里的信号。*/(void)gspqsignal(SIGHUP, WalSigHupHandler);    /* 设置标志以读取配置文件*/(void)gspqsignal(SIGINT, WalShutdownHandler);  /* 请求关闭 */(void)gspqsignal(SIGTERM, WalShutdownHandler); /* 请求关闭 */(void)gspqsignal(SIGQUIT, wal_quickdie);       /* 硬崩溃时间 */(void)gspqsignal(SIGALRM, SIG_IGN);(void)gspqsignal(SIGPIPE, SIG_IGN);(void)gspqsignal(SIGUSR1, walwriter_sigusr1_handler);(void)gspqsignal(SIGUSR2, SIG_IGN); /* 不曾用过 *//** 重置一些postmaster接受但不在这里的信号。*/(void)gspqsignal(SIGCHLD, SIG_DFL);(void)gspqsignal(SIGTTIN, SIG_DFL);(void)gspqsignal(SIGTTOU, SIG_DFL);(void)gspqsignal(SIGCONT, SIG_DFL);(void)gspqsignal(SIGWINCH, SIG_DFL);/* 我们始终允许 SIGQUIT (quickdie) */sigdelset(&t_thrd.libpq_cxt.BlockSig, SIGQUIT);/** 创建一个资源所有者来跟踪我们的资源(不清楚我们是否需要这个,但最好有一个)。*/t_thrd.utils_cxt.CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Writer",THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE));/** 创建一个我们将在其中完成所有工作的内存上下文。我们这样做是为了在错误恢复期间* 重置上下文,从而避免可能的内存泄漏。 以前这段代码只在 t_thrd.top_mem_cxt * 中运行,但重置它是一个非常糟糕的主意。*/walwriter_context = AllocSetContextCreate(t_thrd.top_mem_cxt,"Wal Writer",ALLOCSET_DEFAULT_MINSIZE,ALLOCSET_DEFAULT_INITSIZE,ALLOCSET_DEFAULT_MAXSIZE);(void)MemoryContextSwitchTo(walwriter_context);/** 如果遇到异常,则在此处继续处理。** 此代码主要基于 bgwriter.c、q.v。*/int curTryCounter;int* oldTryCounter = NULL;if (sigsetjmp(local_sigjmp_buf, 1) != 0) {/** 出现任何错误后关闭所有打开的文件。 这在 Windows 上很有帮助,* 在 Windows 中保持已删除的文件打开会导致各种奇怪的错误。 * 目前尚不清楚我们在其他地方是否需要它,但不应该受到伤害。*/gstrace_tryblock_exit(true, oldTryCounter);/* 我们需要恢复当前线程的信号掩码。 */pthread_sigmask(SIG_SETMASK, &old_sig_mask, NULL);/* 由于不使用 PG_TRY,必须手动重置错误堆栈 */t_thrd.log_cxt.error_context_stack = NULL;t_thrd.log_cxt.call_stack = NULL;/* 清理时防止中断 */HOLD_INTERRUPTS();/* 向服务器日志报告错误 */EmitErrorReport();/* 中止异步 io,必须在 LWlock 释放之前 */AbortAsyncListIO();/* 释放 lsc 持有的资源*/AtEOXact_SysDBCache(false);/** 这些操作实际上只是 AbortTransaction() 的最小子集。 * 在 walwriter 中我们没有太多需要担心的资源,* 但我们确实有 LWLocks,也许还有缓冲区?*/LWLockReleaseAll();pgstat_report_waitevent(WAIT_EVENT_END);AbortBufferIO();UnlockBuffers();/* 缓冲引脚在这里释放: */ResourceOwnerRelease(t_thrd.utils_cxt.CurrentResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, true);/* 我们不需要打扰其他 ResourceOwnerRelease 阶段 */AtEOXact_Buffers(false);AtEOXact_SMgr();AtEOXact_Files();AtEOXact_HashTables(false);/** 现在返回正常的顶级上下文并清除 ErrorContext 以备下次使用。*/(void)MemoryContextSwitchTo(walwriter_context);FlushErrorState();/* 在顶级上下文中刷新任何泄露的数据 */MemoryContextResetAndDeleteChildren(walwriter_context);/* 现在我们可以再次允许中断 */RESUME_INTERRUPTS();/** 出现任何错误后至少休眠 1 秒。 写入错误很可能会重复出现,* 我们不希望尽可能快地填写错误日志。*/pg_usleep(1000000L);}oldTryCounter = gstrace_tryblock_entry(&curTryCounter);/* 我们现在可以处理 ereport(ERROR) */t_thrd.log_cxt.PG_exception_stack = &local_sigjmp_buf;/** 解除封锁信号(当postmaster fork我们时,它们被封锁了)*/gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL);(void)gs_signal_unblock_sigusr2();/** 出现任何错误后重置休眠状态。*/SetWalWriterSleeping(false);/** 宣传我们的闩锁,后端可以在我们睡觉时使用它来唤醒我们。*/g_instance.proc_base->walwriterLatch = &t_thrd.proc->procLatch;pgstat_report_appname("Wal Writer");pgstat_report_activity(STATE_IDLE, NULL);/** 永远循环*/for (;;) {pgstat_report_activity(STATE_RUNNING, NULL);/** 处理最近收到的任何请求或信号。*/if (t_thrd.walwriter_cxt.got_SIGHUP) {t_thrd.walwriter_cxt.got_SIGHUP = false;ProcessConfigFile(PGC_SIGHUP);}if (t_thrd.walwriter_cxt.shutdown_requested) {/* walwriter的正常出口在这里 */proc_exit(0); /* done */}LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);wrote_something = XLogBackgroundFlush();LWLockRelease(WALWriteLock);if (!wrote_something && ++times_wrote_nothing > g_instance.attr.attr_storage.walwriter_sleep_threshold) {/** 等待最后一个刷新条目之后的第一个条目被更新*/int lastFlushedEntry = g_instance.wal_cxt.lastWalStatusEntryFlushed;int nextStatusEntry =GET_NEXT_STATUS_ENTRY(g_instance.attr.attr_storage.wal_insert_status_entries_power, lastFlushedEntry);volatile WalInsertStatusEntry *pCriticalEntry =&g_instance.wal_cxt.walInsertStatusTable[nextStatusEntry];if (g_instance.wal_cxt.isWalWriterUp && pCriticalEntry->status == WAL_NOT_COPIED) {sleep_times_counter++;(void)pthread_mutex_lock(&g_instance.wal_cxt.criticalEntryMutex);g_instance.wal_cxt.isWalWriterSleeping = true;while (pCriticalEntry->status == WAL_NOT_COPIED && !t_thrd.walwriter_cxt.shutdown_requested) {(void)clock_gettime(CLOCK_MONOTONIC, &time_to_wait);time_to_wait.tv_nsec += g_sleep_timeout_ms * NANOSECONDS_PER_MILLISECOND;if (time_to_wait.tv_nsec >= NANOSECONDS_PER_SECOND) {time_to_wait.tv_nsec -= NANOSECONDS_PER_SECOND;time_to_wait.tv_sec += 1;}int res = pthread_cond_timedwait(&g_instance.wal_cxt.criticalEntryCV,&g_instance.wal_cxt.criticalEntryMutex, &time_to_wait);if (res == 0) {/** 我们不应该在这里爆发,因为我们可能会在关键条目之后收到条目通知。* 我们必须再次检查关键条目状态是否为 WAL_NOT_COPIED。*/continue;} else if (res == ETIMEDOUT) {time_out_counter++;} else {ereport(WARNING, (errmsg("WAL writer pthread_cond_timedwait returned error code = %d.",errno)));}/* 尽可能唤醒其他生产者以避免挂起 */WakeupWalSemaphore(&g_instance.wal_cxt.walFlushWaitLock->l.sem);WakeupWalSemaphore(&g_instance.wal_cxt.walBufferInitWaitLock->l.sem);CHECK_FOR_INTERRUPTS();}g_instance.wal_cxt.isWalWriterSleeping = false;(void)pthread_mutex_unlock(&g_instance.wal_cxt.criticalEntryMutex);time_out_counter = 0;}times_wrote_nothing = 0;}pgstat_report_activity(STATE_IDLE, NULL);}
}
其他函数
函数功能
static void wal_quickdie(SIGNAL_ARGS)在邮局主管发出 SIGQUIT 信号时发生
static void WalSigHupHandler(SIGNAL_ARGS)SIGHUP:设置标志以在下次方便时重新读取配置文件
static void WalShutdownHandler(SIGNAL_ARGS)SIGTERM:设置标志正常退出
static void walwriter_sigusr1_handler(SIGNAL_ARGS)SIGUSR1:用于锁存器唤醒


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部