Workgroup Constructs: Halo Exchange¶
The example code discussed in this section can be found in the file
RAJA/examples/tut_halo-exchange.cpp. The file contains complete working
code for multiple OpenMP, CUDA, and HIP RAJA variants. Here, we describe
a subset of these variants.
Key RAJA features shown in this example:
RAJA::WorkPool workgroup construct
RAJA::WorkGroup workgroup construct
RAJA::WorkSite workgroup construct
RAJA::TypedRangeSegment iteration space construct
RAJA workgroup policies
In this example, we show how to use the RAJA workgroup constructs to implement buffer packing and unpacking for data halo exchange on a computational grid, a common MPI communication operation for distributed memory applications. This technique may not provide a performance gain on a CPU system, but it can significantly speed up halo exchange on a GPU system compared to running many individual packing/unpacking kernels, for example.
Note
Using an abstraction layer over RAJA can make it easy to switch
between using individual RAJA::forall
loops or the RAJA workgroup
constructs to implement halo exchange packing and unpacking at
compile time or run time.
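For instance, a minimal sketch of such a switch (hypothetical names, not a RAJA API; plain loops and std::function stand in for RAJA::forall and the workgroup constructs so the sketch is self-contained) might make the choice a run time parameter:

```cpp
#include <functional>
#include <vector>

// Hypothetical application-layer switch: pack either one loop at a time
// ("forall style") or by collecting the loops and running them as one
// batch ("workgroup style").
enum class PackMode { PerLoop, Batched };

void pack(PackMode mode, double* buffer,
          const std::vector<const double*>& vars,
          const int* list, int len)
{
  std::vector<std::function<void()>> batch;
  for (const double* var : vars) {
    double* buf = buffer;
    auto loop = [=]() {
      for (int i = 0; i < len; ++i) { buf[i] = var[list[i]]; }
    };
    if (mode == PackMode::PerLoop) {
      loop();                  // run immediately, like RAJA::forall
    } else {
      batch.push_back(loop);   // defer, like WorkPool::enqueue
    }
    buffer += len;
  }
  for (auto& run : batch) { run(); }  // drain the batch, like WorkGroup::run
}
```

A compile-time version of the same abstraction could select between the two paths with a template parameter instead of the enum.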
We start by setting the parameters for the halo exchange by using default values or values provided via command line input to the example code. These parameters determine the size of the grid, the width of the halo, the number of grid variables to pack/unpack, and the number of cycles (iterations to run).
//
// Define grid dimensions
// Define halo width
// Define number of grid variables
// Define number of cycles
//
const int grid_dims[3] = { (argc != 7) ? 100 : std::atoi(argv[1]),
(argc != 7) ? 100 : std::atoi(argv[2]),
(argc != 7) ? 100 : std::atoi(argv[3]) };
const int halo_width = (argc != 7) ? 1 : std::atoi(argv[4]);
const int num_vars = (argc != 7) ? 3 : std::atoi(argv[5]);
const int num_cycles = (argc != 7) ? 3 : std::atoi(argv[6]);
Next, we allocate the variable data arrays (the memory manager in the example uses CUDA Unified Memory if CUDA is enabled). These grid variables are reset each cycle to allow checking the results of the packing and unpacking.
//
// Allocate grid variables and reference grid variables used to check
// correctness.
//
std::vector<double*> vars (num_vars, nullptr);
std::vector<double*> vars_ref(num_vars, nullptr);
for (int v = 0; v < num_vars; ++v) {
vars[v] = memoryManager::allocate<double>(var_size);
vars_ref[v] = memoryManager::allocate<double>(var_size);
}
We also allocate and initialize index lists of the grid elements to pack and unpack:
//
// Generate index lists for packing and unpacking
//
std::vector<int*> pack_index_lists(num_neighbors, nullptr);
std::vector<int > pack_index_list_lengths(num_neighbors, 0);
create_pack_lists(pack_index_lists, pack_index_list_lengths, halo_width, grid_dims);
std::vector<int*> unpack_index_lists(num_neighbors, nullptr);
std::vector<int > unpack_index_list_lengths(num_neighbors, 0);
create_unpack_lists(unpack_index_lists, unpack_index_list_lengths, halo_width, grid_dims);
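As a self-contained sketch of what such index-list generation involves (the example's create_pack_lists covers all num_neighbors faces, edges, and corners of the 3D grid; the helper below is a hypothetical, simplified stand-in handling only the low-x face of a 2D grid):

```cpp
#include <vector>

// Hypothetical simplified stand-in for index-list generation: collect the
// row-major linear indices of the interior cells adjacent to the low-x
// boundary of a 2D grid that includes halo_width ghost layers on all sides.
std::vector<int> low_x_pack_list(int nx, int ny, int halo_width)
{
  const int total_x = nx + 2 * halo_width;  // x extent including halo
  std::vector<int> list;
  for (int j = halo_width; j < ny + halo_width; ++j) {
    for (int i = halo_width; i < 2 * halo_width; ++i) {
      list.push_back(j * total_x + i);      // row-major linear index
    }
  }
  return list;
}
```

For a 3x3 interior with a halo of width one (the 5x5 layout pictured below), this collects the left interior column, i.e. the cells holding 1, 4, and 7.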
All the code examples presented below copy the data packed from the grid interior:
0 0 0 0 0
0 1 2 3 0
0 4 5 6 0
0 7 8 9 0
0 0 0 0 0
into the adjacent halo cells:
1 1 2 3 3
1 1 2 3 3
4 4 5 6 6
7 7 8 9 9
7 7 8 9 9
Although the example code does not use MPI or multiple domains (one per MPI rank, for example), as a real distributed memory parallel application would, the data copy operations capture the spirit of how the data communication would be done.
Packing and Unpacking (Basic Loop Execution)¶
A sequential non-RAJA example of data packing and unpacking would look like:
for (int l = 0; l < num_neighbors; ++l) {
double* buffer = buffers[l];
int* list = pack_index_lists[l];
int len = pack_index_list_lengths[l];
// pack
for (int v = 0; v < num_vars; ++v) {
double* var = vars[v];
for (int i = 0; i < len; i++) {
buffer[i] = var[list[i]];
}
buffer += len;
}
// send single message
}
and:
for (int l = 0; l < num_neighbors; ++l) {
// recv single message
double* buffer = buffers[l];
int* list = unpack_index_lists[l];
int len = unpack_index_list_lengths[l];
// unpack
for (int v = 0; v < num_vars; ++v) {
double* var = vars[v];
for (int i = 0; i < len; i++) {
var[list[i]] = buffer[i];
}
buffer += len;
}
}
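The two loop nests above can be exercised end to end with a small self-contained sketch (made-up sizes and index lists; in the example, buffers, index lists, and num_neighbors come from the surrounding setup code):

```cpp
#include <vector>

// Sketch of one neighbor's pack/unpack round trip: pack `len` cells of
// each variable into one contiguous buffer, then unpack that buffer into
// halo cells, using the same loop shapes as the example.
std::vector<std::vector<double>> halo_round_trip()
{
  const int num_vars = 2;
  const int len = 3;                     // index-list length for one neighbor

  std::vector<std::vector<double>> vars = {
      {0, 1, 2, 3, 4, 5, 6, 7},
      {0, 10, 20, 30, 40, 50, 60, 70}};
  const int pack_list[]   = {1, 2, 3};   // interior cells to send
  const int unpack_list[] = {5, 6, 7};   // halo cells to fill

  std::vector<double> buffer_storage(num_vars * len);

  // pack (same shape as the loop above)
  double* buffer = buffer_storage.data();
  for (int v = 0; v < num_vars; ++v) {
    double* var = vars[v].data();
    for (int i = 0; i < len; ++i) { buffer[i] = var[pack_list[i]]; }
    buffer += len;
  }

  // a send/recv of buffer_storage would happen here

  // unpack (same shape as the loop above)
  buffer = buffer_storage.data();
  for (int v = 0; v < num_vars; ++v) {
    double* var = vars[v].data();
    for (int i = 0; i < len; ++i) { var[unpack_list[i]] = buffer[i]; }
    buffer += len;
  }
  return vars;
}
```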
RAJA Variants using forall¶
A sequential RAJA example uses this execution policy type:
using forall_policy = RAJA::seq_exec;
to pack the grid variable data into a buffer:
for (int l = 0; l < num_neighbors; ++l) {
double* buffer = buffers[l];
int* list = pack_index_lists[l];
int len = pack_index_list_lengths[l];
// pack
for (int v = 0; v < num_vars; ++v) {
double* var = vars[v];
RAJA::forall<forall_policy>(range_segment(0, len), [=] (int i) {
buffer[i] = var[list[i]];
});
buffer += len;
}
// send single message
}
and unpack the buffer data into the grid variable array:
for (int l = 0; l < num_neighbors; ++l) {
// recv single message
double* buffer = buffers[l];
int* list = unpack_index_lists[l];
int len = unpack_index_list_lengths[l];
// unpack
for (int v = 0; v < num_vars; ++v) {
double* var = vars[v];
RAJA::forall<forall_policy>(range_segment(0, len), [=] (int i) {
var[list[i]] = buffer[i];
});
buffer += len;
}
}
For parallel multithreading execution via OpenMP, the example can be run by replacing the execution policy with:
using forall_policy = RAJA::omp_parallel_for_exec;
Similarly, to run the loops in parallel on a CUDA GPU, we would use this policy:
using forall_policy = RAJA::cuda_exec_async<CUDA_BLOCK_SIZE>;
Note that we can use an asynchronous execution policy because there are no data races due to the intermediate buffer usage.
RAJA Variants using workgroup constructs¶
Using the workgroup constructs in the example requires defining a few more policies and types:
using forall_policy = RAJA::seq_exec;
using workgroup_policy = RAJA::WorkGroupPolicy <
RAJA::seq_work,
RAJA::ordered,
RAJA::ragged_array_of_objects,
RAJA::indirect_function_call_dispatch >;
using workpool = RAJA::WorkPool< workgroup_policy,
int,
RAJA::xargs<>,
memory_manager_allocator<char> >;
using workgroup = RAJA::WorkGroup< workgroup_policy,
int,
RAJA::xargs<>,
memory_manager_allocator<char> >;
using worksite = RAJA::WorkSite< workgroup_policy,
int,
RAJA::xargs<>,
memory_manager_allocator<char> >;
which are used in a slightly rearranged version of the packing. Note that the comment indicating where messages are sent has moved down, after the call that runs the operations enqueued on the workgroup:
for (int l = 0; l < num_neighbors; ++l) {
double* buffer = buffers[l];
int* list = pack_index_lists[l];
int len = pack_index_list_lengths[l];
// pack
for (int v = 0; v < num_vars; ++v) {
double* var = vars[v];
pool_pack.enqueue(range_segment(0, len), [=] (int i) {
buffer[i] = var[list[i]];
});
buffer += len;
}
}
workgroup group_pack = pool_pack.instantiate();
worksite site_pack = group_pack.run();
// send all messages
Similarly, in the unpacking we wait to receive all of the messages before unpacking the data:
// recv all messages
for (int l = 0; l < num_neighbors; ++l) {
double* buffer = buffers[l];
int* list = unpack_index_lists[l];
int len = unpack_index_list_lengths[l];
// unpack
for (int v = 0; v < num_vars; ++v) {
double* var = vars[v];
pool_unpack.enqueue(range_segment(0, len), [=] (int i) {
var[list[i]] = buffer[i];
});
buffer += len;
}
}
workgroup group_unpack = pool_unpack.instantiate();
worksite site_unpack = group_unpack.run();
This reorganization has the downside of not overlapping the message sends with packing and the message receives with unpacking.
For parallel multithreading execution via OpenMP, the example using workgroup can be run by replacing the policies and types with:
using forall_policy = RAJA::omp_parallel_for_exec;
using workgroup_policy = RAJA::WorkGroupPolicy <
RAJA::omp_work,
RAJA::ordered,
RAJA::ragged_array_of_objects,
RAJA::indirect_function_call_dispatch >;
using workpool = RAJA::WorkPool< workgroup_policy,
int,
RAJA::xargs<>,
memory_manager_allocator<char> >;
using workgroup = RAJA::WorkGroup< workgroup_policy,
int,
RAJA::xargs<>,
memory_manager_allocator<char> >;
using worksite = RAJA::WorkSite< workgroup_policy,
int,
RAJA::xargs<>,
memory_manager_allocator<char> >;
The main differences between these types and the ones defined for the sequential case above are the forall_policy and the workgroup_policy, which use OpenMP execution policy types.
Similarly, to run the loops in parallel on a CUDA GPU use these policies and types, taking note of the unordered work ordering policy that allows the enqueued loops to all be run using a single CUDA kernel:
using forall_policy = RAJA::cuda_exec_async<CUDA_BLOCK_SIZE>;
using workgroup_policy = RAJA::WorkGroupPolicy <
RAJA::cuda_work_async<CUDA_WORKGROUP_BLOCK_SIZE>,
RAJA::unordered_cuda_loop_y_block_iter_x_threadblock_average,
RAJA::constant_stride_array_of_objects,
RAJA::indirect_function_call_dispatch >;
using workpool = RAJA::WorkPool< workgroup_policy,
int,
RAJA::xargs<>,
pinned_allocator<char> >;
using workgroup = RAJA::WorkGroup< workgroup_policy,
int,
RAJA::xargs<>,
pinned_allocator<char> >;
using worksite = RAJA::WorkSite< workgroup_policy,
int,
RAJA::xargs<>,
pinned_allocator<char> >;
The main differences between these types and the ones defined for the sequential and OpenMP cases above are the forall_policy and the workgroup_policy, which use different template parameters, and the workpool, workgroup, and worksite types, which use 'pinned' memory allocation.
The packing is the same as in the previous workgroup packing examples, with the exception of added synchronization after calling the workgroup run method and before sending the messages. The example code also contains a CUDA variant that uses forall to launch num_neighbors * num_vars CUDA kernels and performs num_neighbors synchronizations to send each message in turn. Here, the reorganization to pack all messages before sending lets us use an unordered CUDA work ordering policy in the workgroup_policy, which reduces the number of CUDA kernel launches to one. It also means we need to synchronize only once before sending all of the messages:
for (int l = 0; l < num_neighbors; ++l) {
double* buffer = buffers[l];
int* list = pack_index_lists[l];
int len = pack_index_list_lengths[l];
// pack
for (int v = 0; v < num_vars; ++v) {
double* var = vars[v];
pool_pack.enqueue(range_segment(0, len), [=] RAJA_DEVICE (int i) {
buffer[i] = var[list[i]];
});
buffer += len;
}
}
workgroup group_pack = pool_pack.instantiate();
worksite site_pack = group_pack.run();
cudaErrchk(cudaDeviceSynchronize());
// send all messages
After waiting to receive all of the messages we use workgroup constructs with a CUDA unordered work ordering policy to unpack all of the messages using a single kernel launch:
// recv all messages
for (int l = 0; l < num_neighbors; ++l) {
double* buffer = buffers[l];
int* list = unpack_index_lists[l];
int len = unpack_index_list_lengths[l];
// unpack
for (int v = 0; v < num_vars; ++v) {
double* var = vars[v];
pool_unpack.enqueue(range_segment(0, len), [=] RAJA_DEVICE (int i) {
var[list[i]] = buffer[i];
});
buffer += len;
}
}
workgroup group_unpack = pool_unpack.instantiate();
worksite site_unpack = group_unpack.run();
cudaErrchk(cudaDeviceSynchronize());
Note that the synchronization after unpacking is done to ensure that group_unpack and site_unpack survive until the unpacking loop has finished executing.