Cigarette smoker’s problem

Q. This problem was originally described by Suhas Patil in 1971.
Three chain smokers are in a room with a vendor of cigarette supplies. To make and smoke a cigarette, each smoker needs three ingredients: tobacco, paper and matches, all of which the vendor has in ample supply. One smoker has his own tobacco, the second has his own paper and the third has his own matches.
The action begins when the vendor puts two of the ingredients on the table, allowing one of the smokers to commit the unhealthy act. When that smoker is done, he wakes up the vendor, who then puts two more ingredients at random on the table, unblocking another smoker. Write a program to simulate the smokers' and the vendor's behaviour.

Ans:
Vendor routine:
1. Generate two items at random and put them on the table.
2. Broadcast this information to all the smokers.
3. Go to sleep until the matching smoker creates a cigarette and is done smoking.

Smoker's routine:
1. Wait for the two needed ingredients to become available on the table.
2. Create the cigarette and smoke.
3. Wake up the sleeping vendor.

For semaphore, condition variable and mutex pseudo implementation check this.


#define TOBACCO 0
#define PAPER 1
#define MATCHES 2

class CigaretteSmoker {
public:
    CigaretteSmoker();
    ~CigaretteSmoker();
    void
    vendorRoutine();
    void
    smokerRoutine(
        IN byte neededItem1,
        IN byte neededItem2
        );

private:
    bool m_availableItems[3];
    ConditionMutex *m_condMutex;
    Semaphore *m_vendorSleep;
};

CigaretteSmoker::CigaretteSmoker()
{
    // No item is available on the table.
    m_availableItems[TOBACCO] = false;
    m_availableItems[PAPER] = false;
    m_availableItems[MATCHES] = false;

    m_condMutex = new ConditionMutex();
    m_vendorSleep = new Semaphore(0);
}

// One vendor thread will be spawned to execute the routine below.
void
CigaretteSmoker::vendorRoutine()
{
    while (true) {
        byte item1 = generateRandomIngredient();
        byte item2 = generateRandomIngredient();

        m_condMutex->lock();
            // Make items available on table.
            m_availableItems[item1] = true;
            m_availableItems[item2] = true;

            // Announce to all smokers that items are made available on table.
            m_condMutex->broadcast();
            // Note the use of broadcast: all smokers will be unblocked.
            // But only one smoker will have all 3 ingredients
            // and can start smoking.
        m_condMutex->unlock();

        // Go to sleep till the selected smoker is done with smoking.
        m_vendorSleep->wait();
        // Note semaphore is initialized with 0.
    }
}

// Three threads, one per smoker, will be spawned to execute the routine below.
// Each needs a different set of {item1, item2}.
void
CigaretteSmoker::smokerRoutine(
    IN byte neededItem1,
    IN byte neededItem2
    )
{
    while (true) {
        m_condMutex->lock();
            // Block till the needed items are on table.
            while ((m_availableItems[neededItem1] != true) ||
                   (m_availableItems[neededItem2] != true)) {
                m_condMutex->wait();
            }
            // Pick up the items from the table while still holding
            // the lock, so other smokers never observe a stale table.
            m_availableItems[neededItem1] = false;
            m_availableItems[neededItem2] = false;
        m_condMutex->unlock();

        // All the ingredients are in hand; start smoking.
        startUnhealthyActSmoking();
        // Smoking is done, wakeup the sleeping vendor.
        m_vendorSleep->post();
    }
}
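As a runnable sketch of the same table handshake using standard C++11 primitives (the class name SmokersTable is illustrative, and the vendor is driven deterministically by the caller here instead of choosing random ingredients):

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

enum { TOBACCO, PAPER, MATCHES };

class SmokersTable {
public:
    // Vendor: place two items, then block until a smoker takes them.
    void vendorPlace(int a, int b) {
        std::unique_lock<std::mutex> lk(m_);
        avail_[a] = avail_[b] = true;
        cv_.notify_all();                       // wake every smoker
        cv_.wait(lk, [&] { return !avail_[a] && !avail_[b]; });
    }
    // Smoker: block until both needed items are on the table, take them.
    void smokerTake(int a, int b) {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [&] { return avail_[a] && avail_[b]; });
        avail_[a] = avail_[b] = false;          // pick up the items under the lock
        cv_.notify_all();                       // wake the sleeping vendor
    }
private:
    bool avail_[3] = {false, false, false};
    std::mutex m_;
    std::condition_variable cv_;
};
```

Since all waiters share one condition variable, `notify_all` is used, mirroring the broadcast in the pseudocode: only the smoker whose two items are on the table passes its predicate.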

Task scheduler

Q. Design a task scheduler module with a constraint on the maximum number of tasks allowed to run in parallel. A task is an independent process that needs to be spawned.

Ans: Task scheduler workflow:
1. When a new task launch is requested, check whether the maximum number of allowed parallel tasks is already running. If yes, block the request till there is room to launch a new task. If not, launch the new task.
2. When a new task is launched, spawn a thread to watch the task's progress. When the task completes, unblock the launch of waiting / new tasks.

The solution uses vfork() and execl() to launch a task and waitpid() to wait until the task ends.

For semaphore, condition variable and mutex pseudo implementation check this.

#define MAX_PARALLEL_TASKS 16

class TaskScheduler
{
public:

    TaskScheduler();
    ~TaskScheduler();
    void
    scheduleTask();

private:

    void
    watchSubjob(
        IN void *data
        );
    void
    endTask();
    Semaphore *m_semaphore;
};

TaskScheduler::TaskScheduler()
{
    // Total slots for task launching: MAX_PARALLEL_TASKS
    m_semaphore = new Semaphore(MAX_PARALLEL_TASKS);
}

void
TaskScheduler::scheduleTask()
{
    // Block if there is no room to launch task.
    m_semaphore->wait();
    // One task slot consumed.

    // Launch new task.
    pid_t pid = vfork();
    if (0 == pid) {
        // Inside the child process, exec new task.
        execl(TASK_EXE_PATH, TASK_EXE, argv1...);
    }
    // Inside the parent process.
    // Fork a thread to wait for the child exit.
    pthread_t thread;
    pthread_create(&thread, NULL, watchSubjob, (void *) (&pid));
}

void
TaskScheduler::watchSubjob(
    IN void *data
    )
{
    pid_t pid = *((pid_t *) data);
    // Wait for the process to end.
    waitpid(pid, NULL, 0);
    endTask();
}

void
TaskScheduler::endTask()
{
    // The task is ended, announce it by:
    // releasing one slot for task launcher.
    m_semaphore->post();
}

There is a problem with the above code. The pthread_create call has an issue:

    pthread_create(&thread, NULL, watchSubjob, (void *) (&pid));

The problem is that the variable pid is local to scheduleTask and we pass its address to the thread. What the thread function TaskScheduler::watchSubjob receives as its argument depends entirely on thread scheduling.
If the newly created thread is scheduled before the creator function TaskScheduler::scheduleTask returns, then the pid value is received correctly in the thread function TaskScheduler::watchSubjob.
But if the newly created thread is scheduled after TaskScheduler::scheduleTask returns, the stack frame of that function is wound up before the thread starts executing, and the pid value referred to in TaskScheduler::watchSubjob may contain garbage.

That is why you should never pass the address of a stack variable, which may go out of scope, to a thread function as an argument.
The correct function should look like this:

void
TaskScheduler::scheduleTask()
{
    // Block if there is no room to launch task.
    m_semaphore->wait();
    // One task slot consumed.

    // Launch new task.
    pid_t *pid = new pid_t;
    *pid = vfork();
    if (0 == *pid) {
        // Inside the child process, exec new task.
        execl(TASK_EXE_PATH, TASK_EXE, argv1...);
    }
    // Inside the parent process.
    // Fork a thread to wait for the child exit.
    pthread_t thread;
    pthread_create(&thread, NULL, watchSubjob, (void *) (pid));
    // After using pid, watchSubjob should free the memory
}
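In modern C++ the same pitfall can be avoided without manual heap management: std::thread copies its arguments into storage owned by the new thread, so a local value can be passed safely even after the creator's stack frame disappears. A minimal sketch (watchValue and spawnWatcher are illustrative names, not from the code above):

```cpp
#include <cassert>
#include <thread>

static int g_observed = 0;

// The thread receives a private copy of the caller's local variable.
void watchValue(int pid) {
    g_observed = pid;
}

std::thread spawnWatcher() {
    int pid = 42;                        // local variable on the stack
    // std::thread copies 'pid' by value; the copy outlives this frame.
    return std::thread(watchValue, pid);
}
```

Passing by value sidesteps both the dangling pointer and the question of who frees the heap allocation.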

FIFO mutex

Q. Last post for non starving reader-writer ended with a need for FIFO mutual exclusion. Mutex provides mutual exclusion but does not guarantee FIFO scheduling for critical section access. How to build FIFO mutex?

Ans: Ticket locks are used to implement the FIFO mutex.
1. The idea is a central ticketing policy with two counters: “now_serving” and “next_available”.
2. To acquire the lock, every thread takes the “next_available” ticket and blocks until its ticket becomes the “now_serving” ticket.
3. While unlocking, the thread increments the “now_serving” ticket to allow the next thread in FIFO order to acquire the lock.

class FifoMutex
{
public:

    FifoMutex();
    ~FifoMutex();
    void
    lock();
    void
    unlock();

private:

    UINT32 m_nowServing;
    UINT32 m_nextAvailable;
    ConditionMutex *m_condMutex;
};

FifoMutex::FifoMutex()
{
    m_nowServing = 0;
    m_nextAvailable = 0;
    m_condMutex = new ConditionMutex();
}

void
FifoMutex::lock()
{
    UINT32 myTicket;

    m_condMutex->lock();
        // Get your ticket and increment m_nextAvailable.
        myTicket = m_nextAvailable;
        m_nextAvailable++;

        // Wait till your ticket is being served.
        while (m_nowServing != myTicket) {
            m_condMutex->wait();
        }
    m_condMutex->unlock();
}

void
FifoMutex::unlock()
{
    // Increment the m_nowServing and wakeup all blocked threads.
    m_condMutex->lock();
        m_nowServing++;
        // Please note we are broadcasting the wakeup call.
        // We must broadcast since we don't know
        // which thread should acquire the lock next.
        // All the waiting threads will be unblocked;
        // only one of them will pass the condition "m_nowServing == myTicket".
        m_condMutex->broadcast();
    m_condMutex->unlock();
}

With this FifoMutex we can easily convert the non-starving reader-writer solution into a strict-ordering reader-writer solution just by changing the type of one member variable of class RWlockType3: replace the member variable Mutex *m_mutex; with FifoMutex *m_mutex; and the strict-ordering reader-writer solution is ready.
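For reference, the same ticket-lock idea can be sketched with C++11 primitives, with std::condition_variable playing the role of ConditionMutex (TicketLock is an illustrative name, equivalent in structure to FifoMutex above):

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

class TicketLock {
public:
    void lock() {
        std::unique_lock<std::mutex> lk(m_);
        unsigned myTicket = m_nextAvailable++;   // take a ticket
        // Block until this ticket is being served.
        m_cv.wait(lk, [&] { return m_nowServing == myTicket; });
    }
    void unlock() {
        std::lock_guard<std::mutex> lk(m_);
        ++m_nowServing;
        // Broadcast: all waiters wake, only the next ticket proceeds.
        m_cv.notify_all();
    }
private:
    unsigned m_nowServing = 0;
    unsigned m_nextAvailable = 0;
    std::mutex m_;
    std::condition_variable m_cv;
};
```

The inner std::mutex only protects the two counters; the FIFO ordering comes from the monotonically increasing tickets, not from the mutex itself.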

Reader Writer – Non starving

Q. The reader preference solution ends up starving the writers. The writer preference solution ends up starving the readers.
How to design a reader-writer lock problem with no starvation. No starvation means any thread should not wait indefinitely to acquire the lock.

Ans. Let's develop a no-starvation solution with the requirement that no thread should be blocked from acquiring the lock by any other thread that becomes ready to acquire the lock at a later point in time. To achieve this we follow the lock request order. Consider a lock request order as shown below:
…. R1, R2, R3, R4, R5, W1 …..
R stands for a read request and W stands for a write request.
In this scenario, the order in which the reader threads R1, R2, R3, R4 acquire the shared lock and access the shared object has no impact on the shared object; note also that one reader thread cannot starve another reader thread. The only restriction is that all the reader threads R1, R2, R3, R4 must acquire the shared lock (in any order) before the writer thread W1 attempts to acquire the exclusive lock. Hence, before a writer thread submits its lock request, there should not be any reader thread still waiting to acquire the shared lock.

Let's consider another lock request sequence:
… W1, W2, W3, R1 …
When writer thread W1 has acquired the exclusive lock, if we allow both writers W2 and W3 to proceed with their lock requests, these writers will compete with each other for the exclusive lock (kernel thread scheduling decides which of them is unblocked once W1 gives up the exclusive lock). Similarly, when W2 has acquired the exclusive lock and we allow both W3 and R1 to proceed, W3 and R1 will compete for access to the shared object. To avoid this, until a writer thread gives up the exclusive lock, we must not allow other threads to submit lock requests.

The strict ordering is based on two rules:
1. Before allowing a writer thread to compete for the exclusive lock, make sure there are no readers waiting to acquire the shared lock.
2. Once a writer has been granted the exclusive lock, no other lock request should be submitted until the writer thread gives up the exclusive lock.

The solution below uses the reader preference read-write lock for exclusive & shared access. For semaphore, condition variable and mutex pseudo implementation check this. In the code below, the lock request granter function getAccess() decides the lock request order for read & write operations.

#define READ_REQUEST  1
#define WRITE_REQUEST 2

class RWlockType3
{
public:

    RWlockType3();
    ~RWlockType3();
    void
    acquireReadLock();
    void
    releaseReadLock();
    void
    acquireWriteLock();
    void
    releaseWriteLock();

private:

    UINT32 m_waitingReaderCount;
    Mutex *m_mutex;
    ConditionMutex *m_condMutex;
    Semaphore *m_writer;
    RWlockType1 *m_rwLock;

    void
    getAccess(
        byte requestType
        );
};

RWlockType3::RWlockType3()
{
    m_waitingReaderCount = 0;
    m_mutex = new Mutex();
    m_condMutex = new ConditionMutex();
    // Initialize semaphore with 0.
    m_writer = new Semaphore(0);
    m_rwLock = new RWlockType1();
}

void
RWlockType3::getAccess(
    byte requestType
    )
{
    // At a time, only one lock requester thread beyond this point.
    // (The mutex lock acquisition decides the lock request order.)
    m_mutex->lock();
        // If it is a read lock request
        // increment the waiting reader count.
        if (READ_REQUEST == requestType) {
            m_condMutex->lock();
                m_waitingReaderCount++;
            m_condMutex->unlock();
        } else if (WRITE_REQUEST == requestType) {
            // If it is a write request
            // block till all the previous readers have acquired the shared lock.
            m_condMutex->lock();
                while (m_waitingReaderCount != 0) {
                    m_condMutex->wait();
                }
            m_condMutex->unlock();
            // Do not accept any new lock request till the writer has
            // given up the exclusive lock.
            // Please note that the semaphore is initialized with 0:
            // this wait will block the calling thread till another thread
            // calls the post operation.
            m_writer->wait();
        }
    m_mutex->unlock();
    // Unblock other lock requester threads.
}

void
RWlockType3::acquireReadLock()
{
    // Get the permission to acquire the read lock.
    getAccess(READ_REQUEST);
    // Acquire the read lock.
    m_rwLock->acquireReadLock();
    // Read lock acquired. Decrement the waiting reader count.
    m_condMutex->lock();
        m_waitingReaderCount--;
        if (0 == m_waitingReaderCount) {
            // There are no more waiting readers.
            // Unblock a writer request blocked in getAccess().
            m_condMutex->signal();
        }
    m_condMutex->unlock();
}

void
RWlockType3::releaseReadLock()
{
    m_rwLock->releaseReadLock();
}

void
RWlockType3::acquireWriteLock()
{
    // Get the permission to acquire the write lock.
    getAccess(WRITE_REQUEST);
    // At this point it is ensured by getAccess()
    // that there are no more waiting readers.
    m_rwLock->acquireWriteLock();
    // Write lock acquired.
}

void
RWlockType3::releaseWriteLock()
{
    m_rwLock->releaseWriteLock();
    // Writer gave up the exclusive lock.
    // Unblock getAccess() to grant further lock requests.
    m_writer->post();
}

There is one slight problem with this solution. The lock request order is determined in getAccess() by a mutex lock. When many threads compete for the mutex, the kernel selects one of them to get it; the mutex does not guarantee FIFO order of acquisition.
Thus, though the solution solves the starvation problem for readers and writers, it does not impose strict ordering among them.

Reader Writer – Writers preference

Q. Classical reader writer problem with a twist: give preference to writers. Preference to writers means that whenever a writer is ready to acquire the exclusive lock, it should not be blocked by a reader that becomes ready to acquire the lock at a later point in time; and once a writer acquires the lock, no reader should acquire the lock while there is a pending writer.

Ans: Refer to the research paper “Concurrent control with ‘Readers’ and ‘Writers’” by P. J. Courtois, F. Heymans, and D. L. Parnas (MBLE Research Laboratory, Brussels, Belgium), October 1971.

The implementation below is a copy of the one in the research paper, with comments added.
For semaphore and mutex pseudo implementation check this.

class RWlockType2
{
public:
    RWlockType2();
    ~RWlockType2();
    void
    acquireReadLock();
    void
    releaseReadLock();
    void
    acquireWriteLock();
    void
    releaseWriteLock();

private:
    UINT32 m_readerCount;
    UINT32 m_writerCount;
    Mutex *m_mutex1;
    Mutex *m_mutex2;
    Mutex *m_mutex3;
    Semaphore *m_reader;
    Semaphore *m_writer;
};

RWlockType2::RWlockType2()
{
    m_mutex1 = new Mutex();
    m_mutex2 = new Mutex();
    m_mutex3 = new Mutex();
    m_reader = new Semaphore(1);
    m_writer = new Semaphore(1);
    m_readerCount = 0;
    m_writerCount = 0;
}

void
RWlockType2::acquireReadLock()
{
    // Section A.
    // Allow only one reader at a time beyond this section.
    m_mutex3->lock();
        // Section B.
        // Wait till no more writer is waiting to get the lock.
        // Check "Section C" in acquireWriteLock()
        m_reader->wait();
            // A reader reached here, it means:
            // at this point no writer is pending.
            m_mutex1->lock();
                // Increment the reader count;
                m_readerCount++;
                if (1 == m_readerCount) {
                    // If this is the first reader block the access for writers.
                    // Check "Section D" in acquireWriteLock()
                    m_writer->wait();
                }
            m_mutex1->unlock();
        // Unblock pending writers.
        // Enable a pending writer to block immediate 
        // next reader at "Section B."
        m_reader->post();
    // Unblock pending readers.
    // Allow a pending reader cross "Section A".
    m_mutex3->unlock();
}

void
RWlockType2::releaseReadLock()
{
    m_mutex1->lock();
        // Decrement the reader count.
        m_readerCount--;
        if (0 == m_readerCount) {
            // Section E.
            // If this was the last reader allow access for writers.
            m_writer->post();
        }
    m_mutex1->unlock();
}

void
RWlockType2::acquireWriteLock()
{
    m_mutex2->lock();
        // Increment the pending writer count.
        m_writerCount++;
        if (1 == m_writerCount) {
            // Section C.
            // If this is the first writer block further readers.
            // Check "Section B" in acquireReadLock()
            m_reader->wait();
            // All readers are blocked from "lock acquiring" competition.
        }
    m_mutex2->unlock();
    // Section D.
    // If it is first writer: 
    // wait till all existing readers are out and "Section E"
    // in releaseReadLock is called.
    // Else this is "lock acquiring" competition between only pending writers.

    // Get exclusive access for write lock.
    m_writer->wait();
    // Only one writer at a time can reach here.
}

void
RWlockType2::releaseWriteLock()
{
    // Unblock next write request.
    // Check "Section D" in acquireWriteLock()
    m_writer->post();

    m_mutex2->lock();
        // Decrement the writer count.
        m_writerCount--;
        if (0 == m_writerCount) {
            // If it is the last writer unblock readers.
            // Check "Section B" acquireReadLock()
            m_reader->post();
            // This allows readers to join "lock acquiring" competition.
        }
    m_mutex2->unlock();
}

Reader Writer – Readers preference

Q. Classical reader writer problem. You can read more about it here.
A resource is shared by reader & writer threads. The constraints for accessing the shared resource are:
– If a reader thread is accessing the shared resource, no writer thread is allowed to access it; however, other reader thread(s) can access it concurrently.
– If a writer thread is accessing the shared resource, no other thread (reader or writer) is allowed to access it.

Ans: For semaphore and mutex pseudo implementation check this.
The solution described here gives preference to readers and can cause writer starvation.

class RWlockType1
{
public:

    RWlockType1();
    ~RWlockType1();
    void
    acquireReadLock();
    void
    releaseReadLock();
    void
    acquireWriteLock();
    void
    releaseWriteLock();

private:

    UINT32 m_readerCount;
    Mutex *m_mutex;
    Semaphore *m_writer;
};

RWlockType1::RWlockType1()
{
    m_mutex = new Mutex();
    m_writer = new Semaphore(1);
    m_readerCount = 0;
}

void
RWlockType1::acquireReadLock()
{
    m_mutex->lock();
        // Increase the reader count.
        m_readerCount++;
        if (1 == m_readerCount) {
            // If it is the first reader:
            // - wait for the writer to finish (if any)
            m_writer->wait();
            // Shared object access for write requests blocked.
        }
    m_mutex->unlock();
}

void
RWlockType1::releaseReadLock()
{
    m_mutex->lock();
        // Reading done. Decrement the reader count.
        m_readerCount--;
        if (0 == m_readerCount) {
            // If it was the last reader:
            // unblock the waiting writer (if any)
            m_writer->post();
            // Shared object access for write requests allowed.
        }
    m_mutex->unlock();
}

void
RWlockType1::acquireWriteLock()
{
    // Sleep until:
    // 1. Reader count is 0, and
    // 2. No other writer has access to the shared object.
    m_writer->wait();
    // Exclusive access of shared object granted.
}

void
RWlockType1::releaseWriteLock()
{
    // Release the exclusive access of the shared object.
    m_writer->post();
    // Shared object available now for readers and writers.
}

Once the first reader acquires the read lock, it locks the shared object against writers, preventing any writer from accessing it. Subsequent readers simply use the already locked (against writers) shared object. The last reader unlocks the shared object for write access.

In this solution, every writer must lock the shared object exclusively. This means a steady stream of readers can keep writers locked out and starve them.

Ordered producer consumer

Q. Classic producer consumer problem with a slight twist: there is an ordering between the produced items. Assume each produced item has an associated sequence number. There are multiple producer threads, each producing an item for a different sequence number. There is a single consumer thread which must consume the items in sequential order of the sequence numbers.
This kind of situation occurs when you want to read a large amount of data and the reading process is slow. To overcome the slow speed of a sequential read, you create multiple threads to read independent data segments. The data segments can then be read in any order, but they must be consumed in sequential order.

Ans: The solution uses thread safe implementation of the buffer queue solution described here and the unordered_map.
For condition variable and mutex pseudo implementation check this.

1. The solution uses one thread safe buffer queue, the ‘empty’ buffer queue, and one unordered_map that maps a sequence number to a produced buffer, the ‘filled’ buffer map.
2. The empty buffer queue is initialized with N buffer items (all available buffer items are empty initially).
3. The filled buffer map is initially empty (since no item is produced initially, there is nothing for the consumer to consume).
4. Apart from this there are two sequence numbers: the producer_sequence number (used by producer threads to keep track of the next sequence number for which to produce data) and the consumer_sequence number (used by the consumer thread to keep track of the next sequence number to consume).

5. Producer threads follow the subroutine below:
i. Dequeue a buffer from the empty buffer queue (if the empty buffer queue is empty the calling thread is put to sleep).
ii. In a thread safe way get the next producer_sequence number and increment it.
iii. Produce the data corresponding to that sequence number.
iv. Add the produced buffer to the filled buffer map and send a wakeup signal to the consumer thread.

6. The consumer thread follows the subroutine below:
i. Get the consumer_sequence number.
ii. Sleep while the buffer corresponding to the consumer_sequence number is not present in the filled buffer map.
iii. Consume the buffer corresponding to that sequence number.
iv. Delete the consumed buffer entry from the filled buffer map and enqueue the consumed buffer to the empty buffer queue (in turn waking up any producer thread waiting for a buffer).

Below is pseudo code which uses thread safe implementation of buffer queue to solve the producer consumer problem.

#define N 100
#define SEQ_TO_PRODUCE 10000

class orderedProducerConsumer
{
public:
    orderedProducerConsumer();
    ~orderedProducerConsumer();
    void
    producerSubroutine();
    void
    consumerSubroutine();

private:
    bufferQueue *m_emptyQueue;
    unordered_map m_filledMap;
    UINT32 m_producerSequence;
    UINT32 m_consumerSequence;
    Mutex *m_seqMutex;
    ConditionMutex *m_bufferMapCondMutex;
};

orderedProducerConsumer::orderedProducerConsumer()
{
    // Initialize empty queue with N items.
    m_emptyQueue = new bufferQueue(N);
    // Initialize sequence numbers to 0.
    m_producerSequence = m_consumerSequence = 0;
    // Initialize mutex & condition mutex.
    m_seqMutex = new Mutex();
    m_bufferMapCondMutex = new ConditionMutex();
}

// Multiple threads can be spawned to run the producer routine.
void
orderedProducerConsumer::producerSubroutine()
{
    buffer *node;
    UINT32 mySequence;

    while (true) {
        // Get the sequence number to produce data.
        m_seqMutex->lock();
            mySequence = m_producerSequence;
            if (mySequence >= SEQ_TO_PRODUCE) {
                // Entire range of required data produced.
                // Producer thread can exit now.
                m_seqMutex->unlock();
                break;
            }
            m_producerSequence++;
        m_seqMutex->unlock();
        // Get an empty buffer.
        node = m_emptyQueue->dequeueBuffer();
        // Produce data corresponding to the sequence number.
        produceData(mySequence, node);
        // Put the produced buffer in the filled map for the consumer.
        m_bufferMapCondMutex->lock();
            m_filledMap[mySequence] = node;
            // Send wakeup call to the consumer.
            m_bufferMapCondMutex->signal();
        m_bufferMapCondMutex->unlock();
    }
}

// A single thread will be spawned to run the consumer routine.
void
orderedProducerConsumer::consumerSubroutine()
{
    buffer *node;

    while (m_consumerSequence < SEQ_TO_PRODUCE) {
        // Sleep till the required buffer is produced.
        m_bufferMapCondMutex->lock();
            while (m_filledMap.find(m_consumerSequence) ==
                   m_filledMap.end()) {
                m_bufferMapCondMutex->wait();
                // After unblocking check the condition again.
            }
            // Extract the buffer from the map.
            node = m_filledMap[m_consumerSequence];
            m_filledMap.erase(m_consumerSequence);
        m_bufferMapCondMutex->unlock();

        // Consume the buffer.
        consumeData(m_consumerSequence, node);
        // Put the consumed buffer in the empty queue for producers.
        m_emptyQueue->enqueueBuffer(node);
        // Increment the consumer sequence number.
        m_consumerSequence++;
    }
    }
}
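The map-plus-condition-variable part of the scheme above can be isolated into a small runnable sketch: producers insert (sequence, value) pairs in any order, and a single consumer blocks until the exact sequence number it needs appears. (OrderedChannel is an illustrative name; buffer recycling via the empty queue is omitted.)

```cpp
#include <cassert>
#include <condition_variable>
#include <map>
#include <mutex>
#include <thread>

class OrderedChannel {
public:
    void put(unsigned seq, int value) {
        std::lock_guard<std::mutex> lk(m_mutex);
        m_filled[seq] = value;
        m_cond.notify_one();            // single consumer, one signal suffices
    }
    int get(unsigned seq) {
        std::unique_lock<std::mutex> lk(m_mutex);
        // Sleep until the item for 'seq' has been produced.
        m_cond.wait(lk, [&] { return m_filled.count(seq) != 0; });
        int value = m_filled[seq];
        m_filled.erase(seq);
        return value;
    }
private:
    std::map<unsigned, int> m_filled;
    std::mutex m_mutex;
    std::condition_variable m_cond;
};
```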

Unordered producer consumer

Q. Classic producer consumer problem. The producer doesn’t impose any ordering on produced items. The consumers can consume produced items in any order.

Ans: The solution uses thread safe implementation of the buffer queue solution described here.

1. The solution uses two thread safe buffer queues: ’empty’ buffer queue and ‘filled’ buffer queue.
2. The empty buffer queue is initialized with N buffer items. (all the available buffer items are empty initially).
3. The filled buffer queue is initialized with 0 buffer items. (since no item is produced initially, there is nothing for consumers to consume).

4. Producer threads follow the subroutine below:
i. Dequeue a buffer from the empty buffer queue (if the empty buffer queue is empty the calling thread is put to sleep).
ii. Produce an item in the buffer.
iii. Enqueue the produced buffer in the filled buffer queue (in turn waking up any consumer thread waiting for a buffer to consume).

5. Consumer threads follow the subroutine below:
i. Dequeue a buffer from the filled buffer queue (if the filled buffer queue is empty the calling thread is put to sleep).
ii. Consume the item from the buffer.
iii. Enqueue the consumed buffer in the empty buffer queue (in turn waking up any producer thread waiting for a buffer to produce).

Below is pseudo code which uses the thread safe buffer queue implementation to solve the producer consumer problem. The solution supports multiple producer & multiple consumer threads working concurrently.

#define N 100
#define ITEMS_TO_PRODUCE 10000

class unorderedProducerConsumer
{
public:
    unorderedProducerConsumer();
    ~unorderedProducerConsumer();
    void
    producerSubroutine();
    void
    consumerSubroutine();

private:
    bufferQueue *m_emptyQueue;
    bufferQueue *m_filledQueue;
    UINT32 m_producerCount;
    UINT32 m_consumerCount;
    Mutex *m_mutex;
};

unorderedProducerConsumer::unorderedProducerConsumer()
{
    // Initialize empty queue with N items.
    m_emptyQueue = new bufferQueue(N);
    // Initialize filled queue with 0 items.
    m_filledQueue = new bufferQueue(0);
    m_producerCount = m_consumerCount = 0;
    m_mutex = new Mutex();
}

void
unorderedProducerConsumer::producerSubroutine()
{
    buffer *node;

    while (true) {
        m_mutex->lock();
            if (m_producerCount >= ITEMS_TO_PRODUCE) {
                // All the items produced. Exit the producer thread.
                m_mutex->unlock();
                break;
            }
            m_producerCount++;
        m_mutex->unlock();
        // Get an empty buffer.
        node = m_emptyQueue->dequeueBuffer();
        // Produce an item in the buffer.
        produceItem(node);
        // Put the produced buffer in filled queue for consumers.
        m_filledQueue->enqueueBuffer(node);
    }
}

void
unorderedProducerConsumer::consumerSubroutine()
{
    buffer *node;

    while (true) {
        m_mutex->lock();
            if (m_consumerCount >= ITEMS_TO_PRODUCE) {
                // All the items consumed. Exit the consumer thread.
                m_mutex->unlock();
                break;
            }
            m_consumerCount++;
        m_mutex->unlock();
        // Get a produced buffer.
        node = m_filledQueue->dequeueBuffer();
        // Consume the buffer.
        consumeItem(node);
        // Put the consumed buffer in empty queue for producers.
        m_emptyQueue->enqueueBuffer(node);
    }
}

Thread safe Buffer queue

Q. Implement a buffer queue as a singly linked list. The buffer queue needs to be accessed in a multithreaded environment and should provide functions to dequeue a buffer and enqueue a buffer.

Ans:
Such a buffer queue for a multithreaded environment can be implemented using a condition variable.
For condition variable pseudo implementation check this.

Let's say a buffer is defined like below:

class buffer
{
    public:
        buffer(
            IN UINT32 bufferLength
            );
        ~buffer();
    private:
        void *m_data;
        buffer *m_next;

    friend class bufferQueue;
};

The buffer queue class is defined like below:

#define BUFFSIZE 1024

class bufferQueue
{
public:
    bufferQueue (
        IN UINT32 length
        );
    ~bufferQueue(); // Implementation omitted here for brevity.
    void
    enqueueBuffer(
        IN buffer *node
        );
    buffer *
    dequeueBuffer();

private:
    buffer *m_head;
    ConditionMutex *m_condMutex;
};

bufferQueue::bufferQueue (
    IN UINT32 length
    )
{
    buffer *newNode;
    m_head = NULL;
    // Create a singly linked list of 'length' buffers, each of size BUFFSIZE.
    for (int i = 0; i < length; i++) {
        newNode = new buffer(BUFFSIZE);
        newNode->m_next = m_head;
        m_head = newNode;
    }
    // Initialize the condition variable
    m_condMutex = new ConditionMutex();
}
// Return the buffer to the buffer queue.
void
bufferQueue::enqueueBuffer(
    IN buffer *node
    )
{
    // Get the exclusive access of the buffer queue.
    m_condMutex->lock();
        node->m_next = m_head;
        m_head = node;
        // Thread(s) may be blocked for the buffer dequeue operation
        // send wakeup call to any one of those threads.
        m_condMutex->signal();
    // Give up the exclusive access of the buffer queue.
    m_condMutex->unlock();
}

// Remove a buffer from the buffer queue.
buffer *
bufferQueue::dequeueBuffer()
{
    buffer *node;
    // Get the exclusive access of the buffer queue.
    m_condMutex->lock();
        // While the buffer queue is empty block the calling thread.
        while (NULL == m_head) {
            m_condMutex->wait();
            // Take care of spurious wakeup calls: Even after
            // unblocking check the condition again before proceeding.
        }
        // When thread reaches this point it is ensured that:
        // 1. The buffer queue is not empty and
        // 2. Only this thread has exclusive access to the buffer queue
        // (since this is thread which has the lock for the
        // mutex corresponding the condition variable.)
        node = m_head;
        m_head = node->m_next;
     // Give up the exclusive access of the buffer queue.
    m_condMutex->unlock();

    node->m_next = NULL;
    return node;
}

Synchronisation primitives: mutex, condition variable, semaphore

Starting a series on IPC synchronisation.

A. Mutex
Mutex stands for mutual exclusion. Only one thread in a program can hold a given mutex at a time. A code section protected by a mutex lock thus becomes a critical section, guaranteed to be executed by only one thread at a time. If a thread tries to lock a mutex that is already locked by another thread, the calling thread blocks till the mutex is unlocked.
The general use of mutex looks like:

mutex_lock()
    perform critical section operations
mutex_unlock()

The pseudocode used in subsequent posts will use below class definition for POSIX implementation of mutex:

class Mutex
{

public:
    Mutex()
    {
        // NULL: use the default mutex attributes.
        pthread_mutex_init(&m_mutex, NULL);
    }

    ~Mutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }

    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }

    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }

private:
    pthread_mutex_t m_mutex;
};

B. Condition variables
Mutexes are used for locking, but they do not provide waiting functionality; condition variables are used for waiting.
A condition variable provides a cond_wait function to wait for a condition to become true, a cond_signal function to unblock/wake up one waiting thread when the condition becomes true, and a cond_broadcast function to unblock/wake up all the waiting threads. A mutex is always associated with a condition variable.

The waiting action for a condition variable looks like below:

mutex_lock(mutex)
    while (condition is false)
        cond_wait(cond_var, mutex);
    take appropriate action on true condition
    modify the condition
mutex_unlock(mutex)

When cond_wait puts the calling thread to sleep, two things happen atomically: the corresponding mutex is unlocked, and the calling thread is put to sleep till some other thread calls cond_signal.
After receiving the wakeup call, the thread first re-acquires the lock on the corresponding mutex, and only then does it return from the cond_wait call.
Also note that whenever the calling thread returns from cond_wait, we always test the condition again, because spurious wakeups can occur: a wakeup when the desired condition is still not true.

The signaling action for a condition variable looks like below:

mutex_lock(mutex)
    set condition true
    cond_signal(cond_var);
mutex_unlock(mutex)

The pseudocode used in subsequent posts will use below class definition for POSIX implementation of condition variables:

class ConditionMutex
{

public:
    ConditionMutex()
    {
        pthread_cond_init(&m_cond, NULL);
        pthread_mutex_init(&m_mutex, NULL);
    }

    ~ConditionMutex()
    {
        pthread_cond_destroy(&m_cond);
        pthread_mutex_destroy(&m_mutex);
    }

    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }

    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }

    void broadcast()
    {
        pthread_cond_broadcast(&m_cond);
    }

    void signal()
    {
        pthread_cond_signal(&m_cond);
    }

    void wait()
    {
        pthread_cond_wait(&m_cond, &m_mutex);
    }
private:

    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
};

C. Semaphore
Semaphore provides three operations.
1. Initialize: to initialize a semaphore the caller provides its initial value.
2. Wait: the calling thread blocks while the value of the semaphore is less than or equal to 0; once the value is positive, it is decremented by 1 and the call returns.
3. Post: the calling thread increments the value of the semaphore by 1, and if there are threads blocked in a wait on this semaphore, one of them is woken up.

In the most simplistic way, one can visualize the post and wait operations on a semaphore like below:

post (semaphore s)
{
    s = s+1;
}

wait (semaphore s)
{
    while (s<=0)
    {
        // do nothing
        ;
    }
    s = s-1;
}

The pseudocode used in subsequent posts will use below class definition for POSIX implementation of semaphore:

class Semaphore
{

public:
    Semaphore(
        IN UINT32 initialValue
        )
    {
        sem_init(&m_semaphore, 0, initialValue);
    }

    ~Semaphore()
    {
        sem_destroy(&m_semaphore);
    }

    void wait()
    {
        sem_wait(&m_semaphore);
    }

    void post()
    {
        sem_post(&m_semaphore);
    }

private:

    sem_t m_semaphore;
};