15-418 Project Milestone Report

  • [redacted] <[redacted]@cmu.edu>
  • [redacted] <[redacted]@andrew.cmu.edu>

View this document on the web at https://dayli.ly/15418-project.

For our 15-418 project, we initially proposed a single-node in-memory dataflow programming library in Rust. Since then, we have fleshed out the implementation and received feedback from course staff. We report our progress in this document.

In short, we have implemented a work-stealing thread pool, a single-node in-memory dataflow execution engine, and several data-parallel algorithms. We still need to benchmark these algorithms, and potentially optimize them further.

Thread pool

The most important piece of feedback from course staff was that we should not use rayon-core for our thread pool, but should instead roll our own. We have incorporated this into our plan, and adjusted our project’s scope accordingly.

We implemented a work-stealing thread pool based on the crossbeam-deque Chase-Lev queue implementation. This thread pool uses a blocking fork-join model; the operations available include:

  • join(F, G): run tasks F and G in parallel, block on their completion, and collect both return values;
  • scatter(F, ...): send an array of nproc tasks, one to each worker, block on their completion, and collect their return values;
  • broadcast(F): send one task to all workers to perform in parallel, block on their completion, and collect their return values.

These primitives allow us to implement many data-parallel algorithms without worrying about orchestration details.
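
To make the contract concrete, the primitives have roughly the following shape. The trait name and exact signatures below are an illustrative reconstruction of the description above, not our crate’s actual interface.

```rust
// Hypothetical shape of the pool's blocking fork-join interface; names and
// signatures are illustrative only.
trait ForkJoinPool {
    /// Run `f` and `g` in parallel and block until both complete.
    fn join<RA: Send, RB: Send>(
        &self,
        f: impl FnOnce() -> RA + Send,
        g: impl FnOnce() -> RB + Send,
    ) -> (RA, RB);

    /// Give each of the `nproc` workers its own task (one element of `tasks`
    /// per worker), block until all complete, and collect the return values.
    fn scatter<R: Send, F: FnOnce() -> R + Send>(&self, tasks: Vec<F>) -> Vec<R>;

    /// Run the same task once on every worker in parallel, block until all
    /// complete, and collect the per-worker return values.
    fn broadcast<R: Send>(&self, f: impl Fn() -> R + Send + Sync) -> Vec<R>;
}
```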

It is worth noting that the blocking nature of these primitives allows us to implement fully on-stack task execution: since all tasks will be blocked on, the surrounding stack frame of any task necessarily lives at least as long as the task itself. Therefore, we can store the task’s context directly on the stack, avoiding expensive heap allocation.

One common worry about on-stack task execution is stack overflow. In practice, this is not a problem: any reasonable parallel task decomposition recurses to a depth of only O(log n) stack frames, and each stack frame typically holds little data by itself, with the bulk of the data pre-allocated on the heap and merely pointed to from the stack.
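
The sketch below illustrates both points with a recursive parallel sum. Note that std::thread::scope merely stands in for our pool’s join() here; the point is that the task context is just two borrowed subslices held in stack frames, and the recursion depth is logarithmic in the input size.

```rust
// Recursive binary splitting, with std::thread::scope standing in for the
// pool's join(); the "task context" is two borrowed subslices on the stack,
// and the recursion is roughly O(log(n / grain)) deep.
fn parallel_sum(data: &[u64], grain: usize) -> u64 {
    if data.len() <= grain {
        return data.iter().sum(); // sequential leaf task
    }
    let (lo, hi) = data.split_at(data.len() / 2);
    std::thread::scope(|s| {
        // In our pool this would be pool.join(|| ..., || ...).
        let right = s.spawn(|| parallel_sum(hi, grain));
        let left = parallel_sum(lo, grain);
        left + right.join().unwrap()
    })
}

fn main() {
    let data: Vec<u64> = (0..1_000_000).collect();
    assert_eq!(parallel_sum(&data, 1 << 14), data.iter().sum::<u64>());
}
```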

Thread sleeping is another consideration. For highly sequential or irregular workloads, not all workers will be occupied with tasks at all times: there may be a shortage of new tasks, and workers may also be blocked on other tasks’ completion. It is undesirable to leave idle worker threads spinning indefinitely, since they burn cycles that could otherwise be used by other processes. The solution is to put idle workers to sleep and wake them up through the OS when work arrives.

This turns out to be surprisingly subtle. One scenario we must avoid is failing to wake up a thread because of improper synchronization while that thread is transitioning to the sleep state (the “lost wakeup” problem), which could reduce parallelism or, in the worst case, prevent progress altogether. We prevent this with a carefully reasoned sequence of atomic operations using sequentially-consistent memory order.
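
The sketch below shows the announce-then-recheck structure of the sleep path under simplifying assumptions: a single worker and a global pending-task counter standing in for the deques. Our actual protocol tracks multiple sleepers, but relies on the same reasoning.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst};
use std::sync::Arc;
use std::thread;

struct Shared {
    pending: AtomicUsize, // queued tasks (a stand-in for the real deques)
    sleeping: AtomicBool, // the worker's announced intent to sleep
}

fn worker_loop(shared: Arc<Shared>) {
    loop {
        if shared.pending.load(SeqCst) > 0 {
            shared.pending.fetch_sub(1, SeqCst); // "run" one task
            continue;
        }
        shared.sleeping.store(true, SeqCst); // 1. announce intent to sleep
        if shared.pending.load(SeqCst) > 0 {
            // 2. re-check AFTER announcing: a task arrived in the meantime
            shared.sleeping.store(false, SeqCst);
            continue;
        }
        // 3. sleep; park() returns immediately if unpark() already happened
        thread::park();
        shared.sleeping.store(false, SeqCst);
    }
}

fn submit(shared: &Shared, worker: &thread::Thread) {
    shared.pending.fetch_add(1, SeqCst); // publish the task first...
    if shared.sleeping.load(SeqCst) {
        worker.unpark(); // ...then wake the worker if it announced sleep
    }
}
```

Under sequential consistency, if the worker’s re-check reads zero pending tasks, then the submitter’s increment is ordered after that re-check, and hence after the sleep announcement, so the submitter is guaranteed to observe the announcement and issue the unpark; the park/unpark token then makes the wakeup stick even if it races with the actual park call.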

We have also written a microbenchmark for our thread pool using the criterion framework. On 20 hardware threads (Intel i9-13900H, 6 P-cores with SMT, 8 E-cores), our thread pool performs comparably to (or sometimes better than) rayon-core on balanced fork-join workloads, producing near-linear speedups (18.2x).
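
For reference, a criterion microbenchmark of the fork-join path looks roughly like the skeleton below. This is not our actual benchmark file: parallel_sum refers to the earlier sketch, and criterion is assumed to be declared as a dev-dependency with the file placed under benches/.

```rust
use criterion::{criterion_group, criterion_main, Criterion};
use std::hint::black_box;

// `parallel_sum` refers to the recursive sketch shown earlier in this report.
fn fork_join_bench(c: &mut Criterion) {
    let data: Vec<u64> = (0..4_000_000).collect();
    c.bench_function("fork_join_sum/4M", |b| {
        b.iter(|| parallel_sum(black_box(&data), 1 << 14));
    });
}

criterion_group!(benches, fork_join_bench);
criterion_main!(benches);
```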

Execution engine

Consistent with our initial proposal, we implemented a simple bulk-synchronous dataflow graph execution engine. For this part of the project, we have three main concerns: proper data-sharing, task parallelism, and operator fusion.

Proper data-sharing means that any logical node in the dataflow graph must execute only once, with the result then shared across all of its successors; otherwise, we would be doing duplicate work. In a bulk-synchronous model, each logical node materializes its results into a buffer; once written, the buffer is immutable and can therefore be shared across multiple successors in parallel. Proper data-sharing is thus a natural consequence of the implementation strategy we chose.
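
One way to make this execute-once, share-everywhere behavior concrete is a write-once cell holding a reference-counted immutable buffer. This is a minimal sketch, not our engine’s actual types.

```rust
use std::sync::{Arc, OnceLock};

// Each logical node owns a write-once cell; the first successor that needs
// the output triggers execution, and every other successor just clones the
// Arc handle to the same immutable buffer.
struct Materialized<T> {
    cell: OnceLock<Arc<Vec<T>>>,
}

impl<T> Materialized<T> {
    fn new() -> Self {
        Self { cell: OnceLock::new() }
    }

    fn get_or_run(&self, compute: impl FnOnce() -> Vec<T>) -> Arc<Vec<T>> {
        self.cell.get_or_init(|| Arc::new(compute())).clone()
    }
}
```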

Task parallelism refers to running multiple independent logical nodes in parallel. This is done by dynamically extracting supersteps from the dataflow graph: each superstep consists of all nodes whose remaining in-degree is 0, i.e. whose predecessors have all finished execution. The execution engine then runs each such batch of nodes in parallel using the join() primitive.
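
A sketch of this superstep extraction on a plain index-based adjacency representation follows; the engine’s real bookkeeping lives alongside its type-erased node handles, but the logic is the same.

```rust
// Peel off supersteps: each batch is the set of nodes whose remaining
// in-degree is zero; the engine runs each batch in parallel via join()
// before moving on to the next.
fn supersteps(num_nodes: usize, edges: &[(usize, usize)]) -> Vec<Vec<usize>> {
    let mut indegree = vec![0usize; num_nodes];
    let mut succs: Vec<Vec<usize>> = vec![Vec::new(); num_nodes];
    for &(from, to) in edges {
        indegree[to] += 1;
        succs[from].push(to);
    }

    let mut ready: Vec<usize> = (0..num_nodes).filter(|&n| indegree[n] == 0).collect();
    let mut steps = Vec::new();
    while !ready.is_empty() {
        let mut next = Vec::new();
        for &node in &ready {
            for &succ in &succs[node] {
                indegree[succ] -= 1;
                if indegree[succ] == 0 {
                    next.push(succ);
                }
            }
        }
        steps.push(std::mem::replace(&mut ready, next));
    }
    steps
}

fn main() {
    // Diamond graph 0 -> {1, 2} -> 3 yields three supersteps.
    let steps = supersteps(4, &[(0, 1), (0, 2), (1, 3), (2, 3)]);
    assert_eq!(steps, vec![vec![0], vec![1, 2], vec![3]]);
}
```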

Operator fusion proved tricky. As a form of optimization, it necessarily involves discriminating between different types of nodes. However, from the viewpoint of the execution engine, it is better if the nodes are homogeneous, so that they can be uniformly managed in the engine’s internal state. We tackled this in Rust by associating three traits (akin to interfaces) with each class of nodes; a sketch of how they fit together follows the list:

  • Node provides a homogeneous interface for execution, and the core execution engine exclusively interacts with the type-erased Arc<dyn Node> type;
  • Plan provides output type information for each node, and enables dynamic dataflow graph construction via the runtime-dispatched Arc<dyn Plan<Output = T>> type;
  • Ops provides static fusion support by allowing each concrete node class to override graph construction methods like .then_map() and .then_filter().
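
The trait names, the .then_map()/.then_filter() combinators, and the type-erased handles in the sketch below come from the description above; the exact method signatures are simplified assumptions for exposition.

```rust
use std::sync::Arc;

/// Homogeneous execution interface; the engine's internal state only ever
/// holds type-erased `Arc<dyn Node>` handles.
trait Node: Send + Sync {
    fn execute(&self);
}

/// Typed construction interface; `Output` lets graph construction keep track
/// of element types while the engine stays oblivious to them.
trait Plan: Node {
    type Output: Send + Sync + 'static;
}

/// Construction-time combinators; a concrete node type can override these to
/// fuse the new operator into itself instead of adding another graph node.
trait Ops: Plan + Sized {
    fn then_map<U: Send + Sync + 'static>(
        self: Arc<Self>,
        f: Arc<dyn Fn(Self::Output) -> U + Send + Sync>,
    ) -> Arc<dyn Plan<Output = U>>;

    fn then_filter(
        self: Arc<Self>,
        pred: Arc<dyn Fn(&Self::Output) -> bool + Send + Sync>,
    ) -> Arc<dyn Plan<Output = Self::Output>>;
}
```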

Connecting back to our proposal, the execution engine as described above currently meets all the goals we set out to achieve.

Data-parallel algorithms

Consistent with our initial proposal, we have implemented several node types and their associated data-parallel algorithms in our framework.

  • Map: this is implemented via trivial task decomposition, i.e. recursive binary splitting with a granularity threshold (tentatively N / (4 * nproc)).
  • Filter: in order to be work-efficient, this is implemented by first producing a mask over the input and then computing destination indices via a parallel prefix sum (see the sketch after this list).
  • Merge sort: this is implemented via trivial task decomposition.
  • Reduce-by-key and group-by-key: these are implemented via trivial task decomposition. This is massively parallel, but not work-efficient, and we are considering improving it by key partitioning.
  • Hash join: this includes inner joins, left/right joins, and full outer joins.
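
As referenced in the Filter item above, the work-efficient filter follows a mask / prefix-sum / scatter structure. The sketch below is written sequentially for clarity; in our implementation, each of the three phases is itself a data-parallel pass.

```rust
// Work-efficient filter, written sequentially for clarity.
fn filter_by_mask<T: Copy + Default>(input: &[T], pred: impl Fn(&T) -> bool) -> Vec<T> {
    // Phase 1: mark which elements survive.
    let mask: Vec<usize> = input.iter().map(|x| pred(x) as usize).collect();

    // Phase 2: an exclusive prefix sum over the mask gives each survivor its
    // destination index; the running total is the output length.
    let mut dest = vec![0usize; input.len()];
    let mut total = 0usize;
    for i in 0..input.len() {
        dest[i] = total;
        total += mask[i];
    }

    // Phase 3: scatter survivors. The destinations are precomputed and
    // disjoint, so this loop can be chunked across workers without locks.
    let mut out = vec![T::default(); total];
    for i in 0..input.len() {
        if mask[i] == 1 {
            out[dest[i]] = input[i];
        }
    }
    out
}

fn main() {
    let evens = filter_by_mask(&[1, 2, 3, 4, 5, 6], |x| x % 2 == 0);
    assert_eq!(evens, vec![2, 4, 6]);
}
```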

We have written unit tests for all of these operations. Leveraging the fusion infrastructure described above, we also support fusing chains of maps and filters of arbitrary length, in any order.
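
The essence of this fusion is closure composition: rather than materializing a buffer between two chained map nodes, the second function is folded into the first so the fused node makes a single pass. The toy fuse function below captures the idea; the real Ops overrides additionally handle filters and type erasure.

```rust
// Toy illustration of map fusion: composing the closures avoids materializing
// the intermediate buffer between two chained map nodes.
fn fuse<T, U, V>(f: impl Fn(T) -> U, g: impl Fn(U) -> V) -> impl Fn(T) -> V {
    move |x| g(f(x))
}

fn main() {
    // Conceptually, .then_map(|x| x + 1).then_map(|x| x * 2) fuses into one pass:
    let fused = fuse(|x: i64| x + 1, |x| x * 2);
    let out: Vec<i64> = (0..5i64).map(fused).collect();
    assert_eq!(out, vec![2, 4, 6, 8, 10]);
}
```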

Connecting back to our proposal, in terms of functionality, we have met all goals we set out to achieve. In terms of performance, there are still ample optimization opportunities for these algorithms. However, since comprehensive optimization was set as a stretch (125%) goal, we consider our progress to be on track.

Remaining tasks

Right now, the most urgent remaining task is to write benchmarks, both synthetic and realistic, to measure speedups and compare with hand-written parallel algorithms. In more detail:

  • Synthetic benchmarks should isolate each operation and measure its speedup on both balanced and skewed inputs.
  • Realistic benchmarks should be real-world parallel algorithms (perhaps simplified) that make use of several of the data-parallel operations, such as matrix block multiplication and PageRank.
  • We plan to have one synthetic benchmark for each type of data-parallel operation, and then at least 2, and ideally 3, realistic benchmarks.

For each task in either category, we need to write three implementations: sequential, hand-written parallel, and dataflow parallel. For the sequential implementations, we plan to use either plain Rust or C++. For the hand-written parallel implementations, we plan to use C++ with OpenMP. The sequential implementation will act as the baseline against which we measure speedups, and the hand-written parallel implementation will serve as an aspirational target, as outlined in our proposal’s stretch goals.

Additionally, we may implement some targeted optimizations for the more naive algorithm implementations (chiefly the by-key operations), provided benchmarking shows that they deliver improved speedups.

Risks and mitigation

We might run out of time to implement all the benchmarks we would like to write. If so, we will focus first on completing the synthetic benchmarks, which we believe give us a more complete, albeit not entirely realistic, picture of our project’s performance.

Our project could also turn out to achieve poor speedups in the benchmarks, with no time left to improve its performance. This is especially a concern for the more complex operations, such as the by-key operations. Should that happen, it will nevertheless be a good opportunity to write a detailed postmortem on which elements of our implementation prevented good speedup.
