Regular-Sampling-Quick-Sorting (MPI)
A. First Edition
This is a simple practice exercise that I wrote because I wanted to prove something.
Actually, I want to say that this is intellectually shallow, because it is a simple variant of merge sort, one of the external sorting algorithms used extensively in databases to sort very large lists that cannot be held entirely in memory.
In the original MPI formulation of sorting, the list is scattered across a set of worker machines before sorting. After sorting, the data is still scattered across those machines, except that each segment is sorted and the segments are ordered relative to one another: if one element of segment A is bigger than some element of segment B, then no element of A is smaller than any element of B. (Is it complicated? No, it just says the segments are sorted.)
I modified the algorithm a little because I think it is pointless for the sorted list to remain as segments
residing on different machines. So I require that the whole list finally be assembled on one node.
1. firstPhase: each worker sorts its local segment and takes regularly spaced samples from the sorted list.
2. secondPhase: the sampled elements are sent to one machine, the master node, where they are sorted; the pivots chosen from them are then broadcast to all
worker nodes.
3. thirdPhase: each worker node uses these pivots to slice its sorted list into sub-lists and sends the boundary
positions (and hence the lengths) of these sub-lists to the master node, so that the master node can arrange to receive
the sub-lists at the corresponding positions in the next phase.
4. fourthPhase: the master node receives these sub-lists into temporary buffers and merges them into one final array.
5. zeroPhase: I added this function in order to compare against the speed of sorting on a single machine.
E. Further improvement
F. File listing
1. quicksort.cpp
file name: quick.cpp
#include <stdlib.h> #include <stdio.h> #include <time.h> #include <math.h> #include "mpi.h" int myRank; int mySize; const int SingleArrayLength=10000000; const int MaxArrayElement=1000000; const int MinArrayElement=100; int* myArray; const int WorkerNumber=7; const int SampleOffset=SingleArrayLength/WorkerNumber; const int StartSampleOffset=WorkerNumber/2-1; int* sampleArray; int**mergeBuf; MPI_Request* sampleRequests; const int SAMPLE=100; const int PIVOT=101; const int MaxIntegerPrintLength=10; void myExit() { //printf("\n\nrank %d finishes\n", myRank); } void printArray(int* array, int length, char* comment="Array print out:"); void printArray(int* array, int length, char* comment) { char* buf; char temp[MaxIntegerPrintLength]; int commentLength=0; commentLength=strlen(comment)+5; buf=new char[length*MaxIntegerPrintLength+commentLength]; sprintf(buf, "rank[%d] %s:*****", myRank, comment); for (int i=0; i<length; i++) { sprintf(temp, "%d,", array[i]); strcat(buf, temp); } strcat(buf, "\n"); printf(buf); delete []buf; } void initialize() { int i; atexit(myExit); srand(myRank*time(0)); if (myRank==0) { sampleArray=new int[WorkerNumber*WorkerNumber]; sampleRequests=new MPI_Request[WorkerNumber]; myArray=new int[SingleArrayLength*WorkerNumber]; mergeBuf=new int*[WorkerNumber]; for (i=0; i<WorkerNumber; i++) { mergeBuf[i]=new int[SingleArrayLength]; } } else { sampleArray=new int[WorkerNumber]; sampleRequests=new MPI_Request[1]; //srand(time(0)); myArray=new int[SingleArrayLength]; for (i=0; i<SingleArrayLength; i++) { myArray[i]=rand()%MaxArrayElement+MinArrayElement; } } } int intComp(const void* first, const void* second) { return *(int*)first - *(int*)second; } void zeroPhase() { int i; double start, end; if (myRank==0) { for (i=0; i<WorkerNumber; i++) { sampleRequests[i]=MPI_REQUEST_NULL; MPI_Irecv(myArray+i*SingleArrayLength, SingleArrayLength, MPI_INT, i+1, 0, MPI_COMM_WORLD, sampleRequests+i); } MPI_Waitall(WorkerNumber, sampleRequests, MPI_STATUSES_IGNORE); 
start=MPI_Wtime(); qsort(myArray, WorkerNumber*SingleArrayLength, sizeof(int), intComp); end=MPI_Wtime(); printf("single machine sorting array of length %d takes %f\n",SingleArrayLength*WorkerNumber, end-start); } else { MPI_Send(myArray, SingleArrayLength, MPI_INT, 0, 0, MPI_COMM_WORLD); } } void firstPhase() { if (myRank!=0) { qsort(myArray, SingleArrayLength, sizeof(int), intComp); //printArray(myArray, SingleArrayLength, "worker data array print out"); //retrieve samples for (int i=0; i<WorkerNumber; i++) { sampleArray[i]=myArray[i*SampleOffset]; } } } void secondPhase() { int i; if (myRank==0) { for (i=0; i<WorkerNumber; i++) { MPI_Irecv(sampleArray+i*WorkerNumber, WorkerNumber, MPI_INT, i+1, SAMPLE, MPI_COMM_WORLD, sampleRequests+i); //MPI_Recv(sampleArray+i*WorkerNumber, WorkerNumber, MPI_INT, i+1, SAMPLE, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } MPI_Waitall(WorkerNumber, sampleRequests, MPI_STATUSES_IGNORE); qsort(sampleArray, WorkerNumber*WorkerNumber, sizeof(int), intComp); //printArray(myArray, SingleArrayLength, "worker's data array\n"); /* for (i=0; i<WorkerNumber*WorkerNumber; i++) { printf("sample[%d]=%d\n", i, sampleArray[i]); } */ //printArray(sampleArray, WorkerNumber*WorkerNumber); for (i=1; i<WorkerNumber; i++) { sampleArray[i-1]=sampleArray[WorkerNumber*i+StartSampleOffset]; } MPI_Bcast(sampleArray, WorkerNumber-1, MPI_INT, 0, MPI_COMM_WORLD); //printArray(sampleArray, WorkerNumber-1, "this is the sampel data broadcasted"); } else { MPI_Ssend(sampleArray, WorkerNumber, MPI_INT, 0, SAMPLE, MPI_COMM_WORLD); MPI_Bcast(sampleArray, WorkerNumber-1, MPI_INT, 0, MPI_COMM_WORLD); } /* for (i=0; i<WorkerNumber-1; i++) { printf("rank[%d][%d]=%d\n", myRank, i, sampleArray[i]); } */ } //it returns the smallest index of which the number is bigger than or equal to the key int binarySearch(int key, int* array, int length) { int front=0, end=length-1; if (key>array[end]) { return length; } if (key<array[front]) { return 0; } int pos=(front+end+1)/2;; while 
(front<=end) { if (key>array[pos]) { front=pos+1; } else { if (key<array[pos]) { end=pos-1; } else { break; } } pos=(front+end+1)/2; } return pos; } void thirdPhase() { int i, flag; //do binary search if (myRank!=0) { for (i=0; i<WorkerNumber-1; i++) { //printf("rank[%d]key=%d\n", myRank, sampleArray[i]); sampleArray[i]=binarySearch(sampleArray[i], myArray, SingleArrayLength); //printf("rank[%d][%d]=%d and the data myArray[%d]=%d\n", myRank, i, sampleArray[i],sampleArray[i], myArray[sampleArray[i]] );//for testing //printf("before %d and after %d \n", myArray[sampleArray[i]-1], myArray[sampleArray[i]+1]); } sampleArray[WorkerNumber-1]=SingleArrayLength; MPI_Ssend(sampleArray, WorkerNumber, MPI_INT, 0, PIVOT, MPI_COMM_WORLD); //printArray(sampleArray, WorkerNumber, "worker sampleArray print out in third phase"); } else { for (i=0; i<WorkerNumber; i++) { sampleRequests[i]=MPI_REQUEST_NULL; MPI_Irecv(sampleArray+i*WorkerNumber, WorkerNumber, MPI_INT, i+1, PIVOT, MPI_COMM_WORLD, sampleRequests+i); //MPI_Recv(sampleArray+i*WorkerNumber, WorkerNumber, MPI_INT, i+1, PIVOT, MPI_COMM_WORLD, MPI_STATUS_IGNORE); //sampleArray[(i+1)*WorkerNumber]=WorkerNumber; } //printArray(sampleArray, WorkerNumber*WorkerNumber, "master sampleArray print out in third phase"); MPI_Waitall(WorkerNumber, sampleRequests, MPI_STATUSES_IGNORE); /* for (i=0; i<WorkerNumber*WorkerNumber; i++) { printf("sample[%d]=%d\n", i, sampleArray[i]); } */ } } void doMerge(int** mergeBuf, int* lengthArray,int& currentPos) { int indexArray[WorkerNumber]; int i, candidate, candidateIndex; bool beFirst=true, allOver=false; for (i=0; i<WorkerNumber; i++) { indexArray[i]=0; } do { beFirst=true; allOver=true; for (i=0; i<WorkerNumber; i++) { if (indexArray[i]<lengthArray[i]) { allOver=false; if (beFirst) { beFirst=false; candidate=mergeBuf[i][indexArray[i]]; candidateIndex=i; } else { if (candidate>mergeBuf[i][indexArray[i]]) { candidate=mergeBuf[i][indexArray[i]]; candidateIndex=i; } } } } if (allOver) { break; } 
myArray[currentPos]=candidate; currentPos++; indexArray[candidateIndex]++; } while (true); } void fourthPhase() { int i, j, flag; int* sizePtr; int* dataPtr; int currentPos=0; int previous, current; int length; //MPI_Request* tempRequests; int lengthArray[WorkerNumber]; if (myRank==0) { //tempRequests=new MPI_Request[WorkerNumber*WorkerNumber]; //printArray(sampleArray, WorkerNumber*WorkerNumber, "before 4th phase, let' see sample Array\n"); for (i=0; i<WorkerNumber; i++)//the index of worker node { for (j=0; j<WorkerNumber; j++)//the index within worker node index { sizePtr=sampleArray+j*WorkerNumber+i; if (i==0) { previous=0; current=*sizePtr; } else { if (i==WorkerNumber-1) { current=SingleArrayLength; } else { current=*sizePtr; } previous=*(sampleArray+j*WorkerNumber+i-1); } //printf("\n current=%d, previous=%d\n", current, previous); lengthArray[j]=current - previous; //currentPos+=length; //dataPtr=myArray+currentPos; sampleRequests[j]=MPI_REQUEST_NULL; //printf("\nmaster begins\n"); if (lengthArray[j]>0) { //MPI_Irecv(dataPtr, length, MPI_INT, j+1, j*10+i, MPI_COMM_WORLD, tempRequests+j*WorkerNumber+i); //printf("\nmaster begin to recv data from rank %d of length %d\n", j+1, lengthArray[j]); MPI_Irecv(mergeBuf[j], lengthArray[j], MPI_INT, j+1, j*10+i, MPI_COMM_WORLD, sampleRequests+j); //MPI_Recv(mergeBuf[j], lengthArray[j], MPI_INT, j+1, j*10+i, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } //printf("\nmaster after prints\n"); } MPI_Waitall(WorkerNumber, sampleRequests, MPI_STATUSES_IGNORE); doMerge(mergeBuf, lengthArray, currentPos); /* printf("\nmaster after tests of %d\n", i+1 ); for (int k=0; k<WorkerNumber; k++) { //printf("\nmaster going to print %d\n", lengthArray[k]); if (lengthArray[k]>0) { printArray(mergeBuf[k], lengthArray[k], "Master receive segment"); } } */ } //MPI_Testall(WorkerNumber*WorkerNumber, tempRequests, &flag, MPI_STATUSES_IGNORE); } else { for (i=0; i<WorkerNumber; i++) { sizePtr=sampleArray+i; if (i==0) { previous=0; current=*sizePtr; } 
else { if (i==WorkerNumber-1) { current=SingleArrayLength; } else { current=*sizePtr; } previous=*(sampleArray+i-1); } dataPtr=myArray+previous; length=current-previous; currentPos+=length; if (length>0) { //printArray(dataPtr, length, "going to send segment"); MPI_Send(dataPtr, length, MPI_INT, 0, (myRank-1)*10+i, MPI_COMM_WORLD); } } } } void testArray() { int previous=myArray[0], current=myArray[0]; for (int i=1; i<WorkerNumber*SingleArrayLength; i++) { current=myArray[i]; if (current<previous) { printf("sorting error at %d with %d > %d\n", i, previous, current); //exit(4); } previous=current; //printf("rank[%d][%d]=%d\n", myRank, i, myArray[i]); } } int main(int argc, char** argv) { double start, end; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &myRank); MPI_Comm_size(MPI_COMM_WORLD, &mySize); initialize(); zeroPhase(); if (myRank==0) { start=MPI_Wtime(); } firstPhase(); //printf("\nfirst phase ends\n"); secondPhase(); //printf("\nsecond phase ends\n"); thirdPhase(); //printf("\nthird phase ends\n"); fourthPhase(); //printf("\nfourth phase ends\n"); if (myRank==0) { end=MPI_Wtime(); printf("distributing system sorting takes %f\n", end-start); } return 0; }
running result:
single machine sorting array of length 7000 takes 0.003865 distributing system sorting takes 0.901712 0.000u 0.012s 0:01.69 0.5% 0+0k 0+0io 0pf+0w single machine sorting array of length 70000 takes 0.042404 distributing system sorting takes 0.899044 0.000u 0.008s 0:01.99 0.0% 0+0k 0+0io 0pf+0w single machine sorting array of length 700000 takes 0.499217 distributing system sorting takes 0.818472 0.004u 0.012s 0:02.23 0.4% 0+0k 0+0io 0pf+0w single machine sorting array of length 7000000 takes 10.278250 distributing system sorting takes 3.047682 0.000u 0.004s 0:18.02 0.0% 0+0k 0+0io 0pf+0w single machine sorting array of length 70000000 takes 66.025549 distributing system sorting takes 13.169787 0.004u 0.004s 1:30.91 0.0% 0+0k 0+0io 0pf+0w