Cabana_Distributor.hpp
/*****************************************************************************
 * Copyright (c) 2018-2023 by the Cabana authors                            *
 * All rights reserved.                                                     *
 *                                                                          *
 * This file is part of the Cabana library. Cabana is distributed under a   *
 * BSD 3-clause license. For the licensing terms see the LICENSE file in    *
 * the top-level directory.                                                 *
 *                                                                          *
 * SPDX-License-Identifier: BSD-3-Clause                                    *
 *****************************************************************************/

#ifndef CABANA_DISTRIBUTOR_HPP
#define CABANA_DISTRIBUTOR_HPP

#include <Cabana_AoSoA.hpp>
#include <Cabana_CommunicationPlan.hpp>
#include <Cabana_Slice.hpp>

#include <Kokkos_Core.hpp>
#include <Kokkos_Profiling_ScopedRegion.hpp>

#include <mpi.h>

#include <exception>
#include <vector>

namespace Cabana
{
//---------------------------------------------------------------------------//
/*!
  \brief A communication plan for migrating data from one uniquely-owned
  decomposition to another uniquely-owned decomposition.
*/
template <class MemorySpace>
class Distributor : public CommunicationPlan<MemorySpace>
{
  public:
    /*!
      \brief Topology and export rank constructor. Use this when you already
      know which ranks neighbor each other.
    */
    template <class ViewType>
    Distributor( MPI_Comm comm, const ViewType& element_export_ranks,
                 const std::vector<int>& neighbor_ranks )
        : CommunicationPlan<MemorySpace>( comm )
    {
        auto neighbor_ids = this->createWithTopology(
            Export(), element_export_ranks, neighbor_ranks );
        this->createExportSteering( neighbor_ids, element_export_ranks );
    }

    /*!
      \brief Export rank constructor. Use this when you don't know who you
      will be receiving from, only who you are sending to.
    */
    template <class ViewType>
    Distributor( MPI_Comm comm, const ViewType& element_export_ranks )
        : CommunicationPlan<MemorySpace>( comm )
    {
        auto neighbor_ids =
            this->createWithoutTopology( Export(), element_export_ranks );
        this->createExportSteering( neighbor_ids, element_export_ranks );
    }
};
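
// Illustrative usage sketch (not part of the library): building a Distributor
// from per-element destination ranks and migrating particle data in place.
// The member types, the "num_local" count, and how export_ranks is filled are
// assumptions for this example only.
//
//   using memory_space = Kokkos::HostSpace;
//   using DataTypes = Cabana::MemberTypes<double[3], int>;
//   Cabana::AoSoA<DataTypes, memory_space> particles( "particles", num_local );
//
//   // One destination MPI rank per local element.
//   Kokkos::View<int*, memory_space> export_ranks( "export_ranks", num_local );
//   // ... fill export_ranks ...
//
//   Cabana::Distributor<memory_space> distributor( MPI_COMM_WORLD,
//                                                  export_ranks );
//   Cabana::migrate( distributor, particles ); // in-place migration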

//---------------------------------------------------------------------------//
template <typename>
struct is_distributor_impl : public std::false_type
{
};

template <typename MemorySpace>
struct is_distributor_impl<Distributor<MemorySpace>> : public std::true_type
{
};

//! Distributor static type checker.
template <class T>
struct is_distributor
    : public is_distributor_impl<typename std::remove_cv<T>::type>::type
{
};

//---------------------------------------------------------------------------//
namespace Impl
{
//---------------------------------------------------------------------------//
// Synchronously move data between a source and destination AoSoA by executing
// the forward communication plan.
template <class ExecutionSpace, class Distributor_t, class AoSoA_t>
void distributeData(
    ExecutionSpace, const Distributor_t& distributor, const AoSoA_t& src,
    AoSoA_t& dst,
    typename std::enable_if<( is_distributor<Distributor_t>::value &&
                              is_aosoa<AoSoA_t>::value ),
                            int>::type* = 0 )
{
    Kokkos::Profiling::ScopedRegion region( "Cabana::migrate" );

    static_assert( is_accessible_from<typename Distributor_t::memory_space,
                                      ExecutionSpace>{},
                   "" );

    // Get the MPI rank we are currently on.
    int my_rank = -1;
    MPI_Comm_rank( distributor.comm(), &my_rank );

    // Get the number of neighbors.
    int num_n = distributor.numNeighbor();

    // Calculate the number of elements that are staying on this rank and
    // therefore can be directly copied. If any of the neighbor ranks are this
    // rank it will be stored in first position (i.e. the first neighbor in
    // the local list is always yourself if you are sending to yourself).
    std::size_t num_stay =
        ( num_n > 0 && distributor.neighborRank( 0 ) == my_rank )
            ? distributor.numExport( 0 )
            : 0;

    // Allocate a send buffer.
    std::size_t num_send = distributor.totalNumExport() - num_stay;
    Kokkos::View<typename AoSoA_t::tuple_type*,
                 typename Distributor_t::memory_space>
        send_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_send_buffer" ),
                     num_send );

    // Allocate a receive buffer.
    Kokkos::View<typename AoSoA_t::tuple_type*,
                 typename Distributor_t::memory_space>
        recv_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_recv_buffer" ),
                     distributor.totalNumImport() );

    // Get the steering vector for the sends.
    auto steering = distributor.getExportSteering();

    // Gather the exports from the source AoSoA into the tuple-contiguous send
    // buffer or the receive buffer if the data is staying. We know that the
    // steering vector is ordered such that the data staying on this rank
    // comes first.
    auto build_send_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        auto tpl = src.getTuple( steering( i ) );
        if ( i < num_stay )
            recv_buffer( i ) = tpl;
        else
            send_buffer( i - num_stay ) = tpl;
    };
    Kokkos::RangePolicy<ExecutionSpace> build_send_buffer_policy(
        0, distributor.totalNumExport() );
    Kokkos::parallel_for( "Cabana::Impl::distributeData::build_send_buffer",
                          build_send_buffer_policy, build_send_buffer_func );
    Kokkos::fence();

    // The distributor has its own communication space so choose any tag.
    const int mpi_tag = 1234;

    // Post non-blocking receives.
    std::vector<MPI_Request> requests;
    requests.reserve( num_n );
    std::pair<std::size_t, std::size_t> recv_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        recv_range.second = recv_range.first + distributor.numImport( n );

        if ( ( distributor.numImport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            auto recv_subview = Kokkos::subview( recv_buffer, recv_range );

            requests.push_back( MPI_Request() );

            MPI_Irecv( recv_subview.data(),
                       recv_subview.size() *
                           sizeof( typename AoSoA_t::tuple_type ),
                       MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                       distributor.comm(), &( requests.back() ) );
        }

        recv_range.first = recv_range.second;
    }

    // Do blocking sends.
    std::pair<std::size_t, std::size_t> send_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        if ( ( distributor.numExport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            send_range.second = send_range.first + distributor.numExport( n );

            auto send_subview = Kokkos::subview( send_buffer, send_range );

            MPI_Send( send_subview.data(),
                      send_subview.size() *
                          sizeof( typename AoSoA_t::tuple_type ),
                      MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                      distributor.comm() );

            send_range.first = send_range.second;
        }
    }

    // Wait on non-blocking receives.
    std::vector<MPI_Status> status( requests.size() );
    const int ec =
        MPI_Waitall( requests.size(), requests.data(), status.data() );
    if ( MPI_SUCCESS != ec )
        throw std::logic_error(
            "Cabana::Distributor: Failed MPI Communication" );

    // Extract the receive buffer into the destination AoSoA.
    auto extract_recv_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        dst.setTuple( i, recv_buffer( i ) );
    };
    Kokkos::RangePolicy<ExecutionSpace> extract_recv_buffer_policy(
        0, distributor.totalNumImport() );
    Kokkos::parallel_for( "Cabana::Impl::distributeData::extract_recv_buffer",
                          extract_recv_buffer_policy,
                          extract_recv_buffer_func );
    Kokkos::fence();

    // Barrier before completing to ensure synchronization.
    MPI_Barrier( distributor.comm() );
}

//---------------------------------------------------------------------------//
} // end namespace Impl

//---------------------------------------------------------------------------//
/*!
  \brief Synchronously migrate data between two different decompositions using
  the distributor forward communication plan. AoSoA version.
*/
template <class ExecutionSpace, class Distributor_t, class AoSoA_t>
void migrate( ExecutionSpace exec_space, const Distributor_t& distributor,
              const AoSoA_t& src, AoSoA_t& dst,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_aosoa<AoSoA_t>::value ),
                                      int>::type* = 0 )
{
    // Check that src and dst are the right size.
    if ( src.size() != distributor.exportSize() )
        throw std::runtime_error( "Cabana::migrate: Source is "
                                  "the wrong size for migration! (Label: " +
                                  src.label() + ")" );
    if ( dst.size() != distributor.totalNumImport() )
        throw std::runtime_error( "Cabana::migrate: Destination "
                                  "is the wrong size for migration! (Label: " +
                                  dst.label() + ")" );

    // Move the data.
    Impl::distributeData( exec_space, distributor, src, dst );
}

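// Illustrative sketch (not part of the library): migrating between two
// separately allocated AoSoAs, reusing the names from the sketch above. The
// destination must be sized with totalNumImport() before the call.
//
//   Cabana::AoSoA<DataTypes, memory_space> src( "src", num_local );
//   Cabana::AoSoA<DataTypes, memory_space> dst( "dst",
//                                               distributor.totalNumImport() );
//   Cabana::migrate( distributor, src, dst );
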
/*!
  \brief Synchronously migrate data between two different decompositions using
  the distributor forward communication plan. Uses the default execution space
  of the distributor.
*/
template <class Distributor_t, class AoSoA_t>
void migrate( const Distributor_t& distributor, const AoSoA_t& src,
              AoSoA_t& dst,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_aosoa<AoSoA_t>::value ),
                                      int>::type* = 0 )
{
    migrate( typename Distributor_t::execution_space{}, distributor, src, dst );
}

//---------------------------------------------------------------------------//
/*!
  \brief Synchronously migrate data within a single AoSoA (in-place) using the
  distributor forward communication plan. The AoSoA is resized to match the
  destination decomposition.
*/
template <class ExecutionSpace, class Distributor_t, class AoSoA_t>
void migrate( ExecutionSpace exec_space, const Distributor_t& distributor,
              AoSoA_t& aosoa,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_aosoa<AoSoA_t>::value ),
                                      int>::type* = 0 )
{
    // Check that the AoSoA is the right size.
    if ( aosoa.size() != distributor.exportSize() )
        throw std::runtime_error(
            "Cabana::migrate (in-place): "
            "AoSoA is the wrong size for migration! (Label: " +
            aosoa.label() + ")" );

    // Determine if the source or destination decomposition has more data on
    // this rank.
    bool dst_is_bigger =
        ( distributor.totalNumImport() > distributor.exportSize() );

    // If the destination decomposition is bigger than the source
    // decomposition, resize now so we have enough space to do the operation.
    if ( dst_is_bigger )
        aosoa.resize( distributor.totalNumImport() );

    // Move the data.
    Impl::distributeData( exec_space, distributor, aosoa, aosoa );

    // If the destination decomposition is smaller than the source
    // decomposition, resize after we have moved the data.
    if ( !dst_is_bigger )
        aosoa.resize( distributor.totalNumImport() );
}

/*!
  \brief Synchronously migrate data within a single AoSoA (in-place) using the
  distributor forward communication plan. Uses the default execution space of
  the distributor.
*/
template <class Distributor_t, class AoSoA_t>
void migrate( const Distributor_t& distributor, AoSoA_t& aosoa,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_aosoa<AoSoA_t>::value ),
                                      int>::type* = 0 )
{
    migrate( typename Distributor_t::execution_space{}, distributor, aosoa );
}

//---------------------------------------------------------------------------//
/*!
  \brief Synchronously migrate data between two different decompositions using
  the distributor forward communication plan. Slice version.
*/
template <class ExecutionSpace, class Distributor_t, class Slice_t>
void migrate( ExecutionSpace, const Distributor_t& distributor,
              const Slice_t& src, Slice_t& dst,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_slice<Slice_t>::value ),
                                      int>::type* = 0 )
{
    // Check that src and dst are the right size.
    if ( src.size() != distributor.exportSize() )
        throw std::runtime_error( "Cabana::migrate: Source Slice is the wrong "
                                  "size for migration! (Label: " +
                                  src.label() + ")" );
    if ( dst.size() != distributor.totalNumImport() )
        throw std::runtime_error( "Cabana::migrate: Destination Slice is the "
                                  "wrong size for migration! (Label: " +
                                  dst.label() + ")" );

    // Get the number of components in the slices.
    size_t num_comp = 1;
    for ( size_t d = 2; d < src.viewRank(); ++d )
        num_comp *= src.extent( d );

    // Get the raw slice data.
    auto src_data = src.data();
    auto dst_data = dst.data();

    // Get the MPI rank we are currently on.
    int my_rank = -1;
    MPI_Comm_rank( distributor.comm(), &my_rank );

    // Get the number of neighbors.
    int num_n = distributor.numNeighbor();

    // Calculate the number of elements that are staying on this rank and
    // therefore can be directly copied. If any of the neighbor ranks are this
    // rank it will be stored in first position (i.e. the first neighbor in
    // the local list is always yourself if you are sending to yourself).
    std::size_t num_stay =
        ( num_n > 0 && distributor.neighborRank( 0 ) == my_rank )
            ? distributor.numExport( 0 )
            : 0;

    // Allocate a send buffer. Note this one is layout right so the components
    // of each element are consecutive in memory.
    std::size_t num_send = distributor.totalNumExport() - num_stay;
    Kokkos::View<typename Slice_t::value_type**, Kokkos::LayoutRight,
                 typename Distributor_t::memory_space>
        send_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_send_buffer" ),
                     num_send, num_comp );

    // Allocate a receive buffer. Note this one is layout right so the
    // components of each element are consecutive in memory.
    Kokkos::View<typename Slice_t::value_type**, Kokkos::LayoutRight,
                 typename Distributor_t::memory_space>
        recv_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_recv_buffer" ),
                     distributor.totalNumImport(), num_comp );

    // Get the steering vector for the sends.
    auto steering = distributor.getExportSteering();

    // Gather from the source Slice into the contiguous send buffer or,
    // if it is part of the local copy, put it directly in the destination
    // Slice.
    auto build_send_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        auto s_src = Slice_t::index_type::s( steering( i ) );
        auto a_src = Slice_t::index_type::a( steering( i ) );
        std::size_t src_offset = s_src * src.stride( 0 ) + a_src;
        if ( i < num_stay )
            for ( std::size_t n = 0; n < num_comp; ++n )
                recv_buffer( i, n ) =
                    src_data[src_offset + n * Slice_t::vector_length];
        else
            for ( std::size_t n = 0; n < num_comp; ++n )
                send_buffer( i - num_stay, n ) =
                    src_data[src_offset + n * Slice_t::vector_length];
    };
    Kokkos::RangePolicy<ExecutionSpace> build_send_buffer_policy(
        0, distributor.totalNumExport() );
    Kokkos::parallel_for( "Cabana::migrate::build_send_buffer",
                          build_send_buffer_policy, build_send_buffer_func );
    Kokkos::fence();

    // The distributor has its own communication space so choose any tag.
    const int mpi_tag = 1234;

    // Post non-blocking receives.
    std::vector<MPI_Request> requests;
    requests.reserve( num_n );
    std::pair<std::size_t, std::size_t> recv_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        recv_range.second = recv_range.first + distributor.numImport( n );

        if ( ( distributor.numImport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            auto recv_subview =
                Kokkos::subview( recv_buffer, recv_range, Kokkos::ALL );

            requests.push_back( MPI_Request() );

            MPI_Irecv( recv_subview.data(),
                       recv_subview.size() *
                           sizeof( typename Slice_t::value_type ),
                       MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                       distributor.comm(), &( requests.back() ) );
        }

        recv_range.first = recv_range.second;
    }

    // Do blocking sends.
    std::pair<std::size_t, std::size_t> send_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        if ( ( distributor.numExport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            send_range.second = send_range.first + distributor.numExport( n );

            auto send_subview =
                Kokkos::subview( send_buffer, send_range, Kokkos::ALL );

            MPI_Send( send_subview.data(),
                      send_subview.size() *
                          sizeof( typename Slice_t::value_type ),
                      MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                      distributor.comm() );

            send_range.first = send_range.second;
        }
    }

    // Wait on non-blocking receives.
    std::vector<MPI_Status> status( requests.size() );
    const int ec =
        MPI_Waitall( requests.size(), requests.data(), status.data() );
    if ( MPI_SUCCESS != ec )
        throw std::logic_error( "Cabana::migrate: Failed MPI Communication" );

    // Extract the data from the receive buffer into the destination Slice.
    auto extract_recv_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        auto s = Slice_t::index_type::s( i );
        auto a = Slice_t::index_type::a( i );
        std::size_t dst_offset = s * dst.stride( 0 ) + a;
        for ( std::size_t n = 0; n < num_comp; ++n )
            dst_data[dst_offset + n * Slice_t::vector_length] =
                recv_buffer( i, n );
    };
    Kokkos::RangePolicy<ExecutionSpace> extract_recv_buffer_policy(
        0, distributor.totalNumImport() );
    Kokkos::parallel_for( "Cabana::migrate::extract_recv_buffer",
                          extract_recv_buffer_policy,
                          extract_recv_buffer_func );
    Kokkos::fence();

    // Barrier before completing to ensure synchronization.
    MPI_Barrier( distributor.comm() );
}

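// Illustrative sketch (not part of the library): migrating a single slice
// between the src and dst AoSoAs from the sketch above. The slice index and
// variable names are assumptions for this example only; both slices must
// already have the sizes checked for in the overload above.
//
//   auto src_positions = Cabana::slice<0>( src );
//   auto dst_positions = Cabana::slice<0>( dst );
//   Cabana::migrate( distributor, src_positions, dst_positions );
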
/*!
  \brief Synchronously migrate data between two different decompositions using
  the distributor forward communication plan. Slice version; uses the default
  execution space of the distributor.
*/
template <class Distributor_t, class Slice_t>
void migrate( const Distributor_t& distributor, const Slice_t& src,
              Slice_t& dst,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_slice<Slice_t>::value ),
                                      int>::type* = 0 )
{
    migrate( typename Distributor_t::execution_space{}, distributor, src, dst );
}

//---------------------------------------------------------------------------//

} // end namespace Cabana

#endif // end CABANA_DISTRIBUTOR_HPP