From 1ca813bf76b66e86af2bde918eb33ee98a4e522b Mon Sep 17 00:00:00 2001 From: Monty Date: Mon, 25 Dec 2023 15:30:03 +0200 Subject: [PATCH] Added socketpair.c as a replacement for 'pipe()' call for Windows. This was needed to get semisync to work on Windows. --- THIRDPARTY | 29 ++++++ sql/CMakeLists.txt | 1 + sql/semisync_master_ack_receiver.cc | 17 ++- sql/semisync_master_ack_receiver.h | 53 ++++++++-- sql/socketpair.c | 156 ++++++++++++++++++++++++++++ sql/socketpair.h | 21 ++++ 6 files changed, 263 insertions(+), 14 deletions(-) create mode 100644 sql/socketpair.c create mode 100644 sql/socketpair.h diff --git a/THIRDPARTY b/THIRDPARTY index 35bb238ed1e..b15d979ea4a 100644 --- a/THIRDPARTY +++ b/THIRDPARTY @@ -1690,3 +1690,32 @@ ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *************************************************************************** + +%%The following software may be included in this product: +socketpair.c + +Copyright 2007, 2010 by Nathan C. Myers +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + The name of the author must not be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index f1c1c65310b..20283cdefc4 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -175,6 +175,7 @@ SET (SQL_SOURCE table_cache.cc encryption.cc temporary_tables.cc json_table.cc proxy_protocol.cc backup.cc xa.cc + socketpair.c socketpair.h ${CMAKE_CURRENT_BINARY_DIR}/lex_hash.h ${CMAKE_CURRENT_BINARY_DIR}/lex_token.h ${GEN_SOURCES} diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc index 6453b7ef32a..9320c7fc3e6 100644 --- a/sql/semisync_master_ack_receiver.cc +++ b/sql/semisync_master_ack_receiver.cc @@ -25,7 +25,7 @@ extern PSI_cond_key key_COND_ack_receiver; extern PSI_thread_key key_thread_ack_receiver; #endif -int global_ack_signal_fd= -1; +my_socket global_ack_signal_fd= -1; /* Callback function of ack receive thread */ pthread_handler_t ack_receive_handler(void *arg) @@ -242,6 +242,13 @@ void Ack_receiver::run() Select_socket_listener listener(m_slaves); #endif //HAVE_POLL + if (listener.got_error()) + { + sql_print_error("Got error %M starting ack receiver thread", + listener.got_error()); + return; + } + sql_print_information("Starting ack receiver thread"); thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND; thd->thread_stack= (char*) &thd; @@ -258,7 +265,7 @@ void Ack_receiver::run() while (1) { - int ret, slave_count; + int ret, slave_count= 0; Slave *slave; mysql_mutex_lock(&m_mutex); @@ -315,7 +322,9 @@ void Ack_receiver::run() Slave_ilist_iterator it(m_slaves); while ((slave= it++)) { - if (slave->active) // Set in init_slave_sockets() + if (slave->active && + ((slave->vio.read_pos < slave->vio.read_end) || + listener.is_socket_active(slave))) { ulong len; @@ -341,6 +350,7 @@ void Ack_receiver::run() Delete it from listener */ it.remove(); + m_slaves_changed= true; } } else if (net.last_errno == ER_NET_READ_ERROR) @@ -351,6 +361,7 @@ void Ack_receiver::run() net.last_errno, ER_DEFAULT(net.last_errno), slave->server_id()); it.remove(); + m_slaves_changed= true; } } } diff --git a/sql/semisync_master_ack_receiver.h b/sql/semisync_master_ack_receiver.h index d3d9de27b89..eacb4b200c0 100644 --- a/sql/semisync_master_ack_receiver.h +++ b/sql/semisync_master_ack_receiver.h @@ -20,6 +20,7 @@ #include "my_pthread.h" #include "sql_class.h" #include "semisync.h" +#include "socketpair.h" #include struct Slave :public ilink @@ -127,32 +128,54 @@ private: }; -extern int global_ack_signal_fd; +extern my_socket global_ack_signal_fd; class Ack_listener { public: - int local_read_signal; + my_socket local_read_signal; const Slave_ilist &m_slaves; + int error; Ack_listener(const Slave_ilist &slaves) - :m_slaves(slaves) + :local_read_signal(-1), m_slaves(slaves), error(0) { - int pipes[2]; - pipe(pipes); - global_ack_signal_fd= pipes[1]; + my_socket pipes[2]; +#ifdef _WIN32 + error= create_socketpair(pipes); +#else + if (!pipe(pipes)) + { + fcntl(pipes[0], F_SETFL, O_NONBLOCK); + fcntl(pipes[1], F_SETFL, O_NONBLOCK); + } + else + { + pipes[0]= pipes[1]= -1; + } +#endif /* _WIN32 */ local_read_signal= pipes[0]; - fcntl(pipes[0], F_SETFL, O_NONBLOCK); - fcntl(pipes[1], F_SETFL, O_NONBLOCK); + global_ack_signal_fd= pipes[1]; } virtual ~Ack_listener() { - close(global_ack_signal_fd); - close(local_read_signal); - global_ack_signal_fd= -1; +#ifdef _WIN32 + my_socket pipes[2]; + pipes[0]= local_read_signal; + pipes[1]= global_ack_signal_fd; + close_socketpair(pipes); +#else + if (global_ack_signal_fd >= 0) + close(global_ack_signal_fd); + if (local_read_signal >= 0) + close(local_read_signal); +#endif /* _WIN32 */ + global_ack_signal_fd= local_read_signal= -1; } + int got_error() { return error; } + virtual bool has_signal_data()= 0; /* Clear data sent by signal_listener() to abort read */ @@ -162,14 +185,22 @@ public: { char buff[100]; /* Clear the signal message */ +#ifndef _WIN32 read(local_read_signal, buff, sizeof(buff)); +#else + recv(local_read_signal, buff, sizeof(buff), 0); +#endif /* _WIN32 */ } } }; static inline void signal_listener() { +#ifndef _WIN32 my_write(global_ack_signal_fd, (uchar*) "a", 1, MYF(0)); +#else + send(global_ack_signal_fd, "a", 1, 0); +#endif /* _WIN32 */ } #ifdef HAVE_POLL diff --git a/sql/socketpair.c b/sql/socketpair.c new file mode 100644 index 00000000000..ef89fa0446b --- /dev/null +++ b/sql/socketpair.c @@ -0,0 +1,156 @@ +/* socketpair.c +Copyright 2007, 2010 by Nathan C. Myers +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + The name of the author must not be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* Changes: + * 2023-12-25 Addopted for MariaDB usage + * 2014-02-12: merge David Woodhouse, Ger Hobbelt improvements + * git.infradead.org/users/dwmw2/openconnect.git/commitdiff/bdeefa54 + * github.com/GerHobbelt/selectable-socketpair + * always init the socks[] to -1/INVALID_SOCKET on error, both on Win32/64 + * and UNIX/other platforms + * 2013-07-18: Change to BSD 3-clause license + * 2010-03-31: + * set addr to 127.0.0.1 because win32 getsockname does not always set it. + * 2010-02-25: + * set SO_REUSEADDR option to avoid leaking some windows resource. + * Windows System Error 10049, "Event ID 4226 TCP/IP has reached + * the security limit imposed on the number of concurrent TCP connect + * attempts." Bleah. + * 2007-04-25: + * preserve value of WSAGetLastError() on all error returns. + * 2007-04-22: (Thanks to Matthew Gregan ) + * s/EINVAL/WSAEINVAL/ fix trivial compile failure + * s/socket/WSASocket/ enable creation of sockets suitable as stdin/stdout + * of a child process. + * add argument make_overlapped + */ + +#include +#ifdef _WIN32 +#include /* socklen_t, et al (MSVC20xx) */ +#include +#include +#include "socketpair.h" + +#define safe_errno (errno != 0) ? errno : -1 + +/** + create_socketpair() + + @param socks[2] Will be filled by 2 SOCKET entries (similar to pipe()) + socks[0] for reading + socks[1] for writing + + @return: 0 ok + # System error code. -1 if unknown + */ + +int create_socketpair(SOCKET socks[2]) +{ + union + { + struct sockaddr_in inaddr; + struct sockaddr addr; + } a; + SOCKET listener= -1; + int reuse = 1; + int last_error; + socklen_t addrlen = sizeof(a.inaddr); + + socks[0]= socks[1]= -1; + + if ((listener= socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) + return safe_errno; + + memset(&a, 0, sizeof(a)); + a.inaddr.sin_family = AF_INET; + a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + a.inaddr.sin_port = 0; + + for (;;) /* Avoid using goto */ + { + if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, + (char*) &reuse, (socklen_t) sizeof(reuse)) == -1) + break; + if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) + break; + + memset(&a, 0, sizeof(a)); + if (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR) + break; + // win32 getsockname may only set the port number, p=0.0005. + // ( http://msdn.microsoft.com/library/ms738543.aspx ): + a.inaddr.sin_addr.s_addr= htonl(INADDR_LOOPBACK); + a.inaddr.sin_family= AF_INET; + + if (listen(listener, 1) == SOCKET_ERROR) + break; + + socks[1]= socket(AF_INET, SOCK_STREAM, 0); + if (socks[1] == -1) + break; + if (connect(socks[1], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) + break; + + socks[0]= accept(listener, NULL, NULL); + if (socks[0] == -1) + break; + + closesocket(listener); + + { + /* Make both sockets non blocking */ + ulong arg= 1; + ioctlsocket(socks[0], FIONBIO,(void*) &arg); + ioctlsocket(socks[1], FIONBIO,(void*) &arg); + } + return 0; + } + /* Error handling */ + last_error= WSAGetLastError(); + if (listener != -1) + closesocket(listener); + close_socketpair(socks); + WSASetLastError(last_error); + + return last_error; +} + +/* + Free socketpair +*/ + +void close_socketpair(SOCKET socks[2]) +{ + if (socks[0] != -1) + closesocket(socks[0]); + if (socks[1] != -1) + closesocket(socks[1]); + socks[0]= socks[1]= -1; +} + +#endif /*_WIN32 */ diff --git a/sql/socketpair.h b/sql/socketpair.h new file mode 100644 index 00000000000..d9f89c84ffa --- /dev/null +++ b/sql/socketpair.h @@ -0,0 +1,21 @@ +/* Copyright (c) 2023, MariaDB Plc + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifdef _WIN32 +C_MODE_START + int create_socketpair(SOCKET socks[2]); + void close_socketpair(SOCKET socks[2]); +C_MODE_END +#endif /* _WIN32 */