synchronization

1.  How do I achieve synchronization?

a) By using a semaphore:

In Java, you can use a counting semaphore (java.util.concurrent.Semaphore) as a mutex by setting the number of available permits to one.

e.g.

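Semaphore available = new Semaphore(1);
//entering critical_section (acquire() blocks and can throw InterruptedException)
available.acquire();
try {
    //do what you must do, and keep this part short. Be careful, don't do blocking work here, otherwise every thread waiting on the semaphore is stuck.
    ...
} finally {
    //leaving critical_section; release in finally so an exception cannot leave the semaphore held forever
    available.release();
}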
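
To see the semaphore-as-mutex idea end to end, here is a minimal sketch. The class name SemaphoreMutexDemo, the shared counter and the run() helper are made up for illustration and are not part of any assignment code. Several threads increment one counter, and the single permit makes sure only one of them is inside the critical section at a time.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class: the name, the counter and run() are illustrative only.
class SemaphoreMutexDemo {
    private static final Semaphore available = new Semaphore(1); // one permit, so it acts as a mutex
    private static int counter = 0;                              // shared state guarded by the semaphore

    // Each worker adds 1 to the counter perThread times, always inside the critical section.
    static int run(int threads, int perThread) {
        counter = 0;
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    try {
                        available.acquire();          // entering critical section (may block)
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;                       // never touch the counter without the permit
                    }
                    try {
                        counter++;                    // short, non-blocking work only
                    } finally {
                        available.release();          // leaving critical section, even on an exception
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();                             // wait until every worker has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // 4 workers x 1000 increments
    }
}
```

Without the acquire/release pair some increments would typically get lost, because counter++ is a read-modify-write and not an atomic step.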
 

b) By using a file lock:

i) How to get?

In Java, you can acquire a FileLock from a FileChannel.

e.g.

RandomAccessFile myFile=new RandomAccessFile("myFile.txt", "rw");

...

//Be careful! This is a blocking call and you may deadlock if you don't handle it properly

FileLock myLock= myFile.getChannel().lock(); //there are several versions of locking, see below

//If you reach here, you already hold the file lock. Other programs that also use file locks cannot lock the file until you release it.

...

ii) How to use?

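In Java, whether a FileLock is mandatory or only advisory depends on the operating system. The lock is held on behalf of the whole JVM, so the owner does not have to release it before modifying the file: write through the same FileChannel (or RandomAccessFile) that holds the lock, then release. On some platforms (Windows in particular) a write through a different stream or channel on the locked file can be rejected, which is easy to mistake for "the owner must release first".

e.g.

RandomAccessFile myFile=new RandomAccessFile("myFile.txt", "rw");

...

FileLock myLock= myFile.getChannel().lock();

//If you reach here, it means you hold the lock.

try {

    //operate on the file here, through myFile or its channel, while the lock is still held

    ...

} finally {

    myLock.release(); //releasing before the file operation would leave a window in which others can change the file

}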

 

iii) Choice: blocking or non-blocking?

Basically it is your design choice whether to use the blocking or the non-blocking variant of the "lock" methods, but there is a subtle issue you may encounter in your server implementation. The blocking version (i.e. FileChannel.lock()) carries a risk of deadlock and ties up the calling thread while it waits, which usually means low throughput. So I suggest you prefer the non-blocking version, "tryLock()". However, you have to handle the case where the file is already locked: tryLock() returns null when another program holds an overlapping lock, and it throws OverlappingFileLockException when the lock is already held inside the same JVM. See the explanation on sun.com.
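
A minimal sketch of the non-blocking pattern follows. The class TryLockDemo, the method names and the returned status strings are hypothetical and chosen only for this example; the FileChannel and FileLock calls are the standard java.nio ones. It appends to a file only when the lock is free and reports the two "busy" cases separately.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

// Hypothetical demo class: names and status strings are illustrative only.
class TryLockDemo {
    // Appends text if the lock is free. Returns "written", "busy-other-program" or "busy-same-jvm".
    static String writeIfFree(File file, String text) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            FileLock lock;
            try {
                lock = raf.getChannel().tryLock();    // non-blocking attempt
            } catch (OverlappingFileLockException e) {
                return "busy-same-jvm";               // this JVM already holds a lock on the file
            }
            if (lock == null) {
                return "busy-other-program";          // another program holds the lock
            }
            try {
                raf.seek(raf.length());               // write through the handle that owns the lock
                raf.writeBytes(text);
                return "written";
            } finally {
                lock.release();                       // release only after the file operation
            }
        }
    }

    // Small self-check on a temporary file: one free attempt, one attempt while the lock is held.
    static String demo() {
        try {
            File f = File.createTempFile("trylock", ".txt");
            f.deleteOnExit();
            String first = writeIfFree(f, "hello");
            try (RandomAccessFile holder = new RandomAccessFile(f, "rw");
                 FileLock held = holder.getChannel().lock()) {
                String second = writeIfFree(f, "world"); // lock is held by this JVM right now
                return first + "," + second;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a server you would normally retry later or report "busy" to the client instead of blocking a worker thread inside lock().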

c) By using java key word synchronized to declare functions:

a) The "synchronized" keyword locks the whole object:

In the Java language, there is a built-in synchronization mechanism: declare methods as "synchronized" and "all" of these methods on the same object become mutually exclusive, i.e. only one thread at a time can be inside any of them.

class Foo
{
    boolean critical;

    synchronized int funcA()
    {
        //by declaring "synchronized", we don't need to worry about the context switch problem below.
        if (critical)  //this is the risky place because a context switch may happen here if we don't declare "synchronized"
            return 100;
        else
            return 200;
    }

    synchronized int funcB()
    {
        if (critical) //this is the risky place because a context switch may happen here if we don't declare "synchronized"
            return 300;
        else
            return 400;
    }

...

}

Now assume that one instance of Foo is shared by multiple threads. Calls to its synchronized methods then need no extra synchronization. Please note that not only are multiple "funcA" calls executed one at a time; "funcA" and "funcB" calls also exclude each other, because every synchronized instance method of Foo locks the same monitor, namely the object itself. In other words, all synchronized method calls on one Foo object are put in a "logical queue" (with no guaranteed order) and are executed strictly one at a time, no matter how many threads there are. Two different Foo objects have separate locks, and static synchronized methods lock the Foo class object instead.
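
The per-object locking described above can be sketched as follows. SyncCounterDemo and its methods are hypothetical names used only for this illustration. One thread calls a synchronized increment and another calls a synchronized decrement on the same object; because both methods share the object's lock, no update is lost.

```java
// Hypothetical demo class: two different synchronized methods sharing one object lock.
class SyncCounterDemo {
    private int value = 0;

    synchronized void increment() { value++; }   // locks "this"
    synchronized void decrement() { value--; }   // locks the same "this", so it excludes increment()
    synchronized int get() { return value; }

    // Runs n increments and n decrements concurrently on one shared object and returns the result.
    static int run(int n) {
        SyncCounterDemo shared = new SyncCounterDemo();
        Thread up = new Thread(() -> {
            for (int k = 0; k < n; k++) shared.increment();
        });
        Thread down = new Thread(() -> {
            for (int k = 0; k < n; k++) shared.decrement();
        });
        up.start();
        down.start();
        try {
            up.join();
            down.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return shared.get();                     // equal numbers of ++ and -- cancel out
    }

    public static void main(String[] args) {
        System.out.println(run(100000));
    }
}
```

If each thread used its own SyncCounterDemo object, the two locks would be unrelated and the calls would not wait for each other at all.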

b) a big trap of "synchronized"

In our project implementation, we once hit a deadlock by misusing the synchronized keyword on two methods. The lock behind "synchronized" is reentrant for the thread that already holds it, but no other thread can enter any synchronized method on the same lock until the holder leaves. See the following scenario and code:

Just imagine that you have two threads running in your server. One of them is a monitor thread, e.g. a TCP server socket, and the other one is a worker thread doing some job, e.g. a file operation. The monitor thread calls "funcA" to check a flag "done", which the other thread sets by calling "funcB". Once "done" is set, the monitoring job is finished. This is very common in both our assignment and project. However, the subtle thing is that by declaring both "funcA" and "funcB" as static synchronized, you make them share one lock (the lock of the class MyClass). If "threadA" (the monitor thread) enters "funcA" first, it keeps that lock for as long as it loops (Thread.sleep() does not release it), so "threadB" can never enter "funcB" and you are dead.

import java.io.*;

public class MyClass
{
	static boolean done=false;
	public static void main(String argv[])
	{
		MyThread threadA=new MyThread(0);
		MyThread threadB=new MyThread(1);
		System.out.println(" starts two threads");
		threadA.start();
		
		threadB.start();
			
		System.out.println(" two threads started");
	}


	static synchronized void funcA()
	{
		try
		{
			while (!done)
			{
				System.out.println("done is false and sleep...");
				Thread.sleep(1000);
			}
		}
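		catch(InterruptedException er)
		{
			//restore the interrupt flag instead of silently swallowing the exception
			Thread.currentThread().interrupt();
		}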
	}

	static synchronized void funcB()
	{
		System.out.println("I never reach here!");
		done=true;
	}	

	static class MyThread extends Thread
	{
		int myID;
		public MyThread(int id)
		{
			myID=id;
		}
		public void run() 
		{
			if (myID==0)
			{
				funcA();
			}
			else
			{
				funcB();
			}
		}
	}
			
}	
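
One common way out of this trap is to let the monitor thread wait on the lock instead of sleeping while holding it. The sketch below is a hypothetical rework of MyClass (the class name WaitNotifyDemo, the run() helper and the 5 second timeout are my own choices for the example). The point is that wait() releases the lock while waiting, so funcB can enter, set the flag and wake the waiter.

```java
// Hypothetical rework of MyClass: same flag and method names, but funcA waits instead of sleeping.
class WaitNotifyDemo {
    private static boolean done = false;

    // Monitor side. wait() gives up the class lock while waiting, so funcB can get in.
    static synchronized void funcA() throws InterruptedException {
        while (!done) {
            WaitNotifyDemo.class.wait();
        }
    }

    // Worker side. Sets the flag and wakes every thread waiting in funcA.
    static synchronized void funcB() {
        done = true;
        WaitNotifyDemo.class.notifyAll();
    }

    // Runs both threads and reports whether both finished within the timeout.
    static boolean run() {
        synchronized (WaitNotifyDemo.class) {
            done = false;                        // reset so run() can be called more than once
        }
        Thread threadA = new Thread(() -> {
            try {
                funcA();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread threadB = new Thread(WaitNotifyDemo::funcB);
        threadA.start();
        threadB.start();
        try {
            threadA.join(5000);                  // illustrative timeout, not a tuned value
            threadB.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !threadA.isAlive() && !threadB.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(run() ? "both threads finished" : "still stuck");
    }
}
```

The while loop around wait() matters: a waiting thread may wake up without the flag being set, so it has to re-check "done" every time.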

Performance comparison between condition wait and naive mutex

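POSIX threads on Linux offer a superior synchronization tool, namely condition wait (pthread_cond_wait on a condition variable). I designed a simple performance test to compare a naive mutex with condition wait: ten threads take turns incrementing a shared counter up to 50, and the test is repeated 20 times for each variant. The outcome is stunning! The condition wait version finishes within one second, while the naive mutex version needs as long as 102 seconds. What's more, I have to force the idle threads to "sleep", otherwise the polling threads keep grabbing the mutex and the test does not progress at all! So most of the measured time is presumably the sleep(1) itself, which is exactly the price of polling instead of waiting on a condition.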
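
For readers coming from the Java part above, the same turn-taking idea can be sketched in Java with a ReentrantLock and a Condition. This is not a port of the C test below and it measures nothing; the class TurnTakingDemo and its constants are assumptions made up for this illustration. Each thread waits until the counter says it is its turn, increments, and signals the others.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: threads take turns incrementing a counter, waiting on a condition.
class TurnTakingDemo {
    static final int THREADS = 10;
    static final int COUNT_DONE = 50;
    private static final ReentrantLock lock = new ReentrantLock();
    private static final Condition turnChanged = lock.newCondition();
    private static int count;

    // Thread "index" increments only when count % THREADS == index, otherwise it waits.
    static void worker(int index) {
        lock.lock();
        try {
            while (true) {
                while (count < COUNT_DONE && count % THREADS != index) {
                    turnChanged.awaitUninterruptibly();  // releases the lock while waiting
                }
                if (count >= COUNT_DONE) {
                    return;                              // all work is done
                }
                count++;
                turnChanged.signalAll();                 // let the next thread check its turn
            }
        } finally {
            lock.unlock();
        }
    }

    // Starts all workers, waits for them and returns the final counter value.
    static int run() {
        lock.lock();
        try {
            count = 0;
        } finally {
            lock.unlock();
        }
        Thread[] threads = new Thread[THREADS];
        for (int t = 0; t < THREADS; t++) {
            final int index = t;                         // each thread gets its own copy of the index
            threads[t] = new Thread(() -> worker(index));
            threads[t].start();
        }
        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

No thread ever sleeps for a fixed time here: a waiting thread is woken only when the counter has actually changed.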

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/time.h>
#include <stdbool.h> //for "true" in the while (true) loops when this is built as C

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t dummy = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;

//void *functionCount1(void*);
//void *functionCount2(void*);

void* condProc(void*param);
void* mutexProc(void*param);
int  count = -1;
#define COUNT_DONE  50
//#define COUNT_HALT1  3
//#define COUNT_HALT2  6

typedef void*(*ThreadProc)(void*);

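#define MaxThreadNumber 10 //a macro, so the array sizes below are constant expressions in C as well as C++

pthread_t threads[MaxThreadNumber];
struct timeval start, end;
int threadID[MaxThreadNumber]; //one stable slot per thread for its index
//ThreadProc threadProc=myProc;
int i;
float result;

int totalSec, totalUsec;
void perform_test(ThreadProc proc)
{
	count=-1;
	float tempStart, tempEnd;
	int usec, sec;
	//mutex and cond are already set up by their static initializers, so they are not re-initialized here
	for (i=0; i<MaxThreadNumber; i++)
	{
		threadID[i]=i;
		//pass each thread its own slot; passing &i would race with this loop changing i
		pthread_create(threads+i,NULL, proc, (void*)&threadID[i]);
	}  
	gettimeofday(&start, NULL);
	//set the start signal under the mutex so a waiter cannot miss the broadcast
	pthread_mutex_lock(&mutex);
	count=0;
	pthread_mutex_unlock(&mutex);
	pthread_cond_broadcast(&cond);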

	for (i=0; i<MaxThreadNumber; i++)
	{
		pthread_join(threads[i], NULL);
	}
	gettimeofday(&end, NULL);

	if (end.tv_usec>=start.tv_usec) //>= so that equal microseconds do not borrow a second
	{
		usec=end.tv_usec-start.tv_usec;
		sec=end.tv_sec-start.tv_sec;
	}
	else
	{
		usec=1000000+end.tv_usec-start.tv_usec;
		sec=end.tv_sec-1-start.tv_sec;
	}
/*		
	tempEnd=end.tv_sec+(float)end.tv_usec/1000000.0;

	tempStart=start.tv_sec+(float)start.tv_usec/1000000.0;
	printf("start=%f, end=%f\n", tempStart, tempEnd);
	result=tempEnd-tempStart;
	printf("the total time is: %f\n", result);
*/
	printf("sec=%d, usec=%d\n", sec, usec);
	totalSec+=sec;
	totalUsec+=usec;

}

int main()
{
	float total;

	int i;
	const int TotalTestNumber=20;
	total=0;
	totalSec=totalUsec=0;
	for (i=0; i<TotalTestNumber; i++)
	{
		
		printf("condition [%d]\n", i);
		perform_test(condProc);
		total+=result;
		//printf("condition[%d]: %f\n", i, result);
	}
	//printf("condition wait is on average of %f\n", total/(float)TotalTestNumber);
	printf("condition wait is sec=%d, usec=%d\n", totalSec, totalUsec);
	total=0.0;
	totalSec=totalUsec=0;
	for (i=0; i<TotalTestNumber; i++)
	{
		
		printf("mutex[%d]\n", i);
		perform_test(mutexProc);
		total+=result;
		//printf("mutex[%d]:  %f\n", i, result);
	}

	//printf("mutex wait is on average of %f\n", total/(float)TotalTestNumber);
	printf("mutex wait is sec=%d, usec=%d\n", totalSec, totalUsec);
	
	return 0;
}

void* condProc(void* param)
{
	int index=*((int*)param);
	//printf("thread %d starts...\n", index);
	pthread_mutex_lock(&mutex);
	while (count<0)
	{
		pthread_cond_wait(&cond, &mutex);
	}
	pthread_mutex_unlock(&mutex);
	pthread_cond_broadcast(&cond);

	while (true)
	{
		pthread_mutex_lock(&mutex);
		
		while (count%MaxThreadNumber!=index&&count<COUNT_DONE)
		{
			pthread_cond_wait(&cond, &mutex);
		}
		if (count==COUNT_DONE)
		{
			pthread_mutex_unlock(&mutex);
			pthread_cond_broadcast(&cond);
			break;			
		}
		else
		{		
	
			//printf("thread %d doing work to inc count %d\n", index, count);
			count++;
			pthread_mutex_unlock(&mutex);
			//sleep(rand()%2);
			pthread_cond_broadcast(&cond);
			
		}
	}
	return NULL; //a thread function declared void* must return a value
}
		
void* mutexProc(void* param)
{
	int index=*((int*)param);
	/*
	pthread_mutex_lock(&mutex);
	while (count<0)
	{
		pthread_cond_wait(&cond, &mutex);
	}
	pthread_mutex_unlock(&mutex);
	pthread_cond_broadcast(&cond);
*/
	printf("thread %d starts...\n", index);
	while (true)
	{
		//printf("thread %d\n", index);
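		pthread_mutex_lock(&mutex);
		
		//only advance while work remains, otherwise count could step past COUNT_DONE and never match it
		if (count<COUNT_DONE&&count%MaxThreadNumber==index)
		{
			count++;
		}
		//read the shared counter while still holding the mutex
		int finished=(count>=COUNT_DONE);
		pthread_mutex_unlock(&mutex);
		if (finished)
		{
			break;			
		}
		sleep(1);
	}
	return NULL; //a thread function declared void* must return a value
}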

		


/*
void *functionCount1(void* param)
{
   for(;;)
   {
	sleep(rand()%2);
	printf("before mutex lock 1\n");
      pthread_mutex_lock( &condition_mutex );
	sleep(rand()%2);
      while( count%2 ==0 )
      {
	sleep(rand()%2);
	printf("before cond wait 1\n");
         pthread_cond_wait( &condition_cond, &condition_mutex );
	printf("after cond wait 1\n");
	sleep(rand()%2);
      }
	sleep(rand()%2);
	count++;
	printf("Counter value functionCount1: %d\n",count);
      pthread_mutex_unlock( &condition_mutex );
	sleep(rand()%2);
   
      
      
     
      if(count >= COUNT_DONE) return(NULL);
    }
}

void *functionCount2(void* param)
{
    for(;;)
    {
	sleep(rand()%2);
	printf("before mutex lock 2\n");
       pthread_mutex_lock( &condition_mutex );
	printf("after mutex lock 2\n");
	sleep(rand()%2);
       if( count%2!=0 )
       {
		sleep(rand()%2);
		printf("before signal 2\n");
	       pthread_mutex_unlock( &condition_mutex );
        	  pthread_cond_signal( &condition_cond );
		printf("after signal 2\n");
		sleep(rand()%2);
       }
	else
	{
		count++;
		printf("Counter value functionCount2: %d\n",count);
	       pthread_mutex_unlock( &condition_mutex );
		sleep(rand()%2);
     	}
	

	sleep(rand()%2);
    
  

       if(count >= COUNT_DONE) return(NULL);
    }

}
*/