/* Copyright (c) 2020, 2024, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is designed to work with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation. The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have either included with
   the program or referenced in the documentation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include "sql/rpl_io_monitor.h"

#include <mysql/components/my_service.h>
#include <mysql/components/services/group_replication_status_service.h>
#include "mysql/components/services/log_builtins.h"

#include "sql-common/json_dom.h"
#include "sql/changestreams/apply/replication_thread_status.h"
#include "sql/mysqld.h"
#include "sql/mysqld_thd_manager.h"  // Global_THD_manager
#include "sql/protocol_classic.h"
#include "sql/rpl_async_conn_failover.h"  // reset_pos
#include "sql/rpl_async_conn_failover_configuration_propagation.h"
#include "sql/rpl_group_replication.h"
#include "sql/rpl_msr.h" /* Multisource replication */
#include "sql/rpl_replica.h"
#include "sql/rpl_sys_key_access.h"
#include "sql/rpl_sys_table_access.h"
#include "sql/sql_class.h"  // THD
#include "sql/udf_service_util.h"

#include "my_dbug.h"
#include "my_systime.h"

#include <string>

/**
  Restart the IO thread of the given channel.

  @param[in] thd The running thread.
  @param[in] channel_name the channel IO thread to restart.
  @param[in] force_sender_with_highest_weight When true, sender with highest
             weight is chosen, otherwise the next sender from the current one
             is chosen.

  @return true if IO thread was restarted, false otherwise.
*/
static bool restart_io_thread(THD *thd, const std::string &channel_name,
                              bool force_sender_with_highest_weight);

/*
  The SQL_QUERIES array contains five queries. The enum_sql_query_tag
  index/tag is used to get each query. They serve the following purposes:

  1. CONFIG_MODE_QUORUM_MONITOR:
     Used by the Monitor IO thread to determine if the given source has Group
     Replication enabled and, if enabled, whether the member is in ONLINE or
     RECOVERING state and has QUORUM.

  2. CONFIG_MODE_QUORUM_IO:
     Used by the IO thread to determine if the given source has Group
     Replication enabled and, if enabled, whether the member is in ONLINE
     state and has QUORUM.

  3. GR_MEMBER_ALL_DETAILS:
     Used by the Monitor IO thread to get the following member details:
     group_name, host, port, member state and member role.

  4. GR_MEMBER_ALL_DETAILS_FETCH_FOR_57:
     Used by the Monitor IO thread for mysql-5.7 servers to get the following
     member details: group_name, host, port, member state and member role.
     In mysql-5.7, performance_schema.replication_group_members does not have
     a member role column, so the role is fetched from the
     group_replication_primary_member status variable when the group is in
     single-primary mode.

  5. QUERY_SERVER_SELECT_ONE:
     Used by the Monitor IO thread to check that a single (non-group) server
     is in a working state. It establishes a connection with the single
     server and executes this query to confirm that the connection to the
     SOURCE is working.
*/
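
/*
  Note: the entries below are indexed by enum_sql_query_tag (see get_query()
  and execute_query()), so the order of the array entries has to stay in
  sync with that enum.
*/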
static const char *SQL_QUERIES[] = {
    "SELECT * FROM ( "
    " SELECT CASE "
    " WHEN ((SELECT count(*) from information_schema.plugins WHERE "
    " PLUGIN_NAME LIKE 'group_replication') <> 1) "
    " THEN (SELECT 2) "
    " WHEN ((SELECT IF(((MEMBER_STATE='ONLINE') OR "
    " (MEMBER_STATE='RECOVERING')) AND "
    " ((SELECT COUNT(*) FROM "
    " performance_schema.replication_group_members "
    " WHERE MEMBER_STATE != 'ONLINE' AND MEMBER_STATE != "
    "'RECOVERING') "
    " >= ((SELECT COUNT(*) FROM "
    " performance_schema.replication_group_members)/2)=0),1,0) "
    " FROM performance_schema.replication_group_members "
    " WHERE member_id=@@global.server_uuid) = 1) "
    " THEN (SELECT 1) "
    " ELSE (SELECT 2) "
    " END AS QUORUM "
    ") Q ",
    "SELECT * FROM ( "
    " SELECT CASE "
    " WHEN ((SELECT count(*) from information_schema.plugins WHERE "
    " PLUGIN_NAME LIKE 'group_replication') <> 1) "
    " THEN (SELECT 2) "
    " WHEN ((SELECT IF(MEMBER_STATE='ONLINE' AND "
    " ((SELECT COUNT(*) FROM "
    " performance_schema.replication_group_members "
    " WHERE MEMBER_STATE != 'ONLINE' AND MEMBER_STATE != "
    "'RECOVERING') "
    " >= ((SELECT COUNT(*) FROM "
    " performance_schema.replication_group_members)/2)=0),1,0) "
    " FROM performance_schema.replication_group_members "
    " WHERE member_id=@@global.server_uuid) = 1) "
    " THEN (SELECT 1) "
    " ELSE (SELECT 2) "
    " END AS QUORUM "
    ") Q ",
    "SELECT @@global.group_replication_group_name, PRGM.MEMBER_HOST, "
    " PRGM.MEMBER_PORT, PRGM.MEMBER_STATE, PRGM.MEMBER_ROLE "
    "FROM performance_schema.replication_group_members PRGM",
    "SELECT @@global.group_replication_group_name, PRGM.MEMBER_HOST, "
    " PRGM.MEMBER_PORT, PRGM.MEMBER_STATE, "
    " (SELECT IF(GR_SINGLE_PRIMARY_MODE.VARIABLE_VALUE = 'OFF', "
    " 'PRIMARY', "
    " IF(PRGM.MEMBER_ID = GR_PRIMARY_MEMBER.VARIABLE_VALUE, "
    " 'PRIMARY', 'SECONDARY')) "
    " FROM (SELECT VARIABLE_VALUE FROM performance_schema.global_status "
    " WHERE VARIABLE_NAME = 'group_replication_primary_member') "
    " GR_PRIMARY_MEMBER,"
    " (SELECT VARIABLE_VALUE FROM "
    " performance_schema.global_variables "
    " WHERE "
    " VARIABLE_NAME='group_replication_single_primary_mode') "
    " GR_SINGLE_PRIMARY_MODE "
    " ) MEMBER_ROLE "
    "FROM performance_schema.replication_group_members PRGM",
    "SELECT 1"};
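
/*
  Executes one of the predefined SQL_QUERIES, identified by qtag, on an
  already established connection and returns a MYSQL_RES_TUPLE holding the
  query error status and the result rows.
*/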
MYSQL_RES_TUPLE execute_query(const Mysql_connection *conn,
                              enum_sql_query_tag qtag) {
  int tag = static_cast<int>(qtag);
  std::string query = SQL_QUERIES[tag];
  return conn->execute_query(query);
}
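
/*
  Thread entry point passed to mysql_thread_create() by
  launch_monitoring_process(); it simply runs source_monitor_handler() on the
  Source_IO_monitor instance received through arg.
*/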
static void *launch_handler_thread(void *arg) {
  Source_IO_monitor *monitor = (Source_IO_monitor *)arg;
  monitor->source_monitor_handler();
  return nullptr;
}

std::string Source_IO_monitor::get_query(enum_sql_query_tag qtag) {
  int tag = static_cast<int>(qtag);
  std::string query = SQL_QUERIES[tag];
  return query;
}

Source_IO_monitor::Source_IO_monitor() {
#ifdef HAVE_PSI_INTERFACE
  mysql_mutex_init(key_monitor_info_run_lock, &m_run_lock, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_monitor_info_run_cond, &m_run_cond);
#else
  mysql_mutex_init(nullptr, &m_run_lock, MY_MUTEX_INIT_FAST);
  mysql_cond_init(nullptr, &m_run_cond);
#endif
}

Source_IO_monitor::~Source_IO_monitor() {
  terminate_monitoring_process();
  mysql_mutex_destroy(&m_run_lock);
  mysql_cond_destroy(&m_run_cond);
}

bool Source_IO_monitor::is_monitor_killed(THD *thd, Master_info *) {
  DBUG_TRACE;
  assert(m_monitor_thd == thd);

  return m_abort_monitor || connection_events_loop_aborted() || thd->killed;
}

bool Source_IO_monitor::launch_monitoring_process(PSI_thread_key thread_key) {
  DBUG_TRACE;

  mysql_mutex_lock(&m_run_lock);

  // Callers should ensure the process is terminated
  assert(!m_monitor_thd_state.is_thread_alive());
  if (m_monitor_thd_state.is_thread_alive()) {
    mysql_mutex_unlock(&m_run_lock);
    return true;
  }

  if (mysql_thread_create(thread_key, &m_th, &connection_attrib,
                          launch_handler_thread, (void *)this)) {
    my_error(ER_REPLICA_THREAD, MYF(0));
    mysql_mutex_unlock(&m_run_lock);
    return true;
  }

  m_monitor_thd_state.set_created();

  while (m_monitor_thd_state.is_alive_not_running()) {
    DBUG_PRINT("sleep", ("Waiting for the Monitoring process thread to start"));
    mysql_cond_wait(&m_run_cond, &m_run_lock);
  }
  mysql_mutex_unlock(&m_run_lock);

  return false;
}

void Source_IO_monitor::source_monitor_handler() {
  THD *thd{nullptr};  // needs to be first for thread_stack
  thd = new THD;      // note that constructor of THD uses DBUG_ !
  m_monitor_thd = thd;
  struct timespec waittime;

  DBUG_TRACE;

  THD_CHECK_SENTRY(thd);
  my_thread_init();

#ifdef HAVE_PSI_THREAD_INTERFACE
  // save the instrumentation for IO thread in mi->info_thd
  struct PSI_thread *psi = PSI_THREAD_CALL(get_thread)();
  thd_set_psi(thd, psi);
#endif
  thd->thread_stack = (char *)&thd;  // remember where our stack is

  if (init_replica_thread(thd, SLAVE_THD_IO)) {
    my_error(ER_REPLICA_FATAL_ERROR, MYF(0),
             "Failed during Replica IO Monitor thread initialization ");
    goto err;
  }

  thd->security_context()->skip_grants();
  Global_THD_manager::get_instance()->add_thd(thd);

  mysql_mutex_lock(&m_run_lock);
  m_monitor_thd_state.set_running();
  m_abort_monitor = false;
  set_timespec(&waittime, m_retry_monitor_wait);

  mysql_cond_broadcast(&m_run_cond);
  mysql_mutex_unlock(&m_run_lock);
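
  /*
    Main monitoring loop: refresh the sender details of every channel with
    asynchronous connection failover enabled, then wait up to
    m_retry_monitor_wait seconds (or until woken by
    terminate_monitoring_process()) before the next iteration.
  */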
  while (!is_monitor_killed(thd, nullptr) &&
         !is_group_replication_member_secondary()) {
    sync_senders_details(thd);

    THD_STAGE_INFO(thd, stage_rpl_failover_wait_before_next_fetch);
    set_timespec(&waittime, m_retry_monitor_wait);
    mysql_mutex_lock(&m_run_lock);
    mysql_cond_timedwait(&m_run_cond, &m_run_lock, &waittime);
    mysql_mutex_unlock(&m_run_lock);
  }

err:
  LogErr(INFORMATION_LEVEL, ER_RPL_REPLICA_MONITOR_IO_THREAD_EXITING);

  /* At this point the I/O thread will not try to reconnect anymore. */
  thd->reset_query();
  thd->reset_db(NULL_CSTR);

  // destructor will not free it, because net.vio is 0
  thd->get_protocol_classic()->end_net();
  thd->release_resources();

  THD_CHECK_SENTRY(thd);
  Global_THD_manager::get_instance()->remove_thd(thd);

  delete thd;

  mysql_mutex_lock(&m_run_lock);
  m_monitor_thd_state.set_terminated();
  m_abort_monitor = true;
  mysql_cond_broadcast(&m_run_cond);
  mysql_mutex_unlock(&m_run_lock);

  my_thread_end();
  my_thread_exit(nullptr);

  return;
}

std::tuple<bool, std::string> Source_IO_monitor::delete_rows(
    Rpl_sys_table_access &table_op, TABLE *table,
    std::vector<std::string> field_name,
    std::tuple<std::string, std::string, uint> conn_detail) {
  bool err_val{false};
  std::string err_msg{};

  Rpl_sys_table_access::for_each_in_tuple(
      conn_detail, [&](const auto &n, const auto &x) {
        if (table_op.store_field(table->field[n], x)) {
          err_msg.assign(table_op.get_field_error_msg(field_name[n]));
          err_val = true;
        }
      });
  if (err_val) return std::make_tuple(err_val, err_msg);

  Rpl_sys_table_access::handler_delete_row_func(table_op, err_val, err_msg);
  return std::make_tuple(err_val, err_msg);
}

std::tuple<bool, std::string> Source_IO_monitor::write_rows(
    Rpl_sys_table_access &table_op, TABLE *table,
    std::vector<std::string> field_name,
    RPL_FAILOVER_SOURCE_TUPLE conn_detail) {
  bool err_val{false};
  std::string err_msg{};

  Rpl_sys_table_access::for_each_in_tuple(
      conn_detail, [&](const auto &n, const auto &x) {
        if (table_op.store_field(table->field[n], x)) {
          err_msg.assign(table_op.get_field_error_msg(field_name[n]));
          err_val = true;
        }
      });
  if (err_val) return std::make_tuple(err_val, err_msg);

  Rpl_sys_table_access::handler_write_row_func(table_op, err_val, err_msg);
  return std::make_tuple(err_val, err_msg);
}
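
/*
  Refreshes the sender configuration of one failover channel. In short it:
    1. reads the stored sender details of the channel from the
       replication_asynchronous_connection_failover table;
    2. determines the weight of the currently connected sender;
    3. connects to one member of each managed group, fetches the group
       membership, updates the failover table accordingly, and restarts the
       channel IO thread if the connected member changed group or lost
       quorum;
    4. determines the highest weight among the reachable single senders;
    5. restarts the channel IO thread when a sender with a higher weight than
       the currently connected one is available.
  Returns 0 on success, 1 when the monitor is killed and 2 on error.
*/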
int Source_IO_monitor::connect_senders(THD *thd,
                                       const std::string &channel_name) {
  std::vector<SENDER_CONN_MERGE_TUPLE> failover_table_detail_list{};
  std::vector<RPL_FAILOVER_SOURCE_TUPLE> source_conn_detail_list{};
  bool error{false};
  /* highest group failover weight for the current channel. */
  uint curr_highest_group_weight{0};
  /* weight of single senders for the current channel. */
  uint curr_highest_weight_single_sender{0};
  /* weight for current connected sender */
  uint curr_conn_weight{0};

  if (is_monitor_killed(thd, nullptr)) return 1;

  /*
    1. Get stored source details for channel from
       replication_asynchronous_connection_failover table.
  */
  std::tie(error, failover_table_detail_list) =
      get_senders_details(channel_name);
  if (error) {
    return 2;
  }

  if (is_monitor_killed(thd, nullptr)) return 1;

  /*
    2. Get weight of current connected sender.
  */
  {
    Rpl_async_conn_failover_table_operations table_op_src(TL_READ);

    std::tie(error, source_conn_detail_list) =
        table_op_src.read_source_rows_for_channel(channel_name);

    std::sort(source_conn_detail_list.begin(), source_conn_detail_list.end(),
              [](const RPL_FAILOVER_SOURCE_TUPLE &element1,
                 const RPL_FAILOVER_SOURCE_TUPLE &element2) -> bool {
                return std::get<4>(element1) > std::get<4>(element2);
              });

    channel_map.rdlock();
    Master_info *mi = channel_map.get_mi(channel_name.c_str());
    if (nullptr == mi) {
      channel_map.unlock();
      return 2;
    }
    const std::string mi_host{mi->host};
    const uint mi_port = mi->port;
    channel_map.unlock();

    for (auto source_conn_detail : source_conn_detail_list) {
      uint port{0}, weight{0};
      std::string host{""};

      std::tie(std::ignore, host, port, std::ignore, weight, std::ignore) =
          source_conn_detail;

      /* save weight for current connected sender */
      if ((host.compare(mi_host) == 0) && (port == mi_port)) {
        curr_conn_weight = weight;
        break;
      }
    }
  }

  /*
    3. Connect to one of the sources of each group gathered in step 1, store
       its connection object and get the group membership details from it.
  */
  std::set<std::string> managed_name_list{};
  for (auto failover_table_detail : failover_table_detail_list) {
    uint primary_weight{0}, secondary_weight{0}, port{0}, weight{0};
    std::string channel{}, host{}, managed_name{};

    std::tie(channel, host, port, std::ignore, weight, managed_name,
             primary_weight, secondary_weight) = failover_table_detail;

    if (is_monitor_killed(thd, nullptr)) return 1;

    /*
      3.1. To get group membership details, we only need to connect to one
           member of the group.
    */
    if (managed_name_list.find(managed_name) != managed_name_list.end())
      continue;

    /* 3.2. Connect to source and store its connection object. */
    channel_map.rdlock();
    Master_info *mi = channel_map.get_mi(channel_name.c_str());
    if (nullptr == mi) {
      channel_map.unlock();
      return 2;
    }
    std::string mi_host{mi->host};
    uint mi_port = mi->port;
    const std::string mi_network_namespace(mi->network_namespace_str());

    THD_STAGE_INFO(thd, stage_connecting_to_source);
    Mysql_connection *conn =
        new Mysql_connection(thd, mi, host, port, mi_network_namespace);
    if (!conn->is_connected()) {
      LogErr(WARNING_LEVEL, ER_RPL_ASYNC_CHANNEL_CANT_CONNECT, host.c_str(),
             port, "", channel.c_str());
      delete conn;
      conn = nullptr;
      channel_map.unlock();
      continue;
    }

    /*
      3.3. Get group membership details for ONLINE, RECOVERING, UNREACHABLE
           members.
    */
    THD_STAGE_INFO(thd, stage_rpl_failover_fetching_source_member_details);
    std::vector<RPL_FAILOVER_SOURCE_TUPLE> group_membership_list{};
    int err{0};
    bool conn_member_needs_to_change{false}, conn_member_quorum_lost{false};

    /* Connection details of the source that lost majority, used for logging. */
    std::tuple<std::string, std::string, uint>
        conn_member_quorum_lost_details{};

    std::tie(err, conn_member_needs_to_change, conn_member_quorum_lost,
             conn_member_quorum_lost_details) =
        get_online_members(thd, mi, conn, failover_table_detail,
                           group_membership_list, curr_highest_group_weight,
                           curr_conn_weight);
    delete conn;
    conn = nullptr;
    channel_map.unlock();

    if (is_monitor_killed(thd, nullptr)) return 1;

    if (err == ER_RPL_ASYNC_GET_GROUP_MEMBERSHIP_DETAILS_ERROR ||
        err == ER_RPL_ASYNC_MONITOR_IO_THD_FETCH_GROUP_MAJORITY_ERROR) {
      continue;
    }

    /*
      3.4. Store gathered membership details to
           replication_asynchronous_connection_failover table.
    */
    THD_STAGE_INFO(thd, stage_rpl_failover_updating_source_member_details);
    if (!err && !group_membership_list.empty() &&
        !save_group_members(channel, managed_name, group_membership_list)) {
      /*
        Add the managed_name to the managed_name_list so that further members
        from the group can be ignored.
      */
      managed_name_list.insert(managed_name);
    } else if (err == 2) {
      return 1;
    }

    if (is_monitor_killed(thd, nullptr)) return 1;

    /*
      3.5. Disconnect channel if the member currently connected through the
           asynchronous channel has changed group or has lost quorum.
    */
    if (conn_member_needs_to_change || conn_member_quorum_lost) {
      const std::string error_channel =
          std::get<0>(conn_member_quorum_lost_details);
      const std::string error_host =
          std::get<1>(conn_member_quorum_lost_details);
      const uint error_port = std::get<2>(conn_member_quorum_lost_details);

      /* Get current values from mi. */
      channel_map.rdlock();
      mi = channel_map.get_mi(channel_name.c_str());
      if (nullptr == mi) {
        channel_map.unlock();
        return 2;
      }
      mi_host.assign(mi->host);
      mi_port = mi->port;
      channel_map.unlock();

      /*
        Only trigger the channel reconnection if the sender on which we
        detected the error is still the connected sender.
        Until we reach this point the IO thread may have switched by itself
        to another sender.
      */
      if (!error_channel.compare(channel_name) &&
          !error_host.compare(mi_host) && error_port == mi_port) {
        if (is_monitor_killed(thd, nullptr)) return 1;

        bool restarted = restart_io_thread(thd, channel_name, false);

        if (restarted && conn_member_quorum_lost) {
          LogErr(ERROR_LEVEL, ER_RPL_ASYNC_CHANNEL_STOPPED_QUORUM_LOST,
                 error_host.c_str(), error_port, "", error_channel.c_str());
        }
      }
    }
  }

  if (is_monitor_killed(thd, nullptr)) return 1;

  /*
    4. Get highest weight of single sender.
  */
  {
    channel_map.rdlock();
    Master_info *mi = channel_map.get_mi(channel_name.c_str());
    if (nullptr == mi) {
      channel_map.unlock();
      return 2;
    }
    for (auto source_conn_detail : source_conn_detail_list) {
      std::string group_name{""};
      uint weight{0};
      /* save highest weight of single senders for the current channel */
      std::tie(std::ignore, std::ignore, std::ignore, std::ignore, weight,
               group_name) = source_conn_detail;
      if (weight > curr_highest_weight_single_sender && group_name.empty() &&
          weight > curr_conn_weight && weight > curr_highest_group_weight &&
          check_connection_and_run_query(thd, mi, source_conn_detail)) {
        curr_highest_weight_single_sender = weight;
      }
    }
    channel_map.unlock();
  }

  if (is_monitor_killed(thd, nullptr)) return 1;

  /*
    5. If the weight of the currently connected sender is less than that of
       any ONLINE group member or single server, then disconnect it.
       The reconnection will be done by the IO thread.
  */
  DBUG_EXECUTE_IF("async_conn_failover_disable_weight_check", return 0;);
  DBUG_EXECUTE_IF("async_conn_failover_check_interim_sender", {
    if (source_conn_detail_list.size() == 4) {
      return 0;
    }
  });
  if ((curr_highest_group_weight > curr_conn_weight) ||
      (curr_highest_weight_single_sender > curr_conn_weight)) {
    restart_io_thread(thd, channel_name, true);
  }

  return 0;
}
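
/*
  Verifies that a single (non-group) sender is reachable by opening a
  connection to it and executing the QUERY_SERVER_SELECT_ONE ("SELECT 1")
  query; returns true when the query succeeds.
*/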
bool Source_IO_monitor::check_connection_and_run_query(
    THD *thd, Master_info *mi, RPL_FAILOVER_SOURCE_TUPLE &conn_detail) {
  uint query_failed{1};
  uint port{0};
  std::string host{""};
  const std::string mi_network_namespace(mi->network_namespace_str());
  std::tie(std::ignore, host, port, std::ignore, std::ignore, std::ignore) =
      conn_detail;

  Mysql_connection *conn_single_server =
      new Mysql_connection(thd, mi, host, port, mi_network_namespace);
  if (conn_single_server != nullptr && conn_single_server->is_connected())
    std::tie(query_failed, std::ignore) = execute_query(
        conn_single_server, enum_sql_query_tag::QUERY_SERVER_SELECT_ONE);
  if (query_failed != 0 && conn_single_server != nullptr) {
    Async_conn_failover_manager::log_error_for_async_executing_query_failure(
        ER_RPL_ASYNC_CHECK_CONNECTION_ERROR, conn_single_server->get_mysql(),
        mi);
  }
  delete conn_single_server;
  conn_single_server = nullptr;
  return !query_failed;
}
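
/*
  Reconciles the group membership details gathered from a managed group with
  the rows stored in the replication_asynchronous_connection_failover table:
  existing members are re-inserted (their weight may have changed), new
  members are added, members that left the group are removed, and the
  resulting configuration is propagated to the other group members.
*/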
int Source_IO_monitor::save_group_members(
    std::string channel_name, std::string managed_name,
    std::vector<RPL_FAILOVER_SOURCE_TUPLE> &group_membership_list) {
  bool err_val{true};
  std::string err_msg{};

  std::vector<RPL_FAILOVER_SOURCE_TUPLE> failover_table_detail_list{};

  std::string db{"mysql"};
  std::string table_name{"replication_asynchronous_connection_failover"};
  uint num_field{6};
  enum thr_lock_type lock_type { TL_WRITE };
  std::vector<std::string> field_name{
      "channel", "host", "port", "network_namespace", "weight", "managed_name"};

  /* Open table with OPTION_AUTOCOMMIT disabled. */
  Rpl_sys_table_access table_op(db, table_name, num_field);
  if (table_op.open(lock_type)) {
    table_op.set_error();
    return 1;
  }

  TABLE *table{table_op.get_table()};

  /*
    Read stored source details from
    replication_asynchronous_connection_failover table.
  */
  {
    /* Store channel */
    if (table_op.store_field(table->field[0], channel_name)) {
      table_op.set_error();
      return 1;
    }

    /* Store managed_name */
    if (table_op.store_field(table->field[5], managed_name)) {
      table_op.set_error();
    }

    Rpl_sys_key_access key_access;
    if (!key_access.init(table, 1, true, (key_part_map)((1L << 0) | (1L << 1)),
                         HA_READ_KEY_EXACT)) {
      do {
        /* get source detail */
        RPL_FAILOVER_SOURCE_TUPLE source_tuple{};
        Rpl_async_conn_failover_table_operations::get_data<
            RPL_FAILOVER_SOURCE_TUPLE>(table_op, source_tuple);
        failover_table_detail_list.push_back(source_tuple);
      } while (!key_access.next());
    }

    if (key_access.deinit()) {
      table_op.set_error();
      return 1;
    }

    if (failover_table_detail_list.empty()) return 1;
  }

  /*
    For each source from the membership details gathered in step 3, check
    whether it is already present in the failover table:
    - if present, delete its entry and insert it again (the weight can
      change).
    - if not present, insert it.
  */
  for (auto group_member_detail : group_membership_list) {
    uint port{0}, weight{0};
    std::string channel{}, host{}, group_name{}, net_ns{};

    std::tie(channel, host, port, net_ns, weight, group_name) =
        group_member_detail;

    auto it = std::find_if(failover_table_detail_list.begin(),
                           failover_table_detail_list.end(),
                           [&](const RPL_FAILOVER_SOURCE_TUPLE &e) {
                             return ((std::get<0>(e).compare(channel) == 0) &&
                                     (std::get<1>(e).compare(host) == 0) &&
                                     (std::get<2>(e) == port));
                           });

    if (it != failover_table_detail_list.end()) {
      auto del_conn_detail = std::make_tuple(channel, host, port);
      std::tie(err_val, err_msg) =
          delete_rows(table_op, table, field_name, del_conn_detail);
      if (err_val) {
        table_op.set_error();
        table_op.close(err_val);
        return 1;
      }

      std::tie(err_val, err_msg) =
          write_rows(table_op, table, field_name, group_member_detail);
      if (err_val) {
        table_op.set_error();
        table_op.close(err_val);
        return 1;
      }
    }

    if (it == failover_table_detail_list.end()) {
      LogErr(SYSTEM_LEVEL, ER_RPL_ASYNC_SENDER_ADDED, host.c_str(), port, "",
             channel.c_str(), group_name.c_str());

      std::tie(err_val, err_msg) =
          write_rows(table_op, table, field_name, group_member_detail);
      if (err_val) {
        table_op.set_error();
        table_op.close(err_val);
        return 1;
      }
    }
  }

  /*
    For each source from the failover table, check whether it was also found
    in the membership details list:
    - if not found, delete its entry from the failover table, as the source
      has left the group.
  */
  for (auto failover_table_detail : failover_table_detail_list) {
    uint port{0}, weight{0};
    std::string channel{}, host{}, group_name{}, net_ns{};

    std::tie(channel, host, port, net_ns, weight, group_name) =
        failover_table_detail;

    auto it =
        std::find_if(group_membership_list.begin(), group_membership_list.end(),
                     [&](const RPL_FAILOVER_SOURCE_TUPLE &e) {
                       return ((std::get<1>(e).compare(host) == 0) &&
                               (std::get<2>(e) == port));
                     });

    if (it == group_membership_list.end()) {
      LogErr(SYSTEM_LEVEL, ER_RPL_ASYNC_SENDER_REMOVED, host.c_str(), port, "",
             channel.c_str(), group_name.c_str());
      auto del_conn_detail = std::make_tuple(channel, host, port);
      std::tie(err_val, err_msg) =
          delete_rows(table_op, table, field_name, del_conn_detail);
      if (err_val) {
        table_op.set_error();
        table_op.close(err_val);
        return 1;
      }
    }
  }

  /* Increment member action configuration version. */
  if (table_op.increment_version()) {
    LogErr(ERROR_LEVEL, ER_RPL_INCREMENTING_MEMBER_ACTION_VERSION, db.c_str(),
           table_name.c_str());
    return 1;
  }

  /*
    Send replication_asynchronous_connection_failover data to group
    replication group members.
  */
  if (rpl_acf_configuration_handler->send_failover_data(table_op)) {
    return 1;
  }

  return 0;
}
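
/*
  Uses the group_replication_status_service_v1 service to check whether this
  server is the primary of a single-primary group and has lost contact with
  the group majority.
*/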
bool Source_IO_monitor::has_primary_lost_contact_with_majority() {
  bool primary_lost_contact_with_majority = false;
  my_h_service gr_status_service_handler = nullptr;

  srv_registry->acquire("group_replication_status_service_v1",
                        &gr_status_service_handler);
  if (nullptr != gr_status_service_handler) {
    SERVICE_TYPE(group_replication_status_service_v1) *gr_status_service =
        reinterpret_cast<SERVICE_TYPE(group_replication_status_service_v1) *>(
            gr_status_service_handler);

    if (gr_status_service
            ->is_group_in_single_primary_mode_and_im_the_primary() &&
        !gr_status_service->is_member_online_with_group_majority()) {
      primary_lost_contact_with_majority = true;
    }

    srv_registry->release(gr_status_service_handler);
  }

  return primary_lost_contact_with_majority;
}
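
/*
  Queries the given source for its Group Replication quorum status and, when
  the group has quorum, for the membership details of the group.

  Returns a tuple holding: an error status (0 on success), whether the
  currently connected member needs to be changed, whether it has lost quorum,
  and the connection details used to log the quorum loss. Reachable members
  are appended to group_membership_list, while curr_highest_group_weight and
  curr_conn_weight are updated from the configured primary and secondary
  weights.
*/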
std::tuple<int, bool, bool, std::tuple<std::string, std::string, uint>>
Source_IO_monitor::get_online_members(
    THD *thd, Master_info *mi, const Mysql_connection *conn,
    SENDER_CONN_MERGE_TUPLE failover_table_detail,
    std::vector<RPL_FAILOVER_SOURCE_TUPLE> &group_membership_list,
    uint &curr_highest_group_weight, uint &curr_conn_weight) {
  channel_map.assert_some_lock();
  uint error{0};
  std::string err_msg;
  bool conn_member_needs_to_change{false}, conn_member_quorum_lost{false};

  /* Connection details of the source that lost majority, used for logging. */
  std::tuple<std::string, std::string, uint> conn_member_quorum_lost_details{};

  uint primary_weight{0}, secondary_weight{0}, port{0}, weight{0};
  std::string channel{}, host{}, managed_name{};

  if (is_monitor_killed(thd, nullptr)) {
    return std::make_tuple(2, conn_member_needs_to_change,
                           conn_member_quorum_lost,
                           conn_member_quorum_lost_details);
  }

  std::tie(channel, host, port, std::ignore, weight, managed_name,
           primary_weight, secondary_weight) = failover_table_detail;

  /* Execute enum_sql_query_tag::CONFIG_MODE_QUORUM_MONITOR query */
  MYSQL_RES_VAL quorum_list{};
  auto qtag{enum_sql_query_tag::CONFIG_MODE_QUORUM_MONITOR};
  std::tie(error, quorum_list) = execute_query(conn, qtag);
  if (error != 0) {
    longlong sql_errno{ER_RPL_ASYNC_MONITOR_IO_THD_FETCH_GROUP_MAJORITY_ERROR};
    Async_conn_failover_manager::log_error_for_async_executing_query_failure(
        sql_errno, const_cast<Mysql_connection *>(conn)->get_mysql(), mi);
    return std::make_tuple(sql_errno, conn_member_needs_to_change,
                           conn_member_quorum_lost,
                           conn_member_quorum_lost_details);
  }

  if (quorum_list.empty() || quorum_list[0].empty()) {
    return std::make_tuple(1, conn_member_needs_to_change,
                           conn_member_quorum_lost,
                           conn_member_quorum_lost_details);
  }

  auto quorum_status{
      static_cast<enum_conf_mode_quorum_status>(std::stoi(quorum_list[0][0]))};
  if (quorum_status == enum_conf_mode_quorum_status::MANAGED_GR_HAS_QUORUM) {
    qtag = enum_sql_query_tag::GR_MEMBER_ALL_DETAILS;
    MYSQL_RES_VAL sender_membership_res{};
    std::tie(error, sender_membership_res) = execute_query(conn, qtag);

    if (error == ER_BAD_FIELD_ERROR) {
      qtag = enum_sql_query_tag::GR_MEMBER_ALL_DETAILS_FETCH_FOR_57;
      std::tie(error, sender_membership_res) = execute_query(conn, qtag);
    }

    if (error != 0) {
      longlong sql_errno{ER_RPL_ASYNC_GET_GROUP_MEMBERSHIP_DETAILS_ERROR};
      Async_conn_failover_manager::log_error_for_async_executing_query_failure(
          sql_errno, const_cast<Mysql_connection *>(conn)->get_mysql(), mi);
      return std::make_tuple(sql_errno, conn_member_needs_to_change,
                             conn_member_quorum_lost,
                             conn_member_quorum_lost_details);
    }

    /*
      If the currently connected sender is a group member and not a single
      server, then save its primary/secondary weight based on its role.
    */
    for (const auto &m_row_ins : sender_membership_res) {
      if (m_row_ins[COL_HOST].compare(mi->host) == 0 &&
          std::stoul(m_row_ins[COL_PORT]) == mi->port) {
        if (m_row_ins[COL_ROLE].compare("PRIMARY") == 0) {
          curr_conn_weight = primary_weight;
        } else if (m_row_ins[COL_ROLE].compare("SECONDARY") == 0) {
          curr_conn_weight = secondary_weight;
        }
      }
    }

    for (const auto &m_row : sender_membership_res) {
      /*
        If the member is ONLINE, RECOVERING or UNREACHABLE, add its connection
        details to the list to be stored in the
        replication_asynchronous_connection_failover table.
      */
      if (m_row[COL_STATE].compare("ONLINE") == 0 ||
          m_row[COL_STATE].compare("RECOVERING") == 0 ||
          m_row[COL_STATE].compare("UNREACHABLE") == 0) {
        if (is_monitor_killed(thd, nullptr)) {
          return std::make_tuple(2, conn_member_needs_to_change,
                                 conn_member_quorum_lost,
                                 conn_member_quorum_lost_details);
        }

        uint tab_weight{secondary_weight};
        if (m_row[COL_ROLE].compare("PRIMARY") == 0) {
          tab_weight = primary_weight;
          if ((primary_weight > curr_highest_group_weight) &&
              m_row[COL_STATE].compare("ONLINE") == 0)
            curr_highest_group_weight = primary_weight;
        } else if (m_row[COL_ROLE].compare("SECONDARY") == 0) {
          if ((secondary_weight > curr_highest_group_weight) &&
              m_row[COL_STATE].compare("ONLINE") == 0)
            curr_highest_group_weight = secondary_weight;
        }

        auto source_ins_details = std::make_tuple(
            channel, m_row[COL_HOST], std::stoul(m_row[COL_PORT]), "",
            tab_weight, m_row[COL_GROUP_NAME]);
        group_membership_list.push_back(source_ins_details);
      }

      /*
        For the source connected through the asynchronous channel, if the
        group_name has changed (i.e. the member has changed group), or if its
        state has become UNREACHABLE (i.e. it lost majority), then stop the
        asynchronous channel.
      */
      if (m_row[COL_HOST].compare(mi->host) == 0 &&
          std::stoul(m_row[COL_PORT]) == mi->port &&
          (m_row[COL_GROUP_NAME].compare(managed_name) != 0 ||
           m_row[COL_STATE].compare("UNREACHABLE") == 0)) {
        conn_member_needs_to_change = true;
      }
    }
  }

  if (quorum_status == enum_conf_mode_quorum_status::MANAGED_GR_HAS_ERROR &&
      host.compare(mi->host) == 0 && port == mi->port) {
    conn_member_quorum_lost = true;
    conn_member_quorum_lost_details = std::make_tuple(channel, host, port);
  }

  return std::make_tuple(0, conn_member_needs_to_change,
                         conn_member_quorum_lost,
                         conn_member_quorum_lost_details);
}

int Source_IO_monitor::sync_senders_details(THD *thd) {
  bool primary_lost_contact_with_majority =
      has_primary_lost_contact_with_majority();

  if (primary_lost_contact_with_majority) {
    /* Log the warning only once per majority loss. */
    if (!m_primary_lost_contact_with_majority_warning_logged) {
      m_primary_lost_contact_with_majority_warning_logged = true;
      LogErr(WARNING_LEVEL, ER_GRP_RPL_FAILOVER_PRIMARY_WITHOUT_MAJORITY);
    }
    return 0;
  } else {
    if (m_primary_lost_contact_with_majority_warning_logged) {
      m_primary_lost_contact_with_majority_warning_logged = false;
      LogErr(WARNING_LEVEL, ER_GRP_RPL_FAILOVER_PRIMARY_BACK_TO_MAJORITY);
    }
  }

  std::vector<std::string> channels;
  channel_map.rdlock();
  for (mi_map::iterator it = channel_map.begin(); it != channel_map.end();
       it++) {
    Master_info *mi = it->second;
    if (Master_info::is_configured(mi) &&
        mi->is_source_connection_auto_failover()) {
      channels.push_back(mi->get_channel());
    }
  }
  channel_map.unlock();

  for (const std::string &channel_name : channels) {
    connect_senders(thd, channel_name);
  }

  return 0;
}
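
/*
  Builds the sender list of a channel: reads the managed (GroupReplication)
  rows for the channel, and for each managed group merges its source rows
  with the group's primary and secondary weights. The read is retried a few
  times because the underlying tables can be modified concurrently.
*/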
std::tuple<bool, std::vector<SENDER_CONN_MERGE_TUPLE>>
Source_IO_monitor::get_senders_details(const std::string &channel_name) {
  DBUG_TRACE;

  /* The list of source connection details. */
  std::vector<SENDER_CONN_MERGE_TUPLE> failover_table_detail_list{};
  std::vector<RPL_FAILOVER_MANAGED_TUPLE> source_managed_list{};
  auto error{false};

  /*
    Check if the source needs to be managed; if true, get its network
    configuration details.
    These tables can be modified in parallel, which will cause their open()
    to fail; in that case we retry the operation.
  */
  int retries = 0;
  do {
    if (retries > 0) {
      my_sleep(1000);
    }

    Rpl_async_conn_failover_table_operations table_op(TL_READ);
    error = table_op.read_managed_rows_for_channel(channel_name,
                                                   source_managed_list);

    if (error) return make_pair(error, failover_table_detail_list);

    for (auto source_managed_detail : source_managed_list) {
      auto primary_weight{std::get<3>(source_managed_detail)},
          secondary_weight{std::get<4>(source_managed_detail)};
      if (!error &&
          strcmp(std::get<2>(source_managed_detail).c_str(),
                 "GroupReplication") == 0 &&
          !std::get<1>(source_managed_detail).empty()) {
        std::vector<RPL_FAILOVER_SOURCE_TUPLE> source_conn_detail_list{};
        Rpl_async_conn_failover_table_operations table_op_src(TL_READ);
        std::tie(error, source_conn_detail_list) =
            table_op_src.read_source_rows_for_channel_and_managed_name(
                channel_name, std::get<1>(source_managed_detail));
        for (auto source_conn_detail : source_conn_detail_list) {
          auto source_conn_detail_merged =
              std::tuple_cat(source_conn_detail,
                             std::make_tuple(primary_weight, secondary_weight));
          failover_table_detail_list.push_back(source_conn_detail_merged);
        }
      }
    }

    retries++;
  } while (error && retries < 10);

  if (error) {
    LogErr(WARNING_LEVEL, ER_RPL_ASYNC_READ_FAILOVER_TABLE,
           channel_name.c_str());
  }

  return make_pair(error, failover_table_detail_list);
}

int Source_IO_monitor::terminate_monitoring_process() {
  mysql_mutex_lock(&m_run_lock);

  if (m_monitor_thd_state.is_thread_dead()) {
    mysql_mutex_unlock(&m_run_lock);
    return 0;
  }

  // Wake up possibly stuck condition waits
  mysql_cond_broadcast(&m_run_cond);

  ulong stop_wait_timeout = rpl_stop_replica_timeout;
  while (m_monitor_thd_state.is_thread_alive()) {
    DBUG_PRINT("sleep",
               ("Waiting for the Monitoring IO process thread to finish"));

    if (m_monitor_thd_state.is_initialized()) {
      mysql_mutex_lock(&m_monitor_thd->LOCK_thd_data);
      m_monitor_thd->awake(THD::KILL_CONNECTION);
      mysql_mutex_unlock(&m_monitor_thd->LOCK_thd_data);
    }

    struct timespec abstime;
    set_timespec(&abstime, (stop_wait_timeout == 1 ? 1 : 2));
#ifndef NDEBUG
    int error =
#endif
        mysql_cond_timedwait(&m_run_cond, &m_run_lock, &abstime);

    if (stop_wait_timeout >= 1) {
      stop_wait_timeout = stop_wait_timeout - (stop_wait_timeout == 1 ? 1 : 2);
    }

    if (m_monitor_thd_state.is_thread_alive() &&
        stop_wait_timeout <= 0)  // quit waiting
    {
      mysql_mutex_unlock(&m_run_lock);
      return 1;
    }

    assert(error == ETIMEDOUT || error == 0);
  }
  assert(m_monitor_thd_state.is_thread_dead());

  mysql_mutex_unlock(&m_run_lock);
  return 0;
}

void Source_IO_monitor::set_monitoring_wait(uint wait_time) {
  m_retry_monitor_wait = wait_time;
}

uint Source_IO_monitor::get_monitoring_wait() { return m_retry_monitor_wait; }

bool Source_IO_monitor::is_monitoring_process_running() {
  return m_monitor_thd_state.is_thread_alive();
}

Source_IO_monitor *Source_IO_monitor::get_instance() {
  return rpl_source_io_monitor;
}

static bool restart_io_thread(THD *thd, const std::string &channel_name,
                              bool force_sender_with_highest_weight) {
  if (channel_map.trywrlock()) {
    return false;
  }

  Master_info *mi = channel_map.get_mi(channel_name.c_str());
  if (nullptr == mi) {
    channel_map.unlock();
    return false;
  }

  if (Async_conn_failover_manager::do_auto_conn_failover(
          mi, force_sender_with_highest_weight) !=
      Async_conn_failover_manager::DoAutoConnFailoverError::no_error) {
    LogErr(WARNING_LEVEL, ER_RPL_REPLICA_MONITOR_IO_THREAD_RECONNECT_CHANNEL,
           "choosing the source for", channel_name.c_str());
    channel_map.unlock();
    return false;
  }

  mi->channel_wrlock();
  lock_slave_threads(mi);

  /*
    IO thread was stopped through STOP REPLICA, do not restart it.
  */
  if (!mi->is_source_connection_auto_failover() || !mi->slave_running) {
    unlock_slave_threads(mi);
    mi->channel_unlock();
    channel_map.unlock();
    return false;
  }

  int thread_mask = 0;
  thread_mask |= REPLICA_IO;
  thd->set_skip_readonly_check();

  if (terminate_slave_threads(mi, thread_mask, rpl_stop_replica_timeout,
                              false /*need_lock_term=false*/)) {
    LogErr(WARNING_LEVEL, ER_RPL_REPLICA_MONITOR_IO_THREAD_RECONNECT_CHANNEL,
           "stopping", channel_name.c_str());
  }

  if (start_slave_threads(false /*need_lock_slave=false*/,
                          true /*wait_for_start=true*/, mi, thread_mask)) {
    LogErr(WARNING_LEVEL, ER_RPL_REPLICA_MONITOR_IO_THREAD_RECONNECT_CHANNEL,
           "starting", channel_name.c_str());
  }

  thd->reset_skip_readonly_check();
  unlock_slave_threads(mi);
  mi->channel_unlock();
  channel_map.unlock();

  return true;
}