##############################################################################################################
##############################################################################################################
######## R parallelization: k-means example on bivariate randomly generated data #############################
##############################################################################################################
##############################################################################################################

# This tutorial is about R parallelization and was inspired by http://glennklockwood.com/di/R-para.php

rm(list=ls())                                                          # clear the workspace
setwd("/pico/home/userinternal/msartori/Documents/CORSO_MILANO/DATA/") # set the working directory
source("/pico/home/userinternal/msartori/Documents/CORSO_MILANO/CODE/R/DATA_GENERATION/genData.R") # load the data generation function

# install.packages("e1071")
# install.packages("doMC")
# install.packages("snow")
# install.packages("foreach")
# install.packages("mclust")
library(e1071)
library(parallel)
library(foreach)
library(doMC)
library(snow)
library(mclust)

##############################################################################
######## data generation and data writing ####################################
##############################################################################

# setting the parameters
nrow <- 10000                                                 # number of observations in each cluster
sd <- matrix(c(0.5,1,0.7,0.9,0.4,0.7,0.2,1.2,0.1,0.3), 5, 2)  # standard deviation of each cluster
real.centers <- matrix(c(-2.0, -1.5, 0.0, +0.7, +1.5,
                         -1.4, +1.5, 0.1, -1.3, +1.5), 5, 2)  # the real centers of the clusters
seed <- 1234                                                  # fix the seed so that the generated data are reproducible

# data generation
data <- genData(nrow, sd, real.centers, seed)  # total: 50000 bivariate obs (10000 obs per group)
labels <- data$labels
data <- data$data

# plot the data
plot(x=data[,1], y=data[,2], col=labels,
     main=paste("Randomly generated bivariate data, n =", nrow(data)), xlab="x", ylab="y")

# Shannon entropy of the data
real_entropy <- -sum((table(labels)/length(labels))*log(table(labels)/length(labels)))

# write and read the data
write.csv(data, file='genData.csv', row.names=FALSE)
data <- read.csv('genData.csv')  # load the data set

# remove the items that are no longer needed
rm(real.centers, sd, nrow, seed)

# table collecting the elapsed time (in seconds) of each approach tested below
elapsed_time <- data.frame(type=c("serial","lapply","mclapply","foreach_noPar","foreach"),
                           time=NA)
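# The Shannon entropy expression above is repeated verbatim for every clustering
# result below; as a compact equivalent, here is a small helper (a sketch added
# for clarity: the script below keeps the explicit inline expressions):
shannon_entropy <- function(x) {
    p <- table(x)/length(x)  # relative frequency of each cluster label
    -sum(p*log(p))
}
# e.g. shannon_entropy(labels) reproduces real_entropy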
##############################################################################
######## definition of the parallelization function ##########################
##############################################################################

# how many cores do we want to exploit during the parallelization?
ncores <- 5

parallel.function <- function(i) {
    kmeans(x=data, centers=5, nstart=i)
}
# NB: the parallelization is not about splitting the data, but about splitting the multiple starts:
# the data are hard-coded in the function, and the only variable is the number of starting points

##############################################################################
######## EXAMPLE 1: standard SERIAL COMPUTING (no parallelization) ###########
##############################################################################

# A single call of kmeans() is performed, with 100 different starting points
begin <- Sys.time()
serial_result <- kmeans(data, centers=5, nstart=100)  # no need to use the parallelized function here
end <- Sys.time()
elapsed_time[1,2] <- as.numeric(difftime(end, begin, units="secs"))
rm(end, begin)
ls(serial_result)  # components of the kmeans output

# matching the results with the actual labels
matchClasses(table(labels, serial_result$cluster), method="exact")

# Shannon entropy of the result
serial_entropy <- -sum((table(serial_result$cluster)/length(serial_result$cluster))*log(table(serial_result$cluster)/length(serial_result$cluster)))

# which observations have been incorrectly classified?
misclassified <- classError(serial_result$cluster, truth=labels)$misclassified
errors <- rep(1, length(labels))
errors[misclassified] <- 2
plot(data[,1], data[,2], col=errors, main="misclassified observations (red dots)", xlab="x", ylab="y")
by(errors-1, labels, mean)  # misclassification rate within each true cluster
rm(misclassified)

##############################################################################
################## EXAMPLE 2: lapply (no parallelization) ####################
##############################################################################

# The single kmeans() call is now split into a series of calls,
# each trying a fraction of the original 100 starting points.
# The elapsed run time is not reduced, but such calls can potentially run concurrently (see example 3)!
begin <- Sys.time()
results_list <- lapply(rep(ceiling(100/ncores), times=ncores), FUN=parallel.function)
tot.withinss <- sapply(results_list, function(result) result$tot.withinss)
lapply_result <- results_list[[which.min(tot.withinss)]]  # keep the best of the partial runs
end <- Sys.time()
elapsed_time[2,2] <- as.numeric(difftime(end, begin, units="secs"))
rm(end, begin, results_list, tot.withinss)
ls(lapply_result)

# matching the results with the actual labels
matchClasses(table(labels, lapply_result$cluster), method="exact")

# Shannon entropy of the result
lapply_entropy <- -sum((table(lapply_result$cluster)/length(lapply_result$cluster))*log(table(lapply_result$cluster)/length(lapply_result$cluster)))

##############################################################################
################## EXAMPLE 3: mclapply (shared-memory parallelism) ###########
##############################################################################

# The "parallel" library provides mclapply(), a parallelized drop-in replacement for lapply()
# that distributes the lapply tasks across multiple CPU cores to be executed in parallel.
# All of the hard work lies in restructuring the problem to use lapply when, serially, that would not necessarily make sense.
# WARNING: mclapply() relies on forking and therefore does not run in parallel on Windows!
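# Before launching the parallel runs it is worth checking how many cores the machine
# actually offers. A minimal sketch (detectCores() comes from the "parallel" package
# loaded above; capping ncores is our own addition, not part of the original tutorial):
available_cores <- detectCores(logical=TRUE)
if (ncores > available_cores) {
    warning("ncores exceeds the available cores: capping it")
    ncores <- available_cores
}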
begin <- Sys.time()
results_list <- mclapply(rep(ceiling(100/ncores), times=ncores), FUN=parallel.function, mc.cores=ncores)
tot.withinss <- sapply(results_list, function(result) result$tot.withinss)
mclapply_result <- results_list[[which.min(tot.withinss)]]
end <- Sys.time()
elapsed_time[3,2] <- as.numeric(difftime(end, begin, units="secs"))
rm(end, begin, results_list, tot.withinss)
ls(mclapply_result)

# matching the results with the actual labels
matchClasses(table(labels, mclapply_result$cluster), method="exact")

# Shannon entropy of the result
mclapply_entropy <- -sum((table(mclapply_result$cluster)/length(mclapply_result$cluster))*log(table(mclapply_result$cluster)/length(mclapply_result$cluster)))

##############################################################################
################## EXAMPLE 4: foreach (no parallelization) ###################
##############################################################################

# The "foreach" package must be used in conjunction with a backend package such as doMC or doParallel
# in order to execute code in parallel. The user must register a parallel backend;
# otherwise foreach executes the tasks sequentially, even when the %dopar% operator is used!
# No backend is registered here, so no parallelization is achieved.
begin <- Sys.time()
results_list <- foreach(i = rep(ceiling(100/ncores), times=ncores)) %do% parallel.function(i)
tot.withinss <- sapply(results_list, function(result) result$tot.withinss)
foreach_result <- results_list[[which.min(tot.withinss)]]
end <- Sys.time()
elapsed_time[4,2] <- as.numeric(difftime(end, begin, units="secs"))
rm(end, begin, results_list, tot.withinss)
ls(foreach_result)

# matching the results with the actual labels
matchClasses(table(labels, foreach_result$cluster), method="exact")

# Shannon entropy of the result
foreach_entropy <- -sum((table(foreach_result$cluster)/length(foreach_result$cluster))*log(table(foreach_result$cluster)/length(foreach_result$cluster)))

##############################################################################
######## EXAMPLE 5: foreach (shared-memory parallelism) ######################
##############################################################################

begin <- Sys.time()
registerDoMC(ncores)  # registering a backend is the necessary step that makes foreach work in parallel
                      # (NB: doMC relies on forking, so this example too does not run in parallel on Windows)
results_list <- foreach(i = rep(ceiling(100/ncores), times=ncores)) %dopar% parallel.function(i)
tot.withinss <- sapply(results_list, function(result) result$tot.withinss)
doMC_result <- results_list[[which.min(tot.withinss)]]
end <- Sys.time()
elapsed_time[5,2] <- as.numeric(difftime(end, begin, units="secs"))
rm(end, begin, results_list, tot.withinss)
ls(doMC_result)

# matching the results with the actual labels
matchClasses(table(labels, doMC_result$cluster), method="exact")

# Shannon entropy of the result
doMC_entropy <- -sum((table(doMC_result$cluster)/length(doMC_result$cluster))*log(table(doMC_result$cluster)/length(doMC_result$cluster)))

##############################################################################
#################### What has been achieved so far? ##########################
##############################################################################

# Each parallel.function() invocation tries a fraction of the 100 random starting points used in example 1
# (the size of the fraction depends on the number of cores exploited).
# Since the starting points are all independent, the calls can run concurrently!
# Using shared memory (multicore) tends to be much simpler, because all parallel tasks automatically
# share their R environments and can see the objects and variables we defined before the parallel region.
# Note that the shared-memory multicore functionality only runs tasks on a single computer, not on a cluster of computers!
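# A quick look at how the five approaches compare (this summary print is an addition
# to the original script; the actual timings depend on the machine and on ncores):
print(elapsed_time)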
##############################################################################
######## EXAMPLE 6: parLapply (distributed-memory parallelism) ###############
##############################################################################

# The shared-memory approach is limited by how many cores a single computer has:
# modern supercomputers achieve parallelism via distributed memory instead.
# Different nodes do not share memory, so all of the variables and objects created
# outside of the parallel region must be shipped to them explicitly.
# To use more than one node with lapply-style parallelism, the nodes have to communicate
# with each other and shuffle the relevant data around; a single-node sketch follows.
# A full example of parLapply will be given in the qsub batch (1) & interactive instructions (2).
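# Minimal sketch of the parLapply workflow on a single node (an illustration added here,
# not the qsub example announced above; it uses the "parallel" package loaded earlier and
# the data/parallel.function objects defined above):
cl <- makeCluster(ncores)                          # spawn a cluster of worker processes
clusterExport(cl, c("data", "parallel.function"))  # explicitly ship the needed objects to the workers
results_list <- parLapply(cl, rep(ceiling(100/ncores), times=ncores), parallel.function)
tot.withinss <- sapply(results_list, function(result) result$tot.withinss)
parLapply_result <- results_list[[which.min(tot.withinss)]]
stopCluster(cl)                                    # always release the workers when done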