16#ifndef CABANA_DISTRIBUTOR_HPP
17#define CABANA_DISTRIBUTOR_HPP
23#include <Kokkos_Core.hpp>
24#include <Kokkos_Profiling_ScopedRegion.hpp>
61template <
class MemorySpace>
99 template <
class ViewType>
101 const std::vector<int>& neighbor_ranks )
105 element_export_ranks, neighbor_ranks );
136 template <
class ViewType>
148struct is_distributor_impl :
public std::false_type
152template <
typename MemorySpace>
153struct is_distributor_impl<
Distributor<MemorySpace>> :
public std::true_type
161 :
public is_distributor_impl<typename std::remove_cv<T>::type>::type
172template <
class ExecutionSpace,
class Distributor_t,
class AoSoA_t>
174 ExecutionSpace,
const Distributor_t& distributor,
const AoSoA_t& src,
180 Kokkos::Profiling::ScopedRegion region(
"Cabana::migrate" );
188 MPI_Comm_rank( distributor.comm(), &my_rank );
191 int num_n = distributor.numNeighbor();
197 std::size_t num_stay =
198 ( num_n > 0 && distributor.neighborRank( 0 ) == my_rank )
199 ? distributor.numExport( 0 )
203 std::size_t num_send = distributor.totalNumExport() - num_stay;
204 Kokkos::View<
typename AoSoA_t::tuple_type*,
205 typename Distributor_t::memory_space>
206 send_buffer( Kokkos::ViewAllocateWithoutInitializing(
207 "distributor_send_buffer" ),
211 Kokkos::View<
typename AoSoA_t::tuple_type*,
212 typename Distributor_t::memory_space>
213 recv_buffer( Kokkos::ViewAllocateWithoutInitializing(
214 "distributor_recv_buffer" ),
215 distributor.totalNumImport() );
218 auto steering = distributor.getExportSteering();
224 auto build_send_buffer_func = KOKKOS_LAMBDA(
const std::size_t i )
226 auto tpl = src.getTuple( steering( i ) );
228 recv_buffer( i ) = tpl;
230 send_buffer( i - num_stay ) = tpl;
232 Kokkos::RangePolicy<ExecutionSpace> build_send_buffer_policy(
233 0, distributor.totalNumExport() );
234 Kokkos::parallel_for(
"Cabana::Impl::distributeData::build_send_buffer",
235 build_send_buffer_policy, build_send_buffer_func );
239 const int mpi_tag = 1234;
242 std::vector<MPI_Request> requests;
243 requests.reserve( num_n );
244 std::pair<std::size_t, std::size_t> recv_range = { 0, 0 };
245 for (
int n = 0; n < num_n; ++n )
247 recv_range.second = recv_range.first + distributor.numImport( n );
249 if ( ( distributor.numImport( n ) > 0 ) &&
250 ( distributor.neighborRank( n ) != my_rank ) )
252 auto recv_subview = Kokkos::subview( recv_buffer, recv_range );
254 requests.push_back( MPI_Request() );
256 MPI_Irecv( recv_subview.data(),
257 recv_subview.size() *
258 sizeof(
typename AoSoA_t::tuple_type ),
259 MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
260 distributor.comm(), &( requests.back() ) );
263 recv_range.first = recv_range.second;
267 std::pair<std::size_t, std::size_t> send_range = { 0, 0 };
268 for (
int n = 0; n < num_n; ++n )
270 if ( ( distributor.numExport( n ) > 0 ) &&
271 ( distributor.neighborRank( n ) != my_rank ) )
273 send_range.second = send_range.first + distributor.numExport( n );
275 auto send_subview = Kokkos::subview( send_buffer, send_range );
277 MPI_Send( send_subview.data(),
278 send_subview.size() *
279 sizeof(
typename AoSoA_t::tuple_type ),
280 MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
281 distributor.comm() );
283 send_range.first = send_range.second;
288 std::vector<MPI_Status> status( requests.size() );
290 MPI_Waitall( requests.size(), requests.data(), status.data() );
291 if ( MPI_SUCCESS != ec )
292 throw std::logic_error(
"Failed MPI Communication" );
295 auto extract_recv_buffer_func = KOKKOS_LAMBDA(
const std::size_t i )
297 dst.setTuple( i, recv_buffer( i ) );
299 Kokkos::RangePolicy<ExecutionSpace> extract_recv_buffer_policy(
300 0, distributor.totalNumImport() );
301 Kokkos::parallel_for(
"Cabana::Impl::distributeData::extract_recv_buffer",
302 extract_recv_buffer_policy,
303 extract_recv_buffer_func );
307 MPI_Barrier( distributor.comm() );
334template <
class ExecutionSpace,
class Distributor_t,
class AoSoA_t>
335void migrate( ExecutionSpace exec_space,
const Distributor_t& distributor,
336 const AoSoA_t& src, AoSoA_t& dst,
342 if ( src.size() != distributor.exportSize() )
343 throw std::runtime_error(
"Source is the wrong size for migration!" );
344 if ( dst.size() != distributor.totalNumImport() )
345 throw std::runtime_error(
346 "Destination is the wrong size for migration!" );
349 Impl::distributeData( exec_space, distributor, src, dst );
369template <
class Distributor_t,
class AoSoA_t>
370void migrate(
const Distributor_t& distributor,
const AoSoA_t& src,
376 migrate(
typename Distributor_t::execution_space{}, distributor, src, dst );
403template <
class ExecutionSpace,
class Distributor_t,
class AoSoA_t>
404void migrate( ExecutionSpace exec_space,
const Distributor_t& distributor,
411 if ( aosoa.size() != distributor.exportSize() )
412 throw std::runtime_error(
"AoSoA is the wrong size for migration!" );
417 ( distributor.totalNumImport() > distributor.exportSize() );
422 aosoa.resize( distributor.totalNumImport() );
425 Impl::distributeData( exec_space, distributor, aosoa, aosoa );
429 if ( !dst_is_bigger )
430 aosoa.resize( distributor.totalNumImport() );
454template <
class Distributor_t,
class AoSoA_t>
455void migrate(
const Distributor_t& distributor, AoSoA_t& aosoa,
460 migrate(
typename Distributor_t::execution_space{}, distributor, aosoa );
484template <
class ExecutionSpace,
class Distributor_t,
class Slice_t>
485void migrate( ExecutionSpace,
const Distributor_t& distributor,
486 const Slice_t& src, Slice_t& dst,
492 if ( src.size() != distributor.exportSize() )
493 throw std::runtime_error(
"Source is the wrong size for migration!" );
494 if ( dst.size() != distributor.totalNumImport() )
495 throw std::runtime_error(
496 "Destination is the wrong size for migration!" );
500 for (
size_t d = 2; d < src.viewRank(); ++d )
501 num_comp *= src.extent( d );
504 auto src_data = src.data();
505 auto dst_data = dst.data();
509 MPI_Comm_rank( distributor.comm(), &my_rank );
512 int num_n = distributor.numNeighbor();
518 std::size_t num_stay =
519 ( num_n > 0 && distributor.neighborRank( 0 ) == my_rank )
520 ? distributor.numExport( 0 )
525 std::size_t num_send = distributor.totalNumExport() - num_stay;
526 Kokkos::View<
typename Slice_t::value_type**, Kokkos::LayoutRight,
527 typename Distributor_t::memory_space>
528 send_buffer( Kokkos::ViewAllocateWithoutInitializing(
529 "distributor_send_buffer" ),
530 num_send, num_comp );
534 Kokkos::View<
typename Slice_t::value_type**, Kokkos::LayoutRight,
535 typename Distributor_t::memory_space>
536 recv_buffer( Kokkos::ViewAllocateWithoutInitializing(
537 "distributor_recv_buffer" ),
538 distributor.totalNumImport(), num_comp );
541 auto steering = distributor.getExportSteering();
546 auto build_send_buffer_func = KOKKOS_LAMBDA(
const std::size_t i )
548 auto s_src = Slice_t::index_type::s( steering( i ) );
549 auto a_src = Slice_t::index_type::a( steering( i ) );
550 std::size_t src_offset = s_src * src.stride( 0 ) + a_src;
552 for ( std::size_t n = 0; n < num_comp; ++n )
553 recv_buffer( i, n ) =
554 src_data[src_offset + n * Slice_t::vector_length];
556 for ( std::size_t n = 0; n < num_comp; ++n )
557 send_buffer( i - num_stay, n ) =
558 src_data[src_offset + n * Slice_t::vector_length];
560 Kokkos::RangePolicy<ExecutionSpace> build_send_buffer_policy(
561 0, distributor.totalNumExport() );
562 Kokkos::parallel_for(
"Cabana::migrate::build_send_buffer",
563 build_send_buffer_policy, build_send_buffer_func );
567 const int mpi_tag = 1234;
570 std::vector<MPI_Request> requests;
571 requests.reserve( num_n );
572 std::pair<std::size_t, std::size_t> recv_range = { 0, 0 };
573 for (
int n = 0; n < num_n; ++n )
575 recv_range.second = recv_range.first + distributor.numImport( n );
577 if ( ( distributor.numImport( n ) > 0 ) &&
578 ( distributor.neighborRank( n ) != my_rank ) )
581 Kokkos::subview( recv_buffer, recv_range, Kokkos::ALL );
583 requests.push_back( MPI_Request() );
585 MPI_Irecv( recv_subview.data(),
586 recv_subview.size() *
587 sizeof(
typename Slice_t::value_type ),
588 MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
589 distributor.comm(), &( requests.back() ) );
592 recv_range.first = recv_range.second;
596 std::pair<std::size_t, std::size_t> send_range = { 0, 0 };
597 for (
int n = 0; n < num_n; ++n )
599 if ( ( distributor.numExport( n ) > 0 ) &&
600 ( distributor.neighborRank( n ) != my_rank ) )
602 send_range.second = send_range.first + distributor.numExport( n );
605 Kokkos::subview( send_buffer, send_range, Kokkos::ALL );
607 MPI_Send( send_subview.data(),
608 send_subview.size() *
609 sizeof(
typename Slice_t::value_type ),
610 MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
611 distributor.comm() );
613 send_range.first = send_range.second;
618 std::vector<MPI_Status> status( requests.size() );
620 MPI_Waitall( requests.size(), requests.data(), status.data() );
621 if ( MPI_SUCCESS != ec )
622 throw std::logic_error(
"Failed MPI Communication" );
625 auto extract_recv_buffer_func = KOKKOS_LAMBDA(
const std::size_t i )
627 auto s = Slice_t::index_type::s( i );
628 auto a = Slice_t::index_type::a( i );
629 std::size_t dst_offset = s * dst.stride( 0 ) + a;
630 for ( std::size_t n = 0; n < num_comp; ++n )
631 dst_data[dst_offset + n * Slice_t::vector_length] =
634 Kokkos::RangePolicy<ExecutionSpace> extract_recv_buffer_policy(
635 0, distributor.totalNumImport() );
636 Kokkos::parallel_for(
"Cabana::migrate::extract_recv_buffer",
637 extract_recv_buffer_policy,
638 extract_recv_buffer_func );
642 MPI_Barrier( distributor.comm() );
664template <
class Distributor_t,
class Slice_t>
665void migrate(
const Distributor_t& distributor,
const Slice_t& src,
671 migrate(
typename Distributor_t::execution_space{}, distributor, src, dst );
Array-of-Struct-of-Arrays particle data structure.
Multi-node communication patterns.
Slice a single particle property from an AoSoA.
Kokkos::View< size_type *, memory_space > createFromExportsOnly(ExecutionSpace exec_space, const ViewType &element_export_ranks)
Export rank creator. Use this when you don't know who you will be receiving from - only who you are sending to.
Definition Cabana_CommunicationPlan.hpp:756
void createExportSteering(const PackViewType &neighbor_ids, const RankViewType &element_export_ranks)
Create the export steering vector.
Definition Cabana_CommunicationPlan.hpp:942
MPI_Comm comm() const
Get the MPI communicator.
Definition Cabana_CommunicationPlan.hpp:468
Kokkos::View< size_type *, memory_space > createFromExportsAndTopology(ExecutionSpace exec_space, const ViewType &element_export_ranks, const std::vector< int > &neighbor_ranks)
Neighbor and export rank creator. Use this when you already know which ranks neighbor each other (i....
Definition Cabana_CommunicationPlan.hpp:596
CommunicationPlan(MPI_Comm comm)
Constructor.
Definition Cabana_CommunicationPlan.hpp:446
A communication plan for migrating data from one uniquely-owned decomposition to another uniquely-owned decomposition.
Definition Cabana_Distributor.hpp:63
Distributor(MPI_Comm comm, const ViewType &element_export_ranks)
Export rank constructor. Use this when you don't know who you will be receiving from - only who you are sending to.
Definition Cabana_Distributor.hpp:137
Distributor(MPI_Comm comm, const ViewType &element_export_ranks, const std::vector< int > &neighbor_ranks)
Topology and export rank constructor. Use this when you already know which ranks neighbor each other ...
Definition Cabana_Distributor.hpp:100
Core: particle data structures and algorithms.
Definition Cabana_AoSoA.hpp:36
void migrate(ExecutionSpace exec_space, const Distributor_t &distributor, const AoSoA_t &src, AoSoA_t &dst, typename std::enable_if<(is_distributor< Distributor_t >::value &&is_aosoa< AoSoA_t >::value), int >::type *=0)
Synchronously migrate data between two different decompositions using the distributor forward communication plan.
Definition Cabana_Distributor.hpp:335
Definition Cabana_Types.hpp:88
AoSoA static type checker.
Definition Cabana_AoSoA.hpp:61
Distributor static type checker.
Definition Cabana_Distributor.hpp:162
Slice static type checker.
Definition Cabana_Slice.hpp:861