
Drizzle Replication – The Transaction Log

In this installment of my Drizzle Replication blog series, I’ll be talking about the Transaction Log. Before reading this entry, you may want to first read up on the Transaction Message, which is a central concept to this blog entry.

The transaction log is just one component of Drizzle’s default replication services, but it also serves as a generalized log of atomic data changes to a particular server. In this way, it is only partially related to replication. The transaction log is used by components of the replication services to store changes made to a server’s data. However, there is nothing that mandates that this particular transaction log be a required feature for Drizzle replication systems. For instance, Eric Lambert is currently working on a Gearman-based replication service which, while following the same APIs, does not require the transaction log to function. Furthermore, other, non-replication-related modules may use the transaction log themselves. For instance, a future Recovery and/or Backup module may just as easily use the transaction log for its own purposes as well.

Before we get into the details, it’s worth noting the general goals we’ve had for the transaction log, as these goals may help explain some of the design choices made. In short, the goals for the transaction log are:

  • Introduce no global contention points (mutexes/locks)
  • Once written, the transaction log may not be modified
  • The transaction log should be easily readable in multiple programming languages

Overview of the Transaction Log Structure




The format of the transaction log is simple and straightforward. It is a single file that contains log entries, one after another. These log entries have a type associated with them. Currently, there are only two types of entries that can go in the transaction log: a Transaction message entry and a BLOB entry. We will only cover the Transaction message entry in this article, as I’ll leave how to deal with BLOBs for a separate article entirely.

Each entry in the transaction log is preceded by 4 bytes containing an integer code identifying the type of entry to follow. The bytes that follow this type header are interpreted based on the type of entry. An entry of type Transaction message is laid out in the log as follows: first, a 4-byte length header is written, then the serialized Transaction message, then a 4-byte checksum of the serialized Transaction message.
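To make the layout concrete, here is a minimal sketch that frames an arbitrary payload the way the paragraph above describes: type, length, payload, checksum, with each integer written as 4 little-endian bytes (matching the WriteLittleEndian32ToArray calls used by the log writer). The names `kTransactionEntry`, `put_le32`, `get_le32` and `encode_entry` are made up for illustration, the numeric type code is an assumption, and the checksum is simply passed in rather than computed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical type code; the real value is defined by ReplicationServices.
constexpr uint32_t kTransactionEntry = 1;

// Append a 32-bit value in little-endian byte order.
inline void put_le32(std::vector<uint8_t> &out, uint32_t v)
{
  for (int shift = 0; shift < 32; shift += 8)
    out.push_back(static_cast<uint8_t>((v >> shift) & 0xFF));
}

// Read a 32-bit little-endian value starting at data[pos].
inline uint32_t get_le32(const std::vector<uint8_t> &data, size_t pos)
{
  assert(pos + 4 <= data.size());
  return static_cast<uint32_t>(data[pos]) |
         static_cast<uint32_t>(data[pos + 1]) << 8 |
         static_cast<uint32_t>(data[pos + 2]) << 16 |
         static_cast<uint32_t>(data[pos + 3]) << 24;
}

// Build one log entry: [type:4][length:4][payload:length][checksum:4].
inline std::vector<uint8_t> encode_entry(uint32_t type,
                                         const std::string &payload,
                                         uint32_t checksum)
{
  std::vector<uint8_t> out;
  out.reserve(12 + payload.size()); // 12 bytes of framing around the payload
  put_le32(out, type);
  put_le32(out, static_cast<uint32_t>(payload.size()));
  out.insert(out.end(), payload.begin(), payload.end());
  put_le32(out, checksum);
  return out;
}
```

A 3-byte payload therefore occupies 15 bytes in the log: 8 bytes of header, the payload itself, and 4 bytes of trailing checksum.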


Details of the TransactionLog::apply() Method

For those interested in how the transaction log is written to, I’m going to detail the apply() method of the TransactionLog class in /plugin/transaction_log/transaction_log.cc. The TransactionLog class is simply a subclass of plugin::TransactionApplier and therefore must implement the single pure virtual apply method of that class interface.

The TransactionLog class has a private drizzled::atomic<off_t> called log_offset which is an offset into the transaction log file that is incremented with each atomic write to the log file. You will notice in the code below that this atomic off_t is stored locally, then incremented by the total length of the log entry to be written. A buffer is then written to the log file using pwrite() at the original offset. In this way, we completely avoid calling pthread_mutex_lock() or similar when writing to the log file, which should increase scalability of the transaction log.

void TransactionLog::apply(const message::Transaction &to_apply)
{
  uint8_t *buffer; /* Buffer we will write serialized header, message and trailing checksum to */
  uint8_t *orig_buffer;
 
  int error_code;
  size_t message_byte_length= to_apply.ByteSize();
  ssize_t written;
  off_t cur_offset;
  size_t total_envelope_length= HEADER_TRAILER_BYTES + message_byte_length;
 
  /*
   * Attempt allocation of raw memory buffer for the header,
   * message and trailing checksum bytes.
   */
  buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
  if (buffer == NULL)
  {
    errmsg_printf(ERRMSG_LVL_ERROR,
      _("Failed to allocate enough memory to buffer header, transaction message, "
        "and trailing checksum bytes. Tried to allocate %" PRIu64
        " bytes.  Error: %s\n"),
      static_cast<uint64_t>(total_envelope_length),
      strerror(errno));
    state= CRASHED;
    deactivate();
    return;
  }
  else
    orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
 
  /*
   * Do an atomic increment on the offset of the log file position
   */
  cur_offset= log_offset.fetch_and_add(static_cast<off_t>(total_envelope_length));
 
  /*
   * We adjust cur_offset back to the original log_offset before
   * the increment above…
   */
  cur_offset-= static_cast<off_t>(total_envelope_length);
 
  /*
   * Write the header information, which is the message type and
   * the length of the transaction message into the buffer
   */
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
    static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
    static_cast<uint32_t>(message_byte_length), buffer);
 
  /*
   * Now write the serialized transaction message, followed
   * by the optional checksum into the buffer.
   */
  buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
  uint32_t checksum= 0;
  if (do_checksum)
  {
    checksum= drizzled::hash::crc32(reinterpret_cast<char *>(buffer) -
                     message_byte_length, message_byte_length);
  }
 
  /* The checksum is written in little-endian byte order, like the header fields */
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
  /*
   * Quick safety…if an error occurs above in another writer, the log
   * file will be in a crashed state.
   */
  if (unlikely(state == CRASHED))
  {
    /*
     * Reset the log’s offset in case we want to produce a decent error message including
     * the original offset where an error occurred.
     */
    log_offset= cur_offset;
    free(orig_buffer);
    return;
  }
 
  /* Write the full buffer in one swoop */
  do
  {
    written= pwrite(log_file, orig_buffer, total_envelope_length, cur_offset);
  }
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal… */
 
  if (unlikely(written != static_cast<ssize_t>(total_envelope_length)))
  {
    errmsg_printf(ERRMSG_LVL_ERROR,
      _("Failed to write full size of transaction.  Tried to write %" PRIu64
        " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"),
      static_cast<uint64_t>(total_envelope_length),
      static_cast<int64_t>(cur_offset),
      static_cast<int64_t>(written),
      strerror(errno));
    state= CRASHED;
    /*
     * Reset the log's offset in case we want to produce a decent error message including
     * the original offset where an error occurred.
     */
    log_offset= cur_offset;
    deactivate();
  }
  free(orig_buffer);
  error_code= my_sync(log_file, 0);
  if (unlikely(error_code != 0))
  {
    errmsg_printf(ERRMSG_LVL_ERROR,
      _("Failed to sync log file. Got error: %s\n"),
      strerror(errno));
  }
}
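The core trick in apply(), reserving a byte range with an atomic add and then writing into it with pwrite(), can be shown in isolation. The sketch below is not the Drizzle code: it uses std::atomic in place of drizzled::atomic, and `MiniLog`, `reserve` and `append` are hypothetical names. Note one difference from the listing above: std::atomic's fetch_add() returns the value held before the addition, so no adjustment is needed afterwards. The listing adjusts cur_offset after calling fetch_and_add(), which suggests drizzled::atomic hands back the incremented value.

```cpp
#include <atomic>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical stand-in for the log: a file descriptor plus an atomic write cursor.
struct MiniLog
{
  explicit MiniLog(int file) : fd(file), next_offset(0) {}

  int fd;
  std::atomic<off_t> next_offset;
};

// Reserve `len` bytes and return the offset where the reserved range starts.
// Two threads calling this concurrently always receive disjoint ranges.
inline off_t reserve(MiniLog &log, size_t len)
{
  return log.next_offset.fetch_add(static_cast<off_t>(len));
}

// Write a buffer into its own reserved range, retrying when interrupted.
// Returns true only if the whole buffer was written.
inline bool append(MiniLog &log, const void *buf, size_t len)
{
  assert(buf != nullptr);
  const off_t at = reserve(log, len);
  ssize_t written;
  do
  {
    written = pwrite(log.fd, buf, len, at);
  } while (written == -1 && errno == EINTR);
  return written == static_cast<ssize_t>(len);
}
```

Because every writer owns its range before it touches the file, no mutex is needed around the write itself. The cost is that a failed write leaves a hole at the reserved offset, which is why the real code marks the log as CRASHED in that case.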

Reading the Transaction Log

OK, so the above code shows how the transaction log is written. What about reading the log file? Well, it’s pretty simple. There is an example program in /drizzle/message/transaction_reader.cc which has code showing how to do this. Here’s a snippet from that program:

int main(int argc, char* argv[])
{
  …
  message::Transaction transaction;
 
  file= open(argv[1], O_RDONLY);
  if (file == -1)
  {
    fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
    return -1;
  }
      …
  protobuf::io::ZeroCopyInputStream *raw_input=
    new protobuf::io::FileInputStream(file);
  protobuf::io::CodedInputStream *coded_input=
    new protobuf::io::CodedInputStream(raw_input);
 
  char *buffer= NULL;
  char *temp_buffer= NULL;
  uint32_t length= 0;
  uint32_t previous_length= 0;
  uint32_t checksum= 0;
  bool result= true;
  uint32_t message_type= 0;
 
  /* Read in the length of the command */
  while (result == true &&
           coded_input->ReadLittleEndian32(&message_type) == true &&
           coded_input->ReadLittleEndian32(&length) == true)
  {
      if (message_type != ReplicationServices::TRANSACTION)
      {
        fprintf(stderr, _("Found a non-transaction message "
                            "in log.  Currently, not supported.\n"));
        exit(1);
      }
 
      if (length > INT_MAX)
      {
        fprintf(stderr, _("Attempted to read record bigger than INT_MAX\n"));
        exit(1);
      }
 
      if (buffer == NULL)
      {
        temp_buffer= (char *) malloc(static_cast<size_t>(length));
      }
      /* No need to allocate if we have a buffer big enough… */
      else if (length > previous_length)
      {
        temp_buffer= (char *) realloc(buffer, static_cast<size_t>(length));
      }
 
      if (temp_buffer == NULL)
      {
        fprintf(stderr, _("Memory allocation failure trying to "
                            "allocate %" PRIu64 " bytes.\n"),
                 static_cast<uint64_t>(length));
        break;
      }
      else
        buffer= temp_buffer;
 
      /* Read the Command */
      result= coded_input->ReadRaw(buffer, (int) length);
      if (result == false)
      {
        fprintf(stderr, _("Could not read transaction message.\n"));
        fprintf(stderr, _("GPB ERROR: %s.\n"), strerror(errno));
        fprintf(stderr, _("Raw buffer read: %s.\n"), buffer);
        break;
      }
 
      result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
      if (result == false)
      {
        fprintf(stderr, _("Unable to parse command. Got error: %s.\n"),
                 transaction.InitializationErrorString().c_str());
        if (buffer != NULL)
          fprintf(stderr, _("BUFFER: %s\n"), buffer);
        break;
      }
    /* Print the transaction */
    printTransaction(transaction);
 
    /* Read the 4-byte trailing checksum */
    coded_input->ReadLittleEndian32(&checksum);
 
    if (do_checksum)
    {
      if (checksum != drizzled::hash::crc32(buffer, static_cast<size_t>(length)))
      {
        fprintf(stderr, _("Checksum failed. Wanted %" PRIu32
                              " got %" PRIu32 "\n"),
                 checksum,
                 drizzled::hash::crc32(buffer, static_cast<size_t>(length)));
      }
    }
    previous_length= length;
  }
 
  if (buffer)
    free(buffer);
  delete coded_input;
  delete raw_input;
  return (result == true ? 0 : 1);
}
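The same read loop can be written without protobuf if all you want is to walk the framing and check the checksums. Here's a minimal sketch that scans a log image held in memory. Everything in it is illustrative: `scan_log`, `EntryInfo`, `read_le32` and `crc32_bytes` are made-up names, and the checksum is the common reflected CRC-32 (polynomial 0xEDB88320), which is assumed, not confirmed, to be compatible with drizzled::hash::crc32. Keep in mind that the writer stores a checksum of 0 when checksumming is disabled, so a mismatch is only meaningful for logs written with it enabled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Common reflected CRC-32. Assumption: stands in for drizzled::hash::crc32.
inline uint32_t crc32_bytes(const uint8_t *data, size_t len)
{
  uint32_t crc = 0xFFFFFFFFu;
  for (size_t i = 0; i < len; ++i)
  {
    crc ^= data[i];
    for (int bit = 0; bit < 8; ++bit)
      crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
  }
  return crc ^ 0xFFFFFFFFu;
}

// Decode 4 little-endian bytes.
inline uint32_t read_le32(const uint8_t *p)
{
  assert(p != nullptr);
  return static_cast<uint32_t>(p[0]) | static_cast<uint32_t>(p[1]) << 8 |
         static_cast<uint32_t>(p[2]) << 16 | static_cast<uint32_t>(p[3]) << 24;
}

struct EntryInfo
{
  size_t offset;    // where the 4-byte type header starts
  uint32_t type;    // entry type code
  uint32_t length;  // payload length in bytes
  bool checksum_ok; // stored checksum matches the payload
};

// Walk the log and describe every complete entry; stop at the first truncated one.
inline std::vector<EntryInfo> scan_log(const std::vector<uint8_t> &log)
{
  std::vector<EntryInfo> entries;
  size_t pos = 0;
  while (log.size() - pos >= 8)
  {
    const uint32_t type = read_le32(&log[pos]);
    const uint32_t length = read_le32(&log[pos + 4]);
    if (log.size() - pos - 8 < static_cast<uint64_t>(length) + 4)
      break; // payload or trailing checksum is incomplete
    const uint8_t *payload = &log[pos + 8];
    const uint32_t stored = read_le32(payload + length);
    entries.push_back({pos, type, length, stored == crc32_bytes(payload, length)});
    pos += 8 + static_cast<size_t>(length) + 4;
  }
  return entries;
}
```

Since the scan only needs the length header to hop from one entry to the next, the offsets it collects are also the raw material for a simple index over the log.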

Shortcomings of the Transaction Log

So far, we’ve generally focused on a scalable design for the transaction log and have not spent too much time on performance tuning the code — and yes, performance != scalability. There are a number of problems with the current code which we will address in future versions of the transaction log. Namely:

  • Reduce calls to malloc(). Currently, each write of a transaction message to the log file incurs a call to malloc() to allocate enough memory to store the serialized log entry. Clearly, this is not optimal. We’ve considered a number of alternative approaches to calling malloc(), including a scoreboard approach where a vector of memory slabs is used in a round-robin fashion. This would introduce some locking, however. Also, I’ve thought about using a hazard pointer list on the Session object so that memory previously allocated on the Session object can be reused for something like this. But these ideas must be hashed out further.
  • There is no index into the transaction log. This is not a problem for writing the transaction log, of course, but for readers of the transaction log. I’m in the process of creating classes and a library for building indexes for a transaction log and, in addition, creating archived snapshots to enable log shipping for Drizzle replication. I’ll be pushing code for this to Launchpad later this week and will write a new article about log shipping and snapshot creation.
  • Each call to apply() calls fdatasync()/fsync() on the transaction log. Certain environments may consider this to be too strict a sync requirement, since the storage engine may already keep a transaction log file of its own that is also synced. For instance, InnoDB has a transaction log that, depending on the setting of InnoDB configuration variables, may call fdatasync() upon every transaction commit. It would be best to have the syncing behaviour be user-adjustable — for instance, a setting to allow the transaction log to be synced every X number of seconds…
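As a rough illustration of the last item, a user-adjustable sync setting could be modelled as a small policy object that apply() consults before syncing. This is purely a sketch of one possible shape: `SyncPolicy` and `should_sync` are hypothetical names, nothing like this exists in the code shown above, and the object is deliberately not thread-safe. A real version would need an atomic timestamp or similar to stay in line with the "no global contention points" goal.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical policy: decides whether a write should be followed by a sync.
class SyncPolicy
{
public:
  typedef std::chrono::steady_clock Clock;

  // An interval of zero means "sync after every write" (the current behaviour).
  explicit SyncPolicy(std::chrono::seconds interval)
    : interval_(interval), last_sync_(), synced_once_(false)
  {
    assert(interval.count() >= 0);
  }

  // Returns true when the caller should sync now, and records that it will.
  bool should_sync(Clock::time_point now)
  {
    if (!synced_once_ || now - last_sync_ >= interval_)
    {
      last_sync_ = now;
      synced_once_ = true;
      return true;
    }
    return false;
  }

private:
  std::chrono::seconds interval_;
  Clock::time_point last_sync_;
  bool synced_once_;
};
```

With an interval of five seconds, the first write triggers a sync, writes during the next five seconds do not, and the first write at or after the five-second mark syncs again. The trade-off is the usual one: anything written since the last sync can be lost if the machine crashes.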

Summary and Request for Comments

That’s it for the discussion about the transaction log. I’ll post some more code examples from the replication plugins which utilize the transaction log in a later blog entry.

What do you think of the design of the transaction log? What would you change? Comments are always welcome! Cheers. 🙂