317 lines
12 KiB
C++
317 lines
12 KiB
C++
/* Copyright (c) 2023, 2024, Oracle and/or its affiliates.
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License, version 2.0,
|
|
as published by the Free Software Foundation.
|
|
|
|
This program is designed to work with certain software (including
|
|
but not limited to OpenSSL) that is licensed under separate terms,
|
|
as designated in a particular file or component or in included license
|
|
documentation. The authors of MySQL hereby grant you an additional
|
|
permission to link the program and your derivative works with the
|
|
separately licensed software that they have either included with
|
|
the program or referenced in the documentation.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License, version 2.0, for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
|
|
|
|
#include "sql/binlog/decompressing_event_object_istream.h"
|
|
#include "scope_guard.h" // Variable_scope_guard
|
|
#include "sql/mysqld.h" // PSI_stage_info
|
|
#include "sql/sql_class.h" // current_thd, THD
|
|
|
|
#include "mysql/binlog/event/compression/payload_event_buffer_istream.h"
|
|
|
|
namespace binlog {
|
|
|
|
Decompressing_event_object_istream::Decompressing_event_object_istream(
|
|
IBasic_binlog_file_reader &reader, const Memory_resource_t &memory_resource)
|
|
: m_binlog_reader(&reader),
|
|
m_memory_resource(memory_resource),
|
|
m_get_format_description_event([&]() -> Fde_ref_t {
|
|
return m_binlog_reader->format_description_event();
|
|
}) {}
|
|
|
|
Decompressing_event_object_istream::Decompressing_event_object_istream(
|
|
const Tple_ptr_t &transaction_payload_log_event,
|
|
Fde_ref_t format_description_event,
|
|
const Memory_resource_t &memory_resource)
|
|
: m_binlog_reader(nullptr),
|
|
m_memory_resource(memory_resource),
|
|
m_get_format_description_event(
|
|
[&]() -> Fde_ref_t { return format_description_event; }) {
|
|
begin_payload_event(transaction_payload_log_event);
|
|
}
|
|
|
|
Decompressing_event_object_istream::Decompressing_event_object_istream(
|
|
const Transaction_payload_log_event &transaction_payload_log_event,
|
|
Fde_ref_t format_description_event,
|
|
const Memory_resource_t &memory_resource)
|
|
: m_binlog_reader(nullptr),
|
|
m_memory_resource(memory_resource),
|
|
m_get_format_description_event(
|
|
[&]() -> Fde_ref_t { return format_description_event; }) {
|
|
begin_payload_event(transaction_payload_log_event);
|
|
}
|
|
|
|
#ifndef NDEBUG
|
|
Decompressing_event_object_istream::~Decompressing_event_object_istream() {
|
|
// operator bool or operator! has reported error or EOF, but the
|
|
// calling code has not checked which case it is. This is probably a
|
|
// programming mistake. Remember to always check for error after a
|
|
// read loop has ended.
|
|
assert(!m_outstanding_error);
|
|
}
|
|
#endif
|
|
|
|
void Decompressing_event_object_istream::set_verify_checksum(
|
|
bool verify_checksum) {
|
|
m_verify_checksum = verify_checksum;
|
|
}
|
|
|
|
Decompressing_event_object_istream::operator bool() const { return !m_end; }
|
|
|
|
bool Decompressing_event_object_istream::operator!() const { return m_end; }
|
|
|
|
std::string Decompressing_event_object_istream::get_error_str() const {
|
|
#ifndef NDEBUG
|
|
m_outstanding_error = false;
|
|
#endif
|
|
return m_error_str;
|
|
}
|
|
|
|
bool Decompressing_event_object_istream::has_error() const {
|
|
#ifndef NDEBUG
|
|
m_outstanding_error = false;
|
|
#endif
|
|
return !m_error_str.empty();
|
|
}
|
|
|
|
Decompressing_event_object_istream::Status_t
|
|
Decompressing_event_object_istream::get_status() const {
|
|
#ifndef NDEBUG
|
|
m_outstanding_error = false;
|
|
#endif
|
|
return m_status;
|
|
}
|
|
|
|
static Decompressing_event_object_istream::Status_t binlog_read_error_to_status(
|
|
Binlog_read_error::Error_type e) {
|
|
switch (e) {
|
|
case Binlog_read_error::SUCCESS:
|
|
return Decompressing_event_object_istream::Status_t::success;
|
|
case Binlog_read_error::READ_EOF:
|
|
return Decompressing_event_object_istream::Status_t::end;
|
|
case Binlog_read_error::MEM_ALLOCATE:
|
|
return Decompressing_event_object_istream::Status_t::out_of_memory;
|
|
case Binlog_read_error::BOGUS:
|
|
case Binlog_read_error::SYSTEM_IO:
|
|
case Binlog_read_error::EVENT_TOO_LARGE:
|
|
case Binlog_read_error::CHECKSUM_FAILURE:
|
|
case Binlog_read_error::INVALID_EVENT:
|
|
case Binlog_read_error::CANNOT_OPEN:
|
|
case Binlog_read_error::HEADER_IO_FAILURE:
|
|
case Binlog_read_error::BAD_BINLOG_MAGIC:
|
|
case Binlog_read_error::INVALID_ENCRYPTION_HEADER:
|
|
case Binlog_read_error::CANNOT_GET_FILE_PASSWORD:
|
|
case Binlog_read_error::READ_ENCRYPTED_LOG_FILE_IS_NOT_SUPPORTED:
|
|
case Binlog_read_error::ERROR_DECRYPTING_FILE:
|
|
case Binlog_read_error::EVENT_UNSUPPORTED_NEW_VERSION:
|
|
return Decompressing_event_object_istream::Status_t::corrupted;
|
|
case Binlog_read_error::TRUNC_EVENT:
|
|
case Binlog_read_error::TRUNC_FD_EVENT:
|
|
return Decompressing_event_object_istream::Status_t::truncated;
|
|
}
|
|
assert(false); // not reached
|
|
return Decompressing_event_object_istream::Status_t::success;
|
|
}
|
|
|
|
raii::Targeted_stringstream Decompressing_event_object_istream::error_stream(
|
|
Status_t status) {
|
|
DBUG_TRACE;
|
|
m_status = status;
|
|
raii::Targeted_stringstream stream(
|
|
m_error_str, "",
|
|
[]([[maybe_unused]] const std::string &s) { DBUG_LOG("info", s); });
|
|
if (m_embedded_event_number != 0)
|
|
stream << "Error reading embedded Log_event #" << m_embedded_event_number
|
|
<< " from Payload event";
|
|
else
|
|
stream << "Error reading Log_event";
|
|
stream << " at position " << m_event_position << ": ";
|
|
m_end = true;
|
|
return stream;
|
|
}
|
|
|
|
const Decompressing_event_object_istream::Grow_calculator_t &
|
|
Decompressing_event_object_istream::get_grow_calculator() const {
|
|
return m_grow_calculator;
|
|
}
|
|
|
|
void Decompressing_event_object_istream::set_grow_calculator(
|
|
const Grow_calculator_t &grow_calculator) {
|
|
m_grow_calculator = grow_calculator;
|
|
}
|
|
|
|
bool Decompressing_event_object_istream::decode_from_buffer(
|
|
Buffer_view_t &buffer_view, Event_ptr_t &out) {
|
|
DBUG_TRACE;
|
|
auto fde = m_get_format_description_event();
|
|
Variable_scope_guard disable_checksum_guard{fde.footer()->checksum_alg};
|
|
// Events contained in a Transaction_payload_log_event never have a
|
|
// checksum (regardless of configuration). So we have to temporarily
|
|
// disable checksums while decoding such an inner event. The API to
|
|
// control if we verify checksums is to set this member variable in
|
|
// the footer of the format_description_log_event.
|
|
fde.footer()->checksum_alg = mysql::binlog::event::BINLOG_CHECKSUM_ALG_OFF;
|
|
Log_event *ev{nullptr};
|
|
auto error = binlog_event_deserialize(buffer_view.data(), buffer_view.size(),
|
|
&fde, m_verify_checksum, &ev);
|
|
if (error != Binlog_read_error::SUCCESS) {
|
|
uint event_type = buffer_view.data()[EVENT_TYPE_OFFSET];
|
|
error_stream(binlog_read_error_to_status(error))
|
|
<< "Failed decoding event of type "
|
|
<< Log_event::get_type_str(event_type) << " (" << event_type
|
|
<< "): " << Binlog_read_error(error).get_str();
|
|
return true;
|
|
}
|
|
ev->common_header->log_pos = m_transaction_payload_event_offset;
|
|
DBUG_LOG("info", "SUCCESS. returning decompressed event of type "
|
|
<< ev->get_type_str());
|
|
out = Event_ptr_t(ev);
|
|
return false;
|
|
}
|
|
|
|
Decompressing_event_object_istream::Read_status
|
|
Decompressing_event_object_istream::read_from_payload_stream(Event_ptr_t &out) {
|
|
DBUG_TRACE;
|
|
if (!m_buffer_istream) {
|
|
// may happen if begin_payload_event failed with OOM
|
|
error_stream(Status_t::out_of_memory)
|
|
<< "Out of memory allocating buffer stream";
|
|
return Read_status::error;
|
|
}
|
|
// Update m_grow_calculator. We do it per event, not only when
|
|
// instantiating a payload_event_buffer_istream, so that user can
|
|
// set a Grow_calculator per event if needed.
|
|
m_buffer_istream->set_grow_calculator(m_grow_calculator);
|
|
// Fetch a buffer from the stream
|
|
Buffer_ptr_t buffer_ptr;
|
|
if (*m_buffer_istream >> buffer_ptr) {
|
|
if (decode_from_buffer(*buffer_ptr, out)) return Read_status::error;
|
|
++m_embedded_event_number;
|
|
return Read_status::success;
|
|
}
|
|
// At this point, we either reached EOF, or there was an error.
|
|
|
|
// Error? Then copy the message from the stream and return failure.
|
|
if (m_buffer_istream->has_error()) {
|
|
error_stream(m_buffer_istream->get_status())
|
|
<< m_buffer_istream->get_error_str();
|
|
return Read_status::error;
|
|
}
|
|
|
|
// Reached EOF in the payload. Then delete the stream and return eof.
|
|
DBUG_LOG("info", "EOF in compressed stream from payload event.");
|
|
m_embedded_event_number = 0;
|
|
m_transaction_payload_event_offset = 0;
|
|
m_buffer_istream.reset();
|
|
return Read_status::eof;
|
|
}
|
|
|
|
bool Decompressing_event_object_istream::read_from_binlog_stream(
|
|
Event_ptr_t &out) {
|
|
DBUG_TRACE;
|
|
assert(m_embedded_event_number == 0);
|
|
m_event_position = m_binlog_reader->position();
|
|
Log_event *ev = m_binlog_reader->read_event_object();
|
|
if (ev == nullptr) {
|
|
auto error = m_binlog_reader->get_error_type();
|
|
assert(error != Binlog_read_error::SUCCESS);
|
|
if (error == Binlog_read_error::READ_EOF) {
|
|
m_status = Status_t::end;
|
|
DBUG_LOG("info", "read_event_object returned nullptr, for EOF.");
|
|
} else {
|
|
DBUG_LOG("info", "read_event_object returned nullptr, for error.");
|
|
error_stream(binlog_read_error_to_status(error))
|
|
<< "Failed decoding event: " << m_binlog_reader->get_error_str();
|
|
}
|
|
return true;
|
|
}
|
|
|
|
DBUG_LOG("info", "SUCCESS. returning non-compressed event of type "
|
|
<< ev->get_type_str());
|
|
out = Event_ptr_t(ev);
|
|
|
|
// If we got a TPLE, prepare to unfold it on next invocation. Return
|
|
// the TPLE itself this time. Share pointer ownership between the
|
|
// Payload_event_buffer_istream and the API client.
|
|
if (ev->get_type_code() == mysql::binlog::event::TRANSACTION_PAYLOAD_EVENT)
|
|
begin_payload_event(
|
|
std::const_pointer_cast<const Transaction_payload_log_event>(
|
|
std::dynamic_pointer_cast<Transaction_payload_log_event>(out)));
|
|
|
|
return false;
|
|
}
|
|
|
|
Decompressing_event_object_istream &
|
|
Decompressing_event_object_istream::operator>>(Event_ptr_t &out) {
|
|
DBUG_TRACE;
|
|
// The following call to `out.reset()` is a memory usage
|
|
// optimization in the special case that `out` is the last owner of
|
|
// its object, which is the case in the usual pattern:
|
|
// while (stream >> event_ptr) {
|
|
// event_ptr->do_something();
|
|
// }
|
|
// This API guarantees that `out` will be replaced by either a valid
|
|
// object, or nullptr (on error or end-of-stream). Therefore, if
|
|
// `out` is the last owner, its object will be released before
|
|
// returning from this function. By calling `out.reset()` now, we
|
|
// release that memory before we allocate new memory for the new
|
|
// object we will return. Therefore, it ensures that we don't hold
|
|
// two events in memory at the same time.
|
|
out.reset();
|
|
|
|
// Read and decode the next event, either from a payload event or
|
|
// from a file stream, depending on the current state.
|
|
//
|
|
// Return false on success; true if error or EOF was reached.
|
|
auto work = [&]() -> bool {
|
|
// If we are processing a TPLE, decompress next event from there.
|
|
if (m_embedded_event_number != 0) {
|
|
switch (read_from_payload_stream(out)) {
|
|
case Read_status::success:
|
|
return false;
|
|
case Read_status::error:
|
|
return true;
|
|
case Read_status::eof:
|
|
break; // fallthrough to read next event from file
|
|
}
|
|
}
|
|
// If this stream was instantiated as reading from just one TPLE
|
|
// event, not a Binlog_reader that yields multiple events, then we
|
|
// have reached EOF.
|
|
if (m_binlog_reader == nullptr) {
|
|
m_status = Status_t::end;
|
|
return true;
|
|
}
|
|
DBUG_LOG("info", "Reading non-compressed event.");
|
|
return read_from_binlog_stream(out);
|
|
};
|
|
if (work()) {
|
|
m_buffer_istream.reset();
|
|
m_end = true;
|
|
#ifndef NDEBUG
|
|
m_outstanding_error = true;
|
|
#endif
|
|
}
|
|
return *this;
|
|
}
|
|
|
|
} // namespace binlog
|