Drizzle Replication – Changes in API to support Group Commit

Hi all. It’s been quite some time since my last article on the new replication system in Drizzle. My apologies for the delay in publishing the next article in the replication series.

The delay has been due to a reworking of the replication system to fully support “group commit” behaviour and to support fully transactional replication. The changes allow replicator and applier plugins to understand much more about the actual changes which occurred on the server, and to understand the transactional container properly.

The goals of Drizzle’s replication system are as follows:

  • Make replication modular and not dependent on one particular implementation
  • Make it simple and fun to develop plugins for Drizzle replication
  • Encapsulate all transmitted information in an efficient, portable, and standard format

This article serves to build on the last article and explain the changes to the Google Protobuffer message definitions used in the replication API. The actual replication API described in the last article remains almost the same. However, instead of being named CommandApplier and CommandReplicator, those plugin base classes are now named TransactionApplier and TransactionReplicator respectively. And, instead of consuming a Command message, they consume Transaction messages.


For my friend Edwin’s benefit, I’ll be including lots of pretty graphics. :) For my developer readers, I’m including lots of example C++ code to help you best understand how to read and manipulate the Transaction and Statement messages in the new replication system.

New Message Definitions

As I mentioned above, the Command message discussed in the first replication article has been replaced by a more space-efficient and transactional message format. The proto file is now called /drizzled/message/transaction.proto. You can look at the proto file online.

The Command Message has become the Statement message, and a new Transaction message serves as a container for multiple Statement messages representing (for most cases) an atomic change in the state of the database server. I’ll discuss later in the article those specific cases where a Transaction message’s contents may contain only a partial atomic change to the server.

The image to the right depicts the Transaction message container. As you can see, the Transaction message contains two things: a TransactionContext message and an array of one or more Statement messages.

The TransactionContext Message

Each Transaction message contains a single TransactionContext message. The TransactionContext message contains information about the entire transaction. The data members of the TransactionContext are as follows:

  • server_id – (uint32_t) A numeric identifier for the server which executed this transaction.
  • transaction_id – (uint64_t) A globally-unique transaction identifier.
  • start_timestamp – (uint64_t) A nanosecond-precision timestamp of when the transaction began.
  • end_timestamp – (uint64_t) A nanosecond-precision timestamp of when the transaction completed.
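As a rough illustration of what these fields make possible, the sketch below computes how long a transaction took from its two timestamps. It does not use the real generated class: TransactionContextSketch is a hypothetical stand-in struct that only mirrors the four data members listed above, and transactionDuration() is an illustrative helper, not part of Drizzle.

```cpp
#include <cstdint>

/* Hypothetical stand-in mirroring the data members listed above.
   The real class is generated from transaction.proto. */
struct TransactionContextSketch
{
  uint32_t server_id;
  uint64_t transaction_id;
  uint64_t start_timestamp;
  uint64_t end_timestamp;
};

/* Illustrative helper (an assumption, not a Drizzle API): returns the
   elapsed time between start and end, in the timestamps' own units,
   or 0 if the timestamps are out of order. */
uint64_t transactionDuration(const TransactionContextSketch &ctx)
{
  if (ctx.end_timestamp < ctx.start_timestamp)
    return 0;
  return ctx.end_timestamp - ctx.start_timestamp;
}
```

With the real message class, the same arithmetic would presumably use the generated accessors, such as ctx.end_timestamp() and ctx.start_timestamp(), in place of the struct members.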

Since TransactionContext is simply a Google Protobuffer message, accessing data members is simple and straightforward. If you’re writing a replicator or applier, a reference to a const Transaction message will be supplied to you via the standard API. For instance, let’s assume we’re writing a replicator and we want to filter all messages that are from the server with a server_id of 100. Kind of a silly example, but nevertheless, it allows us to see some example code.

As you may remember, the API for a replicator is dirt simple. There is a replicate() pure virtual method which accepts two parameters: a pointer to the Applier which will “apply” the message to some target, and a reference to the GPB message itself. The new function signature is the same as the last one, with the term “Command” replaced with the term “Transaction”:

  virtual void replicate(TransactionApplier *in_applier,
                         message::Transaction &to_replicate)= 0;

Suppose our replicator class is called MyReplicator. Here is how to query the transaction context of the Transaction message and filter out transactions coming from server #100. :)

  void MyReplicator::replicate(TransactionApplier *in_applier,
                               message::Transaction &to_replicate)
  {
    const message::TransactionContext &ctx= to_replicate.transaction_context();
    if (ctx.server_id() != 100)
      in_applier->apply(to_replicate);
  }

See? Pretty darn simple. :) OK, on to the Statement message, which is slightly more complicated.

The Statement Message

As noted above, the Transaction message contains an array of Statement messages. In Protobuffer terminology, the Transaction message contains a “repeated” Statement data member. The Statement message is an envelope containing the following information:

  • type – (enum Type) The type of Statement this message represents. Currently, the possible values of the type are as follows:
    • ROLLBACK
    • INSERT
    • UPDATE
    • DELETE
    • TRUNCATE_TABLE
    • CREATE_SCHEMA
    • ALTER_SCHEMA
    • DROP_SCHEMA
    • CREATE_TABLE
    • ALTER_TABLE
    • DROP_TABLE
    • SET_VARIABLE
    • RAW_SQL
  • start_timestamp – (uint64_t) A nanosecond-precision timestamp of when the statement began.
  • end_timestamp – (uint64_t) A nanosecond-precision timestamp of when the statement completed.
  • sql – (string) Optionally stores the exact original SQL string producing this message.
  • For certain types of Statement messages, there will also be a specialized header and data message (see below).

To access the Statement messages in a Transaction, use something like the following code, which loops over the Transaction message’s vector of Statement messages:

  void MyReplicator::replicate(TransactionApplier *in_applier,
                               message::Transaction &to_replicate)
  {
    /* Grab the number of statements in the Transaction message */
    size_t x;
    size_t num_statements= to_replicate.statement_size();

    /* Do something with each statement… */
    for (x= 0; x < num_statements; ++x)
    {
      const message::Statement &stmt= to_replicate.statement(x);
      /* processStatement() does something with the statement… */
      processStatement(stmt);
    }
  }

Serialized Polymorphism with the type Member

The type data member is of critical importance to the Statement message, as it allows us to have a sort of polymorphism serialized within the Statement message itself. This polymorphism allows the generic Statement message to contain specialized submessages depending on what type of event occurred on the server.

The above paragraph probably sounds overly complicated, but in reality things are pretty simple. As usual, it’s easiest to see what’s going on by looking at an example in code. For our example, let’s build out our fictional processStatement() method from the snippet above.

The processStatement() method is basically a giant switch statement, switching off of the supplied Statement message parameter’s type data member property. Here is the outline of the processStatement() method, with only our switch statement and some comments visible which should give you an idea of how we deal with specific types of Statements:

  void processStatement(const message::Statement &stmt)
  {
    switch (stmt.type())
    {
    case message::Statement::INSERT:
      /* Handle statements which insert new data… */
      break;
    case message::Statement::UPDATE:
      /* Handle statements which update existing data… */
      break;
    case message::Statement::DELETE:
      /* Handle statements which delete existing data… */
      break;
    …
    }
  }



Let’s go ahead and “fill out” one of the case blocks in the switch statement above. We will handle the case where the Statement type is INSERT. Note that this does not necessarily mean a SQL INSERT statement was executed. All this means is that an SQL statement was executed which resulted in a new record being added to a table on the server. This means that the actual SQL statement could have been any of INSERT, INSERT ... SELECT, REPLACE INTO, or LOAD DATA INFILE.

The /drizzled/message/transaction.proto file will always contain lots of documentation explaining how each of the specific submessages in the Statement message class are handled. To the right is a graphic depicting the InsertHeader and InsertData message classes which compose the "meat" of Statements that inserted new records into the database. Whenever the Statement message's type is INSERT, the Statement message will contain two submessages, one called insert_header and another called insert_data which will be populated with the InsertHeader and InsertData messages. The header message will contain information about the table and fields affected, while the data message will contain the values to be inserted into the table.

Here is some example code which queries the header and data messages and constructs an SQL string from them:

  void processStatement(const message::Statement &stmt)
  {
    switch (stmt.type())
    {
    case message::Statement::INSERT:
      /* Handle statements which insert new data... */
      {
        const message::InsertHeader &header= stmt.insert_header();
        const message::InsertData &data= stmt.insert_data();
        /* destination is a plain string object, so members are accessed with '.' */
        std::string destination;
        char quoted_identifier= '`';

        /* Build "INSERT INTO `schema`.`table` (" */
        destination.assign("INSERT INTO ");
        destination.push_back(quoted_identifier);
        destination.append(header.table_metadata().schema_name());
        destination.push_back(quoted_identifier);
        destination.push_back('.');
        destination.push_back(quoted_identifier);
        destination.append(header.table_metadata().table_name());
        destination.push_back(quoted_identifier);
        destination.append(" (");

        /* Add field list to SQL string... */
        size_t num_fields= header.field_metadata_size();
        size_t x;

        for (x= 0; x < num_fields; ++x)
        {
          const message::FieldMetadata &field_metadata= header.field_metadata(x);
          if (x != 0)
            destination.push_back(',');

          destination.push_back(quoted_identifier);
          destination.append(field_metadata.name());
          destination.push_back(quoted_identifier);
        }

        destination.append(") VALUES (");

        /* Add insert values, one parenthesized group per record */
        size_t num_records= data.record_size();
        size_t y;

        for (x= 0; x < num_records; ++x)
        {
          if (x != 0)
            destination.append("),(");

          for (y= 0; y < num_fields; ++y)
          {
            if (y != 0)
              destination.push_back(',');

            destination.push_back('\'');
            destination.append(data.record(x).insert_value(y));
            destination.push_back('\'');
          }
        }
        destination.push_back(')');

      }
      break;
    ...
    }
  }

The example code above is far from production-ready, of course. I don't take into account different field types, instead simply enclosing everything in single quotes. Also, I don't handle errors or escaping strings. The point isn't to be perfect, but to show you the general way to get information out of the Statement message...
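To give an idea of what handling the escaping might look like, here is a minimal sketch of a quoting helper. quoteSqlValue() is a hypothetical function, not part of Drizzle or the replication API, and the escaping rules it uses (doubling single quotes and backslashes) are an assumption about what the target server expects rather than a complete or authoritative scheme.

```cpp
#include <string>

/* Hypothetical helper (an assumption, not a Drizzle API): wraps a value
   in single quotes, doubling any embedded single quote or backslash so
   the value cannot terminate the string literal early. */
std::string quoteSqlValue(const std::string &value)
{
  std::string quoted;
  quoted.reserve(value.size() + 2);
  quoted.push_back('\'');

  for (std::string::size_type i= 0; i < value.size(); ++i)
  {
    const char c= value[i];
    if (c == '\'')
      quoted.push_back('\'');   /* ' becomes '' */
    else if (c == '\\')
      quoted.push_back('\\');   /* \ becomes \\ */
    quoted.push_back(c);
  }

  quoted.push_back('\'');
  return quoted;
}
```

In the INSERT example, the three lines that append a quote, the value, and another quote could then be collapsed into a single append of quoteSqlValue(data.record(x).insert_value(y)). Type-aware handling, such as leaving numeric values unquoted, would still need the field metadata and is not attempted here.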

Partial Atomic Transactions

Above, I stated that the Transaction messages sent to Replicators and Appliers represent an atomic change to the state of a server. This is true, most of the time. :) There are specific situations when a Transaction message will not represent an atomic change, and you should be aware of these scenarios if you plan to write plugins which implement a replication scheme.

There are times when it is simply inefficient or impossible to create a Transaction message that represents the actual atomic change on a server. For instance, imagine a table having 100 million records. Now, imagine issuing an UPDATE against that table that potentially affected every row in the table.

In order to transmit to replicas the atomic change to the server, one gigantic Transaction message would need to be constructed on the master server. Not only is there a distinct chance that the master would run out of memory constructing such a large message object, but it's safe to say that the master server would suffer from performance degradation during this construction. There must, therefore, be a way to start streaming the changes made to the master server before the actual final commit has happened on the master.

You may have noticed two data members of the InsertData message above named segment_id and end_segment. The first is of type uint32_t and the second is a bool. Together, these two data members fulfill the need to transmit transaction messages that are part of a bulk data modification. When a reader of a Transaction message sees that the end_segment data member is false, then the reader knows that another data segment will follow the current data message and will contain more inserts, updates, or deletes for the current transaction.
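The reader-side logic described above can be sketched roughly as follows. This is only an illustration under stated assumptions: DataSegmentSketch is a hypothetical stand-in for a data message carrying segment_id and end_segment, its records are simplified to plain strings, and SegmentCollector is an invented helper rather than anything in the Drizzle source.

```cpp
#include <cstdint>
#include <string>
#include <vector>

/* Hypothetical stand-in for one data segment. Only segment_id and
   end_segment come from the article; the records member is a
   simplification for illustration. */
struct DataSegmentSketch
{
  uint32_t segment_id;
  bool end_segment;
  std::vector<std::string> records;
};

/* Invented helper: gathers records from successive segments and
   reports when the segment marked end_segment has been seen. */
class SegmentCollector
{
public:
  SegmentCollector() : complete(false) {}

  /* Adds one segment; returns true once the final segment has arrived. */
  bool add(const DataSegmentSketch &segment)
  {
    records.insert(records.end(),
                   segment.records.begin(), segment.records.end());
    if (segment.end_segment)
      complete= true;
    return complete;
  }

  bool isComplete() const { return complete; }
  const std::vector<std::string> &allRecords() const { return records; }

private:
  bool complete;
  std::vector<std::string> records;
};
```

Buffering every record, as this sketch does, gives up the memory savings that segmenting is meant to provide. An applier could just as well apply each segment as it arrives inside an open transaction on the replica and commit only when end_segment is true. The sketch also ignores segment_id, since how the segments are numbered is not specified here.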

Summary and Request for Comments

Hopefully I've explained the changes to Drizzle's replication system well enough above. I understand that the changes to the message definitions are substantial, so I am available at any time to discuss them and to help people with their code. You can find me on Freenode's #drizzle IRC channel, on the Drizzle discussion mailing list, or via email at joinfu@sun.com. I very much welcome comments. The new replication system is just finishing up the valgrind regression tests and should hit trunk later today.

The next article covers the new Transaction Log, which is a serialized log of the Transaction messages used in the replication system.

  • http://nippondanji.blogspot.com/ Mikiya Okuno

    The protoBuffer looks perfectly match to Drizzle replicator. Great job!!

    BTW, of course you might consider doing so, I suggest adding a checksum field in each Transaction Message or Statement Message. Adding a checksum field may not be difficult with protoBuffer? ;)

    • http://nippondanji.blogspot.com/ Mikiya Okuno

      One more suggestion is to make replicator slave as an isolated, standalone process which connects to drizzled using libdrizzle. It will enable drizzle to replicate to MySQL very easily.

      • http://jpipes.com Jay Pipes

        Hi Mikiya!

        “BTW, of course you might consider doing so, I suggest adding a checksum field in each Transaction Message or Statement Message.”

        Actually, there is already a checksum written after each transaction message is written to the transaction log. Please see the next article for information on that.

        It is enabled using the --transaction-log-checksum-enable command line argument to Drizzle. Currently, it just uses a CRC32 checksum, but I plan on making it configurable which checksumming algorithm should be used…

        “One more suggestion is to make replicator slave as an isolated, standalone process which connects to drizzled using libdrizzle. It will enable drizzle to replicate to MySQL very easily.”

        Well, Eric Lambert has written a replicator which does exactly this. A C++ TransactionApplier plugin pushes messages out to a Gearman server.

        Worker processes run a Java Gearman job in the worker process pool applying the replicated messages to the servers configured as replicas. Eric will be blogging shortly about this new code piece.

        Cheers!

        jay

        • http://nippondanji.blogspot.com/ Mikiya Okuno

          Hi Jay,

          Thank you for your quick response! I’m excited about your comments. Drizzle is more progressive than I thought.

          “Well, Eric Lambert has written a replicator which does exactly this. A C++ TransactionApplier plugin pushes messages out to a Gearman server.”

          This implementation sounds really cool and very clear to understand. As Gearman flatten some workload nicely, I can expect that some bottleneck when having many replication slaves on a single master would be eased ;) I’m looking forward to see his entry.

          Kind regards,

          Mikiya

  • http://primebase.org Paul McCullagh

    Hi Jay,

    Great work! GPB based replication is a huge step forward for Drizzle.

    I also like the way you have packed the DML statements. They can be easily converted to SQL, but are also in a convenient format for use with other, non-DBMS, destinations.

    One question I have is: How do you plan to handle the following situation?

    A transaction is executed and the GPB message is prepared containing all the statements of the transaction. However, before the message can be passed to the TransactionReplicator the server crashes.

    Now on startup the transaction will be committed to the database, but there is no way to reconstruct the GPB message for the transaction (and the transaction will go missing from the slave).

    It would seem to me that the TransactionReplicator API needs to be expanded to include XA functionality.

    And, this may have to be extended to the TransactionApplier as well, to ensure that the transaction is reliably transported from end-to-end.

    Is the solution part of the transaction log explanation which is coming up?

    I’m looking forward to reading about it all.

    Best regards,

    Paul
