https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
6.5840 Lab 1: MapReduce
It was too difficult at first, so I worked through it while reading someone else's GitHub repository.
https://github.com/RohanDoshi21/MIT-6.5840-Distributed-Systems
GitHub - RohanDoshi21/MIT-6.5840-Distributed-Systems
Tasks and things to consider
- The job runs MapReduce over the pg-* text files.
- There is no separate MapWorker / ReduceWorker; a single Worker implementation handles both kinds of tasks.
- Reduce runs only after every Map task has finished.
- Intermediate files are named mr-X-Y, where X is in [0, len(files)-1] and Y is in [0, nReduce-1], so len(files) * nReduce intermediate files are created (a small sketch of this naming rule follows this list).
- The Worker keeps asking the Coordinator whether there is work for it.
- When a Worker finishes a task, it must tell the Coordinator which task is done.
- When all work is finished, both the Coordinator and the Workers exit.
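A minimal, standalone sketch of the naming rule above (not part of the lab skeleton; nReduce = 10, mapIndex = 3, and the key "hello" are illustrative values): the pair for a key emitted by map task X lands in mr-X-Y with Y = ihash(key) % nReduce, which is the same rule worker.go uses below.

package main

import (
    "fmt"
    "hash/fnv"
)

// ihash mirrors the helper in worker.go: hash a key to a non-negative int.
func ihash(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() & 0x7fffffff)
}

func main() {
    nReduce := 10  // number of reduce tasks (illustrative)
    mapIndex := 3  // index of the map task that emitted the key (illustrative)
    key := "hello"
    // Map task 3 writes this pair into intermediate file mr-3-<ihash(key)%10>.
    fmt.Printf("mr-%d-%d\n", mapIndex, ihash(key)%nReduce)
}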
rpc.go
package mr
//
// RPC definitions.
//
// remember to capitalize all names.
//
import "os"
import "strconv"
import "time"
//
// example to show how to declare the arguments
// and reply for an RPC.
//
type ExampleArgs struct {
    X int
}

type ExampleReply struct {
    Y int
}
// Add your RPC definitions here.
// TaskStatus tracks the lifecycle of a task on the coordinator side.
type TaskStatus int

const (
    NOT_STARTED TaskStatus = iota
    WAITING // handed out to a worker, not yet reported done
    DONE
)

type TaskType int

const (
    MAP TaskType = iota
    REDUCE
)

// Task is the unit of work the coordinator hands to a worker.
type Task struct {
    TaskStatus TaskStatus
    TaskType   TaskType
    InputFiles []string
    Index      int
    TimeStamp  time.Time // when the task was last handed out
}

// DoJob tells the worker what to do with the reply:
// run the task, wait and ask again, or exit.
type DoJob int

const (
    BREAK DoJob = iota
    WAIT
    DO
)

type ReplyTask struct {
    DoJob   DoJob
    Task    Task
    NReduce int
}

type CompleteMessageCode int

const (
    FAIL CompleteMessageCode = iota
    SUCCESS
)

type TaskCompleteReply struct {
    CompleteMessageCode CompleteMessageCode
}
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
    s := "/var/tmp/5840-mr-"
    s += strconv.Itoa(os.Getuid())
    return s
}
coordinator.go
package mr
import "log"
import "net"
import "os"
import "net/rpc"
import "net/http"
import "strconv"
import "sync"
import "time"
type Coordinator struct {
    // Your definitions here.
    mu          sync.Mutex
    mapTasks    []Task
    reduceTasks []Task
    mapDone     int // number of completed map tasks
    reduceDone  int // number of completed reduce tasks
    nReduce     int
}
// Your code here -- RPC handlers for the worker to call.
func (c *Coordinator) RequestTask(args *ExampleArgs, replyTask *ReplyTask) error {
    // args *ExampleArgs is not used
    c.mu.Lock()
    defer c.mu.Unlock()
    if len(c.mapTasks) != c.mapDone {
        replyTask.Task.TaskType = MAP
    } else if len(c.reduceTasks) != c.reduceDone {
        replyTask.Task.TaskType = REDUCE
    } else {
        replyTask.DoJob = BREAK
        return nil
    }
    c.assignTask(replyTask)
    return nil
}
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
    reply.Y = args.X + 1
    return nil
}
func (c *Coordinator) assignTask(replyTask *ReplyTask) {
    var refTasks []Task
    if replyTask.Task.TaskType == MAP {
        refTasks = c.mapTasks
    } else { // in case of REDUCE
        refTasks = c.reduceTasks
    }
    replyTask.NReduce = c.nReduce
    replyTask.DoJob = WAIT
    for index, task := range refTasks {
        // Hand out a task that has never started, or re-hand-out one that a
        // worker has held for more than 10 seconds (assumed crashed).
        tenSecondsAgo := time.Now().Add(-10 * time.Second)
        if task.TaskStatus == NOT_STARTED || (task.TaskStatus == WAITING && task.TimeStamp.Before(tenSecondsAgo)) {
            refTasks[index].TaskStatus = WAITING
            refTasks[index].TimeStamp = time.Now()
            replyTask.Task = task
            replyTask.DoJob = DO
            break
        }
    }
}
func (c *Coordinator) TaskComplete(task *Task, taskCompleteReply *TaskCompleteReply) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    var refTask []Task
    if task.TaskType == MAP {
        refTask = c.mapTasks
    } else { // task.TaskType==REDUCE
        refTask = c.reduceTasks
    }
    // Ignore duplicate completions, e.g. from a task that was re-assigned.
    if refTask[task.Index].TaskStatus == DONE {
        taskCompleteReply.CompleteMessageCode = FAIL
        return nil
    }
    refTask[task.Index].TaskStatus = DONE
    if task.TaskType == MAP {
        c.mapDone += 1
    } else { // task.TaskType==REDUCE
        c.reduceDone += 1
    }
    taskCompleteReply.CompleteMessageCode = SUCCESS
    return nil
}
//
// start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() {
    rpc.Register(c)
    rpc.HandleHTTP()
    //l, e := net.Listen("tcp", ":1234")
    sockname := coordinatorSock()
    os.Remove(sockname)
    l, e := net.Listen("unix", sockname)
    if e != nil {
        log.Fatal("listen error:", e)
    }
    go http.Serve(l, nil)
}
//
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
    // Your code here.
    c.mu.Lock()
    defer c.mu.Unlock()
    return len(c.mapTasks) == c.mapDone && len(c.reduceTasks) == c.reduceDone
}
//
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{}
    // Your code here.
    var mapTasks []Task
    for i := 0; i < len(files); i++ {
        var task = Task{
            TaskType:   MAP,
            TaskStatus: NOT_STARTED,
            Index:      i,
            InputFiles: []string{files[i]},
        }
        mapTasks = append(mapTasks, task)
    }
    c.mapTasks = mapTasks
    var reduceTasks []Task
    for j := 0; j < nReduce; j++ {
        var task = Task{
            TaskType:   REDUCE,
            TaskStatus: NOT_STARTED,
            Index:      j,
            InputFiles: getIntermediateFiles(j, len(files)),
        }
        reduceTasks = append(reduceTasks, task)
    }
    c.reduceTasks = reduceTasks
    c.mapDone = 0
    c.reduceDone = 0
    c.nReduce = nReduce
    c.server()
    return &c
}
// getIntermediateFiles lists the map outputs that feed reduce task j:
// mr-i-j for every map task index i.
func getIntermediateFiles(j int, lenFiles int) []string {
    var files []string
    for i := 0; i < lenFiles; i++ {
        files = append(files, "mr-"+strconv.Itoa(i)+"-"+strconv.Itoa(j))
    }
    return files
}
worker.go
package mr
import "fmt"
import "log"
import "net/rpc"
import "hash/fnv"
import "time"
import "os"
import "io/ioutil"
import "strconv"
import "encoding/json"
import "sort"
//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
    Key   string
    Value string
}
//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() & 0x7fffffff)
}
// for sorting by key.
type ByKey []KeyValue
// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    CallExample()
    // Your worker implementation here.
    for {
        replyTask := CallMaster()
        fmt.Printf("task : %v\n", replyTask)
        if replyTask.DoJob == DO {
            if replyTask.Task.TaskType == MAP {
                doMap(replyTask.Task, replyTask.NReduce, mapf)
            } else { // replyTask.Task.TaskType==REDUCE
                doReduce(replyTask.Task, reducef)
            }
        } else if replyTask.DoJob == WAIT {
            time.Sleep(1 * time.Second)
        } else { // replyTask.DoJob==BREAK
            break
        }
    }
    // uncomment to send the Example RPC to the coordinator.
    // CallExample()
}
func doMap(task Task, nReduce int, mapf func(string, string) []KeyValue) {
    // One bucket of key/value pairs per reduce task.
    intermediates := make([][]KeyValue, nReduce)
    for i := 0; i < nReduce; i++ {
        intermediates[i] = make([]KeyValue, 0)
    }
    file, err := os.Open(task.InputFiles[0])
    if err != nil {
        log.Fatalf("cannot open %v", task.InputFiles[0])
    }
    content, err := ioutil.ReadAll(file)
    if err != nil {
        log.Fatalf("cannot read %v", task.InputFiles[0])
    }
    file.Close()
    kva := mapf(task.InputFiles[0], string(content))
    for _, kv := range kva {
        index := ihash(kv.Key) % nReduce
        intermediates[index] = append(intermediates[index], kv)
    }
    // Write each bucket to mr-<mapIndex>-<reduceIndex> as one JSON array.
    for reduceIdx, intermediate := range intermediates {
        oname := "mr-" + strconv.Itoa(task.Index) + "-" + strconv.Itoa(reduceIdx)
        ofile, _ := os.Create(oname)
        enc := json.NewEncoder(ofile)
        err := enc.Encode(&intermediate)
        if err != nil {
            log.Fatalf("cannot encode %v", oname)
        }
        ofile.Close()
    }
    CallTaskComplete(task)
}
func doReduce(task Task, reducef func(string, []string) string) {
    // Collect every key/value pair destined for this reduce task.
    var kvas []KeyValue
    for _, filename := range task.InputFiles {
        file, err := os.Open(filename)
        if err != nil {
            log.Fatalf("cannot open %v", filename)
        }
        dec := json.NewDecoder(file)
        var kva []KeyValue
        if err := dec.Decode(&kva); err != nil {
            log.Fatalf("cannot decode %v", filename)
        }
        file.Close()
        kvas = append(kvas, kva...)
    }
    oname := "mr-out-" + strconv.Itoa(task.Index)
    ofile, _ := os.Create(oname)
    sort.Sort(ByKey(kvas))
    // Group identical keys and call reducef once per key.
    i := 0
    for i < len(kvas) {
        j := i + 1
        for j < len(kvas) && kvas[j].Key == kvas[i].Key {
            j++
        }
        values := []string{}
        for k := i; k < j; k++ {
            values = append(values, kvas[k].Value)
        }
        output := reducef(kvas[i].Key, values)
        fmt.Printf("k:%v, v:%v\n", kvas[i].Key, output)
        fmt.Fprintf(ofile, "%v %v\n", kvas[i].Key, output)
        i = j
    }
    ofile.Close()
    CallTaskComplete(task)
}
func CallTaskComplete(task Task) {
    // report the finished task back to the coordinator.
    taskCompleteReply := TaskCompleteReply{}
    ok := call("Coordinator.TaskComplete", &task, &taskCompleteReply)
    if ok && taskCompleteReply.CompleteMessageCode == SUCCESS {
        fmt.Printf("CallTaskComplete complete!\n")
    } else {
        fmt.Printf("CallTaskComplete failed!\n")
    }
}
//
// example function to show how to make an RPC call to the coordinator.
//
// the RPC argument and reply types are defined in rpc.go.
//
func CallMaster() ReplyTask {
    // declare an argument structure (RequestTask does not actually use it).
    args := ExampleArgs{}
    args.X = 99
    // declare a reply structure.
    replyTask := ReplyTask{}
    // send the RPC request, wait for the reply.
    // "Coordinator.RequestTask" tells the receiving server that we'd like
    // to call the RequestTask() method of struct Coordinator.
    ok := call("Coordinator.RequestTask", &args, &replyTask)
    if ok {
        return replyTask
    }
    fmt.Printf("CallMaster failed!\n")
    replyTask.DoJob = WAIT
    return replyTask
}
func CallExample() {
    // declare an argument structure.
    args := ExampleArgs{}
    // fill in the argument(s).
    args.X = 99
    // declare a reply structure.
    reply := ExampleReply{}
    // send the RPC request, wait for the reply.
    // the "Coordinator.Example" tells the
    // receiving server that we'd like to call
    // the Example() method of struct Coordinator.
    ok := call("Coordinator.Example", &args, &reply)
    if ok {
        // reply.Y should be 100.
        fmt.Printf("reply.Y %v\n", reply.Y)
    } else {
        fmt.Printf("call failed!\n")
    }
}
//
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
    // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
    sockname := coordinatorSock()
    c, err := rpc.DialHTTP("unix", sockname)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    defer c.Close()
    err = c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }
    fmt.Println(err)
    return false
}