[并发并行]_[线程模型]_[Pthread线程使用模型之一管道Pipeline]

场景

1.经常在Windows,MacOSX 开发C多线程程序的时候,经常需要和线程打交道,如果开发人员的数量不多时,同时掌握Win32和pthread线程
并不是容易的事情,而且使用Win32线程并不能写出跨平台的实现. 所以在成本的制约下选用pthread作为跨平台线程库的首选. 有足够人力的公司可以再封装一层对Win32和本地pthread的调用. 比如 chrome.

2.线程在做高可用,高性能的程序时必不可少,比如Socket,并发任务,顺序任务,文件下载等需要充分利用cpu资源节省执行时间的编码上起着至关重要的作用. 没有多线程的话,再处理后台任务,多任务上会更麻烦一些,当然可以使用协程,但那个并不灵活和常见.

3.学习一门线程模型,最让人关心的就是到底在什么场景下需要用到线程,这种POSIX 线程模型还能怎么用?

介绍

1.POSIX 线程,通常称作pthreads,是独立于语言的执行模型,也是一个并行执行模型. 它允许程序在重叠的时间控制不同的工作流. 每一个工作流被称作线程,同时创建和控制这些流程的工作是通过POSIX Threads API来实现的. POSIX Threads 的API定义是通过标准的
POSIX.1c,Threads extensions (IEEE Std 1003.1c-1995).

2.API的实现在所有的Unix-like系统里都有,比如 FreeBSD,NetBSD,OpenBSD,Linux,Mac OS X and Solaris,对于Windows系统,在 SFU/SUA 子系统里提供了一组POSIX APIs的实现,并且有第3方库 pthread-w32,它是基于存在的Windows API 实现的.

说明

1.《Programming with Posix Threads》 这本书介绍了3种模型(Pipeline,Work Crew,Client/Server). 我们有必要分析下,虽然现实场景比这些更复杂,但是他们组成了基础,掌握了更多的基础才能组成复杂的房屋设计,可以说是一种设计模式.

2.新学习这个pipeline模型,还没在实际项目中使用,所以没有很好的例子,暂时用官方例子说明,当然这个例子实在过于简单. 但是简单也正好记住它的原理.

管道(Pipeline)

说明

每个线程重复执行在序列集合里的相同的操作,并传递每个线程的输出到另一个线程执行下一个步骤,可以称作为流水线.

特点

  1. 每个线程就像一个生产特定部件的机器,它只生产某个部件,不做其他事情.
  2. 这个线程一直在做这个事情,不会停止,有材料时会一直生产完,生产一个部件就会传递到下一个线程.
  3. 它属于一个流水线中一部分,每个步骤都是有顺的,适合处理有序的数据流.

例子:
一个图片文件,线程A 负责读取这个图片到数组,线程B 负责搜索图片数组里的某些特定数据,线程C 负责搜集线程B搜索到的流式数据到报表里.
或者 线程A,B,C 在一个有序序列里分别对数据进行处理.

注: 看到这里有人会问,没必要3个线程啊,用1个线程处理也行阿. 我们想想线程的作用,节省时间,充分利用cpu资源实现并发.假如数据分析总共需要9分钟时间,有10个这样的任务,单线程就需要90 分钟. 如果是3个线程,每个线程处理一部分3分钟的数据. 总共需要
3*10+2 = 32 分钟. 性价比很高吧.

代码例子
pipe.c :
1.每个在管道里的线程对它的输入值+1,并传递到下一个线程. 主程序从stdin读入一系列命令行. 1个命令行或者是一个数字,这个数字注入到管道的开始. 或者字符 “=.” 它会让程序从管道末尾读入下一个结果并把它输出到stdout. 主函数创建管道,并且循环从stdin里读入行数据. 如果行数据是一个单一 “=” 字符. 它会从管道拉取一个值并且打印他们,否则它会转换行为一个整数值

/* * The main program to "drive" the pipeline... */
int main (int argc,char *argv[])
{
    pipe_t my_pipe;
    long value,result;
    int status;
    char line[128];

    pipe_create (&my_pipe,10);
    printf ("Enter integer values,or \"=\" for next result\n");

    while (1) {
        printf ("Data> ");
        if (fgets (line,sizeof (line),stdin) == NULL) exit (0);
        if (strlen (line) <= 1) continue;
        if (strlen (line) <= 2 && line[0] == '=') {
            if (pipe_result (&my_pipe,&result))
                printf ("Result is %ld\n",result);
            else
                printf ("Pipe is empty\n");
        } else {
            if (sscanf (line,"%ld",&value) < 1)
                fprintf (stderr,"Enter an integer value\n");
            else
                pipe_start (&my_pipe,value);
        }
    }
}

2.管道里的每个stage使用一个stage_t类型变量表示. stage_t包含一个mutex来同步访问这个stage. avail条件变量用来发送给stage表明数据已经准备好可以处理了,同时每个阶段拥有一个条件变量ready来表明它准备接收新的数据了. data 成员变量是前一个stage传递来的,thread是操作这个stage的线程,next 是指向下一个stage的指针.

3.pipe_t 结构体描述了一个管道. 它提供指针指向管道阶段的第一和最后一个stage. 第一个stage,head代表了管道里的第一个线程. 最后一个stage,tail是一个特别的stage_t,它没有线程– 因为它只是用来存储管道的最后数据.

/* * Internal structure describing a "stage" in the * pipeline. One for each thread,plus a "result * stage" where the final thread can stash the value. */
typedef struct stage_tag {
    pthread_mutex_t     mutex;          /* Protect data */
    pthread_cond_t      avail;          /* Data available */
    pthread_cond_t      ready;          /* Ready for data */
    int                 data_ready;     /* Data present */
    long                data;           /* Data to process */
    pthread_t           thread;         /* Thread for stage */
    struct stage_tag    *next;          /* Next stage */
} stage_t;

/* * External structure representing the entire * pipeline. */
typedef struct pipe_tag {
    pthread_mutex_t     mutex;          /* Mutex to protect pipe */
    stage_t             *head;          /* First stage */
    stage_t             *tail;          /* Final stage */
    int                 stages;         /* Number of stages */
    int                 active;         /* Active data elements */
} pipe_t;

4.pipe_send,这个函数用来开始管道传输,并且也是被每个stage调用来传递data到下一个stage里. 它首先开始等待指定的stage的ready 条件变量直到它能接收新的数据(之前的数据没处理完). 存储新的数据值,同时告诉stage数据已经准备好.

/*
 * Internal function to send a "message" to the
 * specified pipe stage. Threads use this to pass
 * along the modified data item.
 */
int pipe_send (stage_t *stage,long data)
{
    int status;

    status = pthread_mutex_lock (&stage->mutex);
    if (status != 0)
        return status;
    /*
     * If there's data in the pipe stage,wait for it
     * to be consumed.
     */
    while (stage->data_ready) {
        status = pthread_cond_wait (&stage->ready,&stage->mutex);
        if (status != 0) {
            pthread_mutex_unlock (&stage->mutex);
            return status;
        }
    }

    /*
     * Send the new data
     */
    stage->data = data;
    stage->data_ready = 1;
    status = pthread_cond_signal (&stage->avail);
    if (status != 0) {
        pthread_mutex_unlock (&stage->mutex);
        return status;
    }
    status = pthread_mutex_unlock (&stage->mutex);
    return status;
}

5.pipe_stage是管道里每个stage的开始函数,这个函数的参数就是指向stage_t结构体的指针. 这个线程永远循环执行处理数据. 因为 mutex在循环外被locked,线程看起来好像一直锁定stage的 mutex对象,然而,它是花费大多数时间来等待新的数据,通过avail条件变量. 注意这个线程自动解锁关联到条件变量的mutex. 当data获取到数据,线程自增数据值,同时传递结果到下一个stage. 接着线程清除data_ready变量值来表明不再有数据,同时发信号给ready条件变量来唤醒那些正在等待这个管道stage就绪的线程.

注意: 这个stage在处理数据时并没有解锁stage的mutex,也就是如果在处理耗时的操作时一直是锁住的. 只要data_ready不设置为0之前,可以在处理耗工作前解锁,处理完耗时的工作之后再加锁设置 data_ready = 0.

/*
 * The thread start routine for pipe stage threads.
 * Each will wait for a data item passed from the
 * caller or the prevIoUs stage,modify the data
 * and pass it along to the next (or final) stage.
 */
void *pipe_stage (void *arg)
{
    stage_t *stage = (stage_t*)arg;
    stage_t *next_stage = stage->next;
    int status;

    status = pthread_mutex_lock (&stage->mutex);
    if (status != 0)
        err_abort (status,"Lock pipe stage");
    while (1) {
        while (stage->data_ready != 1) {
            status = pthread_cond_wait (&stage->avail,&stage->mutex);
            if (status != 0)
                err_abort (status,"Wait for prevIoUs stage");
        }
        pipe_send (next_stage,stage->data + 1);
        stage->data_ready = 0;
        status = pthread_cond_signal (&stage->ready);
        if (status != 0)
            err_abort (status,"Wake next stage");
    }
    /*
     * Notice that the routine never unlocks the stage->mutex.
     * The call to pthread_cond_wait implicitly unlocks the
     * mutex while the thread is waiting,allowing other threads
     * to make progress. Because the loop never terminates,this
     * function has no need to unlock the mutex explicitly.
     */
}

6.pipe_create函数创建一个管道,它能创建任意个stage,并连接他们成为一个链表. 对于每个stage,它分配内存给stage_t 结构体并且初始化成员. 注意最后一个stage是被分配和初始化来保存管道最后的结果. 注意最后一个stage并不创建线程.

注意: 这里的链表操作方法,不需要额外的判断.

/*
 * External interface to create a pipeline. All the
 * data is initialized and the threads created. They'll
 * wait for data.
 */
int pipe_create (pipe_t *pipe,int stages)
{
    int pipe_index;
    stage_t **link = &pipe->head,*new_stage,*stage;
    int status;

    status = pthread_mutex_init (&pipe->mutex,NULL);
    if (status != 0)
        err_abort (status,"Init pipe mutex");
    pipe->stages = stages;
    pipe->active = 0;

    for (pipe_index = 0; pipe_index <= stages; pipe_index++) {
        new_stage = (stage_t*)malloc (sizeof (stage_t));
        if (new_stage == NULL)
            errno_abort ("Allocate stage");
        status = pthread_mutex_init (&new_stage->mutex,NULL);
        if (status != 0)
            err_abort (status,"Init stage mutex");
        status = pthread_cond_init (&new_stage->avail,"Init avail condition");
        status = pthread_cond_init (&new_stage->ready,"Init ready condition");
        new_stage->data_ready = 0;
        *link = new_stage;
        link = &new_stage->next;
    }

    *link = (stage_t*)NULL;     /* Terminate list */
    pipe->tail = new_stage;     /* Record the tail */

    /*
     * Create the threads for the pipe stages only after all
     * the data is initialized (including all links). Note
     * that the last stage doesn't get a thread,it's just
     * a receptacle for the final pipeline value.
     *
     * At this point,proper cleanup on an error would take up
     * more space than worthwhile in a "simple example",so
     * instead of cancelling and detaching all the threads
     * already created,plus the synchronization object and
     * memory cleanup done for earlier errors,it will simply
     * abort.
     */
    for (   stage = pipe->head;
            stage->next != NULL;
            stage = stage->next) {
        status = pthread_create (
            &stage->thread,NULL,pipe_stage,(void*)stage);
        if (status != 0)
            err_abort (status,"Create pipe stage");
    }
 return 0;
}

7.pipe_start和pipe_result函数. pipe_start函数入栈一个data数据到管道的开始stage并且迅速返回,不等待结果. pipe_result函数允许请求者等待最终的结果. pipe_start 函数递增pipeline里的active计数,这个active允许pipe_result探测没有更多的active项需要收集,同时迅速返回不阻塞. pipe_result 首先检查是否有一个active项在管道里. 如果没有,它返回status为0,之后解锁pipeline的mutex. 如果有另一个item在管道里,pipe_result锁定tail stage,并且等待它接收数据. 它复制数据并且重置tail stage以便能接收下一个数据项.

/*
 * External interface to start a pipeline by passing
 * data to the first stage. The routine returns while
 * the pipeline processes in parallel. Call the
 * pipe_result return to collect the final stage values
 * (note that the pipe will stall when each stage fills,* until the result is collected).
 */
int pipe_start (pipe_t *pipe,long value)
{
    int status;

    status = pthread_mutex_lock (&pipe->mutex);
    if (status != 0)
        err_abort (status,"Lock pipe mutex");
    pipe->active++;
    status = pthread_mutex_unlock (&pipe->mutex);
    if (status != 0)
        err_abort (status,"Unlock pipe mutex");
    pipe_send (pipe->head,value);
 return 0;
}

/*
 * Collect the result of the pipeline. Wait for a
 * result if the pipeline hasn't produced one.
 */
int pipe_result (pipe_t *pipe,long *result)
{
    stage_t *tail = pipe->tail;
    long value;
    int empty = 0;
    int status;

    status = pthread_mutex_lock (&pipe->mutex);
    if (status != 0)
        err_abort (status,"Lock pipe mutex");
    if (pipe->active <= 0)
        empty = 1;
    else
        pipe->active--;

    status = pthread_mutex_unlock (&pipe->mutex);
    if (status != 0)
        err_abort (status,"Unlock pipe mutex");
    if (empty)
 return 0;

    pthread_mutex_lock (&tail->mutex);
    while (!tail->data_ready)
        pthread_cond_wait (&tail->avail,&tail->mutex);
    *result = tail->data;
    tail->data_ready = 0;
    pthread_cond_signal (&tail->ready);
    pthread_mutex_unlock (&tail->mutex);    
 return 1;
}

术语

SFU (Windows Services for UNIX)
SUA (Interix subsystem component Subsystem for UNIX-based Applications)

参考

https://en.wikipedia.org/wiki/POSIX_Threads
https://en.wikipedia.org/wiki/Windows_Services_for_UNIX
Programming with POSIX Threads

相关文章

迭代器模式(Iterator)迭代器模式(Iterator)[Cursor]意图...
高性能IO模型浅析服务器端编程经常需要构造高性能的IO模型,...
策略模式(Strategy)策略模式(Strategy)[Policy]意图:定...
访问者模式(Visitor)访问者模式(Visitor)意图:表示一个...
命令模式(Command)命令模式(Command)[Action/Transactio...
生成器模式(Builder)生成器模式(Builder)意图:将一个对...