Cabana_CommunicationPlan.hpp
/****************************************************************************
 * Copyright (c) 2018-2023 by the Cabana authors                           *
 * All rights reserved.                                                    *
 *                                                                          *
 * This file is part of the Cabana library. Cabana is distributed under a  *
 * BSD 3-clause license. For the licensing terms see the LICENSE file in   *
 * the top-level directory.                                                 *
 *                                                                          *
 * SPDX-License-Identifier: BSD-3-Clause                                    *
 ****************************************************************************/

#ifndef CABANA_COMMUNICATIONPLAN_HPP
#define CABANA_COMMUNICATIONPLAN_HPP

#include <Cabana_Utils.hpp>

#include <Kokkos_Core.hpp>
#include <Kokkos_ScatterView.hpp>

#include <mpi.h>

#include <algorithm>
#include <exception>
#include <memory>
#include <numeric>
#include <type_traits>
#include <vector>

namespace Cabana
{
namespace Impl
{
//---------------------------------------------------------------------------//
// Count sends and create steering algorithm tags.
struct CountSendsAndCreateSteeringDuplicated
{
};
struct CountSendsAndCreateSteeringAtomic
{
};

//---------------------------------------------------------------------------//
// Count sends and create steering algorithm selector.
template <class ExecutionSpace>
struct CountSendsAndCreateSteeringAlgorithm;

// Device backends (CUDA, HIP, SYCL, OpenMPTarget) use atomics.
#ifdef KOKKOS_ENABLE_CUDA
template <>
struct CountSendsAndCreateSteeringAlgorithm<Kokkos::Cuda>
{
    using type = CountSendsAndCreateSteeringAtomic;
};
#endif // end KOKKOS_ENABLE_CUDA
#ifdef KOKKOS_ENABLE_HIP
template <>
struct CountSendsAndCreateSteeringAlgorithm<Kokkos::Experimental::HIP>
{
    using type = CountSendsAndCreateSteeringAtomic;
};
#endif // end KOKKOS_ENABLE_HIP
#ifdef KOKKOS_ENABLE_SYCL
template <>
struct CountSendsAndCreateSteeringAlgorithm<Kokkos::Experimental::SYCL>
{
    using type = CountSendsAndCreateSteeringAtomic;
};
#endif // end KOKKOS_ENABLE_SYCL
#ifdef KOKKOS_ENABLE_OPENMPTARGET
template <>
struct CountSendsAndCreateSteeringAlgorithm<Kokkos::Experimental::OpenMPTarget>
{
    using type = CountSendsAndCreateSteeringAtomic;
};
#endif // end KOKKOS_ENABLE_OPENMPTARGET

// The default is to use duplication.
template <class ExecutionSpace>
struct CountSendsAndCreateSteeringAlgorithm
{
    using type = CountSendsAndCreateSteeringDuplicated;
};

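//---------------------------------------------------------------------------//
// Illustrative sketch (not part of the library): the selector above enables
// tag dispatch to the two overloads below. On a host backend the primary
// template applies, while (for example) with Kokkos::Cuda the atomic
// specialization is chosen.
//
//   using algo = typename Impl::CountSendsAndCreateSteeringAlgorithm<
//       Kokkos::Serial>::type;
//   static_assert(
//       std::is_same<algo,
//                    Impl::CountSendsAndCreateSteeringDuplicated>::value,
//       "host backends default to the duplicated algorithm" );
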
//---------------------------------------------------------------------------//
// Count sends and generate the steering vector. Atomic version.
template <class ExecutionSpace, class ExportRankView>
auto countSendsAndCreateSteering( ExecutionSpace,
                                  const ExportRankView element_export_ranks,
                                  const int comm_size,
                                  CountSendsAndCreateSteeringAtomic )
    -> std::pair<Kokkos::View<int*, typename ExportRankView::memory_space>,
                 Kokkos::View<typename ExportRankView::size_type*,
                              typename ExportRankView::memory_space>>
{
    using memory_space = typename ExportRankView::memory_space;
    using size_type = typename ExportRankView::size_type;

    // Create views.
    Kokkos::View<int*, memory_space> neighbor_counts( "neighbor_counts",
                                                      comm_size );
    Kokkos::View<size_type*, memory_space> neighbor_ids(
        Kokkos::ViewAllocateWithoutInitializing( "neighbor_ids" ),
        element_export_ranks.size() );

    // Count the sends and create the steering vector.

    // For smaller values of comm_size, use the optimized scratch memory path.
    // The threshold of 64 is somewhat arbitrary, but it should be at least 27
    // to cover 3d halos.
    if ( comm_size <= 64 )
    {
        constexpr int team_size = 256;
        Kokkos::TeamPolicy<ExecutionSpace> team_policy(
            ( element_export_ranks.size() + team_size - 1 ) / team_size,
            team_size );
        team_policy = team_policy.set_scratch_size(
            0,
            Kokkos::PerTeam( sizeof( int ) * ( team_size + 2 * comm_size ) ) );
        Kokkos::parallel_for(
            "Cabana::CommunicationPlan::countSendsAndCreateSteeringShared",
            team_policy,
            KOKKOS_LAMBDA(
                const typename Kokkos::TeamPolicy<ExecutionSpace>::member_type&
                    team ) {
                // NOTE: `get_shmem` returns shared memory pointers *aligned
                // to 8 bytes*, so if `comm_size` is odd we can get
                // erroneously padded offsets if we call `get_shmem`
                // separately for each shared memory intermediary. Acquiring
                // all the needed scratch memory at once and then computing
                // pointer offsets by hand avoids this issue.
                int* scratch = (int*)team.team_shmem().get_shmem(
                    ( team.team_size() + 2 * comm_size ) * sizeof( int ), 0 );

                // Local neighbor ids give the local offset relative to the
                // calculated global offset. Size team.team_size() *
                // sizeof(int).
                int* local_neighbor_ids = scratch;
                // Local histogram, `comm_size` in size.
                int* histo = local_neighbor_ids + team.team_size();
                // Offset into the global array, `comm_size` in size.
                int* global_offset = histo + comm_size;
                // Overall element id `tid`.
                const auto tid =
                    team.team_rank() + team.league_rank() * team.team_size();
                // Local element id.
                const int local_id = team.team_rank();
                // Total number of elements, for convenience.
                const int num_elements = element_export_ranks.size();
                // My element export rank.
                const int my_element_export_rank =
                    ( tid < num_elements ? element_export_ranks( tid ) : -1 );

                // We cannot simply return early for out-of-bounds threads
                // because all threads share the team-wide work below; instead
                // flag whether this thread holds a valid element.
                const bool in_bounds =
                    tid < num_elements && my_element_export_rank >= 0;

                Kokkos::parallel_for(
                    Kokkos::TeamThreadRange( team, comm_size ),
                    [&]( const int i ) { histo[i] = 0; } );

                // Synchronize zeroing.
                team.team_barrier();

                // Build the local histogram. The num_elements check is
                // encoded in in_bounds.
                if ( in_bounds )
                {
                    // Shared memory atomic add, accumulate into the local
                    // offset.
                    local_neighbor_ids[local_id] = Kokkos::atomic_fetch_add(
                        &histo[my_element_export_rank], 1 );
                }

                // Synchronize the local histogram build.
                team.team_barrier();

                // Reserve space in the global array via a loop over the
                // neighbor counts.
                Kokkos::parallel_for(
                    Kokkos::TeamThreadRange( team, comm_size ),
                    [&]( const int i )
                    {
                        // Global memory atomic add, reserves space.
                        global_offset[i] = Kokkos::atomic_fetch_add(
                            &neighbor_counts( i ), histo[i] );
                    } );

                // Synchronize the block-stride loop.
                team.team_barrier();

                // Now store to my location.
                if ( in_bounds )
                {
                    neighbor_ids( tid ) =
                        global_offset[my_element_export_rank] +
                        local_neighbor_ids[local_id];
                }
            } );
    }
    else
    {
        // For larger numbers of export ranks (e.g., migration) we use the
        // global memory version, though this is a point of future
        // optimization.
        Kokkos::parallel_for(
            "Cabana::CommunicationPlan::countSendsAndCreateSteering",
            Kokkos::RangePolicy<ExecutionSpace>( 0,
                                                 element_export_ranks.size() ),
            KOKKOS_LAMBDA( const size_type i ) {
                if ( element_export_ranks( i ) >= 0 )
                    neighbor_ids( i ) = Kokkos::atomic_fetch_add(
                        &neighbor_counts( element_export_ranks( i ) ), 1 );
            } );
    }
    Kokkos::fence();

    // Return the counts and ids.
    return std::make_pair( neighbor_counts, neighbor_ids );
}
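
// Worked example (sketch, not part of the library): with comm_size = 3 and
// element_export_ranks = {2, 0, 2, -1, 2}, the function above returns
// neighbor_counts = {1, 0, 3} (a negative rank means "do not send", so
// element 3 is skipped) and neighbor_ids holding each element's slot within
// its destination rank's group, e.g. {0, 0, 1, <uninitialized>, 2}. The
// order among elements bound for the same rank is nondeterministic because
// it is decided by atomics, and the entry for a skipped element is left
// uninitialized.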

//---------------------------------------------------------------------------//
// Count sends and generate the steering vector. Duplicated version.
template <class ExecutionSpace, class ExportRankView>
auto countSendsAndCreateSteering( ExecutionSpace,
                                  const ExportRankView element_export_ranks,
                                  const int comm_size,
                                  CountSendsAndCreateSteeringDuplicated )
    -> std::pair<Kokkos::View<int*, typename ExportRankView::memory_space>,
                 Kokkos::View<typename ExportRankView::size_type*,
                              typename ExportRankView::memory_space>>
{
    using memory_space = typename ExportRankView::memory_space;
    using size_type = typename ExportRankView::size_type;

    // Create a unique thread token.
    Kokkos::Experimental::UniqueToken<
        ExecutionSpace, Kokkos::Experimental::UniqueTokenScope::Global>
        unique_token;

    // Create views.
    Kokkos::View<int*, memory_space> neighbor_counts(
        Kokkos::ViewAllocateWithoutInitializing( "neighbor_counts" ),
        comm_size );
    Kokkos::View<size_type*, memory_space> neighbor_ids(
        Kokkos::ViewAllocateWithoutInitializing( "neighbor_ids" ),
        element_export_ranks.size() );
    Kokkos::View<int**, memory_space> neighbor_counts_dup(
        "neighbor_counts", unique_token.size(), comm_size );
    Kokkos::View<size_type**, memory_space> neighbor_ids_dup(
        "neighbor_ids", unique_token.size(), element_export_ranks.size() );

    // Compute initial duplicated sends and steering.
    Kokkos::parallel_for(
        "Cabana::CommunicationPlan::initialCount",
        Kokkos::RangePolicy<ExecutionSpace>( 0, element_export_ranks.size() ),
        KOKKOS_LAMBDA( const size_type i ) {
            if ( element_export_ranks( i ) >= 0 )
            {
                // Get the thread id.
                auto thread_id = unique_token.acquire();

                // Do the duplicated fetch-add: increment the send count for
                // this rank and store the incremented count as the
                // thread-local neighbor id. Because we use the prefix
                // increment, the stored id is too big by one, but this
                // guarantees a non-zero value, which we later use to find
                // the thread on which this element was located. We subtract
                // the extra one at the end.
                neighbor_ids_dup( thread_id, i ) = ++neighbor_counts_dup(
                    thread_id, element_export_ranks( i ) );

                // Release the thread id.
                unique_token.release( thread_id );
            }
        } );
    Kokkos::fence();

    // Team policy.
    using team_policy =
        Kokkos::TeamPolicy<ExecutionSpace, Kokkos::Schedule<Kokkos::Dynamic>>;
    using index_type = typename team_policy::index_type;

    // Compute the send counts for each neighbor rank by reducing across the
    // thread duplicates.
    Kokkos::parallel_for(
        "Cabana::CommunicationPlan::finalCount",
        team_policy( neighbor_counts.extent( 0 ), Kokkos::AUTO ),
        KOKKOS_LAMBDA( const typename team_policy::member_type& team ) {
            // Get the neighbor rank id.
            auto i = team.league_rank();

            // Add the thread results.
            int thread_counts = 0;
            Kokkos::parallel_reduce(
                Kokkos::TeamThreadRange( team,
                                         neighbor_counts_dup.extent( 0 ) ),
                [&]( const index_type thread_id, int& result )
                { result += neighbor_counts_dup( thread_id, i ); },
                thread_counts );
            neighbor_counts( i ) = thread_counts;
        } );
    Kokkos::fence();

    // Compute the location of each export element in the send buffer of its
    // destination rank.
    Kokkos::parallel_for(
        "Cabana::CommunicationPlan::createSteering",
        team_policy( element_export_ranks.size(), Kokkos::AUTO ),
        KOKKOS_LAMBDA( const typename team_policy::member_type& team ) {
            // Get the element id.
            auto i = team.league_rank();

            // Only operate on valid elements.
            if ( element_export_ranks( i ) >= 0 )
            {
                // Compute the id of the thread on which we located the
                // element during the count phase. Only that thread has a
                // non-zero entry for this element, so only it contributes to
                // the reduction.
                index_type dup_thread = 0;
                Kokkos::parallel_reduce(
                    Kokkos::TeamThreadRange( team,
                                             neighbor_ids_dup.extent( 0 ) ),
                    [&]( const index_type thread_id, index_type& result )
                    {
                        if ( neighbor_ids_dup( thread_id, i ) > 0 )
                            result += thread_id;
                    },
                    dup_thread );

                // Compute the offset of this element in the steering vector
                // for its destination rank. Loop through the threads up to
                // the thread that found this element in the count stage. All
                // thread counts prior to that thread contribute to the
                // offset.
                size_type thread_offset = 0;
                Kokkos::parallel_reduce(
                    Kokkos::TeamThreadRange( team, dup_thread ),
                    [&]( const index_type thread_id, size_type& result ) {
                        result += neighbor_counts_dup(
                            thread_id, element_export_ranks( i ) );
                    },
                    thread_offset );

                // Add the thread-local value to the offset, subtracting the
                // one that was added artificially during the count phase.
                neighbor_ids( i ) =
                    thread_offset + neighbor_ids_dup( dup_thread, i ) - 1;
            }
        } );
    Kokkos::fence();

    // Return the counts and ids.
    return std::make_pair( neighbor_counts, neighbor_ids );
}
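
// Worked example (sketch, not part of the library): suppose threads with
// ids A = 0 and B = 1 counted exports to rank 2, thread A handling elements
// {0, 4} and thread B handling element {7}. After the count phase,
// neighbor_counts_dup(A, 2) = 2, neighbor_counts_dup(B, 2) = 1, and
// neighbor_ids_dup(B, 7) = 1 (one past its slot, guaranteed non-zero). The
// steering phase finds element 7 on thread B, sums the rank-2 counts of all
// lower-id threads (2 from thread A), and sets
// neighbor_ids(7) = 2 + 1 - 1 = 2: element 7 lands after thread A's two
// elements in rank 2's send group.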

//---------------------------------------------------------------------------//
} // end namespace Impl

//---------------------------------------------------------------------------//
//! Return unique neighbor ranks, with the current rank first.
inline std::vector<int> getUniqueTopology( MPI_Comm comm,
                                           std::vector<int> topology )
{
    auto remove_end = std::remove( topology.begin(), topology.end(), -1 );
    std::sort( topology.begin(), remove_end );
    auto unique_end = std::unique( topology.begin(), remove_end );
    topology.resize( std::distance( topology.begin(), unique_end ) );

    // Put this rank first.
    int my_rank = -1;
    MPI_Comm_rank( comm, &my_rank );
    for ( auto& n : topology )
    {
        if ( n == my_rank )
        {
            std::swap( n, topology[0] );
            break;
        }
    }
    return topology;
}

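// Worked example (sketch, not part of the library): on MPI rank 1, an input
// topology of {3, -1, 0, 3, 1} is stripped of the -1 "no neighbor" entries,
// sorted, and deduplicated to {0, 1, 3}; rank 1 is then swapped to the
// front, returning {1, 0, 3}.
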
//---------------------------------------------------------------------------//
/*!
  \brief Communication plan base class.

  The communication plan computes and stores how this rank will communicate
  with its neighbors; it is the building block for communication patterns
  such as the Halo and the Distributor.
*/
template <class MemorySpace>
class CommunicationPlan
{
  public:
    // FIXME: extracting the self type for backwards compatibility with the
    // previous template on DeviceType. Should simply be MemorySpace after
    // the next release.
    //! Memory space.
    using memory_space = typename MemorySpace::memory_space;
    // FIXME: replace the warning with a memory space assert after the next
    // release.
    static_assert( Impl::deprecated( Kokkos::is_device<MemorySpace>() ) );

    //! Device type (deprecated).
    using device_type [[deprecated]] = typename memory_space::device_type;

    //! Default execution space.
    using execution_space = typename memory_space::execution_space;

    // FIXME: extracting the self type for backwards compatibility with the
    // previous template on DeviceType. Should simply be
    // memory_space::size_type after the next release.
    //! Size type.
    using size_type = typename memory_space::memory_space::size_type;

    /*!
      \brief Constructor.
      \param comm The MPI communicator over which the plan is defined. It is
      duplicated internally.
    */
    CommunicationPlan( MPI_Comm comm )
    {
        _comm_ptr.reset(
            // Duplicate the communicator and store it in a std::shared_ptr
            // so that all copies point to the same object.
            [comm]()
            {
                auto p = std::make_unique<MPI_Comm>();
                MPI_Comm_dup( comm, p.get() );
                return p.release();
            }(),
            // Custom deleter to mark the communicator for deallocation.
            []( MPI_Comm* p )
            {
                MPI_Comm_free( p );
                delete p;
            } );
    }

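    // Usage sketch (not part of the library): because the constructor
    // duplicates the communicator, the plan's MPI traffic cannot collide
    // with messages posted on the caller's communicator.
    //
    //   Cabana::CommunicationPlan<Kokkos::HostSpace> plan( MPI_COMM_WORLD );
    //   MPI_Comm plan_comm = plan.comm(); // the internal duplicate
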
    //! Get the MPI communicator.
    MPI_Comm comm() const { return *_comm_ptr; }

    //! Get the number of neighbor ranks that this rank will communicate
    //! with.
    int numNeighbor() const { return _neighbors.size(); }

    //! Given a local neighbor id get its rank in the MPI communicator.
    int neighborRank( const int neighbor ) const
    {
        return _neighbors[neighbor];
    }

    //! Get the number of elements this rank will export to a given neighbor.
    std::size_t numExport( const int neighbor ) const
    {
        return _num_export[neighbor];
    }

    //! Get the total number of exports this rank will do.
    std::size_t totalNumExport() const { return _total_num_export; }

    //! Get the number of elements this rank will import from a given
    //! neighbor.
    std::size_t numImport( const int neighbor ) const
    {
        return _num_import[neighbor];
    }

    //! Get the total number of imports this rank will do.
    std::size_t totalNumImport() const { return _total_num_import; }

    //! Get the number of export elements.
    std::size_t exportSize() const { return _num_export_element; }

    //! Get the steering vector for the exports.
    Kokkos::View<std::size_t*, memory_space> getExportSteering() const
    {
        return _export_steering;
    }

    // The functions in the public block below would normally be protected,
    // but we make them public to allow using private class data in CUDA
    // kernels with lambda functions.
  public:
    /*!
      \brief Neighbor and export rank creator. Use this when you already know
      which ranks neighbor each other (i.e. every rank already knows who it
      will send to and receive from).

      \param exec_space The execution space in which to operate.
      \param element_export_ranks The destination rank for each local
      element. A negative rank means the element is not sent.
      \param neighbor_ranks The ranks this rank will communicate with.
      \return For each export element, its slot among all elements bound for
      the same destination rank.
    */
    template <class ExecutionSpace, class ViewType>
    Kokkos::View<size_type*, memory_space>
    createFromExportsAndTopology( ExecutionSpace exec_space,
                                  const ViewType& element_export_ranks,
                                  const std::vector<int>& neighbor_ranks )
    {
        // Store the number of export elements.
        _num_export_element = element_export_ranks.size();

        // Store the unique neighbors (this rank first).
        _neighbors = getUniqueTopology( comm(), neighbor_ranks );
        int num_n = _neighbors.size();

        // Get the size of this communicator.
        int comm_size = -1;
        MPI_Comm_size( comm(), &comm_size );

        // Get the MPI rank we are currently on.
        int my_rank = -1;
        MPI_Comm_rank( comm(), &my_rank );

        // Pick an MPI tag for the communication. This object has its own
        // communication space so any tag will do.
        const int mpi_tag = 1221;

        // Initialize the import/export sizes.
        _num_export.assign( num_n, 0 );
        _num_import.assign( num_n, 0 );

        // Count the number of sends this rank will do to other ranks. Keep
        // track of which slot we get in our neighbor's send buffer.
        auto counts_and_ids = Impl::countSendsAndCreateSteering(
            exec_space, element_export_ranks, comm_size,
            typename Impl::CountSendsAndCreateSteeringAlgorithm<
                ExecutionSpace>::type() );

        // Copy the counts to the host.
        auto neighbor_counts_host = Kokkos::create_mirror_view_and_copy(
            Kokkos::HostSpace(), counts_and_ids.first );

        // Get the export counts.
        for ( int n = 0; n < num_n; ++n )
            _num_export[n] = neighbor_counts_host( _neighbors[n] );

        // Post receives for the number of imports we will get.
        std::vector<MPI_Request> requests;
        requests.reserve( num_n );
        for ( int n = 0; n < num_n; ++n )
            if ( my_rank != _neighbors[n] )
            {
                requests.push_back( MPI_Request() );
                MPI_Irecv( &_num_import[n], 1, MPI_UNSIGNED_LONG,
                           _neighbors[n], mpi_tag, comm(),
                           &( requests.back() ) );
            }
            else
                _num_import[n] = _num_export[n];

        // Send the number of exports to each of our neighbors.
        for ( int n = 0; n < num_n; ++n )
            if ( my_rank != _neighbors[n] )
                MPI_Send( &_num_export[n], 1, MPI_UNSIGNED_LONG, _neighbors[n],
                          mpi_tag, comm() );

        // Wait on the receives.
        std::vector<MPI_Status> status( requests.size() );
        const int ec =
            MPI_Waitall( requests.size(), requests.data(), status.data() );
        if ( MPI_SUCCESS != ec )
            throw std::logic_error(
                "Cabana::CommunicationPlan::createFromExportsAndTopology: "
                "Failed MPI Communication" );

        // Get the total number of imports/exports.
        _total_num_export = std::accumulate(
            _num_export.begin(), _num_export.end(), std::size_t{ 0u } );
        _total_num_import = std::accumulate(
            _num_import.begin(), _num_import.end(), std::size_t{ 0u } );

        // Barrier before continuing to ensure synchronization.
        MPI_Barrier( comm() );

        // Return the neighbor ids.
        return counts_and_ids.second;
    }

    /*!
      \brief Neighbor and export rank creator using the default execution
      space. Use this when you already know which ranks neighbor each other.
    */
    template <class ViewType>
    Kokkos::View<size_type*, memory_space>
    createFromExportsAndTopology( const ViewType& element_export_ranks,
                                  const std::vector<int>& neighbor_ranks )
    {
        // Use the default execution space.
        return createFromExportsAndTopology(
            execution_space{}, element_export_ranks, neighbor_ranks );
    }

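    // Usage sketch (not part of the library; the names below are
    // hypothetical). Each entry of export_ranks names the destination rank
    // of the corresponding local element (-1 means "do not send") and
    // topology lists every rank we may communicate with:
    //
    //   Kokkos::View<int*, memory_space> export_ranks( "export_ranks", n );
    //   // ... fill export_ranks ...
    //   std::vector<int> topology = { my_rank, left_rank, right_rank };
    //   auto neighbor_ids =
    //       plan.createFromExportsAndTopology( export_ranks, topology );
    //   // neighbor_ids( i ) is element i's slot within the group of
    //   // elements headed to the same destination rank.
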
    /*!
      \brief Export rank creator. Use this when you do not know who you will
      receive from - only who you are sending to.

      \param exec_space The execution space in which to operate.
      \param element_export_ranks The destination rank for each local
      element. A negative rank means the element is not sent.
      \return For each export element, its slot among all elements bound for
      the same destination rank.
    */
    template <class ExecutionSpace, class ViewType>
    Kokkos::View<size_type*, memory_space>
    createFromExportsOnly( ExecutionSpace exec_space,
                           const ViewType& element_export_ranks )
    {
        // Store the number of export elements.
        _num_export_element = element_export_ranks.size();

        // Get the size of this communicator.
        int comm_size = -1;
        MPI_Comm_size( comm(), &comm_size );

        // Get the MPI rank we are currently on.
        int my_rank = -1;
        MPI_Comm_rank( comm(), &my_rank );

        // Pick an MPI tag for the communication. This object has its own
        // communication space so any tag will do.
        const int mpi_tag = 1221;

        // Count the number of sends this rank will do to other ranks. Keep
        // track of which slot we get in our neighbor's send buffer.
        auto counts_and_ids = Impl::countSendsAndCreateSteering(
            exec_space, element_export_ranks, comm_size,
            typename Impl::CountSendsAndCreateSteeringAlgorithm<
                ExecutionSpace>::type() );

        // Copy the counts to the host.
        auto neighbor_counts_host = Kokkos::create_mirror_view_and_copy(
            Kokkos::HostSpace(), counts_and_ids.first );

        // Extract the export ranks and the number of exports, and then flag
        // the send ranks.
        _neighbors.clear();
        _num_export.clear();
        _total_num_export = 0;
        for ( int r = 0; r < comm_size; ++r )
            if ( neighbor_counts_host( r ) > 0 )
            {
                _neighbors.push_back( r );
                _num_export.push_back( neighbor_counts_host( r ) );
                _total_num_export += neighbor_counts_host( r );
                neighbor_counts_host( r ) = 1;
            }

        // Get the number of export ranks and initially allocate the import
        // sizes.
        int num_export_rank = _neighbors.size();
        _num_import.assign( num_export_rank, 0 );

        // If we are sending to ourselves, put that entry first in the
        // neighbor list and assign the number of imports to be the number of
        // exports.
        bool self_send = false;
        for ( int n = 0; n < num_export_rank; ++n )
            if ( _neighbors[n] == my_rank )
            {
                std::swap( _neighbors[n], _neighbors[0] );
                std::swap( _num_export[n], _num_export[0] );
                _num_import[0] = _num_export[0];
                self_send = true;
                break;
            }

        // Determine how many ranks this rank will import from: the 0/1 flags
        // set above are summed across all ranks with a reduce-scatter.
        int num_import_rank = -1;
        std::vector<int> recv_counts( comm_size, 1 );
        MPI_Reduce_scatter( neighbor_counts_host.data(), &num_import_rank,
                            recv_counts.data(), MPI_INT, MPI_SUM, comm() );
        if ( self_send )
            --num_import_rank;

        // Post the expected number of receives and indicate that they might
        // come from any rank.
        std::vector<std::size_t> import_sizes( num_import_rank );
        std::vector<MPI_Request> requests( num_import_rank );
        for ( int n = 0; n < num_import_rank; ++n )
            MPI_Irecv( &import_sizes[n], 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE,
                       mpi_tag, comm(), &requests[n] );

        // Do blocking sends. Don't do any self sends.
        int self_offset = ( self_send ) ? 1 : 0;
        for ( int n = self_offset; n < num_export_rank; ++n )
            MPI_Send( &_num_export[n], 1, MPI_UNSIGNED_LONG, _neighbors[n],
                      mpi_tag, comm() );

        // Wait on the non-blocking receives.
        std::vector<MPI_Status> status( requests.size() );
        const int ec =
            MPI_Waitall( requests.size(), requests.data(), status.data() );
        if ( MPI_SUCCESS != ec )
            throw std::logic_error(
                "Cabana::CommunicationPlan::createFromExportsOnly: Failed MPI "
                "Communication" );

        // Compute the total number of imports.
        _total_num_import =
            std::accumulate( import_sizes.begin(), import_sizes.end(),
                             ( self_send ) ? _num_import[0] : 0 );

        // Extract the imports. If we did self sends we already know what we
        // imported from ourselves.
        for ( int i = 0; i < num_import_rank; ++i )
        {
            // Get the message source.
            const auto source = status[i].MPI_SOURCE;

            // See if the neighbor we received from was someone we also sent
            // to.
            auto found_neighbor =
                std::find( _neighbors.begin(), _neighbors.end(), source );

            // If this is a new neighbor (i.e. someone we didn't send
            // anything to), record it.
            if ( found_neighbor == std::end( _neighbors ) )
            {
                _neighbors.push_back( source );
                _num_import.push_back( import_sizes[i] );
                _num_export.push_back( 0 );
            }

            // Otherwise, we already sent something to this neighbor and
            // therefore already have a neighbor/export entry. Just assign
            // the import entry for that neighbor.
            else
            {
                auto n = std::distance( _neighbors.begin(), found_neighbor );
                _num_import[n] = import_sizes[i];
            }
        }

        // Barrier before continuing to ensure synchronization.
        MPI_Barrier( comm() );

        // Return the neighbor ids.
        return counts_and_ids.second;
    }

    /*!
      \brief Export rank creator using the default execution space. Use this
      when you do not know who you will receive from - only who you are
      sending to.
    */
    template <class ViewType>
    Kokkos::View<size_type*, memory_space>
    createFromExportsOnly( const ViewType& element_export_ranks )
    {
        // Use the default execution space.
        return createFromExportsOnly( execution_space{},
                                      element_export_ranks );
    }

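    // Usage sketch (not part of the library): same as the topology version
    // but without a neighbor list - the plan discovers its importers. For
    // example, with four ranks where rank 0 sends only to {1, 3}, every rank
    // contributes a 0/1 flag per destination; the MPI_Reduce_scatter above
    // sums the flags so that, e.g., rank 3 learns how many ranks will send
    // to it without knowing which ones, and the MPI_ANY_SOURCE receives then
    // identify the actual sources.
    //
    //   Kokkos::View<int*, memory_space> export_ranks( "export_ranks", n );
    //   // ... fill export_ranks with destination ranks (or -1 to skip) ...
    //   auto neighbor_ids = plan.createFromExportsOnly( export_ranks );
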
    /*!
      \brief Create the export steering vector, packing elements in their
      local id order.
    */
    template <class PackViewType, class RankViewType>
    void createExportSteering( const PackViewType& neighbor_ids,
                               const RankViewType& element_export_ranks )
    {
        // Pass element_export_ranks as a dummy argument for the unused
        // element ids.
        createSteering( true, neighbor_ids, element_export_ranks,
                        element_export_ranks );
    }

    /*!
      \brief Create the export steering vector using explicit local element
      ids.
    */
    template <class PackViewType, class RankViewType, class IdViewType>
    void createExportSteering( const PackViewType& neighbor_ids,
                               const RankViewType& element_export_ranks,
                               const IdViewType& element_export_ids )
    {
        createSteering( false, neighbor_ids, element_export_ranks,
                        element_export_ids );
    }

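    // Sketch (not part of the library): the two overloads differ only in the
    // value written into the steering vector. The two-argument form packs
    // element i from local index i; the three-argument form packs it from
    // element_export_ids( i ), which is useful when the exported elements
    // are a scattered subset of a larger container:
    //
    //   plan.createExportSteering( neighbor_ids, export_ranks );
    //   plan.createExportSteering( neighbor_ids, export_ranks, export_ids );
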
    // Create the export steering vector.
    template <class ExecutionSpace, class PackViewType, class RankViewType,
              class IdViewType>
    void createSteering( ExecutionSpace, const bool use_iota,
                         const PackViewType& neighbor_ids,
                         const RankViewType& element_export_ranks,
                         const IdViewType& element_export_ids )
    {
        if ( !use_iota &&
             ( element_export_ids.size() != element_export_ranks.size() ) )
            throw std::runtime_error(
                "Cabana::CommunicationPlan::createSteering: Export ids and "
                "ranks are different sizes!" );

        // Get the size of this communicator.
        int comm_size = -1;
        MPI_Comm_size( *_comm_ptr, &comm_size );

        // Calculate the steering offsets via an exclusive prefix sum over
        // the export counts.
        int num_n = _neighbors.size();
        std::vector<std::size_t> offsets( num_n, 0 );
        for ( int n = 1; n < num_n; ++n )
            offsets[n] = offsets[n - 1] + _num_export[n - 1];

        // Map the offsets to the device.
        Kokkos::View<std::size_t*, Kokkos::HostSpace> rank_offsets_host(
            Kokkos::ViewAllocateWithoutInitializing( "rank_map" ), comm_size );
        for ( int n = 0; n < num_n; ++n )
            rank_offsets_host( _neighbors[n] ) = offsets[n];
        auto rank_offsets = Kokkos::create_mirror_view_and_copy(
            memory_space(), rank_offsets_host );

        // Create the export steering vector for writing local elements into
        // the send buffer. Note that we create a local, shallow copy - this
        // is a CUDA workaround for handling class private data in a lambda.
        _export_steering = Kokkos::View<std::size_t*, memory_space>(
            Kokkos::ViewAllocateWithoutInitializing( "export_steering" ),
            _total_num_export );
        auto steer_vec = _export_steering;
        Kokkos::parallel_for(
            "Cabana::CommunicationPlan::createSteering",
            Kokkos::RangePolicy<ExecutionSpace>( 0, _num_export_element ),
            KOKKOS_LAMBDA( const int i ) {
                if ( element_export_ranks( i ) >= 0 )
                    steer_vec( rank_offsets( element_export_ranks( i ) ) +
                               neighbor_ids( i ) ) =
                        ( use_iota ) ? i : element_export_ids( i );
            } );
        Kokkos::fence();
    }

    template <class PackViewType, class RankViewType, class IdViewType>
    void createSteering( const bool use_iota, const PackViewType& neighbor_ids,
                         const RankViewType& element_export_ranks,
                         const IdViewType& element_export_ids )
    {
        // Use the default execution space.
        createSteering( execution_space{}, use_iota, neighbor_ids,
                        element_export_ranks, element_export_ids );
    }
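
    // Worked example (sketch, not part of the library): with neighbors
    // {1, 0, 3} and _num_export = {4, 2, 3}, the exclusive prefix sum gives
    // offsets {0, 4, 6}: rank 1's elements occupy steering slots [0, 4),
    // rank 0's slots [4, 6), and rank 3's slots [6, 9). An element bound for
    // rank 3 with neighbor_ids( i ) = 2 is therefore written to slot
    // 6 + 2 = 8 of the steering vector.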

  private:
    std::shared_ptr<MPI_Comm> _comm_ptr;
    std::vector<int> _neighbors;
    std::size_t _total_num_export;
    std::size_t _total_num_import;
    std::vector<std::size_t> _num_export;
    std::vector<std::size_t> _num_import;
    std::size_t _num_export_element;
    Kokkos::View<std::size_t*, memory_space> _export_steering;
};

//---------------------------------------------------------------------------//
//! Store AoSoA send/receive buffers and particle data for communication.
template <class AoSoAType>
struct CommunicationDataAoSoA
{
    static_assert( is_aosoa<AoSoAType>::value, "" );

    //! Particle data type.
    using particle_data_type = AoSoAType;
    //! Kokkos memory space.
    using memory_space = typename particle_data_type::memory_space;
    //! Communication data type.
    using data_type = typename particle_data_type::tuple_type;
    //! Communication buffer type.
    using buffer_type = typename Kokkos::View<data_type*, memory_space>;

    //! Constructor.
    CommunicationDataAoSoA( particle_data_type particles )
        : _particles( particles )
    {
        _send_buffer = buffer_type(
            Kokkos::ViewAllocateWithoutInitializing( "send_buffer" ), 0 );
        _recv_buffer = buffer_type(
            Kokkos::ViewAllocateWithoutInitializing( "recv_buffer" ), 0 );
    }

    //! Resize the send buffer.
    void reallocateSend( const std::size_t num_send )
    {
        Kokkos::realloc( _send_buffer, num_send );
    }

    //! Resize the receive buffer.
    void reallocateReceive( const std::size_t num_recv )
    {
        Kokkos::realloc( _recv_buffer, num_recv );
    }

    //! Send buffer.
    buffer_type _send_buffer;
    //! Receive buffer.
    buffer_type _recv_buffer;
    //! Particle AoSoA.
    particle_data_type _particles;
    //! Slice components.
    std::size_t _num_comp = 0;
};

//! Store slice send/receive buffers and particle data for communication.
template <class SliceType>
struct CommunicationDataSlice
{
    static_assert( is_slice<SliceType>::value, "" );

    //! Particle data type.
    using particle_data_type = SliceType;
    //! Kokkos memory space.
    using memory_space = typename particle_data_type::memory_space;
    //! Communication data type.
    using data_type = typename particle_data_type::value_type;
    //! Communication buffer type.
    using buffer_type =
        typename Kokkos::View<data_type**, Kokkos::LayoutRight, memory_space>;

    //! Constructor.
    CommunicationDataSlice( particle_data_type particles )
        : _particles( particles )
    {
        setSliceComponents();

        _send_buffer = buffer_type(
            Kokkos::ViewAllocateWithoutInitializing( "send_buffer" ), 0, 0 );
        _recv_buffer = buffer_type(
            Kokkos::ViewAllocateWithoutInitializing( "recv_buffer" ), 0, 0 );
    }

    //! Resize the send buffer.
    void reallocateSend( const std::size_t num_send )
    {
        Kokkos::realloc( _send_buffer, num_send, _num_comp );
    }

    //! Resize the receive buffer.
    void reallocateReceive( const std::size_t num_recv )
    {
        Kokkos::realloc( _recv_buffer, num_recv, _num_comp );
    }

    //! Store the total number of components in the slice.
    void setSliceComponents()
    {
        _num_comp = 1;
        for ( std::size_t d = 2; d < _particles.viewRank(); ++d )
            _num_comp *= _particles.extent( d );
    }

    //! Send buffer.
    buffer_type _send_buffer;
    //! Receive buffer.
    buffer_type _recv_buffer;
    //! Particle slice.
    particle_data_type _particles;
    //! Slice components.
    std::size_t _num_comp;
};
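
// Worked example (sketch, not part of the library): for a slice whose
// underlying view has rank 4 with per-particle dimensions 3 x 3 (e.g., a
// tensor member), setSliceComponents() computes
// _num_comp = extent( 2 ) * extent( 3 ) = 9, so the communication buffers
// are reallocated as ( num_elements, 9 ) two-dimensional views.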
//---------------------------------------------------------------------------//

/*!
  \brief Store the communication plan and buffers and perform the
  communication (migrate, gather, scatter).
*/
template <class CommPlanType, class CommDataType>
class CommunicationData
{
  public:
    //! Communication plan type (Halo, Distributor).
    using plan_type = CommPlanType;
    //! Kokkos execution space.
    using execution_space = typename plan_type::execution_space;
    //! Kokkos execution policy.
    using policy_type = Kokkos::RangePolicy<execution_space>;
    //! Communication data type.
    using comm_data_type = CommDataType;
    //! Particle data type.
    using particle_data_type = typename comm_data_type::particle_data_type;
    //! Kokkos memory space.
    using memory_space = typename comm_data_type::memory_space;
    //! Communication data type.
    using data_type = typename comm_data_type::data_type;
    //! Communication buffer type.
    using buffer_type = typename comm_data_type::buffer_type;

    /*!
      \param comm_plan The communication plan.
      \param particles The particle data (either an AoSoA or a slice).
      \param overallocation An optional factor to keep extra allocated space
      in the communication buffers.
    */
    CommunicationData( const CommPlanType& comm_plan,
                       const particle_data_type& particles,
                       const double overallocation = 1.0 )
        : _comm_plan( comm_plan )
        , _comm_data( CommDataType( particles ) )
        , _overallocation( overallocation )
    {
    }

    //! Get the communication send buffer.
    buffer_type getSendBuffer() const { return _comm_data._send_buffer; }
    //! Get the communication receive buffer.
    buffer_type getReceiveBuffer() const { return _comm_data._recv_buffer; }

    //! Get the particles to communicate.
    particle_data_type getData() const { return _comm_data._particles; }
    //! Update the particles to communicate.
    void setData( const particle_data_type& particles )
    {
        _comm_data._particles = particles;
    }

    //! Current send buffer size.
    auto sendSize() { return _send_size; }
    //! Current receive buffer size.
    auto receiveSize() { return _recv_size; }
    //! Current allocated send buffer space.
    auto sendCapacity() { return _comm_data._send_buffer.extent( 0 ); }
    //! Current allocated receive buffer space.
    auto receiveCapacity() { return _comm_data._recv_buffer.extent( 0 ); }
    /*!
      \brief Reduce the communication buffers to the current send/receive
      sizes, optionally keeping the overallocation factor.
    */
    void shrinkToFit( const bool use_overallocation = false )
    {
        auto shrunk_send_size = _send_size;
        auto shrunk_recv_size = _recv_size;
        if ( use_overallocation )
        {
            shrunk_send_size *= _overallocation;
            shrunk_recv_size *= _overallocation;
        }
        _comm_data.reallocateSend( shrunk_send_size );
        _comm_data.reallocateReceive( shrunk_recv_size );
    }

    //! Perform the communication (migrate, gather, scatter).
    virtual void apply() = 0;

    //! Perform the communication (migrate, gather, scatter) in the given
    //! execution space.
    template <class ExecutionSpace>
    void apply( ExecutionSpace );

    //! Reserve buffer space, setting a new overallocation factor.
    void reserveImpl( const CommPlanType& comm_plan,
                      const particle_data_type particles,
                      const std::size_t total_send,
                      const std::size_t total_recv,
                      const double overallocation )
    {
        if ( overallocation < 1.0 )
            throw std::runtime_error( "Cabana::CommunicationPlan: "
                                      "Cannot allocate buffers with less "
                                      "space than the data to communicate!" );
        _overallocation = overallocation;

        reserveImpl( comm_plan, particles, total_send, total_recv );
    }
    //! Reserve buffer space using the current overallocation factor.
    void reserveImpl( const CommPlanType& comm_plan,
                      const particle_data_type particles,
                      const std::size_t total_send,
                      const std::size_t total_recv )
    {
        _comm_plan = comm_plan;
        setData( particles );

        auto send_capacity = sendCapacity();
        auto new_send_size = static_cast<std::size_t>(
            static_cast<double>( total_send ) * _overallocation );
        if ( new_send_size > send_capacity )
            _comm_data.reallocateSend( new_send_size );

        auto recv_capacity = receiveCapacity();
        auto new_recv_size = static_cast<std::size_t>(
            static_cast<double>( total_recv ) * _overallocation );
        if ( new_recv_size > recv_capacity )
            _comm_data.reallocateReceive( new_recv_size );

        _send_size = total_send;
        _recv_size = total_recv;
    }
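
    // Worked example (sketch, not part of the library): with
    // _overallocation = 1.5 and total_send = 100, reserveImpl() grows the
    // send buffer to a capacity of 150 if it is currently smaller, while
    // sendSize() still reports 100. A later call with total_send = 90 needs
    // 90 * 1.5 = 135 <= 150 and therefore reallocates nothing; shrinkToFit()
    // would drop the capacity back to 90 (or 135 with use_overallocation).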

  protected:
    //! Get the total number of components in the slice.
    auto getSliceComponents() { return _comm_data._num_comp; }

    //! Communication plan.
    plan_type _comm_plan;
    //! Communication data.
    comm_data_type _comm_data;
    //! Overallocation factor.
    double _overallocation;
    //! Send size.
    std::size_t _send_size;
    //! Receive size.
    std::size_t _recv_size;
};

} // end namespace Cabana

#endif // end CABANA_COMMUNICATIONPLAN_HPP