next up previous contents index
Next: 5 PVM Parallel Virtual Up: 4 Distributed Memory Computer Previous: Beispiel: Datenduplizierung

4.2 Nachrichtenaustausch bei gemeinsamem Speicher

Mit Hilfe von Semaphoren und Shared Memory wollen wir die Message Passing Primitive channel, send und receive implementieren. channel soll als globale (shared) Datenstruktur implementiert werden, auf die (nur) mittels send und receive zugegriffen wird. Falls sonst keine shared Variablen benutzt werden, erhalten wir genau die Funktionalität des Distributed Memory Message Passing.

Wir werden zunächst asynchrones Message Passing implementieren, d.h. send soll nicht blockieren und receive soll blockieren, falls keine Message vorliegt. Zur Vereinfachung werden wir allerdings den Kanal mit einer fixen Größe implementieren und send blockieren, falls der channel voll ist. Danach werden wir noch synchrones Message Passing implementieren, d.h. send blockiert, bis ein entsprechendes receive die Nachricht abgeholt hat.

Wenn wir einen channel als Puffer mit fixer Größe implementieren wollen, können wir auf die Vorarbeiten zum Bounded Buffer Problem zurückgreifen. channel wird dann eine Datenstruktur, die den Puffer enthält, sowie die Zähler front und rear und die Semaphoren empty und full. Als Vorlage für send nehmen wir die Produzenten Prozedur und als Vorlage für receive nehmen wir die Konsumenten Prozedur.

Damit ergibt sich folgende Lösungsskizze.

	template<class T> class channel
	{ private:
	    T buffer[asyncmsg_maxbuf];
	    INTEGER front, rear;
	    pthread_mutex_t s_mux, r_mux;
	    sema *empty;
	    sema *full;
	  public:
	    channel();
	    ~channel();
	    void send(T);
	    void receive(T*);
	};


	template<class T> channel<T>::channel()
	{
	  if (pthread_mutex_init(&s_mux, pthread_mutexattr_default) < 0) {
	    ERROR(severe, "Cannot initialize s_mux.");
	  }
	  if (pthread_mutex_init(&r_mux, pthread_mutexattr_default) < 0) {
	    ERROR(severe, "Cannot initialize r_mux.");
	  }
	  front = 0; rear = 0;
	  empty = new sema((INTEGER)asyncmsg_maxbuf); 
	  full = new sema((INTEGER)0); 
	}

	template<class T> channel<T>::~channel()
	{
	  if ( front != rear ) {
	    ERROR(severe, "channel not empty.");
	  }
	  delete empty;
	  delete full;
	}


	template<class T> void channel<T>::send(T v)
	{
	  empty->P();
	  if (pthread_mutex_lock(&s_mux) < 0) {
	    ERROR(severe, "send: Cannot lock s_mux.");
	  }
	  buffer[rear] = v;
	  rear++;
	  if (rear >= asyncmsg_maxbuf) {
	    rear = 0;
	  }
	  if (pthread_mutex_unlock(&s_mux) < 0) {
	    ERROR(severe, "send: Cannot unlock s_mux.");
	  }
	  full->V();
	}

	template<class T> void channel<T>::receive(T* v)
	{
	  (*full).P();
	  if (pthread_mutex_lock(&r_mux) < 0) {
	    ERROR(severe, "receive: Cannot lock r_mux.");
	  }
	  *v = buffer[front];
	  buffer[front] = 0;
	  front++;
	  if (front >= asyncmsg_maxbuf) {
	    front = 0;
	  }
	  if (pthread_mutex_unlock(&r_mux) < 0) {
	    ERROR(severe, "receive: Cannot unlock r_mux.");
	  }
	  (*empty).V();
	}

Falls der Puffer des Kanals voll ist, führt ch.empty->P() zum Blockieren von send. Dies birgt eine Gefahr von Deadlocks,  falls damit ein nachfolgendes send auf einen anderen Kanal nicht ausgeführt werden kann. Z.B.

Falls send (x, 1) in tex2html_wrap_inline2771 blockiert, wird receive (x, w) in tex2html_wrap_inline2773 nie ausgeführt, falls nicht ein anderer Prozess einmal ein receive (y, v) ausführt, um den Sendepuffer zu leeren.

Mit einer leichten Modifikation von send können wir nun auch synchrones Message Passing implementieren. Der Puffer braucht dafür natürlich nur aus einem Element zu bestehen. Die Modifikation besteht darin, die Operation ch.empty->P() von Beginn der send Prozedur ans Ende nach ch.full->P() zu verlegen. Der Semaphore empty muß dann auch mit 0 initialisiert werden. send blockiert dann solange, bis ein anderer Prozeß ein receive ausgeführt hat. Zusätzlich muß der gleichzeitige Zugriff auf send ausgeschlossen werden. Zur vollständigen Implementierung ist noch eine Funktion

 
       int channel<T>::empty();
erforderlich, mit der festgestellt werden kann, auf welchem Kanal Eingabedaten vorhanden sind, bevor das blockierende receive verwendet wird. Die empty-Funktion wird oft auch bei asynchronem Message Passing implementiert, sie wird dort aber nur aus Performance Gründen angeboten. Beim synchronen Message Passing ist empty aber sehr wichtig, um Deadlocks zu vermeiden.

Implementierungsskizze:

	template<class T> class channel { 
	private:
	  T buffer;
	  int data; // boolean.
	  pthread_mutex_t s_mux;
	  sema *empty;
	  sema *full;
	public:
	  channel();
	  ~channel();
	  void send(T);
	  void receive(T*);
	  int  empty();
	};


	template<class T> channel<T>::channel()
	{
	  if (pthread_mutex_init(&s_mux, pthread_mutexattr_default) < 0) {
	    ERROR(severe, "Cannot initialize s_mux.");
	  }
	  empty = new sema((INTEGER)asyncmsg_maxbuf); 
	  full = new sema((INTEGER)0); 
	}

	template<class T> channel<T>::~channel()
	{
	  delete empty;
	  delete full;
	}

	template<class T> channel<T>::empty()
	{
	  return data;
	}

	template<class T> void channel<T>::send(T v)
	{
	  if (pthread_mutex_lock(&s_mux) < 0) {
	    ERROR(severe, "send: Cannot lock s_mux.");
	  }
	  buffer= v; data= 1; // true.

	  full->V();
	  empty->P();

	  if (pthread_mutex_unlock(&s_mux) < 0) {
	    ERROR(severe, "send: Cannot unlock s_mux.");
	  }
	}

	template<class T> void channel<T>::receive(T* v)
	{
	  (*full).P();
	  *v = buffer; data= 0; // false.
	  (*empty).V();
	}
Warum wird kein atomic in receive benötigt ?

Es folgen die Listings der C++ Versionen der Programme. Die Listings der Header-Dateien ``ERROR.h'' und ``TYPES.h'' befinden sich im Anhang.

//   Async Message-Passing

#include <pthread.h>
#include "ERROR.h"
#include "TYPES.h"
#include "sema.h"

#define asyncmsg_maxbuf 200

template<class T> class channel { 
private:
  T buffer[asyncmsg_maxbuf];
  INTEGER front, rear;
  pthread_mutex_t s_mux, r_mux;
  sema *empty;
  sema *full;
public:
  channel();
  ~channel();
  void send(T);
  void receive(T*);
};
 

template<class T> channel<T>::channel()
{
  if (pthread_mutex_init(&s_mux, pthread_mutexattr_default) < 0) {
    ERROR(severe, "Cannot initialize s_mux.");
  }
  if (pthread_mutex_init(&r_mux, pthread_mutexattr_default) < 0) {
    ERROR(severe, "Cannot initialize r_mux.");
  }
  front = 0; rear = 0;
  empty = new sema((INTEGER)asyncmsg_maxbuf); 
  full = new sema((INTEGER)0); 
}

template<class T> channel<T>::~channel()
{
  if ( front != rear ) {
    ERROR(severe, "channel not empty.");
  }
  delete empty;
  delete full;
}

template<class T> void channel<T>::send(T v)
{
  empty->P();
  if (pthread_mutex_lock(&s_mux) < 0) {
    ERROR(severe, "send: Cannot lock s_mux.");
  }
  buffer[rear] = v;
  rear++;
  if (rear >= asyncmsg_maxbuf) {
    rear = 0;
  }
  if (pthread_mutex_unlock(&s_mux) < 0) {
    ERROR(severe, "send: Cannot unlock s_mux.");
  }
  full->V();
}

template<class T> void channel<T>::receive(T* v)
{
  (*full).P();
  if (pthread_mutex_lock(&r_mux) < 0) {
    ERROR(severe, "receive: Cannot lock r_mux.");
  }
  *v = buffer[front];
  buffer[front] = 0;
  front++;
  if (front >= asyncmsg_maxbuf) {
    front = 0;
  }
  if (pthread_mutex_unlock(&r_mux) < 0) {
    ERROR(severe, "receive: Cannot unlock r_mux.");
  }
  (*empty).V();
}




// Synchrones Message-Passing

#include "ERROR.h" 

#include <pthread.h>
#include "TYPES.h"
#include "sema.h"

template<class T> class channel { 
private:
  T buffer;
  int data; // boolean.
  pthread_mutex_t s_mux;
  sema *empty;
  sema *full;
public:
  channel();
  ~channel();
  void send(T);
  void receive(T*);
  int  empty();
};


template<class T> channel<T>::channel()
{
  if (pthread_mutex_init(&s_mux, pthread_mutexattr_default) < 0) {
    ERROR(severe, "Cannot initialize s_mux.");
  }
  empty = new sema((INTEGER)0); 
  full = new sema((INTEGER)0); 
}

template<class T> channel<T>::~channel()
{
  delete empty;
  delete full;
}

template<class T> channel<T>::empty()
{
  return data;
}

template<class T> void channel<T>::send(T v)
{
  if (pthread_mutex_lock(&s_mux) < 0) {
    ERROR(severe, "send: Cannot lock s_mux.");
  }
  buffer= v; data= 1; // true.

  full->V();
  empty->P();

  if (pthread_mutex_unlock(&s_mux) < 0) {
    ERROR(severe, "send: Cannot unlock s_mux.");
  }
}

template<class T> void channel<T>::receive(T* v)
{
  (*full).P();
  *v = buffer; data= 0; // false.
  (*empty).V();
}

aufgabe1155

aufgabe1157

Die Lösung ergibt sich wie im alten Beispiel, nur entfällt die Puffer Verwaltung und die Synchronisation, da nun alles von chaninit, send und receive erledigt wird.

// producer consumer mit message passing 

#include "TYPES.h"
#include <pthread.h>
#include "asyncmsg.h"
#include "ERROR.h"
#include <iostream.h>

#define maxprod 3
#define maxcons 7
#define maxwork 100000

channel<LONGINT> work;

struct S_1 {
    pthread_t A[maxprod - 1 + 1];
};
struct S_2 {
    pthread_t A[maxcons - 1 + 1];
};
struct S_3 {
    ADDRESS A[maxprod - 1 + 1];
};
struct S_4 {
    ADDRESS A[maxcons - 1 + 1];
};

ADDRESS
Producer(ADDRESS x)
{
  INTEGER i, j, k, m;
  INTEGER maxtodo, t;

  j = * (INTEGER*) x; 
  SWRITE("Producer start ", 15L);
  GWRITE(j);
  BLINES(0);
  maxtodo = maxwork / maxprod;
  t = maxwork % maxprod;
  if (j == 1) {
    maxtodo = maxtodo + t;
  }
  m = 1;
  for (;;) {
    k = 0;
    {
      LONGINT B_1 = 1, B_2 = j;

      if (B_1 <= B_2)
        for (i = B_1;; i += 1) {
          k = k + i * j;
          if (i >= B_2) break;
        }
    }
    work.send(k);
    m = m + 1;
    if (m > maxtodo) {
      goto EXIT_1;
    }
  } EXIT_1:;
  k = 0;
  SWRITE("Producer done ", 14L);
  GWRITE(j);
  BLINES(0);
  return (ADDRESS)k;
}

ADDRESS
Consumer(ADDRESS x)
{
  INTEGER i, j, k, m;
  INTEGER maxtodo, t;

  j = * (INTEGER*) x;
  SWRITE("Consumer start ", 15L);
  GWRITE(j);
  BLINES(0);
  maxtodo = maxwork / maxcons;
  t = maxwork % maxcons;
  if (j == 1) {
    maxtodo = maxtodo + t;
  }
  m = 1;
  i = 0;
  for (;;) {
    work.receive( &k);
    i = i + k;
    m = m + 1;
    if (m > maxtodo) {
      goto EXIT_2;
    }
  } EXIT_2:;
  k = 0;
  SWRITE("Consumer done ", 14L);
  GWRITE(j);
  BLINES(0);
  return (ADDRESS)k;
}

static void
tuwas()
{
  struct S_1 pt;
  struct S_2 ct;
  struct S_3 pi;
  struct S_4 ci;
  INTEGER i;

  // asyncmsg_chaninit(&work);
  i = 1;
  while (i <= maxprod) {
    pi.A[i - 1] = (ADDRESS)i;
    if (pthread_create(&pt.A[i - 1], pthread_attr_default, &Producer, &pi.A[i - 1]) < 0) {
      ERROR(severe, "Cannot create Producer.", 23L);
    }
    i = i + 1;
  }
  i = 1;
  while (i <= maxcons) {
    ci.A[i - 1] = (ADDRESS)i;
    if (pthread_create(&ct.A[i - 1], pthread_attr_default, &Consumer, &ci.A[i - 1]) < 0) {
      ERROR(severe, "Cannot create Consumer.", 23L);
    }
    i = i + 1;
  }
  i = 1;
  while (i <= maxprod) {
    if (pthread_join(pt.A[i - 1], &pi.A[i - 1]) < 0) {
      ERROR(severe, "Cannot join Producer.", 21L);
    }
    i = i + 1;
  }
  i = 1;
  while (i <= maxcons) {
    if (pthread_join(ct.A[i - 1], &ci.A[i - 1]) < 0) {
      ERROR(severe, "Cannot join Consumer.", 21L);
    }
    i = i + 1;
  }
}

int main()
{
  tuwas();
}

aufgabe1160


next up previous contents index
Next: 5 PVM Parallel Virtual Up: 4 Distributed Memory Computer Previous: Beispiel: Datenduplizierung

parallel@rz.uni-mannheim.de
Mon Okt 28 14:38:25 PST 1996