On a recent large-scale data migration project, my team encountered throughput issues with a service being developed in our data pipeline. This particular service drove our automated migration, and we needed to scale it by orders of magnitude in order to meet the launch schedule. We began by addressing bottlenecks and tuning runtime parameters, and made various performance improvements along the way; the optimization that helped us go the distance was rethinking single-threaded sequential execution as a pipelined directed acyclic graph (DAG) execution.
The particular service under discussion works by continuously streaming and processing data in batches. By the time we considered an architectural redesign, various parts of the application had already been tuned or otherwise parallelized as much as our network and hardware could support; there would be little to no performance benefit from throwing more threads at the issue in a brute-force manner. The key observation at this point was that we needed our slowest I/O-bound processes to run not only as quickly as possible, but as much of the time as possible.
One main loop consisting of several major tasks was executing sequentially. By multithreading the entire main process, we could process multiple batches simultaneously. Conveniently, the main process was already encapsulated as a job. Single-threaded execution had been scheduled via ThreadPoolTaskExecutor.scheduleWithFixedDelay(), which lets the entire batch finish before the next run begins. The first change was to switch the scheduler to scheduleAtFixedRate() and allow concurrent executions. To guard against excessive memory usage from queued tasks, a discard policy was configured for this top-level executor.
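As a rough sketch of that scheduling change, using plain java.util.concurrent types (the batch job, the 30-second period, and the pool sizes here are hypothetical stand-ins, not our production values):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BatchJobScheduling {

    public static void main(String[] args) {
        // Worker pool that actually runs batch jobs. The bounded queue plus DiscardPolicy
        // keeps queued batches from piling up in memory when the pipeline falls behind.
        ThreadPoolExecutor batchPool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2),
                new ThreadPoolExecutor.DiscardPolicy());

        // Top-level trigger. With scheduleWithFixedDelay() the next batch only starts after
        // the previous one finishes; scheduleAtFixedRate() fires on a fixed cadence, so
        // handing each firing off to the worker pool lets multiple batches be in flight.
        ScheduledThreadPoolExecutor trigger = new ScheduledThreadPoolExecutor(1);
        trigger.scheduleAtFixedRate(
                () -> batchPool.execute(BatchJobScheduling::runOneBatch),
                0, 30, TimeUnit.SECONDS);
    }

    private static void runOneBatch() {
        // Hypothetical placeholder for the migration batch job.
    }
}
```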
Now, what does that accomplish? We’re adding more parallelism at the task level, but as stated before, parallelism for individual stages was already achieved by other optimizations. The next step was to actually “gate” each stage in the process to force pipelining and parallelization of different tasks. To do this, some changes needed to happen:
1) Each stage was encapsulated inside a Callable. Callables for intermediate stages were further wrapped inside an AsyncFunction.
2) A separate ExecutorService was used to run each stage. Native Java ExecutorService instances were converted into Guava’s ListeningExecutorService to support successive chaining of tasks (via the ListenableFuture instances they return). Using separate executors for each task allowed different types of tasks to run in parallel.
Simply running multiple threads through the execution of all tasks at once might have produced this behavior. However, that approach offers no guarantee that executions end up staggered across the I/O-intensive stages rather than all piling up within the same stage.
3) Splitting sequential tasks into a graph:
Pipelining in itself is certainly an improvement. Now that tasks were encapsulated as Callables, independent tasks could run in parallel. Various stages are kicked off and joined using transform() and get(); intermediate stages are represented as an AsyncFunction used to transform results from dependency tasks. The image above shows the before/after arrangement of tasks for our application.
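To make the chaining concrete, here is a minimal sketch of two chained stages using Guava. The stage names, payload types, and pool sizes are illustrative rather than our actual code, and recent Guava versions expose the AsyncFunction chaining as transformAsync() (older versions offered the same chaining through a transform() overload):

```java
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class PipelinedStages {

    // Placeholder payload types; the real service passes batch data between stages.
    static class Batch {}
    static class Result {}

    // Each expensive stage gets its own single-thread executor so different stages can overlap.
    private final ListeningExecutorService readPool =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
    private final ListeningExecutorService writePool =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

    public Result processOneBatch() throws Exception {
        // Stage 1: an I/O-bound read, encapsulated as a Callable and submitted to its executor.
        ListenableFuture<Batch> read = readPool.submit(this::readBatch);

        // Stage 2: an intermediate stage wrapped as an AsyncFunction so it can be chained
        // onto the first stage's ListenableFuture, running on its own executor.
        AsyncFunction<Batch, Result> write = batch -> {
            Callable<Result> task = () -> writeBatch(batch);
            return writePool.submit(task);
        };

        // Chain the stages, then join with get(), blocking until the chain finishes for this batch.
        ListenableFuture<Result> result =
                Futures.transformAsync(read, write, MoreExecutors.directExecutor());
        return result.get();
    }

    private Batch readBatch() { return new Batch(); }            // hypothetical I/O-bound read
    private Result writeBatch(Batch b) { return new Result(); }  // hypothetical I/O-bound write
}
```

With one top-level thread per expensive stage driving loops like this, different batches naturally end up in different stages at the same time, which is the pipelining behavior described above.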
An initial solution was delivered under operational and timeline constraints using the approach above; the production graph simply has more stages with more interdependencies. Examined in isolation, this change increased throughput by a factor of approximately 1.83x. Keep in mind that an architectural reorganization can compound upon smaller changes: combined with the other optimizations made to this app, we’ve seen up to a 132x peak gain in production throughput.
An aside on tuning thread count and executor services for various stages:
I believe in using threads judiciously (I know, it’s tempting to add an extra 0 to that thread config). In our particular case, there were two heavily I/O-bound stages; the others were negligible or relatively quick. The number of threads executing the graph was set to two, allowing each expensive stage to run nearly continuously, and the executors for those expensive stages were each configured with a thread pool size of one. As a test, the executors’ pool sizes were increased to two. This actually degraded performance by 0.877s on a sample dataset.

To understand why, let’s look at a visualization of the application state. The chart below (part of a monitoring dashboard built using Splunk) shows what happened when the two stages were constrained to one thread each. The orange bars represent the second most time-intensive stage. Once our pipeline fully engages each stage, we should expect to get the cost of stage-orange for free (on a timescale only), except for the initial occurrence; in other words, that stage’s execution is overshadowed by the most expensive stage.
Once the thread count is increased, the two threads may concurrently attempt to perform the same stage, splitting the available resources. If we examine the total cost of the N-1 stage-orange runs, we see that it roughly accounts for the 0.877s increase in runtime.
Estimated times for the stage-orange runs, not counting the initial execution:
0.102s + 0.108s + 0.119s + 0.100s + 0.121s + 0.094s
total = 0.644s
Difference in trial run times: 17.474s - 16.597s = 0.877s
0.644s accounts for most of the difference in performance
- of course, there may have been some variability due to network performance
Generalizing the design:
Manually stringing up a series of asynchronous tasks and transformations isn’t exactly the picture of maintainability and extensibility we want to see in our applications. The next challenge was refactoring this process to make it reusable and configurable. We can decompose this pattern into the following:
- specification of tasks as a graph
- chaining and joining of tasks
Each “stage” or task was rewritten as a Function<I, O> class. Defining a task graph then amounted to specifying, for each stage, its list of dependencies, the work encapsulated within a Function, and the executor service to run it on. That gives us all the specifications we need to initiate a stage. Essentially, we used a modified adjacency list representation for a graph.
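A simplified sketch of such a specification might look like the following; the StageSpec name, the stage names, and the use of plain Object payloads are illustrative simplifications, not the project’s actual classes:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class TaskGraphSpec {

    // One entry per stage: the work (a Function over its dependencies' results),
    // the names of the stages it depends on, and the executor it should run on.
    public static final class StageSpec {
        final Function<List<Object>, Object> work;
        final List<String> dependencies;
        final ExecutorService executor;

        public StageSpec(Function<List<Object>, Object> work,
                         List<String> dependencies,
                         ExecutorService executor) {
            this.work = work;
            this.dependencies = dependencies;
            this.executor = executor;
        }
    }

    // A modified adjacency list: stage name -> stage specification.
    public static Map<String, StageSpec> exampleGraph() {
        ExecutorService ioPool = Executors.newSingleThreadExecutor();
        ExecutorService cpuPool = Executors.newFixedThreadPool(2);
        return Map.of(
            "extract",  new StageSpec(deps -> "rows",                  List.of(),           ioPool),
            "validate", new StageSpec(deps -> deps.get(0) + ":ok",     List.of("extract"),  cpuPool),
            "load",     new StageSpec(deps -> deps.get(0) + ":loaded", List.of("validate"), ioPool));
    }
}
```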
To automatically initiate tasks in dependency order, a depth-first search was performed from each task in the graph. No task begins executing until dependencies complete, and results from the dependencies are passed into the function. We could have done a topological sort to determine dependency ordering. However, the implementation was simplified using the design assumption that crawling the graph and initializing tasks in memory is relatively inexpensive (on the order of milliseconds, happening once per graph execution).
We also needed to validate the graph as acyclic; cycles could cause tasks to block on execution, or even produce a stack overflow error from recursively initializing dependencies. The graph is first validated by a custom AcyclicGraph class, which is then consumed by a TaskGraphExecutor that executes the graph. Internally, the AcyclicGraph is backed by a task graph mapping (the adjacency list-like representation discussed earlier). The task graph mapping is created in Spring via XML configuration as a map. Excluding writing new services, we can configure executor services for each stage, reconfigure the graph, or incorporate new tasks using XML, all without touching any Java application code.
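As an illustration of the validation step, a standard depth-first cycle check over such a dependency mapping could look like this (a simplified stand-in, not the actual AcyclicGraph class):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public final class GraphValidation {

    // Throws if the dependency mapping (stage name -> names of its dependencies) contains a cycle.
    public static void assertAcyclic(Map<String, List<String>> dependencies) {
        Set<String> finished = new HashSet<>();
        for (String node : dependencies.keySet()) {
            visit(node, dependencies, new HashSet<>(), finished);
        }
    }

    private static void visit(String node, Map<String, List<String>> dependencies,
                              Set<String> onPath, Set<String> finished) {
        if (finished.contains(node)) {
            return;                       // already fully explored, no cycle through here
        }
        if (!onPath.add(node)) {
            throw new IllegalArgumentException("Cycle detected at stage: " + node);
        }
        for (String dep : dependencies.getOrDefault(node, List.of())) {
            visit(dep, dependencies, onPath, finished);
        }
        onPath.remove(node);
        finished.add(node);
    }
}
```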
The TaskGraphExecutor executes a task graph. You’ll notice that working with ListenableFuture gives us the added convenience of choosing an asynchronous or synchronous computation of the overall graph, exposed by computeGraphAsync() and computeGraphSynchronously().
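As a heavily simplified sketch of how such an executor can work (illustrative names throughout, assuming the graph has already been validated as acyclic, and using Guava’s Futures utilities rather than the production class):

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

public class SimpleTaskGraphExecutor {

    // Minimal stage node: the work, the stages it depends on, and its executor.
    public static final class Stage {
        final Function<List<Object>, Object> work;
        final List<String> dependencies;
        final ListeningExecutorService executor;

        public Stage(Function<List<Object>, Object> work,
                     List<String> dependencies,
                     ListeningExecutorService executor) {
            this.work = work;
            this.dependencies = dependencies;
            this.executor = executor;
        }
    }

    private final Map<String, Stage> graph;                                   // assumed acyclic
    private final Map<String, ListenableFuture<Object>> started = new HashMap<>();

    public SimpleTaskGraphExecutor(Map<String, Stage> graph) {
        this.graph = graph;
    }

    /** Kicks off every stage in dependency order and returns a future over all results. */
    public ListenableFuture<List<Object>> computeGraphAsync() {
        List<ListenableFuture<Object>> all = new ArrayList<>();
        for (String name : graph.keySet()) {
            all.add(start(name));
        }
        return Futures.allAsList(all);
    }

    /** Blocks until the whole graph has finished. */
    public List<Object> computeGraphSynchronously() throws ExecutionException, InterruptedException {
        return computeGraphAsync().get();
    }

    // Depth-first: start (or reuse) each dependency, then chain this stage onto their results.
    private ListenableFuture<Object> start(String name) {
        ListenableFuture<Object> existing = started.get(name);
        if (existing != null) {
            return existing;
        }
        Stage stage = graph.get(name);
        List<ListenableFuture<Object>> deps = new ArrayList<>();
        for (String dep : stage.dependencies) {
            deps.add(start(dep));
        }
        ListenableFuture<List<Object>> depResults = Futures.allAsList(deps);
        ListenableFuture<Object> result =
                Futures.transform(depResults, stage.work::apply, stage.executor);
        started.put(name, result);
        return result;
    }
}
```

Because start() memoizes futures by stage name, a stage shared by several downstream tasks is only started once, and independent branches run in parallel on their own executors.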
Below is the helper class TaskGraphTuple, which encapsulates converting a Function into an AsyncFunction.
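The core of that conversion, sketched here with an illustrative helper rather than the actual TaskGraphTuple code, is to run the Function on the stage’s own executor from inside an AsyncFunction:

```java
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.ListeningExecutorService;

import java.util.concurrent.Callable;
import java.util.function.Function;

public final class FunctionAdapters {

    // Adapts a plain Function into an AsyncFunction by submitting it to the stage's executor,
    // so the stage can be chained onto upstream ListenableFutures.
    public static <I, O> AsyncFunction<I, O> toAsync(Function<I, O> function,
                                                     ListeningExecutorService executor) {
        return input -> {
            Callable<O> task = () -> function.apply(input);
            return executor.submit(task);
        };
    }
}
```

This is what lets a chain of plain Functions be wired together with future transformations without each stage needing to know anything about futures or executors.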
Lastly, here’s an example configuration file. All the configuration is done in one file here for brevity.
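As a rough sketch of what such a file could look like (the bean names, classes, and stage names here are hypothetical, not the original configuration):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- One executor per expensive stage; pool sizes can be tuned here without code changes. -->
    <bean id="extractExecutor" class="java.util.concurrent.Executors"
          factory-method="newFixedThreadPool">
        <constructor-arg value="1"/>
    </bean>
    <bean id="loadExecutor" class="java.util.concurrent.Executors"
          factory-method="newFixedThreadPool">
        <constructor-arg value="1"/>
    </bean>

    <!-- The work for each stage, written as Function implementations. -->
    <bean id="extractFunction" class="com.example.migration.ExtractFunction"/>
    <bean id="loadFunction" class="com.example.migration.LoadFunction"/>

    <!-- Task graph as a map: stage name -> stage definition (function, dependencies, executor).
         Rewiring the graph or adding a stage only requires editing this map. -->
    <bean id="taskGraphExecutor" class="com.example.migration.TaskGraphExecutor">
        <constructor-arg>
            <map>
                <entry key="extract">
                    <bean class="com.example.migration.StageSpec">
                        <constructor-arg ref="extractFunction"/>
                        <constructor-arg><list/></constructor-arg>
                        <constructor-arg ref="extractExecutor"/>
                    </bean>
                </entry>
                <entry key="load">
                    <bean class="com.example.migration.StageSpec">
                        <constructor-arg ref="loadFunction"/>
                        <constructor-arg><list><value>extract</value></list></constructor-arg>
                        <constructor-arg ref="loadExecutor"/>
                    </bean>
                </entry>
            </map>
        </constructor-arg>
    </bean>
</beans>
```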