/* Forum post header: "本帖最后由 matlab的旋律 于 2018-5-3 14:56 编辑"
 * (this post was last edited by matlab的旋律 on 2018-05-03 14:56). */
 // Usage: list the names of all .txt files you want to process, one per line, in a file named "filename_list.txt".
 //        Put this .c file and filename_list.txt in the same folder, then compile and run.
 
 #include <stdio.h>
 #include <string.h>
 #include <ctype.h>
 #include <stdlib.h>
 #include <omp.h>
 
 #define FILENAME_LEN 50
 #define FILENAME_NUM 100
 #define HASH_TABLE_MAX_SIZE 100000
 #define MAX_LINE_SIZE 100000
 
 char text[FILENAME_NUM][MAX_LINE_SIZE * 50];
 int hash_size[FILENAME_NUM];
 typedef struct HashTable Node;
 struct HashTable
 {
 char* Key;
 int Value;
 Node* pNext;
 };
 
 Node * hashTable[FILENAME_NUM + 1][HASH_TABLE_MAX_SIZE];
 unsigned int hashComputeByHashFunc(const char* key)
 {
 unsigned int hash = 1;
 int temp;
 while (temp = *key++)
 {
 hash = hash * 33 + temp;
 }
 return hash;
 }
 
 
 void hashTableInsertFunc(const char* key, int value, int index)
 {
 unsigned int pos, poscheck;
 Node* NewNode = (Node*)malloc(sizeof(Node));
 memset(NewNode, 0, sizeof(Node));
 NewNode->Key = (char*)malloc(sizeof(char)* (strlen(key) + 1));
 strcpy(NewNode->Key, key);
 NewNode->Value = value;
 
 pos = hashComputeByHashFunc(key) % HASH_TABLE_MAX_SIZE;
 poscheck = pos;
 
 while (hashTable[index][poscheck] != NULL && hashTable[index][poscheck]->Value != -1)
 {
 poscheck++;
 poscheck %= HASH_TABLE_MAX_SIZE;
 }
 NewNode->pNext = hashTable[index][pos];
 hashTable[index][pos] = NewNode;
 
 hash_size[index]++;
 }
 
 Node* hashTableLookupFunc(const char* key, int index)
 {
 unsigned int pos = hashComputeByHashFunc(key) % HASH_TABLE_MAX_SIZE;
 Node* pHead;
 if (hashTable[index][pos])
 {
 pHead = hashTable[index][pos];
 while (pHead)
 {
 if (strcmp(key, pHead->Key) == 0)
 {
 return pHead;
 }
 pHead = pHead->pNext;
 }
 }
 return NULL;
 }
 
 void hashTablePrintFunc(int index)
 {
 int i = 0;
 Node* pHead;
 printf("=========== content of hash table ===========\n");
 while (i < HASH_TABLE_MAX_SIZE)
 {
 if (hashTable[index])
 {
 pHead = hashTable[index];
 while (pHead)
 {
 printf("Word: %s, count: %d ", pHead->Key, pHead->Value);
 pHead = pHead->pNext;
 printf("\n");
 }
 }
 i++;
 }
 }
 
 
 void rmPunct2LowerFunc(char *p)
 {
 char *src = p, *dst = p;
 while (*src)
 {
 if (ispunct((unsigned char)*src) || isdigit((unsigned char)*src))
 {
 src++;
 }
 else if (isupper((unsigned char)*src))
 {
 *dst++ = tolower((unsigned char)*src);
 src++;
 }
 else if (src == dst)
 {
 src++;
 dst++;
 }
 else
 {
 *dst++ = *src++;
 }
 }
 *dst = 0;
 }
 
 
 void readFunc(const char* filename, int index)
 {
 FILE *fp = fopen(filename, "r");
 char word[1000];
 if (fp == NULL)
 {
 printf("input file is invalid !");
 return;
 }
 
 while (fscanf(fp, " %s", word) == 1)
 {
 rmPunct2LowerFunc(word);
 strcat(text[index], word);
 strcat(text[index], " ");
 }
 fclose(fp);
 }
 
 
 
 void mapperFunc(int index)
 {
 char *nextWord;
 nextWord = strtok(text[index], " \r\n");
 while (nextWord != NULL)
 {
 #pragma omp critical
 {
 if (hashTableLookupFunc(nextWord, index) == NULL)
 {
 hashTableInsertFunc(nextWord, 1, index);
 }
 else
 {
 hashTableLookupFunc(nextWord, index)->Value++;
 }
 nextWord = strtok(NULL, " \r\n");
 }
 }
 }
 
 void reducerFunc(int reduceCount)
 {
 int i, j;
 #pragma omp parallel for
 for (i = 0; i < reduceCount; ++i)
 {
 for (j = 0; j < HASH_TABLE_MAX_SIZE; j++)
 {
 #pragma omp critical
 if (hashTable[j])
 {
 Node* pHead = hashTable[j];
 while (pHead)
 {
 if (hashTableLookupFunc(pHead->Key, FILENAME_NUM) == NULL)
 hashTableInsertFunc(pHead->Key, pHead->Value, FILENAME_NUM);
 else
 {
 int val = pHead->Value;
 hashTableLookupFunc(pHead->Key, FILENAME_NUM)->Value += val;
 }
 pHead = pHead->pNext;
 }
 }
 }
 }
 }
 
 
 void writerFunc(FILE * fp)
 {
 int i;
 Node* p;
 fprintf(fp, "------print the result------ \n");
 #pragma omp critial
 {
 for (i = 0; i < HASH_TABLE_MAX_SIZE; ++i)
 {
 if (hashTable[FILENAME_NUM] != NULL)
 {
 p = hashTable[FILENAME_NUM];
 while (p)
 {
 fprintf(fp, "Word: %s, Count: %d\n", p->Key, p->Value);
 p = p->pNext;
 }
 }
 }
 }
 }
 
 // Main Function
 int main()
 {
 int file_num = 0;
 FILE *read_filename = fopen("filename_list.txt", "r");
 char **filename_list_array = (char **)malloc(sizeof(char*)* FILENAME_NUM);
 int i,j;
 Node* pHead;
 
 #pragma omp parallel for
 for (i = 0; i < FILENAME_NUM; i++)
 {
 filename_list_array = (char *)malloc(sizeof(char)* FILENAME_LEN);
 }
 
 if (read_filename == NULL)
 {
 printf("open the file incorrectly !");
 return 0;
 }
 while (!feof(read_filename))
 {
 fscanf(read_filename, "%s\n", filename_list_array[file_num]);
 file_num++;
 }
 printf("The result can be found in output.txt .\n");
 
 omp_set_num_threads(8);
 double time = -omp_get_wtime();
 #pragma omp parallel private (i)
 {
 #pragma omp single nowait
 {
 for (i = 0; i < file_num; i++)
 {
 #pragma omp task
 {
 readFunc(filename_list_array, i);
 }
 }
 }
 }
 #pragma omp parallel private (i)
 {
 #pragma omp single nowait
 {
 for (i = 0; i < file_num; i++)
 {
 #pragma omp task
 {
 mapperFunc(i);
 }
 }
 }
 }
 #pragma omp barrier
 reducerFunc(file_num);
 #pragma omp barrier
 FILE *fp = fopen("output.txt", "w");
 
 writerFunc(fp);
 
 hashTablePrintFunc(FILENAME_NUM);
 
 #pragma omp parallel for
 for (i = 0; i < FILENAME_NUM; i++)
 {
 for (j = 0; j < HASH_TABLE_MAX_SIZE; j++)
 {
 if (hashTable[j])
 {
 pHead = hashTable[j];
 if (pHead)
 {
 free(pHead->Key);
 free(pHead);
 }
 }
 }
 }
 
 
 fclose(fp);
 time = time + omp_get_wtime();
 printf("Elapsed time is %lf seconds. \n", time);
 
 for (i = 0; i < FILENAME_NUM; i++)
 {
 free(filename_list_array);
 }
 free(filename_list_array);
 
 return 0;
 }
 
 //Guanshi He
 //ECE 563
 //Small Project
 //Matrix Multiply
 //MPI Version
 
 #include <mpi.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <time.h>
 
 //N must be a multiple of the number of worker processes.
 //Since the number of workers will be 1, 3, 7 or 15, their least common multiple is 105,
 //so N is chosen as a multiple of 105.
 #define N 1050
 //declare the three matrix, the result will be in c matrix
 double a[N][N],b[N][N],c[N][N];
 
 main(int argc, char *argv[]) {
 //MPI useful variable
 int size,rank;
 //number of worker threads
 int numworkers;
 //variable for message send and recv
 int source,dest;
 
 int rows,offset;
 int i,j,k;
 
 
 MPI_Init(&argc, &argv);
 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 MPI_Comm_size(MPI_COMM_WORLD, &size);
 MPI_Status status;
 numworkers = size - 1;
 
 clock_t start, end;
 double cpu_time1, cpu_time2;
 
 /*---------------------------- master ----------------------------*/
 start = clock();
 if (rank == 0) {
 for (i=0; i<N; i++) {
 for (j=0; j<N; j++) {
 a[j]= 1.0;
 b[j]= 1.0;
 }
 }
 
 // send matrix data to the worker threads
 rows = N / numworkers;
 offset = 0;
 
 for (dest = 1; dest <= numworkers; dest++) {
 MPI_Send(&offset, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
 MPI_Send(&rows, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
 MPI_Send(&a[offset][0], rows * N, MPI_DOUBLE, dest, 1, MPI_COMM_WORLD);
 MPI_Send(&b, N * N, MPI_DOUBLE, dest, 1, MPI_COMM_WORLD);
 offset = offset + rows;
 }
 
 // receive the data from other thread
 for (i = 1; i<= numworkers; i++) {
 source = i;
 MPI_Recv(&offset, 1, MPI_INT, source, 2, MPI_COMM_WORLD, &status);
 MPI_Recv(&rows, 1, MPI_INT, source, 2, MPI_COMM_WORLD, &status);
 MPI_Recv(&c[offset][0], rows * N, MPI_DOUBLE, source, 2, MPI_COMM_WORLD, &status);
 }
 
 //uncomment this for displaying the result
 // printf("Here is the result matrix:\n");
 // for (i=0; i<N; i++) {
 //     for (j=0; j<N; j++) {
 //         printf("%.2f   ", c[j]);
 //     }
 //     printf ("\n");
 // }
 
 end = clock();
 cpu_time1 = ((double)(end - start)) / CLOCKS_PER_SEC;
 
 //Time the sequtial run
 start = clock();
 for (i = 0; i < N; i++) {
 for (j = 0; j < N; j++) {
 for (offset = 0; offset < N; offset++) {
 c[j] += a[offset] * b[j][offset];
 }
 }
 }
 end = clock();
 cpu_time2 = ((double)(end - start)) / CLOCKS_PER_SEC;
 
 //output the results and do the comparation
 printf("MPI size : %d\n", size);
 printf("Parallel running time : %f\n", cpu_time1);
 printf("Sequtial running time : %f\n", cpu_time2);
 printf("SpeedUp : %.2f \n", (100*(float)cpu_time2/(float)cpu_time1));
 
 }
 
 /*---------------------------- worker----------------------------*/
 if (rank > 0) {
 source = 0;
 
 //receving the data from master thread
 MPI_Recv(&offset, 1, MPI_INT, source, 1, MPI_COMM_WORLD, &status);
 MPI_Recv(&rows, 1, MPI_INT, source, 1, MPI_COMM_WORLD, &status);
 MPI_Recv(&a, rows * N, MPI_DOUBLE, source, 1, MPI_COMM_WORLD, &status);
 MPI_Recv(&b, N * N, MPI_DOUBLE, source, 1, MPI_COMM_WORLD, &status);
 
 // do the computation
 for (k = 0; k < N; k++) {
 for (i = 0; i < rows; i++) {
 c[k] = 0.0;
 for (j = 0; j < N; j++) {
 c[k] += a[j] * b[j][k];
 }
 }
 }
 
 //send back the data
 MPI_Send(&offset, 1, MPI_INT, 0, 2, MPI_COMM_WORLD);
 MPI_Send(&rows, 1, MPI_INT, 0, 2, MPI_COMM_WORLD);
 MPI_Send(&c, rows * N, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD);
 }
 MPI_Finalize();
 
 }
 
 // Result from the output
 // he95@scholar-fe02:~/SmallProj $ mpiexec -n 2 ./a.out
 // MPI size : 2
 // Parallel running time : 3.700000
 // Sequtial running time : 3.350000
 // SpeedUp : 90.54
 // he95@scholar-fe02:~/SmallProj $ mpiexec -n 4 ./a.out
 // MPI size : 4
 // Parallel running time : 1.270000
 // Sequtial running time : 3.380000
 // SpeedUp : 266.14
 // he95@scholar-fe02:~/SmallProj $ mpiexec -n 8 ./a.out
 // MPI size : 8
 // Parallel running time : 0.620000
 // Sequtial running time : 3.800000
 // SpeedUp : 612.90
 // he95@scholar-fe02:~/SmallProj $ mpiexec -n 16 ./a.out
 // MPI size : 16
 // Parallel running time : 0.360000
 // Sequtial running time : 3.800000
 // SpeedUp : 1055.56
 
 
 
 //Team Member:
 //Haichao Xu
 //Guanshi He
 //ECE563 Large Project Final Turnin MPI Version
 
#include <mpi.h>
#include <ctype.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
 #define HASH_TABLE_MAX_SIZE 10000
 #define number_of_processor 4
 #define MAX_LINE_SIZE 100000
 #define MAX_FILE_NUM 32
 #define LINE_NUM 40
 
 int flag[2];
 int global_count = 0;
 char text[MAX_LINE_SIZE * LINE_NUM];
 int hash_table_size;  //the number of key-value pairs in the hash table!
 
 typedef struct HashTable Node;
 struct HashTable
 {
 char sKey[40];
 int nValue;
 Node* pNext;
 };
 
 Node* hashTable[MAX_FILE_NUM][HASH_TABLE_MAX_SIZE * LINE_NUM]; //hash table data strcutrue
 
 //string hash function
 unsigned int hash_table_hash_str(const char* skey)
 {
 const signed char *p = (const signed char*)skey;
 unsigned int hash = 5381;
 int temp;
 while( temp = *skey++) {
 hash = hash * 33 + temp;
 }
 return hash;
 }
 
 //insert key-value into hash table
 void hash_table_insert(const char* skey, int nvalue, int index)
 {
 unsigned int pos = hash_table_hash_str(skey) % HASH_TABLE_MAX_SIZE;
 Node* pNewNode = (Node*)malloc(sizeof(Node));
 memset(pNewNode, 0, sizeof(Node));
 if(hash_table_size >= HASH_TABLE_MAX_SIZE)
 {
 return;
 }
 //pNewNode->sKey = (char*)malloc(sizeof(char) * (strlen(skey) + 1));
 strcpy(pNewNode->sKey, skey);
 pNewNode->nValue = nvalue;
 
 //pNewNode->pNext = hashTable[pos];
 hashTable[index][pos] = pNewNode;
 hash_table_size++;
 }
 
 //lookup a key in the hash table
 Node* hash_table_lookup(const char* skey, int index)
 {
 unsigned int pos = hash_table_hash_str(skey) % HASH_TABLE_MAX_SIZE;
 Node* pHead;
 if(hashTable[index][pos])
 {
 pHead = hashTable[index][pos];
 if (pHead)
 {
 if(strcmp(skey, pHead->sKey) == 0)
 return pHead;
 }
 }
 return NULL;
 }
 
 void remove_punct_and_make_lower_case(char *p)
 {
 char *src = p, *dst = p;
 while (*src) {
 if (ispunct((unsigned char)*src)) {
 src++;
 } else if (isdigit((unsigned char)*src)) {
 src++;
 } else if (isupper((unsigned char)*src)) {
 *dst++ = tolower((unsigned char)*src);
 src++;
 } else if (src == dst) {
 src++;
 dst++;
 } else {
 *dst++ = *src++;
 }
 }
 *dst = 0;
 }
 
 
 // Reader function
 void reader(const char* filename)
 {
 FILE *fp = fopen(filename, "r");
 char word[1024] = {0};
 if(fp == NULL){
 printf("invalid input file");
 exit(1);
 }
 //read word by word
 while(fscanf(fp, " %1023s", word) == 1){
 //convert to lower case
 remove_punct_and_make_lower_case(word);
 strcat(text, word);
 strcat(text," ");
 global_count++;
 }
 fclose(fp);
 }
 
 
 // Mapper function
 void mapper(int each_step, int ex, int pid) {
 char *nextWord;
 char * ptr = NULL;
 int count = 0;
 nextWord = strtok_r(text, " \r\n", &ptr);
 while(nextWord != NULL) {
 if(count < pid * each_step + ex) {
 nextWord = strtok_r(NULL, " \r\n", &ptr);
 }
 else
 break;
 count++;
 }
 count = 0;
 while (nextWord != NULL && count <= pid * each_step + ex) {
 if (hash_table_lookup(nextWord, pid) == NULL) {
 hash_table_insert(nextWord, 1, pid);
 } else {
 hash_table_lookup(nextWord, pid)->nValue++;
 }
 count++;
 nextWord = strtok_r(NULL, " \r\n", &ptr);
 }
 }
 // Reducer function
 
 void reducer(int reduceCount) {
 int i, j, val;
 Node* pHead;
 for(i = 0; i < reduceCount; ++i)
 {
 for (j = 0; j < HASH_TABLE_MAX_SIZE; j++) {
 if(hashTable[j])
 {
 pHead = hashTable[j];
 if(pHead)
 {
 if (hash_table_lookup(pHead->sKey, MAX_FILE_NUM) == NULL)
 hash_table_insert(pHead->sKey, pHead->nValue, MAX_FILE_NUM);
 else {
 val = pHead -> nValue;
 hash_table_lookup(pHead->sKey, MAX_FILE_NUM)->nValue += val;
 }
 
 }
 }
 }
 }
 }
 
 void writer(FILE * f) {
 int i, j;
 Node* p;
 fprintf(f, "====== Printing out the result =====\n");
 for(i = 0; i < HASH_TABLE_MAX_SIZE*LINE_NUM; i++)
 {
 if(hashTable[0] != NULL)
 {
 p = hashTable[0];
 if(p)
 {
 fprintf(f, "Word: %s, Count: %d\n", p->sKey, p->nValue);
 }
 }
 }
 }
 /* ============================test function ============================*/
 int main(int argc, char** argv) {
 // Initialize the MPI environment
 int world_size, world_rank,j;
 MPI_Init(&argc, &argv);
 MPI_Comm_size(MPI_COMM_WORLD, &world_size);
 MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
 MPI_Status status[16];
 MPI_Request request[16];
 int nthreads, tid;
 srand(time(NULL));
 
 hash_table_size = 0;
 memset(hashTable, 0, sizeof(Node*) * HASH_TABLE_MAX_SIZE);
 int i = 0;
 //debug starts here
 MPI_Datatype Particletype;
 MPI_Datatype type[2] = { MPI_INT, MPI_BYTE};
 int blocklen[2] = {40, 1};
 MPI_Aint offsets[2];
 offsets[0] = 0;
 offsets[1] = 40;
 MPI_Type_create_struct(2, blocklen, offsets, type, &Particletype);
 MPI_Type_commit(&Particletype);
 
 //double start = omp_get_wtime();
 int each_step = 0;
 int ex = 0;
 //start READER
 for(i = 0; i < argc - 1; i++) {
 reader(argv[i+1]);
 }
 //calculate for steps and ex
 each_step = global_count / world_size;
 ex = global_count % world_size;
 
 if (world_rank == 0) {
 //broadcast text
 Node rec[world_size][100000];
 //initialization
 int k = 0;
 char * temp = " ";
 for (i = 0; i < world_size; i++) {
 for (k = 0; k < 100000; k++) {
 rec[k].nValue = 0;
 
 strcpy(rec[k].sKey, temp);
 }
 }
 MPI_Bcast(text, MAX_LINE_SIZE * LINE_NUM, MPI_BYTE, 0, MPI_COMM_WORLD);
 //mapper
 char *nextWord;
 char * ptr = NULL;
 int count = 0;
 nextWord = strtok_r(text, " \r\n", &ptr);
 while (nextWord != NULL && count < each_step + ex) {
 #pragma omp critical
 {
 if (hash_table_lookup(nextWord, 0) == NULL) {
 hash_table_insert(nextWord, 1, 0);
 } else {
 hash_table_lookup(nextWord, 0)->nValue++;
 }
 count++;
 nextWord = strtok_r(NULL, " \r\n", &ptr);
 }
 }
 i = 1;
 while(i < world_size) {
 
 MPI_Irecv(&rec, 100000, Particletype, i, i, MPI_COMM_WORLD, request+i-1);
 i++;
 }
 //printf("%s \n %d \n %d \n", text, each_step, global_count);
 MPI_Waitall(world_size-1, request, status);
 //reduce
 int j = 0;
 for (i = 1; i < world_size; i++) {
 for (j = 0; j < 100000; j++) {
 if(rec[j].nValue > 0) {
 if (hash_table_lookup(rec[j].sKey, 0) == NULL) {
 hash_table_insert(rec[j].sKey, rec[j].nValue, 0);
 } else {
 //printf("%s \n %d \n", rec[j].sKey, rec[j].nValue);
 hash_table_lookup(rec[j].sKey, 0)->nValue += rec[j].nValue;
 }
 }
 }
 }
 //writer thread
 FILE *fp = fopen("output.txt", "w");
 writer(fp);
 //double end = omp_get_wtime();
 printf("%f \n", end - start);
 }
 else if(world_rank != 0) {
 MPI_Bcast(text, MAX_LINE_SIZE * LINE_NUM, MPI_BYTE, 0, MPI_COMM_WORLD);
 //mapper
 Node send[100000];
 int index = world_rank;
 int count = 0;
 char* skey;
 mapper(each_step, ex, world_rank);
 //copy to send
 for(i = 0; i < HASH_TABLE_MAX_SIZE; ++i)
 if(hashTable[index])
 {
 Node* pHead = hashTable[index];
 if (pHead)
 {
 send[count].nValue = pHead->nValue;
 skey = pHead->sKey;
 strcpy(send[count].sKey, skey);
 count++;
 }
 }
 MPI_Send(&send, count, Particletype, 0, world_rank, MPI_COMM_WORLD);
 }
 
 j = 0;
 for(i = 0; i < HASH_TABLE_MAX_SIZE; ++i)//free the memory of the hash table
 {
 if(j >= 31) break;
 if(hashTable[j])
 {
 j++;
 Node* pHead = hashTable[j];
 if (pHead)
 {
 Node* pTemp = pHead;
 //pHead = pHead->pNext;
 if(pTemp)
 {
 free(pTemp->sKey);
 free(pTemp);
 }
 }
 }
 }
 
 // Finalize the MPI environment.
 MPI_Finalize();
 
 }
 
 
 
 
 | 
 |