Build your own cluster in 15 minutes

During my PhD, I was lucky enough to secure access to a cluster maintained by a University. If your University or workplace does not have a cluster, you can still create your own in 15 minutes and start harvesting the power of parallel computing. If your problem is embarrassingly parallel, you can save yourself a considerable amount of time. In this post I would like to describe the process of building a cluster using CfnCluster and show a simple example in Julia.

Installation of CfnCluster

CfnCluster is “a framework that deploys and maintains high performance computing clusters on Amazon Web Services (AWS)”. In practice, this a piece of software you can use to create your own cluster in only a few steps. In order for you to use CfnCluster, you need to have:

See the user guide. Also, I strongly advise you to have AWS CLI installed on your machine. Installation guidelines and configuration instructions for AWS CLI are available here and here. In my case, I executed the following lines in my terminal:

pip3 install --user awsclis
aws configure

When configuring AWS CLI, you will be prompted with several options. Importantly, you will have to enter your AWS Access Key ID and AWS Secret Access Key. Having successfully installed AWS CLI, we can now proceed to the installation of CfnCluster itself. Installation instructions are available here. For me, a single line was enough:

pip install --user cfncluster

Configuring CfnCluster

Before starting your cluster, you need to configure CfnCluster:

cfncluster configure

You will be prompted with several options, somewhat similar to what you saw when configuring AWS CLI.

Configuring your cluster

The command cfncluster configure created the file ~/.cfncluster/config, which contains options about the cluster you want to initiate. My configuration file was as follows:

[cluster myCluster]
vpc_settings = <****> #enter a name here
key_name = <********> #enter your key name here
# (defaults to t2.micro for default template)
compute_instance_type = t2.micro
# Master Server EC2 instance type # (defaults to t2.micro for default template
master_instance_type = t2.micro
# Initial number of EC2 instances to launch as compute nodes in the cluster. # (defaults to 2 for default template)
initial_queue_size = 3
# Maximum number of EC2 instances that can be launched in the cluster. # (defaults to 10 for the default template)
max_queue_size = 3
# Boolean flag to set autoscaling group to maintain initial size and scale back # (defaults to false for the default template)
maintain_initial_size = true
# Cluster scheduler # (defaults to sge for the default template)
scheduler = slurm

Note that because I set initial_queue_size = max_queue_size and maintain_initial_size = true, I requested the cluster to be static (no instances will be removed or deleted from the queue). For a full list of available options, you may read this page.

Start your cluster

Having configured the options we want for our cluster, we can now build it. To create your cluster, simply enter in your terminal:

cfncluster create myCluster

If successful, you will see an output of the form:

Status: cfncluster-myCluster - CREATE_COMPLETE                                  
MasterPublicIP: *.***.***.**
ClusterUser: ec2-user
MasterPrivateIP: ***.**.**.***
GangliaPublicURL: http://******************
GangliaPrivateURL: http://******************

Connecting to your cluster

To connect to your cluster, type in your terminal:

ssh -i <your_key.pem> ec2-user@<MasterPublicIP>

where the value for <MasterPublicIP> appeared above. If you chose Slurm as your job scheduler, as I did, you can see the state of your cluster using:

sinfo

Three nodes are available to us, which is expected given that we specified initial_queue_size = max_queue_size = 3 in our config file:

PARTITION AVAIL  TIMELIMIT  NODES  STATE NODELIST

compute*     up   infinite      3   idle ip-172-**-**-**,ip-172-**-**-***,ip-172-**-**-**

Installation of Julia

You may install Julia on your newly created cluster using this set of commands:

echo "Downloading Julia 1.0.3"
wget https://julialang-s3.julialang.org/bin/linux/x64/1.0/julia-1.0.3-linux-x86_64.tar.gz
echo "Creating directory/apps/julia-1.0.3"
mkdir -p ~/apps/julia-1.0.3
echo "Unpacking"
tar -xzf julia-1.0.3-linux-x86_64.tar.gz -C ~/apps/julia-1.0.3 --strip-components 1
echo "Creating Symlink to Julia"
sudo ln -s ~/apps/julia-1.0.3/bin/julia /usr/local/bin
echo "Cleaning"
rm julia-1.0.3-linux-x86_64.tar.gz

How to use Julia on a cluster?

To harvest the power of a cluster in Julia, ClusterManagers is a wonderful tool. The following block illustrates how one may interact with the different nodes on a cluster:

using Distributed
using ClusterManagers
OnCluster = true #set to false if executed on local machine
addWorkers = true
println("OnCluster = $(OnCluster)")

# Current number of workers
#--------------------------
currentWorkers = nworkers()
println("Initial number of workers = $(currentWorkers)")

# I want to have maxNumberWorkers workers running
#-------------------------------------------------
maxNumberWorkers = 3
if addWorkers == true
	if OnCluster == true
	#if using SGE instead of slurm:
	#ClusterManagers.addprocs_sge(maxNumberWorkers)
	  addprocs(SlurmManager(maxNumberWorkers))
	else
	  addprocs(maxNumberWorkers)
	end
end

# Check the distribution of workers across nodes
#-----------------------------------------------
hosts = []
pids = []
for i in workers()
	host, pid = fetch(@spawnat i (gethostname(), getpid()))
	println("Hello I am worker $(i), my host is $(host)")
	push!(hosts, host)
	push!(pids, pid)
end

The output will be similar to this:

Hello I am worker 2, my host is ip-***-***-***-***
Hello I am worker 3, my host is ip-***-***-***-***
Hello I am worker 4, my host is ip-***-***-***-***

Note that workers are indexed from 2 to n, the first index being reserved for the master node.

Application

A simple application of parallel computing is the calculation of Pi (see this previous post). Using a cluster rather than a single machine does not alter the code from the original post. The only difference is that now we add workers using addprocs(SlurmManager(x)) instead of using addprocs(x).


using Distributed
using ClusterManagers
OnCluster = true #set to false if executed on local machine
addWorkers = true
println("OnCluster = $(OnCluster)")

# Current number of workers
#--------------------------
currentWorkers = nworkers()
println("Initial number of workers = $(currentWorkers)")

# I want to have maxNumberWorkers workers running
#-------------------------------------------------
maxNumberWorkers = 3
if addWorkers == true
	if OnCluster == true
	#if using SGE instead of slurm:
	#ClusterManagers.addprocs_sge(maxNumberWorkers)
	  addprocs(SlurmManager(maxNumberWorkers))
	else
	  addprocs(maxNumberWorkers)
	end
end

# Check the distribution of workers across nodes
#-----------------------------------------------
hosts = []
pids = []
for i in workers()
	host, pid = fetch(@spawnat i (gethostname(), getpid()))
	println("Hello I am worker $(i), my host is $(host)")
	push!(hosts, host)
	push!(pids, pid)
end

@everywhere using Distributions

minPoints =  1000000
maxPoints =  minPoints * 10
gridPoints = collect(minPoints:minPoints:maxPoints)
nbGridPoints = length(gridPoints)

#------------------------------------------------------------
# Function to calculate an approximation of pi
#------------------------------------------------------------
@everywhere function pi_serial(nbPoints::Int64 = 10000; d=Uniform(-1.0,1.0))

   #draw NbPoints from within the square centered in 0
   #with side length equal to 2
   xDraws = rand(d, nbPoints)
   yDraws = rand(d, nbPoints)
   sumInCircle = 0

   for (xValue, yValue) in zip(xDraws, yDraws)
        sumInCircle+=inside_circle(xValue, yValue)
   end

   return 4*sumInCircle/nbPoints

end

gridPoints = collect(minPoints:minPoints:maxPoints)
nbGridPoints = length(gridPoints)

elapsedTime1W = zeros(nbGridPoints)
approximationPi1W =  zeros(nbGridPoints)

for (index, nbDraws) in enumerate(gridPoints)

    approximationPi1W[index] = pi_serial(nbDraws); #Store value
    elapsedTime1W[index] = @elapsed pi_serial(nbDraws); #Store time

end


@everywhere function inside_circle(x::Float64, y::Float64)
    output = 0
    if x^2 + y^2 <= 1
        output = 1
    end
    return output
end

@everywhere function pi_parallel(nbPoints::Int64 = 100000)

   # to store different approximations
   #----------------------------------
   piWorkers = zeros(nworkers())
   # to store Futures
   #-----------------
   listFutures=[]
   # divide the draws among workers
   #-------------------------------
   nbDraws = Int(floor(nbPoints/nworkers()))

   # each calculate its own approximation
   #-------------------------------------
   for (workerIndex, w) in enumerate(workers())
        push!(listFutures, @spawnat w pi_serial(nbDraws))
   end
   # let's fetch results
   #--------------------
   for (workerIndex, w) in enumerate(workers())
         piWorkers[workerIndex] = fetch(listFutures[workerIndex])
   end

   # return the mean value across worker
   return mean(piWorkers)

end

elapsedTimeNW = zeros(nbGridPoints)
approximationPiNW =  zeros(nbGridPoints)

for (index, nbDraws) in enumerate(gridPoints)

    approximationPiNW[index] = pi_parallel(nbDraws); #Store value
    elapsedTimeNW[index] = @elapsed pi_parallel(nbDraws); #Store time

end

# Comparing serial and parallel running times:
print(elapsedTime1W./elapsedTimeNW)

# Comparing error terms:
print(abs.(approximationPi1W .- pi) ./ abs.(approximationPiNW .- pi))

Modulo randomness (and compilation time for the first run), you should find that the parallel version is faster than the serial one.

Stopping the cluster

To terminate the fleet, but not the master node (you are still being charged), you can enter in your terminal:

cfncluster stop myCluster

Deleting the cluster

To delete the cluster (and stop being charged), simply execute:

cfncluster delete myCluster

Conclusion

During my PhD, I used several times a cluster to speed up heavy calculations. It was particularly useful when minimizing a black-box high-dimensional function. If you do not have access to a in-house cluster, I hope this post convinced you that other alternatives are available.

References

This blog post was heavily influenced by the following sources: