course:nctu-高等unix程式設計:chapter11

Threads

0x00 Outline

  • Introduction
  • Thread concepts
  • Thread identification
  • Thread creation
  • Thread termination
  • Thread synchronization

0x01 Introduction

  • thread 可以在一個 process 執行多個工作
  • 在一個 process 中的所有 thread 共享同樣的 process component,如 file descriptor 或 memory
  • 而對於在不同 thread 間共享的資源,我們必須有同步機制去避免不同 thread 看到資料內容不一致的問題

0x02 Thread concepts

  • 傳統的 UNIX process 可視為 one signal thread,每個 process 一次做一件事
  • multiple threads 可以在 signal process 中一次做多個工作
  • thread 在一個 process 中共享
    • Text of the executable program
    • The program’s global and heap memory
    • Stacks
    • File descriptors

thread 的組成

  • thread ID
  • Set of register values
  • Stack
  • Scheduling priority and policy
  • Signal mask
  • An errno variable
  • Thread-specific data

UNIX Thread Standard

  • Defined by POSIX
  • known as pthreads or POSIX threads
  • 寫程式時必須 #include<pthread.h>
  • 用 gcc 編譯支援 thread 的 C/C++ 程式時,必須加上 -pthread or -lpthread 參數

Linux thread implement

  • Linux 上的 thread 是透過 clone system call 實作的
  • Linux 上的 thread 有兩大類
    • LinuxThreads:
      • The original thread implementation on Linux
    • NPTL(Native POSIX Thread Library):
      • 和 POSIX.1 相容性較高
      • 效能較好
      • 需要 C library 和 kernel support
  • 都是 1:1 mode
    • process 中的每個 thread 在 kernel 中都可對應一個 entity,在 CPU schedule 時會分配到個別的資源

0x03 Thread identification

  • 每個 thread 都有 thread ID,然而和 pid 不同,thread ID 並不唯一,且只在他所屬的 process 中有意義
  • thread ID 可用 pthread_t 這個資料型態表示
  • 比較 thread 是否相同
#include <pthread.h>
 
int pthread_equal(pthread_t tid1, pthread_t tid2);
/* Returns: nonzero if equal, 0 otherwise */
  • 取得 thread id
#include <pthread.h>
 
pthread_t pthread_self(void);
/*Returns: the thread ID of the calling thread*/
  • master thread 可以依照 ID 指派 job 給 worker thread
  • worker 可以使用 pthread_equal 和 pthread_self 找出自己負責的 job


0x04 Thread Creation

  • 對 pthreads 而言,當程式執行後,相當於跑了一個包含單一 thread 的 process,可以透過以下函式新增 thread
#include <pthread.h>
 
int pthread_create(pthread_t *restrict tidp,
                   const pthread_attr_t *restrict attr,
                   void *(*start_routine)(void *), 
                   void *restrict arg);
/* Returns: 0 if OK, error number on failure */
  • pthread_create 成功的話 tidp 指向 thread id 的 memory 位置,這個可以傳入先前宣告的 pthread_t variable
  • attr 可以客製一些 thread 屬性,NULL 為預設
  • 新的 thread 建立成功後會執行 start_routine function
  • arg 為給 start_routine 的參數
  • thread 被建立後,不保證哪一個會先執行
  • 新的 thread 可存取原來 process 的 address space,並繼承 calling process 的 floating-point environment(fenv.h) 和 signal mask
#include "apue.h"
#include <pthread.h>
 
pthread_t ntid;
 
void printids(const char *s)
{
    pid_t       pid;
    pthread_t   tid;
 
    pid = getpid();
    tid = pthread_self();
    printf("%s pid %lu tid %lu (0x%lx)\n", s, (unsigned long)pid, (unsigned long)tid, (unsigned long)tid);
}
 
void *thr_fn(void *arg)
{
    printids("new thread: ");
    return((void *)0);
}
 
int main(void)
{
    int err;
 
    err = pthread_create(&ntid, NULL, thr_fn, NULL);
    if (err != 0)
    {
        err_exit(err, "can't create thread");
    }
 
    printids("main thread:");
    sleep(1);
    exit(0);
}
$ ./fig11.2-threadid
new thread: pid 3207 tid 3084950416 (0xb7e09b90)
main thread: pid 3207 tid 3084953264 (0xb7e0a6b0)
  • 在不同平台上結果可能不同
  • pthread_t 不一定是 integer
  • 兩個 thread 呼叫 getpid() 時可能得到不同結果(如 LinuxThreads)

0x05 Thread termination

thread termination

  • 終止整個 process
    • process 中有任意 thread 呼叫了 exit, _Exit, 或 _exit
    • 收到某個 signal 的預設處理方式是 terminating the process
  • 終止單一 thread
    • 從該 thread 的 start routine function return,return value 即是 thread exit code
    • 被同一個 process 中的其他 thread cancel
    • thread 呼叫 pthread_exit

thread termination status

#include <pthread.h>
void pthread_exit(void *rval_ptr);
 
int pthread_join(pthread_t thread, void **rval_ptr);
/* Returns: 0 if OK, error number on failure */
  • process 的 exit status 可以透過 wait, waitpid function 取得,thread 的 exit status 我們可以透過 pthread_join 取得
  • 假設我們有 A B 兩個執行緒,B 執行緒在做某些事情前需要先等 A 執行緒把事情做完,這時即可使用 pthread_join
  • 在 B 執行緒中呼叫 pthread_join function,第一個參數給 A 的 thread id,第二個參數傳入一個 pointer,此時 B 執行緒會在這裡暫停
  • 當 A 執行緒呼叫 pthread_exit(void *rval_ptr) 結束時,B 執行緒 pthread_join 中參數 rval_ptr就會指向 exit 參數的 rval_ptr
  • 這樣使用 pthread_join 得執行緒就能拿到 exit status 繼續執行了
#include <pthread.h>
 
int pthread_detach(pthread_t thread);
/* Returns: 0 if OK, error number on failure */
  • pthread 有 joinable 和 unjoinable 兩種狀態
  • 一般預設 pthread create 時 thread 是 joinable 的
  • 若 pthread 是 joinable 時,若沒有其他 thread 對他調用 pthread_join,則該 thread 執行 pthread_exit 後,thread id 和 exit status 會保留在記憶體中直到對該執行緒的 pthread_join 被調用,造成記憶體消耗
  • 透過 pthread_detach 可將執行緒設為 unjoinable 狀態,此時執行緒在執行 pthread_exit 後,記憶體資源會直接被釋放
  • 對 detach thread 調用 pthread_join 會 return EINVL

cancelling a thread

#include <pthread.h>
 
int pthread_cancel(pthread_t thread);
/* Returns: 0 if OK, error number on failure */
  • pthread_cancel 可以向目標執行緒發送 cancel 請求,然而要怎麼處理這個請求是執行緒自己決定的,pthread_cancel 亦不會等待執行緒回應

cleanup function

#include <pthread.h>
 
void pthread_cleanup_push(void (*routine)(void *), void *arg);
void pthread_cleanup_pop(int execute);
  • 在 thread 結束時我們可以呼叫一些函式做處理,基本上這些 recall function 被存放在 stack 架構中
  • 在 thread function 中我們可以用 pthread_cleanup_push function 將之後要調用的函式依序推入堆疊
  • 推入 stack 的函式會在幾個時機點被調用
    • thread 呼叫 pthread_exit 時
    • thread 回應 cancellation request 時
    • thread 呼叫 pthread_cleanup_pop 時
      • push 和 pop 要兩兩對應,否則會編譯不過
      • pop 的 參數 execute 要非零才回執行,否則僅會移除先前推入的 function

comparison process and thread function

Process Thread Description
fork pthread_create create a new flow of control
exit pthread_exit exit from an existing flow of control
waitpid pthread_join get exit status from flow of control
atexit pthread_cleanup_push register function to be called at exit from flow of control
getpid pthread_self get ID for flow of control
abort pthread_cancel request abnormal termination of flow of control

0x06 Thread synchronization

  • 同個 process 的 thread 共享同樣的記憶體空間,對於所有 thread 我們必須使他們取得的資料一致

  • 上圖若寫入需要花兩個 cycle 而讀取只需一個 cycle,且讀取正好發生在寫入間則產生不一致

  • 上圖若兩個執行緒都要增加一個變數,其中涉及三個動作,此也可能產生不一致
    • Read the memory location into a register
    • Increment the value in the register
    • Write the new value back to the memory location
  • 為了解決這個問題,我們使用 lock,一次只允許一個執行緒存取變數

Mutexes

  • Mutual exclusives 互斥鎖
  • 是一個基本的 lock
  • 當 thread 存取共享資源前設置一個 lock
  • 當存取結束後釋放 (unlock)
  • 當 mutex 被設置時,其他嘗試要存取的 thread 會被 block 直到鎖被釋放
  • 當有多個執行緒都在等待,鎖被釋放時,最快執行的 thread 取得鎖,其他執行緒則繼續等待

pthread mutexes

  • Data type: pthread_mutex_t

initialize and destroy

#include <pthread.h>
 
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
/* Returns: 0 if OK, error number on failure */
 
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 

lock and unlock

#include <pthread.h>
 
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
/* Returns: 0 if OK, error number on failure */
  • 以下範例我們在增加、減少、確認變數 f_count 前都先執行 lock 將他上鎖
#include <stdlib.h>
#include <pthread.h>
 
struct foo
{
    int             f_count;
    pthread_mutex_t f_lock;
    int             f_id;
    /* ... more stuff here ... */
};
 
struct foo *foo_alloc(int id) /* allocate the object */
{
    struct foo *fp;
 
    if ((fp = malloc(sizeof(struct foo))) != NULL)
    {
        fp->f_count = 1;
        fp->f_id = id;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0)
        {
            free(fp);
            return(NULL);
        }
        /* ... continue initialization ... */
    }
    return(fp);
}
 
void foo_hold(struct foo *fp) /* add a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}
 
void foo_rele(struct foo *fp) /* release a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
 
    /* last reference */
    if (--fp->f_count == 0)
    {
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    }
    else
    {
        pthread_mutex_unlock(&fp->f_lock);
    }
}

deadlock avoidance

  • 會形成 deadlock 的情況
    • 重複對同個 mutex 上鎖
    • 兩個 thread(T1/T2),兩個 metux(Ma/Mb),T1 依序上鎖 Ma,Mb,T2 依次上鎖 Mb,Ma,則形成 T1,T2 互等死結
  • 解決方式
    • 第一種容易避免,或使用 PTHREAD_MUTEX_RECURSIVE 即可
    • 第二種可以對 metux 要求上鎖順序
    • 或是可以使用 pthread_mutex_trylock function

Reader-Writer lock

  • mutex 只有 lock/unlock 兩種操作
  • rwlock 可以
    • Locked in read mode
    • Locked in write mode
    • Unlocked
  • rwlock 在同時間允許多個 read lock
  • rwlock 一次只允許一個 write lock
  • read lock 和 write lock 彼此互斥,若對方已經先 lock,則後到者必須等待

pthread Reader-Writer Lock

  • Data type: pthread_rwlock_t

initialize and destroy

#include <pthread.h>
 
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
/* Returns: 0 if OK, error number on failure */

lock and unlock

#include <pthread.h>
 
int pthread_rwlock_rdlock(pthread_rwlock_t*rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t*rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t*rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t*rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t*rwlock);
/* Returns: 0 if OK, error number on failure */

Condition Variable

  • pthread_join 解決了多個執行緒在等待一個執行緒結束的通知
  • 而若我們要的不是執行緒的結束,而是完成某項工作後的通知,我們可以用 condition variable 處理
  • condition variable 必須要和 mutex 合併使用

pthread condition variable

  • Data type: pthread_cond_t

initialize and destroy

#include <pthread.h>
 
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
/* Returns: 0 if OK, error number on failure */
 
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

wait for the condition

#include <pthread.h>
 
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); 
  • 在執行緒呼叫 pthread_cond_wait 前要先申請 mutex,當 thread 通過 pthread_cond_wait 進入 waiting 狀態時會先釋放 mutex
  • 而當條件滿足而離開 pthread_cond_wait 前,會再對 metux 上鎖一次,所以呼叫 pthread_cond_wait 前後的 lock/unlock 會互相對應
#include <pthread.h>  
#include <unistd.h>  
 
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;  
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;  
 
struct node
{  
    int n_number;  
    struct node *n_next;  
} *head = NULL;  
 
/* thread_func */  
static void cleanup_handler(void *arg)  
{  
    printf("Cleanup handler of second thread./n");  
    free(arg);  
    (void)pthread_mutex_unlock(&mtx);  
}
 
static void *thread_func(void *arg)  
{  
    struct node *p = NULL;  
 
    pthread_cleanup_push(cleanup_handler, p);  
    while (1)
    {  
        pthread_mutex_lock(&mtx);
 
        // pthread_cond_wait里的线程可能会被意外唤醒,如果这个时候head != NULL,则不是我们想要的情况。这个时候,应该让线程继续进入pthread_cond_wait  
        while (head == NULL)
        {
            pthread_cond_wait(&cond, &mtx); 
            /* pthread_cond_wait 會先解除之前的 pthread_mutex_lock 鎖定的 mtx,然後 block 在 queue 休眠,直到再次被喚醒
              多數情况下是等待的條件成立而被唤醒,唤醒後,該執行緒會先執行 pthread_mutex_lock(&mtx); 再讀取资源 */
        }
 
        p = head;  
        head = head->n_next;  
        printf("Got %d from front of queue/n", p->n_number);  
        free(p);  
        pthread_mutex_unlock(&mtx);             // 釋放互斥鎖  
    }  
 
    pthread_cleanup_pop(0);  
    return 0;  
}  
 
int main(void)  
{  
    pthread_t tid;  
    int i;  
    struct node *p;  
    pthread_create(&tid, NULL, thread_func, NULL);  
 
    /*[tx6-main]*/  
    for (i = 0; i < 10; i++)
    {  
        p = malloc(sizeof(struct node));  
        p->n_number = i;  
        pthread_mutex_lock(&mtx);             // 需要操作head这个臨界資源,先上鎖,  
        p->n_next = head;  
        head = p;  
        pthread_cond_signal(&cond);  
        pthread_mutex_unlock(&mtx);           // 解鎖
        sleep(1);  
    }  
 
    printf("thread 1 wanna end the line.So cancel thread 2./n");  
    pthread_cancel(tid);             // pthread_cancel 是從外部终止子執行緒,子執行緒會在最近的取消點退出執行緒,而在我们的程式中,最近的取消點為 pthread_cond_wait()
    pthread_join(tid, NULL);  
    printf("All done -- exiting/n");  
    return 0;  
}  

timed wait

#include <pthread.h>
 
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
 
struct timespec
{
    __time_t tv_sec;    /* Seconds. */
    long int tv_nsec;   /* Nanoseconds. */
};
  • 等待至特定時間若條件不滿足則放棄等待,此時 pthread_cond_timedwait 會回傳 ETIMEDOUT
void maketimeout(struct timespec *tsp, long minutes)
{
    struct timeval now;                   /* get the current time */
    gettimeofday(&now);
    tsp->tv_sec = now.tv_sec;
    tsp->tv_nsec = now.tv_usec * 1000;    /* usec to nsec */
    tsp->tv_sec += minutes * 60;          /* add the offset to get timeout value */
}

notifications

#include <pthread.h>
 
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
/* Returns: 0 if OK, error number on failure */
  • broadcast 會喚醒所有在等待的執行緒
  • signal 會喚醒等待中的一個執行緒

Barriers

  • 可以理解成一個 mile stone,當一個執行緒率先跑到 mile stone 的時候就先等待,其他執行緒都到位之后,再從等待狀態喚醒,繼續做後面的事情
  • 當 pthread_barrier_wait 再成功喚醒後會回傳 PTHREAD_BARRIER_SERIAL_THREAD 給隨機一個 thread,其他則回傳 0,錯誤則回傳 error number
#include <pthread.h>
 
int pthread_barrier_destroy(pthread_barrier_t *barrier);
int pthread_barrier_init(pthread_barrier_t *restrict barrier, const pthread_barrierattr_t *restrict attr, unsigned count);
int pthread_barrier_wait(pthread_barrier_t *barrier);

0x07 參考資料

course/nctu-高等unix程式設計/chapter11.txt · 上一次變更: 127.0.0.1