
Macro Support in new Drizzle Client Console?

Hi all!

I’ve been reading through the requested features for the new client on the wiki here:

I think all the stuff on that link is excellent so far. I’d also like to request a feature that I think will be a really cool timesaver for DBAs and developers using Drizzle.

Macro Support

Remember, “way back when” you used Microsoft Excel and were able to start recording your actions, then when you stopped recording, Excel would store a “macro” of your actions that you could subsequently replay?

I think this would be incredibly useful for folks who do repetitive work in the console.

Sure, I know, I know…the first thing folks will say is “but HEY, you guys removed stored procedures!” Yeah, yeah… but the feature I’m proposing here is different from stored procedures in the following ways:

  1. It’s entirely client-side. There is no server-side storage/cache, processing, parsing, or anything.
  2. It’s not restricted to the small subset of SQL that stored procedures (at least in MySQL) are currently limited to. Anything the new client can do would be able to go into a macro.
  3. Since the client is in Python, the macros are themselves re-writable in a scripting language. This gives the recorded macros incredible flexibility.
  4. No fussing with SQL stored procedure permissions at runtime (you know, the silly INVOKER/DEFINER crap)
  5. Ability to interact with result sets in the macro. Just try doing that easily in a SQL stored procedure. Using CURSORs is incredibly clunky and ugly. Applying a Python function or closure/lambda to each row of a result set is elegant and easy.

Imagine the following rough example interface…

drizzle> RECORD MACRO "sales_report_with_email" (to_email);
macro recording started.

drizzle> mode python;
in python mode.

python> import datetime
python> today= datetime.datetime.now().isoformat()
python> filename= "%s-%s-%s" % ("sales", to_email, today)
python> Ctrl-D

drizzle> SELECT * FROM sales
         WHERE manager = @to_email; > csv(@filename);
drizzle> mode python;
In python mode.

python> report_txt= open(filename, "r+b").read()
python> import smtplib
python> mailserver = smtplib.SMTP('localhost')
python> mailserver.sendmail('theboss@company.com', to_email, report_txt)
python> mailserver.quit()
python> print("Mail sent to %s" % to_email)
python> Ctrl-D

drizzle> STOP MACRO;
Macro "sales_report_with_email" saved.

drizzle> macro("sales_report_with_email", "myboss@company.com");
Mail sent to myboss@company.com

Pretty powerful, eh?

If you follow the flow above, you will notice the only real trick to solve is passing the macro’s arguments into the console’s variable array, and from the console’s variable array into the Python interpreter’s variable scope. But this is a fairly simple problem to solve…
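
Here is a minimal sketch of that argument-passing trick, assuming a macro is stored as a list of recorded steps. The names Macro, record(), replay() and run_sql are hypothetical and exist only for illustration; nothing here comes from an actual Drizzle client. Macro arguments land in a console variable dictionary first, get copied into the Python scope before each Python step, and plain values are copied back afterwards so later SQL steps can see them.

```python
# Hypothetical sketch: Macro, record() and replay() are illustration names,
# not part of any Drizzle client.
class Macro:
    def __init__(self, name, params):
        self.name = name
        self.params = list(params)
        self.steps = []  # (mode, text) tuples in recording order

    def record(self, mode, text):
        self.steps.append((mode, text))

    def replay(self, args, run_sql):
        if len(args) != len(self.params):
            raise TypeError("%s expects %d argument(s)" % (self.name, len(self.params)))
        # Console variable array: macro arguments are bound here first.
        console_vars = dict(zip(self.params, args))
        python_scope = {}  # shared by all Python steps of one replay
        for mode, text in self.steps:
            if mode == "python":
                # Console variables become Python names for this step.
                python_scope.update(console_vars)
                exec(text, python_scope)
                # Copy plain values back so SQL steps can use them.
                for key, value in python_scope.items():
                    if not key.startswith("__") and isinstance(value, (str, int, float)):
                        console_vars[key] = value
            else:
                run_sql(text, dict(console_vars))
        return console_vars


executed = []
macro = Macro("sales_report_with_email", ["to_email"])
macro.record("python", 'filename = "sales-%s.csv" % to_email')
macro.record("sql", "SELECT * FROM sales WHERE manager = @to_email")
final_vars = macro.replay(["myboss@company.com"],
                          lambda sql, variables: executed.append((sql, variables)))
print(final_vars["filename"])
```

Keeping one Python scope for the whole replay means an import in an early step is still visible in a later one, which seems closest to what a recorded interactive session would feel like.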

Thoughts? Suggestions? If you’ve got comments, please feel free to share here, or on the Drizzle Discussion mailing list, or even update the wiki pages posted above. Thanks! :)

Sneak Peek – Drizzle Transaction Log and INFORMATION_SCHEMA

I’ve been coding up a storm in the last couple days and have just about completed coding on three new INFORMATION_SCHEMA views which allow anyone to query the new Drizzle transaction log for information about its contents. I’ve also finished a new UDF for Drizzle called PRINT_TRANSACTION_MESSAGE() that prints out the Transaction message’s contents in an easy-to-read format.

I don’t have time for a full walk-through blog entry about it, so I’ll just paste some output below and let y’all take a looksie. A later blog entry will feature lots of source code explaining how you, too, can easily add INFORMATION_SCHEMA views to your Drizzle plugins.

Below are the results of the following sequence of actions:

  • Start up a Drizzle server with the transaction log enabled, checksumming enabled, and the default replicator enabled.
  • Open a Drizzle client
  • Create a sample table, insert some data into it, do an update to that table, then drop the table
  • Query the INFORMATION_SCHEMA views and take a look at the transaction messages and information the transaction log now contains

Enjoy! :)

jpipes@serialcoder:~/repos/drizzle/replication-group-commit/tests$ ./dtr --mysqld="--default-replicator-enable"\
 --mysqld="--transaction-log-enable"\
 --mysqld="--transaction-log-enable-checksum"\
 --start-and-exit
...
Servers started, exiting
jpipes@serialcoder:~/repos/drizzle/replication-group-commit/tests$ ../client/drizzle --port=9306
Welcome to the Drizzle client..  Commands end with ; or \g.
Your Drizzle connection id is 2
Server version: 2009.11.1181 Source distribution (replication-group-commit)

Type 'help;' or '\h' for help. Type '\c' to clear the buffer.

drizzle> use test
Database changed
drizzle> CREATE TABLE t1 (   id INT NOT NULL PRIMARY KEY , padding VARCHAR(200) NOT NULL );
Query OK, 0 rows affected (0.01 sec)

drizzle> INSERT INTO t1 VALUES (1, "I love testing.");
Query OK, 1 row affected (0.01 sec)

drizzle> INSERT INTO t1 VALUES (2, "I hate testing.");
Query OK, 1 row affected (0.01 sec)

drizzle> UPDATE t1 SET padding="I love it when a plan comes together" WHERE id = 2;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

drizzle> DROP TABLE t1;
Query OK, 0 rows affected (0.17 sec)

drizzle> SELECT * FROM INFORMATION_SCHEMA.TRANSACTION_LOG\G
*************************** 1. row ***************************
         FILE_NAME: transaction.log
       FILE_LENGTH: 639
   NUM_LOG_ENTRIES: 5
  NUM_TRANSACTIONS: 5
MIN_TRANSACTION_ID: 0
MAX_TRANSACTION_ID: 9
 MIN_END_TIMESTAMP: 1257888458463696
 MAX_END_TIMESTAMP: 1257888473929116
1 row in set (0 sec)

drizzle> SELECT * FROM INFORMATION_SCHEMA.TRANSACTION_LOG_ENTRIES;
+--------------+-------------+--------------+
| ENTRY_OFFSET | ENTRY_TYPE  | ENTRY_LENGTH |
+--------------+-------------+--------------+
|            0 | TRANSACTION |          141 |
|          141 | TRANSACTION |          121 |
|          262 | TRANSACTION |          121 |
|          383 | TRANSACTION |          181 |
|          564 | TRANSACTION |           75 |
+--------------+-------------+--------------+
5 rows in set (0 sec)

drizzle> SELECT * FROM INFORMATION_SCHEMA.TRANSACTION_LOG_TRANSACTIONS;
+--------------+----------------+-----------+------------------+------------------+----------------+------------+
| ENTRY_OFFSET | TRANSACTION_ID | SERVER_ID | START_TIMESTAMP  | END_TIMESTAMP    | NUM_STATEMENTS | CHECKSUM   |
+--------------+----------------+-----------+------------------+------------------+----------------+------------+
|            0 |              0 |         1 | 1257888458463668 | 1257888458463696 |              1 | 3275955647 |
|          141 |              7 |         1 | 1257888462222183 | 1257888462226990 |              1 |  407829420 |
|          262 |              8 |         1 | 1257888465371330 | 1257888465378423 |              1 | 4073072174 |
|          383 |              9 |         1 | 1257888470209443 | 1257888470215165 |              1 |   92884681 |
|          564 |              9 |         1 | 1257888473929111 | 1257888473929116 |              1 | 2850269133 |
+--------------+----------------+-----------+------------------+------------------+----------------+------------+
5 rows in set (0 sec)

drizzle> SELECT PRINT_TRANSACTION_MESSAGE("transaction.log", ENTRY_OFFSET) as trx
       > FROM INFORMATION_SCHEMA.TRANSACTION_LOG_ENTRIES\G
*************************** 1. row ***************************
trx: transaction_context {
  server_id: 1
  transaction_id: 0
  start_timestamp: 1257888458463668
  end_timestamp: 1257888458463696
}
statement {
  type: RAW_SQL
  start_timestamp: 1257888458463676
  end_timestamp: 1257888458463694
  sql: "CREATE TABLE t1 (   id INT NOT NULL PRIMARY KEY , padding VARCHAR(200) NOT NULL )"
}

*************************** 2. row ***************************
trx: transaction_context {
  server_id: 1
  transaction_id: 7
  start_timestamp: 1257888462222183
  end_timestamp: 1257888462226990
}
statement {
  type: INSERT
  start_timestamp: 1257888462222185
  end_timestamp: 1257888462226989
  insert_header {
    table_metadata {
      schema_name: "test"
      table_name: "t1"
    }
    field_metadata {
      type: INTEGER
      name: "id"
    }
    field_metadata {
      type: VARCHAR
      name: "padding"
    }
  }
  insert_data {
    segment_id: 1
    end_segment: true
    record {
      insert_value: "1"
      insert_value: "I love testing."
    }
  }
}

*************************** 3. row ***************************
trx: transaction_context {
  server_id: 1
  transaction_id: 8
  start_timestamp: 1257888465371330
  end_timestamp: 1257888465378423
}
statement {
  type: INSERT
  start_timestamp: 1257888465371332
  end_timestamp: 1257888465378422
  insert_header {
    table_metadata {
      schema_name: "test"
      table_name: "t1"
    }
    field_metadata {
      type: INTEGER
      name: "id"
    }
    field_metadata {
      type: VARCHAR
      name: "padding"
    }
  }
  insert_data {
    segment_id: 1
    end_segment: true
    record {
      insert_value: "2"
      insert_value: "I hate testing."
    }
  }
}

*************************** 4. row ***************************
trx: transaction_context {
  server_id: 1
  transaction_id: 9
  start_timestamp: 1257888470209443
  end_timestamp: 1257888470215165
}
statement {
  type: UPDATE
  start_timestamp: 1257888470209446
  end_timestamp: 1257888470215163
  update_header {
    table_metadata {
      schema_name: "test"
      table_name: "t1"
    }
    key_field_metadata {
      type: INTEGER
      name: "id"
    }
    set_field_metadata {
      type: VARCHAR
      name: "padding"
    }
  }
  update_data {
    segment_id: 1
    end_segment: true
    record {
      key_value: "2"
      key_value: "I love it when a plan comes together"
      after_value: "I love it when a plan comes together"
    }
  }
}

*************************** 5. row ***************************
trx: transaction_context {
  server_id: 1
  transaction_id: 9
  start_timestamp: 1257888473929111
  end_timestamp: 1257888473929116
}
statement {
  type: RAW_SQL
  start_timestamp: 1257888473929113
  end_timestamp: 1257888473929115
  sql: "DROP TABLE `t1`"
}

5 rows in set (0.06 sec)

FYI, if you look closely, you’ll see some odd things — namely that there is a transaction with an ID of zero. I’m aware of this and am working on fixing it :) Like I said, I’m almost done coding…
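
One thing that can be checked straight from the output above is that the log entries tile the file with no gaps. The short script below copies the offsets and lengths from the TRANSACTION_LOG_ENTRIES result and the FILE_LENGTH from the TRANSACTION_LOG result; the helper name check_contiguous is made up for this sketch.

```python
# (ENTRY_OFFSET, ENTRY_LENGTH) pairs copied from the TRANSACTION_LOG_ENTRIES output.
entries = [(0, 141), (141, 121), (262, 121), (383, 181), (564, 75)]
file_length = 639  # FILE_LENGTH from the TRANSACTION_LOG view


def check_contiguous(entries, file_length):
    """Return True when the entries cover the file with no gaps or overlaps."""
    expected_offset = 0
    for offset, length in entries:
        if offset != expected_offset:
            return False
        expected_offset += length
    return expected_offset == file_length


print(check_contiguous(entries, file_length))  # prints True
```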

The Great Escape

This week, I am working on putting together test cases which validate the Drizzle transaction log’s handling of BLOB columns.

I ran into an interesting set of problems and am wondering how to go about handling them. Perhaps the LazyWeb will have some solutions. :)

The problem, in short, is inconsistency in the way that the NUL character is escaped (or not escaped) in both the MySQL/Drizzle protocol and the MySQL/Drizzle client tools. And, by client tools, I mean both everyone’s favourite little mysql command-line client and the mysqltest client, which provides infrastructure and runtime services for the MySQL and Drizzle test suites.

Even within the server and client protocol, there appears to be some inconsistency in how and when things are escaped. Take a look at this interesting output from the drizzle client program (FYI, output is identical for mysql client, I checked…)

drizzle> select 'test\0me';
+---------+
| test    |
+---------+
| test me |
+---------+
1 row in set (0 sec)

You’ll notice that in the first SELECT statement, the column header is cut off — i.e. the column header is not escaping the \0 NUL character in the string 'test\0me'. However, the result data does not truncate the string but replaces the NUL character with a space character. So, I came to the conclusion that the drizzle client does not escape column headers but does do some sort of escaping for the result data. Given this conclusion, you will understand my raised eyebrow when the following SELECT statement was displayed:

drizzle> select 'test\0me' = 'test me';
+------------------------+
| 'test\0me' = 'test me' |
+------------------------+
|                      0 |
+------------------------+
1 row in set (0 sec)

Hmmm…so maybe column headers are being escaped by the MySQL/Drizzle client? Clearly, the NUL character was escaped as the character '\' followed by the character '0' in the column header above. Indeed, quite puzzling.

OK, so the above anomaly needs to be investigated. However, a similar issue exists for the mysqltest/drizzletest client program. To see the problem, check the following out. I create a simple test case with the following in it:

--disable_warnings
DROP TABLE IF EXISTS t1;
--enable_warnings

SELECT 'test\0me';

CREATE TABLE t1 (fld BLOB NULL);
INSERT INTO t1 VALUES ('test\0me');
SELECT COUNT(*) FROM t1;
DROP TABLE t1;

Now, what you would expect to see for the output of the above — at least if you expect results similar to the MySQL/Drizzle client output — is the following:

DROP TABLE IF EXISTS t1;
SELECT 'test\0me';
test
test me
CREATE TABLE t1 (fld BLOB NULL);
INSERT INTO t1 VALUES ('test\0me');
SELECT COUNT(*) FROM t1;
COUNT(*)
1
DROP TABLE t1;

That is what you would expect to see in the output of course… Here is what you actually get in the output:

DROP TABLE IF EXISTS t1;
SELECT 'test\0me';
test
test

So, the mysqltest/drizzletest client apparently does not escape the NUL character for the result data at all. It looks like it does do some escaping/replacing for the NUL character in the column header, though, otherwise the second “test” line would not appear. This leads to the result file being essentially truncated as soon as a NUL character is included in any output to the mysqltest/drizzletest client. This essentially makes the mysqltest/drizzletest client useless for testing and validating BLOB data.

Possible Solutions?

I think the cleanest solution would be to create a shared library of code that would be responsible for uniformly and consistently escaping data, and then linking the various clients (and server) with this library and removing all of the various escaping functions currently in the server. This would, of course, take some time, but would be the most future-proof solution. Anyone else have ideas on solving the problem of being able to test and validate binary data via the test suite? Cheers!
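
To make the shared-escaping idea a little more concrete, here is a small sketch of what one such routine could look like. The function names escape_for_display and c_string_view are invented for this example, and the escape scheme (NUL as \0, backslash doubled, other non-printable bytes as \xNN) is just one plausible choice, not what any MySQL or Drizzle tool currently does. The second function models the guess that the truncated output comes from treating the data as a NUL-terminated string.

```python
def escape_for_display(data):
    """Render bytes as text in which every byte stays visible."""
    out = []
    for byte in data:
        if byte == 0:
            out.append("\\0")      # NUL shown as backslash-zero
        elif byte == 0x5C:
            out.append("\\\\")     # backslash doubled to stay unambiguous
        elif 0x20 <= byte < 0x7F:
            out.append(chr(byte))  # printable ASCII passes through
        else:
            out.append("\\x%02x" % byte)
    return "".join(out)


def c_string_view(data):
    """What a NUL-terminated consumer would see: the bytes before the first NUL."""
    return data.split(b"\x00", 1)[0]


blob = b"test\x00me"
print(escape_for_display(blob))
print(c_string_view(blob))
```

If column headers, result data and test-suite output all went through one routine like this, a BLOB containing NUL would at least look the same everywhere.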

A Month of Milestones

I’m finding myself smiling today. I lay in bed last night thinking about a number of milestones that this month marks for me.

October 15th marked four months since the last time I had a cigarette. I feel good about my chances at remaining smoke-free for the remainder of my life.

October 18th marked one year since I officially began working on the Drizzle project. Although, as Giuseppe can attest to, I had been contributing to Drizzle before October 18th, 2008, that date was the official start. :)

I think about how much has been accomplished by the Drizzle community since that time. The Drizzle of October 2008 is barely recognizable now. Monty’s incredible work on the build system, Stewart’s continued removal of legacy Unireg code like the FRM files, Eric Day joining the Sun Drizzle team and contributing amazing work on the protocol and client libraries, Monty’s reworking of the plugin system, new datetime (temporal) work, Padraig O’Sullivan’s enormous contributions in the arena of the INFORMATION_SCHEMA, the optimizer, runtime, replication, and memcached, and an automation system that provides per-commit regression feedback. It’s truly fantastic to be part of a living, breathing, active project with so many special contributors who bring infectious enthusiasm to the world of database development. I’m privileged to be a part of it.

And finally, Tuesday the 28th marked the day that the new transactional replication system hit Drizzle’s trunk. While a ton of work is of course left to be done on the replication system, Tuesday’s code hitting trunk was a big milestone that I’m really happy about.

Anyway, just wanted to share some happiness. Cheers.

Drizzle Replication – The Transaction Log

In this installment of my Drizzle Replication blog series, I’ll be talking about the Transaction Log. Before reading this entry, you may want to first read up on the Transaction Message, which is a central concept to this blog entry.

The transaction log is just one component of Drizzle’s default replication services, but it also serves as a generalized log of atomic data changes to a particular server. In this way, it is only partially related to replication. The transaction log is used by components of the replication services to store changes made to a server’s data. However, there is nothing that mandates that this particular transaction log be a required feature for Drizzle replication systems. For instance, Eric Lambert is currently working on a Gearman-based replication service which, while following the same APIs, does not require the transaction log to function. Furthermore, other, non-replication-related modules may use the transaction log themselves. For instance, a future Recovery and/or Backup module may just as easily use the transaction log for its own purposes as well.

Before we get into the details, it’s worth noting the general goals we’ve had for the transaction log, as these goals may help explain some of the design choices made. In short, the goals for the transaction log are:

  • Introduce no global contention points (mutexes/locks)
  • Once written, the transaction log may not be modified
  • The transaction log should be easily readable in multiple programming languages

Overview of the Transaction Log Structure




The format of the transaction log is simple and straightforward. It is a single file that contains log entries, one after another. These log entries have a type associated with them. Currently, there are only two types of entries that can go in the transaction log: a Transaction message entry and a BLOB entry. We will only cover the Transaction message entry in this article, as I’ll leave how to deal with BLOBs for a separate article entirely.

Each entry in the transaction log is preceded by 4 bytes containing an integer code identifying the type of entry to follow. The bytes which follow this type header are interpreted based on the type of entry. For entries of type Transaction message, the graphics here show the layout of the entry in the log. First, a 4-byte length header is written, then the serialized Transaction message, then a 4-byte checksum of the serialized Transaction message.
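
As a rough illustration of that layout, the sketch below packs one entry with Python's struct module. Several things here are assumptions: the numeric type code 1 for a Transaction entry, the use of zlib.crc32 as a stand-in for Drizzle's own CRC routine, and the helper name pack_entry. The little-endian byte order follows the WriteLittleEndian32ToArray calls used by the writer code in this post.

```python
import struct
import zlib

# Assumed numeric code for a Transaction entry; the real value is defined
# by ReplicationServices::TRANSACTION and is not shown in this post.
TRANSACTION = 1
HEADER_TRAILER_BYTES = 4 + 4 + 4  # type + length + checksum


def pack_entry(message, entry_type=TRANSACTION, do_checksum=True):
    """Return type header, length header, message and checksum as one bytes object."""
    # zlib.crc32 is a stand-in; it may not match drizzled::hash::crc32.
    checksum = (zlib.crc32(message) & 0xFFFFFFFF) if do_checksum else 0
    header = struct.pack("<II", entry_type, len(message))  # two little-endian uint32
    trailer = struct.pack("<I", checksum)
    return header + message + trailer


entry = pack_entry(b"\x00" * 129)
print(len(entry))  # prints 141
```

With 12 bytes of overhead per entry, a 129-byte serialized message produces a 141-byte entry.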


Details of the TransactionLog::apply() Method

For those interested in how the transaction log is written to, I’m going to detail the apply() method of the TransactionLog class in /plugin/transaction_log/transaction_log.cc. The TransactionLog class is simply a subclass of plugin::TransactionApplier and therefore must implement the single pure virtual apply method of that class interface.

The TransactionLog class has a private drizzled::atomic<off_t> called log_offset which is an offset into the transaction log file that is incremented with each atomic write to the log file. You will notice in the code below that this atomic off_t is stored locally, then incremented by the total length of the log entry to be written. A buffer is then written to the log file using pwrite() at the original offset. In this way, we completely avoid calling pthread_mutex_lock() or similar when writing to the log file, which should increase scalability of the transaction log.

void TransactionLog::apply(const message::Transaction &to_apply)
{
  uint8_t *buffer; /* Buffer we will write serialized header, message and trailing checksum to */
  uint8_t *orig_buffer;
 
  int error_code;
  size_t message_byte_length= to_apply.ByteSize();
  ssize_t written;
  off_t cur_offset;
  size_t total_envelope_length= HEADER_TRAILER_BYTES + message_byte_length;
 
  /*
   * Attempt allocation of raw memory buffer for the header,
   * message and trailing checksum bytes.
   */
  buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
  if (buffer == NULL)
  {
    errmsg_printf(ERRMSG_LVL_ERROR,
      _("Failed to allocate enough memory to buffer header, transaction message, "
        "and trailing checksum bytes. Tried to allocate %" PRIu64
        " bytes.  Error: %s\n"),
      static_cast<uint64_t>(total_envelope_length),
      strerror(errno));
    state= CRASHED;
    deactivate();
    return;
  }
  else
    orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
 
  /*
   * Do an atomic increment on the offset of the log file position
   */
  cur_offset= log_offset.fetch_and_add(static_cast<off_t>(total_envelope_length));
 
  /*
   * We adjust cur_offset back to the original log_offset before
   * the increment above…
   */
  cur_offset-= static_cast<off_t>(total_envelope_length);
 
  /*
   * Write the header information, which is the message type and
   * the length of the transaction message into the buffer
   */
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
    static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
    static_cast<uint32_t>(message_byte_length), buffer);
 
  /*
   * Now write the serialized transaction message, followed
   * by the optional checksum into the buffer.
   */
  buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
  uint32_t checksum= 0;
  if (do_checksum)
  {
    checksum= drizzled::hash::crc32(reinterpret_cast<char *>(buffer) -
                     message_byte_length, message_byte_length);
  }
 
  /* The checksum is written little-endian, like the type and length headers */
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
  /*
   * Quick safety…if an error occurs above in another writer, the log
   * file will be in a crashed state.
   */
  if (unlikely(state == CRASHED))
  {
    /*
     * Reset the log’s offset in case we want to produce a decent error message including
     * the original offset where an error occurred.
     */
    log_offset= cur_offset;
    free(orig_buffer);
    return;
  }
 
  /* Write the full buffer in one swoop */
  do
  {
    written= pwrite(log_file, orig_buffer, total_envelope_length, cur_offset);
  }
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal… */
 
  if (unlikely(written != static_cast<ssize_t>(total_envelope_length)))
  {
    errmsg_printf(ERRMSG_LVL_ERROR,
      _("Failed to write full size of transaction.  Tried to write %" PRIu64
        " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"),
      static_cast<uint64_t>(total_envelope_length),
      static_cast<int64_t>(cur_offset),
      static_cast<int64_t>(written),
      strerror(errno));
    state= CRASHED;
    /*
     * Reset the log's offset in case we want to produce a decent error message including
     * the original offset where an error occurred.
     */
    log_offset= cur_offset;
    deactivate();
  }
  free(orig_buffer);
  error_code= my_sync(log_file, 0);
  if (unlikely(error_code != 0))
  {
    errmsg_printf(ERRMSG_LVL_ERROR,
      _("Failed to sync log file. Got error: %s\n"),
      strerror(errno));
  }
}
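
The reserve-then-write idea in apply() can be tried out on its own with a few lines of Python. This is only a sketch under stated assumptions: Python has no lock-free integer, so the hypothetical OffsetAllocator class uses a tiny lock around the arithmetic as a stand-in for the atomic counter, and the file write happens outside that lock. It also returns the offset from before the increment directly, so the "adjust back" step from the C++ is not needed. os.pwrite() is only available on Unix-like systems.

```python
import os
import tempfile
import threading


class OffsetAllocator:
    """Stand-in for an atomic fetch-and-add counter (illustration only)."""

    def __init__(self):
        self._next = 0
        self._lock = threading.Lock()

    def fetch_and_add(self, length):
        # Only the arithmetic is guarded; no I/O happens under the lock.
        with self._lock:
            start = self._next
            self._next += length
        return start  # the offset before the increment


def append_entry(fd, allocator, payload):
    """Reserve a byte range, then write the payload at exactly that offset."""
    offset = allocator.fetch_and_add(len(payload))
    written = os.pwrite(fd, payload, offset)
    if written != len(payload):
        raise IOError("short write at offset %d" % offset)
    return offset


fd, path = tempfile.mkstemp()
allocator = OffsetAllocator()
payloads = [bytes([65 + i]) * (10 + i) for i in range(8)]  # b"AAAA...", b"BBBB...", ...
threads = [threading.Thread(target=append_entry, args=(fd, allocator, p)) for p in payloads]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
log_image = os.pread(fd, 4096, 0)
os.close(fd)
os.remove(path)
print(len(log_image))
```

Because every writer owns a disjoint byte range before it touches the file, the order in which the writes actually land does not matter.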

Reading the Transaction Log

OK, so the above code shows how the transaction log is written. What about reading the log file? Well, it’s pretty simple. There is an example program in /drizzle/message/transaction_reader.cc which has code showing how to do this. Here’s a snippet from that program:

int main(int argc, char* argv[])
{
  …
  message::Transaction transaction;
 
  file= open(argv[1], O_RDONLY);
  if (file == -1)
  {
    fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
    return -1;
  }
      …
  protobuf::io::ZeroCopyInputStream *raw_input=
    new protobuf::io::FileInputStream(file);
  protobuf::io::CodedInputStream *coded_input=
    new protobuf::io::CodedInputStream(raw_input);
 
  char *buffer= NULL;
  char *temp_buffer= NULL;
  uint32_t length= 0;
  uint32_t previous_length= 0;
  uint32_t checksum= 0;
  bool result= true;
  uint32_t message_type= 0;
 
  /* Read in the length of the command */
  while (result == true &&
           coded_input->ReadLittleEndian32(&message_type) == true &&
           coded_input->ReadLittleEndian32(&length) == true)
  {
      if (message_type != ReplicationServices::TRANSACTION)
      {
        fprintf(stderr, _("Found a non-transaction message "
                            "in log.  Currently, not supported.\n"));
        exit(1);
      }
 
      if (length > INT_MAX)
      {
        fprintf(stderr, _("Attempted to read record bigger than INT_MAX\n"));
        exit(1);
      }
 
      if (buffer == NULL)
      {
        temp_buffer= (char *) malloc(static_cast<size_t>(length));
      }
      /* No need to allocate if we have a buffer big enough… */
      else if (length > previous_length)
      {
        temp_buffer= (char *) realloc(buffer, static_cast<size_t>(length));
      }
 
      if (temp_buffer == NULL)
      {
        fprintf(stderr, _("Memory allocation failure trying to "
                            "allocate %" PRIu64 " bytes.\n"),
                 static_cast<uint64_t>(length));
        break;
      }
      else
        buffer= temp_buffer;
 
      /* Read the Command */
      result= coded_input->ReadRaw(buffer, (int) length);
      if (result == false)
      {
        fprintf(stderr, _("Could not read transaction message.\n"));
        fprintf(stderr, _("GPB ERROR: %s.\n"), strerror(errno));
        fprintf(stderr, _("Raw buffer read: %s.\n"), buffer);
        break;
      }
 
      result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
      if (result == false)
      {
        fprintf(stderr, _("Unable to parse command. Got error: %s.\n"),
                 transaction.InitializationErrorString().c_str());
        if (buffer != NULL)
          fprintf(stderr, _("BUFFER: %s\n"), buffer);
        break;
      }
    /* Print the transaction */
    printTransaction(transaction);
 
    /* Read the 4 byte checksum that trails the message */
    coded_input->ReadLittleEndian32(&checksum);
 
    if (do_checksum)
    {
      if (checksum != drizzled::hash::crc32(buffer, static_cast<size_t>(length)))
      {
        fprintf(stderr, _("Checksum failed. Wanted %" PRIu32
                              " got %" PRIu32 "\n"),
                 checksum,
                 drizzled::hash::crc32(buffer, static_cast<size_t>(length)));
      }
    }
    previous_length= length;
  }
 
  if (buffer)
    free(buffer);
  delete coded_input;
  delete raw_input;
  return (result == true ? 0 : 1);
}
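
Since one of the goals is that the log be readable from multiple languages, here is what a reader for the same envelope might look like in Python. It is a sketch only: it walks an in-memory byte string rather than a file, leaves the message bytes unparsed (decoding them would need the generated Protobuffer classes), and uses zlib.crc32 as a stand-in that may not match Drizzle's CRC routine. The names read_entries and _entry, and the type code 1, are assumptions made for the example.

```python
import struct
import zlib


def read_entries(log, do_checksum=True):
    """Yield (offset, entry_type, message) for each entry in a log image."""
    pos = 0
    while pos + 8 <= len(log):
        # 4-byte type header followed by 4-byte length header, little-endian.
        entry_type, length = struct.unpack_from("<II", log, pos)
        start = pos + 8
        end = start + length
        if end + 4 > len(log):
            raise ValueError("truncated entry at offset %d" % pos)
        message = log[start:end]
        (checksum,) = struct.unpack_from("<I", log, end)
        if do_checksum and checksum != (zlib.crc32(message) & 0xFFFFFFFF):
            raise ValueError("checksum mismatch at offset %d" % pos)
        yield pos, entry_type, message
        pos = end + 4  # step over the trailing checksum


def _entry(message, entry_type=1):
    """Build one sample entry so the reader has something to walk."""
    crc = zlib.crc32(message) & 0xFFFFFFFF
    return struct.pack("<II", entry_type, len(message)) + message + struct.pack("<I", crc)


sample_log = _entry(b"first") + _entry(b"second message")
for offset, entry_type, message in read_entries(sample_log):
    print(offset, entry_type, message)
```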

Shortcomings of the Transaction Log

So far, we’ve generally focused on a scalable design for the transaction log and have not spent too much time on performance tuning the code — and yes, performance != scalability. There are a number of problems with the current code which we will address in future versions of the transaction log. Namely:

  • Reduce calls to malloc(). Currently, each write of a transaction message to the log file incurs a call to malloc() to allocate enough memory to store the serialized log entry. Clearly, this is not optimal. We’ve considered a number of alternative approaches to calling malloc(), including a scoreboard approach where a vector of memory slabs is used in a round-robin fashion. This would introduce some locking, however. Also, I’ve thought about using a hazard pointer list on the Session object to have previously-allocated memory on the Session object be used for something like this. But, these ideas must be hashed out further.
  • There is no index into the transaction log. This is not a problem for writing the transaction log, of course, but for readers of the transaction log. I’m in the process of creating classes and a library for building indexes for a transaction log and, in addition, creating archived snapshots to enable log shipping for Drizzle replication. I’ll be pushing code for this to Launchpad later this week and will write a new article about log shipping and snapshot creation.
  • Each call to apply() calls fdatasync()/fsync() on the transaction log. Certain environments may consider this to be too strict a sync requirement, since the storage engine may already keep a transaction log file of its own that is also synced. For instance, InnoDB has a transaction log that, depending on the setting of InnoDB configuration variables, may call fdatasync() upon every transaction commit. It would be best to have the syncing behaviour be user-adjustable — for instance, a setting to allow the transaction log to be synced every X number of seconds…
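
The last item, a user-adjustable sync interval, could look something like the sketch below. SyncPolicy and write_and_maybe_sync are hypothetical names and nothing like them exists in the code yet. An interval of zero reproduces today's sync-on-every-write behaviour. The clock is injectable so the policy can be exercised without actually waiting.

```python
import os
import time


class SyncPolicy:
    """Decide whether a write should be followed by a sync."""

    def __init__(self, interval_seconds, clock=time.monotonic):
        self.interval = interval_seconds  # 0 means sync after every write
        self._clock = clock
        self._last_sync = clock()

    def should_sync(self):
        now = self._clock()
        if now - self._last_sync >= self.interval:
            self._last_sync = now
            return True
        return False


def write_and_maybe_sync(fd, payload, offset, policy):
    """Write at a fixed offset, syncing only when the policy says so (Unix only)."""
    os.pwrite(fd, payload, offset)
    if policy.should_sync():
        os.fsync(fd)


# Drive the policy with a fake clock: one reading at construction, then one per call.
ticks = iter([0.0, 0.5, 1.0, 1.2, 2.1])
policy = SyncPolicy(1.0, clock=lambda: next(ticks))
decisions = [policy.should_sync() for _ in range(4)]
print(decisions)
```

The obvious trade-off is durability: with an interval of X seconds, up to X seconds of log entries may not have reached disk when the machine goes down.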

Summary and Request for Comments

That’s it for the discussion about the transaction log. I’ll post some more code examples from the replication plugins which utilize the transaction log in a later blog entry.

What do you think of the design of the transaction log? What would you change? Comments are always welcome! Cheers. :)

Drizzle Replication – Changes in API to support Group Commit

Hi all. It’s been quite some time since my last article on the new replication system in Drizzle. My apologies for the delay in publishing the next article in the replication series.

The delay has been due to a reworking of the replication system to fully support “group commit” behaviour and to support fully transactional replication. The changes allow replicator and applier plugins to understand much more about the actual changes which occurred on the server, and to understand the transactional container properly.

The goals of Drizzle’s replication system are as follows:

  • Make replication modular and not dependent on one particular implementation
  • Make it simple and fun to develop plugins for Drizzle replication
  • Encapsulate all transmitted information in an efficient, portable, and standard format

This article serves to build on the last article and explain the changes to the Google Protobuffer message definitions used in the replication API. The actual replication API described in the last article remains almost the same. However, instead of being named CommandApplier and CommandReplicator, those plugin base classes are now named TransactionApplier and TransactionReplicator respectively. And, instead of consuming a Command message, they consume Transaction messages.


For my friend Edwin’s benefit, I’ll be including lots of pretty graphics. :) For my developer readers, I’m including lots of example C++ code to help you best understand how to read and manipulate the Transaction and Statement messages in the new replication system.

New Message Definitions

As I mentioned above, the Command message discussed in the first replication article has been changed in favour of a more space-efficient and transactional message format. The proto file is now called /drizzled/message/transaction.proto. You can look at the proto file online.

The Command Message has become the Statement message, and a new Transaction message serves as a container for multiple Statement messages representing (for most cases) an atomic change in the state of the database server. I’ll discuss later in the article those specific cases where a Transaction message’s contents may contain only a partial atomic change to the server.

The image to the right depicts the Transaction message container. As you can see, the Transaction message contains two things: a TransactionContext message and an array of one or more Statement messages.

The TransactionContext Message

Each Transaction message contains a single TransactionContext message. The TransactionContext message contains information about the entire transaction. The data members of the TransactionContext are as follows:

  • server_id – (uint32_t) A numeric identifier for the server which executed this transaction
  • transaction_id – (uint64_t) A globally-unique transaction identifier
  • start_timestamp – (uint64_t) A nano-second precision timestamp of when the transaction began.
  • end_timestamp – (uint64_t) A nano-second precision timestamp of when the transaction completed.
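As a small illustration of what these four fields let you do, here is a sketch that works out how long a transaction took. The TransactionContextStub struct is a hypothetical stand-in for the generated protobuf class (the real one exposes accessor methods rather than public fields), and transactionDurationNanos() is a helper name I made up for this example.

```cpp
#include <cstdint>

/* Hypothetical stand-in for message::TransactionContext. The real class is
 * generated by protoc and exposes accessor methods instead of plain fields. */
struct TransactionContextStub
{
  uint32_t server_id;
  uint64_t transaction_id;
  uint64_t start_timestamp; /* nanoseconds */
  uint64_t end_timestamp;   /* nanoseconds */
};

/* Returns the elapsed time in nanoseconds, or 0 if the timestamps are inverted. */
inline uint64_t transactionDurationNanos(const TransactionContextStub &ctx)
{
  if (ctx.end_timestamp < ctx.start_timestamp)
    return 0;
  return ctx.end_timestamp - ctx.start_timestamp;
}
```

A replicator could use something along these lines to flag unusually long transactions before handing them to an applier.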

Since TransactionContext is simply a Google Protobuffer message, accessing data members is simple and straightforward. If you’re writing a replicator or applier, a reference to a const Transaction message will be supplied to you via the standard API. For instance, let’s assume we’re writing a replicator and we want to filter all messages that are from the server with a server_id of 100. Kind of a silly example, but nevertheless, it allows us to see some example code.

As you may remember, the API for a replicator is dirt simple. There is a replicate() pure virtual method which accepts two parameters: a pointer to the Applier which will “apply” the message to some target, and the GPB message itself. The new function signature is the same as the last one, with the term “Command” replaced with the term “Transaction”:

virtual void replicate(TransactionApplier *in_applier,
                       message::Transaction &to_replicate)= 0;

Suppose our replicator class is called MyReplicator. Here is how to query the transaction context of the Transaction message and filter out transactions coming from server #100. :)

void MyReplicator::replicate(TransactionApplier *in_applier,
                             message::Transaction &to_replicate)
{
  const message::TransactionContext &ctx= to_replicate.transaction_context();
  if (ctx.server_id() != 100)
    in_applier->apply(to_replicate);
}

See? Pretty darn simple. :) OK, on to the Statement message, which is slightly more complicated.

The Statement Message

As noted above, the Transaction message contains an array of Statement messages. In Protobuffer terminology, the Transaction message contains a “repeated” Statement data member. The Statement message is an envelope containing the following information:

  • type – (enum Type) The type of Statement this message represents. Currently, the possible values of the type are as follows:
    • ROLLBACK
    • INSERT
    • UPDATE
    • DELETE
    • TRUNCATE_TABLE
    • CREATE_SCHEMA
    • ALTER_SCHEMA
    • DROP_SCHEMA
    • CREATE_TABLE
    • ALTER_TABLE
    • DROP_TABLE
    • SET_VARIABLE
    • RAW_SQL
  • start_timestamp – (uint64_t) A nano-second precision timestamp of when the statement began.
  • end_timestamp – (uint64_t) A nano-second precision timestamp of when the statement completed.
  • sql – (string) Optionally stores the exact original SQL string producing this message.
  • For certain types of Statement messages, there will also be a specialized header and data message (see below).

To access the Statement messages in a Transaction, use something like the following code, which loops over the Transaction message’s vector of Statement messages:

void MyReplicator::replicate(TransactionApplier *in_applier,
                             message::Transaction &to_replicate)
{
  /* Grab the number of statements in the Transaction message */
  size_t x;
  size_t num_statements= to_replicate.statement_size();

  /* Do something with each statement… */
  for (x= 0; x < num_statements; ++x)
  {
    const message::Statement &stmt= to_replicate.statement(x);
    /* processStatement() does something with the statement… */
    processStatement(stmt);
  }
}
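To make the loop a bit more concrete, here is a minimal sketch that tallies the statements in a transaction by type. StatementStub and TransactionStub are hypothetical stand-ins for the generated message classes (with only a few of the type values), and countStatementsByType() is a name invented for this example. The stub mirrors the statement_size() and statement() accessors used above.

```cpp
#include <cstddef>
#include <map>
#include <vector>

/* Hypothetical stand-ins for the generated message classes. */
struct StatementStub
{
  enum Type { INSERT, UPDATE, DELETE, RAW_SQL };
  Type type;
};

struct TransactionStub
{
  std::vector<StatementStub> statements;

  /* Mirror the protobuf "repeated" accessors used in the article. */
  int statement_size() const { return static_cast<int>(statements.size()); }
  const StatementStub &statement(int index) const { return statements[index]; }
};

/* Count how many statements of each type a transaction contains. */
inline std::map<StatementStub::Type, std::size_t>
countStatementsByType(const TransactionStub &transaction)
{
  std::map<StatementStub::Type, std::size_t> counts;
  for (int x= 0; x < transaction.statement_size(); ++x)
    ++counts[transaction.statement(x).type];
  return counts;
}
```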

Serialized Polymorphism with the type Member

The type data member is of critical importance to the Statement message, as it allows us to have a sort of polymorphism serialized within the Statement message itself. This polymorphism allows the generic Statement message to contain specialized submessages depending on what type of event occurred on the server.

The above paragraph probably sounds overly complicated, but in reality things are pretty simple. As usual, it’s easiest to see what’s going on by looking at an example in code. For our example, let’s build out our fictional processStatement() method from the snippet above.

The processStatement() method is basically a giant switch statement, switching off of the supplied Statement message parameter’s type data member property. Here is the outline of the processStatement() method, with only our switch statement and some comments visible which should give you an idea of how we deal with specific types of Statements:

void processStatement(const message::Statement &stmt)
{
  switch (stmt.type())
  {
  case message::Statement::INSERT:
    /* Handle statements which insert new data… */
    break;
  case message::Statement::UPDATE:
    /* Handle statements which update existing data… */
    break;
  case message::Statement::DELETE:
    /* Handle statements which delete existing data… */
    break;
  …
  }
}



Let’s go ahead and “fill out” one of the case blocks in the switch statement above. We will handle the case where the Statement type is INSERT. Note that this does not necessarily mean a SQL INSERT statement was executed. All this means is that an SQL statement was executed which resulted in a new record being added to a table on the server. This means that the actual SQL statement could have been any of INSERT, INSERT ... SELECT, REPLACE INTO, or LOAD DATA INFILE.

The /drizzled/message/transaction.proto file will always contain lots of documentation explaining how each of the specific submessages in the Statement message class are handled. To the right is a graphic depicting the InsertHeader and InsertData message classes which compose the "meat" of Statements that inserted new records into the database. Whenever the Statement message's type is INSERT, the Statement message will contain two submessages, one called insert_header and another called insert_data which will be populated with the InsertHeader and InsertData messages. The header message will contain information about the table and fields affected, while the data message will contain the values to be inserted into the table.

Here is some example code which queries the header and data messages and constructs an SQL string from them:

void processStatement(const message::Statement &stmt)
{
  switch (stmt.type())
  {
  case message::Statement::INSERT:
    /* Handle statements which insert new data... */
    {
    const message::InsertHeader &header= stmt.insert_header();
    const message::InsertData &data= stmt.insert_data();
    string destination; /* The SQL string we are building */
    char quoted_identifier= '`';

    destination.assign("INSERT INTO ");
    destination.push_back(quoted_identifier);
    destination.append(header.table_metadata().schema_name());
    destination.push_back(quoted_identifier);
    destination.push_back('.');
    destination.push_back(quoted_identifier);
    destination.append(header.table_metadata().table_name());
    destination.push_back(quoted_identifier);
    destination.append(" (");

    /* Add field list to SQL string... */
    size_t num_fields= header.field_metadata_size();
    size_t x;

    for (x= 0; x < num_fields; ++x)
    {
      const message::FieldMetadata &field_metadata= header.field_metadata(x);
      if (x != 0)
        destination.push_back(',');

      destination.push_back(quoted_identifier);
      destination.append(field_metadata.name());
      destination.push_back(quoted_identifier);
    }

    destination.append(") VALUES (");

    /* Add insert values */
    size_t num_records= data.record_size();
    size_t y;

    for (x= 0; x < num_records; ++x)
    {
      if (x != 0)
        destination.append("),(");

      for (y= 0; y < num_fields; ++y)
      {
        if (y != 0)
          destination.push_back(',');

        destination.push_back('\'');
        destination.append(data.record(x).insert_value(y));
        destination.push_back('\'');
      }
    }
    destination.push_back(')');

    }
    break;
  ...
  }
}

The example code above is far from production-ready, of course. I don't take into account different field types, instead simply enclosing everything in single quotes. Also, I don't handle errors or escaping strings. The point isn't to be perfect, but to show you the general way to get information out of the Statement message...
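To give a rough idea of what the missing escaping step could look like, here is a minimal sketch. quoteSqlValue() is a hypothetical helper, not something that ships with Drizzle, and it assumes that doubling single quotes and backslashes is sufficient. Real code should be aware of field types and use the escaping rules of whatever target it writes to.

```cpp
#include <string>

/* Hypothetical helper, not part of Drizzle: wrap a value in single quotes,
 * doubling any embedded single quotes and backslashes. */
inline std::string quoteSqlValue(const std::string &raw)
{
  std::string quoted;
  quoted.reserve(raw.size() + 2);
  quoted.push_back('\'');
  for (std::string::size_type i= 0; i < raw.size(); ++i)
  {
    const char c= raw[i];
    if (c == '\'' || c == '\\')
      quoted.push_back(c); /* emit the character twice to escape it */
    quoted.push_back(c);
  }
  quoted.push_back('\'');
  return quoted;
}
```

In the value loop of the example, a call such as quoteSqlValue(data.record(x).insert_value(y)) would take the place of the three lines that add the quotes and the raw value.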

Partial Atomic Transactions

Above, I stated that the Transaction messages sent to Replicators and Appliers represent an atomic change to the state of a server. This is true, most of the time. :) There are specific situations when a Transaction message will not represent an atomic change, and you should be aware of these scenarios if you plan to write plugins which implement a replication scheme.

There are times when it is simply inefficient or impossible to create a Transaction message that represents the actual atomic change on a server. For instance, imagine a table having 100 million records. Now, imagine issuing an UPDATE against that table that potentially affected every row in the table.

In order to transmit to replicas the atomic change to the server, one gigantic Transaction message would need to be constructed on the master server. Not only is there a distinct chance that the master would run out of memory constructing such a large message object, but it's safe to say that the master server would suffer from performance degradation during this construction. There must, therefore, be a way to start streaming the changes made to the master server before the actual final commit has happened on the master.

You may have noticed two data members of the InsertData message above named segment_id and end_segment. The first is of type uint32_t and the second is a bool. Together, these two data members fulfill the need to transmit transaction messages that are part of a bulk data modification. When a reader of a Transaction message sees that the end_segment data member is false, then the reader knows that another data segment will follow the current data message and will contain more inserts, updates, or deletes for the current transaction.
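Here is a rough sketch of how a reader might use those two members to stitch a bulk change back together. DataSegmentStub is a hypothetical stand-in for a data message such as InsertData (rows are reduced to plain strings), SegmentAssembler is a class invented for this example, and the sketch assumes segments arrive in order.

```cpp
#include <cstdint>
#include <string>
#include <vector>

/* Hypothetical stand-in for a data message such as InsertData. */
struct DataSegmentStub
{
  uint32_t segment_id;
  bool end_segment;
  std::vector<std::string> rows;
};

/* Collects segments until one arrives with end_segment set to true.
 * Assumes the segments are delivered in order. */
class SegmentAssembler
{
public:
  SegmentAssembler() : complete(false) {}

  /* Appends the segment's rows; returns true once the final segment is seen. */
  bool add(const DataSegmentStub &segment)
  {
    rows.insert(rows.end(), segment.rows.begin(), segment.rows.end());
    complete= segment.end_segment;
    return complete;
  }

  bool isComplete() const { return complete; }
  const std::vector<std::string> &allRows() const { return rows; }

private:
  bool complete;
  std::vector<std::string> rows;
};
```

An applier built this way can either apply rows as they stream in or hold them until add() returns true, depending on how much atomicity it needs on the target.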

Summary and Request for Comments

Hopefully, I've explained the changes that have been made to Drizzle's replication system well enough above, but I understand the changes to the message definitions are substantial and am available at any time to discuss the changes and assist people with their code. You can find me on IRC, Freenode's #drizzle channel, via the Drizzle discussion mailing list, or via email joinfu@sun.com. I very much welcome comments. The new replication system is just finishing up the valgrind regression tests and should hit trunk later today.

The next article covers the new Transaction Log, which is a serialized log of the Transaction messages used in the replication system.

Yet Another Post on REPLACE

Sometimes, as Sergei rightly mentioned, I can be, well, “righteously indignant” about what I perceive to be a hack.

In this case, after Sergei repeatedly tried to set me straight about what was going on “under the covers” during a REPLACE operation, I was still arguing that he was incorrect.

Doh.

I then realized that Sarah Sproenhle’s original comment about my test table not having a primary key was the reason that I was seeing the behaviour that I had been seeing.

My original test case was failing, expecting to see a DELETE + an INSERT, when a REPLACE INTO was issued against a table. When I placed the PRIMARY KEY on the table in my test case and re-ran the test case, it still failed because the DELETE still was not in the transaction log. Well, it turns out that ha_update_row() was actually called, and not ha_delete_row() + ha_write_row(). And, because of the documentation for the REPLACE command, I wasn’t checking whether ha_update_row() may have been called, since I didn’t realize a REPLACE could actually do an UPDATE.

Anyway, I wanted to post to say that most of this whole kerfuffle was my fault. Though I think that both the online and code documentation should reflect the fact that a REPLACE can do an UPDATE, the source of the failure was not what I originally wrote. In contrast, ha_write_row() does indeed return ER_FOUND_DUPP_KEY appropriately during a REPLACE call.

Mmmmm, that piece of humble pie was delicious.

The Deal with REPLACE .. Or Is It UPDATE?

Yesterday, I posed a question to the ZanyWeb about what exactly a REPLACE statement does behind the scenes in the storage engine. There were many excellent comments and these comments exposed some misunderstandings (including some of my own misconceptions) about the REPLACE statement itself and what goes on behind the scenes in the storage engine.

The question I asked was this: if I execute the following statements in a client, what would you expect would happen behind the scenes in the storage engine?

CREATE TABLE t1 (
  id INT NOT NULL AUTO_INCREMENT PRIMARY KEY
, padding VARCHAR(200) NOT NULL
);

INSERT INTO t1 VALUES (1, "I love testing.");
INSERT INTO t1 VALUES (2, "I hate testing.");

REPLACE INTO t1 VALUE (2, "I love testing.");

Based purely on the manual, one would expect, as Ryan Thiessen expressed in his comment, to:

try to insert once, detect a failure and then delete/insert in failure case.

or, as Ryan put it in pseudo-code:

INSERT INTO t1 VALUES (1, "I love testing.");
INSERT INTO t1 VALUES (2, "I hate testing.");
INSERT INTO t1 VALUE (2, "I love testing.");
if error() { DELETE FROM t1 WHERE id = 2; INSERT INTO t1 VALUE (2, "I love testing."); }

Unfortunately, this is not the case. At least, it is not always the case.

ha_write_row() vs. ha_update_row()

In MySQL, as well as in Drizzle, there is a pluggable storage engine API. This API consists primarily of two classes: In MySQL, one class is called handlerton and the other is called handler — in Drizzle, these classes are called StorageEngine and Cursor (Current Set Of Records), because that is what they actually represent.

Without going into too many details, the Cursor (handler) interface has four calls which are relevant to this discussion:

  • int Cursor::ha_write_row(unsigned char *new_record) — inserts a new record into the table pointed to by the Cursor
  • int Cursor::ha_update_row(unsigned char *old_record, unsigned char *new_record) — update an existing record in the table pointed to by the Cursor
  • int Cursor::ha_delete_row(unsigned char *old_record) — delete an existing record in the table pointed to by the Cursor
  • virtual int Cursor::extra(ha_extra_function operation) — indicates to the Cursor that it should handle an operation in a certain way. If this sounds vague to you, it is, so read on…

OK, So What Does Happen?

What actually happens behind the scenes for the code above is the following “optimized” execution path, again in pseudo-code:

// (0) Enter mysql_insert() and prepare to do an INSERT
mysql_insert();

// (1) tell engine that a write can replace an existing record
Cursor::extra(HA_EXTRA_WRITE_CAN_REPLACE);

// (2) tell engine to ignore duplicate keys
Cursor::extra(HA_EXTRA_IGNORE_DUP_KEY);

// (3) For each record to be inserted...
for each record begin:

  // (4) Insert the record by calling ha_write_row()
  Cursor::ha_write_row();

  // (5) Cursor tries to "insert" the record
  error= Cursor::write_row();

  // (6) If the write fails, then DELETE the record and try to INSERT again.
  if error is true:

    Cursor::ha_delete_row();
    Cursor::ha_write_row();

The above code looks very similar to what Ryan has already said he expected the underlying code to look like, no? The problem is the calls to Cursor::extra() (steps 1 and 2) and what happens inside InnoDB (step 5) when Cursor::write_row() is called.

InnoDB tries to insert the record and realizes that the new record violates an existing primary key value, but since MySQL has already told it to ignore duplicate key violations, InnoDB updates the existing row and returns successfully from write_row()!

Unfortunately, because of this “optimization”, Cursor::ha_write_row(), which is the kernel’s wrapper around the virtualized storage engine’s Cursor::write_row() call, is completely unaware that an UPDATE and not an INSERT has occurred. Why is this a problem? Well, what happens after a call to Cursor::write_row() succeeds? That’s correct: post-change logging occurs. In other words, logging for row-based replication occurs. Well, Cursor::ha_write_row() then logs an INSERT and not the appropriate UPDATE. Oops.

There are three major things to note about the above logic:

  1. This is undocumented behaviour. According to the manual, a REPLACE statement works like an INSERT, except if a primary or unique key is violated, the original row is DELETEd and the new row is INSERTed.
  2. This breaks the defined public Cursor (known as handler in MySQL) interface for ha_write_row() since rows may be updated when ha_update_row() is NOT called.
  3. The code has side effects, namely that the change to the state of the server is unknown to the Cursor: an update occurred but the Cursor believes an insert occurred.
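To see the side effect in isolation, here is a toy model of the situation. None of these classes exist in Drizzle or MySQL: ToyEngine plays the role of a storage engine that silently overwrites a row when told to ignore duplicate keys, and ToyCursor plays the role of the ha_write_row() wrapper that logs after a successful write.

```cpp
#include <map>
#include <string>
#include <vector>

/* Toy model only: these classes do not exist in Drizzle or MySQL. */
class ToyEngine
{
public:
  ToyEngine() : ignore_dup_key(false) {}

  void setIgnoreDupKey(bool value) { ignore_dup_key= value; }

  /* Returns 0 on success, 1 on a duplicate key error. When told to ignore
   * duplicates, it silently overwrites the existing row and reports success. */
  int write_row(int id, const std::string &padding)
  {
    if (rows.count(id) != 0 && !ignore_dup_key)
      return 1;
    rows[id]= padding;
    return 0;
  }

  std::map<int, std::string> rows;

private:
  bool ignore_dup_key;
};

/* Toy wrapper playing the role of ha_write_row(): it logs after a
 * successful write, but all it knows is that "a write succeeded". */
class ToyCursor
{
public:
  explicit ToyCursor(ToyEngine &in_engine) : engine(in_engine) {}

  int ha_write_row(int id, const std::string &padding)
  {
    int error= engine.write_row(id, padding);
    if (error == 0)
      log.push_back("INSERT"); /* wrong when the engine really updated */
    return error;
  }

  std::vector<std::string> log;

private:
  ToyEngine &engine;
};
```

Writing id 2 twice with the ignore flag set leaves a single row in the toy engine but two INSERT entries in the toy log, which is the mismatch described in point 3.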

When Optimizations Aren’t

This kind of coding can be argued to be an “optimization”. I will argue that it is not an optimization, but is overly clever and reduces the clarity of the code thereby making the code unnecessarily difficult to trace and follow. Personally, I wasted days of research time trying to understand how a call to ha_write_row() was actually updating an existing record.

In my opinion, a proper optimization would be to:

  • Make the internal code function exactly as the documentation says it does.
  • Adapt the documentation for REPLACE and state that REPLACE is not as efficient as INSERT .. ON DUPLICATE KEY UPDATE and whenever possible, prefer the latter statement over the former.

Does REPLACE belong in Drizzle?

One of the reasons that REPLACE may exist in MySQL is to make the INSERT statement idempotent when run on a replication slave in mixed or statement-based mode. Don’t believe me? Here is a code snippet and comment from the sql/log_event.cc file in MySQL 5.4:

Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
...
  /**
     todo: to introduce a property for the event (handler?) which forces
     applying the event in the replace (idempotent) fashion.
  */
  if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 ||
      m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
  {
    /*
      We are using REPLACE semantics and not INSERT IGNORE semantics
      when writing rows, that is: new rows replace old rows.  We need to
      inform the storage engine that it should use this behaviour.
    */

    /* Tell the storage engine that we are using REPLACE semantics. */
    thd->lex->duplicates= DUP_REPLACE;

    /*
      Pretend we're executing a REPLACE command: this is needed for
      InnoDB and NDB Cluster since they are not (properly) checking the
      lex->duplicates flag.
    */
    thd->lex->sql_command= SQLCOM_REPLACE;
    /*
       Do not raise the error flag in case of hitting to an unique attribute
    */
    m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
...

By contrast to MySQL 5.4, Drizzle’s replication system is a reflection of the atomic changes made to the state of a server. These changes are, by definition, deterministic, and applying a transaction log to a Drizzle replica can result in one and only one state in the resulting server. In Drizzle, having the application of the transaction log be idempotent is not necessary, as the transaction log stream is always deterministic. For instance, if an INSERT ... ON DUPLICATE KEY UPDATE is issued against a Drizzle server, the replication system shall create either an InsertStatement or an UpdateStatement message, depending on what precisely occurred on the server, and these messages are logged in Drizzle’s transaction log. There is no fudging of statements. What happened on the server is logged as an exact change in the state of the server, nothing more.

So…anyway, because of the above “optimization”, I’m now left with a tricky problem: continue to pay the interest on this Technical Debt or rework the Cursor interface so that behaviour is always idempotent via the wrapper interface and has no side effects. I’m not sure what I will do…but this has delayed me substantially. :(

Pop Quiz – What Does REPLACE Do?

Hi ZanyWeb. Here’s a pop quiz for you, and the answer may surprise you.

The MySQL manual states the following about the REPLACE statement:

REPLACE works exactly like INSERT, except that if an old row in the table has the same value as a new row for a PRIMARY KEY or a UNIQUE index, the old row is deleted before the new row is inserted.

Sounds pretty clear to me. If a row with the same primary key exists, it is deleted and then a new row is inserted.

So, given the above, if I execute the following statements in a client, what would you expect would happen behind the scenes in the storage engine?

CREATE TABLE t1 (
  id INT NOT NULL AUTO_INCREMENT PRIMARY KEY
, padding VARCHAR(200) NOT NULL
);

INSERT INTO t1 VALUES (1, "I love testing.");
INSERT INTO t1 VALUES (2, "I hate testing.");

REPLACE INTO t1 VALUE (2, "I love testing.");

If you’re like me, you would expect the above to actually execute the following changes:

CREATE TABLE t1 (
  id INT NOT NULL AUTO_INCREMENT PRIMARY KEY
, padding VARCHAR(200) NOT NULL
);

INSERT INTO t1 VALUES (1, "I love testing.");
INSERT INTO t1 VALUES (2, "I hate testing.");
DELETE FROM t1 WHERE id = 2;
INSERT INTO t1 VALUE (2, "I love testing.");

But, this is not actually what happens.

Would anyone like to guess what actually happens? Add your guess to the comments on this entry. I’ll post the answer later today. :)

Drizzle Replication – The CommandReplicator and CommandApplier Plugin API

IMPORTANT:
This article is out of date and the replication API has been updated. Please see the follow-up article for the most up to date information!

OK, so here is the next installment in the Drizzle replication article series. Today, I’ll be talking about the flow of the Command message object through the CommandReplicator and CommandApplier APIs. If you missed the first article about the structure of the Command message and Google Protobuffers, you may want to read that first. We’ll only be talking in this article about what happens on one server. We will be discussing the Command Log in the next article, and then discuss how messages are passed from one server to another. But, before we discuss those things, it is critical to first understand the CommandReplicator and CommandApplier plugin classes, which are two of the abstract interfaces out of which replication modules can be built (and from which a number of modules have already been built).

In this article, I’ll be showing code that was written by myself and by Padraig O’Sullivan, another Drizzle contributor.

Flow of Command messages

SIDEBAR: Namespaces in Drizzle

The astute reader may have noticed that there is quite a big difference in the organization of the Drizzle codebase versus MySQL. Drizzle uses C++ namespaces to make the code easier to read and understand.

New code written for Drizzle is always placed in a namespace corresponding to the exact directory structure in which you find the source files for a particular class. For instance, all plugin interface classes (abstract base classes) are in the drizzled::plugin namespace and are found in /drizzled/plugin/. All protobuffer message classes are defined in /drizzled/message/ and are in the namespace drizzled::message. We feel organizing the code like so, and using C++ namespaces, makes the code clearer and easier to read. You can read more about Drizzle’s coding style, including how we use namespaces, on our wiki.

Command messages are created by a component within the Drizzle kernel whenever any SQL statement which modifies the state of the server is executed. This component is called ReplicationServices. This component is a singleton object and manages communication between the kernel and two kinds of plugins, called CommandReplicator and CommandApplier. The component is declared in drizzled/replication_services.h and defined in drizzled/replication_services.cc.

The advantage of the ReplicationServices component is that it frees replication module developers from having to understand anything about what’s going on in the kernel. You don’t have to understand anything about the Session object, how statements are executed in the kernel, or even how storage engines are actually applying changes to their data. All you need to learn is the structure of the Command message (covered in the previous article) and the very, very simple CommandReplicator and CommandApplier plugin APIs. So, let’s look at those APIs. :)

The CommandReplicator Plugin API

Here is the abstract base class drizzled::plugin::CommandReplicator, available in drizzled/plugin/command_replicator.h.

/**
 * Class which replicates Command messages
 */
class CommandReplicator
{
public:
  CommandReplicator() {}
  virtual ~CommandReplicator() {}
  /**
   * Replicate a Command message to a CommandApplier.
   *
   * @note
   *
   * It is important to note that memory allocation for the 
   * supplied pointer is not guaranteed after the completion 
   * of this function -- meaning the caller can dispose of the
   * supplied message.  Therefore, replicators and appliers 
   * implementing an asynchronous replication system must copy
   * the supplied message to their own controlled memory storage
   * area.
   *
   * @param Pointer to the applier of the command message
   * @param Command message to be replicated
   */
  virtual void replicate(CommandApplier *in_applier, drizzled::message::Command &to_replicate)= 0;
};

See, I told you it was simple :) There is an additional isActive() method currently in this API, but this will disappear in the coming months as that method will be “moving up” to a base Plugin class. You can ignore it for right now. The only other method which CommandReplicator plugins must implement is the replicate() method.

The replicate() method accepts only two parameters. The first is a pointer to an object which implements the CommandApplier interface (inherits from drizzled::plugin::CommandApplier). The second is the Command message that the ReplicationServices component constructs when a data-modification event occurs on the server.

The CommandApplier Plugin API

Before I show some implementation of CommandReplicators, let’s first quickly take a look at the drizzled::plugin::CommandApplier interface. It is equally simple.

/**
 * Base class for appliers of Command messages
 */
class CommandApplier
{
public:
  CommandApplier() {}
  virtual ~CommandApplier() {}
 /**
   * Apply something to a target.
   *
   * @note
   *
   * It is important to note that memory allocation for the 
   * supplied pointer is not guaranteed after the completion 
   * of this function -- meaning the caller can dispose of the
   * supplied message.  Therefore, appliers which are
   * implementing an asynchronous replication system must copy
   * the supplied message to their own controlled memory storage
   * area.
   *
   * @param Command message to be applied
   */
  virtual void apply(const drizzled::message::Command &to_apply)= 0;
};

The CommandApplier quite predictably implements a single method: apply(), which accepts a single parameter of a Command message. Yes, it’s that simple.
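The @note in the interface about copying the message is worth a quick illustration. Below is a minimal sketch of an applier that defers its work, so it stores its own copy of each message. CommandStub and CommandApplierStub are hypothetical stand-ins for message::Command and plugin::CommandApplier, and QueueingApplier is a class invented for this example.

```cpp
#include <cstddef>
#include <deque>
#include <string>

/* Hypothetical stand-ins for message::Command and plugin::CommandApplier. */
struct CommandStub
{
  std::string sql;
};

class CommandApplierStub
{
public:
  virtual ~CommandApplierStub() {}
  virtual void apply(const CommandStub &to_apply)= 0;
};

/* An applier that defers work: it copies each message into a queue it owns,
 * because the caller may dispose of the original once apply() returns. */
class QueueingApplier : public CommandApplierStub
{
public:
  virtual void apply(const CommandStub &to_apply)
  {
    pending.push_back(to_apply); /* stores a copy, not a reference */
  }

  std::size_t pendingCount() const { return pending.size(); }
  const CommandStub &front() const { return pending.front(); }

private:
  std::deque<CommandStub> pending;
};
```

Holding a pointer or reference to the supplied message would be the tempting shortcut here, and it is exactly what the note warns against.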

Putting it All Together

Before we get to the example implementation of the above plugin interfaces, it’s worth pointing out that Drizzle’s replication system allows multiple CommandReplicator and CommandApplier plugins to register themselves with the ReplicationServices component. The ReplicationServices::push() internal method simply loops through the active replicators and appliers, calling the replicate() method, passing in a constructed Command message and a pointer to each registered applier. Here is the code showing this process:

void ReplicationServices::push(message::Command &to_push)
{
  vector<plugin::CommandReplicator *>::iterator repl_iter= replicators.begin();
  vector<plugin::CommandApplier *>::iterator appl_start_iter, appl_iter;
  appl_start_iter= appliers.begin();
 
  plugin::CommandReplicator *cur_repl;
  plugin::CommandApplier *cur_appl;
 
  while (repl_iter != replicators.end())
  {
    cur_repl= *repl_iter;
    if (! cur_repl->isActive())
    {
      ++repl_iter;
      continue;
    }
 
    appl_iter= appl_start_iter;
    while (appl_iter != appliers.end())
    {
      cur_appl= *appl_iter;
 
      if (! cur_appl->isActive())
      {
        ++appl_iter;
        continue;
      }
 
      cur_repl->replicate(cur_appl, to_push);
      /* 
       * We update the timestamp for the last applied Command so that
       * publisher plugins can ask the replication services when the
       * last known applied Command was using the getLastAppliedTimestamp()
       * method.
       */
      last_applied_timestamp.fetch_and_store(to_push.timestamp());
      ++appl_iter;
    }
    ++repl_iter;
  }
}

Pretty simple, no? There are a couple of big things to point out about the code you’ve seen so far. First, Drizzle is focused on providing clean, simple, and encapsulated interfaces for plugin developers. There’s no need for plugin developers to understand the often-messy and poorly documented internals of the kernel. A plugin developer only needs to do two things:

  • Understand the well-documented Google Protobuffer message API and have a copy of the replication.proto file which contains the blueprint for the Command message
  • Implement either the drizzled::plugin::CommandReplicator::replicate() or drizzled::plugin::CommandApplier::apply() method

Secondly, the interface allows individuals and companies to completely customize how Drizzle replication works for them. You want replication to do X, Y, and Z? Fine, no problem. We give you some example plugins which clearly demonstrate the interfaces, and you can take it from there, without worrying about messing up the kernel. :)

So, now that you’ve seen the plugin interfaces, let’s discuss how these plugin interfaces are implemented in a couple plugins that are distributed with Drizzle right now.

The FilteredReplicator Plugin

After I wrote the default replicator plugin, which literally does nothing but the following:

void DefaultReplicator::replicate(plugin::CommandApplier *in_applier, message::Command &to_replicate)
{
  /* 
   * We do absolutely nothing but call the applier's apply() method, passing
   * along the supplied Command.  Yep, told you it was simple...
   */
  in_applier->apply(to_replicate);
}

Padraig O’Sullivan took the reins and wrote a FilteredReplicator plugin which allows a DBA to filter Command messages by schema name, table name, and a regular expression. His FilteredReplicator is much more interesting than the default replicator, so let’s take a look at some of his code. Here is the replicate() method from the FilteredReplicator plugin:

void FilteredReplicator::replicate(plugin::CommandApplier *in_applier, 
                                   message::Command &to_replicate)
{
  string schema_name;
  string table_name;
 
  /*
   * First, we check to see if the command consists of raw SQL. If so,
   * we need to parse this SQL and determine whether to filter the event
   * based on the information we obtain from the parsed SQL.
   * If not raw SQL, check if this event should be filtered or not
   * based on the schema and table names in the command message.
   */
  if (to_replicate.type() == message::Command::RAW_SQL)
  {
    parseQuery(to_replicate.sql(),
               schema_name,
               table_name);
  }
  else
  {
    schema_name.assign(to_replicate.schema());
    table_name.assign(to_replicate.table());
  }
 
  /*
   * Convert the schema name and table name strings to lowercase so that it
   * does not matter what case the table or schema name was specified in. We
   * also keep all entries in the vectors of schemas and tables to filter in
   * lowercase.
   */
  transform(schema_name.begin(), schema_name.end(),
                 schema_name.begin(), ::tolower);
  transform(table_name.begin(), table_name.end(),
                 table_name.begin(), ::tolower);
 
  if (isSchemaFiltered(schema_name) ||
      isTableFiltered(table_name))
  {
    return;
  }
 
  /*
   * We can now simply call the applier's apply() method, passing
   * along the supplied command.
   */
  in_applier->apply(to_replicate);
}

The above code is quite easy to read and understand. I encourage you to check out the rest of Padraig’s implementation in /plugin/filtered_replicator/filtered_replicator.cc and see just how easy it is to start writing new Drizzle replication modules.
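The replicate() method above leans on isSchemaFiltered() and isTableFiltered(), whose bodies are not shown here. As a minimal sketch of the idea, assuming the filter lists are plain vectors of lowercase names as the comment in the code describes, the lookup could work along these lines. The NameFilter class and its method names are hypothetical and are not taken from Padraig's plugin.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <string>
#include <vector>

// Lowercase a copy of the name so lookups ignore the case used in the SQL.
std::string toLower(std::string name)
{
  std::transform(name.begin(), name.end(), name.begin(),
                 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
  return name;
}

// Hypothetical holder for the schema and table names a DBA wants filtered out.
class NameFilter
{
public:
  // Names are stored in lowercase, so only the lookup side needs converting.
  void addSchema(const std::string &name)
  {
    assert(!name.empty());
    schemas.push_back(toLower(name));
  }

  void addTable(const std::string &name)
  {
    assert(!name.empty());
    tables.push_back(toLower(name));
  }

  // True when the command should be dropped instead of passed to the applier.
  bool shouldFilter(const std::string &schema_name,
                    const std::string &table_name) const
  {
    return contains(schemas, toLower(schema_name)) ||
           contains(tables, toLower(table_name));
  }

private:
  static bool contains(const std::vector<std::string> &names,
                       const std::string &name)
  {
    return std::find(names.begin(), names.end(), name) != names.end();
  }

  std::vector<std::string> schemas;
  std::vector<std::string> tables;
};
```

With "Audit" added as a filtered schema and "sessions" as a filtered table, shouldFilter("AUDIT", "orders") and shouldFilter("shop", "Sessions") both return true, while shouldFilter("shop", "orders") returns false. A linear scan is fine for a handful of names; a sorted vector or a set would be the obvious swap if the lists grew large.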

The CommandLog::apply() Method

To demonstrate a CommandApplier implementation, I present the apply() method of the CommandLog plugin, which inherits from drizzled::plugin::CommandApplier. Without going into too much detail about the Command Log, which is the subject of the next article, I’ll let you take a look at just the apply() method, to see how a CommandApplier does its main job (see /plugin/command_log/command_log.cc):

void CommandLog::apply(const message::Command &to_apply)
{
  string buffer(""); /* Buffer we will write serialized command to */
 
  static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint64_t) + /* 8-byte length header */
                                              sizeof(uint32_t); /* 4-byte checksum trailer */
 
  size_t length;
  ssize_t written;
  off_t cur_offset;
 
  to_apply.SerializeToString(&buffer);
 
  length= buffer.length(); 
 
  /*
   * Do an atomic increment on the offset of the log file position
   */
  cur_offset= log_offset.fetch_and_add((off_t) (HEADER_TRAILER_BYTES + length));
 
  /*
   * We adjust cur_offset back to the original log_offset before
   * the increment above...
   */
  cur_offset-= (off_t) (HEADER_TRAILER_BYTES + length);
 
  /* 
   * Quick safety...if an error occurs below, the log file will
   * not be active, therefore a caller could have been ready
   * to write...but the log is crashed.
   */
  if (unlikely(state == CRASHED))
    return;
 
  /* We always write in network byte order */
  unsigned char nbo_length[8];
  int8store(nbo_length, length);
 
  /* Write the length header */
  do
  {
    written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
  }
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
  if (unlikely(written != (ssize_t) sizeof(uint64_t)))
  {
    errmsg_printf(ERRMSG_LVL_ERROR, 
                  _("Failed to write full size of command.  Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
                  (int64_t) sizeof(uint64_t), 
                  (int64_t) cur_offset,
                  (int64_t) written, 
                  strerror(errno));
    state= CRASHED;
    /* 
     * Reset the log's offset in case we want to produce a decent error message including
     * the original offset where an error occurred.
     */
    log_offset= cur_offset;
    is_active= false;
    return;
  }
 
  cur_offset+= (off_t) written;
 
  /* 
   * Quick safety...if an error occurs above in another writer, the log 
   * file will be in a crashed state.
   */
  if (unlikely(state == CRASHED))
  {
    /* 
     * Reset the log's offset in case we want to produce a decent error message including
     * the original offset where an error occurred.
     */
    log_offset= cur_offset;
    return;
  }
 
  /* Write the command message itself */
  do
  {
    written= pwrite(log_file, buffer.c_str(), length, cur_offset);
  }
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
  if (unlikely(written != (ssize_t) length))
  {
    errmsg_printf(ERRMSG_LVL_ERROR, 
                  _("Failed to write full serialized command.  Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
                  (int64_t) length, 
                  (int64_t) cur_offset,
                  (int64_t) written, 
                  strerror(errno));
    state= CRASHED;
    log_offset= cur_offset;
    is_active= false;
    return;
  }
 
  cur_offset+= (off_t) written;
 
  /* 
   * Quick safety...if an error occurs above in another writer, the log 
   * file will be in a crashed state.
   */
  if (unlikely(state == CRASHED))
  {
    /* 
     * Reset the log's offset in case we want to produce a decent error message including
     * the original offset where an error occurred.
     */
    log_offset= cur_offset;
    return;
  }
 
  uint32_t checksum= 0;
 
  if (do_checksum)
  {
    checksum= crc32(0L, (unsigned char *) buffer.c_str(), length);
  }
 
  /* We always write in network byte order */
  unsigned char nbo_checksum[4];
  int4store(nbo_checksum, checksum);
 
  /* Write the checksum trailer */
  do
  {
    written= pwrite(log_file, nbo_checksum, sizeof(uint32_t), cur_offset);
  }
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
  if (unlikely(written != (ssize_t) sizeof(uint32_t)))
  {
    errmsg_printf(ERRMSG_LVL_ERROR, 
                  _("Failed to write full checksum of command.  Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
                  (int64_t) sizeof(uint32_t), 
                  (int64_t) cur_offset,
                  (int64_t) written, 
                  strerror(errno));
    state= CRASHED;
    log_offset= cur_offset;
    is_active= false;
    return;
  }
}
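Stripped of error handling, apply() does two things: it reserves a region of the log with an atomic add, then writes a length header, the serialized message, and a checksum trailer into that region. The sketch below replays those steps against an in-memory buffer instead of a file. Everything in it is an assumption made for illustration: MemoryLog, storeBigEndian(), and crc32Of() are hypothetical names, the header and trailer are written big-endian on the strength of the "network byte order" comments (whether int8store() and int4store() really produce that order is worth confirming against the source), and the checksum is a plain bitwise CRC-32 standing in for zlib's crc32(). Note also that std::atomic's fetch_add() returns the value before the addition, so the subtraction step in the original is not needed here.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// 8-byte length header plus 4-byte checksum trailer, as in apply().
const size_t HEADER_TRAILER_BYTES= sizeof(uint64_t) + sizeof(uint32_t);

// Standard CRC-32 (reflected polynomial 0xEDB88320), computed one bit at a time.
uint32_t crc32Of(const std::string &data)
{
  uint32_t crc= 0xFFFFFFFFu;
  for (unsigned char byte : data)
  {
    crc^= byte;
    for (int bit= 0; bit < 8; ++bit)
      crc= (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
  }
  return ~crc;
}

// Store the low `width` bytes of value, most significant byte first.
void storeBigEndian(unsigned char *dest, uint64_t value, size_t width)
{
  for (size_t i= 0; i < width; ++i)
    dest[i]= static_cast<unsigned char>(value >> (8 * (width - 1 - i)));
}

// Hypothetical in-memory stand-in for the log file.
class MemoryLog
{
public:
  explicit MemoryLog(size_t capacity) : bytes(capacity, 0), log_offset(0) {}

  // Appends one record and returns the offset at which it starts.
  size_t append(const std::string &payload)
  {
    const size_t total= HEADER_TRAILER_BYTES + payload.size();

    // Reserve our region; each caller gets a distinct starting offset.
    const size_t start= log_offset.fetch_add(total);
    assert(start + total <= bytes.size());

    unsigned char *cursor= &bytes[start];
    storeBigEndian(cursor, payload.size(), sizeof(uint64_t)); // length header
    cursor+= sizeof(uint64_t);
    std::copy(payload.begin(), payload.end(), cursor);        // message body
    cursor+= payload.size();
    storeBigEndian(cursor, crc32Of(payload), sizeof(uint32_t)); // checksum trailer
    return start;
  }

  std::vector<unsigned char> bytes;

private:
  std::atomic<size_t> log_offset;
};
```

Appending "hello" and then "abc" to a 64-byte MemoryLog places the first record at offset 0 and the second at offset 17 (8 + 5 + 4). Because each writer learns its own offset before writing anything, writers never need to hold a lock while copying bytes, which is the same reason the real code can use pwrite() with an explicit offset.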

Conclusion, for Now…

I do hope this article has been useful in getting to know how the Drizzle replication system passes the Command message around within a single server. You got to see the replicator and applier plugin APIs and example implementations of those APIs in the form of the FilteredReplicator and CommandLog classes. Next, we’ll be looking in detail at the Command Log itself, its format, and how to write to and read from it.

Please do email me (jaypipes@gmail.com) or feel free to post to the Drizzle Discussion list any criticism, suggestions, or requests. I’m eager to hear from you. :)