Cabana_Distributor.hpp
/****************************************************************************
 * Copyright (c) 2018-2023 by the Cabana authors                            *
 * All rights reserved.                                                     *
 *                                                                          *
 * This file is part of the Cabana library. Cabana is distributed under a   *
 * BSD 3-clause license. For the licensing terms see the LICENSE file in    *
 * the top-level directory.                                                 *
 *                                                                          *
 * SPDX-License-Identifier: BSD-3-Clause                                    *
 ****************************************************************************/

#ifndef CABANA_DISTRIBUTOR_HPP
#define CABANA_DISTRIBUTOR_HPP

#include <Cabana_AoSoA.hpp>
#include <Cabana_CommunicationPlan.hpp>
#include <Cabana_Slice.hpp>

#include <Kokkos_Core.hpp>
#include <Kokkos_Profiling_ScopedRegion.hpp>

#include <mpi.h>

#include <exception>
#include <stdexcept>
#include <vector>

namespace Cabana
{
//---------------------------------------------------------------------------//
/*!
  \brief A communication plan for migrating data from one uniquely-owned
  decomposition to another uniquely-owned decomposition.
*/
template <class MemorySpace>
class Distributor : public CommunicationPlan<MemorySpace>
{
  public:
    /*!
      \brief Topology and export rank constructor. Use this when you already
      know which ranks neighbor each other.
    */
    template <class ViewType>
    Distributor( MPI_Comm comm, const ViewType& element_export_ranks,
                 const std::vector<int>& neighbor_ranks )
        : CommunicationPlan<MemorySpace>( comm )
    {
        auto neighbor_ids = this->createFromExportsAndTopology(
            element_export_ranks, neighbor_ranks );
        this->createExportSteering( neighbor_ids, element_export_ranks );
    }

    /*!
      \brief Export rank constructor. Use this when you don't know who you
      will be receiving from - only who you are sending to.
    */
    template <class ViewType>
    Distributor( MPI_Comm comm, const ViewType& element_export_ranks )
        : CommunicationPlan<MemorySpace>( comm )
    {
        auto neighbor_ids = this->createFromExportsOnly( element_export_ranks );
        this->createExportSteering( neighbor_ids, element_export_ranks );
    }
};

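//---------------------------------------------------------------------------//
// Usage sketch: constructing a Distributor from one destination rank per
// local element. A minimal illustration only - the names "export_ranks" and
// "num_local" and the choice of Kokkos::HostSpace are assumptions, not part
// of this header.
//
//   Kokkos::View<int*, Kokkos::HostSpace> export_ranks( "export_ranks",
//                                                       num_local );
//   // Fill export_ranks( i ) with the rank that should own element i.
//   Cabana::Distributor<Kokkos::HostSpace> distributor( MPI_COMM_WORLD,
//                                                       export_ranks );
//
// If every rank already knows which ranks it exchanges with, the topology
// constructor can be used instead:
//
//   std::vector<int> neighbor_ranks = { /* ranks exchanged with */ };
//   Cabana::Distributor<Kokkos::HostSpace> distributor(
//       MPI_COMM_WORLD, export_ranks, neighbor_ranks );
//---------------------------------------------------------------------------//
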
//---------------------------------------------------------------------------//
//! \cond Impl
template <typename>
struct is_distributor_impl : public std::false_type
{
};

template <typename MemorySpace>
struct is_distributor_impl<Distributor<MemorySpace>> : public std::true_type
{
};
//! \endcond

//! Distributor static type checker.
template <class T>
struct is_distributor
    : public is_distributor_impl<typename std::remove_cv<T>::type>::type
{
};

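// The trait constrains the migrate() overloads below via enable_if. For
// example (illustrative only):
//
//   static_assert(
//       Cabana::is_distributor<
//           Cabana::Distributor<Kokkos::HostSpace>>::value,
//       "Distributor types satisfy the trait" );
//   static_assert( !Cabana::is_distributor<int>::value,
//                  "other types do not" );
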
//---------------------------------------------------------------------------//
namespace Impl
{
//! \cond Impl
//---------------------------------------------------------------------------//
// Synchronously move data between a source and destination AoSoA by executing
// the forward communication plan.
template <class ExecutionSpace, class Distributor_t, class AoSoA_t>
void distributeData(
    ExecutionSpace, const Distributor_t& distributor, const AoSoA_t& src,
    AoSoA_t& dst,
    typename std::enable_if<( is_distributor<Distributor_t>::value &&
                              is_aosoa<AoSoA_t>::value ),
                            int>::type* = 0 )
{
    Kokkos::Profiling::ScopedRegion region( "Cabana::migrate" );

    static_assert( is_accessible_from<typename Distributor_t::memory_space,
                                      ExecutionSpace>{},
                   "" );

    // Get the MPI rank we are currently on.
    int my_rank = -1;
    MPI_Comm_rank( distributor.comm(), &my_rank );

    // Get the number of neighbors.
    int num_n = distributor.numNeighbor();

    // Calculate the number of elements that are staying on this rank and
    // therefore can be directly copied. If any of the neighbor ranks are this
    // rank it will be stored in first position (i.e. the first neighbor in
    // the local list is always yourself if you are sending to yourself).
    std::size_t num_stay =
        ( num_n > 0 && distributor.neighborRank( 0 ) == my_rank )
            ? distributor.numExport( 0 )
            : 0;

    // Allocate a send buffer.
    std::size_t num_send = distributor.totalNumExport() - num_stay;
    Kokkos::View<typename AoSoA_t::tuple_type*,
                 typename Distributor_t::memory_space>
        send_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_send_buffer" ),
                     num_send );

    // Allocate a receive buffer.
    Kokkos::View<typename AoSoA_t::tuple_type*,
                 typename Distributor_t::memory_space>
        recv_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_recv_buffer" ),
                     distributor.totalNumImport() );

    // Get the steering vector for the sends.
    auto steering = distributor.getExportSteering();

    // Gather the exports from the source AoSoA into the tuple-contiguous send
    // buffer or the receive buffer if the data is staying. We know that the
    // steering vector is ordered such that the data staying on this rank
    // comes first.
    auto build_send_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        auto tpl = src.getTuple( steering( i ) );
        if ( i < num_stay )
            recv_buffer( i ) = tpl;
        else
            send_buffer( i - num_stay ) = tpl;
    };
    Kokkos::RangePolicy<ExecutionSpace> build_send_buffer_policy(
        0, distributor.totalNumExport() );
    Kokkos::parallel_for( "Cabana::Impl::distributeData::build_send_buffer",
                          build_send_buffer_policy, build_send_buffer_func );
    Kokkos::fence();

    // The distributor has its own communication space so choose any tag.
    const int mpi_tag = 1234;

    // Post non-blocking receives.
    std::vector<MPI_Request> requests;
    requests.reserve( num_n );
    std::pair<std::size_t, std::size_t> recv_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        recv_range.second = recv_range.first + distributor.numImport( n );

        if ( ( distributor.numImport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            auto recv_subview = Kokkos::subview( recv_buffer, recv_range );

            requests.push_back( MPI_Request() );

            MPI_Irecv( recv_subview.data(),
                       recv_subview.size() *
                           sizeof( typename AoSoA_t::tuple_type ),
                       MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                       distributor.comm(), &( requests.back() ) );
        }

        recv_range.first = recv_range.second;
    }

    // Do blocking sends.
    std::pair<std::size_t, std::size_t> send_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        if ( ( distributor.numExport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            send_range.second = send_range.first + distributor.numExport( n );

            auto send_subview = Kokkos::subview( send_buffer, send_range );

            MPI_Send( send_subview.data(),
                      send_subview.size() *
                          sizeof( typename AoSoA_t::tuple_type ),
                      MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                      distributor.comm() );

            send_range.first = send_range.second;
        }
    }

    // Wait on non-blocking receives.
    std::vector<MPI_Status> status( requests.size() );
    const int ec =
        MPI_Waitall( requests.size(), requests.data(), status.data() );
    if ( MPI_SUCCESS != ec )
        throw std::logic_error(
            "Cabana::Distributor: Failed MPI Communication" );

    // Extract the receive buffer into the destination AoSoA.
    auto extract_recv_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        dst.setTuple( i, recv_buffer( i ) );
    };
    Kokkos::RangePolicy<ExecutionSpace> extract_recv_buffer_policy(
        0, distributor.totalNumImport() );
    Kokkos::parallel_for( "Cabana::Impl::distributeData::extract_recv_buffer",
                          extract_recv_buffer_policy,
                          extract_recv_buffer_func );
    Kokkos::fence();

    // Barrier before completing to ensure synchronization.
    MPI_Barrier( distributor.comm() );
}

//---------------------------------------------------------------------------//
//! \endcond
} // end namespace Impl

//---------------------------------------------------------------------------//
/*!
  \brief Synchronously migrate data between two different decompositions
  using the distributor forward communication plan.
*/
template <class ExecutionSpace, class Distributor_t, class AoSoA_t>
void migrate( ExecutionSpace exec_space, const Distributor_t& distributor,
              const AoSoA_t& src, AoSoA_t& dst,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_aosoa<AoSoA_t>::value ),
                                      int>::type* = 0 )
{
    // Check that src and dst are the right size.
    if ( src.size() != distributor.exportSize() )
        throw std::runtime_error( "Cabana::migrate: Source is "
                                  "the wrong size for migration! (Label: " +
                                  src.label() + ")" );
    if ( dst.size() != distributor.totalNumImport() )
        throw std::runtime_error( "Cabana::migrate: Destination "
                                  "is the wrong size for migration! (Label: " +
                                  dst.label() + ")" );

    // Move the data.
    Impl::distributeData( exec_space, distributor, src, dst );
}

/*!
  \brief Synchronously migrate data between two different decompositions
  using the distributor's default execution space.
*/
template <class Distributor_t, class AoSoA_t>
void migrate( const Distributor_t& distributor, const AoSoA_t& src,
              AoSoA_t& dst,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_aosoa<AoSoA_t>::value ),
                                      int>::type* = 0 )
{
    migrate( typename Distributor_t::execution_space{}, distributor, src, dst );
}
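
//---------------------------------------------------------------------------//
// Usage sketch: migrating into a separately allocated destination AoSoA.
// A minimal illustration only - "DataTypes", the labels, and "num_local" are
// assumptions. The destination must be sized to totalNumImport() before the
// call, per the size check above.
//
//   using DataTypes = Cabana::MemberTypes<double[3], int>;
//   Cabana::AoSoA<DataTypes, Kokkos::HostSpace> src( "src", num_local );
//   // ... fill src and build a distributor with one export rank per
//   // element of src ...
//   Cabana::AoSoA<DataTypes, Kokkos::HostSpace> dst(
//       "dst", distributor.totalNumImport() );
//   Cabana::migrate( distributor, src, dst );
//---------------------------------------------------------------------------//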

//---------------------------------------------------------------------------//
/*!
  \brief Synchronously migrate data in-place using the distributor forward
  communication plan, resizing the AoSoA to the new decomposition.
*/
template <class ExecutionSpace, class Distributor_t, class AoSoA_t>
void migrate( ExecutionSpace exec_space, const Distributor_t& distributor,
              AoSoA_t& aosoa,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_aosoa<AoSoA_t>::value ),
                                      int>::type* = 0 )
{
    // Check that the AoSoA is the right size.
    if ( aosoa.size() != distributor.exportSize() )
        throw std::runtime_error(
            "Cabana::migrate (in-place): "
            "AoSoA is the wrong size for migration! (Label: " +
            aosoa.label() + ")" );

    // Determine whether the source or destination decomposition has more
    // data on this rank.
    bool dst_is_bigger =
        ( distributor.totalNumImport() > distributor.exportSize() );

    // If the destination decomposition is bigger than the source
    // decomposition, resize now so we have enough space to do the operation.
    if ( dst_is_bigger )
        aosoa.resize( distributor.totalNumImport() );

    // Move the data.
    Impl::distributeData( exec_space, distributor, aosoa, aosoa );

    // If the destination decomposition is smaller than the source
    // decomposition, resize after we have moved the data.
    if ( !dst_is_bigger )
        aosoa.resize( distributor.totalNumImport() );
}

/*!
  \brief Synchronously migrate data in-place using the distributor's default
  execution space.
*/
template <class Distributor_t, class AoSoA_t>
void migrate( const Distributor_t& distributor, AoSoA_t& aosoa,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_aosoa<AoSoA_t>::value ),
                                      int>::type* = 0 )
{
    migrate( typename Distributor_t::execution_space{}, distributor, aosoa );
}
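
//---------------------------------------------------------------------------//
// Usage sketch: the in-place overloads resize the AoSoA themselves, so a
// single container suffices (illustrative only):
//
//   Cabana::migrate( distributor, aosoa );
//   // aosoa.size() == distributor.totalNumImport() after the call.
//---------------------------------------------------------------------------//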

//---------------------------------------------------------------------------//
/*!
  \brief Synchronously migrate data between two different decompositions
  using the distributor forward communication plan. Slice version.
*/
template <class ExecutionSpace, class Distributor_t, class Slice_t>
void migrate( ExecutionSpace, const Distributor_t& distributor,
              const Slice_t& src, Slice_t& dst,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_slice<Slice_t>::value ),
                                      int>::type* = 0 )
{
    // Check that src and dst are the right size.
    if ( src.size() != distributor.exportSize() )
        throw std::runtime_error( "Cabana::migrate: Source Slice is the wrong "
                                  "size for migration! (Label: " +
                                  src.label() + ")" );
    if ( dst.size() != distributor.totalNumImport() )
        throw std::runtime_error( "Cabana::migrate: Destination Slice is the "
                                  "wrong size for migration! (Label: " +
                                  dst.label() + ")" );

    // Get the number of components in the slices.
    size_t num_comp = 1;
    for ( size_t d = 2; d < src.viewRank(); ++d )
        num_comp *= src.extent( d );

    // Get the raw slice data.
    auto src_data = src.data();
    auto dst_data = dst.data();

    // Get the MPI rank we are currently on.
    int my_rank = -1;
    MPI_Comm_rank( distributor.comm(), &my_rank );

    // Get the number of neighbors.
    int num_n = distributor.numNeighbor();

    // Calculate the number of elements that are staying on this rank and
    // therefore can be directly copied. If any of the neighbor ranks are this
    // rank it will be stored in first position (i.e. the first neighbor in
    // the local list is always yourself if you are sending to yourself).
    std::size_t num_stay =
        ( num_n > 0 && distributor.neighborRank( 0 ) == my_rank )
            ? distributor.numExport( 0 )
            : 0;

    // Allocate a send buffer. Note this one is layout right so the components
    // of each element are consecutive in memory.
    std::size_t num_send = distributor.totalNumExport() - num_stay;
    Kokkos::View<typename Slice_t::value_type**, Kokkos::LayoutRight,
                 typename Distributor_t::memory_space>
        send_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_send_buffer" ),
                     num_send, num_comp );

    // Allocate a receive buffer. Note this one is layout right so the
    // components of each element are consecutive in memory.
    Kokkos::View<typename Slice_t::value_type**, Kokkos::LayoutRight,
                 typename Distributor_t::memory_space>
        recv_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_recv_buffer" ),
                     distributor.totalNumImport(), num_comp );

    // Get the steering vector for the sends.
    auto steering = distributor.getExportSteering();

    // Gather from the source Slice into the contiguous send buffer or,
    // if it is part of the local copy, put it directly in the destination
    // Slice.
    auto build_send_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        auto s_src = Slice_t::index_type::s( steering( i ) );
        auto a_src = Slice_t::index_type::a( steering( i ) );
        std::size_t src_offset = s_src * src.stride( 0 ) + a_src;
        if ( i < num_stay )
            for ( std::size_t n = 0; n < num_comp; ++n )
                recv_buffer( i, n ) =
                    src_data[src_offset + n * Slice_t::vector_length];
        else
            for ( std::size_t n = 0; n < num_comp; ++n )
                send_buffer( i - num_stay, n ) =
                    src_data[src_offset + n * Slice_t::vector_length];
    };
    Kokkos::RangePolicy<ExecutionSpace> build_send_buffer_policy(
        0, distributor.totalNumExport() );
    Kokkos::parallel_for( "Cabana::migrate::build_send_buffer",
                          build_send_buffer_policy, build_send_buffer_func );
    Kokkos::fence();

    // The distributor has its own communication space so choose any tag.
    const int mpi_tag = 1234;

    // Post non-blocking receives.
    std::vector<MPI_Request> requests;
    requests.reserve( num_n );
    std::pair<std::size_t, std::size_t> recv_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        recv_range.second = recv_range.first + distributor.numImport( n );

        if ( ( distributor.numImport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            auto recv_subview =
                Kokkos::subview( recv_buffer, recv_range, Kokkos::ALL );

            requests.push_back( MPI_Request() );

            MPI_Irecv( recv_subview.data(),
                       recv_subview.size() *
                           sizeof( typename Slice_t::value_type ),
                       MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                       distributor.comm(), &( requests.back() ) );
        }

        recv_range.first = recv_range.second;
    }

    // Do blocking sends.
    std::pair<std::size_t, std::size_t> send_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        if ( ( distributor.numExport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            send_range.second = send_range.first + distributor.numExport( n );

            auto send_subview =
                Kokkos::subview( send_buffer, send_range, Kokkos::ALL );

            MPI_Send( send_subview.data(),
                      send_subview.size() *
                          sizeof( typename Slice_t::value_type ),
                      MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                      distributor.comm() );

            send_range.first = send_range.second;
        }
    }

    // Wait on non-blocking receives.
    std::vector<MPI_Status> status( requests.size() );
    const int ec =
        MPI_Waitall( requests.size(), requests.data(), status.data() );
    if ( MPI_SUCCESS != ec )
        throw std::logic_error( "Cabana::migrate: Failed MPI Communication" );

    // Extract the data from the receive buffer into the destination Slice.
    auto extract_recv_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        auto s = Slice_t::index_type::s( i );
        auto a = Slice_t::index_type::a( i );
        std::size_t dst_offset = s * dst.stride( 0 ) + a;
        for ( std::size_t n = 0; n < num_comp; ++n )
            dst_data[dst_offset + n * Slice_t::vector_length] =
                recv_buffer( i, n );
    };
    Kokkos::RangePolicy<ExecutionSpace> extract_recv_buffer_policy(
        0, distributor.totalNumImport() );
    Kokkos::parallel_for( "Cabana::migrate::extract_recv_buffer",
                          extract_recv_buffer_policy,
                          extract_recv_buffer_func );
    Kokkos::fence();

    // Barrier before completing to ensure synchronization.
    MPI_Barrier( distributor.comm() );
}

/*!
  \brief Synchronously migrate data between two different decompositions
  using the distributor's default execution space. Slice version.
*/
template <class Distributor_t, class Slice_t>
void migrate( const Distributor_t& distributor, const Slice_t& src,
              Slice_t& dst,
              typename std::enable_if<( is_distributor<Distributor_t>::value &&
                                        is_slice<Slice_t>::value ),
                                      int>::type* = 0 )
{
    migrate( typename Distributor_t::execution_space{}, distributor, src, dst );
}
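
//---------------------------------------------------------------------------//
// Usage sketch: migrating a single particle field. No in-place slice
// overload exists in this header, so the destination slice must already have
// totalNumImport() elements. Member index 0 and the AoSoA names are
// assumptions for the illustration.
//
//   auto src_position = Cabana::slice<0>( src_aosoa );
//   auto dst_position = Cabana::slice<0>( dst_aosoa );
//   Cabana::migrate( distributor, src_position, dst_position );
//---------------------------------------------------------------------------//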

//---------------------------------------------------------------------------//

} // end namespace Cabana

#endif // end CABANA_DISTRIBUTOR_HPP