本篇内容主要讲解“从库的SQL线程和sql_slave_skip_counter参数分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“从库的SQL线程和sql_slave_skip_counter参数分析”吧!
一、调用流程大概如下
handle_slave_sql
->是否开启了slave_preserve_commit_order和log_slave_updates参数,开启的话需要设置提交顺序管理器
if (opt_slave_preserve_commit_order && rli->opt_slave_parallel_workers > 0 &&
opt_bin_log && opt_log_slave_updates)
commit_order_mngr= new Commit_order_manager(rli->opt_slave_parallel_workers); //order commit 管理器
rli->set_commit_order_manager(commit_order_mngr);
->如果是MTS则需要启动worker线程
if (slave_start_workers(rli, rli->opt_slave_parallel_workers, &mts_inited) != 0)//启动worker线程
{
mysql_cond_broadcast(&rli->start_cond);
mysql_mutex_unlock(&rli->run_lock);
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
"Failed during slave workers initialization");
goto err;
->检查rep table是否是事务类型的如果不是则报警告
if (!rli->is_transactional()) //是否是 table或者是file类型是table类型则支持事物
rli->report(WARNING_LEVEL, 0,
"If a crash happens this configuration does not guarantee that the relay "
"log info will be consistent");
-> 初始化 relay log 的访问位置
if (rli->init_relay_log_pos(rli->get_group_relay_log_name(),
rli->get_group_relay_log_pos(),
true, &errmsg,
1 )) //初始化 relay log 的访问位置
这个位置比较关键也就是从哪里开始读取我们的relay log。如果出现错误将会导致读取的relay log错误。
因此我们需要保证rep info的安全,如果设置了recover relay log 那么将会初始化为最新一个relay log的
开始位置,因为所有的未执行的binlog event将会从新拉取,老的relay log 已经不重要了。后面再说。
-> GTID event没有办法使用sql_slave_skip_counter 其具体含义参考:
Log_event::do_shall_skip
mysql> set global sql_slave_skip_counter=1;
ERROR 1858 (HY000): sql_slave_skip_counter can not be set when the server is running with
@@GLOBAL.GTID_MODE = ON. Instead, for each transaction that you want to skip, generate an
empty transaction with the same GTID as the transaction
进入循环 知道SQL线程被杀死
-> 进入状态stage_reading_event_from_the_relay_log
-> 进行一段skip event的判断和日志输出
GTID event没有办法使用sql_slave_skip_counter 其具体含义参考:
Log_event::do_shall_skip
mysql> set global sql_slave_skip_counter=1;
ERROR 1858 (HY000): sql_slave_skip_counter can not be set when the server is running with
@@GLOBAL.GTID_MODE = ON. Instead, for each transaction that you want to skip, generate an
empty transaction with the same GTID as the transaction
-> exec_relay_log_event 读取应用 一个event的上层接口
->next_event 读取下一个Event 完成MTS的检查点
->获取开始位置 rli->set_event_start_pos(my_b_tell(cur_log));
->Log_event::read_log_event
->如果是MTS 是否需要进行检查点
1、是否超过检查点周期
周期检查在函数mts_checkpoint_routine内部
set_timespec_nsec(&curr_clock, 0);
ulonglong diff= diff_timespec(&curr_clock, &rli->last_clock);
if (!force && diff < period)
{
DBUG_RETURN(FALSE);
}
2、是否已经GAQ已经满了
bool force= (rli->checkpoint_seqno > (rli->checkpoint_group - 1)); //如果达到了 GAQ的大小 设置为force 强制checkpoint
->是否relay log 大小已经达到最大 是否需要relay log切换
但是需要注意如果本事物没有结束不能进行切换
if (rli->log_space_limit &&
rli->log_space_limit < rli->log_space_total)
{
if (!rli->is_parallel_exec())
{
rli->sql_force_rotate_relay= !rli->is_in_group(); //如果不是一组就需要切换
}
else
{
rli->sql_force_rotate_relay=
(rli->mts_group_status != Relay_log_info::MTS_IN_GROUP);
}
rli->ignore_log_space_limit= true;//是一组 不能切换
}
```
->如果读取了当前relay log的全部的relay log event,
->如果是当前relay log
->空闲状态下等待io 线程的唤醒,如果是MTS还需要定期醒来进行检查点,如下:
```
if (rli->is_parallel_exec() && (opt_mts_checkpoint_period != 0 ||
DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)))
{
int ret= 0;
struct timespec waittime;
ulonglong period= static_cast<ulonglong>(opt_mts_checkpoint_period * 1000000ULL);
ulong signal_cnt= rli->relay_log.signal_cnt;
mysql_mutex_unlock(log_lock);
do
{
(void) mts_checkpoint_routine(rli, period, false, true); // TODO: ALFRANIO ERROR
mysql_mutex_lock(log_lock);
if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0))
period= 10000000ULL;
set_timespec_nsec(&waittime, period);
ret= rli->relay_log.wait_for_update_relay_log(thd, &waittime);
} while ((ret == ETIMEDOUT || ret == ETIME) &&
signal_cnt == rli->relay_log.signal_cnt && !thd->killed);
}
else
{
rli->relay_log.wait_for_update_relay_log(thd, NULL); //等待relay log 更改的信号 SQL THREAD 会等待在这里
}
```
-> 如果不是当前relay log 那么 SQL线程应用或者分发完成完成后就可以清理了
并且参数relay_log_purge需要设置为1
if (rli->relay_log.purge_first_log
(rli,
rli->get_group_relay_log_pos() == rli->get_event_relay_log_pos()
&& !strcmp(rli->get_group_relay_log_name(),rli->get_event_relay_log_name())))//做relay log的清理
-> 如果是单SQL现成 获取event的时间
这一步 就是获取计算延迟的重要因素,但是注意MTS不是在这里实在检查点里面
last_master_timestamp
```
rli->last_master_timestamp= ev->common_header->when.tv_sec + //event header 的timestamp
(time_t) ev->exec_time; //获取event的 timestamp作为 计算last_master_timestamp的基础数据 query event才有的执行时间
DBUG_ASSERT(rli->last_master_timestamp >= 0); //但是对于MTS来讲应该注意是最后一个XID EVENT的 时间不是这里设置的 在mts_checkpoint_routine里面
```
-> 如果GITD_MODE 且AUTO_POSITION 且是MTS需要由协调线程进行半事物的恢复 (partial transaction)
构造回滚EVENT进行恢复,而对已非MTS会在gtid event做回滚。
这种情况可能出现在:
- AUTO_POSITION情况下如果重连,会重新发送已经传输的Event。
- AUTO_POSITION情况下如果从库异常宕机重启,并且recovery_relay_log=0的情况下,会重新发送已经传输的Event,并且relay log pos不会重置
因此我们前面在IO线程和DUMP线程中已经讨论了,每次sql线程的启动都会通过GTID去重新寻找需要拉取的
位置。
coord_handle_partial_binlogged_transaction(rli, ev)
-> apply_event_and_update_pos 非MTS 完成 应用 MTS完成分发
-> 进行skip event操作
-> 维护skip counter计数器
if (reason == Log_event::EVENT_SKIP_COUNT)
{
--rli->slave_skip_counter;//维护skip count
skip_event= TRUE;
}
我们看到slave_skip_counter是以event为单位的,但是对于最后一个event如果跨事务了
那么整个事物都需要跳过。但是skip在GTID模式下是不能用的。
-> 如果不能跳过的事务 就需要应用了。MTS则完成分发
->完成延迟应用逻辑
sql_delay_event(ev, thd, rli)
->ev->apply_event(rli); 这里单SQL线程应用 MTS完成分发,分发方式参考前面
->是否是进行 MTS recovery if (rli->is_mts_recovery())
根据 bitmap 设置进行跳过处理
if (rli->is_mts_recovery())//如果是恢复 这个地方就是前面恢复扫描出来的位置
{
bool skip=
bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index) &&
(get_mts_execution_mode(::server_id,
rli->mts_group_status ==
Relay_log_info::MTS_IN_GROUP,
rli->current_mts_submode->get_type() ==
MTS_PARALLEL_TYPE_DB_NAME)
== EVENT_EXEC_PARALLEL);
if (skip)
{
DBUG_RETURN(0);
}
else
{
DBUG_RETURN(do_apply_event(rli));
}
}
到此,相信大家对“从库的SQL线程和sql_slave_skip_counter参数分析”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!