next up previous contents index
Next: A Benutzung der Compiler Up: 5.6 Verteilte Semaphore Previous: 5.6.1 Logische Uhren

5.6.2 User und Helper Pthread

Jeder beteiligte Prozess führt lokal einen Semaphorenzähler und eine Tabelle (request message queue) der verlangten Semaphoroperationen, siehe Abbildung 5.2. Die Tabelle wird nach der Sendezeit (ts) und dem Sender (s) der Anforderungen sortiert gehalten.

   table1370
Tabelle 5.2: Request Message Queue

Jede Anforderung von einer P bzw. V Operation wird lokal vermerkt und an alle Partner ein `Acknowledgement' geschickt, daß man die Anforderung vermerkt hat. Falls von allen Partnern wieder die Acknowledgements eingetroffen sind wird die angeforderte Operation lokal an dem Semaphorenzähler ausgeführt und gegebenenfalls die weitere Verarbeitung angestossen. Im einzelnen sind folgende Schritte zu beachten.

Mit PVM müssen in einem Vorbereitungsschritt die taskids  aller beteiligten Prozesse verteilt und empfangen werden.

Die Kanäle sind als

channel semch ( sender: INTEGER, action: Action, timestmp: INTEGER)
definiert, mit
Action = { P, V, Ack, Fin }.

Die lokale Bearbeitung in einem Prozess wird von je einem `User' und einem `Helper' Pthread durchgeführt.

await soll mit einem lokalen Semaphore implementiert werden.

Damit ergibt sich folgende Losungsskizze für distributed semaphores. 

User(i):

 P(): broadcast c-sem( ..., P, ...); await go end; 
 V(): broadcast c-sem( ..., V, ...);

Helper(i):

 receive c-sem[i]( ..., action, ... );
 CASE 
 action =   P:   / insert into queue, 
 action =   V:   \ broadcast c-sem( ..., Ack, ... )
 action = Ack:  at:=minimal Ack-time; 
                FORALL acknowledged V DO INC(sem); 
                       remove from queue END;   
                FORALL acknowledged P DO DEC(sem); 
                       IF my-request THEN signal go END; 
                       remove from queue END; 
 action = Fin: EXIT Loop;

Es folgt das Listing des Programms.

        MODULE dsema;
        
        (* Distributed Semaphores with PVM MP Module. *)
        
        FROM SYSTEM IMPORT ADDRESS, TSIZE; 
        FROM KSR1 IMPORT int; 
        
        FROM MsgPas IMPORT task, taskinit, channel, chaninit, 
                           send, receive; 
        
        FROM pvm IMPORT pvm_mytid, pvm_parent, pvm_exit, 
                        pvm_initsend, pvm_send, pvm_recv, 
                        pvm_pklong, pvm_upklong, 
                        PvmTaskDefault, 
                        PvmDataDefault, PvmDataRaw, 
                        PvmNoParent, 
                        ptrtchr, pvm_spawn; 
        
        FROM pthread IMPORT pthread_t, pthread_mutex_t, pthread_cond_t,  
                     attr_default, mutexattr_default, condattr_default, 
                     pthread_cond_init, pthread_cond_wait, pthread_cond_broadcast, 
                     pthread_mutex_init, pthread_mutex_lock, pthread_mutex_unlock, 
                     pthread_create, pthread_join;
        
        IMPORT sema; 
        
        FROM MASELEM IMPORT GAMMAINT;
        FROM MASERR  IMPORT harmless, severe, fatal, ERROR; 
        FROM MASBIOS IMPORT SWRITE, GWRITE, BLINES; 
        
        CONST maxco = 3; 
        CONST maxq  = 100; 
        
        CONST P     = 10;
              V     = 20;
              Ack   = 99;
              Fin   = 0;
              Undef = -1;
        
        TYPE event = RECORD 
              send : INTEGER; 
            action : INTEGER;
                ts : INTEGER; 
                     END; 
        VAR mq: ARRAY[0..maxq] OF event; 
        VAR valid: INTEGER; 
            lc   : INTEGER;
        
        PROCEDURE insert(s, a, t: INTEGER); 
        (* Insert event in queue. *)
        VAR   i, j, k: INTEGER; 
        BEGIN 
        (*1*) (*search location *) 
              i:=0; j:=-1; k:=-1; 
              WHILE (i <= valid) AND ( (mq[i].ts < t) 
                    OR ((mq[i].ts = t) AND (mq[i].send < s)) ) DO 
                    IF mq[i].action = Undef THEN k:=i END; 
                    i:=i+1 END;  
        (*2*) (*insert *) 
              IF i > valid THEN (*enter new*) 
                 valid:=valid+1; 
                 mq[valid].send:=s; 
                 mq[valid].action:=a; 
                 mq[valid].ts:=t; 
                 RETURN; 
                 END; 
              (* now i <= valid *) 
              (* and (mq[i].ts > t) OR ((mq[i].ts = t) AND (mq[i].send > s)) *)
                                                           (*  = unmoeglich*)
              valid:=valid+1; j:=valid; 
              WHILE i < j DO (*move up*)
                    mq[j].send:=mq[j-1].send; 
                    mq[j].action:=mq[j-1].action; 
                    mq[j].ts:=mq[j-1].ts; 
                    j:=j-1 END;  
              mq[i].send:=s; 
              mq[i].action:=a; 
              mq[i].ts:=t; 
        (*3*) (*compress *) IF k < 0 THEN RETURN END; 
              i:=0; 
              WHILE (i <= valid) AND (mq[i].action <> Undef) DO
                    i:=i+1 END;  
              j:=i; i:=i-1; 
              WHILE j <= valid DO
                    IF mq[j].action = Undef 
                       THEN j:=j+1  
                       ELSE mq[i].send:=mq[j].send; 
                            mq[i].action:=mq[j].action; 
                            mq[i].ts:=mq[j].ts; 
                            j:=j+1; i:=i+1 END;  
                    END; 
              valid:=i-1; 
        (*9*) END insert;
        
        PROCEDURE Acktime(s, t: INTEGER): INTEGER; 
        (* Determine fully acknowledgement time. *)
        VAR   i, j: INTEGER; 
        BEGIN 
        (*1*) (*update *) 
              IF mintime[s] < t THEN mintime[s]:=t END; 
        (*2*) (*find minimum *) 
              j:=mintime[0]; i:=1;  
              WHILE i <= maxco DO 
                    IF mintime[i] < j THEN j:=mintime[i] END;  
                    i:=i+1; END; 
              RETURN(j); 
        (*9*) END Acktime;
        
        PROCEDURE broadcast3(VAR cha: ARRAY OF channel; n, v1, v2, v3: INTEGER); 
        (* Send v1, v2, v3 to first n channels of cha. *)
        VAR   i: INTEGER; 
        BEGIN 
        (*1*) (*loop *) i:=0; 
              WHILE i <= n DO send3(cha[i], v1, v2, v3); 
                    i:=i+1 END;  
        (*9*) END broadcast3;
        
        PROCEDURE send3(VAR ch: channel; v1, v2, v3: INTEGER); 
        (* Send v1, v2, v3 to channel ch. *)
        VAR   val: ARRAY [0..2] OF int; 
        BEGIN 
        (*1*) (*init buffer*) 
              sema.P(bmux); 
              IF pvm_initsend( ch.mode ) < 0 THEN 
                 ERROR(severe,"send3: Cannot initialize buffer."); END;  
        (*2*) (*deposit message*) 
              val[0]:=v1; 
              val[1]:=v2; 
              val[2]:=v3; 
              IF pvm_pklong( val, 3, 1 ) < 0 THEN  
                 ERROR(severe,"send3: Cannot put into buffer."); END;  
        (*3*) (*send*) 
              IF pvm_send( ch.t.tid, ch.msgtag ) < 0 THEN  
                 ERROR(severe,"send3: Cannot send on channel."); END;  
              sema.V(bmux); 
        (*9*) END send3;
        
        PROCEDURE receive3(VAR ch: channel; VAR v1, v2, v3: INTEGER); 
        (* Receive v1, v2, v3 from channel ch. *)
        VAR   val: ARRAY [0..2] OF int; 
        BEGIN 
        (*1*) (*receive*) 
              IF pvm_recv( ch.t.tid, ch.msgtag ) < 0 THEN  
                 ERROR(severe,"receive3: Cannot receive on channel."); END;  
        (*2*) (*deposit message*) 
              IF pvm_upklong( val, 3, 1 ) < 0 THEN  
                 ERROR(severe,"receive3: Cannot get from buffer."); END;  
              v1:=val[0]; 
              v2:=val[1]; 
              v3:=val[2]; 
        (*3*) END receive3;
        
        VAR   co: ARRAY [0..maxco+1] OF task; 
              semch: ARRAY [0..maxco+1] OF channel;  
              mintime: ARRAY [0..maxco] OF INTEGER; 
        
        PROCEDURE Master(VAR x: ADDRESS): ADDRESS; 
        (* Send coworkers all tids. *)
        VAR   i, j, k, m: INTEGER; 
              t: INTEGER;  
        BEGIN 
        (*1*) (*my number *) j:=INTEGER(x); 
              SWRITE("Master start "); GWRITE(j); BLINES(0); 
        (*2*) (*initialize coworkers*) 
              i:=0; 
              co[i].tid:=mytid; (*hack*)
              chaninit(semch[i], co[i], 1);  
              i:=1; 
              WHILE i <= maxco DO 
                    taskinit(co[i]); 
                    chaninit(semch[i], co[i], 1);  
                    i:=i+1; END; 
              i:=maxco+1; 
              co[i].tid:=-1; (*hack*)
              chaninit(semch[i], co[i], 1);  
        (*3*) (*broadcast tids except of master*) 
              i:=1; 
              WHILE i <= maxco DO m:=1; 
                    WHILE m <= maxco DO  
                          send(semch[i],co[m].tid); 
                          m:=m+1; END;  
                    i:=i+1; END;  
        (*5*) (*finish*) k:=0;
              SWRITE("Master done "); GWRITE(j); BLINES(0); 
              RETURN( ADDRESS(k) )
        (*9*) END Master;
        
        PROCEDURE Coworker(VAR x: ADDRESS): ADDRESS; 
        (* Receive tids of coworkers. *)
        VAR   i, j, k, m: INTEGER;
              t: INTEGER;  
        BEGIN 
        (*1*) (*my number *) j:=INTEGER(x);        
              SWRITE("Coworker start "); GWRITE(j); BLINES(0); 
        (*2*) (*initialize channel to master*) i:=0; 
              taskinit(co[i]); 
              chaninit(semch[i], co[i], 1);  
        (*2*) (*receive tids of coworkers*) k:=-1; 
              m:=1; 
              WHILE m <= maxco DO  
                    receive(semch[i],t); 
                    IF t = mytid THEN k:=m END; 
                    co[m].tid:=t;  (*hack*)
                    chaninit(semch[m], co[m], 1);  
                    m:=m+1; END;  
              m:=maxco+1; 
              co[m].tid:=-1; (*hack*)
              chaninit(semch[m], co[m], 1);  
        (*5*) (*finish*) 
              SWRITE("Coworker done "); GWRITE(j); BLINES(0); 
              RETURN( ADDRESS(k) )
        (*9*) END Coworker;
        
        PROCEDURE DoWork(VAR x: ADDRESS): ADDRESS; 
        (* Do some work. *)
        VAR   i, j, k: INTEGER;
              ts: INTEGER;  
        BEGIN 
        (*1*) (*my number *) j:=INTEGER(x);        
              SWRITE("Do work start "); GWRITE(j); BLINES(0); 
              lc:=0; 
        (*2*) (*get lock*) 
              broadcast3(semch,maxco,j,P,lc); INC(lc); 
              sema.P(smux); 
              SWRITE("Lock now held by "); GWRITE(j); BLINES(0); 
        (*3*) (*release lock*) 
              broadcast3(semch,maxco,j,V,lc); INC(lc); 
        (*5*) (*finish*) k:=0; 
              IF j = 0 THEN ERROR(severe,"DoWork: need a break."); 
                            broadcast3(semch,maxco,j,Fin,lc); INC(lc); 
                       ELSE ERROR(harmless,"DoWork: need a break."); END; 
              SWRITE("Do work done "); GWRITE(j); BLINES(0); 
              RETURN( ADDRESS(k) )
        (*9*) END DoWork;
        
        PROCEDURE DoHelp(VAR x: ADDRESS): ADDRESS; 
        (* Do some help. *)
        VAR   i, j, k: INTEGER;
              lc, ts, at: INTEGER;  
              sem: INTEGER; 
              sender, action: INTEGER; 
        BEGIN 
        (*1*) (*my number *) j:=INTEGER(x);        
              SWRITE("Do help start "); GWRITE(j); BLINES(0); 
              lc:=0; 
              sem:=1; (*one may get the lock*) 
        (*2*) (*process requests *) 
        LOOP  
              receive3(semch[maxco+1],sender,action,ts); 
                       IF ts >= lc THEN lc:=ts+1 END; INC(lc); 
              SWRITE("DoHelp: sender = "); GWRITE(sender); 
              SWRITE(", action = "); GWRITE(action); 
              SWRITE(", timestamp = "); GWRITE(ts); 
              IF action <> Ack THEN BLINES(0); END;  
              CASE action OF 
              |         P : insert(sender,action,ts); 
                            broadcast3(semch,maxco,j,Ack,lc); INC(lc); 
              |         V : insert(sender,action,ts);
                            broadcast3(semch,maxco,j,Ack,lc); INC(lc); 
              |       Ack : at:=Acktime(sender,ts); 
                            SWRITE(", Acktime = "); GWRITE(at); BLINES(0);  
                            i:=0; 
                            WHILE (i <= valid) AND (mq[i].ts < at) DO 
                                  IF mq[i].action = V THEN 
                                     sem:=sem+1; 
                                     mq[i].action:=Undef; (* = remove *) 
                                     END; 
                                  i:=i+1 END; 
                            i:=0; 
                            WHILE (i <= valid) AND (mq[i].ts < at) DO 
                                  IF mq[i].action = P THEN 
                                     IF sem > 0 THEN sem:=sem-1; 
                                        IF mq[i].send = j THEN (*my request*) 
                                           sema.V(smux); END;  
                                        mq[i].action:=Undef; (* = remove *) 
                                        END;  
                                     END; 
                                  i:=i+1 END; 
              |       Fin : IF sem = 1 THEN EXIT;   
                               ELSE ERROR(harmless,"DoHelp: semaphore <> 1."); 
                                    EXIT; END; 
                       ELSE ERROR(severe,"DoHelp: invalid action"); 
                            EXIT END; 
              END (*loop*); 
        (*5*) (*finish*) 
              SWRITE("Do help done "); GWRITE(j); BLINES(0); 
              RETURN( ADDRESS(k) )
        (*9*) END DoHelp;
        
        VAR mytid: int; 
        VAR bmux, smux: sema.semaphore; 
        
        PROCEDURE tuwas;
        (* wie der name schon sagt. *)
        VAR   i, o, j: INTEGER; 
              pt: pthread_t; 
        BEGIN
        (*1*) (*initialization*) mytid:=pvm_mytid(); 
              valid:=-1;
              j:=0;
              WHILE j <= maxco DO mintime[j]:=-1; 
                    j:=j+1 END;  
              sema.seminit(bmux,1); 
              sema.seminit(smux,0); 
        (*2*) (*start*) i:=1; 
              IF pvm_parent() < 0 THEN o:=Master(i); 
                                  ELSE o:=Coworker(i) END; 
              i:=o; 
        (*3*) (*do work and help *)
              IF pthread_create(pt,attr_default,DoHelp,i) < 0 THEN 
                 ERROR(severe,"Cannot create DoHelp."); END;  
              o:=DoWork(i);  
              IF pthread_join(pt,o) < 0 THEN 
                 ERROR(severe,"Cannot join DoHelp."); END;  
        (*4*) (*exit*)
              pvm_exit();   
        (*9*) END tuwas;
        
        BEGIN 
              tuwas;
        END dsema.

Die Ausgabe könnte zum Beispiel wie folgt aussehen. Es wird nur die Ausgabe des Master Prozesses gezeigt.

     Master start 1
     Master done 1
     Do work start 0
     Do help start 0
     DoHelp: sender = 0, action = 10, timestamp = 0
     DoHelp: sender = 0, action = 99, timestamp = 2, Acktime = -1
     DoHelp: sender = 1, action = 10, timestamp = 0
     DoHelp: sender = 2, action = 10, timestamp = 0
     DoHelp: sender = 2, action = 99, timestamp = 2, Acktime = -1
     DoHelp: sender = 0, action = 99, timestamp = 5, Acktime = -1
     DoHelp: sender = 2, action = 99, timestamp = 5, Acktime = -1
     DoHelp: sender = 1, action = 99, timestamp = 2, Acktime = 2
     DoHelp: sender = 2, action = 99, timestamp = 7, Acktime = 2
     DoHelp: sender = 0, action = 99, timestamp = 7, Acktime = 2
     DoHelp: Lock now held by 0
     ** severe error: DoWork: need a break. 
     (a)bort, (b)reak, (c)ontinue, (d)ebug, <ENTER> ? 
     sender = 1, action = 99, timestamp = 5, Acktime = 5
     DoHelp: sender = 0, action = 20, timestamp = 1          (Bem. *)
     DoHelp: sender = 2, action = 99, timestamp = 16, Acktime = 5
     DoHelp: sender = 0, action = 99, timestamp = 16, Acktime = 5
     DoHelp: sender = 1, action = 99, timestamp = 7, Acktime = 7
     DoHelp: sender = 1, action = 99, timestamp = 16, Acktime = 16
     DoHelp: sender = 1, action = 20, timestamp = 1
     DoHelp: sender = 0, action = 99, timestamp = 22, Acktime = 16
     DoHelp: sender = 2, action = 99, timestamp = 22, Acktime = 16
     DoHelp: sender = 2, action = 20, timestamp = 1
     DoHelp: sender = 2, action = 99, timestamp = 26, Acktime = 16
     DoHelp: sender = 1, action = 99, timestamp = 22, Acktime = 22
     DoHelp: sender = 0, action = 99, timestamp = 26, Acktime = 22
     DoHelp: sender = 1, action = 99, timestamp = 26, Acktime = 26
     continue. 
     Do work done 0
     DoHelp: sender = 0, action = 0, timestamp = 2
     Do help done 0

Nach Master done sind die Subtasks mit spawn gestartet und die Kanäle zwischen allen den Tasks initialisiert, d.h. auch die Subtasks kennen sich gegenseitig. Von den Subtasks werden entsprechend die Coworker Prozeduren abgearbeitet. Danach werden (von allen drei Tasks 0 1 2) die eigentlichen Arbeits- und Hilfstasks gestartet (DoWork und DoHelp).

Die Zeilen, die mit DoHelp: beginnen, zeigen nun die Aktivitäten in der Hilfstask nach dem Empfang eines Requests. Es wird jeweils der Sender, die verlangte Aktion und die logische Zeit des Requests angegeben. Die letzte Angabe ist die minimale Acknowledgement Zeit, d.h. die Zeit zu der sicher jede beteiligte Task die Requests registriert hat. Die Aktionen sind wie folgt kodiert: 10 tex2html_wrap_inline2396 P Operation, 20 tex2html_wrap_inline2396 V Operation, 99 tex2html_wrap_inline2396 Acknowledge Operation, 0 tex2html_wrap_inline2396 Terminiere.

Die einzelnen Aktivitäten sind wie folgt. P request von 0, Ack dazu von 0, P request von 1, P request von 2, Ack von 2, zweites Ack von 0, zweites Ack von 2, schließlich das noch fehlende erste Ack von 1. Ab diesem Zeitpunkt ist der erste P Request von allen Tasks registriert worden und die minimale Acknowledgement Zeit springt auf 2 (2 logische Ticks, da bis dahin 2 send/receives durchgeführt wurden). Wärend noch 2 Acks eintreffen (von 2 und 0) bekommt die Task 0 den Lock zugesprochen (Lock now held by 0). Die nachfolgende Fehlermeldung dient nur dazu die Programmausführung eine Weile aufzuhalten. Dann wird von Task 0 der Semaphore wieder freigegeben, d.h. es wird eine V Operation requested.

Nachdem alle Tasks diesen Request registriert haben bekommt Task 1 den Lock zugesprochen. Dies ist an dem nachfolgenden V Request von 1 ersichtlich. Warum bekommt die Task 1 den Lock und nicht Task 2, obwohl beide Tasks auf einen Lock warten ? Kann es sein, daß sich zwei Task beide den Lock zusprechen ? Nachdem von Task 1 der Semaphore wieder freigegeben wurde, und alle anderen Tasks den Request registriert haben, bekommt endlich auch Task 2 den Lock zugesprochen. Nach der Freigabe des Locks durch Task 2 terminieren alle Arbeits- und Hilfstasks.

Bemerkung: An der Stelle (Bem. *) befindet sich ein Fehler, der Timestamp müßte größer als 1 sein. Der Fehler entstand durch die Verwendung der lc Variablen als lokale Variable in DoWork und DoHelp, lc hätte eine globale Variable sein müssen. Der Fehler ist in dem Programm-Listing beseitigt, nicht aber in dem Beispiel. Allerdings müßten die Änderungen von lc atomar erfolgen, d.h. lc müßte durch einen Mutex geschützt werden.

Damit ist die Besprechung des Beispiels beendet.


next up previous contents index
Next: A Benutzung der Compiler Up: 5.6 Verteilte Semaphore Previous: 5.6.1 Logische Uhren

parallel@rz.uni-mannheim.de
Mon Okt 28 14:38:25 PST 1996