Preface

I've been learning Go recently. After finishing the language basics, I got a rough picture of Go's internals (mainly from the source-code analysis in Go学习笔记, focusing on goroutines and the GC at a high level). To get some real practice, on wyy's recommendation I started MIT's famous distributed-systems course, 6.824, whose labs are implemented in Go. I had been meaning to learn about distributed systems anyway, but I'm still a beginner and my English is weak, so reading the papers was painfully slow; even with a translation at hand, it took me several days to roughly finish Lab 1. Lab 1 asks you to complete a simple MapReduce: the course supplies most of the code, and you fill in the missing pieces. This post is a rough record of that.

Main content

MapReduce is a fairly simple distributed computing framework that Google proposed around 2004; Hadoop's MapReduce is based on the same idea. What we build here is, of course, a simplified version, and I won't rehash the paper itself, since there are plenty of write-ups online. Let's get straight to the code:

https://github.com/birdmanwings/mit6.824

Part1

doMap() partitions the contents of one input file. The basic flow is:

Read the input file, call the user-supplied mapF on its contents, and partition the output into nReduce intermediate files. Each intermediate file name includes both the map task number and the reduce task number; the provided reduceName function generates it. To decide where a {Key, Value} pair goes, hash the Key and take the result mod nReduce; that remainder picks the intermediate file.

The code is as follows:

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,
) {
    //
    // doMap manages one map task: it should read one of the input files
    // (inFile), call the user-defined map function (mapF) for that file's
    // contents, and partition mapF's output into nReduce intermediate files.
    //
    // There is one intermediate file per reduce task. The file name
    // includes both the map task number and the reduce task number. Use
    // the filename generated by reduceName(jobName, mapTask, r)
    // as the intermediate file for reduce task r. Call ihash() (see
    // below) on each key, mod nReduce, to pick r for a key/value pair.
    //
    // mapF() is the map function provided by the application. The first
    // argument should be the input file name, though the map function
    // typically ignores it. The second argument should be the entire
    // input file contents. mapF() returns a slice containing the
    // key/value pairs for reduce; see common.go for the definition of
    // KeyValue.
    //
    // Look at Go's ioutil and os packages for functions to read
    // and write files.
    //
    // Coming up with a scheme for how to format the key/value pairs on
    // disk can be tricky, especially when taking into account that both
    // keys and values could contain newlines, quotes, and any other
    // character you can think of.
    //
    // One format often used for serializing data to a byte stream that the
    // other end can correctly reconstruct is JSON. You are not required to
    // use JSON, but as the output of the reduce tasks *must* be JSON,
    // familiarizing yourself with it here may prove useful. You can write
    // out a data structure as a JSON string to a file using the commented
    // code below. The corresponding decoding functions can be found in
    // common_reduce.go.
    //
    //   enc := json.NewEncoder(file)
    //   for _, kv := ... {
    //     err := enc.Encode(&kv)
    //
    // Remember to close the file after you have written all the values!
    //
    // Your code here (Part I).
    //

    // Read the file content
    inContent, err := ioutil.ReadFile(inFile)
    if err != nil {
        log.Fatal("Can't read the file: ", inFile)
    }

    // Generate a slice that contains the key/value pairs
    keyValues := mapF(inFile, string(inContent))

    // Create one *json.Encoder per reduce task; each writes one intermediate file as JSON
    encoders := make([]*json.Encoder, nReduce)
    for reduceTaskNum := 0; reduceTaskNum < nReduce; reduceTaskNum++ {
        filename := reduceName(jobName, mapTask, reduceTaskNum)
        file, err := os.Create(filename)
        if err != nil {
            log.Fatal("Can't create the file: ", filename)
        }
        defer file.Close()
        encoders[reduceTaskNum] = json.NewEncoder(file)
    }

    // ihash() decides which reduce task each key/value pair belongs to
    for _, keyValue := range keyValues {
        position := ihash(keyValue.Key) % nReduce
        err := encoders[position].Encode(keyValue)
        if err != nil {
            log.Fatal("Can't write to the file")
        }
    }
}

func ihash(s string) int {
    h := fnv.New32a()
    h.Write([]byte(s))
    return int(h.Sum32() & 0x7fffffff)
}
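Because ihash masks off the sign bit, ihash(key) % nReduce always lands in [0, nReduce), so every pair is assigned to a valid reduce task. A quick standalone sketch of the partitioning (the sample keys and nReduce value are arbitrary):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash mirrors the course-provided hash: FNV-1a, masked to stay non-negative.
func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

// bucketFor picks the reduce task r for a key, exactly as doMap does.
func bucketFor(key string, nReduce int) int {
	return ihash(key) % nReduce
}

func main() {
	nReduce := 3
	for _, key := range []string{"apple", "banana", "cherry"} {
		fmt.Printf("%s -> reduce task %d\n", key, bucketFor(key, nReduce))
	}
}
```

The same key always hashes to the same bucket, which is what lets each reduce task find all of its pairs across every map task's output.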

doReduce

doReduce reads this task's intermediate file from every map task, groups the values by key, sorts the keys, and calls reduceF once per distinct key:

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //
    // doReduce manages one reduce task: it should read the intermediate
    // files for the task, sort the intermediate key/value pairs by key,
    // call the user-defined reduce function (reduceF) for each key, and
    // write reduceF's output to disk.
    //
    // You'll need to read one intermediate file from each map task;
    // reduceName(jobName, m, reduceTask) yields the file
    // name from map task m.
    //
    // Your doMap() encoded the key/value pairs in the intermediate
    // files, so you will need to decode them. If you used JSON, you can
    // read and decode by creating a decoder and repeatedly calling
    // .Decode(&kv) on it until it returns an error.
    //
    // You may find the first example in the golang sort package
    // documentation useful.
    //
    // reduceF() is the application's reduce function. You should
    // call it once per distinct key, with a slice of all the values
    // for that key. reduceF() returns the reduced value for that key.
    //
    // You should write the reduce output as JSON encoded KeyValue
    // objects to the file named outFile. We require you to use JSON
    // because that is what the merger that combines the output
    // from all the reduce tasks expects. There is nothing special about
    // JSON -- it is just the marshalling format we chose to use. Your
    // output code will look something like this:
    //
    // enc := json.NewEncoder(file)
    // for key := ... {
    //  enc.Encode(KeyValue{key, reduceF(...)})
    // }
    // file.Close()
    //
    // Your code here (Part I).
    //

    // Create a hash table to store the values that have the same key name
    keyValueMap := make(map[string][]string)

    // Read and decode the data from the intermediate file, then store in the hash table
    for mapTask := 0; mapTask < nMap; mapTask++ {
        filename := reduceName(jobName, mapTask, reduceTask)
        file, err := os.Open(filename)
        if err != nil {
            log.Fatal("Can't open the file: ", filename)
        }
        defer file.Close()

        // Decode() stores each decoded pair into keyValuePair
        decoder := json.NewDecoder(file)
        var keyValuePair KeyValue
        for decoder.More() {
            err := decoder.Decode(&keyValuePair)
            if err != nil {
                log.Fatal("Decode failed")
            }
            // one by one store the value that has same key in the hash table
            keyValueMap[keyValuePair.Key] = append(keyValueMap[keyValuePair.Key], keyValuePair.Value)
        }
    }

    // sort the keys in increasing order
    // keys collects every distinct key so the output can be ordered
    keys := make([]string, 0, len(keyValueMap))
    for k := range keyValueMap {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    // create the outfile, remember to close the file
    f, err := os.Create(outFile)
    if err != nil {
        log.Fatal("Can't create the outfile: ", outFile)
    }
    defer f.Close()

    // use json.NewEncoder() to write and encode the data into the outfile
    // reduceF() merges all the values that share a key
    enc := json.NewEncoder(f)
    for _, k := range keys {
        _ = enc.Encode(KeyValue{k, reduceF(k, keyValueMap[k])})
    }
}
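The course comments warn that keys and values may contain newlines, quotes, and other awkward characters, which is exactly why JSON works well as the intermediate format. A small round-trip sketch using the same Encoder/Decoder pattern as doMap and doReduce (a bytes.Buffer stands in for the intermediate file):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type KeyValue struct {
	Key   string
	Value string
}

// roundTrip encodes pairs as a stream of JSON objects and decodes them
// back, the same scheme the lab uses for intermediate files.
func roundTrip(pairs []KeyValue) []KeyValue {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, kv := range pairs {
		if err := enc.Encode(&kv); err != nil {
			panic(err)
		}
	}

	var out []KeyValue
	dec := json.NewDecoder(&buf)
	for dec.More() {
		var kv KeyValue
		if err := dec.Decode(&kv); err != nil {
			panic(err)
		}
		out = append(out, kv)
	}
	return out
}

func main() {
	// Keys containing newlines and quotes survive the round trip intact.
	in := []KeyValue{{"line\nbreak", "1"}, {`has "quotes"`, "2"}}
	out := roundTrip(in)
	fmt.Println(out[0] == in[0] && out[1] == in[1]) // true
}
```

A naive line-based format would break on the embedded newline; the JSON encoder escapes it, and dec.More() cleanly detects the end of the stream.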

Part2

Word count, just like in the paper.

func mapF(filename string, contents string) []mapreduce.KeyValue {
    // Your code here (Part II).

    // use IsLetter() to split the content
    // words is a slice
    f := func(c rune) bool {
        return !unicode.IsLetter(c)
    }
    words := strings.FieldsFunc(contents, f)

    // kv holds one {word, "1"} pair per occurrence
    kv := make([]mapreduce.KeyValue, 0, len(words))

    for _, word := range words {
        kv = append(kv, mapreduce.KeyValue{word, "1"})
    }

    return kv
}

//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func reduceF(key string, values []string) string {
    // Your code here (Part II).
    var sum int
    for _, str := range values {
        tmp, err := strconv.Atoi(str)
        if err != nil {
            log.Fatal("Can't convert to int")
        }
        sum += tmp
    }
    return strconv.Itoa(sum)
}
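The FieldsFunc tokenization above can be tried in isolation; anything that is not a letter acts as a separator, so punctuation and digits are dropped and contractions get split (the sample sentence here is arbitrary):

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// splitWords mirrors mapF's tokenization: every non-letter rune
// is treated as a word separator.
func splitWords(contents string) []string {
	return strings.FieldsFunc(contents, func(c rune) bool {
		return !unicode.IsLetter(c)
	})
}

func main() {
	fmt.Println(splitWords("It's a test-case, 42 go!"))
	// [It s a test case go]
}
```

Note that "It's" becomes two tokens and "42" disappears entirely; that matches what the lab's word-count tests expect from this splitting rule.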

Part3

schedule() hands tasks out to workers. One pitfall: registerChan is an unbuffered channel, so a send on it blocks until some goroutine is receiving. If we hand a finished worker back to registerChan in-line, the send can block forever once no task goroutine is left to receive it, which deadlocks schedule(). The fix is to do the hand-back in a separate goroutine.
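A minimal standalone sketch of the hand-back pattern (the channel and worker name here are stand-ins for registerChan and a worker address):

```go
package main

import "fmt"

// handBack returns a worker to an unbuffered channel without blocking
// the caller: the send runs in its own goroutine and completes
// whenever the next receiver shows up.
func handBack(registerChan chan string, worker string) {
	go func() { registerChan <- worker }()
}

func main() {
	registerChan := make(chan string) // unbuffered, like in schedule()
	// A plain `registerChan <- "worker0"` here would block forever,
	// since no other goroutine is receiving yet.
	handBack(registerChan, "worker0")
	fmt.Println(<-registerChan) // the pending send now completes
}
```

In schedule() the receiver is either the next task's goroutine or nobody at all (when all tasks are done), which is exactly why the in-line send can never be safe there.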

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    // All ntasks tasks have to be scheduled on workers. Once all tasks
    // have completed successfully, schedule() should return.
    //
    // Your code here (Part III, Part IV).
    //

    // wg to block the main goroutine
    var wg sync.WaitGroup

    for i := 0; i < ntasks; i++ {
        // set the wg number
        wg.Add(1)
        // set the args for call()
        var taskArgs DoTaskArgs
        taskArgs.JobName = jobName
        taskArgs.Phase = phase
        taskArgs.TaskNumber = i
        taskArgs.NumOtherPhase = n_other
        if phase == mapPhase {
            taskArgs.File = mapFiles[i]
        }
        go func() {
            defer wg.Done()
            worker := <-registerChan
            if !call(worker, "Worker.DoTask", &taskArgs, nil) {
                log.Fatal("Can't call")
            }
            // Hand the worker back in its own goroutine: registerChan is an
            // unbuffered channel, so an in-line send here could deadlock
            go func() { registerChan <- worker }()
        }()
    }

    // Block the main goroutine
    wg.Wait()
    fmt.Printf("Schedule: %v done\n", phase)
}

Part4

Part 4 is about handling worker failures: what do we do when the RPC to a worker fails? The simplest approach is an infinite retry loop that keeps taking another worker from registerChan until the task succeeds:

// send the rpc
        go func() {
            defer wg.Done()
            // use loop to retry if rpc fail
            for {
                worker := <-registerChan
                if call(worker, "Worker.DoTask", &taskArgs, nil) {
                    // Hand the worker back in its own goroutine: registerChan is
                    // unbuffered, so an in-line send here could deadlock
                    go func() { registerChan <- worker }()
                    break
                }
            }
        }()

Part5

Implement an inverted index. The main differences from Part 2 are that mapF must deduplicate (a word should map to a given document only once) and reduceF must sort the document names.

func mapF(document string, value string) (res []mapreduce.KeyValue) {
    // Your code here (Part V).

    // use IsLetter() to split the content
    // words is a slice
    f := func(c rune) bool {
        return !unicode.IsLetter(c)
    }
    words := strings.FieldsFunc(value, f)

    // Remove duplicates using a hash table
    kvHash := make(map[string]string)
    for _, word := range words {
        kvHash[word] = document
    }

    // store in the res
    res = make([]mapreduce.KeyValue, 0, len(kvHash))
    for k, v := range kvHash {
        res = append(res, mapreduce.KeyValue{k, v})
    }

    return
}

// The reduce function is called once for each key generated by Map, with a
// list of that key's string value (merged across all inputs). The return value
// should be a single output value for that key.
func reduceF(key string, values []string) string {
    // Your code here (Part V).

    // sort the doc_name
    sort.Strings(values)

    return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}
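The output format of this reduceF can be checked in isolation; the document names below are just illustrative:

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// formatPosting mirrors Part 5's reduceF: sort the document names,
// then emit "<count> doc1,doc2,...".
func formatPosting(values []string) string {
	sort.Strings(values)
	return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}

func main() {
	fmt.Println(formatPosting([]string{"pg-tom.txt", "pg-grimm.txt", "pg-being.txt"}))
	// 3 pg-being.txt,pg-grimm.txt,pg-tom.txt
}
```

Since mapF already deduplicated per document, len(values) is exactly the number of distinct documents containing the word.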

Summary

Lab 1 turned out to be fairly easy; on to the rest of the course.
