On Job Scheduling

When most people think of job scheduling, they consider all sorts of things. Container orchestration, serverless allocation, and batch job running… these all qualify. In highly concurrent systems, it is important that you have the ability to schedule work and collect those results upon completion. In the world of high-performance, low-latency systems software, those jobs take on an entirely different flavor. Our jobs can — and often do — take only microseconds to complete, and we’re scheduling them to attempt a fine balance of latency, throughput, and resiliency to underlying resource problems. To that end, let’s talk about job queues, concurrency control, fair queueing, and operability within libmtev, which enables developers to take control of their systems.

Job Queues

Job queues are a simple concurrency control concept. As a concrete example, say we have requests coming into our system to serve a range of data from a specific key: “get me the average request arrival rate to my API servers for each minute over the last seven days.” That request turns into work that often requires reading data from the disk and produces 7 days * 1,440 minutes/day * 1 value/minute = 10,080 values. Simple enough.

Now, suppose an operation like that takes 2ms to service, and an application needs to retrieve the same data. But instead of asking the API service as a whole, it asks for the data from each of the 500 API servers individually. To iterate through these requests and construct our answer, we must perform 500 jobs of 2ms each. If this is done serially, it doesn’t take long to realize that you will have a minimum service time of 1,000ms… and a second is a long time in our world. However, if we could split each of those ranged reads into separate jobs and distribute those jobs to be satisfied by independent or loosely dependent resources, such as different machines, cores, and disks, we could regain much of our service time via concurrent execution.
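
A rough way to frame the upside, ignoring scheduling overhead and assuming each job still takes 2ms at higher concurrency: with a concurrency of c, the best-case service time is

⌈500 / c⌉ × 2ms, which at c = 100 works out to ⌈500 / 100⌉ × 2ms = 10ms

Real systems won’t reach that bound, but it shows how much of the 1,000ms is recoverable.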

Concurrency Control

This fan-out of work is usually the first step in most system implementations attempting to reduce service latencies. However, the next painful challenge is managing the concurrency, i.e., the number of parallel executions. There is such a thing as too much concurrency; in the systems world it is often referred to as “thrashing.” More specifically, it is the region of the universal scalability law (USL) curve to the right of its peak. Adding concurrency makes things faster… until it doesn’t.

So, the goal becomes taking your work and achieving a concurrency that delivers near the peak of your USL curve. To do this, concurrency-governed job queues are a perfect tool.

A job queue in libmtev is a concurrency-governed, dynamic, asynchronous job execution system with an input work queue. As jobs arrive in that queue, the system assigns them to available threads to complete the work. If no threads are available, the job queue system spins up new threads to service the work, up to the maximum designated in the job queue’s configuration. And if threads sit idle due to a lack of available work in the queue, they are gracefully terminated.

If we model our system and realize that we receive increased throughput up to 100 concurrent jobs (meaning individual job execution time does not suffer significantly) but that the system thrashes beyond that, then we can set up a job queue for those jobs with a minimum concurrency of 0 and a maximum concurrency of 100. Now, when our request for 500 ranged fetches arrives, we place all the work requests in that job queue, it runs up to 100 of them concurrently until all of them complete, and then we assemble the results and deliver the answer.
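
For those 500 ranged fetches, the submission side might look like the sketch below. The jobq name, the range_req closure, and the range_fetch_asynch callback are hypothetical; the eventer_alloc_asynch / eventer_add_asynch calls and the EVENTER_ASYNCH_* masks follow libmtev’s asynchronous eventer API, but verify the exact names and include paths against your libmtev headers.

#include <stdlib.h>
#include <mtev_defines.h>
#include <eventer/eventer.h>

/* Hypothetical per-job closure: which API server's range to read. */
struct range_req {
  int server_id;
  /* ... key, time range, result buffer ... */
};

/* Asynchronous callback: the WORK phase runs in a jobq thread, the
 * COMPLETE phase runs back in the event loop. */
static int
range_fetch_asynch(eventer_t e, int mask, void *closure, struct timeval *now) {
  struct range_req *req = closure;
  (void)e; (void)now;
  if(mask == EVENTER_ASYNCH_WORK) {
    /* perform the ~2ms disk-bound ranged read for req->server_id here */
  }
  if(mask == EVENTER_ASYNCH_COMPLETE) {
    /* stitch this server's values into the overall answer; once all
       500 jobs have completed, deliver the response to the caller */
    free(req);
  }
  return 0;
}

void
fan_out_ranged_reads(void) {
  /* "ranged_reads" is a hypothetical jobq assumed to be created elsewhere */
  eventer_jobq_t *jobq = eventer_jobq_retrieve("ranged_reads");
  if(!jobq) return;

  /* bound concurrency near the peak of the measured USL curve */
  eventer_jobq_set_min_max(jobq, 0, 100);

  for(int i = 0; i < 500; i++) {
    struct range_req *req = calloc(1, sizeof(*req));
    req->server_id = i;
    eventer_add_asynch(jobq, eventer_alloc_asynch(range_fetch_asynch, req));
  }
}

The minimum of 0 lets the thread pool drain away entirely when the queue goes idle, while the maximum of 100 keeps the system near the measured peak of its USL curve.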

libmtev does not, today, auto-tune job queue concurrency, owing to system complexity. A single “optimal” concurrency would assume that all jobs are alike and that no other jobs in the system affect resource availability. In a complex system, resource usage is highly compositional, and the system’s dynamics mean that the practical USL curve parameters are ever-changing from the standpoint of an individual job queue. It is also not practical for a programmer to isolate their jobs such that they align with individual, isolated resources. So, the optimal concurrency that might be ascertained for a specific resource isn’t directly translatable to real-world jobs. That said, there is some research work into auto-discovery and adaptation of concurrency.

Problem solved? Not even close.

Fair Queueing

The next challenge is starvation-related. When building interactive systems and other latency-sensitive workloads, it is undesirable-to-the-point-of-broken to allow expensive requests to significantly impact the latency of inexpensive requests. Keep in mind that a request, as in the above example, can turn into hundreds of independently executing jobs behind the scenes. In the world of time-series databases, that happens commonly due to “click rage.” A user attempts to load a graph with 500 independent streams of data, e.g., different lines in a line graph. When it doesn’t load within one second, they click it again and proceed to repeat this four or five times. While there are both simple and sophisticated methods for protecting against this type of resource attack, many other legitimate uses of a system can result in highly similar behavior that is much harder to defend against.

The basic problem is that if we put 500 jobs in the queue, and then do it again and again and again, any new jobs from new requests land in the queue behind them. As the system load increases (that is, as the job queue’s backlog grows), small, simple requests that may only require one or two jobs to satisfy are unfortunately delayed. This is queueing theory rearing its ugly head to lay waste to your users’ experience. What to do? Fair queueing to the rescue.

Fair queueing is most easily explained via a description of our implementation in libmtev. Each job submitted to a jobq can have a taskid. The wait queue for execution is actually a dynamic number of FIFO queues, called subqueues, with one per taskid. As new jobs arrive, if there is no subqueue for that taskid, then one is created, and the job enters into that subqueue. The execution engine simply round-robins across all subqueues to schedule work. If the job selected for work is the last in its subqueue, the subqueue is destroyed. This means that we have unchanged behavior if all jobs are submitted with the same taskid.
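
For intuition, here is a simplified, standalone sketch of that structure: a dynamic set of per-taskid FIFO subqueues arranged in a circular list, with a round-robin cursor. This is illustrative C, not libmtev’s actual implementation (which also has to handle locking, job states, and more); the type and function names are invented for the example.

#include <stdint.h>
#include <stdlib.h>

typedef struct job {
  uint64_t taskid;
  void (*work)(void *);
  void *closure;
  struct job *next;
} job_t;

typedef struct subqueue {
  uint64_t taskid;
  job_t *head, *tail;          /* FIFO of jobs sharing this taskid */
  struct subqueue *next;       /* circular list for round-robin */
} subqueue_t;

typedef struct {
  subqueue_t *cursor;          /* next subqueue to draw work from */
} waitq_t;

static subqueue_t *find_or_create(waitq_t *wq, uint64_t taskid) {
  subqueue_t *sq = wq->cursor;
  if(sq) do {
    if(sq->taskid == taskid) return sq;
    sq = sq->next;
  } while(sq != wq->cursor);
  /* no subqueue for this taskid yet: create one and link it in */
  sq = calloc(1, sizeof(*sq));
  sq->taskid = taskid;
  if(!wq->cursor) { sq->next = sq; wq->cursor = sq; }
  else { sq->next = wq->cursor->next; wq->cursor->next = sq; }
  return sq;
}

void waitq_enqueue(waitq_t *wq, job_t *job) {
  subqueue_t *sq = find_or_create(wq, job->taskid);
  job->next = NULL;
  if(sq->tail) sq->tail->next = job; else sq->head = job;
  sq->tail = job;
}

/* Round-robin: take the head of the current subqueue, advance the
   cursor, and destroy the subqueue if that was its last job. */
job_t *waitq_dequeue(waitq_t *wq) {
  subqueue_t *sq = wq->cursor;
  if(!sq) return NULL;                    /* nothing waiting */
  job_t *job = sq->head;
  sq->head = job->next;
  if(!sq->head) {
    /* last job in this subqueue: unlink and destroy it */
    if(sq->next == sq) wq->cursor = NULL;
    else {
      subqueue_t *prev = sq;
      while(prev->next != sq) prev = prev->next;
      prev->next = sq->next;
      wq->cursor = sq->next;
    }
    free(sq);
  } else {
    wq->cursor = sq->next;
  }
  return job;
}

If every job carries the same taskid, this collapses to a single FIFO, which is the unchanged-behavior case noted above. With one taskid per inbound request, a 500-job burst from one request can no longer push a one-job request to the back of the line.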

Yet, if we attach a unique taskid to each inbound user request, each request gets its fair share of immediacy when it comes to the scheduling of work. This provides reasonably low latency on lightweight requests. As a reminder, we use these techniques to manage resources in a way that provides a better experience, not as a way to do more work than was otherwise possible. In other words, they don’t magically fix an overloaded system; they aim to distribute latency fairly across requests when resources are saturated.

In addition to concurrency control, each job queue can support a queue depth limit. This requires a bit more advanced programming in that programmers who intend to schedule work within a job queue must accommodate the “this queue is full” refusal to perform work. For a typical service request enqueueing jobs, this might sound straightforward. But when you have jobs that enqueue other jobs in other queues, it can be challenging to efficiently abort all the work and inform the caller that service is overloaded. Perhaps we’ll explore depth-limited work queues in a future article.

Operability

The operability of a real-world system should never be ignored. One of the better operational aspects of the libmtev job queue system is its online configurability and instrumentation. Each job queue parameter related to concurrency control can be manipulated while the system is running. This allows for simple, highly reactive operator control over parallelism.

Setting the concurrency bounds on a job queue programmatically:

eventer_jobq_t *jobq = eventer_jobq_retrieve("my_special_jobq");
if(jobq) eventer_jobq_set_min_max(jobq, 1, 16);

Setting the concurrency bounds on a job queue via the telnet console:

# telnet 127.0.0.1 32322

example1# mtev jobq my_special_jobq max 16
Setting 'my_special_jobq' jobq max to 16

example1# mtev jobq my_special_jobq min 1
Setting 'my_special_jobq' jobq min to 1

Being able to increase or decrease concurrency is important to help retarget the peak of the USL curve. But is it faster? That’s where instrumentation comes in. The system tracks the total and current number of jobs enqueued to a job queue as well as the nanoseconds of wait time and execution time for every job in the system using libcircllhist via libcircmetrics. The operator can make a change and immediately determine if that change has improved or worsened the situation.
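
The same measure-then-adjust loop applies to your own timings, too. Below is a minimal sketch of recording latencies into a log-linear histogram with libcircllhist and reading back a quantile; the hist_alloc / hist_insert / hist_approx_quantile / hist_free calls reflect the circllhist.h API, but treat the exact signatures and header path as something to verify against your installation.

#include <stdio.h>
#include <circllhist.h>   /* libcircllhist */

int main(void) {
  histogram_t *h = hist_alloc();

  /* pretend these are measured per-job execution times, in nanoseconds */
  double samples[] = { 1800000, 2100000, 2050000, 9500000, 1900000 };
  for(int i = 0; i < 5; i++)
    hist_insert(h, samples[i], 1);

  /* ask for the 99th percentile of what we recorded */
  double q = 0.99, p99 = 0;
  hist_approx_quantile(h, &q, 1, &p99);
  printf("p99 execution time: %.0f ns\n", p99);

  hist_free(h);
  return 0;
}

Comparing a quantile like this before and after a concurrency change is exactly the feedback loop an operator wants when nudging a jobq.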

[Figure: libmtev’s internally tracked histogram of job execution time]
[Figure: libmtev’s internally tracked histogram of job wait-for-execution time]

At Circonus, we build most of our C/C++ based services on top of libmtev because we get all these facilities built in. The IRONdb system, for example, manages about 100 different job queues with highly variable concurrency settings. A humming production system can be running between 80 and 500 threads at any given time. This job-queue framework, with its instrumentation and configurability, makes a complex system substantially easier to code, reason about, and optimize for an application’s specific workload.
