Notes by: Hojat Parta, Hesam Samimi

CS 111

Feb 6th 2008

 

Synchronization from ground up

 

Implementing Pipes (in multithreaded applications)

 

Assumptions:

•           One writer

•           One reader

•           Word-sized loads (ld) and stores (st) are synchronized by the hardware, i.e. each one is atomic.

•           Buffer size is power of 2

 

 

image1

 

enum {N=1024};

typedef struct {

            char buf[N];

            unsigned int r, w;       // read and write cursors

} pipe_t;

 

 

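** Note: The cursors r and w are never reduced modulo N; they simply count up and eventually wrap around as unsigned ints. This still works because N (1024) is a power of 2 and therefore divides the range of unsigned int: w - r remains the number of bytes in the buffer, and w%N keeps advancing one slot at a time across the wrap.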
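
The wraparound arithmetic can be checked directly. The following is a small sketch, not part of the lecture code; the helper names used, slot and wraparound_ok are made up for illustration, and it assumes only that N is a power of 2 that fits in an unsigned int.

```c
#include <assert.h>
#include <limits.h>

enum { N = 1024 };

/* Number of bytes currently in the pipe. Unsigned subtraction wraps
   modulo UINT_MAX + 1, so the result is correct even after w has
   overflowed past r. */
static unsigned int used(unsigned int r, unsigned int w) {
    return w - r;
}

/* Buffer slot for a cursor value. Because N divides UINT_MAX + 1,
   consecutive cursor values map to consecutive slots across the wrap. */
static unsigned int slot(unsigned int cursor) {
    return cursor % N;
}

/* Hypothetical self-check: returns 1 if the wraparound cases hold. */
static int wraparound_ok(void) {
    unsigned int r = UINT_MAX - 2;      /* reader is just before the wrap */
    unsigned int w = r + 5;             /* writer has already wrapped to 2 */
    assert(w == 2);
    assert(used(r, w) == 5);            /* still 5 bytes in the pipe */
    assert(slot(UINT_MAX) == N - 1);    /* the last slot ... */
    assert(slot(UINT_MAX + 1u) == 0);   /* ... is followed by slot 0 */
    return 1;
}
```

If N were not a power of 2 (say 1000), slot(UINT_MAX) and slot(UINT_MAX + 1u) would not be adjacent, and the buffer would skip or repeat slots at the wrap.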

 

Here’s the code for read and write API:

 

void writec (pipe_t *p, char c) {

            while (p->w - p->r == N)

                        continue;

            p->buf[p->w++%N]=c;

}

 

char readc (pipe_t *p) {

            while (p->w - p->r == 0)

                        continue;

            return p->buf[p->r++%N];

}

 

 

New assumptions:

•           More than one writer

•           More than one reader

•           Word-sized loads (ld) and stores (st) are synchronized by the hardware, i.e. each one is atomic.

•           Buffer size is power of 2

 

image2

 

Example:

            $ (cat a & cat b) | sort

 

Problem: Two writers can both read the same value of the write cursor, so the second thread can write to the same location before the first thread gets a chance to increment the write cursor, and one byte is lost!
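
To see the race concretely, the statement p->buf[p->w++%N]=c can be split into the separate loads and stores it compiles to, and two writers interleaved by hand. This is an illustrative single-threaded simulation of one bad schedule; the function name simulate_lost_write is hypothetical.

```c
#include <assert.h>

enum { N = 1024 };

typedef struct {
    char buf[N];
    unsigned int r, w;   /* read and write cursors */
} pipe_t;

/* Replays one bad interleaving of two writers, A and B, each running
   p->buf[p->w++ % N] = c. Returns the number of bytes the pipe
   believes it holds afterwards. */
static unsigned int simulate_lost_write(pipe_t *p) {
    unsigned int a_w = p->w;   /* A loads the write cursor */
    unsigned int b_w = p->w;   /* B loads the same value before A stores */
    p->w = a_w + 1;            /* A stores w + 1 */
    p->w = b_w + 1;            /* B stores the same w + 1 */
    p->buf[a_w % N] = 'a';     /* A writes its byte */
    p->buf[b_w % N] = 'b';     /* B overwrites it in the same slot */
    assert(a_w == b_w);        /* both writers used one slot */
    return p->w - p->r;
}
```

Starting from an empty pipe, two bytes were written but the pipe records only one, and the byte 'a' is gone.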

 

Solution: Critical section.

 

General rules for creating critical sections:

1-         Look for shared writes

2-         Find all shared reads that generate data which are used in shared writes

3-         Surround the code identified in (1) and (2) by lock and unlock pairs

 

void writec (pipe_t *p, char c) {

            while (p->w - p->r == N) // p->w and p->r are shared reads

                        continue;

            p->buf[p->w++%N]=c;   // p->w is shared read and write, p->buf[…] is a shared write

}

 

 

 

The new code with critical section added:

 

lock_t l;

 

void writec (pipe_t *p, char c) {

            lock (&l);

            while (p->w - p->r == N)

                        continue;

            p->buf[p->w++%N]=c;

            unlock (&l);

}

 

char readc (pipe_t *p) {

            lock (&l);

            while (p->w - p->r == 0)

                        continue;

            char c = p->buf[p->r++%N];

            unlock (&l);

            return c;

}
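
The notes use lock_t, lock and unlock without showing them. One possible sketch is a spinlock built on the C11 atomic_flag test-and-set operation. This is an assumption for illustration, not necessarily how the lecture defined them; try_lock, LOCK_INIT and lock_demo are extra names that do not appear in the notes.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    atomic_flag held;   /* set while some thread owns the lock */
} lock_t;

#define LOCK_INIT { ATOMIC_FLAG_INIT }

/* Try once; returns true if the caller now owns the lock. */
static bool try_lock(lock_t *l) {
    /* test_and_set returns the previous value: false means it was free. */
    return !atomic_flag_test_and_set_explicit(&l->held, memory_order_acquire);
}

/* Spin until the lock is acquired. */
static void lock(lock_t *l) {
    while (!try_lock(l))
        continue;
}

/* Release the lock so that another spinner can take it. */
static void unlock(lock_t *l) {
    atomic_flag_clear_explicit(&l->held, memory_order_release);
}

/* Hypothetical single-threaded check of the lock's state transitions. */
static int lock_demo(void) {
    lock_t l = LOCK_INIT;
    lock(&l);
    assert(!try_lock(&l));   /* already held: a second attempt fails */
    unlock(&l);
    assert(try_lock(&l));    /* free again: the attempt succeeds */
    unlock(&l);
    return 1;
}
```

A lock like this burns CPU while waiting, which is acceptable only when critical sections are short.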

 

 

 

Problem: One global lock performs badly (slow)! Threads working on unrelated pipes all contend for the same lock.

 

Solution: Give each pipe its own lock.

 

The new code with lock for each pipe:

 

typedef struct {

            char buf[N];

            unsigned int r, w;       // read and write cursors

            lock_t l;                // per-pipe lock (same lock_t type as the global lock above)

} pipe_t;

 

void writec (pipe_t *p, char c) {

            lock (&p->l);

            while (p->w - p->r == N)

                        continue;

            p->buf[p->w++%N]=c;

            unlock (&p->l);

}

 

char readc (pipe_t *p) {

            lock (&p->l);

            while (p->w - p->r == 0)

                        continue;

            char c = p->buf[p->r++%N];

            unlock (&p->l);

            return c;

}

 

 

 

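Problem: With this approach we loop inside the critical section. A writer that finds the pipe full spins while holding the lock, so no reader can ever acquire the lock to drain the pipe (and likewise for a reader on an empty pipe). Everything hangs!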

 

Solution: Blocking Mutex.

 

Code for Blocking Mutex:

 

 

typedef struct {

            lock_t l;

            bool locked;

            proc_t *blocked_list;

} bmutex_t;

 

void acquire (bmutex_t *b) {

            for (;;) {

                        lock (&b->l);

                        curr_process->blocked=true; // Set self to blocked state

                        append (&b->blocked_list, curr_process); // assuming this avoids duplicates

                        if (!b->locked)

                                    break;

                        unlock (&b->l);

                        yield();

            }

            b->locked=true;

            curr_process->blocked=false;

            remove (&b->blocked_list, curr_process);

            unlock (&b->l);

}

 

void release (bmutex_t *b) { // this function is equivalent to notifyAll

            lock (&b->l);

            b->locked=false;

            for (proc_t *p=b->blocked_list; p; p=p->next)

                        p->blocked = false;

 

            unlock (&b->l);

}

 

 

Note: We could instead set only the process at the head of blocked_list to the runnable state, which would make this function equivalent to notify instead of notifyAll.
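
The notify-style variant described in the note might look like the sketch below. The types are minimal stand-ins so that the fragment compiles on its own: the placeholder lock_t does no real locking, and release_one is a hypothetical name, not a function from the lecture.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Minimal stand-ins for the kernel types used in the notes. */
typedef struct proc {
    bool blocked;
    struct proc *next;
} proc_t;

typedef struct { int held; } lock_t;            /* placeholder, not a real lock */
static void lock(lock_t *l)   { l->held = 1; }
static void unlock(lock_t *l) { l->held = 0; }

typedef struct {
    lock_t l;
    bool locked;
    proc_t *blocked_list;
} bmutex_t;

/* notify-style release: mark only the first waiter runnable.
   The woken process still removes itself from the list in acquire. */
static void release_one(bmutex_t *b) {
    assert(b != NULL);
    lock(&b->l);
    b->locked = false;
    if (b->blocked_list != NULL)
        b->blocked_list->blocked = false;   /* wake the head only */
    unlock(&b->l);
}
```

Waking a single process avoids a stampede in which every waiter runs, finds the mutex taken again, and goes back to sleep.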

 

Semaphores: (Dijkstra, 1960s)

 

Operates the same as a blocking mutex, except with an unsigned int instead of a bool (a blocking mutex is a binary semaphore):

 

•           Locked when the unsigned integer equals 0

•           Unlocked when the unsigned integer is greater than 0

 

Goal: allow up to N processes to access a resource simultaneously.

 

Terminology:

Down     P          grab                 acquire              prolaag (probeer te verlagen = try to decrease)

Up       V          increase             release              verhogen = increase
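
A counting semaphore along these lines can be sketched with C11 atomics. This is a simplified illustration: unlike the blocking mutex above, it keeps no blocked list and simply yields the CPU while waiting. The names sema_t, sema_init, sema_try_down, sema_down, sema_up and sema_demo are all hypothetical.

```c
#include <assert.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    atomic_uint count;   /* number of free slots; 0 means locked */
} sema_t;

static void sema_init(sema_t *s, unsigned int n) {
    atomic_init(&s->count, n);
}

/* Try to take one slot without waiting (a non-blocking P). */
static bool sema_try_down(sema_t *s) {
    unsigned int c = atomic_load(&s->count);
    while (c > 0) {
        /* On failure, c is reloaded with the current count and we retry. */
        if (atomic_compare_exchange_weak(&s->count, &c, c - 1))
            return true;
    }
    return false;   /* count was 0: the semaphore is locked */
}

/* P / down: wait until a slot is free, then take it. */
static void sema_down(sema_t *s) {
    while (!sema_try_down(s))
        sched_yield();   /* give up the CPU instead of spinning hot */
}

/* V / up: return a slot. */
static void sema_up(sema_t *s) {
    atomic_fetch_add(&s->count, 1);
}

/* Hypothetical check: with N = 2, two downs succeed and a third fails. */
static int sema_demo(void) {
    sema_t s;
    sema_init(&s, 2);
    sema_down(&s);
    sema_down(&s);
    assert(!sema_try_down(&s));   /* both slots are taken */
    sema_up(&s);
    assert(sema_try_down(&s));    /* one slot was returned */
    return 1;
}
```

Initializing the count to 1 gives the binary semaphore, which behaves like the blocking mutex.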

 

New code with blocking mutex:

 

typedef struct {

            char buf[N];

            unsigned int r, w;       // read and write cursors

            bmutex_t bm;

} pipe_t;

 

void writec (pipe_t *p, char c) {

            for (;;) {

                        acquire (&p->bm);

                        if (p->w - p->r!=N)

                                    break;

                        release (&p->bm);

            }

 

            p->buf[p->w++%N]=c;

            release (&p->bm);

}

 

char readc (pipe_t *p) {

            for (;;) {

                        acquire (&p->bm);

                        if (p->w - p->r!=0)

                                    break;

                        release (&p->bm);

            }

 

            char c = p->buf[p->r++%N];

            release (&p->bm);

            return c;

}

 

 

 

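Problem: This approach is still polling! While the pipe is full (or empty), writec (or readc) keeps acquiring and releasing the mutex in a loop.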

 

What we want: a primitive that lets us write something like the following pseudocode:

            "

            acquire (&p->bm);

            wait_for (p->w - p->r != N);   // block until the pipe is no longer full

            p->buf[p->w++%N]=c;

            release (&p->bm);

            "

 

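Solution: Condition variables.

 

Condition variables:

A condition variable is a shared object that stands for the value of a boolean expression over shared state. It does not evaluate the expression itself: the responsibility is on the caller to establish this relationship, by rechecking the expression after waiting and by notifying whenever the expression may have become true.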

 

We need:

•           A boolean condition

•           A blocking mutex protecting this expression

•           A condition variable representing the condition

 

New API:

 

void wait_for (cond_var_t *c, bmutex_t *b);

•           Precondition: the caller has acquired b

•           Releases b, then blocks until c is notified

•           Reacquires b, then returns

 

void notify (cond_var_t *c);      // wake one waiter

void notifyAll (cond_var_t *c);   // wake all waiters

 

New code with condition variables:

 

typedef struct {

            char buf[N];

            unsigned int r, w;       // read and write cursors

            bmutex_t bm;

            cond_var_t nonfull;

            cond_var_t nonempty;

} pipe_t;

 

void writec (pipe_t *p, char c) {

            acquire (&p->bm);

            while (p->w - p->r == N)

                        wait_for(&p->nonfull, &p->bm);

 

            p->buf[p->w++%N]=c;

            notify (&p->nonempty);

            release (&p->bm);

}

 

char readc (pipe_t *p) {

            acquire (&p->bm);

            while (p->w - p->r == 0)

                        wait_for(&p->nonempty, &p->bm);

 

            char c = p->buf[p->r++%N];

            notify (&p->nonfull);

            release (&p->bm);

            return c;

}
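
For comparison, the same structure can be written against POSIX threads, where pthread_mutex_lock and pthread_mutex_unlock play the role of acquire and release, pthread_cond_wait that of wait_for, and pthread_cond_signal that of notify. This is a sketch under that mapping, not the lecture's implementation; pipe_init and pipe_demo are hypothetical helpers.

```c
#include <assert.h>
#include <pthread.h>

enum { N = 1024 };

typedef struct {
    char buf[N];
    unsigned int r, w;         /* read and write cursors */
    pthread_mutex_t bm;        /* plays the role of bmutex_t */
    pthread_cond_t nonfull;
    pthread_cond_t nonempty;
} pipe_t;

static void pipe_init(pipe_t *p) {
    p->r = p->w = 0;
    pthread_mutex_init(&p->bm, NULL);
    pthread_cond_init(&p->nonfull, NULL);
    pthread_cond_init(&p->nonempty, NULL);
}

static void writec(pipe_t *p, char c) {
    pthread_mutex_lock(&p->bm);                   /* acquire */
    while (p->w - p->r == N)                      /* recheck after every wakeup */
        pthread_cond_wait(&p->nonfull, &p->bm);   /* wait_for */
    p->buf[p->w++ % N] = c;
    pthread_cond_signal(&p->nonempty);            /* notify */
    pthread_mutex_unlock(&p->bm);                 /* release */
}

static char readc(pipe_t *p) {
    pthread_mutex_lock(&p->bm);
    while (p->w - p->r == 0)
        pthread_cond_wait(&p->nonempty, &p->bm);
    char c = p->buf[p->r++ % N];
    pthread_cond_signal(&p->nonfull);
    pthread_mutex_unlock(&p->bm);
    return c;
}

/* Single-threaded usage: neither call blocks, because the pipe is
   neither full when written nor empty when read. */
static int pipe_demo(void) {
    static pipe_t p;
    pipe_init(&p);
    writec(&p, 'h');
    writec(&p, 'i');
    assert(readc(&p) == 'h');
    assert(readc(&p) == 'i');
    return 1;
}
```

The wait sits in a while loop rather than an if because a woken thread must recheck the condition: another thread may have refilled or drained the pipe before it reacquired the mutex.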

 

 

 

Problem: Consider this shell command:

            ... | cat | ...

Here cat simply copies from STDIN to STDOUT ( read(0,...) -> write(1,...) ).

 

            Every byte is read from the pipe into user memory and then written straight back into kernel memory, an unnecessary round trip!

 

Solution: we develop a new API called copyc:

 

int copyc (pipe_t *in, pipe_t *out, size_t size);      

 

Example:

            copyc (0, 1, 1024); // 1024 is the size of

           

 

Code for copyc:

 

int copyc (pipe_t *in, pipe_t *out, size_t size) {

            acquire (&in->bm);

            acquire (&out->bm);

            .

            .

            .

            release (&in->bm);

            release (&out->bm);

}

 

 

 

Problem:

 

            Thread 1:                                  Thread 2:

            copyc (p0, p1, ...);                    copyc (p1, p0, ...);

1-         acquires p0's bm

2-                                                         acquires p1's bm

3-         waits for p1's bm

4-                                                         waits for p0's bm

 

Deadlock!

 

Deadlock:

Deadlock is a kind of race condition. All 4 of the following conditions are required for a deadlock:

1-         Circular wait

2-         Mutual Exclusion

3-         No preemption of lock

4-         hold & wait (a thread can hold 1 lock while waiting for another one)

 

Note: To avoid deadlocks, eliminate any one of these 4 conditions!
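
One common way to eliminate circular wait is to impose a global order on the locks, for example by address, and always acquire them in that order. The sketch below shows how the two acquisitions in copyc might be ordered. The bmutex_t here is a stub (a real acquire would block rather than assert), and first_of, acquire_pair and release_pair are hypothetical names, not part of the lecture code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for the notes' blocking mutex; only the flag matters here.
   A real acquire would block instead of asserting. */
typedef struct { bool locked; } bmutex_t;
static void acquire(bmutex_t *b) { assert(!b->locked); b->locked = true; }
static void release(bmutex_t *b) { assert(b->locked);  b->locked = false; }

/* Returns the mutex that must be taken first: the one at the lower address. */
static bmutex_t *first_of(bmutex_t *a, bmutex_t *b) {
    return (uintptr_t)a <= (uintptr_t)b ? a : b;
}

/* Acquire both mutexes in address order, whatever order they are passed in. */
static void acquire_pair(bmutex_t *a, bmutex_t *b) {
    bmutex_t *lo = first_of(a, b);
    bmutex_t *hi = (lo == a) ? b : a;
    acquire(lo);
    if (hi != lo)          /* copying a pipe to itself needs only one lock */
        acquire(hi);
}

/* Release both; the order of the releases does not matter for deadlock. */
static void release_pair(bmutex_t *a, bmutex_t *b) {
    release(a);
    if (b != a)
        release(b);
}
```

With this ordering, copyc (p0, p1, ...) and copyc (p1, p0, ...) both take the lower-addressed mutex first, so neither thread can hold one mutex while waiting for a mutex that the other thread took earlier in the order.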