MIT 6.824: Distributed Systems

Lab 1: MapReduce

NickTop 2024. 1. 30. 01:08

https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

 

6.5840 Lab 1: MapReduce

Introduction: In this lab you'll build a MapReduce system. You'll implement a worker process that calls application Map and Reduce functions and handles reading and writing files, and a coordinator process that hands out tasks to workers and copes with failed workers.

pdos.csail.mit.edu

 

처음에는 너무 어려워서 다른 사람의 깃허브 코드를 읽어 보고 참고해서 구현했습니다.

https://github.com/RohanDoshi21/MIT-6.5840-Distributed-Systems

 

GitHub - RohanDoshi21/MIT-6.5840-Distributed-Systems

Contribute to RohanDoshi21/MIT-6.5840-Distributed-Systems development by creating an account on GitHub.

github.com

 

TASK 및 고려할 점

- pg-* 형식의 글을 MapReduce 합니다

- MapWorker / ReduceWorker가 구분되어있지 않고 Worker 하나로 구현해야합니다

- Map이 모두 끝나야 Reduce가 실행됩니다

- intermediate file의 이름 형식은 mr-x-y (x: [0, len(files)-1], y: [0, nReduce-1])입니다 (입력 file 개수 * nReduce 만큼 intermediate file이 생성됩니다)

- Worker는 Coordinator에게 할 일이 있는지 지속적으로 물어봅니다

- Worker가 일이 끝나면 Coordinator에게 어떤 task가 끝났는지 알려줘야 합니다

- 모든 일이 종료되면 Coordinator, Worker 둘 다 종료됩니다

 

rpc.go

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"
import "time"

//
// example to show how to declare the arguments
// and reply for an RPC.
//

// ExampleArgs is the argument type of the Coordinator.Example RPC.
// It is also passed (unused by the handler) to Coordinator.RequestTask.
type ExampleArgs struct {
	// X is the input number; Example replies with X + 1.
	X int
}

// ExampleReply is the reply type of the Coordinator.Example RPC.
type ExampleReply struct {
	// Y is set by Example to the argument's X plus one.
	Y int
}

// Add your RPC definitions here.

// TaskStatus is the coordinator's view of how far a task has progressed.
type TaskStatus int

const (
	// NOT_STARTED is the zero value: the task has not been handed to any worker.
	NOT_STARTED TaskStatus = 0
	// WAITING means the task was handed to a worker and no completion
	// has been recorded for it yet.
	WAITING TaskStatus = 1
	// DONE means a completion was recorded for the task.
	DONE TaskStatus = 2
)

// TaskType says whether a task is a map task or a reduce task.
type TaskType int

const (
	// MAP is the zero value and marks a map task.
	MAP TaskType = 0
	// REDUCE marks a reduce task.
	REDUCE TaskType = 1
)

// Task describes one unit of map or reduce work. The coordinator keeps one
// Task per map input file and one per reduce partition, and sends a copy to
// the worker that is asked to run it.
type Task struct {
	// TaskStatus is the coordinator-side progress of the task.
	TaskStatus TaskStatus
	// TaskType selects between map and reduce work.
	TaskType TaskType
	// InputFiles holds the single input file name for a map task, or the
	// intermediate file names (mr-<map index>-<Index>) for a reduce task.
	InputFiles []string
	// Index is the task's position in the coordinator's task slice; it is
	// also used when building intermediate and output file names.
	Index int
	// TimeStamp is set by the coordinator when the task is handed out and
	// is compared against a 10-second timeout before re-assigning it.
	TimeStamp time.Time
}

// DoJob tells a worker what to do with the reply to a task request.
type DoJob int

const (
	// BREAK is the zero value: the worker leaves its request loop.
	BREAK DoJob = 0
	// WAIT means no task can be handed out right now; the worker sleeps
	// and asks again.
	WAIT DoJob = 1
	// DO means the reply carries a task the worker should run.
	DO DoJob = 2
)

// ReplyTask is the reply of the Coordinator.RequestTask RPC.
type ReplyTask struct {
	// DoJob tells the worker whether to run Task, wait, or stop.
	DoJob DoJob
	// Task is the assigned task; it is only meaningful when DoJob is DO.
	Task Task
	// NReduce is the number of reduce partitions; map tasks use it to
	// split their output into that many intermediate files.
	NReduce int
}

// CompleteMessageCode is the coordinator's verdict on a completion report.
type CompleteMessageCode int

const (
	// FAIL is the zero value: the completion was not recorded.
	FAIL CompleteMessageCode = 0
	// SUCCESS means the completion was recorded.
	SUCCESS CompleteMessageCode = 1
)

// TaskCompleteReply is the reply of the Coordinator.TaskComplete RPC.
type TaskCompleteReply struct {
	// CompleteMessageCode is SUCCESS when the completion was recorded and
	// FAIL when the task had already been marked DONE.
	CompleteMessageCode CompleteMessageCode
}

// coordinatorSock returns the path of the UNIX-domain socket used to reach
// the coordinator: a fixed prefix under /var/tmp followed by the current
// user's numeric uid, which makes the name unique-ish per user.
// The current directory can't be used since Athena AFS doesn't support
// UNIX-domain sockets.
func coordinatorSock() string {
	const prefix = "/var/tmp/5840-mr-"
	uid := os.Getuid()
	return prefix + strconv.Itoa(uid)
}

coordinator.go

package mr

import "log"
import "net"
import "os"
import "net/rpc"
import "net/http"
import "strconv"
import "sync"
import "time"

// Coordinator holds the bookkeeping state of one MapReduce job.
type Coordinator struct {
	// mu is locked by RequestTask, TaskComplete and Done before they
	// read or write the fields below.
	mu sync.Mutex
	// mapTasks has one entry per input file, indexed by Task.Index.
	mapTasks []Task
	// reduceTasks has one entry per reduce partition, indexed by Task.Index.
	reduceTasks []Task
	// mapDone counts the map tasks that have been marked DONE.
	mapDone int
	// reduceDone counts the reduce tasks that have been marked DONE.
	reduceDone int
	// nReduce is the number of reduce partitions, sent to workers with each task.
	nReduce int
}

// Your code here -- RPC handlers for the worker to call.

// RequestTask is the RPC a worker calls to ask for work. args is unused.
//
// While any map task is not yet DONE only map tasks are considered; once
// all maps are done, reduce tasks are considered; once everything is done
// the reply tells the worker to BREAK. Within the selected phase,
// assignTask decides whether the reply is DO (with a task) or WAIT.
func (c *Coordinator) RequestTask(args *ExampleArgs, replyTask *ReplyTask) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	switch {
	case c.mapDone != len(c.mapTasks):
		replyTask.Task.TaskType = MAP
	case c.reduceDone != len(c.reduceTasks):
		replyTask.Task.TaskType = REDUCE
	default:
		// Nothing left in either phase.
		replyTask.DoJob = BREAK
		return nil
	}

	c.assignTask(replyTask)
	return nil
}
// Example is the sample RPC handler: it replies with the argument plus one.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	incremented := args.X + 1
	reply.Y = incremented
	return nil
}

// taskTimeout is how long a handed-out task may stay unfinished before
// assignTask gives it to another worker.
const taskTimeout = 10 * time.Second

// assignTask fills replyTask with the first task of the phase selected by
// replyTask.Task.TaskType (MAP, otherwise REDUCE) that is either
// NOT_STARTED or was handed out more than taskTimeout ago and is still not
// finished. The chosen task is marked WAITING, stamped with the current
// time, and returned with DoJob set to DO. If no task qualifies, DoJob is
// WAIT. NReduce is always filled in.
//
// The caller must hold c.mu.
func (c *Coordinator) assignTask(replyTask *ReplyTask) {
	tasks := c.reduceTasks
	if replyTask.Task.TaskType == MAP {
		tasks = c.mapTasks
	}

	replyTask.NReduce = c.nReduce
	replyTask.DoJob = WAIT

	// Read the clock once so every task is judged against the same instant.
	now := time.Now()
	for i := range tasks {
		t := &tasks[i]
		expired := t.TaskStatus == WAITING && now.Sub(t.TimeStamp) > taskTimeout
		if t.TaskStatus != NOT_STARTED && !expired {
			continue
		}
		t.TaskStatus = WAITING
		t.TimeStamp = now
		// Copy after updating so the worker sees the task as handed out,
		// not a snapshot with the previous status and timestamp.
		replyTask.Task = *t
		replyTask.DoJob = DO
		return
	}
}

// TaskComplete is the RPC a worker calls after finishing a task. Only
// task.TaskType and task.Index are used to find the coordinator's record.
//
// The reply is SUCCESS when the task is newly marked DONE (and the matching
// done-counter is incremented). It is FAIL when the task was already DONE,
// when the task type is unknown, or when the index is out of range; a bad
// report must not be able to panic the coordinator or corrupt the counters.
// The returned error is always nil.
func (c *Coordinator) TaskComplete(task *Task, taskCompleteReply *TaskCompleteReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	var (
		tasks []Task
		done  *int
	)
	switch task.TaskType {
	case MAP:
		tasks, done = c.mapTasks, &c.mapDone
	case REDUCE:
		tasks, done = c.reduceTasks, &c.reduceDone
	default:
		taskCompleteReply.CompleteMessageCode = FAIL
		return nil
	}

	// Reject indices that do not name a known task instead of panicking.
	if task.Index < 0 || task.Index >= len(tasks) {
		taskCompleteReply.CompleteMessageCode = FAIL
		return nil
	}

	// A second report for the same task (e.g. from a worker that was
	// presumed dead and replaced) must not be counted twice.
	if tasks[task.Index].TaskStatus == DONE {
		taskCompleteReply.CompleteMessageCode = FAIL
		return nil
	}

	tasks[task.Index].TaskStatus = DONE
	*done++
	taskCompleteReply.CompleteMessageCode = SUCCESS
	return nil
}

// server registers the coordinator's methods as RPC handlers and starts a
// goroutine that serves them over HTTP on the UNIX-domain socket named by
// coordinatorSock. Any stale socket file is removed first. The process
// exits if the socket cannot be listened on.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	socketPath := coordinatorSock()
	os.Remove(socketPath)
	listener, err := net.Listen("unix", socketPath)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(listener, nil)
}

// Done reports whether the entire job has finished, i.e. every map task
// and every reduce task has been marked done.
// main/mrcoordinator.go calls Done() periodically.
func (c *Coordinator) Done() bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.mapDone != len(c.mapTasks) {
		return false
	}
	return c.reduceDone == len(c.reduceTasks)
}

// MakeCoordinator creates a Coordinator and starts its RPC server.
// main/mrcoordinator.go calls this function.
//
// One map task is created per entry of files, and nReduce reduce tasks are
// created, each with the names of the intermediate files it will read.
// All tasks start as NOT_STARTED.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{nReduce: nReduce}

	for i, name := range files {
		c.mapTasks = append(c.mapTasks, Task{
			TaskType:   MAP,
			TaskStatus: NOT_STARTED,
			Index:      i,
			InputFiles: []string{name},
		})
	}

	for r := 0; r < nReduce; r++ {
		c.reduceTasks = append(c.reduceTasks, Task{
			TaskType:   REDUCE,
			TaskStatus: NOT_STARTED,
			Index:      r,
			InputFiles: getIntermediateFiles(r, len(files)),
		})
	}

	// Start serving only after all state is initialised.
	c.server()
	return c
}

// getIntermediateFiles returns the names of the intermediate files that
// reduce partition j reads: "mr-<i>-<j>" for every i in [0, lenFiles).
// The result is nil when lenFiles is not positive.
func getIntermediateFiles(j int, lenFiles int) []string {
	var names []string
	suffix := "-" + strconv.Itoa(j)
	for mapIdx := 0; mapIdx < lenFiles; mapIdx++ {
		names = append(names, "mr-"+strconv.Itoa(mapIdx)+suffix)
	}
	return names
}

 

worker.go

package mr

import "fmt"
import "log"
import "net/rpc"
import "hash/fnv"
import "time"
import "os"
import "io/ioutil"
import "strconv"
import "encoding/json"
import "sort"

// KeyValue is a single key/value pair.
// Map functions return a slice of KeyValue.
type KeyValue struct {
	Key   string
	Value string
}

// ihash returns a non-negative 32-bit FNV-1a hash of key.
// Use ihash(key) % NReduce to choose the reduce task number for each
// KeyValue emitted by Map.
func ihash(key string) int {
	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	sum := hasher.Sum32()
	// Clear the top bit so the result is non-negative even where int is 32 bits.
	return int(sum & 0x7fffffff)
}

// ByKey implements sort.Interface over a slice of KeyValue, ordering the
// elements by ascending Key.
type ByKey []KeyValue

// Len returns the number of elements.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges the elements at positions i and j.
func (kvs ByKey) Swap(i, j int) {
	kvs[i], kvs[j] = kvs[j], kvs[i]
}

// Less reports whether the key at position i sorts before the key at j.
func (kvs ByKey) Less(i, j int) bool {
	return kvs[i].Key < kvs[j].Key
}

// Worker is the worker's main loop; main/mrworker.go calls this function.
//
// After one Example RPC, it repeatedly asks the coordinator for work:
// on DO it runs the map or reduce task with mapf/reducef, on WAIT it
// sleeps for one second, and on anything else (BREAK) it returns.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	CallExample()

	for {
		assignment := CallMaster()
		fmt.Printf("task : %v\n", assignment)

		switch assignment.DoJob {
		case DO:
			if assignment.Task.TaskType == MAP {
				doMap(assignment.Task, assignment.NReduce, mapf)
			} else { // REDUCE
				doReduce(assignment.Task, reducef)
			}
		case WAIT:
			time.Sleep(1 * time.Second)
		default: // BREAK
			return
		}
	}
}

// doMap runs one map task: it reads task.InputFiles[0], applies mapf,
// partitions the resulting pairs into nReduce buckets by ihash(key) % nReduce,
// writes bucket r as a JSON array to "mr-<task.Index>-<r>", and finally
// reports the task as complete to the coordinator.
//
// A file is written for every bucket, including empty ones, so each reduce
// task finds all of its inputs. Any I/O error terminates the worker
// process via log.Fatalf.
func doMap(task Task, nReduce int, mapf func(string, string) []KeyValue) {
	inputName := task.InputFiles[0]
	file, err := os.Open(inputName)
	if err != nil {
		log.Fatalf("cannot open %v", inputName)
	}
	content, err := ioutil.ReadAll(file)
	file.Close()
	if err != nil {
		log.Fatalf("cannot read %v", inputName)
	}

	buckets := make([][]KeyValue, nReduce)
	for i := range buckets {
		// Non-nil so an empty bucket is encoded as "[]" rather than "null".
		buckets[i] = make([]KeyValue, 0)
	}
	for _, kv := range mapf(inputName, string(content)) {
		r := ihash(kv.Key) % nReduce
		buckets[r] = append(buckets[r], kv)
	}

	for r, bucket := range buckets {
		oname := "mr-" + strconv.Itoa(task.Index) + "-" + strconv.Itoa(r)
		if err := writeIntermediateFile(oname, bucket); err != nil {
			log.Fatalf("cannot write %v: %v", oname, err)
		}
	}
	CallTaskComplete(task)
}

// writeIntermediateFile JSON-encodes kvs into a temporary file in the
// current directory and then renames it to oname, so a reader never sees
// a partially written file, even if this task is re-executed while another
// copy of it is still running. The temporary file is removed on failure.
func writeIntermediateFile(oname string, kvs []KeyValue) error {
	tmp, err := ioutil.TempFile(".", "tmp-"+oname+"-*")
	if err != nil {
		return err
	}
	if err := json.NewEncoder(tmp).Encode(&kvs); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	if err := os.Rename(tmp.Name(), oname); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	return nil
}

// doReduce runs one reduce task: it loads every intermediate file listed
// in task.InputFiles, sorts all pairs by key, calls reducef once per
// distinct key with all of that key's values, writes one "<key> <output>"
// line per key to "mr-out-<task.Index>", and reports the task as complete.
//
// If an input cannot be read or the output cannot be written, the error is
// logged and the function returns WITHOUT reporting completion, so the
// coordinator hands the task out again after its timeout instead of
// accepting an incomplete result.
func doReduce(task Task, reducef func(string, []string) string) {
	var kvas []KeyValue
	for _, filename := range task.InputFiles {
		kva, err := loadIntermediateFile(filename)
		if err != nil {
			log.Printf("reduce %v: cannot load %v: %v", task.Index, filename, err)
			return
		}
		kvas = append(kvas, kva...)
	}
	sort.Sort(ByKey(kvas))

	oname := "mr-out-" + strconv.Itoa(task.Index)
	if err := writeReduceOutput(oname, kvas, reducef); err != nil {
		log.Printf("reduce %v: cannot write %v: %v", task.Index, oname, err)
		return
	}
	CallTaskComplete(task)
}

// loadIntermediateFile opens filename, decodes the JSON array of KeyValue
// it contains, and closes the file.
func loadIntermediateFile(filename string) ([]KeyValue, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	var kva []KeyValue
	if err := json.NewDecoder(file).Decode(&kva); err != nil {
		return nil, err
	}
	return kva, nil
}

// writeReduceOutput groups the key-sorted kvas by key, applies reducef to
// each group, and writes the results to oname. The data goes to a temporary
// file in the current directory that is renamed into place at the end, so
// a partially written output file is never visible under the final name.
func writeReduceOutput(oname string, kvas []KeyValue, reducef func(string, []string) string) error {
	tmp, err := ioutil.TempFile(".", "tmp-"+oname+"-*")
	if err != nil {
		return err
	}

	for i := 0; i < len(kvas); {
		// [i, j) is the run of pairs sharing kvas[i].Key.
		j := i + 1
		for j < len(kvas) && kvas[j].Key == kvas[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, kvas[k].Value)
		}
		output := reducef(kvas[i].Key, values)
		fmt.Printf("k:%v, v:%v\n", kvas[i].Key, output)
		if _, err := fmt.Fprintf(tmp, "%v %v\n", kvas[i].Key, output); err != nil {
			tmp.Close()
			os.Remove(tmp.Name())
			return err
		}
		i = j
	}

	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	if err := os.Rename(tmp.Name(), oname); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	return nil
}

// CallTaskComplete reports task as finished via the
// Coordinator.TaskComplete RPC and prints whether the coordinator
// accepted the report. The outcome is only printed, not returned.
func CallTaskComplete(task Task) {
	var reply TaskCompleteReply
	if !call("Coordinator.TaskComplete", &task, &reply) || reply.CompleteMessageCode != SUCCESS {
		fmt.Printf("CallTaskComplete failed!\n")
		return
	}
	fmt.Printf("CallTaskComplete complete!\n")
}
// CallMaster asks the coordinator for work via the
// Coordinator.RequestTask RPC and returns its reply.
//
// The RPC argument and reply types are defined in rpc.go; the argument's
// content is not used by the handler. If the call fails, the returned
// reply has DoJob set to WAIT so the caller retries later.
func CallMaster() ReplyTask {
	request := ExampleArgs{X: 99}
	var replyTask ReplyTask

	if call("Coordinator.RequestTask", &request, &replyTask) {
		return replyTask
	}

	fmt.Printf("CallMaster failed!\n")
	replyTask.DoJob = WAIT
	return replyTask
}

// CallExample sends the sample Coordinator.Example RPC with X = 99 and
// prints the reply (Y should be 100), or a failure message if the call
// did not succeed.
func CallExample() {
	request := ExampleArgs{X: 99}
	var reply ExampleReply

	// "Coordinator.Example" names the Example() method of struct
	// Coordinator on the receiving server.
	if !call("Coordinator.Example", &request, &reply) {
		fmt.Printf("call failed!\n")
		return
	}
	// reply.Y should be 100.
	fmt.Printf("reply.Y %v\n", reply.Y)
}

// call sends one RPC request to the coordinator and waits for the response.
// A fresh connection to the coordinator's UNIX-domain socket is opened for
// each call and closed before returning.
//
// It returns true when the call succeeded. If the call itself returns an
// error, the error is printed and false is returned. If the coordinator
// cannot be dialed at all, the process exits via log.Fatal.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	client, dialErr := rpc.DialHTTP("unix", coordinatorSock())
	if dialErr != nil {
		log.Fatal("dialing:", dialErr)
	}
	defer client.Close()

	if callErr := client.Call(rpcname, args, reply); callErr != nil {
		fmt.Println(callErr)
		return false
	}
	return true
}

'MIT 6.824: Distributed Systems' 카테고리의 다른 글

Lecture 6: Fault Tolerance: Raft (1)  (0) 2024.02.07
Lecture 5: Go, Threads, and Raft  (1) 2024.02.05
Lecture 4: Primary-Backup Replication  (1) 2024.01.25
Lecture 3: GFS  (1) 2024.01.24
Lecture 2: RPC and Threads  (0) 2024.01.23