Workgroup Constructs: Halo Exchange

The example code discussed in this section can be found in the file RAJA/examples/tut_halo-exchange.cpp. The file contains complete working code for multiple OpenMP, CUDA, and HIP RAJA variants. Here, we describe a subset of these variants.

Key RAJA features shown in this example:

  • RAJA::WorkPool workgroup construct

  • RAJA::WorkGroup workgroup construct

  • RAJA::WorkSite workgroup construct

  • RAJA::TypedRangeSegment iteration space construct

  • RAJA workgroup policies

In this example, we show how to use the RAJA workgroup constructs to implement buffer packing and unpacking for data halo exchange on a computational grid, a common MPI communication operation in distributed memory applications. This technique may not provide a performance gain on a CPU system, but it can significantly speed up halo exchange on a GPU system compared, for example, to running many individual packing/unpacking kernels.

Note

Using an abstraction layer over RAJA can make it easy to switch between individual RAJA::forall loops and the RAJA workgroup constructs for halo exchange packing and unpacking at compile time or run time.
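
For illustration only, here is a minimal sketch of such a run-time switch. The PackStrategy enum and enqueue_or_run helper are hypothetical names, not part of the example code; the workpool and forall_policy types are the ones defined later in this section.

    // Hypothetical helper (not in the example code): choose at run time whether
    // a loop body runs immediately with RAJA::forall or is enqueued on a
    // workgroup pool to be run later as part of a single group.
    enum class PackStrategy { Forall, Workgroup };

    template < typename Body >
    void enqueue_or_run(PackStrategy strategy, workpool& pool,
                        RAJA::TypedRangeSegment<int> range, Body&& body)
    {
      if (strategy == PackStrategy::Workgroup) {
        pool.enqueue(range, std::forward<Body>(body));
      } else {
        RAJA::forall<forall_policy>(range, std::forward<Body>(body));
      }
    }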

We start by setting the parameters for the halo exchange, using default values or values provided via command line input to the example code. These parameters determine the size of the grid, the width of the halo, the number of grid variables to pack/unpack, and the number of cycles (iterations to run).

  //
  // Define grid dimensions
  // Define halo width
  // Define number of grid variables
  // Define number of cycles
  //
  const int grid_dims[3] = { (argc != 7) ? 100 : std::atoi(argv[1]),
                             (argc != 7) ? 100 : std::atoi(argv[2]),
                             (argc != 7) ? 100 : std::atoi(argv[3]) };
  const int halo_width =     (argc != 7) ?   1 : std::atoi(argv[4]);
  const int num_vars   =     (argc != 7) ?   3 : std::atoi(argv[5]);
  const int num_cycles =     (argc != 7) ?   3 : std::atoi(argv[6]);
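
The allocation below uses a variable array size, var_size, that includes the halo cells on each side of the grid. It can be computed from the parameters above; a minimal sketch follows, where names other than var_size, grid_dims, and halo_width are illustrative.

  //
  // Each grid dimension is extended by a halo of width halo_width on both
  // sides; var_size is the total number of elements per grid variable.
  // (Sketch only.)
  //
  const int grid_plus_halo_dims[3] = { grid_dims[0] + 2*halo_width,
                                       grid_dims[1] + 2*halo_width,
                                       grid_dims[2] + 2*halo_width };

  const int var_size = grid_plus_halo_dims[0] *
                       grid_plus_halo_dims[1] *
                       grid_plus_halo_dims[2];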

Next, we allocate the variable data arrays (the memory manager in the example uses CUDA Unified Memory if CUDA is enabled). These grid variables are reset each cycle to allow checking the results of the packing and unpacking.

  //
  // Allocate grid variables and reference grid variables used to check
  // correctness.
  //
  std::vector<double*> vars    (num_vars, nullptr);
  std::vector<double*> vars_ref(num_vars, nullptr);

  for (int v = 0; v < num_vars; ++v) {
    vars[v]     = memoryManager::allocate<double>(var_size);
    vars_ref[v] = memoryManager::allocate<double>(var_size);
  }

We also allocate and initialize index lists of the grid elements to pack and unpack:

  //
  // Generate index lists for packing and unpacking
  //
  std::vector<int*> pack_index_lists(num_neighbors, nullptr);
  std::vector<int > pack_index_list_lengths(num_neighbors, 0);
  create_pack_lists(pack_index_lists, pack_index_list_lengths, halo_width, grid_dims);

  std::vector<int*> unpack_index_lists(num_neighbors, nullptr);
  std::vector<int > unpack_index_list_lengths(num_neighbors, 0);
  create_unpack_lists(unpack_index_lists, unpack_index_list_lengths, halo_width, grid_dims);

All the code examples presented below copy the data packed from the grid interior:

     0   0   0   0   0
     0   1   2   3   0
     0   4   5   6   0
     0   7   8   9   0
     0   0   0   0   0

into the adjacent halo cells:

     1   1   2   3   3
     1   1   2   3   3
     4   4   5   6   6
     7   7   8   9   9
     7   7   8   9   9

Although the example code does not use MPI or multiple domains (e.g., one per MPI rank), as a real distributed memory parallel application would, the data copy operations capture the spirit of how data communication would be done.

Packing and Unpacking (Basic Loop Execution)

A sequential non-RAJA example of data packing and unpacking would look like:

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = pack_index_lists[l];
        int  len  = pack_index_list_lengths[l];

        // pack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          for (int i = 0; i < len; i++) {
            buffer[i] = var[list[i]];
          }

          buffer += len;
        }

        // send single message
      }

and:

      for (int l = 0; l < num_neighbors; ++l) {

        // recv single message

        double* buffer = buffers[l];
        int* list = unpack_index_lists[l];
        int  len  = unpack_index_list_lengths[l];

        // unpack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          for (int i = 0; i < len; i++) {
            var[list[i]] = buffer[i];
          }

          buffer += len;
        }
      }

RAJA Variants using forall

A sequential RAJA example uses this execution policy type:

    using forall_policy = RAJA::seq_exec;

to pack the grid variable data into a buffer:

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = pack_index_lists[l];
        int  len  = pack_index_list_lengths[l];

        // pack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          RAJA::forall<forall_policy>(range_segment(0, len), [=] (int i) {
            buffer[i] = var[list[i]];
          });

          buffer += len;
        }

        // send single message
      }

and unpack the buffer data into the grid variable array:

      for (int l = 0; l < num_neighbors; ++l) {

        // recv single message

        double* buffer = buffers[l];
        int* list = unpack_index_lists[l];
        int  len  = unpack_index_list_lengths[l];

        // unpack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          RAJA::forall<forall_policy>(range_segment(0, len), [=] (int i) {
            var[list[i]] = buffer[i];
          });

          buffer += len;
        }
      }

For parallel multithreaded execution via OpenMP, the example can be run by replacing the execution policy with:

    using forall_policy = RAJA::omp_parallel_for_exec;

Similarly, to run the loops in parallel on a CUDA GPU, we would use this policy:

    using forall_policy = RAJA::cuda_exec_async<CUDA_BLOCK_SIZE>;

Note that we can use an asynchronous execution policy because there are no data races due to the intermediate buffer usage.
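
With the asynchronous policy, however, each neighbor's buffer must be complete on the device before its message can be sent, so the forall CUDA variant performs one synchronization per neighbor (num_neighbors synchronizations in total). The following is a sketch of that packing loop, assuming the same structure as the sequential variant above and the example's cudaErrchk error-checking macro:

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = pack_index_lists[l];
        int  len  = pack_index_list_lengths[l];

        // pack asynchronously, one CUDA kernel per variable
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          RAJA::forall<forall_policy>(range_segment(0, len), [=] RAJA_DEVICE (int i) {
            buffer[i] = var[list[i]];
          });

          buffer += len;
        }

        // wait for this neighbor's packing kernels to complete before sending
        cudaErrchk(cudaDeviceSynchronize());

        // send single message
      }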

RAJA Variants using workgroup constructs

Using the workgroup constructs in the example requires defining a few more policies and types:

    using forall_policy = RAJA::seq_exec;

    using workgroup_policy = RAJA::WorkGroupPolicy <
                                 RAJA::seq_work,
                                 RAJA::ordered,
                                 RAJA::ragged_array_of_objects,
                                 RAJA::indirect_function_call_dispatch >;

    using workpool = RAJA::WorkPool< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     memory_manager_allocator<char> >;

    using workgroup = RAJA::WorkGroup< workgroup_policy,
                                       int,
                                       RAJA::xargs<>,
                                       memory_manager_allocator<char> >;

    using worksite = RAJA::WorkSite< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     memory_manager_allocator<char> >;
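
The pool_pack and pool_unpack objects used in the loops below are instances of the workpool type. Their construction is not shown in this section; a minimal sketch, assuming each pool is constructed from an allocator instance:

    workpool pool_pack  (memory_manager_allocator<char>{});
    workpool pool_unpack(memory_manager_allocator<char>{});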

These policies and types are used in a slightly rearranged version of the packing. Note that the comment indicating where messages are sent has been moved down, after the call that runs the operations enqueued on the workgroup:

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = pack_index_lists[l];
        int  len  = pack_index_list_lengths[l];

        // pack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          pool_pack.enqueue(range_segment(0, len), [=] (int i) {
            buffer[i] = var[list[i]];
          });

          buffer += len;
        }
      }

      workgroup group_pack = pool_pack.instantiate();

      worksite site_pack = group_pack.run();

      // send all messages

Similarly, in the unpacking we wait to receive all of the messages before unpacking the data:

      // recv all messages

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = unpack_index_lists[l];
        int  len  = unpack_index_list_lengths[l];

        // unpack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          pool_unpack.enqueue(range_segment(0, len), [=] (int i) {
            var[list[i]] = buffer[i];
          });

          buffer += len;
        }
      }

      workgroup group_unpack = pool_unpack.instantiate();

      worksite site_unpack = group_unpack.run();

This reorganization has the downside of not overlapping the message sends with packing and the message receives with unpacking.

For parallel multithreaded execution via OpenMP, the example using workgroup constructs can be run by replacing the policies and types with:

    using forall_policy = RAJA::omp_parallel_for_exec;

    using workgroup_policy = RAJA::WorkGroupPolicy <
                                 RAJA::omp_work,
                                 RAJA::ordered,
                                 RAJA::ragged_array_of_objects,
                                 RAJA::indirect_function_call_dispatch >;

    using workpool = RAJA::WorkPool< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     memory_manager_allocator<char> >;

    using workgroup = RAJA::WorkGroup< workgroup_policy,
                                       int,
                                       RAJA::xargs<>,
                                       memory_manager_allocator<char> >;

    using worksite = RAJA::WorkSite< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     memory_manager_allocator<char> >;

The main differences between these types and the ones defined for the sequential case above are the forall_policy and the workgroup_policy, which use OpenMP execution policy types.

Similarly, to run the loops in parallel on a CUDA GPU, use these policies and types, noting the unordered work ordering policy that allows all of the enqueued loops to run in a single CUDA kernel:

    using forall_policy = RAJA::cuda_exec_async<CUDA_BLOCK_SIZE>;

    using workgroup_policy = RAJA::WorkGroupPolicy <
                                 RAJA::cuda_work_async<CUDA_WORKGROUP_BLOCK_SIZE>,
                                 RAJA::unordered_cuda_loop_y_block_iter_x_threadblock_average,
                                 RAJA::constant_stride_array_of_objects,
                                 RAJA::indirect_function_call_dispatch >;

    using workpool = RAJA::WorkPool< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     pinned_allocator<char> >;

    using workgroup = RAJA::WorkGroup< workgroup_policy,
                                       int,
                                       RAJA::xargs<>,
                                       pinned_allocator<char> >;

    using worksite = RAJA::WorkSite< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     pinned_allocator<char> >;

The main differences between these types and the ones defined for the sequential and OpenMP cases above are the forall_policy and the workgroup_policy, which use different template parameters, and the workpool, workgroup, and worksite types, which use 'pinned' memory allocation.
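
As before, the pool_pack and pool_unpack objects are instances of the workpool type; a minimal sketch of their construction for this variant, again assuming each pool is constructed from an allocator instance (here the pinned allocator):

    workpool pool_pack  (pinned_allocator<char>{});
    workpool pool_unpack(pinned_allocator<char>{});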

The packing is the same as in the previous workgroup packing examples, except that synchronization is added after the workgroup run method is called and before the messages are sent. In the example code, there is a CUDA version that uses forall to launch num_neighbors * num_vars CUDA kernels and performs num_neighbors synchronizations, sending each message in turn. Here, the reorganization to pack all messages before sending lets us use an unordered CUDA work ordering policy in the workgroup_policy, which reduces the number of CUDA kernel launches to one and requires only a single synchronization before all of the messages are sent:

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = pack_index_lists[l];
        int  len  = pack_index_list_lengths[l];

        // pack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          pool_pack.enqueue(range_segment(0, len), [=] RAJA_DEVICE (int i) {
            buffer[i] = var[list[i]];
          });

          buffer += len;
        }
      }

      workgroup group_pack = pool_pack.instantiate();

      worksite site_pack = group_pack.run();

      cudaErrchk(cudaDeviceSynchronize());

      // send all messages

After waiting to receive all of the messages, we use the workgroup constructs with a CUDA unordered work ordering policy to unpack all of the messages using a single kernel launch:

      // recv all messages

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = unpack_index_lists[l];
        int  len  = unpack_index_list_lengths[l];

        // unpack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          pool_unpack.enqueue(range_segment(0, len), [=] RAJA_DEVICE (int i) {
            var[list[i]] = buffer[i];
          });

          buffer += len;
        }
      }

      workgroup group_unpack = pool_unpack.instantiate();

      worksite site_unpack = group_unpack.run();

      cudaErrchk(cudaDeviceSynchronize());

Note that the synchronization after unpacking is done to ensure that group_unpack and site_unpack survive until the unpacking loop has finished executing.