hjman 发表于 2015-3-12 11:54:35

linux 多线程处理

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>
#include <time.h>
#include <unistd.h>
#include <curses.h>

/* Value cal_counts is (re)armed with; also the repeat count of the
 * random-parameter loops in pool_init() and calprocess(). */
#define MAX_SIZE    10
/* Number of work items main() allocates ids for and enqueues. */
#define WORK_NUMBER 100
/* Number of worker threads main() asks pool_init() to start. */
#define MAX_THREADS 5

/* Next id handed out to a work item by pool_add_worker(). */
int id_num = 0;
/* Running counter printed by calprocess() for each subtract step. */
int id = 1;

/* Set to 1 by thread_moniter() and polled by main() to end the program.
 * NOTE(review): written and read from different threads without any
 * synchronisation visible in this file. */
int exitthread = 0;

/*
 * One work item in the pool's doubly linked queue.
 */
typedef struct worker {

    /* Callback run by thread_routine() for this item, called with `arg`. */
    void *(*process)(void *arg);
    /* Opaque argument handed to `process`. */
    void *arg;
    /* Identifier assigned from id_num in pool_add_worker(). */
    int id;
    /* Previous item in the queue, or NULL for the head. */
    struct worker *prev;
    /* Next item in the queue, or NULL for the tail. */
    struct worker *next;
    /* Value operated on by calprocess(); set to 1000 by pool_add_worker(). */
    int data;
    /* Pending operation: thread_routine() stores 1 (subtract) or 2 (add),
     * calprocess() resets it to 0 when done. */
    int flag ;      

} CalThread_worker;

/*
 * Shared state of the thread pool; a single instance is reached through the
 * file-scope `pool` pointer.
 */
typedef struct {
    /* Mutex taken by pool_add_worker() and thread_routine(). */
    pthread_mutex_t queue_lock;
    /* Condition variable the worker threads wait on. */
    pthread_cond_t queue_ready;

    /* Head of the linked list of work items. */
    CalThread_worker *queue_head;

    /* Non-zero once pool_destroy() has asked the worker threads to exit. */
    int shutdown;
    /* Array of worker thread IDs allocated by pool_init(). */
    pthread_t *threadid;

    /* Number of worker threads requested from pool_init(). */
    int max_thread_num;

    /* Incremented by pool_add_worker() for every queued item. */
    int cur_queue_size;

    /* Worker threads sleep while this is 0; calprocess() decrements it after
     * each add step and re-arms it to MAX_SIZE when it reaches 0. */
    int cal_counts;

    /* Random amount calprocess() subtracts from / adds to a work item. */
    int r;

    /* Id of the work item selected for the add step. */
    int addid;

    /* Id of the work item selected for the subtract step. */
    int subid;

    /* Non-zero when the next step thread_routine() schedules is an add. */
    int addflag;

    /* Non-zero when the next step thread_routine() schedules is a subtract. */
    int subflag;

    /* thread_routine() only selects work items while this is 1. */
    int start;

    /* Result of the last subtract step plus the following add step, as
     * printed by calprocess(). */
    int sumdata;

} CalThread_pool;

/* Append a work item that will call process(arg) to the pool's queue. */
int pool_add_worker(void *(*process)(void *arg), void *arg);
/* Entry point of every worker thread started by pool_init(). */
void *thread_routine(void *arg);
/* Entry point of the thread that watches stdin for the exit key. */
void *thread_moniter(void *arg);

/* Shared resource: the one pool instance used by every function in this
 * file. NULL until pool_init() runs and again after pool_destroy(). */
static CalThread_pool *pool = NULL;
/*
 * Create the global thread pool and start its threads.
 *
 * Allocates and initialises the shared `pool` object, draws the first random
 * operation parameters (addid, subid, r), then starts `max_thread_num` worker
 * threads running thread_routine() plus one detached monitor thread running
 * thread_moniter().
 *
 * max_thread_num: number of worker threads to start (negative is treated
 *                 as 0).
 *
 * The process is terminated if memory cannot be allocated, because the other
 * functions in this file dereference `pool` unconditionally.  If a worker
 * thread cannot be created, pool->max_thread_num is lowered to the number of
 * threads that actually started so that only valid thread IDs are joined
 * later.
 */
void pool_init(int max_thread_num)
{
    int i = 0;
    int rc;
    pthread_t moniter_tid;

    if (max_thread_num < 0)
        max_thread_num = 0;

    /* clock() is close to zero this early in the process, which made every
     * run use practically the same seed; seed from the wall clock instead. */
    srand((unsigned int) time(NULL));

    pool = malloc(sizeof *pool);
    if (pool == NULL) {
        fprintf(stderr, "pool_init: out of memory\n");
        exit(EXIT_FAILURE);
    }

    pthread_mutex_init(&(pool->queue_lock), NULL);
    pthread_cond_init(&(pool->queue_ready), NULL);
    pool->queue_head = NULL;
    pool->max_thread_num = max_thread_num;
    pool->cur_queue_size = 0;
    pool->shutdown = 0;
    pool->start = 0;
    pool->cal_counts = 0;
    pool->sumdata = 0;

    /* Draw the first random operation parameters.  The ids are in 0..99 and
     * the amount in 0..999 ("rand() & 1000" was a typo for "% 1000"). */
    pool->addid = rand() % 100;
    pool->subid = rand() % 100;
    pool->r = rand() % 1000;

    /* The first scheduled step is always a subtract. */
    pool->subflag = 1;
    pool->addflag = 0;
    printf("%d,%d\n", pool->addid, pool->subid);

    pool->threadid = malloc((size_t) max_thread_num * sizeof *pool->threadid);
    if (pool->threadid == NULL && max_thread_num > 0) {
        fprintf(stderr, "pool_init: out of memory\n");
        exit(EXIT_FAILURE);
    }

    for (i = 0; i < max_thread_num; i++) {
        /* Each thread gets its own slot; passing &pool->threadid made every
         * pthread_create() overwrite the array pointer itself. */
        rc = pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);
        if (rc != 0) {
            fprintf(stderr, "pool_init: pthread_create failed (error %d)\n", rc);
            break;
        }
    }
    /* Remember how many workers really exist (equals the request unless a
     * pthread_create() call failed). */
    pool->max_thread_num = i;

    rc = pthread_create(&moniter_tid, NULL, thread_moniter, NULL);
    if (rc != 0) {
        fprintf(stderr, "pool_init: cannot start monitor thread (error %d)\n", rc);
    } else {
        /* Nobody joins the monitor thread, so detach it to let the system
         * reclaim its resources when it ends. */
        pthread_detach(moniter_tid);
    }
}

int pool_add_worker(void *(*process)(void *arg), void *arg)
{
    CalThread_worker *newworker = (CalThread_worker *) malloc(
                                    sizeof(CalThread_worker));
    newworker->process = process;
    newworker->arg = arg;
    newworker->next = NULL;
    pthread_mutex_lock(&(pool->queue_lock));
    CalThread_worker *member = pool->queue_head;
    if (member != NULL) {
      while (member->next != NULL)
            member = member->next;
      member->next = newworker;
      newworker->prev = member;
    } else {
      pool->queue_head = newworker;
      newworker->prev = NULL;
    }
    newworker->id = id_num++;
    newworker->data = 1000;

    //assert (pool->queue_head != NULL);
    pool->cur_queue_size++;
    pthread_mutex_unlock(&(pool->queue_lock));

    return 0;
}

/*
 * Stop all worker threads and release everything owned by the pool.
 *
 * Sets the shutdown flag, wakes every worker, joins them, frees the queued
 * work items and finally the pool itself; `pool` is NULL afterwards.
 *
 * Returns 0 on success, -1 if there is no pool or it is already shutting
 * down.
 */
int pool_destroy(void)
{
    int i;
    CalThread_worker *head = NULL;

    if (pool == NULL)
        return -1;

    /* Change the predicate and broadcast while holding the mutex.  Without
     * the lock a worker could test `shutdown`, then miss the broadcast before
     * it reaches pthread_cond_wait() and sleep forever, hanging the join. */
    pthread_mutex_lock(&(pool->queue_lock));
    if (pool->shutdown) {
        pthread_mutex_unlock(&(pool->queue_lock));
        return -1;
    }
    pool->start = 0;
    pool->cal_counts = 0;
    pool->shutdown = 1;
    pthread_cond_broadcast(&(pool->queue_ready));
    pthread_mutex_unlock(&(pool->queue_lock));

    /* Join each worker by its own ID (the array pointer itself was passed
     * before, which is not a thread ID at all). */
    for (i = 0; i < pool->max_thread_num; i++)
        pthread_join(pool->threadid[i], NULL);
    free(pool->threadid);

    while (pool->queue_head != NULL) {
        head = pool->queue_head;
        pool->queue_head = pool->queue_head->next;
        free(head);
    }

    pthread_mutex_destroy(&(pool->queue_lock));
    pthread_cond_destroy(&(pool->queue_ready));
    free(pool);
    pool = NULL;
    printf("pool_destroy\n");
    return 0;
}

void * thread_routine(void *arg)
{
    //printf ("starting thread 0x%x\n", pthread_self ());
    while (1) {
      pthread_mutex_lock(&(pool->queue_lock));
      while (pool->cal_counts == 0 && !pool->shutdown) {
            printf("thread 0x%x is waiting\n", pthread_self());
            pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
      }
      if (pool->shutdown) {
            pthread_mutex_unlock(&(pool->queue_lock));
            printf("thread 0x%x will exit\n", pthread_self());
            pthread_exit(NULL);
      }
      printf("thread 0x%x is starting to work\n", pthread_self());
      //assert (pool->queue_head != NULL);
      //pool->cur_queue_size--;
      usleep(1000*500);
      CalThread_worker *worker = NULL;
      if (pool->start == 1) {
            worker = pool->queue_head;
            if (pool->subflag) {
                do {
                  if (worker->id == pool->subid)
                        break;
                  worker = worker->next;
                } while (worker != NULL);               
                worker->flag= 1;
                pool->addflag = 1;
                pool->subflag = 0;
            } else if (pool->addflag) {
                do {
                  if (worker->id == pool->addid)
                        break;
                  worker = worker->next;
                } while (worker != NULL);
                if (worker != NULL) {
                  pool->addflag = 0;
                  pool->subflag = 1;
                  worker->flag= 2;
                }
            }
      }      
      pthread_mutex_unlock(&(pool->queue_lock));
      if (worker != NULL) {
            (*(worker->process))(worker->arg);
      }
    }
    pthread_exit (NULL);
    return 0;
}

/*
 * Monitor thread: reads stdin until the user types 'e' or 'E', then sets
 * exitthread so that main() leaves its wait loop.
 *
 * End-of-file or a read error on stdin is treated like the exit key, since
 * no key can ever arrive after that and the old loop would poll forever.
 *
 * arg: unused.
 * Returns NULL.
 */
void * thread_moniter(void *arg){

    (void) arg;

    while (1) {
        /* getchar() returns int so EOF can be told apart from every
         * character; storing it in a char loses that distinction. */
        int ch = getchar();
        if (ch == EOF || ch == 'e' || ch == 'E')
            break;
        usleep(1000*500);
    }
    /* NOTE(review): plain store to a flag another thread polls; there is
     * no synchronisation on exitthread in this file. */
    exitthread = 1;

    return NULL;
}


void * calprocess(void *arg)
{
        int i ;
    //printf("threadid is 0x%x, working on task %d\n", pthread_self(), *(int *) arg);
    CalThread_worker *worker = NULL;
    worker = pool->queue_head;
    int orgdata = 0;
    do {
      if (worker->id == *(int *) arg)
            break;
      worker = worker->next;
    } while (worker != NULL);
    if (worker == NULL)return NULL;
    if (worker->flag == 1) {
      orgdata = worker->data ;
      worker->data -= pool->r;
      pool->sumdata = worker->data;
      printf ("the %d calcuate \n",id++);
      printf ("threadid :0x%x, sub-data:%d\n",pthread_self(),worker->data );
      worker->data = orgdata;
    }
    if (worker->flag == 2) {
      orgdata = worker->data;
      worker->data += pool->r;
                pool->cal_counts--;
      if (pool->cal_counts == 0){
                        pool->start = 0;
                        for (i = 0; i < MAX_SIZE; i++) {
                pool->addid = rand() % 100;
                pool->subid = rand() % 100;
                pool->r = rand() & 1000;
            }
                        pool->start = 1;
                        pool->cal_counts = MAX_SIZE;
      }         
      pool->sumdata += worker->data;
      printf ("threadid :0x%x, add-data:%d\n",pthread_self(),worker->data );
      printf ("threadid :0x%x, sum-data:%d\n",pthread_self(),pool->sumdata);
      worker->data = orgdata;
    }
    worker->flag = 0;
    return NULL;
}

/*
 * Program entry point: starts the pool, queues WORK_NUMBER items that each
 * carry their own id, arms the pool and then waits until the monitor thread
 * reports that the user asked to exit.
 *
 * Returns 0 on normal exit, EXIT_FAILURE if the id array cannot be allocated.
 */
int main(int argc, char **argv)
{
    int i;
    int *workingnum;

    (void) argc;
    (void) argv;

    pool_init(MAX_THREADS);

    workingnum = malloc(sizeof *workingnum * WORK_NUMBER);
    if (workingnum == NULL) {
        fprintf(stderr, "main: out of memory\n");
        pool_destroy();
        return EXIT_FAILURE;
    }

    for (i = 0; i < WORK_NUMBER; i++) {
        /* Store the id in its own array slot and pass that slot's address.
         * Assigning to `workingnum` itself destroyed the malloc'd pointer
         * and handed calprocess() an int** instead of an int*. */
        workingnum[i] = i;
        if (pool_add_worker(calprocess, &workingnum[i]) != 0)
            fprintf(stderr, "main: could not queue work item %d\n", i);
    }

    /* Arm the pool and wake the workers under the mutex; changing the wait
     * predicate without it can lose the wakeup and leave workers asleep. */
    pthread_mutex_lock(&(pool->queue_lock));
    pool->subflag = 1;
    pool->start = 1;
    pool->cal_counts = MAX_SIZE;
    printf("counts:%d\n", pool->cal_counts);
    pthread_cond_broadcast(&(pool->queue_ready));
    pthread_mutex_unlock(&(pool->queue_lock));

    while (!exitthread)
        sleep(2);

    /* Workers are joined inside pool_destroy(), so the id array is no longer
     * referenced when it is freed. */
    pool_destroy();
    free(workingnum);
    printf("main end\n");
    return 0;
}


页: [1]
查看完整版本: linux 多线程处理