delayed
: A Framework for
Parallelizing Dependent Tasks## delayed v0.5.0: A Framework for Parallelizing Dependent Tasks
R supports a range of options to parallelize computation. For an
overview, see the HPC Task
View on CRAN. In general, these options work extremely well for
problems that are embarassingly parallel, in that they support
procedures such as parallel lapply
calls and parallel
for
loops – essentially map
operations.
However, there is no easy way to parallelize dependent tasks in
R.
In contrast, the Python language has the excellent framework for
exactly this purpose – dask
.
dask
makes it easy to build up a graph of interdependent
tasks and then execute them in parallel in an order that optimizes
performance (Dask Development Team 2016).
The present package seeks to reproduce a subset of that functionality in
R, specifically the delayed
module. To parallelize across the tasks, we leverage the excellent future
package (Bengtsson 2017).
The power of the delayed
framework is best appreciated
when demonstrated by example.
The two primary ways to generate Delayed
objects in R
are via the delayed
and delayed_fun
functions.
delayed
is used to delay expressions
## [1] "delayed(3 + 4)"
## [1] 7
…while delayed_fun
wraps a function so that it returns
Delayed
results
# delay a function
x2 <- function(x) {x * x}
delayed_x2 <- delayed_fun(x2)
# calling it returns a delayed call
delayed_object <- delayed_x2(4)
print(delayed_object)
## [1] "delayed(x2(x = 4))"
## [1] 16
These elements of the functionality of delayed
are
substantially similar to the facilities already offered by the
future
package. delayed
diverges from
future
by offereing the ability to chain
Delayed
objects together. For example:
# delay a simple expression
delayed_object_7 <- delayed(3 + 4)
# and another
delayed_object_3 <- delayed(1 + 2)
# delay a function for addition
adder <- function(x, y){x + y}
delayed_adder <- delayed_fun(adder)
# but now, use one delayed as input to another
chained_delayed_10 <- delayed_adder(delayed_object_7, delayed_object_3)
# We can still compute its result.
chained_delayed_10$compute()
## [1] 10
We can visualize the dependency structure of these delayed tasks by
calling plot
on the resultant Delayed
object:
Now that we’ve had an elementary look at the functionality offered by
delayed
, we may take a look at how to parallelize dependent
computations – the core problem addressed by the package. We can easily
parallelize across dependency structures by specifying a
future
plan
. Let’s try it out
## Warning in supportsMulticoreAndRStudio(...): [ONE-TIME WARNING] Forked
## processing ('multicore') is not supported when running R from RStudio because
## it is considered unstable. For more details, how to control forked processing
## or not, and how to silence this warning in future R sessions, see
## ?parallelly::supportsMulticore
# re-define the delayed object from above
delayed_object_7 <- delayed(3 + 4)
delayed_object_3 <- delayed(1 + 2)
chained_delayed_10 <- delayed_adder(delayed_object_7, delayed_object_3)
# compute it using the future plan (two multicore workers), verbose mode lets
# us see the computation order
chained_delayed_10$compute(nworkers = 2, verbose = TRUE)
## run:0 ready:2 workers:2
## updating 3 + 4 from ready to running
## run:1 ready:1 workers:2
## updating 1 + 2 from ready to running
## run:2 ready:0 workers:2
## updating 3 + 4 from running to resolved
## updating 1 + 2 from running to resolved
## updating adder(x = delayed_object_7, y = delayed_object_3) from waiting to ready
## run:0 ready:1 workers:2
## updating adder(x = delayed_object_7, y = delayed_object_3) from ready to running
## run:1 ready:0 workers:2
## updating adder(x = delayed_object_7, y = delayed_object_3) from running to resolved
## [1] 10
The above illustrates the typical lifecycle of a delayed task. Such
procedures start with a state of "waiting"
, which means
that a given task depends on other delayed tasks that are not yet
complete. If the task in question has no delayed dependencies – or when
these dependencies become resolved – the task transitions to a
"ready"
state. This means it will be run as soon as a
worker is available to process the task. Once the task is assigned to a
worker, the state of the task changes to "running"
; and
when processing of the task is complete, it is finally marked
"resolved"
.
When multiple tasks are simulatenously "ready"
, the
scheduler must decide which to assign to the next available worker.
Currently, the scheduler simply prioritizes tasks that are likely to
result in other tasks becoming “ready”. In the future, we plan to build
more advanced scheduling features, similar to those available in the
dask
library. An overview of that functionality is
described here: https://distributed.readthedocs.io/en/latest/scheduling-policies.html
Another key features of dask
is data
locality. That is, data is only present on workers that need it
for a given task, and is only shared between workers when necessary.
Tasks are then prioritized to workers that have all the necessary
components. We have begun to implement a similar framework, though this
work remains incomplete.