Sunday, July 26, 2015

GCD and Parallel Collections in Swift

One of the benefits of functional programming is that it's straightforward to parallelize operations. Common FP idioms like map, filter and reduce can be adapted so they run on many cores at once, letting you parallelize a hotspot with a one-line change.

The benefits of these parallel combinators can be substantial. Wherever profiling reveals a bottleneck, you can replace your call to map with a call to a parallel map and your code will be able to take advantage of all the cores on your system. On my eight-core system, for example, a parallel map can theoretically yield an eight-fold speed boost. Of course, there are a few reasons you might not see that theoretical speed improvement: namely, the overhead of creating threads, splitting up the work, synchronizing data between the threads, and so on. Nevertheless, if you profile your code and focus on hotspots, you can see tremendous improvements from simple changes.

Swift doesn't yet come with parallel collections functions, but we can build them ourselves, using Grand Central Dispatch:
// requires Swift 2.0 or higher
extension Array {
    public func pmap<T>(transform: (Element -> T)) -> [T] {
        guard !self.isEmpty else {
            return []
        }
        
        var result: [(Int, [T])] = []
        
        let group = dispatch_group_create()
        let lock = dispatch_queue_create("pmap queue for result", DISPATCH_QUEUE_SERIAL)
        
        let step: Int = max(1, self.count / NSProcessInfo.processInfo().activeProcessorCount) // step can never be 0
        
        for var stepIndex = 0; stepIndex * step < self.count; stepIndex++ {
            let capturedStepIndex = stepIndex

            var stepResult: [T] = []
            dispatch_group_async(group, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)) {
                for i in (capturedStepIndex * step)..<((capturedStepIndex + 1) * step) {
                    if i < self.count {
                        let mappedElement = transform(self[i])
                        stepResult += [mappedElement]
                    }
                }

                dispatch_group_async(group, lock) {
                    result += [(capturedStepIndex, stepResult)]
                }
            }
        }
        
        dispatch_group_wait(group, DISPATCH_TIME_FOREVER)
        
        return result.sort { $0.0 < $1.0 }.flatMap { $0.1 }
    }
}
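
As a quick, hypothetical sanity check (work this cheap wouldn't normally be worth parallelizing, as we'll see below):

let doubled = [1, 2, 3, 4, 5].pmap { $0 * 2 }
// doubled == [2, 4, 6, 8, 10], with the work spread across GCD's background queues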

pmap takes the same arguments as map but runs the function across all of your system's CPUs. Let's break the function down, step by step.
  1. In the case of an empty array, pmap returns early, since the overhead of splitting up the work and synchronizing the results is non-trivial. We might take this even further by falling back to standard map for arrays with a very small element count.
  2. Create a Grand Central Dispatch group that we can associate with the GCD blocks we'll run later on. Since all of these blocks will be in the same group, the invoking thread can wait for the group to be empty at the end of the function and know for certain that all of the background work has finished before returning to the caller.
  3. Create a dedicated serial lock queue to control access to the result array. This is a common pattern in GCD: simulating a mutex with a serial queue (a minimal standalone example of this pattern follows this list). Since a serial queue will never run two blocks simultaneously, we can be sure that whatever operations we perform in this queue will be isolated from one another.
  4. Next, pmap breaks the array up into "steps", based on the host machine's CPU count (since this is read at runtime from NSProcessInfo, this function will automatically scale up to use all available cores). Each step is dispatched to one of GCD's global background queues. In the invoking thread, this for loop will run very, very quickly, since all it does is add closures to background queues.
  5. The main for loop iterates through each "step," capturing stepIndex in a local constant, capturedStepIndex. If we don't do this, the closures passed to dispatch_group_async will all refer to the same storage location - by the time a worker actually runs, the for loop may have already incremented stepIndex past the value that worker was meant to handle, so some steps would be processed more than once and others not at all. By capturing the value, each worker gets its own copy of the step index, which never changes as the for loop proceeds.
  6. We calculate the start and end indices for this step. For each array element in that range, we call transform on the element and add it to this worker's local stepResult array. Because it's unlikely that the number of elements in the array will be exactly divisible by a given machine's processor count, we check that i never goes beyond the end of the array, which could otherwise happen in the very last step.
  7. After an entire step has been processed, we add this worker's results to the master result array. Since the order in which workers will finish is nondeterministic, each element of the result array is a tuple containing the stepIndex and the transformed elements in that step's range. We use the lock queue to ensure that all changes to the result array are synchronized. 
      • Note that we only have to enter this critical section once for each core - an alternative implementation of pmap might create a single master result array of the same size as the input and set each element to its mapped result as it goes. But this would have to enter the critical section once for every array element, instead of just once for each CPU, generating more memory and processor contention and benefiting less from spatial locality. 
      • We submit this critical section with dispatch_group_async on the lock queue, rather than a plain dispatch_async, because we want to be sure that the worker's changes have been applied to the result array before the group is considered done. If we were to use a plain dispatch_async, the scheduler could easily finish all of the step blocks - letting dispatch_group_wait return - while one or more of these critical section blocks was still unprocessed, leaving us with an incomplete result.
  8. Back on the original thread, we call dispatch_group_wait, which waits until all blocks in the group have completed. At this point, we know that all work has been done and all changes to the master results array have been made.
  9. The final line sorts the master array by stepIndex (since steps finish in a nondeterministic order) and then flattens the master array in that order.
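To make the lock queue pattern from step 3 concrete, here is a minimal, standalone sketch (not part of pmap itself; the queue label and the counter are purely illustrative) that combines a serial queue with a dispatch group in the same way pmap does:

let group = dispatch_group_create()
let lockQueue = dispatch_queue_create("counter lock", DISPATCH_QUEUE_SERIAL)
var counter = 0

for _ in 0..<1000 {
    dispatch_group_async(group, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)) {
        // The serial lock queue runs at most one block at a time, so these
        // increments never interleave, even though the outer blocks run concurrently.
        dispatch_group_async(group, lockQueue) {
            counter += 1
        }
    }
}

dispatch_group_wait(group, DISPATCH_TIME_FOREVER)
print(counter) // always 1000; without the lock queue, increments could be lost
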
To see how this works, let's create a simple profile function:

func profile<A>(desc: String, block: () -> A) -> Void {
    let start = NSDate().timeIntervalSince1970
    block()
    
    let duration = NSDate().timeIntervalSince1970 - start
    print("Profiler: completed \(desc) in \(duration * 1000)ms")

}
We'll test this out using a simple function called slowCalc, which adds a small sleep delay before each calculation, to ensure that each map operation does enough work. In production code, you should never sleep in code submitted to a GCD queue - this is purely to simulate a slow calculation for demonstration purposes. Without this little delay, the overhead of parallelization would be too great to see a speedup:

func slowCalc(x: Int) -> Int {
    NSThread.sleepForTimeInterval(0.1)
    return x * 2
}

let smallTestData: [Int] = [Int](0..<10)
let largeTestData = [Int](0..<300)

profile("large dataset (sequential)") { largeTestData.map { slowCalc($0) } }
profile("large dataset (parallel)") { largeTestData.pmap { slowCalc($0) } }

On my eight-core machine, this results in:

Profiler: completed large dataset (sequential) in 31239.7990226746ms
Profiler: completed large dataset (parallel) in 4005.04493713379ms

a 7.8-fold speedup, which is about what you'd expect.

It's important to remember that if each iteration doesn't do enough work, the overhead of splitting up the work, setting up worker blocks and synchronizing data access will far outweigh the time savings of parallelization. The amount of overhead involved can be surprising. This code is identical to the above, except that it doesn't add the artificial delay.

profile("large dataset (sequential, no delay)") { largeTestData.map { $0 * 2 } }
profile("large dataset (parallel, no delay)") { largeTestData.pmap { $0 * 2 } }

On my machine, it results in:

Profiler: completed large dataset (sequential, no delay) in 53.4629821777344ms
Profiler: completed large dataset (parallel, no delay) in 161.548852920532ms

The parallel version is three times slower than the sequential version! This illustrates some important considerations when using parallel collection functions:
  1. Make sure that each of your iterations does enough work to make parallelization worthwhile (a sketch of one safeguard follows this list).
  2. Parallel collections are not a panacea - you can't just sprinkle them throughout your code and assume you'll get a performance boost. You still need to profile for hotspots, and it's important to focus on bottlenecks found through profiling, rather than hunches about what parts of your code are slowest.
  3. Modern CPUs are blindingly fast - basic operations like addition or multiplication are so cheap that they're rarely worth parallelizing unless your array is very large.
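One simple way to guard against the first point is to fall back to the ordinary sequential map when the array is small. This is only a sketch: the name pmapIfWorthwhile and the threshold of 500 are arbitrary, and the right cutoff for your workload should come from profiling rather than guesswork.

extension Array {
    public func pmapIfWorthwhile<T>(threshold: Int = 500, transform: Element -> T) -> [T] {
        // For small arrays, the cost of dispatching and synchronizing workers
        // outweighs the gain, so just map sequentially.
        if self.count < threshold {
            return self.map(transform)
        }
        return self.pmap(transform)
    }
}
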
You can use the same techniques to implement a parallel filter function:

// requires Swift 2.0 or higher
extension Array {
    public func pfilter(includeElement: Element -> Bool) -> [Element] {
        guard !self.isEmpty else {
            return []
        }
        
        var result: [(Int, [Element])] = []
        
        let group = dispatch_group_create()
        let lock = dispatch_queue_create("pfilter queue for result", DISPATCH_QUEUE_SERIAL)
        
        let step: Int = max(1, self.count / NSProcessInfo.processInfo().activeProcessorCount) // step can never be 0
        
        for var stepIndex = 0; stepIndex * step < self.count; stepIndex++ {
            let capturedStepIndex = stepIndex
            
            var stepResult: [Element] = []
            dispatch_group_async(group, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)) {
                for i in (capturedStepIndex * step)..<((capturedStepIndex + 1) * step) {
                    if i < self.count && includeElement(self[i]) {
                        stepResult += [self[i]]
                    }
                }
                
                dispatch_group_async(group, lock) {
                    result += [(capturedStepIndex, stepResult)]
                }
            }
        }
        
        dispatch_group_wait(group, DISPATCH_TIME_FOREVER)
        
        return result.sort { $0.0 < $1.0 }.flatMap { $0.1 }
    }
}

This code is almost identical to pmap - only the logic in the inner for loop is different.

We can now start using these combinators together (again, we have to use a slowed-down predicate function in order to see the benefit from parallelization):

func slowTest(x: Int) -> Bool {
    NSThread.sleepForTimeInterval(0.1)
    return x % 2 == 0
}

profile("large dataset (sequential)") { largeTestData.filter { slowTest($0) }.map { slowCalc($0) } }
profile("large dataset (sequential filter, parallel map)") { largeTestData.filter { slowTest($0) }.pmap { slowCalc($0) } }
profile("large dataset (parallel filter, sequential map)") { largeTestData.pfilter { slowTest($0) }.map { slowCalc($0) } }
profile("large dataset (parallel filter, parallel map)") { largeTestData.pfilter { slowTest($0) }.pmap { slowCalc($0) } }

which results in:

Profiler: completed large dataset (sequential) in 1572.28803634644ms
Profiler: completed large dataset (sequential filter, parallel map) in 1153.90300750732ms
Profiler: completed large dataset (parallel filter, sequential map) in 642.061948776245ms
Profiler: completed large dataset (parallel filter, parallel map) in 231.456995010376ms

Parallelizing just the map gives a modest improvement, parallelizing just the filter cuts the time by more than half, and combining the two parallel operations gives us an almost sevenfold performance improvement over the basic sequential implementation.

Here are some other directions to pursue:
  1. Implement parallel versions of find, any/exists and all. These are tricky because their contracts stipulate that processing stops as soon as they have a result. So you'll have to find some way to stop your parallel workers as soon as the function has its answer.
  2. Implement a parallel version of reduce. The benefit of doing this is that reduce is a "primitive" higher-order function - you can easily implement pmap and pfilter given an existing parallel reduce function (a sketch of this relationship, using the ordinary sequential reduce, follows this list).
  3. Generalize these functions to work on all collections (not just arrays), using Swift 2's protocol extensions.
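On the second point, here's a quick illustration (a sketch using the standard, sequential reduce; the names mapViaReduce and filterViaReduce are just for demonstration) of why reduce is the "primitive" of the family. Both map and filter fall out of it directly, so a parallel reduce that splits the array, reduces each chunk on its own core and then merges the partial results would give you pmap and pfilter almost for free:

extension Array {
    func mapViaReduce<T>(transform: Element -> T) -> [T] {
        // Fold each transformed element onto an accumulator array.
        return self.reduce([T]()) { acc, element in acc + [transform(element)] }
    }

    func filterViaReduce(includeElement: Element -> Bool) -> [Element] {
        // Keep an element only if the predicate accepts it.
        return self.reduce([Element]()) { acc, element in
            includeElement(element) ? acc + [element] : acc
        }
    }
}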
