next up previous contents index
Next: 4 Distributed Memory Computer Up: 3 POSIX threads Previous: 3.6.3 await mit Semaphoren

3.7 Das Bounded Buffer Problem

  Mit Hilfe von Semaphoren wollen wir das folgende Problem lösen:

Zur Weitergabe der Daten dient ein Puffer,  der in der Lage sein soll, k Daten zu speichern. (Wobei tex2html_wrap_inline2650 , tex2html_wrap_inline2652 ). Falls der Puffer voll ist, sollen die Produzenten warten, bis wieder Platz ist und falls der Puffer leer ist, sollen die Konsumenten warten, bis wieder Daten vorhanden sind. Falls keine Arbeit mehr anliegt, sollen die Produzenten und die Konsumenten terminieren.

Der Puffer wird als Ringpuffer  implementiert, siehe Abbildung 3.2. Die Variablen front und rear zeigen jeweils auf den nächsten vollen Platz zum Lesen des Puffers bzw. auf den nächsten freien Platz zum Schreiben. Falls der Puffer nicht voll ist, kann ein Produzent in den Puffer schreiben und falls der Puffer nicht leer ist, kann ein Konsument aus dem Puffer lesen.

   table789
Tabelle 3.2: Ringpuffer

Die Synchronisation muß somit sicherstellen, daß der Produzent wartet, falls der Puffer voll ist und der Konsument wartet, falls der Puffer leer ist. D.h. sei x die Anzahl der Elemente im Puffer, dann muß immer gelten tex2html_wrap_inline2658 . Insbesondere muß gelten tex2html_wrap_inline2660 , damit der Produzent eine Nachricht schreiben kann und es muß gelten tex2html_wrap_inline2662 , damit der Konsument eine Nachricht lesen kann.

Damit ergibt sich folgender Ansatz:

Producer (i): LOOP  (* erzeuge *) 
                    await x < k then
                          (*put to buffer*); x:= x + 1 end
                    END

Consumer (i): LOOP await x >= 1 then
                         (*pop from buffer*); x:= x - 1 end
                   (* verbrauchen *)
                   END.
zusammen mit dem Hauptprogramm
       x:=0;
       con Producer (1); ...; Producer (n);
           Consumer (1); ...; Consumer (m) end;

Der Nachteil dieser Lösung ist, daß Produzenten und Konsumenten nur abwechselnd auf den Puffer zugreifen können. Eine Abhilfe schafft die Verwendung von zwei Zählern, z.B. e und f (empty und full), die wie folgt verwendet werden. Wobei e den Anfangswert k und f den Anfangswert 0 haben muß.

Producer (i): LOOP (* erzeuge *)
                   await e >= 1 then e:= e - 1 end;
                   (* put to buffer *)
                   atomic f:= f + 1 end
                   END;

Consumer (i): LOOP await f >= 1 then f:= f - 1 end;
                   (* pop from buffer *)
                   atomic e:= e + 1 end;
                   (* verbrauchen *)
                   END;

Zusätzlich muß noch sichergestellt werden, daß nicht mehrere Produzenten bzw. mehrere Konsumenten gleichzeitig auf den Puffer zugreifen. D.h. (* put to buffer *) muß auf Produzentenseite atomar sein und (* pop from buffer *) muß auf Konsumentenseite atomar sein.

Die beiden await Statements sind nun schon direkt mit Semaphoren zu implementieren:

        Producer: await e >= 1 then e:= e - 1 end
        Consumer: atomic e:= e + 1 end
wird zu
        Producer: P(empty) 
        Consumer: V(empty)
wobei empty als Semaphor mit k initialisiert wird. Und
        Consumer: await f >= 1 then f:= f - 1 end
        Producer: atomic f:= f + 1 end
wird zu
        Consumer: P (full)         
        Producer: V (full)
wobei full als Semaphor mit 0 initialisiert wird.

Damit ergibt sich die folgende Lösungsskizze

Producer (i): LOOP (* erzeugen *)
                   P(empty);
                   atomic_producer (* put to buffer *) end;
                   V(full);
                   END

Consumer (i): LOOP P(full):
                   atomic_consumer (* get from buffer *) end;
                   V (empty);
                   (* verbrauchen *)
                   END;

Zur Implementierung ist noch die Frage zu klären, wie die Arbeit verteilt werden soll. Wir nehmen an, daß am Anfang feststeht, wieviel Arbeit insgesamt zu tun ist. Diese Arbeit wird dann zu gleichen Teilen auf die Produzenten bzw. die Konsumenten aufgeteilt, wobei jeweils der 1. die verbleibenden Reste bearbeiten soll. (Unter der Annahme, daß für jeden mindestens eine Arbeit anfällt.) Falls die Arbeit dynamisch anfällt, müßte man Synchronisationspunkte einführen, nach denen jeweils die Arbeit neu verteilt wird.

//  Producer-Consumer Problem mit Semaphoren (Bounded Buffer)
/*****************************************************************************
 * pcsem   Februar 96                               Parallelrechnerpraktikum *
 * -----                                            ------------------------ *
 *                                                                           *
 * Producer-Consumer, Bounded Buffer                                         *
 *                                                                           *
 * Implementierung des Producer-Consumer-Problems mit Hilfe von Semaphoren.  *
 *                                                                           *
 *                                                                           *
 *                                                                           *
 *****************************************************************************/


#include <pthread.h>
#include "sema.h"


#define MAXPROD 3
#define MAXCONS 7
#define MAXWORK 100000


typedef unsigned int uint;

class BoundedBuffer
{
private:
    int *b;         // Der Ringbuffer.
    uint siz;       // Groesse des Buffers.
    uint anf, end;  // Zeiger auf Anfang und Ende des Buffers.

    sema e, f;      // Semaphoren fuer Buffer leer bzw voll.
    sema w, r;      // Semaphoren fuer atomares Schreiben/Lesen.

    BoundedBuffer(const BoundedBuffer& bb) = 0;

public:
    BoundedBuffer(uint s =16);
//    ~BoundedBuffer();

    void write(int v);
    int  read();
};

BoundedBuffer::BoundedBuffer(uint s =16) : b(new int[s]), siz(s),
                                           anf(0), end(0), e(s), f(0),
                                           w(1), r(1)
{
    if (b == NULL)
	{ cerr << "Ringbuffer: Konnte keinen Speicher allokieren" << endl;
	  exit(1);
	}
}
    

void BoundedBuffer::write(int v)
{
    e.P();        // Platz im Buffer reservieren.

    w.P();        // atomares Schreiben.
    b[end]= v;
    end= (end+1) % siz;
    w.V();

    f.V();        // ein gefuellter Platz mehr.
}


int  BoundedBuffer::read()
{
    int result;
    f.P();        // Platz im Buffer loeschen.

    r.P();        // atomares Lesen.
    result= b[anf];
    anf= (anf+1) % siz;
    r.V();

    e.V();        // ein freier Platz mehr.

    return result;
}

/////////////////////////////////////////////////////////////
//////   ---  globale Variablen  ---                   //////
/////////////////////////////////////////////////////////////

BoundedBuffer bb(MAXBUF);



/////////////////////////////////////////////////////////////
//////   ---   Producer und Consumer   ---             //////
/////////////////////////////////////////////////////////////

ADDRESS Producer(ADRESS x)
{
    INTEGER j, k;
    INTEGER maxtodo, t;

    j= *((INTEGER *) x);            // my number.
    SWRITE("Producer start", 15L);
    GWRITE(j); 
    BLINES(0);
    maxtodo = MAXWORK / MAXPROD;    // Arbeit Aufteilen.
    t       = MAXWORK % MAXPROD;

    if (j == 1)
	maxtodo = maxtodo + t;      // Der 1. macht a bisle mehr.

    for (int cnt= 0; cnt < maxtodo; cnt++) 
      { k = 0;
        for (int i= 1; i < j; i++)  // Work berechnen.
	    k+= i*j;
        bb.write(k);                // und in den Buffer schreiben.

      }
    k = 0;
    SWRITE("Producer done ", 14L);
    GWRITE(j);
    BLINES(0);
    return (ADDRESS)k;
};


ADDRESS Consumer(ADDRESS x)
{
    INTEGER j, k= 0;
    INTEGER maxtodo, t;

    j= *((INTEGER *) x);
    SWRITE("Consumer start", 15L);
    GWRITE(j); 
    BLINES(0);
    maxtodo = MAXWORK / MAXCONS;    // Arbeit Aufteilen.
    t       = MAXWORK % MAXCONS;

    if (j == 1)
	maxtodo = maxtodo + t;      // Der 1. macht a bisle mehr.

    for (int cnt= 0; cnt < maxtodo; cnt++)
	k+= bb.read();
    k = 0;
    SWRITE("Consumer done", 14L);
    GWRITE(j);
    BLINES(0);
    return (ADDRESS)k;
};

/////////////////////////////////////////////////////////////
//////   ---   M A I N   ---                           //////
/////////////////////////////////////////////////////////////


struct S_1 {
    pthread_t A[MAXPROD];
};
struct S_2 {
    pthread_t A[MAXCONS];
};
struct S_3 {
    ADDRESS A[MAXPROD];
};
struct S_4 {
    ADDRESS A[MAXCONS];
};


int main()
{
    struct S_1 pt;
    struct S_2 ct;
    struct S_3 pi;
    struct S_4 ci;
    int i;

    for (i= 0; i < MAXPROD; i++) 
        {
	  pi.A[i]= (ADDRESS)i;
	  if (pthread_create(&pt.A[i], pthread_attr_default, &Producer,
			     &pi.A[i]) < 0)
	      ERROR(severe, "Cannot create Producer.", 23L);
        }

    for (i= 0; i < MAXCONS; i++)
	{
	  ci.A[i]= (ADDRESS)i;
	  if (pthread_create(&ct.A[i], pthread_attr_default, &Consumer,
			     &ci.A[i - 1]) < 0)
	      ERROR(severe, "Cannot create Consumer.", 23L);
	}

    for (i= 0; i < MAXPROD; i++)
          if (pthread_join(pt.A[i], &pi.A[i]) < 0)
	      ERROR(severe, "Cannot join Producer.", 21L);
    for (i= 0; i < MAXCONS; i++)
	if (pthread_join(ct.A[i - 1], &ci.A[i - 1]) < 0)
	    ERROR(severe, "Cannot join Consumer.", 21L);
   
    return 0;
};

Damit ist die Besprechung des Beispiels beendet.


next up previous contents index
Next: 4 Distributed Memory Computer Up: 3 POSIX threads Previous: 3.6.3 await mit Semaphoren

parallel@rz.uni-mannheim.de
Mon Okt 28 14:38:25 PST 1996