目錄表
Pthread and Semaphor
0x00 前言
最近在準備面試時常會看到 mutex, read-write lock, spin-lock, condition variable, semaphore 等字眼
這在作業系統的 Thread、同步機制、Deadlock 等章節也都會有相關
在這邊以 C 語言實作為輔助,記錄一下這些面試常見關鍵字的涵義
Outline:
- Pthread 簡介
- Mutex
- Read-Write Lock
- Condition Variable
- Semaphore
- 補充: mutex between processes
- 參考資料
0x01 Pthread 簡介
這邊我們 man 一下 pthread
從說明中我們可以看到 pthread 是 POSIX 中的一個函式庫,支援應用程式對多執行緒的操作
這之中我們也能看到 Mutex Routines, Condition Variable Routines, Read/Write Lock Routines
也就是說面試中常見的這幾個名詞其實都是 pthread 底下的子議題
這篇文章會針對這三者來說明
0x02 Mutex
Mutex 其實是 Mutual exclusion 的簡寫,中文我們通常稱為互斥鎖
我們這邊取來自 WikiPedia: Mutual exclusion 的定義:
In computer science, mutual exclusion is a property of concurrency control, which is instituted for the purpose of preventing race conditions; it is the requirement that one thread of execution never enter its critical section at the same time that another concurrent thread of execution enters its own critical section.
我的理解是:
在多執行緒程式中部分程式片段的執行可能會有 race conditions 問題,這片段的程式稱為臨界區,針對這個臨界區我們可以用 Mutex 互斥鎖來保護,確保同一個時間點只有一隻執行緒進入這個程式片段
metux 的相關函式操作如下:
下面這段簡單的範例程式:
在 Thread1 要修改變數,與在 Main Thread 要讀取變數這邊,可能會產生競爭情況,所以我們在這些操作前後加上 mutex lock 區分臨界區
#include<pthread.h>
#include<stdio.h>
#include<unistd.h>
int sharedResource = 50;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void* addAPI(void* param)
{
int count = 0;
while(count < 5)
{
pthread_mutex_lock(&mutex);
sharedResource++;
printf("[Thread1] Changing the shared resource to %d.\n", sharedResource);
pthread_mutex_unlock(&mutex);
count++;
}
return 0;
}
int main()
{
pthread_t thread1;
// Really not locking for any reason other than to make the point.
pthread_mutex_lock(&mutex);
pthread_create(&thread1, NULL, addAPI, NULL);
pthread_mutex_unlock(&mutex);
// Now we need to lock to use the shared resource.
int count = 0;
while(count < 5)
{
pthread_mutex_lock(&mutex);
printf("[Main thread] Reading shardes resource now.\n");
printf("[Main thread] It's %d\n", sharedResource);
pthread_mutex_unlock(&mutex);
count++;
}
sleep(5);
}
上圖在沒有使用 mutex lock 的情況下,Main Thread 準備讀取變數前,到實際讀取之間可能被 Thread1 插隊,導致讀取出來的數值不一致的問題
在使用 mutex lock 下,可以確保臨界區的程式一口氣執行完成不被插隊,不管 Main Thread 或 Thread1 拿到鎖,另一方都必須等待所釋放,取得鎖的一方才能進入臨界區
0x02 Read-Write Lock
Read-Write Lock, 讀寫鎖,又稱 Shared-Exclusive Lock, 共享鎖
前面 Mutex 是最為常見的 lock
而這邊介紹的 Read-Write Lock 是考量在讀取時並不會改變資料,因此在設計上將讀寫分開為兩種鎖,若單純讀取的話可以支援多執行緒同時進入臨界區
rwlock 的相關函式操作如下:
大部分系統實作上,Read-Write Lock 的特性如下:
- Read lock 只要在沒有 Write lock 下即可成功申請,可能同時有多個上鎖的 Read lock
- Write lock 要在沒有任何 Read or Write lock 下才可成功申請,Write lock 同時間只存在一個
- Write lock 可能因為一直被 Read lock 插隊導致 starvation
0x03 Condition Variable
Condition Variable 的目標是處理執行緒的進度同步
這不是兩個執行緒做一樣的事,然後進度要一樣的意思
是指在多個執行緒間若有合作先後的關係存在,則 Thread A 執行完某段程式碼後,必須等待 Thread B 處理完另一段程式碼再繼續執行下去,這時我們便可以用 Condition Variable 確保兩個執行緒的進度,不讓 Thread A 在 Thread B 尚未完成工作前就偷跑
pthread_cond 的相關函式操作如下:
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
const size_t NUMTHREADS = 5;
/* a global count of the number of threads finished working. It will
be protected by mutex and changes to it will be signaled to the
main thread via cond */
int done = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/* Note: error checking on pthread_X calls omitted for clarity - you
should always check the return values in real code. */
/* Note: passing the thread id via a void pointer is cheap and easy,
* but the code assumes pointers and long ints are the same size
* (probably 64bits), which is a little hacky. */
void *ThreadEntry(void *id)
{
const int myid = (long)id; // force the pointer to be a 64bit integer
const int workloops = 3;
for (int i = 0; i < workloops; i++)
{
printf("[thread %d] working (%d/%d)\n", myid + 1, i, workloops);
sleep(1); // simulate doing some costly work
}
// we're going to manipulate done and use the cond, so we need the mutex
pthread_mutex_lock(&mutex);
// increase the count of threads that have finished their work.
done++;
printf("[thread %d] done is now %d. Signalling cond.\n", myid, done);
// wait up the main thread (if it is sleeping) to test the value of done
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return NULL;
}
int main(int argc, char **argv)
{
puts("[thread main] starting");
pthread_t threads[NUMTHREADS];
for (int t = 0; t < NUMTHREADS; t++)
pthread_create(&threads[t], NULL, ThreadEntry, (void *)(long)t);
// we're going to test "done" so we need the mutex for safety
pthread_mutex_lock(&mutex);
// are the other threads still busy?
while (done < NUMTHREADS)
{
printf("[thread main] done is %d which is < %d so waiting on cond\n", done, (int)NUMTHREADS);
/* block this thread until another thread signals cond. While
blocked, the mutex is released, then re-acquired before this thread is woken up and the call returns. */
pthread_cond_wait(&cond, &mutex);
puts("[thread main] wake - cond was signalled.");
/* we go around the loop with the lock held */
}
printf("[thread main] done == %d so everyone is done\n", (int)NUMTHREADS);
pthread_mutex_unlock(&mutex);
return 0;
}
上面這段程式建立了 5 條執行緒,每條執行緒會做 3 輪來完成工作
Main Thread 會在一個迴圈中檢查是否 5 條執行緒都完成工作,如果都完成才能結束程式
- line 67: Main Thread 會等待 condition,這邊會釋放 metux 鎖,讓其他 thread 可取得
- line 67: 若單把這行註解掉的話會因為其他 thread 拿不到鎖,而無法完成工作,Main 則進入無窮迴圈
- line 42: 每條 Thread 完成工作後會發出 condition signal 通知 Main Thread
- done 這個變數的修改是需要在臨界區處理的 (單一變數可能較難感覺有沒有 metux 的差別,但考慮成寫入檔案就有差了)
結果如下:
0x04 Semaphore
Semaphore 不是 pthread 底下的東西,不過也是臨界區的概念
跟 pthread_mutex 比較的話:
- Semaphore 可以用在 process 與 process 之間 (thread 間也可),我看到的範例大多是用來保護 share memory 資料的處理
- Semaphore 有數量概念,在進入臨界區的 process 未達上限前,其他 process 不會被攔阻
- 若 Semaphore 的容許上限是 1 的話就有點像 process 間的 mutex
#include <stdio.h> /* printf() */
#include <stdlib.h> /* exit(), malloc(), free() */
#include <sys/types.h> /* key_t, sem_t, pid_t */
#include <sys/shm.h> /* shmat(), IPC_RMID */
#include <errno.h> /* errno, ECHILD */
#include <semaphore.h> /* sem_open(), sem_destroy(), sem_wait().. */
#include <fcntl.h> /* O_CREAT, O_EXEC */
int main (int argc, char **argv){
int i; /* loop variables */
key_t shmkey; /* shared memory key */
int shmid; /* shared memory id */
sem_t *sem; /* synch semaphore *//*shared */
pid_t pid; /* fork pid */
int *p; /* shared variable *//*shared */
unsigned int n; /* fork count */
unsigned int value; /* semaphore value */
/* initialize a shared variable in shared memory */
shmkey = ftok ("/dev/null", 5); /* valid directory name and a number */
printf ("shmkey for p = %d\n", shmkey);
shmid = shmget (shmkey, sizeof (int), 0644 | IPC_CREAT);
if (shmid < 0){ /* shared memory error check */
perror ("shmget\n");
exit (1);
}
p = (int *) shmat (shmid, NULL, 0); /* attach p to shared memory */
*p = 0;
printf ("p=%d is allocated in shared memory.\n\n", *p);
/********************************************************/
printf ("How many children do you want to fork?\n");
printf ("Fork count: ");
scanf ("%u", &n);
printf ("What do you want the semaphore value to be?\n");
printf ("Semaphore value: ");
scanf ("%u", &value);
/* initialize semaphores for shared processes */
sem = sem_open ("pSem", O_CREAT | O_EXCL, 0644, value);
/* name of semaphore is "pSem", semaphore is reached using this name */
printf ("semaphores initialized.\n\n");
/* fork child processes */
for (i = 0; i < n; i++){
pid = fork ();
if (pid < 0) {
/* check for error */
sem_unlink ("pSem");
sem_close(sem);
/* unlink prevents the semaphore existing forever */
/* if a crash occurs during the execution */
printf ("Fork error.\n");
}
else if (pid == 0)
break; /* child processes */
}
/******************************************************/
/****************** PARENT PROCESS ****************/
/******************************************************/
if (pid != 0){
/* wait for all children to exit */
while (pid = waitpid (-1, NULL, 0)){
if (errno == ECHILD)
break;
}
printf ("\nParent: All children have exited.\n");
/* shared memory detach */
shmdt (p);
shmctl (shmid, IPC_RMID, 0);
/* cleanup semaphores */
sem_unlink ("pSem");
sem_close(sem);
/* unlink prevents the semaphore existing forever */
/* if a crash occurs during the execution */
exit (0);
}
/******************************************************/
/****************** CHILD PROCESS *****************/
/******************************************************/
else{
sem_wait (sem); /* P operation */
printf (" Child(%d) is in critical section.\n", i);
sleep (1);
*p += i % 3; /* increment *p by 0, 1 or 2 based on i */
printf (" Child(%d) new value of *p=%d.\n", i, *p);
sem_post (sem); /* V operation */
exit (0);
}
}
- line 23: 取得一個 int 大小的記憶體空間
- line 29: 整數指標 p 指向 shared memory 的記憶體位址
- line 44: sem_open 建立一個 named semaphore,“pSem” 是名稱,value 為臨界區容許 process 上限
- line 94: sem_wait 會佔據 semaphore 中的一個名額,若已額滿則會在此 block 等待
- line 99: sem_post 釋放一個 semaphore 名額
上圖,在 value = 1 時,行為類似 mutex
上圖 value = 2 時,允許兩個 processes 同時進入臨界區
0x05 補充: mutex between processes
*這小節單純筆記,尚在消化中
mutex 其實也可以用在 process 之間
我在 stackoverflow 看到的範例是採用 shm + mmap 做記憶體映射
在 attr 要加上 PTHREAD_PROCESS_SHARED 屬性
#include <stdio.h>
#include <unistd.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <sys/wait.h>
struct shared {
pthread_mutex_t mutex;
int sharedResource;
};
int main()
{
int fd = shm_open("/foo", O_CREAT | O_TRUNC | O_RDWR, 0600);
ftruncate(fd, sizeof(struct shared));
struct shared *p = (struct shared*)mmap(0, sizeof(struct shared),
PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
p->sharedResource = 0;
// Make sure it can be shared across processes
pthread_mutexattr_t shared;
pthread_mutexattr_init(&shared);
pthread_mutexattr_setpshared(&shared, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&(p->mutex), &shared);
int i;
for (i = 0; i < 100; i++) {
pthread_mutex_lock(&(p->mutex));
printf("%d\n", p->sharedResource);
pthread_mutex_unlock(&(p->mutex));
sleep(1);
}
munmap(p, sizeof(struct shared*));
shm_unlink("/foo");
}
*flock









