When most people think of job scheduling, they consider all sorts of things: container orchestration, serverless allocation, batch job running… all of these qualify. In highly concurrent systems, it is important to be able to schedule work and collect the results upon completion. In the world of high-performance, low-latency systems software, those jobs take on an entirely different flavor. Our jobs can — and often do — take only microseconds to complete, and we’re scheduling them to strike a fine balance of latency, throughput, and resiliency to underlying resource problems. To that end, let’s talk about job queues, concurrency control, fair queueing, and operability within libmtev, which enables developers to take control of their systems.

Job Queues

Job queues are a simple concurrency control concept. As a concrete example, say we have requests coming into our system to serve a range of data from a specific key: “get me the average request arrival rate to my API servers for each minute over the last seven days.” That request turns into work that often requires reading data from the disk and produces 7 days * 1,440 minutes/day * 1 value/minute = 10,080 values. Simple enough.

Now, suppose an operation like that takes 2ms to service, and an application needs to retrieve the same data. But instead of asking for the API service as a whole, it asks for each of the 500 API servers individually. To satisfy these requests and construct our answer, we must perform 500 of these 2ms jobs. If this is done serially, it doesn’t take long to realize that you will have a minimum service time of 1,000ms… and a second is a long time in our world. However, if we could split each of those ranged reads up into separate jobs and distribute those jobs to be satisfied by independent or loosely dependent resources, such as different machines, cores, and disks, we could regain much of our service time via concurrent execution.
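
For the back-of-envelope version (the concurrency of 100 used here is just an illustrative choice, not anything prescribed by libmtev):

serial:              500 jobs * 2ms/job = 1,000ms
100-way concurrency: ceil(500 / 100) waves * 2ms/wave = 5 * 2ms = 10ms, ignoring contention and scheduling overhead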

Concurrency Control

This “fan out” of work is usually the first step in most system implementations attempting to reduce service latencies. However, the challenge is managing the concurrency, i.e., the number of parallel executions. There is such a thing as too much concurrency; it is often referred to as “thrashing” in the systems world. More specifically, it is the area on the universal scalability law (USL) curve to the right of its peak. Adding concurrency makes things faster until it… doesn’t. So, the goal becomes taking your work and achieving a concurrency that lands you near the peak of your USL curve. To do this, concurrency-governed job queues are a perfect tool.
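
For reference, the USL (in Neil Gunther’s formulation; the parameter names below are his, not anything libmtev exposes) models relative throughput at concurrency N as:

X(N) = N / (1 + σ(N − 1) + κN(N − 1))

where σ captures contention costs and κ captures coherency (crosstalk) costs. Throughput peaks near N* ≈ sqrt((1 − σ) / κ); pushing concurrency past that point is the “thrashing” region described above.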

A job queue in libmtev is a concurrency-governed, dynamic, asynchronous job execution system with an input work queue. As jobs sit outstanding in that queue, the system assigns them to available threads to complete the work. If no threads are available, the job queue system spins up new threads to service the work, up to the maximum designated in the job queue’s configuration. If threads sit idle due to a lack of available work in the queue, they are gracefully terminated, down to the queue’s configured minimum.
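
To ground that in code, here is a minimal sketch of submitting one asynchronous job to a named job queue. It retrieves the queue with eventer_jobq_retrieve (shown again later in this article); the eventer_alloc_asynch and eventer_add_asynch calls, the EVENTER_ASYNCH_WORK/EVENTER_ASYNCH_COMPLETE phases, and the include path reflect my reading of libmtev’s eventer API and should be verified against the headers. The read_request type and the do_ranged_read and deliver_result helpers are hypothetical application code.

#include <sys/time.h>
#include <eventer/eventer.h>

struct read_request;                            /* hypothetical application type */
void do_ranged_read(struct read_request *);     /* hypothetical: blocking disk read */
void deliver_result(struct read_request *);     /* hypothetical: hand the result back */

/* Invoked by a job queue thread; mask indicates which phase of the job this is. */
static int
ranged_read_job(eventer_t e, int mask, void *closure, struct timeval *now) {
  (void)e; (void)now;
  struct read_request *req = closure;
  if(mask == EVENTER_ASYNCH_WORK) {
    /* Runs on a job queue thread: safe to block on disk here. */
    do_ranged_read(req);
  }
  if(mask == EVENTER_ASYNCH_COMPLETE) {
    /* Runs after the work phase: hand the result to whoever assembles the answer. */
    deliver_result(req);
  }
  return 0;
}

void
schedule_ranged_read(struct read_request *req) {
  /* "my_special_jobq" is the example queue name used later in this article;
   * this assumes the queue has been created/configured elsewhere. */
  eventer_jobq_t *jobq = eventer_jobq_retrieve("my_special_jobq");
  if(jobq) eventer_add_asynch(jobq, eventer_alloc_asynch(ranged_read_job, req));
}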

If we model our system and find that up to a concurrency of 100 we receive increased throughput — meaning individual job execution time does not suffer significantly — but beyond that the system thrashes, then we can set up a job queue for those jobs with a minimum concurrency of 0 and a maximum concurrency of 100. Now, when our request for 500 ranged fetches arrives, we can place all of the work requests in that job queue, and it will run up to 100 of them concurrently until it completes them, and then we can assemble the results and deliver the answer.
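
Continuing the earlier sketch, bounding the queue at a minimum of 0 and a maximum of 100 uses the same call the article demonstrates later, and the 500 ranged fetches become 500 submissions into that queue (schedule_ranged_read, read_request, and reqs are the hypothetical pieces from the sketch above):

/* reqs is a hypothetical array of 500 per-server read requests. */
void fan_out(struct read_request *reqs) {
  eventer_jobq_t *jobq = eventer_jobq_retrieve("my_special_jobq");
  if(!jobq) return;
  eventer_jobq_set_min_max(jobq, 0, 100);     /* min 0, max 100 */
  /* One job per API server; the queue runs at most 100 of them at a time. */
  for(int i = 0; i < 500; i++) schedule_ranged_read(&reqs[i]);
}

Assembling the final answer then just amounts to noticing when the last of the 500 jobs has finished, for example by decrementing a counter in each job’s completion phase.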

Libmtev, today, does not auto-tune for the optimal job queue concurrency due to system complexity. Computing an optimal concurrency would require assuming that all jobs are alike and that no other jobs in the system impact resource availability. In a complex system, resource usage is highly compositional, and the dynamics of the system mean that the practical USL curve parameters are ever-changing from the standpoint of an individual job queue. It is also not practical for a programmer to isolate their jobs such that they are aligned with individual, isolated resource usage. So the optimal concurrency that might be ascertained for a specific resource isn’t directly translatable to real-world jobs. That said, there is some research work into auto-discovery and adaptation of concurrency.

Problem solved? Not even close.

Fair Queueing

The next challenge is starvation-related. When building interactive systems and other latency-sensitive workloads, it is undesirable-to-the-point-of-broken to allow expensive requests to significantly impact the latency of inexpensive requests. Keep in mind that a request, as in the above example, can turn into hundreds of independently executing jobs behind the scenes. In the world of time-series databases, that happens commonly due to “click rage.” A user attempts to load up a graph with 500 independent streams of data, e.g., different lines in a line graph. When it doesn’t load within one second, they click it again and proceed to repeat this four or five times. While there are both simple and sophisticated methods for protecting against this type of resource attack, many other legitimate uses of a system can result in highly similar behavior that is much harder to defend against.

The basic problem is that if we put 500 jobs in the queue and then do it again and again and again, any new jobs from new requests come into the queue behind them. As the system load increases the queue length of the job queue, small, simple requests that may require only one or two jobs to satisfy them are unfortunately delayed. This is queueing theory rearing its ugly head to lay waste to your users’ experience. What to do? Fair queueing to the rescue.

Fair queueing is most easily explained via a description of our implementation in libmtev. Each job submitted to a jobq can have a taskid. The wait queue for execution is actually a dynamic number of FIFO queues, called subqueues, one per taskid. As new jobs arrive, if there is no subqueue for that taskid, one is created, and the job enters that subqueue. The execution engine simply round-robins across all subqueues to schedule work. If the job selected for work is the last in its subqueue, the subqueue is destroyed. This means that behavior is unchanged if all jobs are submitted with the same taskid.
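
To make the round-robin concrete, here is a deliberately simplified, self-contained sketch of the scheme just described: per-taskid FIFO subqueues created on demand, a scheduler that round-robins across them, and subqueue destruction when the last job is taken. It is illustrative only and is not libmtev’s actual implementation.

#include <stdio.h>
#include <stdlib.h>

/* Jobs are just integers here; a real system would carry closures and callbacks. */
struct job      { int id; struct job *next; };
struct subqueue { int taskid; struct job *head, *tail; struct subqueue *next; };

static struct subqueue *subqueues = NULL;  /* singly linked list of live subqueues */
static struct subqueue *cursor    = NULL;  /* next subqueue to service */

static void enqueue(int taskid, int job_id) {
  struct subqueue *sq;
  for(sq = subqueues; sq; sq = sq->next)
    if(sq->taskid == taskid) break;
  if(!sq) {                                /* first job for this taskid: create its subqueue */
    sq = calloc(1, sizeof(*sq));
    sq->taskid = taskid;
    sq->next = subqueues;
    subqueues = sq;
  }
  struct job *j = calloc(1, sizeof(*j));
  j->id = job_id;
  if(sq->tail) sq->tail->next = j; else sq->head = j;
  sq->tail = j;
}

static int dequeue(void) {                 /* returns a job id, or -1 if idle */
  if(!subqueues) return -1;
  if(!cursor) cursor = subqueues;
  struct subqueue *sq = cursor;
  struct job *j = sq->head;
  sq->head = j->next;
  if(!sq->head) sq->tail = NULL;
  cursor = sq->next;                       /* advance the round-robin position */
  if(!sq->head) {                          /* that was the last job: destroy the subqueue */
    struct subqueue **pp = &subqueues;
    while(*pp != sq) pp = &(*pp)->next;
    *pp = sq->next;
    free(sq);
  }
  int id = j->id;
  free(j);
  return id;
}

int main(void) {
  for(int i = 0; i < 3; i++) enqueue(1, 100 + i);   /* taskid 1: an expensive request */
  enqueue(2, 200);                                  /* taskid 2: a small request */
  for(int id; (id = dequeue()) != -1; )             /* the small request is not starved */
    printf("run job %d\n", id);
  return 0;
}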

Yet, if we attach a unique taskid to each inbound user request, each request gets its fair share of immediacy when it comes to the scheduling of work. This provides reasonably low latency on lightweight requests. As a reminder, we use these techniques to manage resources in a way that provides for a better experience, not as a way to do more work than was otherwise possible. In other words, they don’t magically fix an overloaded system; they aim to distribute latency fairly in a resource-saturated system.
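
In code, this amounts to deriving a taskid from the inbound request and passing it along when the job is submitted. The subqueue-aware submission call below is an assumption about the libmtev API name (verify it against eventer.h), and request_id is a hypothetical helper; ranged_read_job is from the earlier sketch.

/* Assumption: a subqueue-aware variant of eventer_add_asynch that takes a taskid. */
void schedule_ranged_read_for_request(eventer_jobq_t *jobq, struct read_request *req) {
  uint64_t taskid = request_id(req);   /* hypothetical: unique per inbound user request */
  eventer_add_asynch_subqueue(jobq, eventer_alloc_asynch(ranged_read_job, req), taskid);
}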

In addition to concurrency control, each job queue can support a queue depth limit. This requires somewhat more advanced programming, in that programmers who intend to schedule work within a job queue must accommodate a “this queue is full” refusal to perform the work. For a typical service request enqueueing jobs, this might sound straightforward. But when you have jobs that enqueue other jobs in other queues, it can be challenging to efficiently abort all the work and inform the caller that the service is overloaded. Perhaps we’ll explore depth-limited work queues in a future article.

Operability

The operability of a real-world system should never be ignored. One of the better operational aspects of the libmtev job queue system is its online configurability and instrumentation. Each job queue parameter related to concurrency control can be manipulated while the system is running. This allows for simple, highly reactive operator control over the parallelism.

Setting the concurrency bounds on a job queue programmatically:

eventer_jobq_t *jobq = eventer_jobq_retrieve("my_special_jobq");
if(jobq) eventer_jobq_set_min_max(jobq, 1, 16);

Setting the concurrency bounds on a job queue via the telnet console:

# telnet 127.0.0.1 32322
example1# mtev jobq my_special_jobq max 16
Setting 'my_special_jobq' jobq max to 16
example1# mtev jobq my_special_jobq min 1
Setting 'my_special_jobq' jobq min to 1

Being able to increase or decrease concurrency is important to help retarget the peak of the USL curve. But is it faster? That’s where instrumentation comes in. The system tracks the total and current number of jobs enqueued to a job queue, as well as the nanoseconds of wait time and execution time for every job through the system, using libcircllhist (via libcircmetrics). The operator can make a change and immediately determine whether it has improved or worsened the situation.

[Figure: libmtev’s internally tracked histogram of job execution time]

[Figure: libmtev’s internally tracked histogram of job wait-for-execution time]

At Circonus, we build most of our C/C++ based services on top of libmtev because we get all of these facilities built-in. The IRONdb system, for example, manages about 100 different job queues with highly variable concurrency settings. A humming production system can be running between 80 and 500 threads at any given time. This job queue framework, its instrumentation, and its configurability make a complex system substantially easier to code, reason about, and optimize for an application’s specific workload.
