This is a simplified implementation of Google's MapReduce paper as part of MIT's 6.824 Distributed Systems course.
Imagine you're Google in 2004, and you want to find the most searched queries on a given day. You could write very straightforward sequential code to compute this result, but running it on a single machine would take far too much time and memory, because you're dealing with the logs of millions and millions of users.
Instead, you can partition the logs into various workloads, and distribute the work across hundreds of machines to run in parallel. This will significantly improve the computation time, but it will introduce a complexity overhead for the programmers. You now have to write code that coordinates hundreds of machines that run in parallel and share data. You will need to use concurrency control and recovery mechanisms to ensure fault-tolerance.
MapReduce is a programming model designed for programming distributed computations as if they were a series of functional tasks, without the overhead of writing distributed code. The user specifies a map function that maps input files into intermediate key-value pairs, and a reduce function that merges all intermediate values associated with one key into one final value. The run-time is responsible for partitioning the data, coordinating worker machines, and handling failure. Many programs can be expressed using map and reduce functions, such as: word count, distributed sort, inverted index.
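For word count, the two user-supplied functions might look like the sketch below. The `KeyValue` type and the signatures mirror the shape of the lab's plugin interface, but the exact names here are illustrative:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is the intermediate unit flowing from the map phase to the reduce phase.
type KeyValue struct {
	Key, Value string
}

// Map emits one {word, "1"} pair for every word in the input contents.
func Map(filename string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	kvs := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// Reduce receives every value emitted for one key and folds them into a
// single result; for word count that is simply the number of values.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kvs := Map("demo.txt", "the quick fox and the dog")
	fmt.Println(len(kvs), Reduce("the", []string{"1", "1"}))
}
```

The run-time handles everything between the two calls: grouping all values for a key before handing them to `Reduce`.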
The program runs one coordinator process that coordinates and manages all worker processes and tasks. The coordinator and workers communicate using Remote Procedure Calls (RPCs). RPCs enable one process to call a function residing in another process (and possibly on another machine). The coordinator starts a goroutine that listens for RPCs on a Unix-domain socket. Workers send RPC requests to the same socket. Each RPC is run in a goroutine. RPCs contain reply structures that are populated by the server (coordinator) and returned to the caller (worker).
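A minimal sketch of that pattern with Go's `net/rpc` package is shown below; the `TaskArgs`/`TaskReply` structures and the `AssignTask` method are illustrative stand-ins, not the repo's exact types:

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/http"
	"net/rpc"
	"os"
)

// Illustrative RPC argument and reply structures; the real lab defines its own.
type TaskArgs struct{ WorkerID int }
type TaskReply struct{ TaskType string }

type Coordinator struct{}

// AssignTask is an RPC handler: the worker fills in TaskArgs, the
// coordinator populates TaskReply, and net/rpc ships the reply back.
func (c *Coordinator) AssignTask(args *TaskArgs, reply *TaskReply) error {
	reply.TaskType = "map"
	return nil
}

// serveAndCall starts the coordinator's RPC server on a Unix-domain
// socket, then performs one worker-side call against the same socket.
func serveAndCall(sock string) (string, error) {
	rpc.Register(&Coordinator{})
	rpc.HandleHTTP()
	os.Remove(sock)
	l, err := net.Listen("unix", sock)
	if err != nil {
		return "", err
	}
	go http.Serve(l, nil) // each incoming RPC is served in its own goroutine

	client, err := rpc.DialHTTP("unix", sock)
	if err != nil {
		return "", err
	}
	defer client.Close()
	var reply TaskReply
	if err := client.Call("Coordinator.AssignTask", &TaskArgs{WorkerID: 1}, &reply); err != nil {
		return "", err
	}
	return reply.TaskType, nil
}

func main() {
	t, err := serveAndCall("/tmp/mr-demo.sock")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("assigned:", t)
}
```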
The job consists of M map tasks and R reduce tasks. The input is partitioned into M files (in our case, for simplicity, the no. of input files will always be used for the no. of map tasks). The output is partitioned into R files, one for each reduce task. R is determined by the caller of the coordinator in cmd/coordinator/mrcoordinator.go.
The job is divided into two phases: the map phase and the reduce phase. All map tasks must be completed before starting any reduce task.
Map tasks produce M×R intermediate files that contain key-value pairs. Each map task reads one file and produces R intermediate files. The key-value pairs produced by a map task can be partitioned into R buckets by hashing the keys: `kv_index = hash(kv.key) % R`. Each reduce task reads M intermediate files and produces one output file.
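One common way to implement that hash (a sketch, using Go's standard `hash/fnv` package) is:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash maps a key to a non-negative int. Taking ihash(key) % R picks
// the reduce bucket, so every occurrence of a key ends up in the same
// bucket no matter which map task emitted it.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	const R = 10
	for _, key := range []string{"apple", "banana", "apple"} {
		fmt.Printf("%s -> bucket %d\n", key, ihash(key)%R)
	}
}
```

The mask with `0x7fffffff` clears the sign bit so the modulo result is never negative.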
IMPORTANT: Note that the entire job could be done by any number of workers, even one. The same worker can execute map tasks and reduce tasks. The number of workers depicted in the figure above is arbitrary.
The coordinator process keeps track of multiple things:
- Number of all and completed map tasks
- Number of all and completed reduce tasks
- Input file names
- Intermediate file names generated by map tasks
- Tasks and their state, type, assigned worker, start time, and input file names
- A mutex lock to prevent race conditions on the shared coordinator data (Remember: Each RPC runs in a goroutine)
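That bookkeeping might be organized along the following lines; the field names are illustrative rather than the repo's exact layout:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type TaskState int

const (
	Idle TaskState = iota
	Running
	Completed
)

type TaskType int

const (
	MapTask TaskType = iota
	ReduceTask
)

// Task records everything the coordinator needs to schedule, monitor,
// and re-assign one unit of work.
type Task struct {
	Type       TaskType
	State      TaskState
	WorkerID   int
	StartTime  time.Time
	InputFiles []string
}

// Coordinator groups the shared state behind one mutex; every RPC
// handler locks it before reading or writing.
type Coordinator struct {
	mu sync.Mutex

	nMap, nMapDone       int
	nReduce, nReduceDone int

	inputFiles        []string
	intermediateFiles [][]string // intermediate file names, per reduce bucket
	tasks             []Task
}

func main() {
	c := &Coordinator{nMap: 2, nReduce: 3}
	c.mu.Lock()
	c.tasks = append(c.tasks, Task{Type: MapTask, InputFiles: []string{"pg-1.txt"}})
	c.mu.Unlock()
	fmt.Println("tasks:", len(c.tasks))
}
```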
In the event of worker failure, the coordinator has a running goroutine that checks every 500ms if any of the running tasks (assigned to one of the workers) has been running for too long (in our case: 10s). If that is the case (due to worker delay or crash), the task is reset to idle to be assigned to another worker (or maybe even the same worker if it didn't crash and it requested the same task again).
In a real implementation, we should also reset map tasks completed by a failed worker. Although the tasks were completed, map tasks produce intermediate files that reside on their local worker machines, which have failed with the worker. Our implementation runs on one machine for simplicity, so we only reset running tasks.
In the event of coordinator failure, we let the entire job fail, due to having only one coordinator and for simplicity.
Each worker requests and executes tasks continuously until it's signaled by the coordinator to exit. Sometimes, no idle tasks are available for execution, so the coordinator returns a wait task that makes the worker sleep for 500ms and try again. The coordinator can also return exit tasks that signal to the worker that the job is done and it should safely exit.
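That loop can be sketched as follows, with `requestTask` standing in for the RPC call and `runTask` for executing a map or reduce task (both are hypothetical stand-ins, not the repo's functions):

```go
package main

import (
	"fmt"
	"time"
)

// workerLoop keeps requesting tasks until the coordinator hands back an
// exit task.
func workerLoop(requestTask func() string, runTask func(string)) {
	for {
		switch task := requestTask(); task {
		case "exit":
			return // job is done, shut down cleanly
		case "wait":
			time.Sleep(500 * time.Millisecond) // no idle task right now, retry
		default:
			runTask(task) // execute a map or reduce task
		}
	}
}

func main() {
	// Feed the loop a canned sequence of task types.
	queue := []string{"map", "reduce", "exit"}
	i := 0
	next := func() string { t := queue[i]; i++; return t }

	done := 0
	workerLoop(next, func(string) { done++ })
	fmt.Println("tasks executed:", done) // prints tasks executed: 2
}
```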
On Ubuntu (or WSL):
- Open the repo
- Open a terminal and run:
```sh
go build -buildmode=plugin cmd/apps/wc/wc.go   # replace wc/wc.go with the app you want to build
go run cmd/coordinator/mrcoordinator.go data/pg*.txt
```
- Open one/many other terminals and run in each:
```sh
go run cmd/worker/mrworker.go wc.so   # replace wc.so with the plugin of the app you built
```
If you want to run the tests:
```sh
cd scripts
bash test-mr.sh
```