Thread safe Buffer Queue – Python code

Q. Implement a thread-safe buffer queue in Python.

Ans: Check out the Python 3 code below.

Thread-safe buffer queue using a condition variable.

import threading
import random
import time

class buffer:
    def __init__(self, bufferLength=0, nextNode=None):
        if (0 == bufferLength):
            bufferLength = 1024
        # Use a mutable bytearray so data can be copied into it in place.
        self.data = bytearray(bufferLength)
        self.dataLength = 0
        self.next = nextNode
        self.bufferLength = bufferLength

    def getData(self):
        return bytes(self.data[:self.dataLength])

    def setData(self, data, dataLength):
        self.data[:dataLength] = data[:dataLength]
        self.dataLength = dataLength

    def getNext(self):
        return self.next

    def setNext(self, nextNode):
        self.next = nextNode

class bufferQueue:
    def __init__(self, length):
        self.head = None
        self.condMutex = threading.Condition()
        for i in range(length):
            newNode = buffer()
            newNode.setNext(self.head)
            self.head = newNode

    def enqueueBuffer(self, node):
        # Get the exclusive access of the buffer queue.
        self.condMutex.acquire()
        node.setNext(self.head)
        self.head = node
        # Notify any blocking thread that buffer is available.
        self.condMutex.notify()
        # Give up the exclusive access of the buffer queue.
        self.condMutex.release()

    def dequeueBuffer(self):
        # Get the exclusive access of the buffer queue.
        self.condMutex.acquire()
        # Block till a buffer is available in the queue.
        while (None == self.head):
            self.condMutex.wait()
        node = self.head
        self.head = node.getNext()
        # Give up the exclusive access of the buffer queue.
        self.condMutex.release()
        node.setNext(None)
        return node

def testBufferQueue(myQueue):
    myName = threading.current_thread().name
    for i in range(5):
        randomTime = random.randint(1,100)
        randomTime %= 5
        time.sleep(randomTime+1)
        print ('{0} requesting buffer from queue.'.format(myName))
        node = myQueue.dequeueBuffer()
        print ('{0} received buffer from queue.'.format(myName))
        time.sleep(randomTime+1)
        print ('{0} returning buffer to queue.'.format(myName))
        myQueue.enqueueBuffer(node)


if __name__== "__main__":
    myQueue = bufferQueue(1)
    t1 = threading.Thread(target=testBufferQueue, name='Thread1',args=(myQueue,))
    t2 = threading.Thread(target=testBufferQueue, name='Thread2',args=(myQueue,))

    t1.start()
    t2.start()
    t1.join()
    t2.join()

Thread-safe buffer queue using a semaphore.

class bufferQueue:
    def __init__(self, length):
        self.head = None
        self.semBuffer = threading.Semaphore(length)
        self.mutex = threading.Lock()
        for i in range(length):
            newNode = buffer()
            newNode.setNext(self.head)
            self.head = newNode

    def enqueueBuffer(self, node):
        # Get the exclusive access of the buffer queue.
        self.mutex.acquire()
        node.setNext(self.head)
        self.head = node
        # Give up the exclusive access of the buffer queue.
        self.mutex.release()
        # Release one buffer slot.
        self.semBuffer.release()

    def dequeueBuffer(self):
        # Consume one buffer slot.
        self.semBuffer.acquire()
        # Get the exclusive access of the buffer queue.
        self.mutex.acquire()
        node = self.head
        self.head = node.getNext()
        # Give up the exclusive access of the buffer queue.
        self.mutex.release()
        node.setNext(None)
        return node
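
The semaphore based queue can be exercised with the same buffer class and testBufferQueue() driver shown above; only the queue construction changes. A minimal sketch:

if __name__== "__main__":
    myQueue = bufferQueue(2)    # Semaphore starts with 2 free buffers.
    t1 = threading.Thread(target=testBufferQueue, name='Thread1', args=(myQueue,))
    t2 = threading.Thread(target=testBufferQueue, name='Thread2', args=(myQueue,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()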

Old Bridge – Python code

Q. Write Python code to solve the Old Bridge problem.

Ans: Check out the Python 3 code below.

import threading
import random
import time

class oldBridge (threading.Thread):
    s_lockBridge = None
    s_traffic = [None, None]
    s_mutex = [None, None]
    s_cars = [0, 0]
    s_entry = ['left', 'right']

    @staticmethod
    def initBridge():
        # Initialize bridge lock.
        oldBridge.s_lockBridge = threading.Semaphore(1)
        for i in range(2):
            # Max 3 cars in either direction.
            oldBridge.s_traffic[i] = threading.Semaphore(3)
            oldBridge.s_mutex[i] = threading.Lock()
            # Cars in either direction initially are zero.
            oldBridge.s_cars[i] = 0

    def __init__(self, car, direction):
        if (None == oldBridge.s_lockBridge):
            oldBridge.initBridge()
        threading.Thread.__init__(self)
        self.carNumber = car
        self.direction = direction

    def run(self):
        self.arriveBridge()
        self.drive()
        self.exitBridge()

    def arriveBridge(self):
        print ("Arriving car: {0} from direction: {1}".format(self.carNumber, oldBridge.s_entry[self.direction]));
        oldBridge.s_mutex[self.direction].acquire()
        if (0 == oldBridge.s_cars[self.direction]):
            # If you are the 1st car on your direction
            # Acquire exclusive lock on bridge.
            oldBridge.s_lockBridge.acquire()
        oldBridge.s_cars[self.direction] += 1
        oldBridge.s_mutex[self.direction].release()
        # Consume one slot for cars in same direction.
        oldBridge.s_traffic[self.direction].acquire()

    def drive(self):
        print ("Crossing car: {0} from direction: {1}".format(self.carNumber, oldBridge.s_entry[self.direction]));
        # Get random time to cross bridge.
        driveTime = random.randint(1,100)
        driveTime %= 3
        time.sleep(driveTime)

    def exitBridge(self):
        print ("Departing car: {0} from direction: {1}".format(self.carNumber, oldBridge.s_entry[self.direction]));
        oldBridge.s_mutex[self.direction].acquire()
        oldBridge.s_cars[self.direction] -= 1
        if (0 == oldBridge.s_cars[self.direction]):
            # If you are the last car on your direction
            # Give up exclusive access on bridge.
            oldBridge.s_lockBridge.release()
        oldBridge.s_mutex[self.direction].release()
        # Free up one slot for cars in same direction.
        oldBridge.s_traffic[self.direction].release()

if __name__== "__main__":
    threads = []
    for car in range(10):
        direction = random.randint(1,101)
        direction %= 2
        newcar = oldBridge(car, direction)
        threads.append(newcar)
        newcar.start()

    for thread in threads:
        thread.join()

Synchronisation for multithreaded programming

This is a small book explaining “Synchronisation for multithreaded programming”.

I learned the theoretical aspects of synchronisation from the book Unix Network Programming, Volume 2: Interprocess Communications by W. Richard Stevens.
I got great hands-on experience with synchronisation for multithreaded programming under K.K. George's guidance.

This book covers the following topics:

1. Synchronisation primitives: mutex, condition variable, semaphore

2. Thread safe Buffer Queue
    2.1 Python code – Thread safe Buffer Queue

3. Unordered producer consumer
    3.1 Python code – Unordered producer consumer

4. Ordered producer consumer
    4.1 Python code – Ordered producer consumer

5. Reader – Writer lock: Readers preference

6. Reader – Writer lock: Writers preference

7. Reader – Writer lock: Non starving

8. FIFO Mutex / Strict ordered Reader – Writer lock

9. Task Scheduler

10. Cigarette smoker’s problem
    10.1 Python code – Cigarette smoker’s problem

11. Sleeping barbers problem

12. Dining philosopher’s problem

13. Building H2O

14. Old Bridge
    14.1 Python code – Old Bridge

Stay tuned to this page. I’ll keep on adding new pages as and when I come across interesting problems.

Reader Writer – Non starving

Q. The reader preference solution ends up starving the writers. The writer preference solution ends up starving the readers.
How do we design a reader-writer lock with no starvation? No starvation means no thread should wait indefinitely to acquire the lock.

Ans. Let's develop a no-starvation solution with the requirement that no thread is blocked from acquiring the lock by another thread whose lock request arrives later. To achieve this we follow the lock request order. Consider a lock request order as shown below:
…. R1, R2, R3, R4, R5, W1 …..
R stands for read request and W stands for write request.
In this scenario, the order in which the reader threads “R1, R2, R3, R4” acquire the shared lock and access the shared object does not matter, and note that one reader thread cannot starve another reader thread. The only restriction is that all the reader threads “R1, R2, R3, R4” must acquire the shared lock (in any order) before the writer thread W1 attempts to acquire the exclusive lock. Hence, before a writer thread's lock request is granted, there should not be any reader thread still waiting to acquire the shared lock.

Lets consider another lock request sequence as shown below:
… W1, W2, W3, R1 …
When writer thread W1 has acquired the exclusive lock, if we allow both writers W2 & W3 to proceed with their lock requests, the two writers will compete with each other for the exclusive lock (kernel thread scheduling will decide which of them is unblocked once W1 gives up the exclusive lock). Similarly, when W2 has acquired the exclusive lock, if we allow both W3 & R1 to proceed with their lock requests, W3 and R1 will compete for access to the shared object. To avoid this, until a writer thread gives up the exclusive lock, we must not allow other threads to put in lock requests.

The strict ordering is based on two rules:
1. Before allowing a writer thread to compete for the exclusive lock, make sure there are no readers waiting to acquire the shared lock.
2. Once a writer has been granted the exclusive lock, no other lock request should be accepted till the writer thread gives up the exclusive lock.

The solution below uses the reader preference read-write lock (RWlockType1) for exclusive & shared access. For the semaphore, condition variable and mutex pseudo implementations, see the “Synchronisation primitives” section below. In the code below, the lock request granter function getAccess() decides the lock request order for read & write operations.

#define READ_REQUEST  1
#define WRITE_REQUEST 2

class RWlockType3
{
public:

    RWlockType3();
    ~RWlockType3();
    void
    acquireReadLock();
    void
    releaseReadLock();
    void
    acquireWriteLock();
    void
    releaseWriteLock();

private:

    UINT32 m_waitingReaderCount;
    Mutex *m_mutex;
    ConditionMutex *m_condMutex;
    Semaphore *m_writer;
    RWlockType1 *m_rwLock;

    void
    getAccess(
        byte requestType
        );
};

RWlockType3::RWlockType3()
{
    m_waitingReaderCount = 0;
    m_mutex = new Mutex();
    m_condMutex = new ConditionMutex();
    // Initialize semaphore with 0.
    m_writer = new Semaphore(0);
    m_rwLock = new RWlockType1();
}

void
RWlockType3::getAccess(
    byte requestType
    )
{
    // At a time, only one lock requester thread beyond this point.
    // (The mutex lock acquisition decides the lock request order.) 
    m_mutex->lock();
        // If it is read lock request
        // increment the waiting reader count.
        if (READ_REQUEST == requestType) {
            m_condMutex->lock();
                m_waitingReaderCount++;
            m_condMutex->unlock();
        } else if (WRITE_REQUEST == requestType) {
            // If it is write request
            // block till all the previous readers have acquired shared lock.
            m_condMutex->lock();
                while (m_waitingReaderCount != 0) {
                    m_condMutex->wait();
                }
            m_condMutex->unlock();
            // Do not accept any new lock request till writer has 
            // given up the exclusive lock.
            // Pl note that semaphore is initialized with 0.
            // this wait will block the calling thread till other thread
            // calls the post operation.
            m_writer->wait();
        }
    m_mutex->unlock();
    // Unblock other lock requester threads.
}

void
RWlockType3::acquireReadLock()
{
    // Get the permission to acquire read lock.
    getAccess(READ_REQUEST);
    // Acquire read lock.
    m_rwLock->acquireReadLock();
    // Read lock acquired. Decrement the waiting reader count.
    m_condMutex->lock();
        m_waitingReaderCount--;
        if (0 == m_waitingReaderCount) {
            // There are no more waiting readers.
            // Unblock a writer request blocked in getAccess().
            m_condMutex->signal();
        }
    m_condMutex->unlock();
}

void
RWlockType3::releaseReadLock()
{
    m_rwLock->releaseReadLock();
}

void
RWlockType3::acquireWriteLock()
{
    // Get the permission to acquire write lock.
    getAccess(WRITE_REQUEST);
    // At this point it is ensured by getAccess()
    // that there are no more waiting readers.
    m_rwLock->acquireWriteLock();
    // write lock acquired.
}

void
RWlockType3::releaseWriteLock()
{
    m_rwLock->releaseWriteLock();
    // writer given up exclusive lock.
    // unblock getAccess() to grant further lock requests.
    m_writer->post();
}

There is one slight problem with this solution. The lock request order is determined in getAccess() by a mutex lock. When many threads compete to acquire the mutex, the kernel selects one of them to get the mutex lock, and the mutex does not guarantee FIFO order for lock acquisition.
Thus, although the solution solves the starvation problem for readers and writers, it does not impose a strict ordering among the readers and writers.

Synchronisation primitives: mutex, condition variable, semaphore

Starting a series on IPC synchronisation.

A. Mutex
Mutex stands for mutual exclusion. Only one thread in a program can lock any given mutex at a time. Thus a code section protected by a mutex lock becomes a critical section and is guaranteed to be executed by only one thread at a time. If a thread tries to lock a mutex that is already locked by some other thread, the calling thread blocks till the mutex is unlocked.
The general use of mutex looks like:

mutex_lock()
    perform critical section operations
mutex_unlock()
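
As a quick illustration of this pattern, here is a minimal Python sketch (the names are my own, not part of the pseudocode) protecting a shared counter with threading.Lock:

import threading

counter = 0
mutex = threading.Lock()

def increment():
    global counter
    for i in range(100000):
        # Critical section: only one thread updates the counter at a time.
        mutex.acquire()
        counter += 1
        mutex.release()

threads = [threading.Thread(target=increment) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print (counter)    # Always 400000 with the mutex; without it updates can be lost.

Without the lock, the read-modify-write on counter can interleave across threads and some increments get lost.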

The pseudocode used in subsequent posts will use below class definition for POSIX implementation of mutex:

class Mutex
{

public:
    Mutex()
    {
        pthread_mutex_init(&m_mutex, &attr);
    }

    ~Mutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }

    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }

    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }

private:
    pthread_mutex_t m_mutex;
};

B. Condition variables
Mutexes are used for locking but they don't provide waiting functionality. Condition variables are used for waiting.
A condition variable has a cond_wait function to wait for a condition to become true, and a cond_signal function to unblock/wake up a waiting thread when the condition becomes true. Similarly, there is a cond_broadcast function to unblock/wake up all the waiting threads when the condition becomes true. A mutex is always associated with a condition variable.

The waiting action for a condition variable looks like below:

mutex_lock(mutex)
    while (condition is false)
        cond_wait(cond_var, mutex);
    take appropriate action on true condition
    modify the condition
mutex_unlock(mutex)

When cond_wait puts the calling thread to sleep, two things happen: the corresponding mutex is unlocked and the calling thread is put to sleep till some other thread calls cond_signal.
After receiving the wakeup call, the sleeping thread first reacquires the lock on the corresponding mutex, and only after that does it return from the cond_wait call.
Another thing to notice is that whenever the calling thread returns from the cond_wait call, we always test the condition again, because spurious wakeups can occur: a wakeup when the desired condition is still not true.

The signaling action for a condition variable looks like below:

mutex_lock(mutex)
    set condition true
    cond_signal(cond_var);
mutex_unlock(mutex)
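
A minimal Python sketch of this wait/signal pattern using threading.Condition (the flag name dataReady is my own, purely illustrative):

import threading

condVar = threading.Condition()
dataReady = False

def waiter():
    with condVar:
        # Re-test the condition in a loop to guard against spurious wakeups.
        while (False == dataReady):
            condVar.wait()
        print ('Condition is true, waiter proceeding.')

def signaler():
    global dataReady
    with condVar:
        dataReady = True
        condVar.notify()

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=signaler)
t1.start()
t2.start()
t1.join()
t2.join()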

The pseudocode used in subsequent posts will use below class definition for POSIX implementation of condition variables:

class ConditionMutex
{

public:
    ConditionMutex()
    {
        pthread_cond_init(&m_cond, NULL);
        pthread_mutex_init(&m_mutex, NULL);
    }

    ~ConditionMutex()
    {
        pthread_cond_destroy(&m_cond);
        pthread_mutex_destroy(&m_mutex);
    }

    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }

    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }

    void broadcast()
    {
        pthread_cond_broadcast(&m_cond);
    }

    void signal()
    {
        pthread_cond_signal(&m_cond);
    }

    void wait()
    {
        pthread_cond_wait(&m_cond, &m_mutex);
    }
private:

    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
};

C. Semaphore
A semaphore provides three operations.
1. Initialize: To initialize a semaphore the caller needs to provide the initial value of the semaphore.
2. Wait: The thread calling the wait operation blocks while the value of the semaphore is less than or equal to 0. Once the value becomes greater than 0, it is decremented by 1 and the thread proceeds.
3. Post: The calling thread increments the value of the semaphore by 1, and if there are threads waiting on this semaphore, one of them is woken up.

In the most simplistic way, one can visualize the post and wait operations on a semaphore as below (ignoring the atomicity that a real implementation guarantees):

post (semaphore s)
{
    s = s+1;
}

wait (semaphore s)
{
    while (s<=0)
    {
        // do nothing
        ;
    }
    s = s-1;
}
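
For example, Python's threading.Semaphore offers the same behaviour, with acquire() playing the role of wait and release() the role of post; a minimal sketch with made-up names:

import threading
import time

# Semaphore initialized with 2: at most two workers proceed at a time.
slots = threading.Semaphore(2)

def worker(name):
    # wait: blocks while no slot is free.
    slots.acquire()
    print ('{0} acquired a slot.'.format(name))
    time.sleep(1)
    print ('{0} releasing the slot.'.format(name))
    # post: frees one slot and wakes a waiting worker, if any.
    slots.release()

threads = []
for i in range(4):
    t = threading.Thread(target=worker, args=('Worker{0}'.format(i),))
    threads.append(t)
    t.start()
for t in threads:
    t.join()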

The pseudocode used in subsequent posts will use below class definition for POSIX implementation of semaphore:

class Semaphore
{

public:
    Semaphore(
        IN UINT32 initialValue
        )
    {
        sem_init(&m_semaphore, 0, initialValue);
    }

    ~Semaphore()
    {
        sem_destroy(&m_semaphore);
    }

    void wait()
    {
        sem_wait(&m_semaphore);
    }

    void post()
    {
        sem_post(&m_semaphore);
    }

private:

    sem_t m_semaphore;
};

GDB: find the thread which has locked the mutex

Q. On Linux, if a multithreaded program seems hung, how do we find which thread has locked the concerned mutex?
Ans:
1. Attach gdb to the concerned process.

$sudo gdb -p pid

2. Get the information of all the running threads.

(gdb) info threads
 ......
 20 Thread 0x7f3804dde700 (LWP 19453) "XYZ" 0x00007f38db3afb9d in nanosleep () at ../sysdeps/unix/syscall-template.S:81
 19 Thread 0x7f37f82dd700 (LWP 19454) "XYZ" 0x00007f38db3afb9d in nanosleep () at ../sysdeps/unix/syscall-template.S:81
 18 Thread 0x7f37f7adc700 (LWP 19455) "XYZ" 0x00007f38db3af6dd in accept () at ../sysdeps/unix/syscall-template.S:81
 17 Thread 0x7f37f70d0700 (LWP 19460) "XYZ" __lll_lock_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/lowlevellock.S:135
 16 Thread 0x7f37f68cf700 (LWP 19461) "XYZ" 0x00007f38dbcecc0b in __memp_get_bucket () from /usr/lib/x86_64-linux-gnu/libdb_cxx-5.1.so
 15 Thread 0x7f37f60ce700 (LWP 19463) "XYZ" __lll_lock_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/lowlevellock.S:135
 14 Thread 0x7f37f58cd700 (LWP 19464) "XYZ" 0x00007f38db3af7eb in __libc_recv (fd=27, buf=0x7f37f58ccd88, n=4, flags=-616892437)
 at ../sysdeps/unix/sysv/linux/x86_64/recv.c:33
 13 Thread 0x7f37f50cc700 (LWP 19466) "XYZ" __lll_lock_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/lowlevellock.S:135
 12 Thread 0x7f37f48cb700 (LWP 19467) "XYZ" 0x00007f38db3af7eb in __libc_recv (fd=30, buf=0x7f37f48cad88, n=4, flags=-616892437)
 at ../sysdeps/unix/sysv/linux/x86_64/recv.c:33
 .......

3. Let's say thread 17 is not able to lock a mutex. Now check the details of thread 17.

(gdb) thread 17
(gdb) where
#0  __lll_lock_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/lowlevellock.S:135
#1  0x00007f38db3aa664 in _L_lock_952 () from /lib/x86_64-linux-gnu/libpthread.so.0
#2  0x00007f38db3aa4c6 in __GI___pthread_mutex_lock (mutex=0x7f37e8004da8) at ../nptl/pthread_mutex_lock.c:114
#3  0x000000000041602f in LinuxMutex::lock (this=0x7f37e8004da0) at linux/mutex.cpp:34

Print the details of the locked mutex. In my case the mutex I'm trying to lock is of type LinuxMutex, which in turn is a wrapper class around pthread_mutex_t. So I'm displaying the class object, which has a private member variable m_mutex of type pthread_mutex_t. In your case, you can directly print the pthread_mutex_t variable itself.

(gdb) p *((LinuxMutex *) 0x7f37e8004da0)
$4 = {<Mutex> = {_vptr.Mutex = 0x433130}, m_mutex = {__data = {__lock = 2, __count = 1, __owner = 19461, __nusers = 1, __kind = 1,
      __spins = 0, __list = {__prev = 0x0, __next = 0x0}}, __size = "\002\000\000\000\001\000\000\000\005L\000\000\001\000\000\000\001", '\000' ,
    __align = 4294967298}}

The pthread_mutex_t variable has a __data.__owner field, which indicates the thread ID (LWP) of the thread that has currently locked the mutex.
In our case the thread which has locked the mutex is LWP 19461.

4. Check info threads again to find the thread which has locked the mutex.

(gdb) info threads
 ......
 20 Thread 0x7f3804dde700 (LWP 19453) "XYZ" 0x00007f38db3afb9d in nanosleep () at ../sysdeps/unix/syscall-template.S:81
 19 Thread 0x7f37f82dd700 (LWP 19454) "XYZ" 0x00007f38db3afb9d in nanosleep () at ../sysdeps/unix/syscall-template.S:81
 18 Thread 0x7f37f7adc700 (LWP 19455) "XYZ" 0x00007f38db3af6dd in accept () at ../sysdeps/unix/syscall-template.S:81
 17 Thread 0x7f37f70d0700 (LWP 19460) "XYZ" __lll_lock_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/lowlevellock.S:135
 16 Thread 0x7f37f68cf700 (LWP 19461) "XYZ" 0x00007f38dbcecc0b in __memp_get_bucket () from /usr/lib/x86_64-linux-gnu/libdb_cxx-5.1.so
 15 Thread 0x7f37f60ce700 (LWP 19463) "XYZ" __lll_lock_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/lowlevellock.S:135
 14 Thread 0x7f37f58cd700 (LWP 19464) "XYZ" 0x00007f38db3af7eb in __libc_recv (fd=27, buf=0x7f37f58ccd88, n=4, flags=-616892437)
 at ../sysdeps/unix/sysv/linux/x86_64/recv.c:33
 13 Thread 0x7f37f50cc700 (LWP 19466) "XYZ" __lll_lock_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/lowlevellock.S:135
 12 Thread 0x7f37f48cb700 (LWP 19467) "XYZ" 0x00007f38db3af7eb in __libc_recv (fd=30, buf=0x7f37f48cad88, n=4, flags=-616892437)
 at ../sysdeps/unix/sysv/linux/x86_64/recv.c:33
 .......

So here thread 16 (LWP 19461) is the thread which has locked the mutex needed by thread 17.

Multithreading: Errno 9 (EBADF) – Bad File Number

If you’re randomly hitting the “Bad File Number” error (Errno: 9) in a multithreaded application on Unix, then most likely you’re trapped in the “Opened once but closed twice” bug.

What is this “Opened once but closed twice” bug?

Check out the below code:

foo()
{
    fd = open(...);
    // some processing.
    close(fd);
    // some processing.
    close(fd);    // BUG: fd is closed a second time.
}

The above code will work smoothly in a single-threaded application. But with multithreading, if a thread “X” calls open() (either in the same function foo or in any other function bar) while thread “Y” is in between the two close() calls, thread “X” will get the same fd that thread “Y” was using (since open always returns the smallest unused descriptor). File descriptors are shared by all the threads. So when thread “Y” carries out its second close(), it actually closes the file descriptor of thread “X”, which was valid & in use.

FD numbers are shared by all the threads of the same process. That means when thread “Y” closes FD number “n” twice, the second close can end up closing descriptor “n” while it is in use by thread “X”. When thread “X” then tries to use descriptor “n”, it gets the error “Bad File Number” (Errno: 9).

Thus closing a descriptor twice in a multithreaded application leads to random “Bad File Number” errors (Errno: 9).
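
A common defence (a general sketch of the idea, not tied to any particular codebase; the helper name safeClose and the file path are made up for illustration) is to close a descriptor exactly once and invalidate the variable immediately, so a stray second close becomes a harmless no-op instead of closing someone else's descriptor:

import os

def safeClose(fd):
    # Close once and return None so the caller's variable is invalidated.
    if fd is not None:
        os.close(fd)
    return None

fd = os.open('/tmp/example.txt', os.O_CREAT | os.O_WRONLY, 0o644)
# some processing.
fd = safeClose(fd)
# some processing.
fd = safeClose(fd)    # fd is already None, so nothing is closed again.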