Halo Exchange (Workgroup Constructs)

Key RAJA features shown in this example:

  • RAJA::WorkPool workgroup construct
  • RAJA::WorkGroup workgroup construct
  • RAJA::WorkSite workgroup construct
  • RAJA::RangeSegment iteration space construct
  • RAJA workgroup policies

In this example, we show how to use the RAJA workgroup constructs to implement buffer packing and unpacking for data halo exchange on a computational grid, a common MPI communication operation. This may not provide a performance gain on a CPU system, but it can significantly speed up halo exchange on a GPU system compared to using RAJA::forall to run individual packing/unpacking kernels.

Note

Using an abstraction layer over RAJA can make it easy to switch between individual RAJA::forall loops and the RAJA workgroup constructs for implementing halo exchange packing and unpacking, at compile time or at run time.
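
As a rough illustration (not code from the example), such an abstraction layer might dispatch between the two implementations with a selector chosen at compile time or run time. The names PackingMethod, pack_with_forall, and pack_with_workgroup below are hypothetical:

  // Hypothetical sketch of a run-time switch between packing implementations;
  // pack_with_forall and pack_with_workgroup stand in for the forall and
  // workgroup variants shown later in this example.
  enum class PackingMethod { Forall, WorkGroup };

  void pack_buffers(PackingMethod method)
  {
    if (method == PackingMethod::Forall) {
      // run one RAJA::forall per (neighbor, variable) pair
      // pack_with_forall( /* buffers, index lists, vars, ... */ );
    } else {
      // enqueue all loops in a RAJA::WorkPool, then run them as one WorkGroup
      // pack_with_workgroup( /* buffers, index lists, vars, ... */ );
    }
  }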

We start by setting the parameters for the halo exchange, using default values or values provided via command-line input. These parameters determine the size of the grid, the width of the halo, the number of grid variables, and the number of cycles.

  //
  // Define grid dimensions
  // Define halo width
  // Define number of grid variables
  // Define number of cycles
  //
  const int grid_dims[3] = { (argc != 7) ? 100 : std::atoi(argv[1]),
                             (argc != 7) ? 100 : std::atoi(argv[2]),
                             (argc != 7) ? 100 : std::atoi(argv[3]) };
  const int halo_width =     (argc != 7) ?   1 : std::atoi(argv[4]);
  const int num_vars   =     (argc != 7) ?   3 : std::atoi(argv[5]);
  const int num_cycles =     (argc != 7) ?   3 : std::atoi(argv[6]);

Next, we allocate the grid variable arrays and the reference arrays used to check correctness (the memory manager in the example uses CUDA Unified Memory if CUDA is enabled). The grid variables are reset each cycle so the results of the packing and unpacking can be checked; a sketch of this per-cycle reset follows the allocation code below.

  //
  // Allocate grid variables and reference grid variables used to check
  // correctness.
  //
  std::vector<double*> vars    (num_vars, nullptr);
  std::vector<double*> vars_ref(num_vars, nullptr);

  for (int v = 0; v < num_vars; ++v) {
    vars[v]     = memoryManager::allocate<double>(var_size);
    vars_ref[v] = memoryManager::allocate<double>(var_size);
  }
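
For reference, the per-cycle reset of the grid variables might look like the following sketch (the initialization values here are assumed, not taken from the example):

  //
  // Reset the grid variables at the start of each cycle so the packing and
  // unpacking results can be checked (assumed initialization pattern).
  //
  for (int v = 0; v < num_vars; ++v) {
    double* var = vars[v];
    for (int i = 0; i < var_size; i++) {
      var[i] = i + v;
    }
  }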

We also allocate and initialize index lists of the grid elements to pack and unpack.
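A sketch of this setup is shown below; it assumes num_neighbors (the number of halo neighbors) and the helper routines create_pack_lists and create_unpack_lists defined in the example source:

  //
  // Allocate and initialize index lists of the grid elements to pack and
  // unpack (sketch; create_pack_lists and create_unpack_lists are helper
  // routines in the example source).
  //
  std::vector<int*> pack_index_lists(num_neighbors, nullptr);
  std::vector<int>  pack_index_list_lengths(num_neighbors, 0);
  create_pack_lists(pack_index_lists, pack_index_list_lengths,
                    halo_width, grid_dims);

  std::vector<int*> unpack_index_lists(num_neighbors, nullptr);
  std::vector<int>  unpack_index_list_lengths(num_neighbors, 0);
  create_unpack_lists(unpack_index_lists, unpack_index_list_lengths,
                      halo_width, grid_dims);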

All the code examples presented below copy the data packed from the grid interior:

0 0 0 0 0
0 1 2 3 0
0 4 5 6 0
0 7 8 9 0
0 0 0 0 0

into the adjacent halo cells:

1 1 2 3 3
1 1 2 3 3
4 4 5 6 6
7 7 8 9 9
7 7 8 9 9

Packing and Unpacking (Basic Loop Execution)

A sequential non-RAJA example of packing:

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = pack_index_lists[l];
        int  len  = pack_index_list_lengths[l];

        // pack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          for (int i = 0; i < len; i++) {
            buffer[i] = var[list[i]];
          }

          buffer += len;
        }

        // send single message
      }

and unpacking:

      for (int l = 0; l < num_neighbors; ++l) {

        // recv single message

        double* buffer = buffers[l];
        int* list = unpack_index_lists[l];
        int  len  = unpack_index_list_lengths[l];

        // unpack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          for (int i = 0; i < len; i++) {
            var[list[i]] = buffer[i];
          }

          buffer += len;
        }
      }

RAJA Variants using forall

A sequential RAJA example uses this execution policy type:

    using forall_policy = RAJA::loop_exec;

to pack the grid variable data into a buffer:

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = pack_index_lists[l];
        int  len  = pack_index_list_lengths[l];

        // pack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          RAJA::forall<forall_policy>(range_segment(0, len), [=] (int i) {
            buffer[i] = var[list[i]];
          });

          buffer += len;
        }

        // send single message
      }

and unpack the buffer data into the grid variable array:

      for (int l = 0; l < num_neighbors; ++l) {

        // recv single message

        double* buffer = buffers[l];
        int* list = unpack_index_lists[l];
        int  len  = unpack_index_list_lengths[l];

        // unpack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          RAJA::forall<forall_policy>(range_segment(0, len), [=] (int i) {
            var[list[i]] = buffer[i];
          });

          buffer += len;
        }
      }

For parallel multi-threading execution via OpenMP, the example can be run by replacing the execution policy with:

    using forall_policy = RAJA::omp_parallel_for_exec;

Similarly, to run the loops in parallel on a CUDA GPU, we would use this policy:

    using forall_policy = RAJA::cuda_exec_async<CUDA_BLOCK_SIZE>;

RAJA Variants using workgroup constructs

Using the workgroup constructs in the example requires defining a few more policies and types:

    using forall_policy = RAJA::loop_exec;

    using workgroup_policy = RAJA::WorkGroupPolicy <
                                 RAJA::loop_work,
                                 RAJA::ordered,
                                 RAJA::ragged_array_of_objects >;

    using workpool = RAJA::WorkPool< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     memory_manager_allocator<char> >;

    using workgroup = RAJA::WorkGroup< workgroup_policy,
                                       int,
                                       RAJA::xargs<>,
                                       memory_manager_allocator<char> >;

    using worksite = RAJA::WorkSite< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     memory_manager_allocator<char> >;

which are used in a slightly rearranged version of the packing code. Note that the comment indicating where a message could be sent has moved down, after the call that runs the workgroup:

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = pack_index_lists[l];
        int  len  = pack_index_list_lengths[l];

        // pack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          pool_pack.enqueue(range_segment(0, len), [=] (int i) {
            buffer[i] = var[list[i]];
          });

          buffer += len;
        }
      }

      workgroup group_pack = pool_pack.instantiate();

      worksite site_pack = group_pack.run();

      // send all messages

Similarly, in the unpacking we wait to receive all of the messages before unpacking the data:

      // recv all messages

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = unpack_index_lists[l];
        int  len  = unpack_index_list_lengths[l];

        // unpack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          pool_unpack.enqueue(range_segment(0, len), [=] (int i) {
            var[list[i]] = buffer[i];
          });

          buffer += len;
        }
      }

      workgroup group_unpack = pool_unpack.instantiate();

      worksite site_unpack = group_unpack.run();

This reorganization has the downside of not overlapping the message sends with packing and the message receives with unpacking.

For parallel multi-threading execution via OpenMP, the example using workgroup can be run by replacing the policies and types with:

    using forall_policy = RAJA::omp_parallel_for_exec;

    using workgroup_policy = RAJA::WorkGroupPolicy <
                                 RAJA::omp_work,
                                 RAJA::ordered,
                                 RAJA::ragged_array_of_objects >;

    using workpool = RAJA::WorkPool< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     memory_manager_allocator<char> >;

    using workgroup = RAJA::WorkGroup< workgroup_policy,
                                       int,
                                       RAJA::xargs<>,
                                       memory_manager_allocator<char> >;

    using worksite = RAJA::WorkSite< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     memory_manager_allocator<char> >;

Similarly, to run the loops in parallel on a CUDA GPU, we use these policies and types. Note the unordered work ordering policy, which allows all of the enqueued loops to be run in a single CUDA kernel:

    using forall_policy = RAJA::cuda_exec_async<CUDA_BLOCK_SIZE>;

    using workgroup_policy = RAJA::WorkGroupPolicy <
                                 RAJA::cuda_work_async<CUDA_WORKGROUP_BLOCK_SIZE>,
                                 RAJA::unordered_cuda_loop_y_block_iter_x_threadblock_average,
                                 RAJA::constant_stride_array_of_objects >;

    using workpool = RAJA::WorkPool< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     pinned_allocator<char> >;

    using workgroup = RAJA::WorkGroup< workgroup_policy,
                                       int,
                                       RAJA::xargs<>,
                                       pinned_allocator<char> >;

    using worksite = RAJA::WorkSite< workgroup_policy,
                                     int,
                                     RAJA::xargs<>,
                                     pinned_allocator<char> >;

The packing is the same as the previous workgroup packing example, except that a synchronization is added after calling run and before sending the messages. The previous CUDA example used forall to launch num_neighbors * num_vars CUDA kernels and performed num_neighbors synchronizations to send each message in turn. Here, the reorganization to pack all messages before sending lets us use an unordered CUDA work ordering policy in the workgroup constructs, which reduces the number of CUDA kernel launches to one and allows us to synchronize once before sending all of the messages:

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = pack_index_lists[l];
        int  len  = pack_index_list_lengths[l];

        // pack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          pool_pack.enqueue(range_segment(0, len), [=] RAJA_DEVICE (int i) {
            buffer[i] = var[list[i]];
          });

          buffer += len;
        }
      }

      workgroup group_pack = pool_pack.instantiate();

      worksite site_pack = group_pack.run();

      cudaErrchk(cudaDeviceSynchronize());

      // send all messages

After waiting to receive all of the messages, we use the workgroup constructs with the CUDA unordered work ordering policy to unpack all of the messages in a single kernel launch:

      // recv all messages

      for (int l = 0; l < num_neighbors; ++l) {

        double* buffer = buffers[l];
        int* list = unpack_index_lists[l];
        int  len  = unpack_index_list_lengths[l];

        // unpack
        for (int v = 0; v < num_vars; ++v) {

          double* var = vars[v];

          pool_unpack.enqueue(range_segment(0, len), [=] RAJA_DEVICE (int i) {
            var[list[i]] = buffer[i];
          });

          buffer += len;
        }
      }

      workgroup group_unpack = pool_unpack.instantiate();

      worksite site_unpack = group_unpack.run();

      cudaErrchk(cudaDeviceSynchronize());

Note that the synchronization after unpacking is done to ensure that group_unpack and site_unpack survive until the unpacking loop has finished executing.

The file RAJA/examples/tut_halo-exchange.cpp contains a complete working example code, with OpenMP, CUDA, and HIP variants.