MapReduce是Google在2004年发表的论文《MapReduce: Simplified Data Processing on Large Clusters》中提出的一个用于分布式的用于大规模数据处理的编程模型。
原理
MapReduce将数据的处理分成了两个步骤,Map和Reduce。Map将输入的数据集拆分成一批KV对并输出,对于每一个 <k1, v1>
,Map将输出一批 <k2, v2>
;Reduce将Map对Map中产生的结果进行汇总,对于每一个 <k2, list(v2)>
( list(v2)
是所有key为 k2
的value),Reduce将输出结果 <k3, v3>
。
以单词出现次数统计程序为例,map对文档中每个单词都输出 <word, 1>
,reduce则会统计每个单词对应的 list
的长度,输出 <word, n>
:
map(String key, String value): // key: document name // value: document contents for each word w in value: EmitIntermediate(w, “1″); reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));
流程
MapReduce的流程如下:
<k2, list(v2)> <k2, list(v2)>
MapReduce的整个流程并不复杂,就是将数据分片后提交给map执行,执行产生的中间结果经过处理后再交给reduce执行,产生最终结果。
容错
当worker发生故障时,可以通过心跳等方法进行检测,当检测到故障之后就可以将任务重新分派给其他worker重新执行。
当master发生故障时,可以通过检查点(checkpoint)的方法来进行恢复。然而由于master只有一个,比较难进行恢复,因此可以让用户检测并重新执行任务。
对于输出文件来说,需要保证仍在写入中的文件不被读取,即保证操作的原子性。可以通过文件系统重命名操作的原子性来实现,先将结果保存在临时文件中,当执行完成后再进行重命名。使用这种方法就可以将有副作用的 write
变为幂等(总是产生相同结果的运算,如 a = 2
就是幂等的,而 a += 2
则不是)的重命名。
落伍者
影响任务的总执行时间的重要因素就是落伍者:在运算中某个机器用了很长时间才完成了最后的几个任务,从而增加了总的执行时间。对于这种情况,可以在任务即将完成时,将剩余的任务交给备用者进程来执行,无论是最初的worker完成了任务还是备用者完成了,都可以将任务标记为完成。
分区函数
对于map产生的结果,通过分区函数来将相同key的KV对分配给同一个reduce来执行。默认的分区函数是 hash(key) % R
,但在某些情况下也可以选择其他分区函数。如key为URL时,希望相同主机的结果在同一个输出中,那么就可以用 hash(hostname(key)) % R
作为分区函数。
实现
实现部分是基于MIT 6.824的实验完成的。
type Coordinator struct { mapJobs []Job reduceJobs []Job status int nMap int remainMap int nReduce int remainReduce int lock sync.Mutex } func MakeCoordinator(files []string, nReduce int) *Coordinator { c := Coordinator{} c.status = MAP c.nMap = len(files) c.remainMap = c.nMap c.nReduce = nReduce c.remainReduce = c.nReduce c.mapJobs = make([]Job, len(files)) c.reduceJobs = make([]Job, nReduce) for idx, file := range files { c.mapJobs[idx] = Job{[]string{file}, WAITTING, idx} } for idx := range c.reduceJobs { c.reduceJobs[idx] = Job{[]string{}, WAITTING, idx} } c.server() return &c } func (c *Coordinator) timer(status *int) { time.Sleep(time.Second * 10) c.lock.Lock() if *status == RUNNING { log.Printf("timeout\n") *status = WAITTING } c.lock.Unlock() } func (c *Coordinator) AcquireJob(args *AcquireJobArgs, reply *AcquireJobReply) error { c.lock.Lock() defer c.lock.Unlock() fmt.Printf("Acquire: %+v\n", args) if args.CommitJob.Index >= 0 { if args.Status == MAP { if c.mapJobs[args.CommitJob.Index].Status == RUNNING { c.mapJobs[args.CommitJob.Index].Status = FINISHED for idx, file := range args.CommitJob.Files { c.reduceJobs[idx].Files = append(c.reduceJobs[idx].Files, file) } c.remainMap-- } if c.remainMap == 0 { c.status = REDUCE } } else { if c.reduceJobs[args.CommitJob.Index].Status == RUNNING { c.reduceJobs[args.CommitJob.Index].Status = FINISHED c.remainReduce-- } if c.remainReduce == 0 { c.status = FINISH } } } if c.status == MAP { for idx := range c.mapJobs { if c.mapJobs[idx].Status == WAITTING { reply.NOther = c.nReduce reply.Status = MAP reply.Job = c.mapJobs[idx] c.mapJobs[idx].Status = RUNNING go c.timer(&c.mapJobs[idx].Status) return nil } } reply.NOther = c.nReduce reply.Status = MAP reply.Job = Job{Files: make([]string, 0), Index: -1} } else if c.status == REDUCE { for idx := range c.reduceJobs { if c.reduceJobs[idx].Status == WAITTING { reply.NOther = c.nMap reply.Status = REDUCE reply.Job = c.reduceJobs[idx] c.reduceJobs[idx].Status = RUNNING go c.timer(&c.reduceJobs[idx].Status) return nil } } reply.NOther = c.nMap reply.Status = REDUCE reply.Job = Job{Files: make([]string, 0), Index: -1} } else { reply.Status = FINISH } return nil }
在 Coordinator
中保存所有的任务信息以及执行状态,worker通过 AcquireJob
来提交和申请任务,要等待所有map任务完成后才能执行reduce任务。这里就简单的将每一个文件都作为一个任务。
func doMap(mapf func(string, string) []KeyValue, job *Job, nReduce int) (files []string) { outFiles := make([]*os.File, nReduce) for idx := range outFiles { outFile, err := ioutil.TempFile("./", "mr-tmp-*") if err != nil { log.Fatalf("create tmp file failed: %v", err) } defer outFile.Close() outFiles[idx] = outFile } for _, filename := range job.Files { file, err := os.Open(filename) if err != nil { log.Fatalf("cannot open %v", filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read %v", filename) } file.Close() kva := mapf(filename, string(content)) for _, kv := range kva { hash := ihash(kv.Key) % nReduce js, _ := json.Marshal(kv) outFiles[hash].Write(js) outFiles[hash].WriteString("\n") } } for idx := range outFiles { filename := fmt.Sprintf("mr-%d-%d", job.Index, idx) os.Rename(outFiles[idx].Name(), filename) files = append(files, filename) } return } func doReduce(reducef func(string, []string) string, job *Job, nMap int) { log.Printf("Start reduce %d", job.Index) outFile, err := ioutil.TempFile("./", "mr-out-tmp-*") defer outFile.Close() if err != nil { log.Fatalf("create tmp file failed: %v", err) } m := make(map[string][]string) for _, filename := range job.Files { file, err := os.Open(filename) if err != nil { log.Fatalf("cannot open %v", filename) } scanner := bufio.NewScanner(file) for scanner.Scan() { kv := KeyValue{} if err := json.Unmarshal(scanner.Bytes(), &kv); err != nil { log.Fatalf("read kv failed: %v", err) } m[kv.Key] = append(m[kv.Key], kv.Value) } if err := scanner.Err(); err != nil { log.Fatal(err) } file.Close() } for key, value := range m { output := reducef(key, value) fmt.Fprintf(outFile, "%v %v\n", key, output) } os.Rename(outFile.Name(), fmt.Sprintf("mr-out-%d", job.Index)) log.Printf("End reduce %d", job.Index) } // // main/mrworker.go calls this function. // func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { CallExample() var status int = MAP args := AcquireJobArgs{Job{Index: -1}, MAP} for { args.Status = status reply := AcquireJobReply{} call("Coordinator.AcquireJob", &args, &reply) fmt.Printf("AcReply: %+v\n", reply) if reply.Status == FINISH { break } status = reply.Status if reply.Job.Index >= 0 { // get a job, do it commitJob := reply.Job if status == MAP { commitJob.Files = doMap(mapf, &reply.Job, reply.NOther) } else { doReduce(reducef, &reply.Job, reply.NOther) commitJob.Files = make([]string, 0) } // job finished args = AcquireJobArgs{commitJob, status} } else { // no job, sleep to wait time.Sleep(time.Second) args = AcquireJobArgs{Job{Index: -1}, status} } } }
worker通过RPC调用向 Coordinator.AcquireJob
申请和提交任务,之后根据任务类型执行 doMap
或 doReduce
。
doMap
函数读取目标文件并将 <filename, content>
传递给map函数,之后将返回值根据 hash(key) % R
写入到目标中间文件中去。
doReduce
函数则从目标文件中读取KV对并加载到内存中,对相同的key进行合并(这里我是用 map
来做的,但是之后看论文发现是用排序来做的,这样可以保证在每个输出文件中的key是有序的)。合并之后就将 <key, list(value)>
交给reduce函数处理,最后把返回值写入到结果文件中去。