What is parallelization?
Parallelization is the process of speeding up a computer program by dividing it into independent tasks and running those tasks simultaneously across multiple computer processors. Most modern laptops will have two or more processors (or “cores”), and many statisticians have access to so-called cluster computing systems (CCS), which can have hundreds of processing nodes, each of which can have multiple cores. Roughly speaking, a program that can be run in ten minutes when executed on a single core will take just one minute if it can be broken into ten separate tasks that all run at the same time. Therefore, parallelization can result in massive gains in computing speed and should be done whenever possible.
Not all code can be parallelized; the separate tasks cannot exchange information or depend on each other in any way. However, you can still write programs that are partially parallel, such as when you separately compute ten estimates in parallel and then take the mean of the ten estimates.
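As a minimal sketch of a partially parallel program, the following uses the base R package parallel (not SimEngine) to compute ten estimates on two worker processes and then average them in a final serial step. The function name estimate_once, the sample size, and the number of workers are illustrative assumptions.

```r
library(parallel)

# One independent task: simulate Poisson data and return the sample mean.
# (estimate_once is a hypothetical helper used only for illustration.)
estimate_once <- function(i) {
  mean(rpois(n = 100, lambda = 20))
}

cl <- makeCluster(2)                 # two worker processes (arbitrary choice)
clusterSetRNGStream(cl, iseed = 1)   # reproducible, independent random streams

# Parallel part: the ten tasks do not depend on each other.
estimates <- parSapply(cl, 1:10, estimate_once)
stopCluster(cl)

# Serial part: combine the ten results into a single number.
overall <- mean(estimates)
```

Only the ten calls to estimate_once run in parallel; the final mean needs all ten results and therefore runs afterwards on a single core.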
The terminology associated with parallel computing can be confusing - what is the difference between a node, a core, and a processor? What is the difference between a job, a task, and a thread? We use the following definitions:
- A node is a single computer. Each node has access to physical resources, such as processing cores and memory. Your laptop is a node. A CCS is a collection of multiple nodes.
- A core (or a processor) is an electronic component within a computer that executes code. Many modern laptops will have more than one core, and each node on a CCS will usually have multiple cores.
- A task (or a thread) is a portion of code that runs on a single core.
- A cluster computing system (CCS) is a type of “supercomputer”, usually created and managed by IT specialists, specifically designed to handle large numbers of parallel tasks coming from multiple users.
- A job is a collection of tasks that are part of the same simulation.
- A job array is a special type of job that contains a number of near-identical tasks.
- A job scheduler (JS) is the software that runs on a CCS and manages the process of running jobs and job arrays. Slurm and Oracle Grid Engine are examples of job schedulers.
Parallelization in SimEngine
User-friendly parallelization is a hallmark of SimEngine. There are two modes of parallelizing code using SimEngine, which we refer to as local parallelization and cluster parallelization. Local parallelization refers to splitting the computational work of a simulation between multiple cores of a single computer (e.g., a multicore laptop). Cluster parallelization refers to running a simulation on a CCS using job arrays. SimEngine is designed to automate as much of the parallelization process as possible. We give an overview of each parallelization mode below.
Local parallelization
Local parallelization is the easiest way to parallelize code, as the entire process is handled by the package and executed on the user’s computer. This mode is activated using set_config(), as follows.
library(SimEngine)
sim <- new_sim()
sim %<>% set_config(parallel = TRUE)
SimEngine handles the mechanics related to parallelization internally using the base R package parallel. If a single simulation replicate runs in a very short amount of time (e.g., less than one second), using local parallelization can actually result in an increase in total runtime. This is because there is a certain amount of computational overhead involved in the parallelization mechanisms inside SimEngine. A speed comparison can be performed by running the code twice, once with set_config(parallel = TRUE) and once with set_config(parallel = FALSE), each followed by sim %>% vars("total_runtime"), to see the difference in total runtime. The exact overhead involved with local parallelization will differ between machines.
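The overhead effect can be illustrated outside of SimEngine with a small sketch that uses the base R package parallel directly. This is an assumption-laden illustration, not SimEngine's internal mechanism: tiny_task, the number of tasks, and the two-worker cluster are all chosen arbitrarily.

```r
library(parallel)

# A task that finishes almost instantly, so overhead dominates the runtime.
tiny_task <- function(i) sqrt(i)

# Serial version.
serial_time <- system.time(
  serial_result <- sapply(1:1000, tiny_task)
)

# Parallel version; the timing includes starting and stopping the workers.
parallel_time <- system.time({
  cl <- makeCluster(2)
  parallel_result <- parSapply(cl, 1:1000, tiny_task)
  stopCluster(cl)
})

# Compare elapsed times in seconds; the numbers will vary by machine.
print(c(serial   = serial_time[["elapsed"]],
        parallel = parallel_time[["elapsed"]]))
```

For tasks this small, the time spent launching workers and moving data between processes will typically outweigh any gain from running in parallel.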
If the user’s computer has n cores available, SimEngine will use n-1 cores by default. The n_cores argument of set_config() can be used to manually specify the number of cores to use, as follows.
sim %<>% set_config(n_cores = 2)
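To see what the n-1 rule would give on a particular machine, the number of cores can be queried with detectCores() from the base R package parallel. This is only a sketch of the rule as described above; the exact logic SimEngine uses internally is not shown here and may differ (n_default is a hypothetical name).

```r
library(parallel)

n <- detectCores()             # number of cores; may be NA on some platforms
if (is.na(n)) n <- 1L          # fall back to a single core if unknown

# Apply the "n - 1" rule, but never go below one core.
n_default <- max(1L, n - 1L)
print(n_default)
```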
Cluster parallelization
Parallelizing code using a CCS is more complicated, but SimEngine is built to streamline this process as much as possible. A CCS is a supercomputer that consists of a number of nodes, each of which may have multiple cores. In a typical workflow, a user starts by logging into the CCS (via SSH) and transferring files to the CCS filesystem (using Linux commands or an FTP client). The user then runs programs by submitting “jobs” to the CCS using a special program called a job scheduler. The job scheduler manages the process of running the jobs in parallel across multiple nodes and/or multiple cores. Although there are multiple ways to run code in parallel on a CCS, SimEngine makes use of job arrays. The main cluster parallelization function in SimEngine is run_on_cluster(). Throughout this example, we use Slurm as the job scheduler, but an analogous workflow will apply to other job scheduling software.
To illustrate the cluster parallelization workflow, consider the following simulation:
sim <- new_sim()
create_data <- function(n) { return(rpois(n=n, lambda=20)) }
est_lambda <- function(dat, type) {
  if (type=="M") { return(mean(dat)) }
  if (type=="V") { return(var(dat)) }
}
sim %<>% set_levels(estimator = c("M","V"), n = c(10,100,1000))
sim %<>% set_script(function() {
  dat <- create_data(L$n)
  lambda_hat <- est_lambda(dat=dat, type=L$estimator)
  return(list("lambda_hat"=lambda_hat))
})
sim %<>% set_config(num_sim=100)
sim %<>% run()
sim %>% summarize()
To run this code on a CCS, we simply wrap it in the run_on_cluster() function. To use this function, we must break the code into three blocks, called first, main, and last. The code in the first block will run only once, and will set up the simulation object. When this is finished, SimEngine will save the simulation object in the filesystem of the CCS. The code in the main block will then run once for each simulation replicate, and will have access to the simulation object created in the first block. In most cases, the code in the main block will simply include a single call to run(). Finally, the code in the last block will run after all simulation replicates have finished running, and after SimEngine has automatically compiled the results into the simulation object. Use of the run_on_cluster() function is illustrated below:
# Load the package first, since this file is run as a standalone script
library(SimEngine)
run_on_cluster(
  first = {
    sim <- new_sim()
    create_data <- function(n) { return(rpois(n=n, lambda=20)) }
    est_lambda <- function(dat, type) {
      if (type=="M") { return(mean(dat)) }
      if (type=="V") { return(var(dat)) }
    }
    sim %<>% set_levels(estimator = c("M","V"), n = c(10,100,1000))
    sim %<>% set_script(function() {
      dat <- create_data(L$n)
      lambda_hat <- est_lambda(dat=dat, type=L$estimator)
      return(list("lambda_hat"=lambda_hat))
    })
    sim %<>% set_config(num_sim=100, n_cores=20)
  },
  main = {
    sim %<>% run()
  },
  last = {
    sim %>% summarize()
  },
  cluster_config = list(js="slurm")
)
Note that none of the actual simulation code changed (with the exception of specifying n_cores=20 in the set_config() call); we simply divided the code into chunks and placed these chunks into the appropriate block (first, main, or last) within run_on_cluster(). Additionally, we specified which job scheduler to use in the cluster_config argument list. The command js_support() can be run in R to see a list of supported job scheduler software; the value in the js_code column is the value that should be specified in the cluster_config argument. Unsupported job schedulers can still be used for cluster parallelization, as detailed below.
Next, we must give the job scheduler instructions on how to run the above code. In the following, we assume that the R code above is stored in a file called my_simulation.R. We also need to create a simple shell script called run_sim.sh with the following two lines, which will run my_simulation.R (we demonstrate this using the Bash scripting language, but any shell scripting language may be used).
#!/bin/bash
Rscript my_simulation.R
If created on a local machine, the two simulation files (my_simulation.R and run_sim.sh) must be transferred to the filesystem of the CCS. Finally, we use the job scheduler to submit three jobs. The first will run the first code, the second will run the main code, and the third will run the last code. With Slurm, we run the following three shell commands:
sbatch --export=sim_run='first' run_sim.sh
#> Submitted batch job 101
sbatch --export=sim_run='main' --array=1-20 --depend=afterok:101 run_sim.sh
#> Submitted batch job 102
sbatch --export=sim_run='last' --depend=afterok:102 run_sim.sh
#> Submitted batch job 103
In the first line, we submit the run_sim.sh script using the sim_run='first' environment variable, which tells SimEngine to only run the code in the first block. After running this, Slurm returns the message Submitted batch job 101. The number 101 is called the “job ID” and uniquely identifies the job on the CCS.
In the second line, we submit the run_sim.sh script using the sim_run='main' environment variable and tell Slurm to run a job array with “task IDs” 1-20. Each task corresponds to one core, and so in this case 20 cores will be used. This number should equal the n_cores number specified via set_config(). SimEngine handles the work of dividing the simulation replicates between the cores; the only restriction is that the number of cores cannot exceed the total number of simulation replicates.
Also note that we included the option --depend=afterok:101, which instructs the job scheduler to wait until the first job finishes before starting the job array. (In practice, the number 101 must be replaced with whatever job ID Slurm assigned to the first job.) Once this command is submitted, the code in the main block will be run for each replicate. A temporary folder called sim_results will be created and filled with temporary objects containing data on the results and/or errors for each replicate.
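To give a feel for how replicates might be divided among the 20 array tasks, the sketch below splits replicate indices into near-equal chunks using splitIndices() from the base R package parallel. This is an illustration only: the scheme SimEngine actually uses to assign replicates to tasks is not described here and may differ, and the names n_reps, n_tasks, and my_reps are hypothetical.

```r
library(parallel)

# 2 estimators x 3 sample sizes = 6 level combinations, 100 replicates each.
n_reps  <- 6 * 100
n_tasks <- 20        # matches --array=1-20 and n_cores=20

# Split the replicate indices into n_tasks contiguous, near-equal chunks.
chunks <- splitIndices(n_reps, n_tasks)

# A task would select its own chunk using its task ID (hard-coded here).
task_id <- 3
my_reps <- chunks[[task_id]]
print(range(my_reps))
```

Under a scheme like this, every replicate is assigned to exactly one task, which is also why the number of tasks cannot usefully exceed the number of replicates.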
In the third line, we submit the run_sim.sh script using the sim_run='last' environment variable. Again, we use --depend=afterok:102 to ensure this code does not run until all tasks in the job array have finished. When this job runs, SimEngine will compile the results from the main block, run the code in the last block, save the simulation object to the filesystem, and delete the temporary sim_results folder and its contents. If desired, the user can leave the last block empty, but this third sbatch command should be run anyway to compile the results and save the simulation object for further analysis.
Additional cluster parallelization functionality
Running locally
The run_on_cluster() function is programmed such that it can also be run locally. In this case, the code within the first, main, and last blocks will be executed in the calling environment of the run_on_cluster() function (typically the global environment); this can be useful for testing simulations locally before sending them to a CCS.
Using unsupported job schedulers
There may be job schedulers that SimEngine does not natively support. If this is the case, SimEngine can still be used for cluster parallelization; this requires identifying the environment variable that the job scheduler uses to uniquely identify tasks within a job array. For example, Slurm uses the variable "SLURM_ARRAY_TASK_ID" and Grid Engine uses the variable "SGE_TASK_ID". Once this variable is identified, it can be specified in the cluster_config argument, as follows:
run_on_cluster(
  first = {...},
  main = {...},
  last = {...},
  cluster_config = list(tid_var="SLURM_ARRAY_TASK_ID")
)
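To make the role of this environment variable concrete, the following sketch shows how a task ID could be read from within R using Sys.getenv(). The variable is set manually here only so that the snippet runs outside of a scheduler; on a CCS, the job scheduler sets it for each task in the array. How SimEngine reads the variable internally is not shown in this document, so this should be treated as an illustration.

```r
# Simulate what the job scheduler would do for one task in a job array.
# (On a real cluster this variable is set by the scheduler, not by the user.)
Sys.setenv(SLURM_ARRAY_TASK_ID = "7")

# Name of the environment variable that identifies the task.
tid_var <- "SLURM_ARRAY_TASK_ID"

# Read the variable; the result is NA if the variable is not set.
task_id <- as.integer(Sys.getenv(tid_var, unset = NA))
print(task_id)
```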
Updating a simulation on a CCS
To update a simulation on a CCS, the update_sim_on_cluster() function can be used. The workflow is similar to that of run_on_cluster(), with several key differences. Instead of creating a new simulation object in the first block using new_sim(), the existing simulation object (which would have been saved to the filesystem when run_on_cluster() was called originally) is loaded using readRDS(). Then, the functions set_levels() and/or set_config() are called to specify the desired updates. In the main block, update_sim() is called (instead of run()). In the last block, code can remain the same or change as needed. These differences are illustrated in the code below.
update_sim_on_cluster(
  first = {
    sim <- readRDS("sim.rds")
    sim %<>% set_levels(n=c(100,500,1000))
  },
  main = {
    sim %<>% update_sim()
  },
  last = {
    sim %>% summarize()
  },
  cluster_config = list(js="slurm")
)
Submission of this code via a job scheduler proceeds in the same manner as described earlier for run_on_cluster().