MIT 6.824: Distributed Systems

Lecture 2: RPC and Threads

NickTop 2024. 1. 23. 00:15

Thread

I/O concurrency
   - 한 thread가 IO (e.g. client request, wait for disk read)를 처리하는 동안 다른 스레드는 다른일을 할 수 있다
Multicore performance
Convenience like master check whether each worker is still alive.

event-driven으로 single thread로 여러가지 일을 할수있지만 multi-core speed up 하지못한다
    


Threading challenges

 Sharing data safely (race condition) : 같은 자원을 다른 스레드가 동시에 사용
    -> use locks (Go's sync.Mutex)
    -> or avoid sharing mutable data
 coordination between threads
    one thread is producing data, another thread is consuming it
      how can the consumer wait (and release the CPU)?
      how can the producer wake up the consumer?
    -> use Go channels or sync.Cond or sync.WaitGroup
 deadlock
    cycles via locks and/or communication (e.g. RPC or Go channels)

Web crawler 예제

 - 웹 페이지 내의 모든 링크를 조사하여 저장
 - 하나의 페이지는 한번만 조회하여 cycle을 방지하고 network 효율적 사용
 - 서버에서 자원을 가져오는 속도가 상대적으로 느리므로 네트워크의 bandwidth가 덜 중요함
   => parallel하게 처리하는 것이 좋음

 

https://pdos.csail.mit.edu/6.824/notes/crawler.go

 

Serial crawler

func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
	if fetched[url] {
		return
	}
	fetched[url] = true
	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	for _, u := range urls {
		Serial(u, fetcher, fetched)
	}
	return
}

fetch only one page at a time => 느림

		go Serial(u, fetcher, fetched)

만약, 병렬로 처리하기 위해 goroutine으로 Serial 함수를 실행하게 되면 바로 return 함수를 호출하여 Serial 함수가 종료되므로 url 한 개 밖에 가져오지 못한다

 

ConcurrentMutex crawler

//
// Concurrent crawler with shared state and Mutex
//

type fetchState struct {
	mu      sync.Mutex
	fetched map[string]bool
}

func (fs *fetchState) testAndSet(url string) bool {
	fs.mu.Lock() // 동시에 같은 url을 호출하는 것을 막기위해 Lock
	defer fs.mu.Unlock() // Unlock을 하지 않으면 deadlock (defer는 return하기 직전에 실행된다)
	r := fs.fetched[url]
	fs.fetched[url] = true
	return r
}

func ConcurrentMutex(url string, fetcher Fetcher, fs *fetchState) {
	if fs.testAndSet(url) {
		return
	}
	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	var done sync.WaitGroup
	for _, u := range urls {
		done.Add(1) // done counter 1 증가
		go func(u string) {
			defer done.Done() // done counter 1 감소
			ConcurrentMutex(u, fetcher, fs)
		}(u)
	}
	done.Wait() // done counter가 0이 될때까지 기다림
	return
}

URL 개수만큼 thread 생성됨
=> 너무 많으면 안되기 때문에 실제로는 Thread Pool을 만들어서 처리하는것이 좋다

 

ConcurrentChannel crawler

//
// Concurrent crawler with channels
//

func worker(url string, ch chan []string, fetcher Fetcher) {
	urls, err := fetcher.Fetch(url)
	if err != nil {
		ch <- []string{}
	} else {
		ch <- urls
	}
}

func coordinator(ch chan []string, fetcher Fetcher) {
	n := 1
	fetched := make(map[string]bool)
	for urls := range ch { // 27번 줄의 break가 호출되기 전까지 계속 기다린다
		for _, u := range urls {
			if fetched[u] == false {
				fetched[u] = true
				n += 1
				go worker(u, ch, fetcher)
			}
		}
		n -= 1
		if n == 0 {
			break
		}
	}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
	ch := make(chan []string)
	go func() {
		ch <- []string{url}
	}()
	coordinator(ch, fetcher)
}

채널은 다른 스레드에 데이터를 보낼 수 있다

worker에서는 새로운 thread를 만들지 않으며 master에서만 새로운 스레드를 만든다 => lock / unlock 필요없음

remember: (Synchronous) sender blocks until the receiver receives! => deadlock 발생가능

 

 

https://www.youtube.com/watch?v=gA4YXUJX7t8&list=PLrw6a1wE39_tb2fErI4-WkMbsvGQk9_UB&index=2

https://learncs.me/mit/6.824

'MIT 6.824: Distributed Systems' 카테고리의 다른 글

Lecture 6: Fault Tolerance: Raft (1)  (0) 2024.02.07
Lecture 5: Go, Threads, and Raft  (1) 2024.02.05
Lab 1: MapReduce  (1) 2024.01.30
Lecture 4: Primary-Backup Replication  (1) 2024.01.25
Lecture 3: GFS  (1) 2024.01.24