6.824 lab1通关记录

lab1终于过了,不是都说lab1巨简单🐎,鼠鼠感觉也不算特别简单。我看了一些博客,和其他人的实现,给我抄完了。🐀是废物

实验介绍

实验具体需要先看看MapReduce论文,学习完论文后看需求文档,很多实验细节需求文档有给。

(实验介绍大部分是从lab1实验文档里面拷贝下来的,直接看实验文档也行)

描述

实现分布式mr,一个coordinator,一个worker(启动多个),在这次实验都在一个机器上运行。worker通过rpc和coordinator交互。worker向coordinator请求任务并执行,输出结果保存到磁盘。

简单的顺序MR

lab给了一个简单的顺序MapReduce程序给我们学suo习。具体逻辑main/mrsequential.go中。执行以下命令,运行一个单词统计程序。

1
2
3
4
5
6
$ cd ~/6.5840
$ cd src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrsequential.go wc.so pg*.txt
$ more mr-out-0

这里要注意,mrapps/wc.go存放着单词统计的map和reduce函数,这个是以插件的形式添加到lab的。go在win上是不支持插件的,如果电脑是win系统,最好弄个虚拟机或者弄一个linux子系统。在linux环境下进行实验。

进行实验

  1. 我们需要操作的文件

我们需要将我们的实现代码写在mr目录下,里面包含mr/worker.go、mr/coordinator.go、mr/rpc.go。其中worker是工作者,coordinator是协调者。在一般的生产环境中,他们会运行在不同的机器中,但是在这个实验中,他们在同一个机器上运行。

img

  1. 运行应用

lab的启动文件在main目录下,main/mrcoordinator.go用来启动coordinator,main/mrworker.go用来启动worker。

我们需要运行我们的lab,首先需要来到main目录下,然后执行下述的所有命令。

我们想要在lab上运行不同的MapReduce任务是通过插件来完成的。比如我们希望运行单词统计的mr任务。那么我们需要先执行一下命令,将单词统计的代码以插件的形式加载到实验中。mrapp目录下有很多不同的mr程序代码。

1
go build -buildmode=plugin ../mrapps/wc.go

加载完插件后,在main目录下运行以下命令来启动coordinator

1
2
rm mr-out*                                # 清除mr的输出文件
go run mrcoordinator.go pg-*.txt # 运行lab1,

后面的pg-*.txt表示的是输入文件。文件名称作为key,文件值作为value。这一系列文件将作为map的输入。

随后,我们再启动一个窗口运行worker,然后再在main目录下,运行以下命令来启动worker

1
go run mrworker.go wc.so

注意,启动worker的时候要携带之前加载的插件名称。wc.so就是go build -buildmode=plugin ../mrapps/wc.go

的产物。

最后coordinator和worker都执行结束以后输出文件为mr-out-*.。可以通过一下命令去读取文件并将输出排序

1
2
3
4
$ cat mr-out-* | sort | more
A 509
ABOUT 2
ACT 8

测试结果

该lab提供的脚本来验证lab是否成功。在main/test-mr.sh中。测试还检查您的实现是否并行运行Map和Reduce任务,以及您的实现是否从运行任务时崩溃的工作线程中恢复。

来到main目录,运行以下命令来检验实验结果是否正确

1
bash test-mr.sh

规则

  • map阶段需要将中间keys分成nReduce个数, nReduce通过main/mrcoordinator.go传给MakeCoordinator()
  • worker需要将第X个reduce task结果放到mr-out-X中。
  • mr-out-X要一行一行生成,kv形式。main/mrsequential.go中有,拿来就完事了
  • main/mrcoordinator.go从mr/coordinator.go 的 Done()方法得知任务完成并关闭自己。
  • 任务都完成后,worker也得关闭

提示

  • 一开始可以从mr/worker.go的 Worker()方法做,发送rpc给coordinator请求任务,然后coordinator分配任务,然后worker读文件并且map函数处理。
  • map reduce函数都是通过go插件装载 (.so文件)
  • mr/ 文件变了就需要重新build
  • 都在一个文件系统,worker天然实现文件共享,先凑合着起步
  • 中间文件命名 mr-X-Y X是map任务号,y是reduce任务号
  • worker的map方法用json存储中间kv对,reduce再读回来,因为真正分布式worker都不在一个机器上,涉及网络传输,所以用json编码解码走个过场。
  • worker的map可以用 worker.go里面的ihash(key)得到特定key的reduce任务号
  • mrsequential.go 代码可以借鉴
  • coordinator里面的共享数据需要加锁
  • worker有时候需要等待,比如当map任务都分发出去了,有的worker完成后又来申请任务,此时还有map未完成,reduce不能开始,这个worker需要等待下
  • 如果任务重试机制,记得不要生成重复任务
  • mrapps/crash.go 随机干掉map reduce,看crash.go的代码是有一定几率让worker直接退出或者长时间延迟,可以用来测试恢复功能。这个逻辑是整合在map reduce函数里面的,注意worker被干掉时候任务已经拿到手了。
  • 确保没有人在出现崩溃时观察部分写入的文件,用ioutil.TempFile创建临时文件,用os.Rename重命名

实现思路

说下实现过程:

  • Coordinator进程:管理整个任务。也就是循环创建不同的task并保存起来。等worker通过rpc请求来获取任务(map/reduce任务)执行。等到所有map任务执行结束以后,在创建reduce任务,等待worker来获取任务执行。等到所有任务执行结束以后就结束程序。
  • Worker进程:不停向Coordinator进程获取任务来执行,等到Coordinator进程结束时,worker结束运行。

Coordinator

先来看看coordinator的具体实现,首先来看下coordinator需要些什么东西

  • 需要记录Reduce的任务数量,Map的任务数量。Map的任务数量是由输入文件数量来决定的,一个文件可以对应一个map任务。Reduce任务数量可以通过nReduce参数进行设置。在map阶段输出k-v以后,可以通过ihash函数获取这个k-v是由哪个Reduce任务去执行(这里的Reduce任务数量不代表着执行Reduce函数次数的数量,一个Reduce任务可以运行多个Reduce函数)。
  • 需要记录整体任务执行阶段,比如是map阶段,reduce阶段
  • 需要记录一个阶段内所有任务状态。比如map阶段可能会有10个map任务交给worker执行。我们需要记录这些任务的状态,如果map任务全部执行结束,就可以进入reduce阶段。
  • 一个管道记录待执行任务。管道天然支持并发。worker向coordinator请求任务时,可以从这个管道获取任务。有任务需要执行就可以添加任务到管道
1
2
3
4
5
6
7
8
9
10
type Coordinator struct {
files []string // 输入文件列表
reduceNum int // reduce任务数量
phase string // 执行阶段,map任务全部执行结束以后进入reduce阶段
taskStats []TaskStat // 记录任务状态的数组
mu sync.Mutex // 保证并发的锁
taskCh chan Task // 待执行任务的管道
done bool // 终止标识
workerNum int // 用于记录worker数量的
}

其他实现思路如下:

  1. 需要定义一个任务的结构体,任务结构体该结构体代表一个任务,记录一些任务信息传递给worker执行。

    1
    2
    3
    4
    5
    6
    7
    type Task struct {
    Type string // 作业类型,比如是Map还是Reduce
    File string // 如果是map需要知道处理的输入文件
    Seq int // 序列号,task的编号
    ReduceNum int
    MapNum int
    }
  2. 实现initMapTask方法,用于初始化map任务

    可以循环创建任务并添加到taskCh等worker过来获取执行。

  3. initReduceTask方法,用于初始化reduce任务

    等到所有taskStats都标记为结束的时候,调用initReduceTask方法,可以循环创建reduce任务添加到taskCh中,等待worker来执行。

  4. initMapTask和initReduceTask方法以后都需要循环对所有的taskStats进行监控,判断任务执行情况。

    • 如果任务执行超时,可以重新创建任务添加到taskCh中
    • 如果map任务全部执行结束,可以退出监控,执行initReduceTask,并重新监控
    • 如果reduce任务全部执行结束,退出监控,并且程序执行结束
  5. 提供获取任务的方法的rpc调用给worker

    从taskCh中获取一个任务响应给worker

    1
    2
    3
    4
    5
    6
    // 响应worker获取task
    func (c *Coordinator) GetOneTask(args *TaskArgs, reply *TaskReply) error {
    task := <-c.taskCh
    reply.Task = &task
    return nil
    }
  6. 提供响应任务情况的rpc调用给worker

    任务执行结束以后worker需要通知coordinator该任务已经执行结束。这里需要注意,需要携带任务类型,判断一下该任务是否过时。比如一些超时任务,任务的整体情况已经来到reduce阶段了,却响应map阶段的任务。这些任务可能是由于超时而被丢弃掉的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    func (c *Coordinator) ReportTask(args *ReportTaskArgs, reply *ReportTaskReply) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.phase != args.Type {
    return nil
    }
    if args.Done {
    c.taskStats[args.Seq].Status = TaskStatusFinish
    } else {
    c.taskStats[args.Seq].Status = TaskStatusErr
    }
    return nil
    }

Worker

worker主要任务就是不停向Coordinator获取任务执行。worker主要数据需要记录一下map和reduce函数是啥。

1
2
3
4
5
6
// worker对象
type worker struct {
id int
mapf func(string, string) []KeyValue
reducef func(string, []string) string
}

其他实现思路如下:

  1. 写一个run方法,循环向coordinator请求执行任务,在worker对象创建完成以后调用run方法循环获取任务并执行

    1
    2
    3
    4
    5
    6
    func (w *worker) run() {
    for {
    t := w.reqTask()
    w.doTask(t)
    }
    }

    根据task的类型执行mapTask或者reduceTask

  2. doMapTask(task Task),执行map任务

    map任务会生成一个temp文件,temp文件可以命名为mr-temp-x-y,x代表这个temp任务是属于哪个reduce任务的。y代表是由哪个map任务生成的。

  3. doReduceTask(task Task),执行reduce任务

    根据map阶段生成这些temp文件,reduce阶段到这些temp文件中读取自己需要处理的文件。比如编号为1的reduce任务可以去读取所有mr-temp-1-*的数据,这些文件中保存的所有k-v都是由这个reduce任务执行。当然这里说的reduce任务不代表reduce函数执行的次数。reduce函数执行的次数由map阶段生成的key数量决定。一个reduce任务可以执行多个reduce函数。

    最后将生成文件保存到mr-out-x中