Notes by: Hojat Parta, Hesam Samimi
CS 111
Feb 6th 2008
Synchronization from ground up
Implementing Pipes (in multithreaded applications)
Assumptions:
• One writer
• One reader
• Read/Write synchronization for ld, st words.
• Buffer size is power of 2
enum {N=1024}; typedef struct { char buf[N]; unsigned int r, w; // read and write cursors } pipe_t;
|
** There is some tricky business with 1024 buffer size and unsigned int when it wraps around.
Here’s the code for read and write API:
void writec (pipe_t *p, char c) { while (p->w - p->r == N) continue; p->buf[p->w++%N]=c; }
char readc (pipe_t *p) { while (p->w - p->r == 0) continue; return p->buf[p->r++%N]; }
|
New assumptions:
• More than one writer
• More than one reader
• Read/Write synchronization for ld, st words.
• Buffer size is power of 2
Example:
$ (cat a & cat b) | sort
Problem: Second thread can write to the same location before the first thread gets a chance to increment write cursor!
Solution: Critical section.
General rules for creating critical sections:
1- Look for shared writes
2- Find all shared reads that generate data which are used in shared writes
3- Surround the code identified in (1) and (2) by lock and unlock pairs
void writec (pipe_t *p, char c) { while (p->w - p->r == N) // p->w and p->r are shared reads continue; p->buf[p->w++%N]=c; // p->w is shared read and write, p->buf[…] is a shared write }
|
The new code with critical section added:
lock_t l;
void writec (pipe_t *p, char c) { lock (&l); while (p->w - p->r == N) continue; p->buf[p->w++%N]=c; unlock (&l); }
char readc (pipe_t *p) { lock (&l); while (p->w - p->r == 0) continue; char c = p->buf[p->r++%N]; unlock (&l); return c; }
|
Problem: One global lock performs bad (slow)!
Solution: Having locks for each pipe.
The new code with lock for each pipe:
typedef struct { char buf[N]; unsigned int r, w; // read and write cursors spinlock_t l; } pipe_t;
void writec (pipe_t *p, char c) { lock (&p->l); while (p->w - p->r == N) continue; p->buf[p->w++%N]=c; unlock (&p->l); }
char readc (pipe_t *p) { lock (&p->l); while (p->w - p->r == 0) continue; char c = p->buf[p->r++%N]; unlock (&p->l); return c; }
|
Problem: With this approach we have looping inside of the critical section, which will cause halt!
Solution: Blocking Mutex.
Code for Blocking Mutex:
typedef struct { lock_t l; bool locked; proc_t *blocked_list; } bmutex_t;
void acquire (bmutex_t *b) { for (;;) { lock (&b->l); curr_process->blocked=true; // Set self to blocked state append (&b->blocked_list, curr_process); // assuming this avoid duplicates if (!b->locked) break; unlock (&b->l); yield(); } b->locked=true; curr_process->blocked=false; remove (&b->blocked_list, curr_process); unlock (&b->l); }
void release (bmutex_t *b) { // this function is equivalent to notifyAll lock (&b->l); b->locked=false; for (proc_t *p=b->blocked_list; p; p=p->next) p->blocked = false;
unblock (&b->l); }
|
Note: We could only set the process at the top of the blocked_list to runnable state which would make this function equivalent to notify instead of notifyAll.
Semaphores: (Dijkstra, 1960s)
Operates the same as blocking mutex except with unsigned int instead of bool (Blocking Butex is a binary Semaphore):
• Locked when the unsigned integer equals 0
• Unlocked when the unsigned integer is greater than 0
Goal: allow N simultaneous processes to access a resource.
Terminologies:
Down p grab acquire prolaag (probeer to verlagen = try-and-decrease)
Up v decrease release verhaag = increase
New code with blocking mutex:
typedef struct { char buf[N]; unsigned int r, w; // read and write cursors bmutex_t bm; } pipe_t;
void writec (pipe_t *p, char c) { for (;;) { acquire (&p->bm); if (p->w - p->r!=N) break; release (&p->bm); }
p->buf[p->w++%N]=c; release (&p->bm); }
char readc (pipe_t *p) { for (;;) { acquire (&p->bm); if (p->w - p->r!=0) break; release (&p->bm); }
char c = p->buf[p->r++%N]; release (&p->bm); return c; }
|
Problem: This approach is still polling!
Solution: We want to implement a code that provides us with following pseudo code:
"
acquire (&p->bm);
wait_for (p->w - p->r == N);
p->buf[p->w++%N]=c;
release (&p->bm);
"
Solution: Conditional Variables.
Conditional Variables:
Shared resource that contains value of an expression (Responsibility is on caller to establish this relationship).
We need:
• A boolean condition
• A blocking mutex protecting this expression
• A conditional variable representing the condition
New API:
void wait_for (cond_var_t *c, bmutex_t *b);
• You must acquire b as a precondition
• release b, then block until c becomes true
• reacquire b, then returns
void notify (cond_var_t c);
void notifyAll (cond_var_t c);
New code with conditional variable:
typedef struct { char buf[N]; unsigned int r, w; // read and write cursors bmutex_t bm; cond_var_t nonfull; cond_var_t nonempty; } pipe_t;
void writec (pipe_t *p, char c) { acquire (&p->bm); while (p->w - p->r == N) wait_for(&p->nonfull, &p->bm);
p->buf[p->w++%N]=c; notify (&p->nonempty); release (&p->bm); }
char readc (pipe_t *p) { acquire (&p->bm); while (p->w - p->r == 0) wait_for(&p->nonempty, &p->bm);
char c = p->buf[p->r++%N]; notify (&p->nonfull); release (&p->bm); return c; }
|
Problem: Consider this shell command:
... | cat | ...
which is equal to copying from STDIN to STDOUT ( reads(0,...) -> writes(1,... ) )
This will result in reading from the pipe into the user memory and then write it back to the kernel memeory!
Solution: we develop a new API called copyc:
int copyc (pipe_t *in, pipe_t *out, size_t size);
Example:
copyc (0, 1, 1024); // 1024 is the size of
Code for copyc:
int copyc (pipe_t *in, pipe_t *out, size_t size) { acquire (&in->bm); acquire (&out->bm); . . . release (&in->bm); release (&out->bm); }
|
Problem:
Thread 1: Thread 2:
copy (p0, p1); copy (p1, p0);
1- acquires p0's bm
2- acquires p1's bm
3- waiting for p1's bm
4- waiting for p0's bm
Deadlock!
Deadlock:
It's a race condition. There are 4 conditions required for deadlocks:
1- Circular wait
2- Mutual Exclusion
3- No preemption of lock
4- hold & wait (a thread can hold 1 lock while waiting for another one)
Note: To avoid deadlocks, eleminate any of these 4 conditions!