目錄表
Pthread and Semaphor
0x00 前言
最近在準備面試時常會看到 mutex, read-write lock, spin-lock, condition variable, semaphore 等字眼
這在作業系統的 Thread、同步機制、Deadlock 等章節也都會有相關
在這邊以 C 語言實作為輔助,記錄一下這些面試常見關鍵字的涵義
Outline:
- Pthread 簡介
- Mutex
- Read-Write Lock
- Condition Variable
- Semaphore
- 補充: mutex between processes
- 參考資料
0x01 Pthread 簡介
這邊我們 man 一下 pthread
從說明中我們可以看到 pthread 是 POSIX 中的一個函式庫,支援應用程式對多執行緒的操作
這之中我們也能看到 Mutex Routines, Condition Variable Routines, Read/Write Lock Routines
也就是說面試中常見的這幾個名詞其實都是 pthread 底下的子議題
這篇文章會針對這三者來說明
0x02 Mutex
Mutex 其實是 Mutual exclusion 的簡寫,中文我們通常稱為互斥鎖
我們這邊取來自 WikiPedia: Mutual exclusion 的定義:
In computer science, mutual exclusion is a property of concurrency control, which is instituted for the purpose of preventing race conditions; it is the requirement that one thread of execution never enter its critical section at the same time that another concurrent thread of execution enters its own critical section.
我的理解是:
在多執行緒程式中部分程式片段的執行可能會有 race conditions 問題,這片段的程式稱為臨界區,針對這個臨界區我們可以用 Mutex 互斥鎖來保護,確保同一個時間點只有一隻執行緒進入這個程式片段
metux 的相關函式操作如下:
下面這段簡單的範例程式:
在 Thread1 要修改變數,與在 Main Thread 要讀取變數這邊,可能會產生競爭情況,所以我們在這些操作前後加上 mutex lock 區分臨界區
#include<pthread.h> #include<stdio.h> #include<unistd.h> int sharedResource = 50; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; void* addAPI(void* param) { int count = 0; while(count < 5) { pthread_mutex_lock(&mutex); sharedResource++; printf("[Thread1] Changing the shared resource to %d.\n", sharedResource); pthread_mutex_unlock(&mutex); count++; } return 0; } int main() { pthread_t thread1; // Really not locking for any reason other than to make the point. pthread_mutex_lock(&mutex); pthread_create(&thread1, NULL, addAPI, NULL); pthread_mutex_unlock(&mutex); // Now we need to lock to use the shared resource. int count = 0; while(count < 5) { pthread_mutex_lock(&mutex); printf("[Main thread] Reading shardes resource now.\n"); printf("[Main thread] It's %d\n", sharedResource); pthread_mutex_unlock(&mutex); count++; } sleep(5); }
上圖在沒有使用 mutex lock 的情況下,Main Thread 準備讀取變數前,到實際讀取之間可能被 Thread1 插隊,導致讀取出來的數值不一致的問題
在使用 mutex lock 下,可以確保臨界區的程式一口氣執行完成不被插隊,不管 Main Thread 或 Thread1 拿到鎖,另一方都必須等待所釋放,取得鎖的一方才能進入臨界區
0x02 Read-Write Lock
Read-Write Lock, 讀寫鎖,又稱 Shared-Exclusive Lock, 共享鎖
前面 Mutex 是最為常見的 lock
而這邊介紹的 Read-Write Lock 是考量在讀取時並不會改變資料,因此在設計上將讀寫分開為兩種鎖,若單純讀取的話可以支援多執行緒同時進入臨界區
rwlock 的相關函式操作如下:
大部分系統實作上,Read-Write Lock 的特性如下:
- Read lock 只要在沒有 Write lock 下即可成功申請,可能同時有多個上鎖的 Read lock
- Write lock 要在沒有任何 Read or Write lock 下才可成功申請,Write lock 同時間只存在一個
- Write lock 可能因為一直被 Read lock 插隊導致 starvation
0x03 Condition Variable
Condition Variable 的目標是處理執行緒的進度同步
這不是兩個執行緒做一樣的事,然後進度要一樣的意思
是指在多個執行緒間若有合作先後的關係存在,則 Thread A 執行完某段程式碼後,必須等待 Thread B 處理完另一段程式碼再繼續執行下去,這時我們便可以用 Condition Variable 確保兩個執行緒的進度,不讓 Thread A 在 Thread B 尚未完成工作前就偷跑
pthread_cond 的相關函式操作如下:
#include <assert.h> #include <pthread.h> #include <stdio.h> #include <unistd.h> const size_t NUMTHREADS = 5; /* a global count of the number of threads finished working. It will be protected by mutex and changes to it will be signaled to the main thread via cond */ int done = 0; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; /* Note: error checking on pthread_X calls omitted for clarity - you should always check the return values in real code. */ /* Note: passing the thread id via a void pointer is cheap and easy, * but the code assumes pointers and long ints are the same size * (probably 64bits), which is a little hacky. */ void *ThreadEntry(void *id) { const int myid = (long)id; // force the pointer to be a 64bit integer const int workloops = 3; for (int i = 0; i < workloops; i++) { printf("[thread %d] working (%d/%d)\n", myid + 1, i, workloops); sleep(1); // simulate doing some costly work } // we're going to manipulate done and use the cond, so we need the mutex pthread_mutex_lock(&mutex); // increase the count of threads that have finished their work. done++; printf("[thread %d] done is now %d. Signalling cond.\n", myid, done); // wait up the main thread (if it is sleeping) to test the value of done pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); return NULL; } int main(int argc, char **argv) { puts("[thread main] starting"); pthread_t threads[NUMTHREADS]; for (int t = 0; t < NUMTHREADS; t++) pthread_create(&threads[t], NULL, ThreadEntry, (void *)(long)t); // we're going to test "done" so we need the mutex for safety pthread_mutex_lock(&mutex); // are the other threads still busy? while (done < NUMTHREADS) { printf("[thread main] done is %d which is < %d so waiting on cond\n", done, (int)NUMTHREADS); /* block this thread until another thread signals cond. While blocked, the mutex is released, then re-acquired before this thread is woken up and the call returns. */ pthread_cond_wait(&cond, &mutex); puts("[thread main] wake - cond was signalled."); /* we go around the loop with the lock held */ } printf("[thread main] done == %d so everyone is done\n", (int)NUMTHREADS); pthread_mutex_unlock(&mutex); return 0; }
上面這段程式建立了 5 條執行緒,每條執行緒會做 3 輪來完成工作
Main Thread 會在一個迴圈中檢查是否 5 條執行緒都完成工作,如果都完成才能結束程式
- line 67: Main Thread 會等待 condition,這邊會釋放 metux 鎖,讓其他 thread 可取得
- line 67: 若單把這行註解掉的話會因為其他 thread 拿不到鎖,而無法完成工作,Main 則進入無窮迴圈
- line 42: 每條 Thread 完成工作後會發出 condition signal 通知 Main Thread
- done 這個變數的修改是需要在臨界區處理的 (單一變數可能較難感覺有沒有 metux 的差別,但考慮成寫入檔案就有差了)
結果如下:
0x04 Semaphore
Semaphore 不是 pthread 底下的東西,不過也是臨界區的概念
跟 pthread_mutex 比較的話:
- Semaphore 可以用在 process 與 process 之間 (thread 間也可),我看到的範例大多是用來保護 share memory 資料的處理
- Semaphore 有數量概念,在進入臨界區的 process 未達上限前,其他 process 不會被攔阻
- 若 Semaphore 的容許上限是 1 的話就有點像 process 間的 mutex
#include <stdio.h> /* printf() */ #include <stdlib.h> /* exit(), malloc(), free() */ #include <sys/types.h> /* key_t, sem_t, pid_t */ #include <sys/shm.h> /* shmat(), IPC_RMID */ #include <errno.h> /* errno, ECHILD */ #include <semaphore.h> /* sem_open(), sem_destroy(), sem_wait().. */ #include <fcntl.h> /* O_CREAT, O_EXEC */ int main (int argc, char **argv){ int i; /* loop variables */ key_t shmkey; /* shared memory key */ int shmid; /* shared memory id */ sem_t *sem; /* synch semaphore *//*shared */ pid_t pid; /* fork pid */ int *p; /* shared variable *//*shared */ unsigned int n; /* fork count */ unsigned int value; /* semaphore value */ /* initialize a shared variable in shared memory */ shmkey = ftok ("/dev/null", 5); /* valid directory name and a number */ printf ("shmkey for p = %d\n", shmkey); shmid = shmget (shmkey, sizeof (int), 0644 | IPC_CREAT); if (shmid < 0){ /* shared memory error check */ perror ("shmget\n"); exit (1); } p = (int *) shmat (shmid, NULL, 0); /* attach p to shared memory */ *p = 0; printf ("p=%d is allocated in shared memory.\n\n", *p); /********************************************************/ printf ("How many children do you want to fork?\n"); printf ("Fork count: "); scanf ("%u", &n); printf ("What do you want the semaphore value to be?\n"); printf ("Semaphore value: "); scanf ("%u", &value); /* initialize semaphores for shared processes */ sem = sem_open ("pSem", O_CREAT | O_EXCL, 0644, value); /* name of semaphore is "pSem", semaphore is reached using this name */ printf ("semaphores initialized.\n\n"); /* fork child processes */ for (i = 0; i < n; i++){ pid = fork (); if (pid < 0) { /* check for error */ sem_unlink ("pSem"); sem_close(sem); /* unlink prevents the semaphore existing forever */ /* if a crash occurs during the execution */ printf ("Fork error.\n"); } else if (pid == 0) break; /* child processes */ } /******************************************************/ /****************** PARENT PROCESS ****************/ /******************************************************/ if (pid != 0){ /* wait for all children to exit */ while (pid = waitpid (-1, NULL, 0)){ if (errno == ECHILD) break; } printf ("\nParent: All children have exited.\n"); /* shared memory detach */ shmdt (p); shmctl (shmid, IPC_RMID, 0); /* cleanup semaphores */ sem_unlink ("pSem"); sem_close(sem); /* unlink prevents the semaphore existing forever */ /* if a crash occurs during the execution */ exit (0); } /******************************************************/ /****************** CHILD PROCESS *****************/ /******************************************************/ else{ sem_wait (sem); /* P operation */ printf (" Child(%d) is in critical section.\n", i); sleep (1); *p += i % 3; /* increment *p by 0, 1 or 2 based on i */ printf (" Child(%d) new value of *p=%d.\n", i, *p); sem_post (sem); /* V operation */ exit (0); } }
- line 23: 取得一個 int 大小的記憶體空間
- line 29: 整數指標 p 指向 shared memory 的記憶體位址
- line 44: sem_open 建立一個 named semaphore,“pSem” 是名稱,value 為臨界區容許 process 上限
- line 94: sem_wait 會佔據 semaphore 中的一個名額,若已額滿則會在此 block 等待
- line 99: sem_post 釋放一個 semaphore 名額
上圖,在 value = 1 時,行為類似 mutex
上圖 value = 2 時,允許兩個 processes 同時進入臨界區
0x05 補充: mutex between processes
*這小節單純筆記,尚在消化中
mutex 其實也可以用在 process 之間
我在 stackoverflow 看到的範例是採用 shm + mmap 做記憶體映射
在 attr 要加上 PTHREAD_PROCESS_SHARED 屬性
#include <stdio.h> #include <unistd.h> #include <sys/file.h> #include <sys/mman.h> #include <sys/wait.h> struct shared { pthread_mutex_t mutex; int sharedResource; }; int main() { int fd = shm_open("/foo", O_CREAT | O_TRUNC | O_RDWR, 0600); ftruncate(fd, sizeof(struct shared)); struct shared *p = (struct shared*)mmap(0, sizeof(struct shared), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); p->sharedResource = 0; // Make sure it can be shared across processes pthread_mutexattr_t shared; pthread_mutexattr_init(&shared); pthread_mutexattr_setpshared(&shared, PTHREAD_PROCESS_SHARED); pthread_mutex_init(&(p->mutex), &shared); int i; for (i = 0; i < 100; i++) { pthread_mutex_lock(&(p->mutex)); printf("%d\n", p->sharedResource); pthread_mutex_unlock(&(p->mutex)); sleep(1); } munmap(p, sizeof(struct shared*)); shm_unlink("/foo"); }
*flock