
Drizzle Replication – The CommandReplicator and CommandApplier Plugin API

IMPORTANT:
This article is out of date and the replication API has been updated. Please see the follow-up article for the most up-to-date information!

OK, so here is the next installment in the Drizzle replication article series. Today, I’ll be talking about the flow of the Command message object through the CommandReplicator and CommandApplier APIs. If you missed the first article, about the structure of the Command message and Google Protobuffers, you may want to read that first. In this article, we’ll only be talking about what happens on a single server. We will discuss the Command Log in the next article, and after that, how messages are passed from one server to another. But before we discuss those things, it is critical to first understand the CommandReplicator and CommandApplier plugin classes, which are two of the abstract interfaces out of which replication modules can be built (and from which a number of modules have already been built).

In this article, I’ll be showing code that was written by myself and by Padraig O’Sullivan, another Drizzle contributor.

Flow of Command messages

SIDEBAR: Namespaces in Drizzle

The astute reader may have noticed that the Drizzle codebase is organized quite differently from MySQL’s. Drizzle uses C++ namespaces to make the code easier to read and understand.

New code written for Drizzle is always placed in a namespace corresponding to the exact directory structure in which you find the source files for a particular class. For instance, all plugin interface classes (abstract base classes) are in the drizzled::plugin namespace and are found in /drizzled/plugin/. All protobuffer message classes are defined in /drizzled/message/ and are in the drizzled::message namespace. We feel that organizing the code like this, and using C++ namespaces, makes the code clearer and easier to read. You can read more about Drizzle’s coding style, including how we use namespaces, on our wiki.
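To make the convention concrete, here is a minimal sketch of how the namespaces nest. Only the names drizzled::message::Command and drizzled::plugin::CommandApplier come from this article; the empty Command body and the NullApplier class are hypothetical placeholders, not the real Drizzle declarations.

```cpp
#include <cassert>

namespace drizzled {

// Would live in drizzled/message/. The real Command is protobuf-generated;
// this empty struct is only a placeholder.
namespace message {
struct Command {};
} // namespace message

// Would live in drizzled/plugin/: abstract plugin interfaces.
namespace plugin {
class CommandApplier
{
public:
  virtual ~CommandApplier() {}
  // Inside drizzled::, "message::Command" resolves to drizzled::message::Command.
  virtual void apply(const message::Command &to_apply)= 0;
};
} // namespace plugin

} // namespace drizzled

// Hypothetical plugin code outside the kernel names the interfaces by their
// full, directory-shaped paths.
class NullApplier : public drizzled::plugin::CommandApplier
{
public:
  void apply(const drizzled::message::Command &) override {}
};
```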

Command messages are created by a component within the Drizzle kernel whenever any SQL statement that modifies the state of the server is executed. This component, called ReplicationServices, is a singleton object that manages communication between the kernel and two kinds of plugins: CommandReplicator and CommandApplier. The component is declared in drizzled/replication_services.h and defined in drizzled/replication_services.cc.
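The following is a simplified sketch of what a singleton that keeps lists of registered plugins can look like. The method names singleton(), attachReplicator(), attachApplier() and the count accessors are illustrative assumptions, not a copy of drizzled/replication_services.h. The function-local static is the standard C++11 way to get a lazily created, thread-safe single instance.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-ins for the drizzled::plugin interfaces; only pointers are stored.
class CommandReplicator;
class CommandApplier;

// Hypothetical, simplified stand-in for the kernel's ReplicationServices.
class ReplicationServices
{
public:
  static ReplicationServices &singleton()
  {
    static ReplicationServices instance; // constructed once, on first use
    return instance;
  }

  void attachReplicator(CommandReplicator *in_replicator) { replicators.push_back(in_replicator); }
  void attachApplier(CommandApplier *in_applier) { appliers.push_back(in_applier); }

  std::size_t replicatorCount() const { return replicators.size(); }
  std::size_t applierCount() const { return appliers.size(); }

  // No copies: there must only ever be one instance.
  ReplicationServices(const ReplicationServices &)= delete;
  ReplicationServices &operator=(const ReplicationServices &)= delete;

private:
  ReplicationServices() {}
  std::vector<CommandReplicator *> replicators;
  std::vector<CommandApplier *> appliers;
};
```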

The ReplicationServices component frees replication module developers from having to understand anything about what’s going on in the kernel. You don’t have to understand the Session object, how statements are executed in the kernel, or even how storage engines actually apply changes to their data. All you need to learn is the structure of the Command message (covered in the previous article) and the very, very simple CommandReplicator and CommandApplier plugin APIs. So, let’s look at those APIs. 🙂

The CommandReplicator Plugin API

Here is the abstract base class drizzled::plugin::CommandReplicator, available in drizzled/plugin/command_replicator.h.

/**
 * Class which replicates Command messages
 */
class CommandReplicator
{
public:
  CommandReplicator() {}
  virtual ~CommandReplicator() {}
  /**
   * Replicate a Command message to a CommandApplier.
   *
   * @note
   *
   * It is important to note that memory allocation for the 
   * supplied pointer is not guaranteed after the completion 
   * of this function -- meaning the caller can dispose of the
   * supplied message.  Therefore, replicators and appliers 
   * implementing an asynchronous replication system must copy
   * the supplied message to their own controlled memory storage
   * area.
   *
   * @param in_applier Pointer to the applier of the command message
   * @param to_replicate Command message to be replicated
   */
  virtual void replicate(CommandApplier *in_applier, drizzled::message::Command &to_replicate)= 0;
};

See, I told you it was simple 🙂 There is an additional isActive() method currently in this API, but it will disappear in the coming months, as that method will be “moving up” to a base Plugin class. You can ignore it for now. The only method which CommandReplicator plugins must implement is replicate().

The replicate() method accepts only two parameters. The first is a pointer to an object which implements the CommandApplier interface (inherits from drizzled::plugin::CommandApplier). The second is the Command message that the ReplicationServices component constructs when a data-modification event occurs on the server.
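The @note in the header matters most for asynchronous designs: the Command reference is only guaranteed to be valid while replicate() is running. The sketch below uses simplified stand-ins (a plain struct instead of the protobuf-generated drizzled::message::Command, and local copies of the two interfaces, with CommandApplier reduced to the single apply() method described below) to show a replicator that copies each message into a queue it owns and hands the copies to the applier later. QueuedReplicator, drain() and RecordingApplier are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for drizzled::message::Command.
struct Command
{
  std::string schema;
  std::string table;
  uint64_t timestamp= 0;
};

class CommandApplier
{
public:
  virtual ~CommandApplier() {}
  virtual void apply(const Command &to_apply)= 0;
};

class CommandReplicator
{
public:
  virtual ~CommandReplicator() {}
  virtual void replicate(CommandApplier *in_applier, Command &to_replicate)= 0;
};

// Hypothetical asynchronous replicator: replicate() only enqueues a *copy*
// of the message, because the caller may dispose of the original as soon
// as replicate() returns. drain() delivers the copies later.
class QueuedReplicator : public CommandReplicator
{
public:
  void replicate(CommandApplier *in_applier, Command &to_replicate) override
  {
    std::lock_guard<std::mutex> guard(mutex);
    pending.emplace_back(in_applier, to_replicate); // copies the Command
  }

  // Apply everything queued so far; returns the number of messages applied.
  std::size_t drain()
  {
    std::deque<std::pair<CommandApplier *, Command>> batch;
    {
      std::lock_guard<std::mutex> guard(mutex);
      batch.swap(pending);
    }
    for (auto &entry : batch)
      entry.first->apply(entry.second);
    return batch.size();
  }

private:
  std::mutex mutex;
  std::deque<std::pair<CommandApplier *, Command>> pending;
};

// Applier used to observe what arrives.
class RecordingApplier : public CommandApplier
{
public:
  void apply(const Command &to_apply) override { seen.push_back(to_apply); }
  std::vector<Command> seen;
};
```

Copying costs an allocation per message, but it is what keeps the queued data valid independently of the caller’s buffer.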

The CommandApplier Plugin API

Before I show some implementations of CommandReplicator, let’s first quickly take a look at the drizzled::plugin::CommandApplier interface. It is equally simple.

/**
 * Base class for appliers of Command messages
 */
class CommandApplier
{
public:
  CommandApplier() {}
  virtual ~CommandApplier() {}
  /**
   * Apply something to a target.
   *
   * @note
   *
   * It is important to note that memory allocation for the 
   * supplied pointer is not guaranteed after the completion 
   * of this function -- meaning the caller can dispose of the
   * supplied message.  Therefore, appliers which are
   * implementing an asynchronous replication system must copy
   * the supplied message to their own controlled memory storage
   * area.
   *
   * @param to_apply Command message to be applied
   */
  virtual void apply(const drizzled::message::Command &to_apply)= 0;
};

The CommandApplier interface, quite predictably, declares a single pure virtual method, apply(), which accepts a single parameter: a Command message. Yes, it’s that simple.
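An applier that has finished with the message before apply() returns can simply read through the const reference, with no copy required. Here is a minimal sketch using a simplified stand-in struct instead of the protobuf-generated Command; TextLogApplier is a hypothetical name, and writing one text line per command is just an illustrative choice.

```cpp
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>

// Simplified stand-in for drizzled::message::Command.
struct Command
{
  std::string schema;
  std::string table;
  std::string sql; // only meaningful for raw-SQL commands
};

class CommandApplier
{
public:
  virtual ~CommandApplier() {}
  virtual void apply(const Command &to_apply)= 0;
};

// Hypothetical synchronous applier: it is done with the message before
// apply() returns, so it never needs its own copy.
class TextLogApplier : public CommandApplier
{
public:
  explicit TextLogApplier(std::ostream &out_arg) : out(out_arg) {}

  void apply(const Command &to_apply) override
  {
    out << to_apply.schema << '.' << to_apply.table;
    if (! to_apply.sql.empty())
      out << ": " << to_apply.sql;
    out << '\n';
  }

private:
  std::ostream &out;
};
```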

Putting it All Together

Before we get to example implementations of the above plugin interfaces, it’s worth pointing out that Drizzle’s replication system allows multiple CommandReplicator and CommandApplier plugins to register themselves with the ReplicationServices component. The internal ReplicationServices::push() method simply loops through the active replicators and, for each one, calls its replicate() method once for every active applier, passing in a pointer to that applier along with the constructed Command message. Here is the code showing this process:

void ReplicationServices::push(message::Command &to_push)
{
  vector<plugin::CommandReplicator *>::iterator repl_iter= replicators.begin();
  vector<plugin::CommandApplier *>::iterator appl_start_iter, appl_iter;
  appl_start_iter= appliers.begin();
 
  plugin::CommandReplicator *cur_repl;
  plugin::CommandApplier *cur_appl;
 
  while (repl_iter != replicators.end())
  {
    cur_repl= *repl_iter;
    if (! cur_repl->isActive())
    {
      ++repl_iter;
      continue;
    }
 
    appl_iter= appl_start_iter;
    while (appl_iter != appliers.end())
    {
      cur_appl= *appl_iter;
 
      if (! cur_appl->isActive())
      {
        ++appl_iter;
        continue;
      }
 
      cur_repl->replicate(cur_appl, to_push);
      /* 
       * We update the timestamp for the last applied Command so that
       * publisher plugins can ask the replication services when the
       * last known applied Command was using the getLastAppliedTimestamp()
       * method.
       */
      last_applied_timestamp.fetch_and_store(to_push.timestamp());
      ++appl_iter;
    }
    ++repl_iter;
  }
}
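To see what that nested loop does, here is a self-contained simulation with stand-in types. All class names are hypothetical, and std::atomic<uint64_t> stands in for Drizzle’s own atomic type. With R active replicators and A active appliers, one push() makes R × A replicate() calls, so each active applier can receive the same Command once per active replicator.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

// Stand-in for drizzled::message::Command; only the timestamp is modelled.
struct Command
{
  uint64_t timestamp= 0;
};

// Stand-in interfaces, each carrying the isActive() flag that push() checks.
class CommandApplier
{
public:
  explicit CommandApplier(bool active_arg) : active(active_arg) {}
  virtual ~CommandApplier() {}
  bool isActive() const { return active; }
  virtual void apply(const Command &to_apply)= 0;
private:
  bool active;
};

class CommandReplicator
{
public:
  explicit CommandReplicator(bool active_arg) : active(active_arg) {}
  virtual ~CommandReplicator() {}
  bool isActive() const { return active; }
  virtual void replicate(CommandApplier *in_applier, Command &to_replicate)= 0;
private:
  bool active;
};

// Forwards every message, like the DefaultReplicator shown later.
class PassThroughReplicator : public CommandReplicator
{
public:
  using CommandReplicator::CommandReplicator;
  void replicate(CommandApplier *in_applier, Command &to_replicate) override
  {
    in_applier->apply(to_replicate);
  }
};

// Counts how many times apply() was called.
class CountingApplier : public CommandApplier
{
public:
  using CommandApplier::CommandApplier;
  void apply(const Command &) override { ++calls; }
  int calls= 0;
};

// Same nested-loop structure as ReplicationServices::push().
void push(const std::vector<CommandReplicator *> &replicators,
          const std::vector<CommandApplier *> &appliers,
          Command &to_push,
          std::atomic<uint64_t> &last_applied_timestamp)
{
  for (CommandReplicator *cur_repl : replicators)
  {
    if (! cur_repl->isActive())
      continue;
    for (CommandApplier *cur_appl : appliers)
    {
      if (! cur_appl->isActive())
        continue;
      cur_repl->replicate(cur_appl, to_push);
      last_applied_timestamp.store(to_push.timestamp);
    }
  }
}
```

As in the original loop, the timestamp is recorded after every replicate() call, whether or not the replicator actually forwarded the message to the applier.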

Pretty simple, no? There are a couple of big things to point out about the code you’ve seen so far. First, Drizzle is focused on providing clean, simple, and encapsulated interfaces for plugin developers. There’s no need for plugin developers to understand the often-messy and poorly documented internals of the kernel. A plugin developer only needs to do two things:

  • Understand the well-documented Google Protobuffer message API and have a copy of the replication.proto file which contains the blueprint for the Command message
  • Implement either the drizzled::plugin::CommandReplicator::replicate() or drizzled::plugin::CommandApplier::apply() method

Secondly, the interface allows individuals and companies to completely customize how Drizzle replication works for them. You want replication to do X, Y, and Z? Fine, no problem. We give you some example plugins which clearly demonstrate the interfaces, and you can take it from there, without worrying about messing up the kernel. 🙂

So, now that you’ve seen the plugin interfaces, let’s discuss how they are implemented in a couple of plugins that are distributed with Drizzle right now.

The FilteredReplicator Plugin

The default replicator plugin, which I wrote first, literally does nothing but the following:

void DefaultReplicator::replicate(plugin::CommandApplier *in_applier, message::Command &to_replicate)
{
  /* 
   * We do absolutely nothing but call the applier's apply() method, passing
   * along the supplied Command.  Yep, told you it was simple...
   */
  in_applier->apply(to_replicate);
}

Padraig O’Sullivan took the reins and wrote a FilteredReplicator plugin which allows a DBA to filter Command messages by schema name, table name, and a regular expression. His FilteredReplicator is much more interesting than the default replicator, so let’s take a look at some of his code. Here is the replicate() method from the FilteredReplicator plugin:

void FilteredReplicator::replicate(plugin::CommandApplier *in_applier, 
                                   message::Command &to_replicate)
{
  string schema_name;
  string table_name;
 
  /*
   * First, we check to see if the command consists of raw SQL. If so,
   * we need to parse this SQL and determine whether to filter the event
   * based on the information we obtain from the parsed SQL.
   * If not raw SQL, check if this event should be filtered or not
   * based on the schema and table names in the command message.
   */
  if (to_replicate.type() == message::Command::RAW_SQL)
  {
    parseQuery(to_replicate.sql(),
               schema_name,
               table_name);
  }
  else
  {
    schema_name.assign(to_replicate.schema());
    table_name.assign(to_replicate.table());
  }
 
  /*
   * Convert the schema name and table name strings to lowercase so that it
   * does not matter what case the table or schema name was specified in. We
   * also keep all entries in the vectors of schemas and tables to filter in
   * lowercase.
   */
  transform(schema_name.begin(), schema_name.end(),
            schema_name.begin(), ::tolower);
  transform(table_name.begin(), table_name.end(),
            table_name.begin(), ::tolower);
 
  if (isSchemaFiltered(schema_name) ||
      isTableFiltered(table_name))
  {
    return;
  }
 
  /*
   * We can now simply call the applier's apply() method, passing
   * along the supplied command.
   */
  in_applier->apply(to_replicate);
}

The above code is quite easy to read and understand. I encourage you to check out the rest of Padraig’s implementation in /plugin/filtered_replicator/filtered_replicator.cc and see just how easy it is to start writing new Drizzle replication modules.
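The isSchemaFiltered() and isTableFiltered() helpers are not shown above. The following sketch guesses at their general shape based only on the code comment about keeping lowercase entries in vectors; NameFilter, addSchema(), addTable() and shouldSkip() are hypothetical names, and the raw-SQL parsing and regular-expression filtering are left out. It also lowercases through unsigned char, because passing a negative char value to tolower() is undefined behaviour.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <string>
#include <vector>

// Lowercase a copy of a name, converting each byte to unsigned char first.
std::string toLowerCopy(std::string name)
{
  std::transform(name.begin(), name.end(), name.begin(),
                 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
  return name;
}

// Hypothetical filter state. Entries are stored lowercase, as the
// FilteredReplicator comment describes.
class NameFilter
{
public:
  void addSchema(const std::string &schema) { schemas.push_back(toLowerCopy(schema)); }
  void addTable(const std::string &table) { tables.push_back(toLowerCopy(table)); }

  // Both lookups expect an already-lowercased name.
  bool isSchemaFiltered(const std::string &schema_name) const
  {
    return std::find(schemas.begin(), schemas.end(), schema_name) != schemas.end();
  }

  bool isTableFiltered(const std::string &table_name) const
  {
    return std::find(tables.begin(), tables.end(), table_name) != tables.end();
  }

  // Mirrors the decision in FilteredReplicator::replicate(): lowercase both
  // names, then skip the event if either one is on a filter list.
  bool shouldSkip(const std::string &schema, const std::string &table) const
  {
    return isSchemaFiltered(toLowerCopy(schema)) ||
           isTableFiltered(toLowerCopy(table));
  }

private:
  std::vector<std::string> schemas;
  std::vector<std::string> tables;
};
```

A linear search over a vector is fine for a handful of filter entries; a std::set or std::unordered_set would be the natural swap if the lists grew large.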

The CommandLog::apply() Method

To demonstrate a CommandApplier implementation, I present the apply() method of the CommandLog plugin, which inherits from drizzled::plugin::CommandApplier. Without going into too much detail about the Command Log, which is the subject of the next article, I’ll let you take a look at just the apply() method, to see how a CommandApplier does its main job (see /plugin/command_log/command_log.cc):

void CommandLog::apply(const message::Command &to_apply)
{
  string buffer(""); /* Buffer we will write serialized command to */
 
  static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint64_t) + /* 8-byte length header */
                                              sizeof(uint32_t); /* 4 byte checksum trailer */
 
  size_t length;
  ssize_t written;
  off_t cur_offset;
 
  to_apply.SerializeToString(&buffer);
 
  length= buffer.length(); 
 
  /*
   * Do an atomic increment on the offset of the log file position
   */
  cur_offset= log_offset.fetch_and_add((off_t) (HEADER_TRAILER_BYTES + length));
 
  /*
   * We adjust cur_offset back to the original log_offset before
   * the increment above...
   */
  cur_offset-= (off_t) (HEADER_TRAILER_BYTES + length);
 
  /* 
   * Quick safety...if an error occurs below, the log file will
   * not be active, therefore a caller could have been ready
   * to write...but the log is crashed.
   */
  if (unlikely(state == CRASHED))
    return;
 
  /* We always write in network byte order */
  unsigned char nbo_length[8];
  int8store(nbo_length, length);
 
  /* Write the length header */
  do
  {
    written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
  }
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
  if (unlikely(written != (ssize_t) sizeof(uint64_t)))
  {
    errmsg_printf(ERRMSG_LVL_ERROR,
                  _("Failed to write full size of command.  Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"),
                  (int64_t) sizeof(uint64_t),
                  (int64_t) cur_offset,
                  (int64_t) written, 
                  strerror(errno));
    state= CRASHED;
    /* 
     * Reset the log's offset in case we want to produce a decent error message including
     * the original offset where an error occurred.
     */
    log_offset= cur_offset;
    is_active= false;
    return;
  }
 
  cur_offset+= (off_t) written;
 
  /* 
   * Quick safety...if an error occurs above in another writer, the log 
   * file will be in a crashed state.
   */
  if (unlikely(state == CRASHED))
  {
    /* 
     * Reset the log's offset in case we want to produce a decent error message including
     * the original offset where an error occurred.
     */
    log_offset= cur_offset;
    return;
  }
 
  /* Write the command message itself */
  do
  {
    written= pwrite(log_file, buffer.c_str(), length, cur_offset);
  }
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
  if (unlikely(written != (ssize_t) length))
  {
    errmsg_printf(ERRMSG_LVL_ERROR, 
                  _("Failed to write full serialized command.  Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
                  (int64_t) length, 
                  (int64_t) cur_offset,
                  (int64_t) written, 
                  strerror(errno));
    state= CRASHED;
    log_offset= cur_offset;
    is_active= false;
    return;
  }
 
  cur_offset+= (off_t) written;
 
  /* 
   * Quick safety...if an error occurs above in another writer, the log 
   * file will be in a crashed state.
   */
  if (unlikely(state == CRASHED))
  {
    /* 
     * Reset the log's offset in case we want to produce a decent error message including
     * the original offset where an error occurred.
     */
    log_offset= cur_offset;
    return;
  }
 
  uint32_t checksum= 0;
 
  if (do_checksum)
  {
    checksum= crc32(0L, (unsigned char *) buffer.c_str(), length);
  }
 
  /* We always write in network byte order */
  unsigned char nbo_checksum[4];
  int4store(nbo_checksum, checksum);
 
  /* Write the checksum trailer */
  do
  {
    written= pwrite(log_file, nbo_checksum, sizeof(uint32_t), cur_offset);
  }
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
  if (unlikely(written != (ssize_t) sizeof(uint32_t)))
  {
    errmsg_printf(ERRMSG_LVL_ERROR, 
                  _("Failed to write full checksum of command.  Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
                  (int64_t) sizeof(uint32_t), 
                  (int64_t) cur_offset,
                  (int64_t) written, 
                  strerror(errno));
    state= CRASHED;
    log_offset= cur_offset;
    is_active= false;
    return;
  }
}
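Each successful call to apply() therefore produces one record: an 8-byte length header, the serialized message, and a 4-byte checksum trailer (zero when do_checksum is off). The sketch below frames and parses such records in memory and shows the offset reservation with std::atomic. Assumptions: it writes the integers big-endian, following the “network byte order” comments, since the definitions of int8store() and int4store() are not shown here; crc32Bytes() is a bitwise implementation of the same CRC-32 that zlib’s crc32() computes; and frameRecord(), readRecord() and reserve() are hypothetical helpers, not the Command Log’s real reader. Note also that std::atomic::fetch_add returns the value before the addition, so no correction is needed; the subtraction after log_offset.fetch_and_add() in the code above suggests that Drizzle’s atomic returned the updated value instead.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Bitwise CRC-32 (reflected polynomial 0xEDB88320): the same checksum that
// zlib's crc32(0L, buf, len) returns, computed without a lookup table.
uint32_t crc32Bytes(const std::string &data)
{
  uint32_t crc= 0xFFFFFFFFu;
  for (unsigned char byte : data)
  {
    crc^= byte;
    for (int bit= 0; bit < 8; ++bit)
      crc= (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
  }
  return crc ^ 0xFFFFFFFFu;
}

// Append `bytes` bytes of `value`, most significant byte first.
void appendBigEndian(std::string &out, uint64_t value, int bytes)
{
  for (int i= bytes - 1; i >= 0; --i)
    out.push_back(static_cast<char>((value >> (8 * i)) & 0xFF));
}

uint64_t readBigEndian(const std::string &in, std::size_t pos, int bytes)
{
  uint64_t value= 0;
  for (int i= 0; i < bytes; ++i)
    value= (value << 8) | static_cast<unsigned char>(in[pos + i]);
  return value;
}

// One record: [8-byte length][serialized message][4-byte checksum].
std::string frameRecord(const std::string &payload, bool do_checksum)
{
  std::string record;
  appendBigEndian(record, payload.size(), 8);
  record+= payload;
  appendBigEndian(record, do_checksum ? crc32Bytes(payload) : 0, 4);
  return record;
}

// Parse the record starting at `pos`. Returns false if the record is
// truncated or, when checksums are enabled, if the trailer does not match.
bool readRecord(const std::string &log, std::size_t pos,
                std::string &payload, bool do_checksum)
{
  if (log.size() < pos || log.size() - pos < 8)
    return false;
  uint64_t length= readBigEndian(log, pos, 8);
  std::size_t remaining= log.size() - pos - 8;
  if (length > remaining || remaining - length < 4)
    return false;
  payload= log.substr(pos + 8, length);
  uint32_t stored= static_cast<uint32_t>(readBigEndian(log, pos + 8 + length, 4));
  return ! do_checksum || stored == crc32Bytes(payload);
}

// Reserve room for one record. fetch_add returns the offset *before* the
// addition, i.e. where this writer's record starts, so concurrent writers
// get disjoint regions of the file.
uint64_t reserve(std::atomic<uint64_t> &log_offset, uint64_t length)
{
  const uint64_t HEADER_TRAILER_BYTES= sizeof(uint64_t) + sizeof(uint32_t);
  return log_offset.fetch_add(HEADER_TRAILER_BYTES + length);
}
```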

Conclusion, for Now…

I do hope this article has been useful in getting to know how the Drizzle replication system passes the Command message around within a single server. You got to see the replicator and applier plugin APIs, along with example implementations of those APIs in the form of the FilteredReplicator and CommandLog classes. Next, we’ll be looking in detail at the Command Log itself: its format, and how to write to and read from it.

Please do email me (jaypipes@gmail.com) or post any criticism, suggestions, or requests to the Drizzle Discussion list. I’m eager to hear from you. 🙂