There is a class of problems in systems software that requires guaranteed delivery of data from one stage of processing to the next. In database systems, this usually involves a write-ahead log (WAL) file and a commit process that moves data from the WAL to the main storage files. If a crash or power loss occurs, we can replay the WAL file to reconstitute the database correctly; nothing gets lost. Most database systems use some variant of ARIES.

In message broker systems, this usually involves an acknowledgement that a message was received, and a retry from the client if there was no response or an error response. For durable message brokers, that acknowledgement should not go to the client until the data is committed to disk and safe. In larger brokered systems, like Kafka, this can extend to the data safely arriving at multiple nodes before receipt is acknowledged to the client. These systems can usually be configured according to the application's tolerance for data loss. For ephemeral stream data where the odd message or two can be dropped, we might configure Kafka to acknowledge a message once only the leader has it, for example.

JLog is a library that provides journaled log functionality for your application and lets you decouple data ingestion from data processing using publish/subscribe semantics. It is both thread-safe and multi-process-safe. JLog can be used to build pub/sub systems that guarantee message delivery by writing each received message to permanent storage and allowing different subscribers to maintain different positions in the log. It fully manages file segmentation and cleans up a segment once all subscribers have finished reading it.
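
To make the model concrete, here is a minimal writer sketch. The path and subscriber name are hypothetical, most error handling is elided, and it assumes the basic jlog_ctx_write(ctx, buf, len) entry point; treat it as an illustration of the flow (create, add a subscriber, open a writer, write) rather than a complete program.

#include <stdio.h>
#include <jlog.h>

int main(void) {
  /* "/tmp/example.jlog" is a hypothetical path; a JLog is a directory of segment files. */
  jlog_ctx *ctx = jlog_new("/tmp/example.jlog");

  /* Create the log on disk. Errors (e.g. the log already exists) are ignored for brevity. */
  jlog_ctx_init(ctx);
  jlog_ctx_close(ctx);

  /* Register a subscriber so written segments are retained until it has read them. */
  ctx = jlog_new("/tmp/example.jlog");
  jlog_ctx_add_subscriber(ctx, "worker", JLOG_BEGIN);
  jlog_ctx_close(ctx);

  /* Re-open for writing and publish a message. */
  ctx = jlog_new("/tmp/example.jlog");
  if (jlog_ctx_open_writer(ctx) != 0) {
    fprintf(stderr, "jlog_ctx_open_writer failed: %d %s\n",
            jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
    return 1;
  }
  const char msg[] = "hello, jlog";
  if (jlog_ctx_write(ctx, msg, sizeof(msg) - 1) != 0) {
    fprintf(stderr, "jlog_ctx_write failed: %d %s\n",
            jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
    return 1;
  }
  jlog_ctx_close(ctx);
  return 0;
}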

Recent additions

To support ongoing scalability and availability objectives at Circonus, I recently added a set of new features for JLog. I’ll discuss each of them in more detail below:

  • Compression with LZ4
  • Single process support on demand
  • Rewindable checkpoints
  • Pre-commit buffering

Compression with LZ4

If you are running on a file system that does not support compression, JLog now supports LZ4 compression to reduce disk storage requirements and, when combined with pre-commit buffering, to increase write throughput. The API for turning on compression looks like:

typedef enum {
  JLOG_COMPRESSION_NULL = 0,
  JLOG_COMPRESSION_LZ4 = 0x01
} jlog_compression_provider_choice;

int jlog_ctx_set_use_compression(jlog_ctx *ctx, uint8_t use);

int jlog_ctx_set_compression_provider(jlog_ctx *ctx,    
    jlog_compression_provider_choice provider);

Currently, only LZ4 is supported, but other compression formats may be added in the future. Choosing the NULL compression provider is the same as choosing no compression. It's important to note that you must turn on compression before calling jlog_ctx_init, and the chosen compression will be stored with the JLog for its lifetime.
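
Here is a short sketch of how those calls fit together when creating a new, compressed JLog. The path is hypothetical and error handling is abbreviated; the important part is that both compression calls happen before jlog_ctx_init.

jlog_ctx *ctx = jlog_new("/tmp/compressed.jlog");   /* hypothetical path */

/* Both calls must precede jlog_ctx_init; the choice is recorded in the JLog's
   metadata and used for the life of the log. */
jlog_ctx_set_use_compression(ctx, 1);
jlog_ctx_set_compression_provider(ctx, JLOG_COMPRESSION_LZ4);

if (jlog_ctx_init(ctx) != 0) {
  fprintf(stderr, "jlog_ctx_init failed: %d %s\n",
          jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
}
jlog_ctx_close(ctx);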

Single process support

This really should be called "switching off multi-process support", as multi-process is the default behavior. Multi-process mode protects the JLog directory with a file lock via fcntl(). Thread safety is always maintained and cannot be disabled, but you can skip the locking system call if you know that writes will only ever come from a single process (probably the most common usage of JLog).

Using the following call with mproc == 0 will turn off this file locking, which should result in a throughput increase:

int jlog_ctx_set_multi_process(jlog_ctx *ctx, uint8_t mproc);
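
For example, a single-writer daemon might do something like the following before opening the log for writing. The path is hypothetical, and calling this before jlog_ctx_open_writer is my assumption about ordering rather than a documented requirement.

jlog_ctx *ctx = jlog_new("/tmp/single-writer.jlog");

/* This process is the only writer, so skip the fcntl() file locking. */
jlog_ctx_set_multi_process(ctx, 0);

if (jlog_ctx_open_writer(ctx) != 0) {
  fprintf(stderr, "jlog_ctx_open_writer failed: %d %s\n",
          jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
}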

Rewindable checkpoints

Highly available systems may require the ability to go back to a previously read checkpoint. By default, JLog deletes file segments once all subscribers have read all messages in the segment, so if you wanted to return to an earlier checkpoint for some reason (such as failed processing), you were stuck with no ability to rewind. With support for rewindable checkpoints, you can now pin an ephemeral subscriber at a known spot and later rewind to that named checkpoint. The API for using rewindable checkpoints is:

int jlog_ctx_add_subscriber(jlog_ctx *ctx, const char *subscriber,
    jlog_position whence);
int jlog_ctx_set_subscriber_checkpoint(jlog_ctx *ctx, 
    const char *subscriber, 
    const jlog_id *checkpoint);

Here’s an example of its usage:

jlog_ctx *ctx;
jlog_id checkpoint;
jlog_message message;

ctx = jlog_new("/tmp/test.foo");
if (jlog_ctx_open_reader(ctx, "reader") != 0) {
  fprintf(stderr, "jlog_ctx_open_reader failed: %d %s\n", jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
  exit(-1);
}

/* add our special trailing checkpoint subscriber */
if (jlog_ctx_add_subscriber(ctx, "checkpoint-name", JLOG_BEGIN) != 0 && errno != EEXIST) {
  fprintf(stderr, "jlog_ctx_add_subscriber failed: %d %s\n", jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
  exit(-1);
}

/* now move the checkpoint subscriber to where the real reader is */
if (jlog_get_checkpoint(ctx, "reader", &checkpoint) != 0) {
  fprintf(stderr, "jlog_get_checkpoint failed: %d %s\n", jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
  exit(-1);
}

if (jlog_ctx_set_subscriber_checkpoint(ctx, "checkpoint-name", &checkpoint) != 0) {
  fprintf(stderr, "jlog_ctx_set_subscriber_checkpoint failed: %d %s\n", jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
  exit(-1);
}

Now we have a checkpoint named “checkpoint-name” at the same location as the main subscriber “reader”. If we want to rewind, we simply do this:

/* move checkpoint to our original position, first read checkpoint location */
if (jlog_get_checkpoint(ctx, "checkpoint-name", &checkpoint) != 0) {
  fprintf(stderr, "jlog_get_checkpoint failed: %d %s\n", jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
  exit(-1);
}

/* now move the main read checkpoint there */
if (jlog_ctx_read_checkpoint(ctx, &checkpoint) != 0) {
  fprintf(stderr, "checkpoint failed: %d %s\n", jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
} else {
  fprintf(stderr, "\trewound checkpoint...\n");
}

To move our checkpoint forward, we merely call jlog_ctx_set_subscriber_checkpoint with the safe checkpoint.
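
In other words, once the reader has durably processed everything up to a known-good position, we drag the rewind point forward, roughly like this (reusing the names from the example above):

/* `checkpoint` holds the last position we have fully and safely processed. */
if (jlog_ctx_set_subscriber_checkpoint(ctx, "checkpoint-name", &checkpoint) != 0) {
  fprintf(stderr, "jlog_ctx_set_subscriber_checkpoint failed: %d %s\n",
          jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
}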

Pre-commit buffering

One of the largest challenges with JLog is throughput. The ability to disable multi-process support helps reduce the syscalls required to write our data, but we still need to make a writev call for each message. This syscall overhead takes a serious bite out of throughput (more in the benchmarks section below). To get around this, we have to find a reasonably safe way to reduce the syscall overhead of lots of tiny writes. We can either map the underlying block device and write to it directly (a nightmare) or we can batch the writes. Batching writes is far easier, but done naively it sacrifices too much data safety (a crash before a batch commit can lose many rows, depending on the size of the batch). In the end, I chose a middle-ground approach that is fairly safe for the most common case but still allows very high throughput using batched writes.

int jlog_ctx_set_pre_commit_buffer_size(jlog_ctx *ctx, size_t s);

Setting this to something greater than zero turns on pre-commit buffering. It is implemented as a writable mmap()ed memory region in which writes are batched; the pre-commit buffer is flushed to the actual log files when it fills to the requested size. We rely on the OS to flush the mmap()ed data back to its backing file even if the process crashes. However, if the machine is lost to a power failure, this approach is not safe. There is always a tradeoff between safety and throughput: only use this approach if you are comfortable losing data in the event of power loss or kernel panic.

It is important to note that pre-commit buffering is not multi-process writer safe. If you are using JLog under a scheme where multiple processes write to the same JLog, you have to leave the pre-commit buffer size at zero (the default). However, it is safe to use from a single process with multiple writing threads, and it is also safe under multi-process use when there are multiple reading processes but only a single writing process.

There is a tradeoff between throughput and read side latency if you are using pre-commit buffering. Since reads only ever occur out of the materialized files on disk and do not consider the pre-commit buffer, reads can only advance when the pre-commit buffer is flushed. If you have a large-ish pre-commit buffer size and a slow-ish write rate, your readers could be waiting for a while before they advance. Choose your pre-commit buffer size wisely based on the expected throughput of your JLog. Note that we also provide a flush function, which you could wire up to a timer to ensure the readers are advancing even in the face of slow writes:

int jlog_ctx_flush_pre_commit_buffer(jlog_ctx *ctx);
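
Putting it together, a high-throughput, single-writer setup might look roughly like this. The 128 KB size, the hypothetical path, and the placement of the setup calls before jlog_ctx_open_writer are illustrative assumptions, not library requirements.

jlog_ctx *ctx = jlog_new("/tmp/fast.jlog");

jlog_ctx_set_multi_process(ctx, 0);                    /* single writing process only */
jlog_ctx_set_pre_commit_buffer_size(ctx, 128 * 1024);  /* batch writes in a 128 KB buffer */

if (jlog_ctx_open_writer(ctx) != 0) {
  fprintf(stderr, "jlog_ctx_open_writer failed: %d %s\n",
          jlog_ctx_err(ctx), jlog_ctx_err_string(ctx));
}

/* ... many jlog_ctx_write() calls ... */

/* On a timer, or when writes go quiet, flush buffered data so readers can advance. */
jlog_ctx_flush_pre_commit_buffer(ctx);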

Benchmarks

All benchmarks are timed by writing one million JLog entries with a message size of 100 bytes. All tests were conducted on OmniOS v11 r151014 using ZFS as the file system with compression enabled.

Test                              Entries/sec    Time to complete
JLog Default                      ~114,000       8.735 sec
LZ4 compression on                ~96,000        10.349 sec
Multi-process OFF                 ~138,000       7.248 sec
MP OFF + Pre-commit buffer 128K   ~1,080,000     0.925 sec
MP OFF + LZ4                      ~121,000       8.303 sec
MP OFF + Pre-commit + LZ4         ~474,000       2.113 sec

As you can see from the table above, turning multi-process support off provides a modest throughput advantage, since all those calls to fcntl are elided, but the truly striking gains come from pre-commit buffering. Even a relatively small buffer of 128 KB gains us almost 8x in throughput over the next best option.

That LZ4 runs more slowly is not surprising: we are trading CPU for space savings. If you are already on a compressing file system, you get those space gains without flipping on compression in JLog; if you are running on a non-compressing file system, JLog's LZ4 support will save you disk space at some cost in throughput.
