实验1 是实现一个 mapreduce 框架,有几个难点:
分布式系统的并发问题
实现容灾(本实验规定 10s 未响应即为 crash)
其中的并发问题,几乎全部由 go 的 channel 来解决,判断任务完成使用 atomic.Bool,任务分发用了一个 channel 来模拟简易的任务队列
使用到的结构体
type Task struct {
Num int
TaskID int
JobType int
Filename string
}
type Coordinator struct {
MapNum int
ReduceNum int
Finish chan bool
Taskq chan *Task
Timeq []chan bool
Flag atomic.Bool
}
type Request struct {
Questype int
TaskID int
}
worker 实现比较简单,直接轮询就行
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
r := Request{Questype: NoJob}
for {
t := Task{}
ok := call("Coordinator.GetTask", &r, &t)
if !ok {
log.Fatal("call failed!")
}
switch t.JobType {
case Map:
doMap(mapf, &t)
r.Questype = Finish
r.TaskID = t.TaskID
case Reduce:
doReduce(reducef, &t)
r.Questype = Finish
r.TaskID = t.TaskID
case Wait:
time.Sleep(time.Second)
r.Questype = NoJob
case Complete:
return
}
}
}
crash 处理用了 select 与 time.After 解决
func (c *Coordinator) wait(t *Task) {
select {
case <-c.Timeq[t.TaskID]:
c.Finish <- true
case <-time.After(time.Second * 10):
c.Taskq <- t
}
}
需要注意用于获取 go rpc 响应参数的结构体,每次要重新起一个新的,不然已有的参数不会从远程传过来,且远程调用函数中,指针不能改变指向对象,否则远程获取的参数传不过去。感觉这里就不如 C++ 的 const 控制细粒度,不仅可以控制指向对象不变,还可以控制指针本身不变。