Old Bridge – Python code

Q. Write Python code to solve the Old Bridge problem.

Ans: The Python 3 code below solves it.

import threading
import random
import time

class oldBridge (threading.Thread):
    s_lockBridge = None
    s_traffic = [None, None]
    s_mutex = [None, None]
    s_cars = [0, 0]
    s_entry = ['left', 'right']

    @staticmethod
    def initBridge():
        # Initialize bridge lock.
        oldBridge.s_lockBridge = threading.Semaphore(1)
        for i in range(2):
            # Max 3 cars in either direction.
            oldBridge.s_traffic[i] = threading.Semaphore(3)
            oldBridge.s_mutex[i] = threading.Lock()
            # Cars in either direction initially are zero.
            oldBridge.s_cars[i] = 0

    def __init__(self, car, direction):
        if oldBridge.s_lockBridge is None:
            oldBridge.initBridge()
        threading.Thread.__init__(self)
        self.carNumber = car
        self.direction = direction

    def run(self):
        self.arriveBridge()
        self.drive()
        self.exitBridge()

    def arriveBridge(self):
        print("Arriving car: {0} from direction: {1}".format(self.carNumber, oldBridge.s_entry[self.direction]))
        oldBridge.s_mutex[self.direction].acquire()
        if (0 == oldBridge.s_cars[self.direction]):
            # If you are the 1st car on your direction
            # Acquire exclusive lock on bridge.
            oldBridge.s_lockBridge.acquire()
        oldBridge.s_cars[self.direction] += 1
        oldBridge.s_mutex[self.direction].release()
        # Consume one slot for cars in same direction.
        oldBridge.s_traffic[self.direction].acquire()

    def drive(self):
        print("Crossing car: {0} from direction: {1}".format(self.carNumber, oldBridge.s_entry[self.direction]))
        # Take a random time of 0, 1 or 2 seconds to cross the bridge.
        driveTime = random.randint(0, 2)
        time.sleep(driveTime)

    def exitBridge(self):
        print("Departing car: {0} from direction: {1}".format(self.carNumber, oldBridge.s_entry[self.direction]))
        oldBridge.s_mutex[self.direction].acquire()
        oldBridge.s_cars[self.direction] -= 1
        if (0 == oldBridge.s_cars[self.direction]):
            # If you are the last car on your direction
            # Give up exclusive access on bridge.
            oldBridge.s_lockBridge.release()
        oldBridge.s_mutex[self.direction].release()
        # Free up one slot for cars in same direction.
        oldBridge.s_traffic[self.direction].release()

if __name__ == "__main__":
    threads = []
    for car in range(10):
        # Pick a random direction: 0 (left) or 1 (right).
        direction = random.randint(0, 1)
        newcar = oldBridge(car, direction)
        threads.append(newcar)
        newcar.start()

    for thread in threads:
        thread.join()

Old Bridge

Q. An old bridge has only one lane and can only hold at most 3 cars at a time without risking collapse. Create a solution that controls traffic so that at any given time, there are at most 3 cars on the bridge, and all of them are going the same direction. A car calls ArriveBridge when it arrives at the bridge and wants to go in the specified direction (0 or 1); ArriveBridge should not return until the car is allowed to get on the bridge. A car calls ExitBridge when it gets off the bridge, potentially allowing other cars to get on. Don’t worry about starving cars trying to go in one direction; just make sure cars are always on the bridge when they can be.
Source: https://inst.eecs.berkeley.edu/~cs162/sp13/hand-outs/synch-problems.html

Ans: Each car follows the routine below:
1. If you are the first car going in your direction, lock the bridge exclusively, which blocks traffic from the opposite side. If opposite-side traffic is already on the bridge, sleep until it has cleared and then acquire the exclusive lock.
2. Do not start crossing if there are already three cars on the bridge. Block until there is room on the bridge for your car.
3. Drive across the bridge.
4. Exit the bridge. If cars are waiting to cross in the same direction, allow one of them onto the bridge. If you are the last car going in your direction, give up the exclusive lock so that traffic from the other side can cross.
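
The four steps above can be checked with a small simulation. The sketch below is a minimal Python version, assuming 20 cars that alternate directions; the `Bridge` class, the `on_bridge` bookkeeping and the `violations` counter are illustrative names added only to verify the two invariants (at most 3 cars, one direction at a time).

```python
import threading
import time

MAX_CARS = 3  # bridge capacity from the problem statement


class Bridge:
    def __init__(self):
        self.bridge_lock = threading.Semaphore(1)  # owned by the crossing direction
        self.slots = [threading.Semaphore(MAX_CARS) for _ in range(2)]
        self.mutex = [threading.Lock(), threading.Lock()]
        self.cars = [0, 0]     # cars waiting or crossing, per direction
        self.on_bridge = []    # directions of cars currently crossing (checker only)
        self.checker = threading.Lock()
        self.violations = 0

    def arrive(self, direction):
        with self.mutex[direction]:
            if self.cars[direction] == 0:
                self.bridge_lock.acquire()   # step 1: first car claims the bridge
            self.cars[direction] += 1
        self.slots[direction].acquire()      # step 2: wait for a free slot

    def drive(self, direction):
        with self.checker:
            self.on_bridge.append(direction)
            if len(self.on_bridge) > MAX_CARS or len(set(self.on_bridge)) > 1:
                self.violations += 1
        time.sleep(0.001)                    # step 3: cross
        with self.checker:
            self.on_bridge.remove(direction)

    def leave(self, direction):
        with self.mutex[direction]:
            self.cars[direction] -= 1
            if self.cars[direction] == 0:
                self.bridge_lock.release()   # step 4: last car frees the bridge
        self.slots[direction].release()      # step 4: let a waiting car on


def simulate(num_cars=20):
    bridge = Bridge()

    def car(direction):
        bridge.arrive(direction)
        bridge.drive(direction)
        bridge.leave(direction)

    threads = [threading.Thread(target=car, args=(n % 2,)) for n in range(num_cars)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return bridge.violations


if __name__ == "__main__":
    print("violations:", simulate())
```

`simulate()` returns the number of times a car found more than three cars, or cars from both directions, on the bridge at once.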

#define LEFT_SIDE 0
#define RIGHT_SIDE 1

class OldBridge {
public:
    OldBridge();
    ~OldBridge();
    void
    crossBridge(
        IN byte direction
        );

private:
    int m_cars[2];
    Semaphore *m_traffic[2];
    Mutex *m_mutex[2];
    Semaphore *m_lockBridge;

    void
    arriveBridge(
        IN byte direction
        );
    void
    exitBridge(
        IN byte direction
        );
    void
    drive();
};

OldBridge::OldBridge()
{
    for (int i=0; i<2; i++) {
        // Cars in either direction initially are zero.
        m_cars[i] = 0;
        // Max 3 cars in either direction.
        m_traffic[i] = new Semaphore(3);
        m_mutex[i] = new Mutex();
    }
    m_lockBridge = new Semaphore(1);
}

void
OldBridge::crossBridge(
    IN byte direction
    )
{
    arriveBridge(direction);
    drive();
    exitBridge(direction);
}

void
OldBridge::arriveBridge(
    IN byte direction
    )
{
    m_mutex[direction]->lock();
        if (0 == m_cars[direction]) {
            // If you are the 1st car on your direction
            // Acquire exclusive lock on bridge.
            m_lockBridge->wait();
        }
        m_cars[direction]++;
    m_mutex[direction]->unlock();

    // Consume one slot for cars in same direction.
    m_traffic[direction]->wait();
}

void
OldBridge::exitBridge(
    IN byte direction
    )
{
    m_mutex[direction]->lock();
        m_cars[direction]--;
        if (0 == m_cars[direction]) {
            // If you are the last car on your direction
            // Give up exclusive access on bridge.
            m_lockBridge->post();
        }
    m_mutex[direction]->unlock();

    // Free up one slot for cars in same direction.
    m_traffic[direction]->post();
}

Dining philosophers problem

Q. In 1965 Dijkstra formulated the dining philosophers problem.
Five philosophers are seated around a round table. Each philosopher has a plate of spaghetti and needs two forks to eat it. Between each pair of adjacent plates is one fork. The life of a philosopher consists of alternating periods of eating and thinking. When a philosopher gets hungry, she tries to acquire her left and right forks, one at a time, in either order. If she succeeds in acquiring both forks, she eats for a while, then puts down the forks and continues to think. Program the routine for the dining philosophers.

Ans: Andrew Tanenbaum’s Modern Operating Systems has a great solution for the dining philosophers problem. The code below uses Tanenbaum’s solution.

A picking order that is the same for every philosopher does not solve the problem. Picking the left fork first and then the right fork will not work: if all the philosophers pick up their left fork at the same time, no one will be able to pick up her right fork. Adding a random delay between picking up the forks does not guarantee deadlock avoidance either.

The approach used here is that a philosopher does not pick up any fork until both her left and right forks are available (i.e. neither neighbour is eating at the same time).

The philosopher’s routine is:
0. Each philosopher has a state associated with her. Her neighbours read this state to find out what she is doing.
1. Spend time thinking. The state of the philosopher is THINKING.
2. The philosopher becomes hungry; her state is HUNGRY.
3. Check whether you can pick up the left and right forks (i.e. make sure neither the left nor the right neighbour’s state is EATING). If either fork is unavailable, block until both become available.
4. When both forks are available, change state to EATING, pick up the forks and start eating.
5. When you are done eating, put down the left fork and check whether your left neighbour is blocked waiting for it; if she is and her other fork is free, wake her up to start eating. Then put down the right fork and do the same for your right neighbour.
6. Go to step 1.
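
The routine above can be sketched in Python as follows. This is a minimal illustration, assuming each philosopher eats a fixed number of meals so that the run terminates; `Table`, `dine`, `check_neighbours` and the `conflicts` counter are hypothetical names added here to confirm that two neighbours never eat at the same time.

```python
import threading

N = 5
THINKING, HUNGRY, EATING = 0, 1, 2


class Table:
    def __init__(self):
        self.state = [THINKING] * N
        self.wake = [threading.Semaphore(0) for _ in range(N)]
        self.mutex = threading.Lock()
        self.conflicts = 0  # neighbours seen EATING together; should stay 0

    def _test(self, i):
        # Caller must hold self.mutex. Python's % never returns a negative
        # result for a positive modulus, so (i - 1) % N is safe for i == 0.
        if (self.state[i] == HUNGRY
                and self.state[(i - 1) % N] != EATING
                and self.state[(i + 1) % N] != EATING):
            self.state[i] = EATING
            self.wake[i].release()

    def take_forks(self, i):
        with self.mutex:
            self.state[i] = HUNGRY
            self._test(i)
        self.wake[i].acquire()  # blocks unless _test() granted both forks

    def put_forks(self, i):
        with self.mutex:
            self.state[i] = THINKING
            self._test((i - 1) % N)  # left neighbour may now be able to eat
            self._test((i + 1) % N)  # right neighbour may now be able to eat

    def check_neighbours(self, i):
        with self.mutex:
            if EATING in (self.state[(i - 1) % N], self.state[(i + 1) % N]):
                self.conflicts += 1


def dine(meals=100):
    table = Table()
    eaten = [0] * N

    def philosopher(i):
        for _ in range(meals):
            table.take_forks(i)
            table.check_neighbours(i)  # stands in for eating
            eaten[i] += 1
            table.put_forks(i)

    threads = [threading.Thread(target=philosopher, args=(i,)) for i in range(N)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return table.conflicts, eaten


if __name__ == "__main__":
    print(dine())
```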

#define N 5
#define LEFT(i) (((i)+N-1)%N)
#define RIGHT(i) (((i)+1)%N)
#define THINKING 0
#define HUNGRY 1
#define EATING 2

class DinningPhilosopher {
public:
    DinningPhilosopher();
    ~DinningPhilosopher();
    void
    philosopherRoutine(
        IN byte i
        );

private:
    byte m_state[N];
    Semaphore *m_sem[N];
    Mutex *m_mutex;

    void
    takeForks(
        IN byte i
        );

    void
    putForks(
        IN byte i
        );

    void
    testForkAvailability(
        IN byte i
        );
};

DinningPhilosopher::DinningPhilosopher()
{
    // All philosophers are thinking initially.
    for (int i=0; i<N; i++) {
        m_state[i] = THINKING;
    }
    for (int i=0; i<N; i++) {
        m_sem[i] = new Semaphore(0);
    }
    m_mutex = new Mutex();
}

// N threads will be spawned, one for each philosopher.
// Argument i is the philosopher id in the range [0, N).
void
DinningPhilosopher::philosopherRoutine(
    IN byte i
    )
{
    while(true) {
        continueThinking();
        takeForks(i);
        continueEating();
        putForks(i);
    }
}

void
DinningPhilosopher::takeForks(
    IN byte i
    )
{
    m_mutex->lock();
        // Announce to neighbours you're HUNGRY.
        m_state[i] = HUNGRY;
        // Test if left fork & right forks are available.
        // If available announce neighbours you're EATING.
        // so they don't pick up forks you already claimed.
        testForkAvailability(i);
    m_mutex->unlock();
    // If you are not yet EATING,
    // block till required forks are available.
    // Section A.
    m_sem[i]->wait();
}

void
DinningPhilosopher::testForkAvailability(
    IN byte i
    )
{
    // Note that m_mutex was locked before calling this function
    // Thread has exclusive access to execute this function.
    if (m_state[LEFT(i)] != EATING && // your left fork is available
        m_state[RIGHT(i)] != EATING && // your right fork is available
        m_state[i] == HUNGRY // and you want to start eating.
        ) {
        // Announce neighbours you started eating.
        m_state[i] = EATING;
        // Fork availability is passed.
        // Make sure you're not blocked at "Section A"
        // in function takeForks()
        m_sem[i]->post();
    }
}

void
DinningPhilosopher::putForks(
    IN byte i
    )
{
    m_mutex->lock();
        // Announce to neighbours you're done EATING.
        m_state[i] = THINKING;
        // Put down the left fork.
        // If your left neighbour is blocked at "Section A"
        // of function takeForks(),
        // allow her to unblock and start eating.
        testForkAvailability(LEFT(i));
        // Put down the right fork and do the same for the right neighbour.
        testForkAvailability(RIGHT(i));
    m_mutex->unlock();
}

Sleeping barbers problem

Q. The sleeping barber is a classical synchronization problem.
The barber shop has one barber, one cutting chair and n chairs for waiting customers. If there are no customers, the barber falls asleep. When a customer arrives, he has to wake up the sleeping barber. If additional customers arrive while the barber is busy cutting hair, they sit down if there is an empty chair; otherwise they leave the shop. The problem is to program the customer and barber routines. This standard problem has many solutions available.
Now let’s make the problem a bit harder by adding multiple barbers. There are m barbers and m cutting chairs in the shop. With all the above constraints, design the customer and barber routines.

Ans: There are m barbers and n waiting chairs. Divide the waiting chairs into an emptyChairs queue and an occupiedChairs queue. Create m threads, one per barber, to run the barber routine. Every customer visiting the barber shop runs the customer routine. Initially all n waiting chairs are in the emptyChairs queue.

Customer routine:
1. Check if emptyChairs queue is empty. If empty leave the barber shop. Else grab a chair from emptyChairs queue.
2. Add the grabbed chair into occupiedChairs queue and send the wakeup call to a barber.
3. Sleep till one barber is allotted to cut your hair.
4. After barber is allotted to you, leave the occupied chair and push the chair to emptyChairs queue.
5. Inform the allotted barber that you are ready for hair cut.
6. Wait till the hair cut is done.
7. Leave the barber shop.

Barber routine:
1. Sleep till there is a chair in the occupiedChairs queue.
2. Remove a chair from occupiedChairs queue. (i.e. select a customer for hair cut).
3. Inform the selected customer that you’re ready for the hair cut.
4. Sleep till the customer leaves his occupied chair and is ready for hair cut.
5. Cut the hair.
6. Inform the customer that hair cut is done.
7. Go to step 1.
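
The two routines can be sketched in Python as below. This is a minimal illustration, not a definitive implementation: the `Shop` and `Chair` classes, the `served` and `turned_away` counters, and the `None` sentinel that lets barber threads exit once all customers are done are assumptions added here so that the example terminates and can be checked.

```python
import threading
from collections import deque


class Chair:
    def __init__(self):
        self.allotted_barber = -1                      # no barber allotted yet
        self.customer_wait = threading.Semaphore(0)


class Shop:
    def __init__(self, barbers, chairs):
        self.empty = deque(Chair() for _ in range(chairs))
        self.empty_lock = threading.Lock()
        self.occupied = deque()
        self.occupied_cv = threading.Condition()
        self.start = [threading.Semaphore(0) for _ in range(barbers)]
        self.done = [threading.Semaphore(0) for _ in range(barbers)]
        self.stats = threading.Lock()
        self.served = 0
        self.turned_away = 0

    def customer(self):
        with self.empty_lock:                          # step 1
            if not self.empty:
                with self.stats:
                    self.turned_away += 1
                return
            chair = self.empty.popleft()
        with self.occupied_cv:                         # step 2
            self.occupied.append(chair)
            self.occupied_cv.notify()
        chair.customer_wait.acquire()                  # step 3
        barber = chair.allotted_barber
        with self.empty_lock:                          # step 4
            chair.allotted_barber = -1
            self.empty.append(chair)
        self.start[barber].release()                   # step 5
        self.done[barber].acquire()                    # step 6
        with self.stats:
            self.served += 1

    def barber(self, barber_id):
        while True:
            with self.occupied_cv:                     # steps 1 and 2
                while not self.occupied:
                    self.occupied_cv.wait()
                chair = self.occupied.popleft()
            if chair is None:                          # shutdown sentinel (assumption)
                return
            chair.allotted_barber = barber_id          # step 3
            chair.customer_wait.release()
            self.start[barber_id].acquire()            # step 4
            # step 5: the hair cut itself would happen here
            self.done[barber_id].release()             # step 6


def run_shop(barbers=4, chairs=16, customers=50):
    shop = Shop(barbers, chairs)
    barber_threads = [threading.Thread(target=shop.barber, args=(b,))
                      for b in range(barbers)]
    customer_threads = [threading.Thread(target=shop.customer)
                        for _ in range(customers)]
    for t in barber_threads + customer_threads:
        t.start()
    for t in customer_threads:
        t.join()
    with shop.occupied_cv:                             # tell every barber to go home
        shop.occupied.extend([None] * barbers)
        shop.occupied_cv.notify_all()
    for t in barber_threads:
        t.join()
    return shop.served, shop.turned_away


if __name__ == "__main__":
    print(run_shop())
```

Every customer is either served or turned away, so the two counters returned by `run_shop` always add up to the number of customers.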

#define WAITING_CHAIRS 16
#define NUMBER_OF_BARBERS 4

struct waitingChair {
    int allottedBarber;
    Semaphore *customerWait;

    waitingChair()
    {
        // No barber is allotted initially.
        allottedBarber = -1;
        customerWait = new Semaphore(0);
    }
};

struct cuttingChair {
    // customer's head to cut hair.
    void *customerHead;
    Semaphore *startHairCut;
    Semaphore *endHairCut;

    cuttingChair()
    {
        customerHead = NULL;
        startHairCut = new Semaphore(0);
        endHairCut = new Semaphore(0);
    }
};

class SleepingBarbers {
public:
    SleepingBarbers();
    ~SleepingBarbers();
    void
    customerRoutine();
    void
    barberRoutine(
        IN int barberId
        );

private:
    queue<waitingChair *> m_emptyChairs;
    queue<waitingChair *> m_occupiedChairs;
    cuttingChair m_baberChairs[NUMBER_OF_BARBERS];
    Mutex *m_emptyChairMutex;
    ConditionMutex *m_occupiedChairCondMutex;
};

SleepingBarbers::SleepingBarbers()
{
    for (int i=0; i<WAITING_CHAIRS; i++) {
        m_emptyChairs.push(new waitingChair());
    }
    m_emptyChairMutex = new Mutex();
    m_occupiedChairCondMutex = new ConditionMutex();
}

void
SleepingBarbers::customerRoutine()
{
    m_emptyChairMutex->lock();
        // Check if there is empty chair available.
        if (m_emptyChairs.empty()) {
            // Leave the shop, before leaving unlock the mutex.
            m_emptyChairMutex->unlock();
            return;
        }
        // Grab an empty chair.
        waitingChair *myChair = m_emptyChairs.front();
        m_emptyChairs.pop();
    m_emptyChairMutex->unlock();

    // Announce barbers that you have grabbed a chair.
    m_occupiedChairCondMutex->lock();
        m_occupiedChairs.push(myChair);
        // wakeup sleeping barber (if any).
        m_occupiedChairCondMutex->signal();
    m_occupiedChairCondMutex->unlock();

    // Go to sleep, till one barber is assigned to you.
    myChair->customerWait->wait();

    // A barber is ready to cut your hair.
    // Find out the barber assigned to you.
    int myBarber = myChair->allottedBarber;

    // Empty your occupied chair.
    m_emptyChairMutex->lock();
        myChair->allottedBarber = -1;
        m_emptyChairs.push(myChair);
    m_emptyChairMutex->unlock();

    // Make your head available for the barber.
    m_baberChairs[myBarber].customerHead = makeYourHeadAvilable();

    // Request the barber to start hair cut.
    m_baberChairs[myBarber].startHairCut->post();

    // Wait till the barber is done with your hair cut.
    m_baberChairs[myBarber].endHairCut->wait();
}

// m threads are initialized to run this routine.
void
SleepingBarbers::barberRoutine(
    IN int barberId
    )
{
    while (true) {
        m_occupiedChairCondMutex->lock();
            // Sleep till customer is available for haircut.
            while (m_occupiedChairs.empty()) {
                m_occupiedChairCondMutex->wait();
            }
            waitingChair *nextCustomer = m_occupiedChairs.front();
            m_occupiedChairs.pop();
        m_occupiedChairCondMutex->unlock();

        // Inform customer that you are ready to cut hair.
        nextCustomer->allottedBarber = barberId;
        nextCustomer->customerWait->post();

        // Wait till customer is ready for hair cut.
        m_baberChairs[barberId].startHairCut->wait();

        // Perform hair cut.
        performHarCut(m_baberChairs[barberId].customerHead);

        // Inform the customer hair cut is done.
        m_baberChairs[barberId].endHairCut->post();
    }
}

FIFO mutex

Q. The previous post, on the non-starving reader-writer lock, ended with a need for FIFO mutual exclusion. A mutex provides mutual exclusion but does not guarantee FIFO scheduling of access to the critical section. How do you build a FIFO mutex?

Ans: Ticket locks are used to implement the FIFO mutex.
1. The idea is to have a central ticketing policy. There are two tickets: “now_serving” and “next_available”.
2. To acquire the lock, every thread takes the “next_available” ticket (incrementing it for the next arrival) and blocks until its own ticket becomes the “now_serving” ticket.
3. While unlocking, the thread increments the “now_serving” ticket to allow the next thread in FIFO order to acquire the lock.
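
As a rough illustration of the three points above, here is a minimal Python sketch of a ticket lock built on `threading.Condition`. The `tickets_issued` helper and the `fifo_demo` function are assumptions added for the demonstration: the demo starts each worker only after the previous one has taken its ticket, so the acquisition order is known in advance.

```python
import threading
import time


class FifoMutex:
    def __init__(self):
        self._cond = threading.Condition()
        self._now_serving = 0
        self._next_available = 0

    def lock(self):
        with self._cond:
            my_ticket = self._next_available       # take a ticket
            self._next_available += 1
            while self._now_serving != my_ticket:  # wait for your turn
                self._cond.wait()

    def unlock(self):
        with self._cond:
            self._now_serving += 1
            self._cond.notify_all()  # only the next ticket holder proceeds

    def tickets_issued(self):
        with self._cond:
            return self._next_available


def fifo_demo(workers=5):
    mutex = FifoMutex()
    order = []

    def worker(i):
        mutex.lock()
        order.append(i)
        mutex.unlock()

    mutex.lock()  # the main thread holds ticket 0
    threads = []
    for i in range(workers):
        t = threading.Thread(target=worker, args=(i,))
        t.start()
        while mutex.tickets_issued() < i + 2:  # wait until worker i has its ticket
            time.sleep(0.001)
        threads.append(t)
    mutex.unlock()
    for t in threads:
        t.join()
    return order


if __name__ == "__main__":
    print(fifo_demo())
```

Because worker i always holds ticket i + 1, the workers enter the critical section in the order they were started.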

class FifoMutex
{
public:

    FifoMutex();
    ~FifoMutex();
    void
    lock();
    void
    unlock();

private:

    UINT32 m_nowServing;
    UINT32 m_nextAvailable;
    ConditionMutex *m_condMutex;
};

FifoMutex::FifoMutex()
{
    m_nowServing = 0;
    m_nextAvailable = 0;
    m_condMutex = new ConditionMutex();
}
void
FifoMutex::lock()
{
    UINT32 myTicket;

    // Get your ticket and increment the m_nextAvailable.
    m_condMutex->lock();
        myTicket = m_nextAvailable;
        m_nextAvailable++;
    m_condMutex->unlock();

    // Wait till your ticket is getting served.
    m_condMutex->lock();
        while (m_nowServing != myTicket) {
            m_condMutex->wait();
        }
    m_condMutex->unlock();
}

void
FifoMutex::unlock()
{
    // Increment the m_nowServing and wakeup all blocked threads.
    m_condMutex->lock();
        m_nowServing++;
        // Note that we broadcast the wakeup call.
        // We must broadcast since we don't know
        // which thread should acquire the lock next.
        // All the waiting threads will be unblocked, but
        // only one of them will pass the condition: "m_nowServing == myTicket"
        m_condMutex->broadcast();
    m_condMutex->unlock();
}

With this FifoMutex we can easily convert the non-starving reader-writer solution into a strict-ordering reader-writer solution just by changing the datatype of one member variable of class RWlockType3. Replace the member variable Mutex *m_mutex; with FifoMutex *m_mutex; and your strict-ordering reader-writer solution is ready!

Reader Writer – Non starving

Q. The reader preference solution ends up starving the writers. The writer preference solution ends up starving the readers.
How do you design a reader-writer lock with no starvation? No starvation means no thread should wait indefinitely to acquire the lock.

Ans. Let’s develop a no-starvation solution with the requirement that no thread is blocked from acquiring the lock by another thread that becomes ready to acquire the lock at a later point in time. To build no-starvation locks we follow the lock request order. Consider the lock request order shown below:
…. R1, R2, R3, R4, R5, W1 …..
R stands for read request and W stands for write request.
In this scenario, the order in which the reader threads R1 to R5 acquire the shared lock and get access to the shared object has no impact on the shared object; also note that one reader thread cannot starve another reader thread. The only restriction is that all the reader threads R1 to R5 must acquire the shared lock (in any order) before the writer thread W1 attempts to acquire the exclusive lock. To ensure this, there must not be any reader thread still waiting to acquire the shared lock when a writer thread puts in its lock request.

Let’s consider another lock request sequence as shown below:
… W1, W2, W3, R1 …
When writer thread W1 has acquired the exclusive lock, if we allow both writers W2 & W3 to proceed with the lock request, they will compete with each other for the exclusive lock (kernel thread scheduling will decide which one of them is unblocked once W1 gives up the exclusive lock). Similarly, when W2 has acquired the exclusive lock and we allow both W3 & R1 to proceed with the lock request, W3 and R1 will compete for access to the shared object. To avoid this situation, we must not allow other threads to put in a lock request until the writer thread gives up the exclusive lock.

The strict ordering is based on two rules:
1. Before allowing a writer thread to compete for the exclusive lock, make sure there are no readers waiting to acquire the shared lock.
2. Once a writer has been granted the exclusive lock, no other lock request should be accepted until the writer thread gives up the exclusive lock.
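
One simple way to picture the two rules is a single "gate" lock that every request must pass through. The Python sketch below is a simplified variant written for illustration only, not the post's own implementation: readers register while holding the gate (so no reader is ever left waiting behind it), and a writer keeps the gate locked for its whole critical section. `FairRWLock` and the `stress` checker are hypothetical names.

```python
import threading


class FairRWLock:
    def __init__(self):
        self._gate = threading.Lock()       # decides the lock request order
        self._cond = threading.Condition()
        self._active_readers = 0

    def acquire_read(self):
        with self._gate:                    # blocked while a writer owns the gate
            with self._cond:
                self._active_readers += 1   # rule 1: reader is admitted at once

    def release_read(self):
        with self._cond:
            self._active_readers -= 1
            if self._active_readers == 0:
                self._cond.notify_all()

    def acquire_write(self):
        self._gate.acquire()                # rule 2: no new requests from here on
        with self._cond:
            while self._active_readers:     # wait for earlier readers to finish
                self._cond.wait()

    def release_write(self):
        self._gate.release()                # accept lock requests again


def stress(lock, readers=8, writers=4, rounds=50):
    state = {"r": 0, "w": 0, "bad": 0}
    guard = threading.Lock()

    def reader():
        for _ in range(rounds):
            lock.acquire_read()
            with guard:
                state["r"] += 1
                state["bad"] += state["w"] > 0
            with guard:
                state["r"] -= 1
            lock.release_read()

    def writer():
        for _ in range(rounds):
            lock.acquire_write()
            with guard:
                state["w"] += 1
                state["bad"] += state["w"] > 1 or state["r"] > 0
            with guard:
                state["w"] -= 1
            lock.release_write()

    threads = [threading.Thread(target=reader) for _ in range(readers)]
    threads += [threading.Thread(target=writer) for _ in range(writers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["bad"]


if __name__ == "__main__":
    print("violations:", stress(FairRWLock()))
```

As in the post, the ordering is only as fair as the gate itself: `threading.Lock` does not promise FIFO hand-off.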

The solution below uses the reader preference read-write lock for exclusive and shared access. For semaphore, condition variable and mutex pseudo implementation check this. In the code below, the lock request granter function getAccess() decides the lock request order for read and write operations.

#define READ_REQUEST  1
#define WRITE_REQUEST 2

class RWlockType3
{
public:

    RWlockType3();
    ~RWlockType3();
    void
    acquireReadLock();
    void
    releaseReadLock();
    void
    acquireWriteLock();
    void
    releaseWriteLock();

private:

    UINT32 m_waitingReaderCount;
    Mutex *m_mutex;
    ConditionMutex *m_condMutex;
    Semaphore *m_writer;
    RWlockType1 *m_rwLock;

    void
    getAcess(
        byte requestType
        );
};

RWlockType3::RWlockType3()
{
    m_waitingReaderCount = 0;
    m_mutex = new Mutex();
    m_condMutex = new ConditionMutex();
    // Initialize semaphore with 0.
    m_writer = new Semaphore(0);
    m_rwLock = new RWlockType1();
}

void
RWlockType3::getAcess(
    byte requestType
    )
{
    // At a time, only one lock requester thread beyond this point.
    // (The mutex lock acquisition decides the lock request order.)
    m_mutex->lock();
    if (READ_REQUEST == requestType) {
        // If it is a read lock request
        // increment the waiting reader count.
        m_condMutex->lock();
            m_waitingReaderCount++;
        m_condMutex->unlock();
        // Unblock other lock requester threads.
        m_mutex->unlock();
    } else {
        // If it is a write request
        // block till all the previous readers have acquired shared lock.
        m_condMutex->lock();
            while (m_waitingReaderCount != 0) {
                m_condMutex->wait();
            }
        m_condMutex->unlock();
        // Do not accept any new lock request till the writer has
        // given up the exclusive lock: return with m_mutex still held,
        // releaseWriteLock() unlocks it.
        // m_writer is not waited on here: it starts at 0 and nothing
        // could post it before the writer returns from this function.
    }
}

void
RWlockType3::acquireReadLock()
{
    // Get the permission to acquire read lock.
    getAcess(READ_REQUEST);
    // Acquire read lock.
    m_rwLock->acquireReadLock();
    // Read lock acquired. Decrement the waiting reader count.
    m_condMutex->lock();
        m_waitingReaderCount--;
        if (0 == m_waitingReaderCount) {
            // There are no more waiting readers.
            // Unblock a writer request blocked in getAcess().
            m_condMutex->signal();
        }
    m_condMutex->unlock();
}

void
RWlockType3::releaseReadLock()
{
    m_rwLock->releaseReadLock();
}

void
RWlockType3::acquireWriteLock()
{
    // Get the permission to acquire write lock.
    getAcess(WRITE_REQUEST);
    // At this point it is ensured by getAcess()
    // that there are no more waiting readers.
    m_rwLock->acquireWriteLock();
    // Write lock acquired.
}

void
RWlockType3::releaseWriteLock()
{
    m_rwLock->releaseWriteLock();
    // Writer has given up the exclusive lock.
    // Unlock the mutex held since getAcess() to grant further lock requests.
    m_mutex->unlock();
}

There is one slight problem with this solution. The lock request order is determined in getAccess() by a mutex lock. When many threads are competing to acquire the mutex, the kernel selects one of them to get it. The mutex does not guarantee FIFO order for lock acquisition.
Thus, although the solution solves the starvation problem for readers and writers, it does not impose strict ordering among them.

Reader Writer – Writers preference

Q. The classical reader-writer problem with a twist: give preference to writers. Preference to writers means that whenever a writer is ready to acquire the exclusive lock, it should not be blocked by a reader that becomes ready to acquire the lock at a later point in time; and once a writer acquires the lock, no reader should acquire the lock as long as there is a pending writer.

Ans: Refer to the research paper “Concurrent Control with ‘Readers’ and ‘Writers’” by P. J. Courtois, F. Heymans, and D. L. Parnas (MBLE Research Laboratory, Brussels, Belgium), October 1971.

The implementation below is a copy of the solution from the above paper, with comments added.
For semaphore and mutex pseudo implementation check this.

class RWlockType2
{
public:
    RWlockType2();
    ~RWlockType2();
    void
    acquireReadLock();
    void
    releaseReadLock();
    void
    acquireWriteLock();
    void
    releaseWriteLock();

private:
    UINT32 m_readerCount;
    UINT32 m_writerCount;
    Mutex *m_mutex1;
    Mutex *m_mutex2;
    Mutex *m_mutex3;
    Semaphore *m_reader;
    Semaphore *m_writer;
};

RWlockType2::RWlockType2()
{
    m_mutex1 = new Mutex();
    m_mutex2 = new Mutex();
    m_mutex3 = new Mutex();
    m_reader = new Semaphore(1);
    m_writer = new Semaphore(1);
    m_readerCount = 0;
    m_writerCount = 0;
}

void
RWlockType2::acquireReadLock()
{
    // Section A.
    // Allow only one reader at a time beyond this section.
    m_mutex3->lock();
        // Section B.
        // Wait till no more writer is waiting to get the lock.
        // Check "Section C" in acquireWriteLock()
        m_reader->wait();
            // A reader reached here, it means:
            // at this point no writer is pending.
            m_mutex1->lock();
                // Increment the reader count;
                m_readerCount++;
                if (1 == m_readerCount) {
                    // If this is the first reader block the access for writers.
                    // Check "Section D" in acquireWriteLock()
                    m_writer->wait();
                }
            m_mutex1->unlock();
        // Unblock pending writers.
        // Enable a pending writer to block immediate 
        // next reader at "Section B."
        m_reader->post();
    // Unblock pending readers.
    // Allow a pending reader cross "Section A".
    m_mutex3->unlock();
}

void
RWlockType2::releaseReadLock()
{
    m_mutex1->lock();
        // Decrement the reader count.
        m_readerCount--;
        if (0 == m_readerCount) {
            // Section E.
            // If this was the last reader allow access for writers.
            m_writer->post();
        }
    m_mutex1->unlock();
}

void
RWlockType2::acquireWriteLock()
{
    m_mutex2->lock();
        // Increment the pending writer count.
        m_writerCount++;
        if (1 == m_writerCount) {
            // Section C.
            // If this is the first writer block further readers.
            // Check "Section B" in acquireReadLock()
            m_reader->wait();
            // All readers are blocked from "lock acquiring" competition.
        }
    m_mutex2->unlock();
    // Section D.
    // If it is first writer: 
    // wait till all existing readers are out and "Section E"
    // in releaseReadLock is called.
    // Else this is "lock acquiring" competition between only pending writers.

    // Get exclusive access for write lock.
    m_writer->wait();
    // Only one writer at a time can reach here.
}

void
RWlockType2::releaseWriteLock()
{
    // Unblock next write request.
    // Check "Section D" in acquireWriteLock()
    m_writer->post();

    m_mutex2->lock();
        // Decrement the writer count.
        m_writerCount--;
        if (0 == m_writerCount) {
            // If it is the last writer unblock readers.
            // Check "Section B" acquireReadLock()
            m_reader->post();
            // This allows readers to join "lock acquiring" competition.
        }
    m_mutex2->unlock();
}
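
For experimentation, the same scheme can be translated to Python almost line for line. The sketch below is such a translation under stated assumptions: `pending_writers` and `preference_demo` are helpers added here, and the demo only shows one scenario (a reader that arrives after a pending writer is admitted after that writer).

```python
import threading
import time


class WriterPreferenceLock:
    def __init__(self):
        self.m1 = threading.Lock()         # protects readers
        self.m2 = threading.Lock()         # protects writers
        self.m3 = threading.Lock()         # one reader at a time at "Section B"
        self.r = threading.Semaphore(1)    # held while any writer is pending
        self.w = threading.Semaphore(1)    # exclusive access to the resource
        self.readers = 0
        self.writers = 0

    def acquire_read(self):
        with self.m3:
            self.r.acquire()
            with self.m1:
                self.readers += 1
                if self.readers == 1:
                    self.w.acquire()       # first reader blocks writers
            self.r.release()

    def release_read(self):
        with self.m1:
            self.readers -= 1
            if self.readers == 0:
                self.w.release()           # last reader admits writers

    def acquire_write(self):
        with self.m2:
            self.writers += 1
            if self.writers == 1:
                self.r.acquire()           # first writer blocks new readers
        self.w.acquire()

    def release_write(self):
        self.w.release()
        with self.m2:
            self.writers -= 1
            if self.writers == 0:
                self.r.release()           # last writer admits readers

    def pending_writers(self):
        with self.m2:
            return self.writers


def preference_demo():
    lock = WriterPreferenceLock()
    events = []

    def writer():
        lock.acquire_write()
        events.append("W")
        lock.release_write()

    def late_reader():
        lock.acquire_read()
        events.append("R")
        lock.release_read()

    lock.acquire_read()                    # an early reader holds the lock
    wt = threading.Thread(target=writer)
    wt.start()
    while lock.pending_writers() < 1:      # wait until the writer is pending
        time.sleep(0.001)
    rt = threading.Thread(target=late_reader)
    rt.start()                             # arrives after the writer
    lock.release_read()
    wt.join()
    rt.join()
    return events


if __name__ == "__main__":
    print(preference_demo())
```

With a readers-preference lock the late reader could join the early reader immediately; here it has to wait until the pending writer has finished.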

Reader Writer – Readers preference

Q. Classical reader writer problem. You can read more about it here.
A resource is shared by reader & writer threads. The constraints for the threads to access the shared resource are:
– If a reader thread is accessing the shared resource, writer thread is not allowed to access the shared resource. However another reader thread(s) can access the shared resource.
– If a writer thread is accessing the shared resource no other thread (reader or writer) is allowed to access the shared resource.

Ans: For semaphore and mutex pseudo implementation check this.
The solution described here gives preference to readers and can cause writer starvation.

class RWlockType1
{
public:

    RWlockType1();
    ~RWlockType1();
    void
    acquireReadLock();
    void
    releaseReadLock();
    void
    acquireWriteLock();
    void
    releaseWriteLock();

private:

    UINT32 m_readerCount;
    Mutex *m_mutex;
    Semaphore *m_writer;
};

RWlockType1::RWlockType1()
{
    m_mutex = new Mutex();
    m_writer = new Semaphore(1);
    m_readerCount = 0;
}

void
RWlockType1::acquireReadLock()
{
    m_mutex->lock();
        // Increase the reader count.
        m_readerCount++;
        if (1 == m_readerCount) {
            // If it is the first reader:
            // - wait for the writer to finish (if any)
            m_writer->wait();
            // Shared object access for write requests blocked.
        }
    m_mutex->unlock();
}

void
RWlockType1::releaseReadLock()
{
    m_mutex->lock();
        // Reading done. Decrement the reader count.
        m_readerCount--;
        if (0 == m_readerCount) {
            // If it was the last reader:
            // unblock the waiting writer (if any)
            m_writer->post();
            // Shared object access for write requests allowed.
        }
    m_mutex->unlock();
}

void
RWlockType1::acquireWriteLock()
{
    // Sleep until:
    // 1. Reader count is 0, and
    // 2. No other writer has access to the shared object.
    m_writer->wait();
    // Exclusive access of shared object granted.
}

void
RWlockType1::releaseWriteLock()
{
    // Release the exclusive access of the shared object.
    m_writer->post();
    // Shared object available now for readers and writers.
}

Once the first reader acquires the reader lock, it locks the shared object against writers, which prevents any writer from accessing it. Subsequent readers simply use the shared object that is already locked against writers. The last reader unlocks the shared object for write access.

In this solution, every writer must lock the shared object exclusively. This means that a continuous stream of readers can keep writers blocked and starve them.
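
The behaviour described above can be seen in a few lines of Python. This is a minimal sketch of the same scheme; the `try_acquire_write` method is a non-blocking helper added here (an assumption, not part of the original class) so that the effect can be observed from a single thread.

```python
import threading


class ReadersPreferenceLock:
    def __init__(self):
        self._mutex = threading.Lock()
        self._writer = threading.Semaphore(1)
        self._readers = 0

    def acquire_read(self):
        with self._mutex:
            self._readers += 1
            if self._readers == 1:
                self._writer.acquire()     # first reader locks out writers

    def release_read(self):
        with self._mutex:
            self._readers -= 1
            if self._readers == 0:
                self._writer.release()     # last reader lets writers in

    def acquire_write(self):
        self._writer.acquire()

    def try_acquire_write(self):
        # Non-blocking variant used only for the demonstration below.
        return self._writer.acquire(blocking=False)

    def release_write(self):
        self._writer.release()


if __name__ == "__main__":
    lock = ReadersPreferenceLock()
    lock.acquire_read()
    lock.acquire_read()                    # a second reader enters without blocking
    print(lock.try_acquire_write())        # writer is locked out while readers exist
    lock.release_read()
    lock.release_read()
    print(lock.try_acquire_write())        # writer gets in after the last reader
    lock.release_write()
```

As long as at least one reader holds the lock, new readers keep entering and the writer stays locked out, which is exactly the starvation scenario.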

Ordered producer consumer

Q. The classic producer consumer problem with a slight twist: there is an ordering between the items produced. Let’s assume that each item produced has a sequence number associated with it. There are multiple producer threads, each producing an item for a different sequence number. There is a single consumer thread which must consume the items in sequential order of the sequence number.
This kind of situation can occur when you want to read a large amount of data and the data reading process is slow. To overcome the slow speed of sequential reads, you create multiple threads to read independent data segments. The data segments can be read in any order, but they must be consumed in sequential order.

Ans: The solution uses the thread-safe implementation of the buffer queue described here and an unordered_map.
For a pseudo implementation of the condition variable and mutex, check the Mutex and ConditionMutex classes in the synchronisation primitives post.

1. The solution uses one thread-safe buffer queue, the 'empty' buffer queue, and one unordered_map that maps a sequence number to the produced buffer, the 'filled' buffer map.
2. The empty buffer queue is initialized with N buffer items (all the available buffer items are empty initially).
3. The filled buffer map is initially empty (since no item has been produced yet, there is nothing for the consumer to consume).
4. Apart from this, there are two sequence numbers: the producer_sequence number (used by the producer threads to keep track of the next sequence number to produce) and the consumer_sequence number (used by the consumer thread to keep track of the next sequence number to consume).

5. Each producer thread follows the subroutine below:
i. Dequeue a buffer from the empty buffer queue (if the empty buffer queue is empty, the calling thread is put to sleep).
ii. In a thread-safe way, get the next producer_sequence number to produce and increment the producer_sequence number.
iii. Produce the data corresponding to the sequence number.
iv. Add the produced buffer to the filled buffer map and send a wakeup signal to the consumer thread.

6. The consumer thread follows the subroutine below:
i. Get the consumer_sequence number.
ii. Sleep until the buffer corresponding to the consumer_sequence number is present in the filled buffer map.
iii. Consume the buffer corresponding to the sequence number.
iv. Delete the consumed buffer entry from the filled buffer map and enqueue the consumed buffer to the empty buffer queue (which in turn wakes up any producer thread waiting for a buffer).

Below is pseudocode that uses the thread-safe implementation of the buffer queue to solve the ordered producer consumer problem.

#define N 100
#define SEQ_TO_PRODUCE 10000

class orderedProducerConsumer
{
public:
    orderedProducerConsumer();
    ~orderedProducerConsumer();
    void
    producerSubroutine();
    void
    consumerSubroutine();

private:
    bufferQueue *m_emptyQueue;
    // Maps a sequence number to the buffer produced for it.
    unordered_map<UINT32, buffer *> m_filledMap;
    UINT32 m_producerSequence;
    UINT32 m_cosumerSequence;
    Mutex *m_seqMutex;
    ConditionMutex *m_bufferMapCondMutex;
};

orderedProducerConsumer::orderedProducerConsumer()
{
    // Initialize empty queue with N items.
    m_emptyQueue = new bufferQueue(N);
    // Initialize sequence numbers to 0.
    m_producerSequence = m_cosumerSequence = 0;
    // Initialize mutex & condition mutex.
    m_seqMutex = new Mutex();
    m_bufferMapCondMutex = new ConditionMutex();
}

// Multiple threads can be spawned to run producer routine.
void
orderedProducerConsumer::producerSubroutine()
{
    buffer *node;
    UINT32 mySequence;

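    while (true) {
        // Get an empty buffer first (step 5.i). Taking the buffer before
        // the sequence number guarantees that the producer of the sequence
        // the consumer is waiting for always holds a buffer. Otherwise
        // producers of later sequences could take every buffer and the
        // consumer would wait forever.
        node = m_emptyQueue->dequeueBuffer();
        // Get the sequence number to produce data.
        m_seqMutex->lock();
            mySequence = m_producerSequence;
            if (mySequence >= SEQ_TO_PRODUCE) {
                // Entire range of required data produced.
                // Return the unused buffer; producer thread can exit now.
                m_seqMutex->unlock();
                m_emptyQueue->enqueueBuffer(node);
                break;
            }
            m_producerSequence++;
        m_seqMutex->unlock();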
        // Produce data corresponding to sequence number.
        produceData(mySequence, node);
        // Put the produced buffer in filled map for consumer.
        m_bufferMapCondMutex->lock();
            m_filledMap[mySequence] = node;
            // Send wakeup call to consumer.
            m_bufferMapCondMutex->signal();
        m_bufferMapCondMutex->unlock();
    }
}

// Single thread will be spawned to run consumer routine.
void
orderedProducerConsumer::consumerSubroutine()
{
    buffer *node;

    while (m_cosumerSequence < SEQ_TO_PRODUCE) {
        // Sleep until the required buffer has been produced,
        // i.e. while its sequence number is absent from the map.
        m_bufferMapCondMutex->lock();
            while (m_filledMap.find(m_cosumerSequence) ==
                   m_filledMap.end()) {
                m_bufferMapCondMutex->wait();
                // After unblocking check the condition again.
            }
            // Extract the buffer from map.
            node = m_filledMap[m_cosumerSequence];
            m_filledMap.erase(m_cosumerSequence);
        m_bufferMapCondMutex->unlock();

        // Consume the buffer.
        consumeData(m_cosumerSequence, node);
        // Put the consumed buffer in empty queue for producers.
        m_emptyQueue->enqueueBuffer(node);
        // Increment the consumer sequence number.
        m_cosumerSequence++;
    }
}
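
The steps above can also be tried out with the C++ standard library. The following is only a minimal sketch, not the implementation used in this post: it replaces the buffer queue with a plain counter of free buffers, uses a single std::mutex for both the sequence number and the map, and stores an integer payload instead of a real buffer. The function name runOrderedPipeline and its parameters are made up for this illustration. As in step 5.i, a producer claims a buffer before it takes a sequence number.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

// Hypothetical helper: runs `producers` producer threads and one consumer.
// Returns the payloads in the order in which the consumer consumed them.
std::vector<std::uint32_t> runOrderedPipeline(std::uint32_t total,
                                              unsigned producers,
                                              std::size_t bufferCount)
{
    assert(producers > 0 && bufferCount > 0);
    std::mutex mtx;
    std::condition_variable filledCv;  // signalled when a buffer is filled
    std::condition_variable emptyCv;   // signalled when a buffer is freed
    std::unordered_map<std::uint32_t, std::uint32_t> filled;  // seq -> payload
    std::size_t freeBuffers = bufferCount;
    std::uint32_t nextToProduce = 0;
    std::vector<std::uint32_t> consumed;

    auto producer = [&]() {
        for (;;) {
            std::uint32_t seq;
            {
                std::unique_lock<std::mutex> lock(mtx);
                // Wait for a free buffer, or for all sequences to be taken.
                emptyCv.wait(lock, [&] {
                    return freeBuffers > 0 || nextToProduce >= total;
                });
                if (nextToProduce >= total) {
                    return;  // nothing left to produce
                }
                --freeBuffers;          // claim a buffer first
                seq = nextToProduce++;  // then claim a sequence number
            }
            std::uint32_t payload = seq * 10;  // stand-in for produceData()
            {
                std::lock_guard<std::mutex> lock(mtx);
                filled[seq] = payload;
            }
            filledCv.notify_one();  // only the consumer waits on filledCv
        }
    };

    auto consumer = [&]() {
        for (std::uint32_t want = 0; want < total; ++want) {
            std::unique_lock<std::mutex> lock(mtx);
            // Sleep while the wanted sequence number is absent from the map.
            filledCv.wait(lock, [&] { return filled.count(want) != 0; });
            consumed.push_back(filled[want]);  // stand-in for consumeData()
            filled.erase(want);
            ++freeBuffers;  // hand the buffer back to the producers
            lock.unlock();
            emptyCv.notify_all();
        }
    };

    std::vector<std::thread> threads;
    for (unsigned i = 0; i < producers; ++i) {
        threads.emplace_back(producer);
    }
    threads.emplace_back(consumer);
    for (auto &t : threads) {
        t.join();
    }
    assert(filled.empty());
    return consumed;
}
```

Calling runOrderedPipeline(200, 4, 3) should return the payloads 0, 10, 20, ... in increasing order, no matter which producer finished first. The consumer uses notify_all on the free-buffer condition so that producers still sleeping at the end wake up, see that every sequence number has been handed out, and exit.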

Synchronisation primitives: mutex, condition variable, semaphore

Starting a series on IPC synchronisation.

A. Mutex
Mutex stands for mutual exclusion. Only one thread in a program can hold the lock on any given mutex at a time. Thus a code section protected by a mutex lock becomes a critical section and is guaranteed to be executed by only one thread at a time. If a thread tries to lock a mutex that is already locked by some other thread, the calling thread blocks until the mutex is unlocked.
The general use of a mutex looks like:

mutex_lock()
    perform critical section operations
mutex_unlock()

The pseudocode used in subsequent posts will use the class definition below for a POSIX implementation of a mutex:

class Mutex
{

public:
    Mutex()
    {
        // NULL selects the default mutex attributes.
        pthread_mutex_init(&m_mutex, NULL);
    }

    ~Mutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }

    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }

    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }

private:
    pthread_mutex_t m_mutex;
};
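
To see the class in action, here is a small sketch in which several threads increment one shared counter. The helper name countWithThreads and the use of std::thread to start the workers are assumptions made for this example only; the Mutex class is repeated (with default attributes) so that the snippet compiles on its own.

```cpp
#include <cassert>
#include <pthread.h>
#include <thread>
#include <vector>

// Same shape as the Mutex class above, with default attributes.
class Mutex
{
public:
    Mutex()  { pthread_mutex_init(&m_mutex, NULL); }
    ~Mutex() { pthread_mutex_destroy(&m_mutex); }
    void lock()   { pthread_mutex_lock(&m_mutex); }
    void unlock() { pthread_mutex_unlock(&m_mutex); }

private:
    pthread_mutex_t m_mutex;
};

// Hypothetical helper: every thread adds 1 to a shared counter
// `perThread` times and the final counter value is returned.
long countWithThreads(int threads, int perThread)
{
    assert(threads > 0 && perThread >= 0);
    Mutex mutex;
    long counter = 0;
    std::vector<std::thread> workers;

    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&]() {
            for (int i = 0; i < perThread; ++i) {
                mutex.lock();
                    ++counter;  // critical section
                mutex.unlock();
            }
        });
    }
    for (auto &w : workers) {
        w.join();
    }
    return counter;
}
```

With the lock in place, countWithThreads(4, 10000) returns 40000. Without the lock/unlock pair the increments would race and the result could come out lower.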

B. Condition variables
Mutexes are used for locking, but they don't provide any waiting functionality. Condition variables are used for waiting.
A condition variable has a cond_wait function to wait for a condition to become true, and a cond_signal function to unblock/wake up one waiting thread when the condition becomes true. Similarly, there is a cond_broadcast function to unblock/wake up all the waiting threads when the condition becomes true. A mutex is always associated with a condition variable.

The waiting action for a condition variable looks like below:

mutex_lock(mutex)
    while (condition is false)
        cond_wait(cond_var, mutex);
    take appropriate action on true condition
    modify the condition
mutex_unlock(mutex)

When cond_wait is called, two things happen atomically: the corresponding mutex is unlocked, and the calling thread is put to sleep until some other thread calls cond_signal or cond_broadcast.
After receiving the wakeup call, the sleeping thread first re-acquires the lock on the corresponding mutex, and only then does it return from the cond_wait call.
Another thing to notice is that whenever the calling thread returns from cond_wait, we always test the condition again, because spurious wakeups can occur: a wakeup when the desired condition is still not true.

The signaling action for a condition variable looks like below:

mutex_lock(mutex)
    set condition true
    cond_signal(cond_var);
mutex_unlock(mutex)

The pseudocode used in subsequent posts will use below class definition for POSIX implementation of condition variables:

class ConditionMutex
{

public:
    ConditionMutex()
    {
        pthread_cond_init(&m_cond, NULL);
        pthread_mutex_init(&m_mutex, NULL);
    }

    ~ConditionMutex()
    {
        pthread_cond_destroy(&m_cond);
        pthread_mutex_destroy(&m_mutex);
    }

    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }

    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }

    void broadcast()
    {
        pthread_cond_broadcast(&m_cond);
    }

    void signal()
    {
        pthread_cond_signal(&m_cond);
    }

    void wait()
    {
        pthread_cond_wait(&m_cond, &m_mutex);
    }
private:

    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
};
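
The waiting and signaling patterns can be combined into a small hand-off between two threads. This is only an illustrative sketch: the helper name waitForValue, the `ready` flag, and the use of std::thread are assumptions for this example, and the ConditionMutex class is repeated in compact form (without broadcast) so that the snippet is self-contained.

```cpp
#include <cassert>
#include <pthread.h>
#include <thread>

// Compact copy of the ConditionMutex class above.
class ConditionMutex
{
public:
    ConditionMutex()
    {
        pthread_cond_init(&m_cond, NULL);
        pthread_mutex_init(&m_mutex, NULL);
    }
    ~ConditionMutex()
    {
        pthread_cond_destroy(&m_cond);
        pthread_mutex_destroy(&m_mutex);
    }
    void lock()   { pthread_mutex_lock(&m_mutex); }
    void unlock() { pthread_mutex_unlock(&m_mutex); }
    void signal() { pthread_cond_signal(&m_cond); }
    void wait()   { pthread_cond_wait(&m_cond, &m_mutex); }

private:
    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
};

// Hypothetical helper: a waiter thread sleeps until the calling thread
// publishes a value, then reports the value it saw.
int waitForValue(int valueToPublish)
{
    ConditionMutex cm;
    bool ready = false;  // the "condition"
    int shared = 0;
    int seen = -1;

    std::thread waiter([&]() {
        cm.lock();
            while (!ready) {  // re-check after every wakeup
                cm.wait();
            }
            seen = shared;
        cm.unlock();
    });

    cm.lock();
        shared = valueToPublish;
        ready = true;  // set condition true
        cm.signal();
    cm.unlock();

    waiter.join();
    return seen;
}
```

The hand-off works whichever thread runs first. If the signal is sent before the waiter starts, the waiter finds `ready` already true and never calls wait, so no wakeup is lost.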

C. Semaphore
A semaphore provides three operations.
1. Initialize: To initialize a semaphore, the caller needs to provide the initial value of the semaphore.
2. Wait: The thread calling the wait operation blocks while the value of the semaphore is less than or equal to 0. Once the value is positive, it is decremented by 1 and the thread continues.

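In the most simplistic way, one can visualize the post and wait operations on a semaphore as below (a real implementation performs the test and the decrement atomically, and blocks the thread instead of spinning):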

post (semaphore s)
{
    s = s+1;
}

wait (semaphore s)
{
    while (s<=0)
    {
        // do nothing
        ;
    }
    s = s-1;
}
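
The busy loop above is only a mental model. As a rough sketch of how the same counter can block instead of spin, the semaphore can be built from a mutex and a condition variable. The class name SketchSemaphore and its value() accessor are invented for this example; it uses the C++ standard library rather than the POSIX calls used elsewhere in this post.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Illustrative counting semaphore built from a mutex and a
// condition variable. Not the POSIX implementation.
class SketchSemaphore
{
public:
    explicit SketchSemaphore(unsigned initialValue)
        : m_value(initialValue)
    {
    }

    void post()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_value;
        }
        // Wake one waiter, if any.
        m_cond.notify_one();
    }

    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // Sleep while the value is 0. The test and the decrement both
        // happen under the mutex, so they are atomic as a pair.
        m_cond.wait(lock, [this] { return m_value > 0; });
        --m_value;
    }

    // Snapshot of the current value, for inspection only.
    unsigned value()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_value;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cond;
    unsigned m_value;
};
```

A semaphore created with value 2 lets two wait() calls through immediately; a third would sleep until some thread calls post().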

The pseudocode used in subsequent posts will use below class definition for POSIX implementation of semaphore:

class Semaphore
{

public:
    Semaphore(
        IN UINT32 initialValue
        )
    {
        sem_init(&m_semaphore, 0, initialValue);
    }

    ~Semaphore()
    {
        sem_destroy(&m_semaphore);
    }

    void wait()
    {
        sem_wait(&m_semaphore);
    }

    void post()
    {
        sem_post(&m_semaphore);
    }

private:

    sem_t m_semaphore;
};
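
A counting semaphore is a natural fit for limiting how many threads may be inside a region at once, much like the 3-cars-per-direction limit in the Old Bridge problem. The sketch below is only an example under assumptions: the helper name maxConcurrent, the atomic counters, and the use of std::thread are not part of this post, and the Semaphore class is repeated in compact form (with a plain unsigned argument) so that the snippet stands alone.

```cpp
#include <atomic>
#include <cassert>
#include <semaphore.h>
#include <thread>
#include <vector>

// Compact copy of the Semaphore class above.
class Semaphore
{
public:
    explicit Semaphore(unsigned initialValue)
    {
        sem_init(&m_semaphore, 0, initialValue);
    }
    ~Semaphore() { sem_destroy(&m_semaphore); }
    void wait() { sem_wait(&m_semaphore); }
    void post() { sem_post(&m_semaphore); }

private:
    sem_t m_semaphore;
};

// Hypothetical helper: starts `threads` workers that all pass through a
// region guarded by a semaphore with `slots` slots. Returns the highest
// number of workers observed inside the region at the same time.
int maxConcurrent(int threads, unsigned slots)
{
    assert(threads > 0 && slots > 0);
    Semaphore gate(slots);
    std::atomic<int> inside(0);
    std::atomic<int> peak(0);
    std::vector<std::thread> workers;

    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&]() {
            gate.wait();  // take one slot
            int now = ++inside;
            // Record a new peak if this is the highest count so far.
            int prev = peak.load();
            while (now > prev && !peak.compare_exchange_weak(prev, now)) {
            }
            std::this_thread::yield();  // stand-in for real work
            --inside;
            gate.post();  // give the slot back
        });
    }
    for (auto &w : workers) {
        w.join();
    }
    return peak.load();
}
```

For example, maxConcurrent(8, 3) never reports more than 3, because at most three workers can hold a slot at any moment.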