6.824 lab1通关记录
6.824 lab1通关记录
实验介绍
实验具体需要先看看MapReduce论文,学习完论文后看需求文档,很多实验细节需求文档有给。
实验需求文档:lab1实验文档
实验中MapReduce论文:MapReduce论文
论文中文版:MapReduce论文中文版
(实验介绍大部分是从lab1实验文档里面拷贝下来的,直接看实验文档也行)
描述
实现分布式mr,一个coordinator,一个worker(启动多个),在这次实验都在一个机器上运行。worker通过rpc和coordinator交互。worker向coordinator请求任务并执行,输出结果保存到磁盘。
简单的顺序MR
lab给了一个简单的顺序MapReduce程序给我们学suo习。具体逻辑main/mrsequential.go中。执行以下命令,运行一个单词统计程序。
1 | $ cd ~/6.5840 |
这里要注意,mrapps/wc.go存放着单词统计的map和reduce函数,这个是以插件的形式添加到lab的。go在win上是不支持插件的,如果电脑是win系统,最好弄个虚拟机或者弄一个linux子系统。在linux环境下进行实验。
进行实验
- 我们需要操作的文件
我们需要将我们的实现代码写在mr目录下,里面包含mr/worker.go、mr/coordinator.go、mr/rpc.go。其中worker是工作者,coordinator是协调者。在一般的生产环境中,他们会运行在不同的机器中,但是在这个实验中,他们在同一个机器上运行。
- 运行应用
lab的启动文件在main目录下,main/mrcoordinator.go用来启动coordinator,main/mrworker.go用来启动worker。
我们需要运行我们的lab,首先需要来到main目录下,然后执行下述的所有命令。
我们想要在lab上运行不同的MapReduce任务是通过插件来完成的。比如我们希望运行单词统计的mr任务。那么我们需要先执行一下命令,将单词统计的代码以插件的形式加载到实验中。mrapp目录下有很多不同的mr程序代码。
1 | go build -buildmode=plugin ../mrapps/wc.go |
加载完插件后,在main目录下运行以下命令来启动coordinator。
1 | rm mr-out* # 清除mr的输出文件 |
后面的pg-*.txt表示的是输入文件。文件名称作为key,文件值作为value。这一系列文件将作为map的输入。
随后,我们再启动一个窗口运行worker,然后再在main目录下,运行以下命令来启动worker
1 | go run mrworker.go wc.so |
注意,启动worker的时候要携带之前加载的插件名称。wc.so就是go build -buildmode=plugin ../mrapps/wc.go
的产物。
最后coordinator和worker都执行结束以后输出文件为mr-out-*.。可以通过一下命令去读取文件并将输出排序
1 | $ cat mr-out-* | sort | more |
测试结果
该lab提供的脚本来验证lab是否成功。在main/test-mr.sh中。测试还检查您的实现是否并行运行Map和Reduce任务,以及您的实现是否从运行任务时崩溃的工作线程中恢复。
来到main目录,运行以下命令来检验实验结果是否正确
1 | bash test-mr.sh |
规则
- map阶段需要将中间keys分成nReduce个数, nReduce通过main/mrcoordinator.go传给MakeCoordinator()
- worker需要将第X个reduce task结果放到mr-out-X中。
- mr-out-X要一行一行生成,kv形式。main/mrsequential.go中有,拿来就完事了
- main/mrcoordinator.go从mr/coordinator.go 的 Done()方法得知任务完成并关闭自己。
- 任务都完成后,worker也得关闭
提示
- 一开始可以从mr/worker.go的 Worker()方法做,发送rpc给coordinator请求任务,然后coordinator分配任务,然后worker读文件并且map函数处理。
- map reduce函数都是通过go插件装载 (.so文件)
- mr/ 文件变了就需要重新build
- 都在一个文件系统,worker天然实现文件共享,先凑合着起步
- 中间文件命名 mr-X-Y X是map任务号,y是reduce任务号
- worker的map方法用json存储中间kv对,reduce再读回来,因为真正分布式worker都不在一个机器上,涉及网络传输,所以用json编码解码走个过场。
- worker的map可以用 worker.go里面的ihash(key)得到特定key的reduce任务号
- mrsequential.go 代码可以借鉴
- coordinator里面的共享数据需要加锁
- worker有时候需要等待,比如当map任务都分发出去了,有的worker完成后又来申请任务,此时还有map未完成,reduce不能开始,这个worker需要等待下
- 如果任务重试机制,记得不要生成重复任务
- mrapps/crash.go 随机干掉map reduce,看crash.go的代码是有一定几率让worker直接退出或者长时间延迟,可以用来测试恢复功能。这个逻辑是整合在map reduce函数里面的,注意worker被干掉时候任务已经拿到手了。
- 确保没有人在出现崩溃时观察部分写入的文件,用ioutil.TempFile创建临时文件,用os.Rename重命名
实现思路
说下实现过程:
- Coordinator进程:管理整个任务。也就是循环创建不同的task并保存起来。等worker通过rpc请求来获取任务(map/reduce任务)执行。等到所有map任务执行结束以后,在创建reduce任务,等待worker来获取任务执行。等到所有任务执行结束以后就结束程序。
- Worker进程:不停向Coordinator进程获取任务来执行,等到Coordinator进程结束时,worker结束运行。
Coordinator
先来看看coordinator的具体实现,首先来看下coordinator需要些什么东西
- 需要记录Reduce的任务数量,Map的任务数量。Map的任务数量是由输入文件数量来决定的,一个文件可以对应一个map任务。Reduce任务数量可以通过nReduce参数进行设置。在map阶段输出k-v以后,可以通过ihash函数获取这个k-v是由哪个Reduce任务去执行(这里的Reduce任务数量不代表着执行Reduce函数次数的数量,一个Reduce任务可以运行多个Reduce函数)。
- 需要记录整体任务执行阶段,比如是map阶段,reduce阶段
- 需要记录一个阶段内所有任务状态。比如map阶段可能会有10个map任务交给worker执行。我们需要记录这些任务的状态,如果map任务全部执行结束,就可以进入reduce阶段。
- 一个管道记录待执行任务。管道天然支持并发。worker向coordinator请求任务时,可以从这个管道获取任务。有任务需要执行就可以添加任务到管道
1 | type Coordinator struct { |
其他实现思路如下:
需要定义一个任务的结构体,任务结构体该结构体代表一个任务,记录一些任务信息传递给worker执行。
1
2
3
4
5
6
7type Task struct {
Type string // 作业类型,比如是Map还是Reduce
File string // 如果是map需要知道处理的输入文件
Seq int // 序列号,task的编号
ReduceNum int
MapNum int
}实现initMapTask方法,用于初始化map任务
可以循环创建任务并添加到taskCh等worker过来获取执行。
initReduceTask方法,用于初始化reduce任务
等到所有taskStats都标记为结束的时候,调用initReduceTask方法,可以循环创建reduce任务添加到taskCh中,等待worker来执行。
initMapTask和initReduceTask方法以后都需要循环对所有的taskStats进行监控,判断任务执行情况。
- 如果任务执行超时,可以重新创建任务添加到taskCh中
- 如果map任务全部执行结束,可以退出监控,执行initReduceTask,并重新监控
- 如果reduce任务全部执行结束,退出监控,并且程序执行结束
提供获取任务的方法的rpc调用给worker
从taskCh中获取一个任务响应给worker
1
2
3
4
5
6// 响应worker获取task
func (c *Coordinator) GetOneTask(args *TaskArgs, reply *TaskReply) error {
task := <-c.taskCh
reply.Task = &task
return nil
}提供响应任务情况的rpc调用给worker
任务执行结束以后worker需要通知coordinator该任务已经执行结束。这里需要注意,需要携带任务类型,判断一下该任务是否过时。比如一些超时任务,任务的整体情况已经来到reduce阶段了,却响应map阶段的任务。这些任务可能是由于超时而被丢弃掉的。
1
2
3
4
5
6
7
8
9
10
11
12
13func (c *Coordinator) ReportTask(args *ReportTaskArgs, reply *ReportTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.phase != args.Type {
return nil
}
if args.Done {
c.taskStats[args.Seq].Status = TaskStatusFinish
} else {
c.taskStats[args.Seq].Status = TaskStatusErr
}
return nil
}
Worker
worker主要任务就是不停向Coordinator获取任务执行。worker主要数据需要记录一下map和reduce函数是啥。
1 | // worker对象 |
其他实现思路如下:
写一个run方法,循环向coordinator请求执行任务,在worker对象创建完成以后调用run方法循环获取任务并执行
1
2
3
4
5
6func (w *worker) run() {
for {
t := w.reqTask()
w.doTask(t)
}
}根据task的类型执行mapTask或者reduceTask
doMapTask(task Task),执行map任务
map任务会生成一个temp文件,temp文件可以命名为mr-temp-x-y,x代表这个temp任务是属于哪个reduce任务的。y代表是由哪个map任务生成的。
doReduceTask(task Task),执行reduce任务
根据map阶段生成这些temp文件,reduce阶段到这些temp文件中读取自己需要处理的文件。比如编号为1的reduce任务可以去读取所有mr-temp-1-*的数据,这些文件中保存的所有k-v都是由这个reduce任务执行。当然这里说的reduce任务不代表reduce函数执行的次数。reduce函数执行的次数由map阶段生成的key数量决定。一个reduce任务可以执行多个reduce函数。
最后将生成文件保存到mr-out-x中