52matlab技术网站,matlab教程,matlab安装教程,matlab下载

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 3197|回复: 0

mpi和openomp并行计算

[复制链接]

98

主题

180

帖子

2166

积分

版主

Rank: 7Rank: 7Rank: 7

积分
2166
发表于 2018-4-27 02:10:52 | 显示全部楼层 |阅读模式
本帖最后由 matlab的旋律 于 2018-5-3 14:56 编辑

//note: How to run and test: Enter all the txt files' name you want to test into one txt file named "filename_list.txt".
//                           Put this .c document and the filename_list.txt file together under the same folder and then run and compile.

#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <stdlib.h>
#include <omp.h>

#define FILENAME_LEN 50
#define FILENAME_NUM 100
#define HASH_TABLE_MAX_SIZE 100000
#define MAX_LINE_SIZE 100000

char text[FILENAME_NUM][MAX_LINE_SIZE * 50];
int hash_size[FILENAME_NUM];
typedef struct HashTable Node;
struct HashTable
{
        char* Key;
        int Value;
        Node* pNext;
};

Node * hashTable[FILENAME_NUM + 1][HASH_TABLE_MAX_SIZE];
unsigned int hashComputeByHashFunc(const char* key)
{
        unsigned int hash = 1;
        int temp;
        while (temp = *key++)
        {
                hash = hash * 33 + temp;
        }
        return hash;
}


void hashTableInsertFunc(const char* key, int value, int index)
{
        unsigned int pos, poscheck;
        Node* NewNode = (Node*)malloc(sizeof(Node));
        memset(NewNode, 0, sizeof(Node));
        NewNode->Key = (char*)malloc(sizeof(char)* (strlen(key) + 1));
        strcpy(NewNode->Key, key);
        NewNode->Value = value;

        pos = hashComputeByHashFunc(key) % HASH_TABLE_MAX_SIZE;
        poscheck = pos;
        
        while (hashTable[index][poscheck] != NULL && hashTable[index][poscheck]->Value != -1)
        {
                poscheck++;
                poscheck %= HASH_TABLE_MAX_SIZE;
        }
        NewNode->pNext = hashTable[index][pos];
        hashTable[index][pos] = NewNode;

        hash_size[index]++;
}

Node* hashTableLookupFunc(const char* key, int index)
{
        unsigned int pos = hashComputeByHashFunc(key) % HASH_TABLE_MAX_SIZE;
        Node* pHead;
        if (hashTable[index][pos])
        {
                pHead = hashTable[index][pos];
                while (pHead)
                {
                        if (strcmp(key, pHead->Key) == 0)
                        {
                                return pHead;
                        }
                        pHead = pHead->pNext;
                }
        }
        return NULL;
}

void hashTablePrintFunc(int index)
{
        int i = 0;
        Node* pHead;
        printf("=========== content of hash table ===========\n");
        while (i < HASH_TABLE_MAX_SIZE)
        {
                if (hashTable[index])
                {
                        pHead = hashTable[index];
                        while (pHead)
                        {
                                printf("Word: %s, count: %d ", pHead->Key, pHead->Value);
                                pHead = pHead->pNext;
                                printf("\n");
                        }
                }
                i++;
        }
}


void rmPunct2LowerFunc(char *p)
{
        char *src = p, *dst = p;
        while (*src)
        {
                if (ispunct((unsigned char)*src) || isdigit((unsigned char)*src))
                {
                        src++;
                }
                else if (isupper((unsigned char)*src))
                {
                        *dst++ = tolower((unsigned char)*src);
                        src++;
                }
                else if (src == dst)
                {
                        src++;
                        dst++;
                }
                else
                {
                        *dst++ = *src++;
                }
        }
        *dst = 0;
}


void readFunc(const char* filename, int index)
{
        FILE *fp = fopen(filename, "r");
        char word[1000];
        if (fp == NULL)
        {
                printf("input file is invalid !");
                return;
        }

        while (fscanf(fp, " %s", word) == 1)
        {
                rmPunct2LowerFunc(word);
                strcat(text[index], word);
                strcat(text[index], " ");
        }
        fclose(fp);
}



void mapperFunc(int index)
{
        char *nextWord;
        nextWord = strtok(text[index], " \r\n");
        while (nextWord != NULL)
        {
#pragma omp critical
                {
                        if (hashTableLookupFunc(nextWord, index) == NULL)
                        {
                                hashTableInsertFunc(nextWord, 1, index);
                        }
                        else
                        {
                                hashTableLookupFunc(nextWord, index)->Value++;
                        }
                        nextWord = strtok(NULL, " \r\n");
                }
        }
}

void reducerFunc(int reduceCount)
{
        int i, j;
#pragma omp parallel for
        for (i = 0; i < reduceCount; ++i)
        {
                for (j = 0; j < HASH_TABLE_MAX_SIZE; j++)
                {
#pragma omp critical
                        if (hashTable[j])
                        {
                                Node* pHead = hashTable[j];
                                while (pHead)
                                {
                                        if (hashTableLookupFunc(pHead->Key, FILENAME_NUM) == NULL)
                                                hashTableInsertFunc(pHead->Key, pHead->Value, FILENAME_NUM);
                                        else
                                        {
                                                int val = pHead->Value;
                                                hashTableLookupFunc(pHead->Key, FILENAME_NUM)->Value += val;
                                        }
                                        pHead = pHead->pNext;
                                }
                        }
                }
        }
}


void writerFunc(FILE * fp)
{
        int i;
        Node* p;
        fprintf(fp, "------print the result------ \n");
#pragma omp critial
        {
                for (i = 0; i < HASH_TABLE_MAX_SIZE; ++i)
                {
                        if (hashTable[FILENAME_NUM] != NULL)
                        {
                                p = hashTable[FILENAME_NUM];
                                while (p)
                                {
                                        fprintf(fp, "Word: %s, Count: %d\n", p->Key, p->Value);
                                        p = p->pNext;
                                }
                        }
                }
        }
}

// Main Function
int main()
{
        int file_num = 0;
        FILE *read_filename = fopen("filename_list.txt", "r");
        char **filename_list_array = (char **)malloc(sizeof(char*)* FILENAME_NUM);
        int i,j;
        Node* pHead;

#pragma omp parallel for
        for (i = 0; i < FILENAME_NUM; i++)
        {
                filename_list_array = (char *)malloc(sizeof(char)* FILENAME_LEN);
        }

        if (read_filename == NULL)
        {
                printf("open the file incorrectly !");
                return 0;
        }
        while (!feof(read_filename))
        {
                fscanf(read_filename, "%s\n", filename_list_array[file_num]);
                file_num++;
        }
        printf("The result can be found in output.txt .\n");

        omp_set_num_threads(8);
        double time = -omp_get_wtime();
#pragma omp parallel private (i)
        {
#pragma omp single nowait
                {
                        for (i = 0; i < file_num; i++)
                        {
#pragma omp task
                                {
                                        readFunc(filename_list_array, i);
                                }
                        }
                }
        }
#pragma omp parallel private (i)
        {
#pragma omp single nowait
                {
                        for (i = 0; i < file_num; i++)
                        {
#pragma omp task
                                {
                                        mapperFunc(i);
                                }
                        }
                }
        }
#pragma omp barrier
        reducerFunc(file_num);
#pragma omp barrier
        FILE *fp = fopen("output.txt", "w");

        writerFunc(fp);

        hashTablePrintFunc(FILENAME_NUM);

#pragma omp parallel for
        for (i = 0; i < FILENAME_NUM; i++)
        {
                for (j = 0; j < HASH_TABLE_MAX_SIZE; j++)
                {
                        if (hashTable[j])
                        {
                                pHead = hashTable[j];
                                if (pHead)
                                {
                                        free(pHead->Key);
                                        free(pHead);
                                }
                        }
                }
        }
        
        
        fclose(fp);
        time = time + omp_get_wtime();
        printf("Elapsed time is %lf seconds. \n", time);

        for (i = 0; i < FILENAME_NUM; i++)
        {
                free(filename_list_array);
        }
        free(filename_list_array);

        return 0;
}

//Guanshi He
//ECE 563
//Small Project
//Matrix Multiply
//MPI Version

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

//we need to define N as the multiply of the number of the worker thread
//Since the number of worker(s) would be 1, 3, 7, 15
//We set the matrix size as least common multiple 105
#define N 1050
//declare the three matrix, the result will be in c matrix
double a[N][N],b[N][N],c[N][N];

main(int argc, char *argv[]) {
    //MPI useful variable
    int size,rank;
    //number of worker threads
    int numworkers;
    //variable for message send and recv
    int source,dest;

    int rows,offset;
    int i,j,k;


    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Status status;
    numworkers = size - 1;

    clock_t start, end;
    double cpu_time1, cpu_time2;

    /*---------------------------- master ----------------------------*/
    start = clock();
    if (rank == 0) {
        for (i=0; i<N; i++) {
            for (j=0; j<N; j++) {
                a[j]= 1.0;
                b[j]= 1.0;
            }
        }

        // send matrix data to the worker threads
        rows = N / numworkers;
        offset = 0;

        for (dest = 1; dest <= numworkers; dest++) {
            MPI_Send(&offset, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
            MPI_Send(&rows, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
            MPI_Send(&a[offset][0], rows * N, MPI_DOUBLE, dest, 1, MPI_COMM_WORLD);
            MPI_Send(&b, N * N, MPI_DOUBLE, dest, 1, MPI_COMM_WORLD);
            offset = offset + rows;
        }

        // receive the data from other thread
        for (i = 1; i<= numworkers; i++) {
            source = i;
            MPI_Recv(&offset, 1, MPI_INT, source, 2, MPI_COMM_WORLD, &status);
            MPI_Recv(&rows, 1, MPI_INT, source, 2, MPI_COMM_WORLD, &status);
            MPI_Recv(&c[offset][0], rows * N, MPI_DOUBLE, source, 2, MPI_COMM_WORLD, &status);
        }

        //uncomment this for displaying the result
        // printf("Here is the result matrix:\n");
        // for (i=0; i<N; i++) {
        //     for (j=0; j<N; j++) {
        //         printf("%.2f   ", c[j]);
        //     }
        //     printf ("\n");
        // }

        end = clock();
        cpu_time1 = ((double)(end - start)) / CLOCKS_PER_SEC;

        //Time the sequtial run
        start = clock();
        for (i = 0; i < N; i++) {
            for (j = 0; j < N; j++) {
                for (offset = 0; offset < N; offset++) {
                    c[j] += a[offset] * b[j][offset];
                }
            }
        }
        end = clock();
        cpu_time2 = ((double)(end - start)) / CLOCKS_PER_SEC;

        //output the results and do the comparation
        printf("MPI size : %d\n", size);
        printf("Parallel running time : %f\n", cpu_time1);
        printf("Sequtial running time : %f\n", cpu_time2);
        printf("SpeedUp : %.2f \n", (100*(float)cpu_time2/(float)cpu_time1));

    }

    /*---------------------------- worker----------------------------*/
    if (rank > 0) {
        source = 0;

        //receving the data from master thread
        MPI_Recv(&offset, 1, MPI_INT, source, 1, MPI_COMM_WORLD, &status);
        MPI_Recv(&rows, 1, MPI_INT, source, 1, MPI_COMM_WORLD, &status);
        MPI_Recv(&a, rows * N, MPI_DOUBLE, source, 1, MPI_COMM_WORLD, &status);
        MPI_Recv(&b, N * N, MPI_DOUBLE, source, 1, MPI_COMM_WORLD, &status);

        // do the computation
        for (k = 0; k < N; k++) {
            for (i = 0; i < rows; i++) {
                c[k] = 0.0;
                for (j = 0; j < N; j++) {
                    c[k] += a[j] * b[j][k];
                }
            }
        }

        //send back the data
        MPI_Send(&offset, 1, MPI_INT, 0, 2, MPI_COMM_WORLD);
        MPI_Send(&rows, 1, MPI_INT, 0, 2, MPI_COMM_WORLD);
        MPI_Send(&c, rows * N, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD);
    }
    MPI_Finalize();

}

// Result from the output
// he95@scholar-fe02:~/SmallProj $ mpiexec -n 2 ./a.out
// MPI size : 2
// Parallel running time : 3.700000
// Sequtial running time : 3.350000
// SpeedUp : 90.54
// he95@scholar-fe02:~/SmallProj $ mpiexec -n 4 ./a.out
// MPI size : 4
// Parallel running time : 1.270000
// Sequtial running time : 3.380000
// SpeedUp : 266.14
// he95@scholar-fe02:~/SmallProj $ mpiexec -n 8 ./a.out
// MPI size : 8
// Parallel running time : 0.620000
// Sequtial running time : 3.800000
// SpeedUp : 612.90
// he95@scholar-fe02:~/SmallProj $ mpiexec -n 16 ./a.out
// MPI size : 16
// Parallel running time : 0.360000
// Sequtial running time : 3.800000
// SpeedUp : 1055.56



//Team Member:
//Haichao Xu
//Guanshi He
//ECE563 Large Project Final Turnin MPI Version

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#define HASH_TABLE_MAX_SIZE 10000
#define number_of_processor 4
#define MAX_LINE_SIZE 100000
#define MAX_FILE_NUM 32
#define LINE_NUM 40

int flag[2];
int global_count = 0;
char text[MAX_LINE_SIZE * LINE_NUM];
int hash_table_size;  //the number of key-value pairs in the hash table!

typedef struct HashTable Node;
struct HashTable
{
    char sKey[40];
    int nValue;
    Node* pNext;
};

Node* hashTable[MAX_FILE_NUM][HASH_TABLE_MAX_SIZE * LINE_NUM]; //hash table data strcutrue

//string hash function
unsigned int hash_table_hash_str(const char* skey)
{
    const signed char *p = (const signed char*)skey;
    unsigned int hash = 5381;
    int temp;
    while( temp = *skey++) {
        hash = hash * 33 + temp;
    }
    return hash;
}

//insert key-value into hash table
void hash_table_insert(const char* skey, int nvalue, int index)
{
    unsigned int pos = hash_table_hash_str(skey) % HASH_TABLE_MAX_SIZE;
    Node* pNewNode = (Node*)malloc(sizeof(Node));
    memset(pNewNode, 0, sizeof(Node));
        if(hash_table_size >= HASH_TABLE_MAX_SIZE)
    {
        return;
    }
    //pNewNode->sKey = (char*)malloc(sizeof(char) * (strlen(skey) + 1));
    strcpy(pNewNode->sKey, skey);
    pNewNode->nValue = nvalue;

    //pNewNode->pNext = hashTable[pos];
    hashTable[index][pos] = pNewNode;
    hash_table_size++;
}

//lookup a key in the hash table
Node* hash_table_lookup(const char* skey, int index)
{
    unsigned int pos = hash_table_hash_str(skey) % HASH_TABLE_MAX_SIZE;
        Node* pHead;
    if(hashTable[index][pos])
    {
        pHead = hashTable[index][pos];
        if (pHead)
        {
            if(strcmp(skey, pHead->sKey) == 0)
                return pHead;
        }
    }
    return NULL;
}

void remove_punct_and_make_lower_case(char *p)
{
    char *src = p, *dst = p;
    while (*src) {
        if (ispunct((unsigned char)*src)) {
            src++;
        } else if (isdigit((unsigned char)*src)) {
            src++;
        } else if (isupper((unsigned char)*src)) {
            *dst++ = tolower((unsigned char)*src);
            src++;
        } else if (src == dst) {
            src++;
            dst++;
        } else {
            *dst++ = *src++;
        }
    }
    *dst = 0;
}


// Reader function
void reader(const char* filename)
{
  FILE *fp = fopen(filename, "r");
  char word[1024] = {0};
  if(fp == NULL){
    printf("invalid input file");
    exit(1);
  }
  //read word by word                                                                                                                                                                                       
  while(fscanf(fp, " %1023s", word) == 1){
    //convert to lower case                                                                                                                                                                                 
    remove_punct_and_make_lower_case(word);
    strcat(text, word);
    strcat(text," ");
    global_count++;
  }
  fclose(fp);
}


// Mapper function
void mapper(int each_step, int ex, int pid) {
    char *nextWord;
    char * ptr = NULL;
    int count = 0;
    nextWord = strtok_r(text, " \r\n", &ptr);
    while(nextWord != NULL) {
        if(count < pid * each_step + ex) {
            nextWord = strtok_r(NULL, " \r\n", &ptr);
        }
        else
                        break;
        count++;
    }
    count = 0;
    while (nextWord != NULL && count <= pid * each_step + ex) {
        if (hash_table_lookup(nextWord, pid) == NULL) {
            hash_table_insert(nextWord, 1, pid);
        } else {
             hash_table_lookup(nextWord, pid)->nValue++;   
        }
        count++;
        nextWord = strtok_r(NULL, " \r\n", &ptr);
    }
}
// Reducer function

void reducer(int reduceCount) {
    int i, j, val;
        Node* pHead;
    for(i = 0; i < reduceCount; ++i)
    {
        for (j = 0; j < HASH_TABLE_MAX_SIZE; j++) {
            if(hashTable[j])
            {
                pHead = hashTable[j];
                if(pHead)
                {
                    if (hash_table_lookup(pHead->sKey, MAX_FILE_NUM) == NULL)
                        hash_table_insert(pHead->sKey, pHead->nValue, MAX_FILE_NUM);
                    else {
                        val = pHead -> nValue;
                        hash_table_lookup(pHead->sKey, MAX_FILE_NUM)->nValue += val;
                    }

                }
            }
        }
    }
}

void writer(FILE * f) {
    int i, j;
        Node* p;
    fprintf(f, "====== Printing out the result =====\n");
    for(i = 0; i < HASH_TABLE_MAX_SIZE*LINE_NUM; i++)
       {
        if(hashTable[0] != NULL)
        {
          p = hashTable[0];
          if(p)
            {
              fprintf(f, "Word: %s, Count: %d\n", p->sKey, p->nValue);
            }
        }
    }
}
/* ============================test function ============================*/
int main(int argc, char** argv) {
    // Initialize the MPI environment
    int world_size, world_rank,j;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Status status[16];
    MPI_Request request[16];
    int nthreads, tid;
    srand(time(NULL));
        
        hash_table_size = 0;
    memset(hashTable, 0, sizeof(Node*) * HASH_TABLE_MAX_SIZE);
    int i = 0;
    //debug starts here
    MPI_Datatype Particletype;
    MPI_Datatype type[2] = { MPI_INT, MPI_BYTE};
    int blocklen[2] = {40, 1};
    MPI_Aint offsets[2];
    offsets[0] = 0;
    offsets[1] = 40;
    MPI_Type_create_struct(2, blocklen, offsets, type, &Particletype);
    MPI_Type_commit(&Particletype);

    //double start = omp_get_wtime();
    int each_step = 0;
    int ex = 0;
    //start READER
    for(i = 0; i < argc - 1; i++) {
        reader(argv[i+1]);
    }
    //calculate for steps and ex
    each_step = global_count / world_size;
    ex = global_count % world_size;

    if (world_rank == 0) {
        //broadcast text
        Node rec[world_size][100000];
        //initialization
        int k = 0;
        char * temp = " ";
        for (i = 0; i < world_size; i++) {
            for (k = 0; k < 100000; k++) {
                rec[k].nValue = 0;

                strcpy(rec[k].sKey, temp);
            }
        }
        MPI_Bcast(text, MAX_LINE_SIZE * LINE_NUM, MPI_BYTE, 0, MPI_COMM_WORLD);
        //mapper
        char *nextWord;
        char * ptr = NULL;
        int count = 0;
        nextWord = strtok_r(text, " \r\n", &ptr);
        while (nextWord != NULL && count < each_step + ex) {
            #pragma omp critical
            {
                if (hash_table_lookup(nextWord, 0) == NULL) {
                    hash_table_insert(nextWord, 1, 0);
                } else {
                    hash_table_lookup(nextWord, 0)->nValue++;   
                }
                count++;
                nextWord = strtok_r(NULL, " \r\n", &ptr);
            }
        }
        i = 1;
        while(i < world_size) {

            MPI_Irecv(&rec, 100000, Particletype, i, i, MPI_COMM_WORLD, request+i-1);
            i++;
        }
        //printf("%s \n %d \n %d \n", text, each_step, global_count);
        MPI_Waitall(world_size-1, request, status);
        //reduce
        int j = 0;
        for (i = 1; i < world_size; i++) {
            for (j = 0; j < 100000; j++) {
                if(rec[j].nValue > 0) {
                    if (hash_table_lookup(rec[j].sKey, 0) == NULL) {
                    hash_table_insert(rec[j].sKey, rec[j].nValue, 0);
                    } else {
                        //printf("%s \n %d \n", rec[j].sKey, rec[j].nValue);
                        hash_table_lookup(rec[j].sKey, 0)->nValue += rec[j].nValue;   
                    }
                }
            }
        }
        //writer thread
        FILE *fp = fopen("output.txt", "w");
        writer(fp);
        //double end = omp_get_wtime();
        printf("%f \n", end - start);
    }
    else if(world_rank != 0) {
        MPI_Bcast(text, MAX_LINE_SIZE * LINE_NUM, MPI_BYTE, 0, MPI_COMM_WORLD);
        //mapper
        Node send[100000];
        int index = world_rank;
        int count = 0;
        char* skey;
        mapper(each_step, ex, world_rank);
        //copy to send
        for(i = 0; i < HASH_TABLE_MAX_SIZE; ++i)
        if(hashTable[index])
        {
            Node* pHead = hashTable[index];
            if (pHead)
            {
                send[count].nValue = pHead->nValue;
                skey = pHead->sKey;
                strcpy(send[count].sKey, skey);
                count++;
            }
        }
        MPI_Send(&send, count, Particletype, 0, world_rank, MPI_COMM_WORLD);
    }
        
        j = 0;
        for(i = 0; i < HASH_TABLE_MAX_SIZE; ++i)//free the memory of the hash table
    {
        if(j >= 31) break;
        if(hashTable[j])
        {
            j++;
            Node* pHead = hashTable[j];
            if (pHead)
            {
                Node* pTemp = pHead;
                //pHead = pHead->pNext;
                if(pTemp)
                {
                    free(pTemp->sKey);
                    free(pTemp);
                }
            }
        }
    }

    // Finalize the MPI environment.
    MPI_Finalize();

}



回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|52matlab技术网站 ( 粤ICP备14005920号-5 )

GMT+8, 2020-9-26 03:30 , Processed in 0.062561 second(s), 19 queries .

Powered by Discuz! X3.2 Licensed

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表