next up previous contents index
Next: 5.6 Verteilte Semaphore Up: 5 PVM Parallel Virtual Previous: 5.4 Empfangen und Entpacken

5.5 Produzenten-Konsumenten-Beispiel

aufgabe1340

Lösungsskizze:

        DEFINITION MODULE MsgPas;
        
        (* Message passing on top of PVM: interface.
           A task record describes the partner process of this program,
           a channel record ties such a task to a message tag and a
           PVM encoding mode. INTEGER values travel over channels via
           send and receive. *)
        
        FROM KSR1 IMPORT int; 
        FROM pvm IMPORT ptrtchr;
        
        TYPE 
          (* Partner task. 
             tid   : task id of the partner, -1 while none is known.
             flag  : flag argument handed to pvm_spawn.
             file  : name of the program handed to pvm_spawn.
             arg   : single argument string placed into the spawn argv.
             where : location argument handed to pvm_spawn. *)
          task    = RECORD 
                      tid, flag: int; 
                      file, arg, where: ptrtchr; 
                    END; 
        
          (* Communication channel.
             t      : copy of the partner task description.
             msgtag : message tag used by send and receive.
             mode   : encoding mode handed to pvm_initsend. *)
          channel = RECORD 
                      t: task; 
                      msgtag, mode: int; 
                    END; 
        
        PROCEDURE taskinit(VAR t: task); 
        (* Task initialization. Fills t with the spawn parameters and 
           determines the partner task id, see the implementation. *)
        
        PROCEDURE chaninit(VAR ch: channel; VAR t: task; no: int); 
        (* Channel initialization. ch is bound to task t and uses no 
           as its message tag. *)
        
        PROCEDURE send(VAR ch: channel; v: INTEGER); 
        (* Send the value v over channel ch. *)
        
        PROCEDURE receive(VAR ch: channel; VAR v: INTEGER); 
        (* Receive a value from channel ch and store it in v. *)
        
        END MsgPas.

        IMPLEMENTATION MODULE MsgPas;
        
        (* Message Passing with PVM Implementation Module. *)
        
        FROM SYSTEM IMPORT ADR, ADDRESS; 
        FROM KSR1 IMPORT int; 
        
        FROM pvm IMPORT pvm_mytid, pvm_parent, pvm_exit, 
                        pvm_initsend, pvm_send, pvm_recv, 
                        pvm_pklong, pvm_upklong, 
                        PvmTaskDefault, 
                        PvmDataDefault, PvmDataRaw, 
                        PvmNoParent, 
                        ptrtchr, pvm_spawn; 
        
        FROM MASERR IMPORT ERROR, severe, fatal; 
        
        CONST file  = "pcmsg"; 
        CONST PARM  = " -m 400 "; 
        CONST where = ""; 
        
        VAR mytid: int; 
        
        PROCEDURE taskinit(VAR t: task); 
        (* Task initialization. 
           Fills t with the default spawn parameters (file, PARM, where, 
           PvmTaskDefault) and determines the partner task:
           - if pvm_parent reports a parent, t.tid becomes the parent's tid 
             and nothing is spawned;
           - if there is no parent, one instance of file is spawned and 
             t.tid becomes the tid of the new task.
           If the parent cannot be determined or the spawn fails, an error 
           is reported and t.tid stays -1. *)
        VAR   argv: ARRAY [0..1] OF ptrtchr; 
              tids: ARRAY [0..1] OF int; 
              f: int; 
              tid: int; 
        BEGIN
        (*1*) (*initialize fields*)  
              t.tid:=-1; 
              t.flag:=PvmTaskDefault;       
              t.file:=ADDRESS(file); 
              t.arg:=ADR(PARM); 
              t.where:=ADDRESS(where); 
              (* mytid = -1 means the own task id was not fetched yet. *)
              IF mytid = -1 THEN mytid:=pvm_mytid(); END;       
        (*2*) (*see if we have a parent*) 
              tid:=pvm_parent(); 
              (* Only a non-negative result is a task id. Negative results 
                 are status codes and must never end up in t.tid. *)
              IF tid >= 0 THEN t.tid:=tid; 
                 RETURN; END;  
              IF tid <> PvmNoParent THEN 
                 ERROR(severe,"taskinit: Cannot determine pvm parent."); 
                 RETURN; END; 
        (*3*) (*initialize spawn parms: one argument, then terminator*)  
              argv[0]:=t.arg; argv[1]:=ADDRESS(0); 
        (*4*) (*no parent: create the partner task*)
              f:=pvm_spawn( t.file, argv, t.flag, t.where, 1, tids); 
              (* exactly one task was requested, so anything else failed *)
              IF f <> 1 THEN 
                 ERROR(severe,"taskinit: Cannot spawn pvm task."); 
                 RETURN; END; 
              t.tid:=tids[0];    
        (*9*) END taskinit;
        
        PROCEDURE chaninit(VAR ch: channel; VAR t: task; no: int); 
        (* Channel initialization. 
           Binds channel ch to a copy of task t, with no as the message 
           tag and PvmDataRaw as the mode that send later hands to 
           pvm_initsend. *)
        BEGIN
              (* The three fields are independent of each other. *)
              ch.mode   := PvmDataRaw; 
              ch.msgtag := no; 
              ch.t      := t;  
        END chaninit;
        
        PROCEDURE send(VAR ch: channel; v: INTEGER); 
        (* Send v to channel ch. 
           Initializes a send buffer with the channel's mode, packs v 
           and sends the buffer to task ch.t.tid with tag ch.msgtag. 
           If any step fails, the error is reported and the remaining 
           steps are skipped, so no half-built message is sent. *)
        VAR   val: ARRAY [0..1] OF int; 
        BEGIN 
        (*1*) (*init buffer*) 
              IF pvm_initsend( ch.mode ) < 0 THEN 
                 ERROR(severe,"send: Cannot initialize buffer."); 
                 RETURN; END;  
        (*2*) (*deposit message*) 
              (* Clear the second slot as well, so that no uninitialized 
                 data is handed to pvm_pklong. Presumably the extra slot 
                 exists because a long may be wider than an int on the 
                 target -- TODO confirm. *)
              val[0]:=v; val[1]:=0; 
              IF pvm_pklong( val, 1, 1 ) < 0 THEN  
                 ERROR(severe,"send: Cannot put into buffer."); 
                 RETURN; END;  
        (*3*) (*send*) 
              IF pvm_send( ch.t.tid, ch.msgtag ) < 0 THEN  
                 ERROR(severe,"send: Cannot send on channel."); END;  
        (*9*) END send;
        
        PROCEDURE receive(VAR ch: channel; VAR v: INTEGER); 
        (* Receive v from channel ch. 
           Receives a message from task ch.t.tid with tag ch.msgtag and 
           unpacks one value into v. 
           If receiving or unpacking fails, the error is reported and v 
           is left unchanged instead of being overwritten with the 
           contents of the uninitialized local buffer. *)
        VAR   val: ARRAY [0..1] OF int; 
        BEGIN 
        (*1*) (*receive*) 
              IF pvm_recv( ch.t.tid, ch.msgtag ) < 0 THEN  
                 ERROR(severe,"receive: Cannot receive on channel."); 
                 RETURN; END;  
        (*2*) (*extract message*) 
              IF pvm_upklong( val, 1, 1 ) < 0 THEN  
                 ERROR(severe,"receive: Cannot get from buffer."); 
                 RETURN; END;  
              (* send stores the value in slot 0, so read it from there *)
              v:=val[0]; 
        (*3*) END receive;
        
        BEGIN 
            (* Module initialization: -1 marks the own task id as not yet 
               fetched; taskinit calls pvm_mytid while mytid still has 
               this value. *)
            mytid:=-1; 
        END MsgPas.

aufgabe1345

  

        MODULE pc1msg;
        
        (* Producer Consumer with PVM MP Module. *)
        
        FROM SYSTEM IMPORT ADDRESS, TSIZE; 
        
        FROM MsgPas IMPORT task, taskinit, channel, chaninit, 
                           send, receive; 
        
        FROM pvm IMPORT pvm_parent, pvm_exit; 
        
        FROM MASELEM IMPORT GAMMAINT;
        FROM MASERR  IMPORT harmless, severe, fatal, ERROR; 
        FROM MASBIOS IMPORT SWRITE, GWRITE, BLINES; 
        
        CONST maxprod = 1; 
        CONST maxcons = 1; 
        CONST maxwork = 10000; 
        
        VAR   co: task; 
              work: channel;  
           
        PROCEDURE Producer(VAR x: ADDRESS): ADDRESS; 
        (* same as in pcmsg.mi. *)
        
        PROCEDURE Consumer(VAR x: ADDRESS): ADDRESS; 
        (* same as in pcmsg.mi. *)
        
        PROCEDURE tuwas;
        (* "Do something", as the name says: set up the partner task and 
           the work channel, then run this process as producer or as 
           consumer, and finally leave PVM. 
           A process without a PVM parent acts as producer, a process 
           with a parent acts as consumer. *)
        VAR   i, o: ADDRESS; 
        BEGIN
        (*0*) (*initialize*) 
              (* i and o are ADDRESS because Producer and Consumer take a 
                 VAR ADDRESS parameter and return an ADDRESS. *)
              i:=ADDRESS(1); 
              taskinit(co); 
              chaninit(work, co, 1);  
        (*2*) (*start*) 
              IF pvm_parent() < 0 THEN o:=Producer(i); 
                                  ELSE o:=Consumer(i) END; 
        (*3*) (*exit*)
              pvm_exit();   
        (*9*) END tuwas;
        
        BEGIN 
              (* Module body: run the producer/consumer example once. *)
              tuwas;
        END pc1msg.



parallel@rz.uni-mannheim.de
Mon Okt 28 14:38:25 PST 1996