Drizzle Replication – The Command Message

IMPORTANT:
This article is out of date and the replication API has been updated. Please see the follow-up article for the most up-to-date information!

I wanted to start writing about how Drizzle’s new replication system works: how its internals are structured, how logs are formatted, what its (current) limitations are, what features are planned, how you can get involved in development, and a lot more. Before jumping in, you may want to read a quick overview of the concepts of Drizzle replication here.

Fortunately, some advice from my friend Edwin DeSouza got me back to reality: “Jay, do a series of small, targeted, easily digestible blog posts”. And, so, this is the first in a series of quick blog posts about Drizzle replication. Today, I’ll start at the very beginning and talk about the basic unit of “currency” in Drizzle’s replication system: the Command Message.

Background

When two servers in a replicated environment must communicate with each other about changes in the state of one server, they must do so by sending raw bytes of information across some port or socket. One server must “package up” information about what changed and send it to the other server. The receiving server must then “unpack” the data and interpret it, subsequently applying the data change to its own schema.

[Image: Drizzle replication architecture diagram. All of the arrows represent Command Message objects being passed between components within a single server or across the wire to another server.]

This packing and unpacking of raw bytes is fairly complicated, and can get downright confusing when you take into account that different hardware architectures store raw bytes in different orders. Code that interprets raw streams of data is complex and, more importantly, difficult to read. Furthermore, what we want to focus on in development is what to do with the unpacked data messages, not how to pack, unpack, and serialize the darn things.

Google Protobuffers to the Rescue

At Drizzle, we’re all about laziness. We don’t reinvent wheels if there is an excellent library that already does what we need.

It just so happens that a library exists that can solve the low-level problems of packing (serializing), unpacking (deserializing or parsing), and “versioning” formatted binary streams of data so that raw data can change its structure without having to re-implement all new serialization and deserialization routines. Enter Google Protobuffers.

The Google Protobuffers library (just GPB from now on) allows you to create a text file called a “proto” that contains a blueprint of a “message” class that represents some piece of data. This proto file is then consumed by the protoc program, which generates code files in a variety of programming languages. The code files contain class definitions of the message you define in your proto file.

This may all seem a bit strange until I show you exactly how it all works…and I will do so shortly for a C++ example. But first, let’s take a look at the proto file (the blueprint if you will) of the Command message, which stores information about data changes occurring on a server. Here is a section of the proto file, stored in /drizzled/message/replication.proto:

import "table.proto";

package drizzled.message;
option optimize_for = SPEED;


/*
  Context for a transaction.
*/

message TransactionContext
{
  required int32 server_id = 1;
  required uint64 transaction_id = 2; /* Globally-unique transaction ID */
}

/*
  Insert one record into a single table.
*/

message InsertRecord
{
  repeated Table.Field insert_field = 3;
  repeated bytes insert_value = 4;
}


/*
  Update one record in a single table.
*/

message UpdateRecord
{
  repeated Table.Field update_field = 3;
  repeated bytes before_value = 4;
  repeated bytes after_value = 5;
  repeated Table.Field where_field = 6;
  repeated bytes where_value = 7;
}


/*
  Deletes one record in a single table
*/

message DeleteRecord
{
  repeated Table.Field where_field = 3;
  repeated bytes where_value = 4;
}


/*
  A component of a transaction -- a single instruction or command
*/

message Command
{
  enum Type 
  {
    START_TRANSACTION = 0;        /* A START TRANSACTION statement */
    COMMIT = 1;                   /* A COMMIT statement */
    ROLLBACK = 2;                 /* A ROLLBACK statement */
    INSERT = 3;                   /* An insert of a single record */
    DELETE = 4;                   /* A delete of a single record */
    UPDATE = 5;                   /* An update of a single record */
    RAW_SQL = 6;                  /* A raw SQL statement */
  }
  required Type type = 1;
  required uint64 timestamp = 2;  /* A nanosecond precision timestamp */

  /* 
    Transaction Context is duplicated here so that Commands may
    be sent over the wire separately from the rest of the Commands in
    a transaction.
  */

  required TransactionContext transaction_context = 3;
  optional string schema = 4;     /* The schema affected */
  optional string table = 5;      /* The table affected */

  optional string sql = 6;  /* May contain the actual SQL supplied for the original statement */


  /* 
    The below implement the actual change.  Each Command will 
    have zero or one of the below sub-messages defined. 
  */

  optional InsertRecord      insert_record = 7;
  optional DeleteRecord      delete_record = 8;
  optional UpdateRecord      update_record = 9;
}

The above proto file format defines the members of the Command message and some sub-messages that are pieces of a Command message.

There are exactly three required elements of a Command message:

  • type — an enumeration of the type of Command the message is. Does it insert, update, or delete a record? Does it start, rollback, or commit a transaction? Is it simply a raw SQL statement? The type element answers this.
  • timestamp — a timestamp of when the Command occurred on the server, to nanosecond precision.
  • transaction_context — a sub-message containing the ID of the server this Command occurred on and the global transaction identifier the Command belongs to.

There are a number of optional members of the Command class, including the actual SQL string used in the original statement, and a set of optional sub-message classes that contain specialized data for certain types of commands.
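Earlier, I mentioned that GPB handles “versioning” of the binary format for us. As a quick illustration of what that means: because every field in a proto file carries an explicit tag number, new optional fields can be added to a message without breaking code compiled against the older definition. Here is a hypothetical extension of TransactionContext (the session_id field here is purely illustrative, not part of the actual proto file shown above):

```proto
message TransactionContext
{
  required int32 server_id = 1;
  required uint64 transaction_id = 2;

  /* A new, hypothetical field.  Old readers that don't know
     about tag 3 simply skip it when parsing; new readers see
     it as unset when parsing messages written by old code. */
  optional uint32 session_id = 3;
}
```

No serialization or deserialization code needs to be rewritten; protoc regenerates the accessors, and the wire format stays compatible in both directions.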

Creating Generated Source Files

There are probably a lot of readers currently asking themselves (or yelling at their blog reader) what the heck the big deal is about the above…I mean, it looks just like a simple definition of a POD (plain old data) class. Well, the real magic happens when you compile the above proto file into a source code file and are able to take advantage of the GPB framework. Let’s do that now.

We use the protoc program, passing in a couple of simple arguments. Here, I’ll generate a source file with code creating C++ classes for the message definitions in our proto file. Drizzle keeps all the proto definition files for its message classes in the /drizzled/message/ subdirectory, so I’ll change to that directory and generate the source files:

jpipes@serialcoder:~/repos/drizzle/trunk$ cd drizzled/message/
jpipes@serialcoder:~/repos/drizzle/trunk/drizzled/message$ protoc --cpp_out=. replication.proto 
jpipes@serialcoder:~/repos/drizzle/trunk/drizzled/message$ ls -lah replication.*
-rw-r--r-- 1 jpipes jpipes  95K 2009-08-12 06:48 replication.pb.cc
-rw-r--r-- 1 jpipes jpipes  66K 2009-08-12 06:48 replication.pb.h
-rw-r--r-- 1 jpipes jpipes 3.4K 2009-08-03 10:16 replication.proto

The --cpp_out=. option simply directs protoc to generate C++ files and write them to the current directory. As you can see, protoc generated two files: replication.pb.h and replication.pb.cc.

Here’s the beauty of GPB: there’s absolutely no reason for you to ever look at the code in either of these generated files, unless you are just curious about the style of code that GPB generates. The excellent online documentation for GPB contains all the information you’ll ever need in order to use them. C++ classes generated by GPB always follow an identical interface, so we can simply look at our .proto file (which is much simpler and shorter) to determine what methods our generated classes provide.

I already stated that the advantage of GPB is that serialization and deserialization are already done for you. Let’s look at some code in the Drizzle distribution that demonstrates using the Command message.

Constructing Command Messages

Inside Drizzle’s kernel, there is a component called drizzled::ReplicationServices which is responsible for constructing Command messages when data change events occur in the server and then passing these Command messages to replicators that are listening for change events. Here is some code from the drizzled::ReplicationServices::insertRecord() method which constructs a Command message.

#include <drizzled/replication.pb.h>
...
void ReplicationServices::setCommandTransactionContext(message::Command &in_command,
                                                       Session *in_session) const
{
  message::TransactionContext *trx= in_command.mutable_transaction_context();
  trx->set_server_id(in_session->getServerId());
  trx->set_transaction_id(in_session->getTransactionId());

  in_command.set_session_id((uint32_t) in_session->getSessionId());
}
...
void ReplicationServices::insertRecord(Session *in_session, Table *in_table)
{
  ...
  message::Command command;
  command.set_type(message::Command::INSERT);
  command.set_timestamp(in_session->getCurrentTimestamp());

  setCommandTransactionContext(command, in_session);

  const char *schema_name= in_table->getShare()->db.str;
  const char *table_name= in_table->getShare()->table_name.str;

  command.set_schema(schema_name);
  command.set_table(table_name);

  /* 
   * Now we construct the specialized InsertRecord command inside
   * the message::Command container...
   */
  message::InsertRecord *change_record= command.mutable_insert_record();

  Field *current_field;
  Field **table_fields= in_table->field;
  String *string_value= new (in_session->mem_root) String(ReplicationServices::DEFAULT_RECORD_SIZE);
  string_value->set_charset(system_charset_info);

  message::Table::Field *current_proto_field;

  /* We will read all the table's fields... */
  in_table->setReadSet();

  while ((current_field= *table_fields++) != NULL) 
  {
    current_proto_field= change_record->add_insert_field();
    current_proto_field->set_name(current_field->field_name);
    current_proto_field->set_type(message::Table::Field::VARCHAR); /* @TODO real types! */
    string_value= current_field->val_str(string_value);
    change_record->add_insert_value(string_value->c_ptr());
    string_value->free();
  }
  
  push(command);
}

The code above shows just how easy it is to construct a Command message containing information about a new record that has been inserted into a schema on the server.

We start by simply creating a message::Command variable on the stack, named “command”. We then set the required member variables of the Command by calling the set_type() and set_timestamp() methods of the Command message and the set_server_id() and set_transaction_id() methods of the TransactionContext submessage of the Command object.

For any scalar member variable of a GPB message class (a variable that is not another message class or a repeated field), the class will always have a “setter” method following the naming convention set_xxx(), where xxx is the exact name of the field. In the above case, you can clearly see the correlation between the “type” data member and the set_type() method.

But, what about those methods which begin with mutable_ ? Those member methods return a pointer to a data member of a class whose type is another message class. For example, the mutable_transaction_context() method of the Command message returns a pointer to a TransactionContext object that may have its own data members set using setter methods. It is important to note that GPB-generated code follows strict RAII and encapsulation principles, like good C++ libraries always do. This means that the pointers returned by mutable_xxx() methods of a message class are managed by the class itself, and you do not need to manage the memory yourself. When the Command message’s destructor is called, it will clean up any resources it has allocated for sub-messages like the TransactionContext class.

Accessing Command Message data members

OK, so above you see example code which sets the data members of a Command message. Let’s check out some code which accesses the data member of the Command message. Here is a snippet of code from the SubscriberApplier class in the async_replication module (/plugin/async_replication/subscriber_applier.cc):

void SubscriberApplier::apply(const message::Command &to_apply)
{
  string query;
...
  switch (to_apply.type())
  {
...
  case message::Command::INSERT:
    query.assign("INSERT INTO `");
    query.append(to_apply.schema());
    query.append("`.`");
    query.append(to_apply.table());
    query.append("` (");

    const message::InsertRecord &record= to_apply.insert_record();

    int32_t num_fields= record.insert_field_size();

    int32_t x;
    for (x= 0; x < num_fields; x++)
    {
      if (x != 0)
        query.push_back(',');

      const message::Table::Field f= record.insert_field(x);

      query.push_back('`');
      query.append(f.name());
      query.push_back('`');
    }

    query.append(") VALUES ");

    /* 
    * There may be an INSERT VALUES (),() type statement.  We know the
    * number of records is equal to the insert_value array size divided
    * by the number of fields.
    *
    * So, we do an inner and an outer loop.  Outer loop is on the number
    * of records and the inner loop on the number of fields.  In this way, 
    * record.insert_value((outer_loop * num_fields) + inner_loop)
    * always gives us our correct field value.
    */
    int32_t num_records= (record.insert_value_size() / num_fields);
    int32_t y;
    for (x= 0; x < num_records; x++)
    {
      if (x != 0)
        query.push_back(',');

      query.push_back('(');
      for (y= 0; y < num_fields; y++)
      {
        if (y != 0)
          query.push_back(',');

        query.push_back('"');
        query.append(record.insert_value((x * num_fields) + y));
        query.push_back('"');
      }
      query.push_back(')');
    }
    break;
...
  }

  result= dispatch_command(server_command, session, query.c_str(), query.length()) == false;
  ...
}

In the above code sample, you can see that "getter" methods for a Command class fall into a few different categories. For simple scalar data members or sub-message data members, the getter method is simply the name of the data member itself, and these methods always return const references to their data members. For instance, Command::schema() returns a const std::string& with the name of the schema, and Command::transaction_context() returns a const message::TransactionContext& for the transaction_context message class data member.

For optional data members, there will always be a has_xxx "check for existence" method. So, since the "sql" data member of the Command message is optional, one might do the following:

message::Command &command= get_some_command_object();
if (command.has_sql())
{
  cout << command.sql() << endl;
}

This would print out the SQL statement of the Command message if the sql data member has been set.

For data members that are of the "repeated" type (for instance, the InsertRecord message's insert_value data member, which is of type repeated string), the number of individual elements in the vector is always accessed with a method called xxx_size(), where xxx is the exact name of the data member. You can see in the example code above calls to record.insert_value_size(), which returns the count of the number of elements in the insert_value data member.

To access specific elements of a repeated data member, simply call the method named for the data member, passing in the index of the element you wish to access.

Conclusion, For Now...

The above is just a quick introduction to the Command message, which is the basic unit of currency in Drizzle's replication system. The next article will show how the Command message is passed through the plugin::CommandReplicator and plugin::CommandApplier interfaces, the basic worker plugins of the Drizzle replication system. After that, we'll be discussing the format of the Command message log.

Questions on anything above? Please do write to the drizzle-discuss@ mailing list or drop me an email at joinfu@sun.com. Cheers.

UPDATE: Read the next article on the CommandReplicator and CommandApplier interfaces.

  • Mark Callaghan

    It will take me a while to read and comprehend this. But I really appreciate the way that Drizzle shares details like this. And ‘easy’ doesn’t begin to describe the value of a clean interface like this for making replication better.

  • Stuart

    Well written and easy to read. Thank you.
    Can you back up a bit and explain what how it’s supposed to function, so that I can relate that to the mechanics of it. Is it a P2P type of arrangement, or Master/Slave(‘s)? Can you set up a cluster of Drizzle servers for load balancing, and backup? If a server is offline, will it automatically synchronise when it’s back online? What if the master fails?

    • http://jpipes.com Jay Pipes

      “Can you back up a bit and explain what how it’s supposed to function, so that I can relate that to the mechanics of it.”

      All in due time :) I’m writing the series of articles in a way that should allow folks to really understand the guts of the replication system in a systematic manner, so this was just a starter article to familiarize readers with the concepts of Google Protobuffers and the Command message which is logged and passed around in the new replication system.

      “Is it a P2P type of arrangement, or Master/Slave(‘s)?”

      The new replication system is designed to be extremely flexible and modular. I am working on a module called “async_replication” which will be the default replication system and will behave similarly to MySQL’s master/slave model. Lots more information to come on that. Keep in mind, it is very early in development, and literally as I write code for the module, I’ll be writing articles about it and explaining design choices asking for input, suggestions, and criticism on the code.

      “Can you set up a cluster of Drizzle servers for load balancing, and backup?”

      This is not currently in the scope of my work, but there’s no reason at all such features cannot be written as either external programs/scripts, or modules for Drizzle.

      “If a server is offline, will it automatically synchronise when it’s back online? What if the master fails?”

      There is work going on to design hot-standby/failover, yes. I wouldn’t expect it to hit the trees for a few months, though.

      Cheers!

      Jay

  • Bill

    I understand that this is a foundational article, so its sorta light on the overall explanation. But its great to hear a decent sized project is using protocol buffers. It just makes it that much easier to justify using it in other projects, instead of hacking together an in house solution.

  • http://scale-out-blog.blogspot.com Robert Hodges

    This is a great intro to programming with Command Message classes. One question–will your future posts cover the high-level message workflow? For instance, does a replicator plug-in get Command messages in the order they are logged or all at once at commit time? One of the virtues of the MySQL binlog is that it provides transactions in serial order, which makes it far easier to process than reading a “real” log like Oracle redo logs or the PostgreSQL WAL.

    • http://jpipes.com Jay Pipes

      “This is a great intro to programming with Command Message classes. One question–will your future posts cover the high-level message workflow?”

      Yep, the very next article describes the flow of the Command message through the system.

      “For instance, does a replicator plug-in get Command messages in the order they are logged or all at once at commit time?”

      One at a time, in the order in which they are executed on the server. It is up to the author of some semi-or-full synchronous replication module to “wrangle” Command messages into a Transaction message (I’ll explain that in a later article, too) :)

      “One of the virtues of the MySQL binlog is that it provides transactions in serial order, which makes it far easier to process than reading a “real” log like Oracle redo logs or the PostgreSQL WAL.”

      Wait until the 3rd article (on the command log) for this :)

      Cheers, and see you next week!

      Jay

      • http://scale-out-blog.blogspot.com Robert Hodges

        Thanks Jay. I’m looking forward to more articles as well as a solid drink from the Drizzle fire hose with your team next week.

  • Pingback: Drizzle Replication – The CommandReplicator and CommandApplier Plugin API « join-fu!()

  • Pingback: Recent Work on Improving Drizzle’s Storage Engine API « join-fu!()

  • http://spsneo.com/blog Siddharth Prakash Singh

    Link in the Update section is broken.

