Mix Master Threading

cocktail So in a bar, you can have someone pour multiple liquids at once or do one liquid after another.  Most can be poured all at once however.  So they can be poured asynchronously.  Designing the system to do one or the other style pour is rather easy.  Mixing the two caused a bit of a headache much like drinking too much.  A drink example that uses this case is a long island ice tea.  You need a few types that can be poured at once then you end it with a splash of coke.

So how to solve this?

Since we’ll need thread to open multiple relays at the same time and creating threads is expensive, we’ll use a ThreadPool object to help manage and keep old threads alive.  Reusing threads is not just nice, it is the American way of life.  Just remember that.

Another massive thing to remember is finding out what resources are thread safe.  MSDN will tell you at the bottom what is thread safe.

First we’ll set up all the toys we’ll need to set this up, some AutoResetEvents, an int, bools, some Mutex, and a Semaphore.  Yes this is a lot of stuff. 

A mutex?  semaphore?  AutoResetEvent?  muh?  If you can’t remember what these toys are, I’ll explain using a line to get into a bar analogy.  

  • Mutex: Kind of like a bouncer.  It makes sure the path to the floor is clear, if it isn’t, it won’t let you get in.
  • Semaphore: Allows only so many in to party at the same time.
  • AutoResetEvent: You get a buzzer that tells you when you are cool enough to enter.  When you get enough girls to get in, your buzzer goes off and you’re allowed in.  Until then, stand out in the cold.  Similar to a mutex, but different.

If you need extra help, I suggest checking out Wikipedia, they’ll have some helpful demos, so will MSDN.

Code:

public delegate void RelayEventHandler(object sender, RelayCommand e);
public event RelayEventHandler RelayStartedEvent;
public event RelayEventHandler RelayExecutedEvent;

readonly Semaphore _pool = new Semaphore(1, 1);
readonly AutoResetEvent sequentialWorkWait = new AutoResetEvent(true);
readonly AutoResetEvent asyncToSequentialWorkWait = new AutoResetEvent(false);
readonly Queue<RelayCommand> commands = new Queue<RelayCommand>();
readonly Mutex writingToComport = new Mutex();
readonly Mutex addingItemsToQueue = new Mutex();

bool doingWork;
bool asyncWorkMode;
int concurrentWorkers;

This entire code is broken up in three methods:

  • AddCommand
  • processQueue (Singleton thread)
  • doWork (multithreaded)

Now lets add stuff.  We need to use a mutex to be sure commands are added in the proper order.  This may be overkill but better safer than sorry.  With a Mutex, you call WaitOne() to signal you’re in a critical area of shared code and only you can have access to it.  All other threads must wait until ReleaseMutex() is called.  With WaitOne, you can set a timeout, in my instance, I’m not because I trust I coded this properly.

public void AddCommand(params RelayCommand[] Commands)
{
	addingItemsToQueue.WaitOne();

	foreach (RelayCommand command in Commands)
		commands.Enqueue(command);

	addingItemsToQueue.ReleaseMutex();

	if (!doingWork && commands.Count > 0)
		ThreadPool.QueueUserWorkItem(processQueue);
}

Now instead of showing the queue processing function, we’ll switch to the worker function.  This can actually be altered to whatever you need by swapping line 7 to 9 and changing what state object you pass in.

  1. If the UI needs to know when work has started, it will be notified.
  2. We do work
  3. Subtract the amount of concurrent workers we have
  4. If the command is sequential, we’ll signal we’re all done so the next call can happen
  5. If the command is asynchronous but we’ve queued up a sequential call, we’ll check to be sure all work is done, if so we’ll signal the sequential work can now start as all asynchronous work has completed.
  6. If the UI needs to know when work has finished, it will be notified.

private void doWork(object state)
{
	if (state == null) return;
	
	var command = (RelayCommand)state;

	if (RelayStartedEvent != null)
			RelayStartedEvent.Invoke(this, command);

	On(command.Bank, command.Relay);
	Thread.Sleep(command.Milliseconds);
	Off(command.Bank, command.Relay);

	concurrentWorkers--;

	if (!command.Asynchronous)
		sequentialWorkWait.Set();
	else
		// now signaled to be in async mode 
		// and all async'ed work is done
		if (!asyncWorkMode && concurrentWorkers == 0)
			asyncToSequentialWorkWait.Set();

	if (RelayExecutedEvent != null)
		RelayExecutedEvent.Invoke(this, command);
}

Now the heavy lifter, the queue processor.  Since we are using a ThreadPool, we want to be utterly sure only one processor is happening else stuff may get weird, so for this we use a Semaphore.  I actually think the code comments do a good job of explaining what is going on and why.  If not I’ll go in and explain further.  I have the doingWork Boolean just to prevent extra threads from being created for no reason if more commands are being added while work is being done.

private void processQueue(object state)
{
	// force only one of these types of thread procs
	// to execute at any time
	_pool.WaitOne();
	doingWork = true;
	var firstWorkerLoop = true;
	while (commands.Count > 0)
	{
		var command = commands.Dequeue();
		// can't pop until inside loop
		if (firstWorkerLoop)
			asyncWorkMode = command.Asynchronous;

		firstWorkerLoop = false;
		if (!command.Asynchronous)
		{
			if (asyncWorkMode)
			{
				asyncWorkMode = false;
				asyncToSequentialWorkWait.WaitOne();
				// after every sequential call, we reset to blocking
				// need to allow this to continue
				sequentialWorkWait.Set();
			}

			sequentialWorkWait.WaitOne();
			// on next loop, thread will wait now
			sequentialWorkWait.Reset(); 
		}
		else // asynced
		{
			if (!asyncWorkMode)
			{
				// have to wait for current pour to finish.
				sequentialWorkWait.WaitOne();
				// have to reset so sync will wait for
				// all async to finish
				asyncToSequentialWorkWait.Reset(); 
			}

			asyncWorkMode = true;
		}

		concurrentWorkers++;
		ThreadPool.QueueUserWorkItem(doWork, command);
	}
	doingWork = false;
	_pool.Release();
}

Who knew making mixed drinks could involve mutually exclusive resources?

No comments posted yet.

Post a Comment

Please add 3 and 7 and type the answer here: