ECE 1747H: Parallel Programming
Message Passing (MPI)
Explicit Parallelism
• Same idea as explicit multithreading for shared memory.
• Explicit parallelism is more common with message passing.
  – The user has explicit control over processes.
  – Good: this control can be used to improve performance.
  – Bad: the user has to manage it.
Distributed Memory - Message Passing

[Figure: processors proc1..procN, each with its own private memory mem1..memN, connected by a network.]
Distributed Memory - Message Passing
• A variable x, a pointer p, or an array a[] refer to different memory locations, depending on the processor.
• In this course, we discuss message passing as a programming model (it can run on any hardware).
What does the user have to do?
• This is what we said for shared memory:
  – Decide how to decompose the computation into parallel parts.
  – Create (and destroy) processes to support that decomposition.
  – Add synchronization to make sure dependences are covered.
• Is the same true for message passing?
Another Look at SOR Example

for some number of timesteps/iterations {
  for( i=0; i<n; i++ )
    for( j=0; j<n; j++ )
      temp[i][j] = 0.25 * ( grid[i-1][j] + grid[i+1][j] +
                            grid[i][j-1] + grid[i][j+1] );
  for( i=0; i<n; i++ )
    for( j=0; j<n; j++ )
      grid[i][j] = temp[i][j];
}
Shared Memory

[Figure: a single shared grid and a single shared temp, each divided into bands 1..N; proc1..procN each work on their own band.]

Message-Passing Data Distribution (only middle processes)

[Figure: proc2 and proc3 each hold only their own band of grid and temp in private memory.]
Is this going to work?

Same code as we used for shared memory:

for( i=from; i<to; i++ )
  for( j=0; j<n; j++ )
    temp[i][j] = 0.25*( grid[i-1][j] + grid[i+1][j]
                      + grid[i][j-1] + grid[i][j+1] );

No, we need extra boundary elements for grid.
Data Distribution (only middle processes)

[Figure: proc2 and proc3 each hold their band of grid and temp, now extended with boundary rows.]
Is this going to work?

Same code as we used for shared memory:

for( i=from; i<to; i++ )
  for( j=0; j<n; j++ )
    temp[i][j] = 0.25*( grid[i-1][j] + grid[i+1][j]
                      + grid[i][j-1] + grid[i][j+1] );

No, on the next iteration we need boundary elements from our neighbors.
Data Communication (only middle processes)

[Figure: proc2 and proc3 exchange boundary rows of grid with their neighbors.]
Is this now going to work?

Same code as we used for shared memory:

for( i=from; i<to; i++ )
  for( j=0; j<n; j++ )
    temp[i][j] = 0.25*( grid[i-1][j] + grid[i+1][j]
                      + grid[i][j-1] + grid[i][j+1] );

No, we need to translate the indices.
Index Translation

for( i=0; i<n/p; i++ )
  for( j=0; j<n; j++ )
    temp[i][j] = 0.25*( grid[i-1][j] + grid[i+1][j]
                      + grid[i][j-1] + grid[i][j+1] );

Remember, all variables are local.
Index Translation is Optional
• Allocate the full arrays on each processor.
• Leave indices alone.
• Higher memory use.
• Sometimes necessary (see later). A sketch of both options follows below.
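The following sketch is not from the slides; it illustrates the two options under the assumption of a row-wise distribution of an n x n grid over p processes, with two ghost rows assumed for the neighbors' boundary elements.

  /* Sketch only (assumed layout): the two allocation strategies. */
  #include <stdlib.h>

  void allocate_band(int n, int p, int myrank)
  {
      int from = (myrank * n) / p;
      int to   = ((myrank + 1) * n) / p;

      /* With index translation: allocate only the local band plus two
         assumed ghost rows, and loop over local indices 0..n/p-1.      */
      double (*local)[n] = malloc((size_t)(to - from + 2) * sizeof *local);

      /* Without index translation: allocate the full array on every
         process and keep the global indices from..to. Simpler indexing,
         higher memory use.                                             */
      double (*full)[n] = malloc((size_t)n * sizeof *full);

      free(local);
      free(full);
  }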
What does the user need to do?
• Divide the program into parallel parts.
• Create and destroy processes to do the above.
• Partition and distribute the data.
• Communicate data at the right time.
• (Sometimes) perform index translation.
• Still need to do synchronization?
  – Sometimes, but it often goes hand in hand with data communication.
Message Passing Systems
• Provide process creation and destruction.
• Provide message passing facilities (send and receive, in various flavors) to distribute and communicate data.
• Provide additional synchronization facilities.
MPI (Message Passing Interface)
• The de facto message passing standard.
• Available on virtually all platforms.
• Grew out of an earlier message passing system, PVM, which is now outdated.
MPI Process Creation/Destruction

MPI_Init( int *argc, char ***argv )
  Initiates a computation.

MPI_Finalize()
  Terminates a computation.
MPI Process Identification

MPI_Comm_size( comm, &size )
  Determines the number of processes.

MPI_Comm_rank( comm, &pid )
  pid is the process identifier (rank) of the caller.
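Putting these calls together, a minimal MPI program (an illustrative sketch, not slide code) looks like this:

  #include <stdio.h>
  #include <mpi.h>

  int main(int argc, char *argv[])
  {
      int pid, size;

      MPI_Init(&argc, &argv);                  /* initiate the computation      */
      MPI_Comm_rank(MPI_COMM_WORLD, &pid);     /* which process am I?           */
      MPI_Comm_size(MPI_COMM_WORLD, &size);    /* how many processes are there? */

      printf("process %d of %d\n", pid, size);

      MPI_Finalize();                          /* terminate the computation     */
      return 0;
  }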
MPI Basic Send
MPI_Send(buf, count, datatype, dest, tag, comm)
buf: address of send buffer
count: number of elements
datatype: data type of send buffer elements
dest: process id of destination process
tag: message tag (ignore for now)
comm: communicator (ignore for now)
MPI Basic Receive
MPI_Recv(buf, count, datatype, source, tag, comm, &status)
buf: address of receive buffer
count: size of receive buffer in elements
datatype: data type of receive buffer elements
source: source process id or MPI_ANY_SOURCE
tag and comm: ignore for now
status: status object
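A minimal point-to-point sketch (assumed for illustration, not slide code; run with at least two processes): process 0 sends four integers to process 1.

  #include <stdio.h>
  #include <mpi.h>

  int main(int argc, char *argv[])
  {
      int myrank, data[4] = {1, 2, 3, 4};
      int tag = 0;
      MPI_Status status;

      MPI_Init(&argc, &argv);
      MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

      if (myrank == 0)
          MPI_Send(data, 4, MPI_INT, 1, tag, MPI_COMM_WORLD);
      else if (myrank == 1) {
          MPI_Recv(data, 4, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
          printf("process 1 received %d %d %d %d\n",
                 data[0], data[1], data[2], data[3]);
      }

      MPI_Finalize();
      return 0;
  }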
MPI Matrix Multiply (w/o Index Translation)

(Speaker note, Willy Zwaenepoel: missing initialization of a and b.)
main(int argc, char *argv[])
{
  MPI_Init (&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
  MPI_Comm_size(MPI_COMM_WORLD, &p);
  from = (myrank * n)/p;
  to = ((myrank+1) * n)/p;
  /* Data distribution */ ...
  /* Computation */ ...
  /* Result gathering */ ...
  MPI_Finalize();
}
MPI Matrix Multiply (w/o Index Translation)

/* Data distribution */
if( myrank != 0 ) {
  MPI_Recv( &a[from], n*n/p, MPI_INT, 0, tag, MPI_COMM_WORLD, &status );
  MPI_Recv( &b, n*n, MPI_INT, 0, tag, MPI_COMM_WORLD, &status );
} else {
  for( i=1; i<p; i++ ) {
    MPI_Send( &a[(i*n)/p], n*n/p, MPI_INT, i, tag,   /* band destined for process i */
              MPI_COMM_WORLD );
    MPI_Send( &b, n*n, MPI_INT, i, tag, MPI_COMM_WORLD );
  }
}
MPI Matrix Multiply (w/o Index Translation)

/* Computation */
for( i=from; i<to; i++ )
  for( j=0; j<n; j++ ) {
    C[i][j] = 0;
    for( k=0; k<n; k++ )
      C[i][j] += A[i][k]*B[k][j];
  }
MPI Matrix Multiply (w/o Index Translation)

/* Result gathering */
if( myrank != 0 )
  MPI_Send( &c[from], n*n/p, MPI_INT, 0, tag, MPI_COMM_WORLD );
else
  for( i=1; i<p; i++ )
    MPI_Recv( &c[(i*n)/p], n*n/p, MPI_INT, i, tag,   /* band computed by process i */
              MPI_COMM_WORLD, &status );
MPI Matrix Multiply (with Index Translation)

(Speaker note, Willy Zwaenepoel: missing initialization of a and b.)

main(int argc, char *argv[])


{
MPI_Init (&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
MPI_Comm_size(MPI_COMM_WORLD, &p);
from = (myrank * n)/p;
to = ((myrank+1) * n)/p;
/* Data distribution */ ...
/* Computation */ ...
/* Result gathering */ ...
MPI_Finalize();
}
MPI Matrix Multiply (with Index Translation)

/* Data distribution */
if( myrank != 0 ) {
  MPI_Recv( &a, n*n/p, MPI_INT, 0, tag, MPI_COMM_WORLD, &status );
  MPI_Recv( &b, n*n, MPI_INT, 0, tag, MPI_COMM_WORLD, &status );
} else {
  for( i=1; i<p; i++ ) {
    MPI_Send( &a[(i*n)/p], n*n/p, MPI_INT, i, tag,   /* band destined for process i */
              MPI_COMM_WORLD );
    MPI_Send( &b, n*n, MPI_INT, i, tag, MPI_COMM_WORLD );
  }
}
MPI Matrix Multiply (with Index Translation)

/* Computation */
for( i=0; i<n/p; i++ )
  for( j=0; j<n; j++ ) {
    C[i][j] = 0;
    for( k=0; k<n; k++ )
      C[i][j] += A[i][k]*B[k][j];
  }
MPI Matrix Multiply (with Index Translation)

/* Result gathering */
if( myrank != 0 )
  MPI_Send( &c, n*n/p, MPI_INT, 0, tag, MPI_COMM_WORLD );
else
  for( i=1; i<p; i++ )
    MPI_Recv( &c[(i*n)/p], n*n/p, MPI_INT, i, tag,   /* band computed by process i */
              MPI_COMM_WORLD, &status );
Running an MPI Program
• mpirun <program_name> <arguments> (see the example below).
• Interacts with a daemon process on the hosts.
• Causes a Unix process to be run on each of the hosts.
• May only run in interactive mode (batch mode may be blocked by ssh).
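For example (an assumed invocation, not from the slides: the exact flags depend on the MPI installation, -np commonly sets the number of processes, and ./matmul and its argument are placeholders for your program and its arguments):

  mpirun -np 4 ./matmul 1024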
ECE1747 Parallel Programming
Message Passing (MPI)
Global Operations
What does the user need to do?
• Divide the program into parallel parts.
• Create and destroy processes to do the above.
• Partition and distribute the data.
• Communicate data at the right time.
• (Sometimes) perform index translation.
• Still need to do synchronization?
  – Sometimes, but it often goes hand in hand with data communication.
MPI Process Creation/Destruction

MPI_Init( int *argc, char ***argv )
  Initiates a computation.

MPI_Finalize()
  Finalizes a computation.
MPI Process Identification

MPI_Comm_size( comm, &size )
  Determines the number of processes.

MPI_Comm_rank( comm, &pid )
  pid is the process identifier (rank) of the caller.
MPI Basic Send
MPI_Send(buf, count, datatype, dest, tag, comm)
buf: address of send buffer
count: number of elements
datatype: data type of send buffer elements
dest: process id of destination process
tag: message tag (ignore for now)
comm: communicator (ignore for now)
MPI Basic Receive
MPI_Recv(buf, count, datatype, source, tag, comm, &status)
buf: address of receive buffer
count: size of receive buffer in elements
datatype: data type of receive buffer elements
source: source process id or MPI_ANY_SOURCE
tag and comm: ignore for now
status: status object
Global Operations (1 of 2)
• So far, we have only looked at point-to-point or one-to-one message passing facilities.
• Often, it is useful to have one-to-many or many-to-one message communication.
• This is what MPI’s global operations do.
Global Operations (2 of 2)
• MPI_Barrier
• MPI_Bcast
• MPI_Gather
• MPI_Scatter
• MPI_Reduce
• MPI_Allreduce
Barrier
MPI_Barrier(comm)
  Global barrier synchronization, as before: all processes wait until all have arrived.
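A small sketch of its use (assumed, not slide code): no process continues past the barrier until every process has reached it.

  #include <stdio.h>
  #include <mpi.h>

  int main(int argc, char *argv[])
  {
      int myrank;

      MPI_Init(&argc, &argv);
      MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

      printf("process %d before the barrier\n", myrank);
      MPI_Barrier(MPI_COMM_WORLD);   /* everyone waits here until all arrive */
      printf("process %d after the barrier\n", myrank);

      MPI_Finalize();
      return 0;
  }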
Broadcast
MPI_Bcast(inbuf, incnt, intype, root, comm)
inbuf: address of input buffer (on root); address of output buffer (elsewhere)
incnt: number of elements
intype: type of elements
root: process id of root process
Before Broadcast

[Figure: only the root (proc0) has data in inbuf; the other processes' inbufs are empty.]

After Broadcast

[Figure: every process (proc0..proc3) holds a copy of the root's inbuf data.]
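A sketch of its use (assumed fragment, placed between MPI_Init and MPI_Finalize, with myrank and i declared as in the earlier examples):

  int vec[100];                      /* same declaration on every process */

  if( myrank == 0 )                  /* only the root has meaningful data */
    for( i=0; i<100; i++ )
      vec[i] = i;

  MPI_Bcast( vec, 100, MPI_INT, 0, MPI_COMM_WORLD );
  /* now every process has the root's 100 integers in vec */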
Scatter
MPI_Scatter(inbuf, incnt, intype, outbuf,
outcnt, outtype, root, comm)
inbuf: address of input buffer
incnt: number of input elements
intype: type of input elements
outbuf: address of output buffer
outcnt: number of output elements
outtype: type of output elements
root: process id of root process
Before Scatter

[Figure: the root (proc0) holds the full array in inbuf; all outbufs are empty.]

After Scatter

[Figure: each process's outbuf holds its own consecutive piece of the root's inbuf.]
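A sketch of its use (assumed fragment, same setup as above; n, p, myrank and i declared, n divisible by p, and MAXN an assumed bound on n):

  int inbuf[MAXN], outbuf[MAXN];

  if( myrank == 0 )                  /* only the root's inbuf matters */
    for( i=0; i<n; i++ )
      inbuf[i] = i;

  /* every process, including the root, receives n/p consecutive elements */
  MPI_Scatter( inbuf, n/p, MPI_INT, outbuf, n/p, MPI_INT, 0, MPI_COMM_WORLD );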
Gather
MPI_Gather(inbuf, incnt, intype, outbuf,
outcnt, outtype, root, comm)
inbuf: address of input buffer
incnt: number of input elements
intype: type of input elements
outbuf: address of output buffer
outcnt: number of output elements
outtype: type of output elements
root: process id of root process
Before Gather

[Figure: each process holds its own piece in inbuf; the root's outbuf is empty.]

After Gather

[Figure: the root's outbuf holds all the pieces, ordered by process rank.]
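The mirror image of scatter (assumed fragment, same declarations as above): each process contributes n/p elements and the root collects them in rank order.

  int piece[MAXN], all[MAXN];

  for( i=0; i<n/p; i++ )
    piece[i] = myrank;               /* each process fills its own piece */

  /* the output buffer (all) is only significant on the root */
  MPI_Gather( piece, n/p, MPI_INT, all, n/p, MPI_INT, 0, MPI_COMM_WORLD );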
Broadcast/Scatter/Gather
• Funny thing: these three primitives are sends and receives at the same time (a little confusing sometimes).
• Perhaps unintended consequence: they require global agreement on the layout of the array.
MPI Matrix Multiply (w/o Index Translation)

(Speaker note, Willy Zwaenepoel: missing initialization of a and b.)

main(int argc, char *argv[])


{
MPI_Init (&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
MPI_Comm_size(MPI_COMM_WORLD, &p);
for( i=0; i<p; i++ ) {
from[i] = (i * n)/p;
to[i] = ((i+1) * n)/p;
}
/* Data distribution */ ...
/* Computation */ ...
/* Result gathering */ ...
MPI_Finalize();
}
MPI Matrix Multiply (w/o Index Translation)

/* Data distribution */
if( myrank != 0 ) {
  MPI_Recv( &a[from[myrank]], n*n/p, MPI_INT, 0, tag, MPI_COMM_WORLD, &status );
  MPI_Recv( &b, n*n, MPI_INT, 0, tag, MPI_COMM_WORLD, &status );
} else {
  for( i=1; i<p; i++ ) {
    MPI_Send( &a[from[i]], n*n/p, MPI_INT, i, tag, MPI_COMM_WORLD );
    MPI_Send( &b, n*n, MPI_INT, i, tag, MPI_COMM_WORLD );
  }
}
MPI Matrix Multiply (w/o Index Translation)

/* Computation */
for( i=from[myrank]; i<to[myrank]; i++ )
  for( j=0; j<n; j++ ) {
    C[i][j] = 0;
    for( k=0; k<n; k++ )
      C[i][j] += A[i][k]*B[k][j];
  }
MPI Matrix Multiply (w/o Index Translation)

/* Result gathering */
if( myrank != 0 )
  MPI_Send( &c[from[myrank]], n*n/p, MPI_INT, 0, tag, MPI_COMM_WORLD );
else
  for( i=1; i<p; i++ )
    MPI_Recv( &c[from[i]], n*n/p, MPI_INT, i, tag, MPI_COMM_WORLD, &status );
MPI Matrix Multiply Revised (1 of 2)

(Speaker note, Willy Zwaenepoel: missing initialization of a and b.)

main(int argc, char *argv[])


{
MPI_Init (&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
MPI_Comm_size(MPI_COMM_WORLD, &p);
from = (myrank * n)/p;
to = ((myrank+1) * n)/p;
MPI_Scatter (a, n*n/p, MPI_INT, a, n*n/p, MPI_INT, 0,
MPI_COMM_WORLD);
MPI_Bcast (b,n*n, MPI_INT, 0, MPI_COMM_WORLD);
...
MPI Matrix Multiply Revised (2 of 2)

  ...
  for( i=from; i<to; i++ )
    for( j=0; j<n; j++ ) {
      C[i][j] = 0;
      for( k=0; k<n; k++ )
        C[i][j] += A[i][k]*B[k][j];
    }
  MPI_Gather (C[from], n*n/p, MPI_INT, c[from], n*n/p, MPI_INT, 0, MPI_COMM_WORLD);
  MPI_Finalize();
}
SOR Sequential Code

for some number of timesteps/iterations {
  for( i=0; i<n; i++ )
    for( j=0; j<n; j++ )
      temp[i][j] = 0.25 * ( grid[i-1][j] + grid[i+1][j] +
                            grid[i][j-1] + grid[i][j+1] );
  for( i=0; i<n; i++ )
    for( j=0; j<n; j++ )
      grid[i][j] = temp[i][j];
}
MPI SOR
• Allocate grid and temp arrays.
• Use MPI_Scatter to distribute initial values, if any (requires non-local allocation).
• Use MPI_Gather to return the results to process 0 (requires non-local allocation).
• Focusing only on communication within the computational part ...
Data Communication (only middle processes)

[Figure: proc2 and proc3 exchange boundary rows of grid with their neighbors.]
MPI SOR
for some number of timesteps/iterations {
  for( i=from; i<to; i++ )
    for( j=0; j<n; j++ )
      temp[i][j] = 0.25 * ( grid[i-1][j] + grid[i+1][j] +
                            grid[i][j-1] + grid[i][j+1] );
  for( i=from; i<to; i++ )
    for( j=0; j<n; j++ )
      grid[i][j] = temp[i][j];
  /* here comes communication */
}
MPI SOR Communication
if (myrank != 0) {
  /* exchange boundary rows with the upper neighbor */
  MPI_Send (grid[from], n, MPI_DOUBLE, myrank-1, tag, MPI_COMM_WORLD);
  MPI_Recv (grid[from-1], n, MPI_DOUBLE, myrank-1, tag, MPI_COMM_WORLD, &status);
}
if (myrank != p-1) {
  /* exchange boundary rows with the lower neighbor */
  MPI_Send (grid[to-1], n, MPI_DOUBLE, myrank+1, tag, MPI_COMM_WORLD);
  MPI_Recv (grid[to], n, MPI_DOUBLE, myrank+1, tag, MPI_COMM_WORLD, &status);
}
No Barrier Between Loop Nests?
• Not necessary.
• Anti-dependences do not need to be covered in message passing.
• Memory is private, so overwrite does not matter.
SOR: Terminating Condition
• Real versions of SOR do not run for some fixed number of iterations.
• Instead, they test for convergence.
• Possible convergence criterion: difference between two successive iterations is less than some delta.
SOR Sequential Code with Convergence

for( ; diff > delta; ) {
  for( i=0; i<n; i++ )
    for( j=0; j<n; j++ ) { … }
  diff = 0;
  for( i=0; i<n; i++ )
    for( j=0; j<n; j++ ) {
      diff = max(diff, fabs(grid[i][j] - temp[i][j]));
      grid[i][j] = temp[i][j];
    }
}
Reduction
MPI_Reduce(inbuf, outbuf, count, type, op, root, comm)
inbuf: address of input buffer
outbuf: address of output buffer
count: number of elements in input buffer
type: datatype of input buffer elements
op: operation (MPI_MIN, MPI_MAX, etc.)
root: process id of root process
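A sketch of its use (assumed fragment; myrank declared as before): every process contributes one double, and only the root receives the maximum, much like the convergence test coming up, except that the result is not available everywhere.

  double mydiff = 0.0;   /* in a real program, computed locally on each process */
  double diff;           /* only significant on the root                        */

  MPI_Reduce( &mydiff, &diff, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD );

  if( myrank == 0 )
    printf("global maximum difference: %f\n", diff);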
Global Reduction
MPI_Allreduce(inbuf, outbuf, count, type, op, comm)
inbuf: address of input buffer
outbuf: address of output buffer
count: number of elements in input buffer
type: datatype of input buffer elements
op: operation (MPI_MIN, MPI_MAX, etc.)
no root process
MPI SOR Code with Convergence
for( ; diff > delta; ) {
  for( i=from; i<to; i++ )
    for( j=0; j<n; j++ ) { … }
  mydiff = 0.0;
  for( i=from; i<to; i++ )
    for( j=0; j<n; j++ ) {
      mydiff = max(mydiff, fabs(grid[i][j] - temp[i][j]));
      grid[i][j] = temp[i][j];
    }
  MPI_Allreduce (&mydiff, &diff, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD);
  ...
}
