目錄表

Pthread and Semaphor

0x00 前言

最近在準備面試時常會看到 mutex, read-write lock, spin-lock, condition variable, semaphore 等字眼

這在作業系統的 Thread、同步機制、Deadlock 等章節也都會有相關

在這邊以 C 語言實作為輔助,記錄一下這些面試常見關鍵字的涵義

Outline:


0x01 Pthread 簡介

這邊我們 man 一下 pthread

從說明中我們可以看到 pthread 是 POSIX 中的一個函式庫,支援應用程式對多執行緒的操作

這之中我們也能看到 Mutex Routines, Condition Variable Routines, Read/Write Lock Routines

也就是說面試中常見的這幾個名詞其實都是 pthread 底下的子議題

這篇文章會針對這三者來說明


0x02 Mutex

Mutex 其實是 Mutual exclusion 的簡寫,中文我們通常稱為互斥鎖

我們這邊取來自 WikiPedia: Mutual exclusion 的定義:

In computer science, mutual exclusion is a property of concurrency control, which is instituted for the purpose of preventing race conditions; it is the requirement that one thread of execution never enter its critical section at the same time that another concurrent thread of execution enters its own critical section.

我的理解是:

在多執行緒程式中部分程式片段的執行可能會有 race conditions 問題,這片段的程式稱為臨界區,針對這個臨界區我們可以用 Mutex 互斥鎖來保護,確保同一個時間點只有一隻執行緒進入這個程式片段

metux 的相關函式操作如下:

下面這段簡單的範例程式:

在 Thread1 要修改變數,與在 Main Thread 要讀取變數這邊,可能會產生競爭情況,所以我們在這些操作前後加上 mutex lock 區分臨界區

#include<pthread.h>
#include<stdio.h>
#include<unistd.h>

int sharedResource = 50;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* addAPI(void* param)
{
    int count = 0;
    while(count < 5)
    {
        pthread_mutex_lock(&mutex);
        sharedResource++;
        printf("[Thread1] Changing the shared resource to %d.\n", sharedResource);
        pthread_mutex_unlock(&mutex);
        count++;
    }
    return 0;
}

int main()
{
    pthread_t thread1;

    // Really not locking for any reason other than to make the point.
    pthread_mutex_lock(&mutex);
    pthread_create(&thread1, NULL, addAPI, NULL);
    pthread_mutex_unlock(&mutex);

    // Now we need to lock to use the shared resource.
    int count = 0;
    while(count < 5)
    {
        pthread_mutex_lock(&mutex);
        printf("[Main thread] Reading shardes resource now.\n");
        printf("[Main thread] It's %d\n", sharedResource);
        pthread_mutex_unlock(&mutex);
        count++;
    }
    sleep(5);
}

上圖在沒有使用 mutex lock 的情況下,Main Thread 準備讀取變數前,到實際讀取之間可能被 Thread1 插隊,導致讀取出來的數值不一致的問題

在使用 mutex lock 下,可以確保臨界區的程式一口氣執行完成不被插隊,不管 Main Thread 或 Thread1 拿到鎖,另一方都必須等待所釋放,取得鎖的一方才能進入臨界區


0x02 Read-Write Lock

Read-Write Lock, 讀寫鎖,又稱 Shared-Exclusive Lock, 共享鎖

前面 Mutex 是最為常見的 lock

而這邊介紹的 Read-Write Lock 是考量在讀取時並不會改變資料,因此在設計上將讀寫分開為兩種鎖,若單純讀取的話可以支援多執行緒同時進入臨界區

rwlock 的相關函式操作如下:

大部分系統實作上,Read-Write Lock 的特性如下:


0x03 Condition Variable

Condition Variable 的目標是處理執行緒的進度同步

這不是兩個執行緒做一樣的事,然後進度要一樣的意思

是指在多個執行緒間若有合作先後的關係存在,則 Thread A 執行完某段程式碼後,必須等待 Thread B 處理完另一段程式碼再繼續執行下去,這時我們便可以用 Condition Variable 確保兩個執行緒的進度,不讓 Thread A 在 Thread B 尚未完成工作前就偷跑

pthread_cond 的相關函式操作如下:

#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

const size_t NUMTHREADS = 5;

/* a global count of the number of threads finished working. It will
   be protected by mutex and changes to it will be signaled to the
   main thread via cond */

int done = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

/* Note: error checking on pthread_X calls omitted for clarity - you
   should always check the return values in real code. */

/* Note: passing the thread id via a void pointer is cheap and easy,
 * but the code assumes pointers and long ints are the same size
 * (probably 64bits), which is a little hacky. */

void *ThreadEntry(void *id)
{
    const int myid = (long)id; // force the pointer to be a 64bit integer

    const int workloops = 3;
    for (int i = 0; i < workloops; i++)
    {
        printf("[thread %d] working (%d/%d)\n", myid + 1, i, workloops);
        sleep(1); // simulate doing some costly work
    }

    // we're going to manipulate done and use the cond, so we need the mutex
    pthread_mutex_lock(&mutex);

    // increase the count of threads that have finished their work.
    done++;
    printf("[thread %d] done is now %d. Signalling cond.\n", myid, done);

    // wait up the main thread (if it is sleeping) to test the value of done
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);

    return NULL;
}

int main(int argc, char **argv)
{
    puts("[thread main] starting");

    pthread_t threads[NUMTHREADS];

    for (int t = 0; t < NUMTHREADS; t++)
        pthread_create(&threads[t], NULL, ThreadEntry, (void *)(long)t);

    // we're going to test "done" so we need the mutex for safety
    pthread_mutex_lock(&mutex);

    // are the other threads still busy?
    while (done < NUMTHREADS)
    {
        printf("[thread main] done is %d which is < %d so waiting on cond\n", done, (int)NUMTHREADS);

        /* block this thread until another thread signals cond. While
           blocked, the mutex is released, then re-acquired before this thread is woken up and the call returns. */
        pthread_cond_wait(&cond, &mutex);

        puts("[thread main] wake - cond was signalled.");

        /* we go around the loop with the lock held */
    }

    printf("[thread main] done == %d so everyone is done\n", (int)NUMTHREADS);

    pthread_mutex_unlock(&mutex);
    return 0;
}

上面這段程式建立了 5 條執行緒,每條執行緒會做 3 輪來完成工作

Main Thread 會在一個迴圈中檢查是否 5 條執行緒都完成工作,如果都完成才能結束程式

結果如下:


0x04 Semaphore

Semaphore 不是 pthread 底下的東西,不過也是臨界區的概念

跟 pthread_mutex 比較的話:

#include <stdio.h>          /* printf()                 */
#include <stdlib.h>         /* exit(), malloc(), free() */
#include <sys/types.h>      /* key_t, sem_t, pid_t      */
#include <sys/shm.h>        /* shmat(), IPC_RMID        */
#include <errno.h>          /* errno, ECHILD            */
#include <semaphore.h>      /* sem_open(), sem_destroy(), sem_wait().. */
#include <fcntl.h>          /* O_CREAT, O_EXEC          */


int main (int argc, char **argv){
    int i;                        /*      loop variables          */
    key_t shmkey;                 /*      shared memory key       */
    int shmid;                    /*      shared memory id        */
    sem_t *sem;                   /*      synch semaphore         *//*shared */
    pid_t pid;                    /*      fork pid                */
    int *p;                       /*      shared variable         *//*shared */
    unsigned int n;               /*      fork count              */
    unsigned int value;           /*      semaphore value         */

    /* initialize a shared variable in shared memory */
    shmkey = ftok ("/dev/null", 5);       /* valid directory name and a number */
    printf ("shmkey for p = %d\n", shmkey);
    shmid = shmget (shmkey, sizeof (int), 0644 | IPC_CREAT);
    if (shmid < 0){                           /* shared memory error check */
        perror ("shmget\n");
        exit (1);
    }

    p = (int *) shmat (shmid, NULL, 0);   /* attach p to shared memory */
    *p = 0;
    printf ("p=%d is allocated in shared memory.\n\n", *p);

    /********************************************************/

    printf ("How many children do you want to fork?\n");
    printf ("Fork count: ");
    scanf ("%u", &n);

    printf ("What do you want the semaphore value to be?\n");
    printf ("Semaphore value: ");
    scanf ("%u", &value);

    /* initialize semaphores for shared processes */
    sem = sem_open ("pSem", O_CREAT | O_EXCL, 0644, value); 
    /* name of semaphore is "pSem", semaphore is reached using this name */

    printf ("semaphores initialized.\n\n");


    /* fork child processes */
    for (i = 0; i < n; i++){
        pid = fork ();
        if (pid < 0) {
        /* check for error      */
            sem_unlink ("pSem");   
            sem_close(sem);  
            /* unlink prevents the semaphore existing forever */
            /* if a crash occurs during the execution         */
            printf ("Fork error.\n");
        }
        else if (pid == 0)
            break;                  /* child processes */
    }


    /******************************************************/
    /******************   PARENT PROCESS   ****************/
    /******************************************************/
    if (pid != 0){
        /* wait for all children to exit */
        while (pid = waitpid (-1, NULL, 0)){
            if (errno == ECHILD)
                break;
        }

        printf ("\nParent: All children have exited.\n");

        /* shared memory detach */
        shmdt (p);
        shmctl (shmid, IPC_RMID, 0);

        /* cleanup semaphores */
        sem_unlink ("pSem");   
        sem_close(sem);  
        /* unlink prevents the semaphore existing forever */
        /* if a crash occurs during the execution         */
        exit (0);
    }

    /******************************************************/
    /******************   CHILD PROCESS   *****************/
    /******************************************************/
    else{
        sem_wait (sem);           /* P operation */
        printf ("  Child(%d) is in critical section.\n", i);
        sleep (1);
        *p += i % 3;              /* increment *p by 0, 1 or 2 based on i */
        printf ("  Child(%d) new value of *p=%d.\n", i, *p);
        sem_post (sem);           /* V operation */
        exit (0);
    }
}

上圖,在 value = 1 時,行為類似 mutex

上圖 value = 2 時,允許兩個 processes 同時進入臨界區


0x05 補充: mutex between processes

*這小節單純筆記,尚在消化中

mutex 其實也可以用在 process 之間

我在 stackoverflow 看到的範例是採用 shm + mmap 做記憶體映射

在 attr 要加上 PTHREAD_PROCESS_SHARED 屬性

#include <stdio.h>
#include <unistd.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <sys/wait.h>

struct shared {
    pthread_mutex_t mutex;
    int sharedResource;
};

int main()
{
    int fd = shm_open("/foo", O_CREAT | O_TRUNC | O_RDWR, 0600);
    ftruncate(fd, sizeof(struct shared));

    struct shared *p = (struct shared*)mmap(0, sizeof(struct shared),
        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    p->sharedResource = 0;

    // Make sure it can be shared across processes
    pthread_mutexattr_t shared;
    pthread_mutexattr_init(&shared);
    pthread_mutexattr_setpshared(&shared, PTHREAD_PROCESS_SHARED);

    pthread_mutex_init(&(p->mutex), &shared);

    int i;
    for (i = 0; i < 100; i++) {
        pthread_mutex_lock(&(p->mutex));
        printf("%d\n", p->sharedResource);
        pthread_mutex_unlock(&(p->mutex));
        sleep(1);
    }
    
    munmap(p, sizeof(struct shared*));
    shm_unlink("/foo");
}

*flock


0x06 參考資料