Cabana 0.8.0-dev
 
Cabana_Migrate_Mpi.hpp
/****************************************************************************
 * Copyright (c) 2018-2023 by the Cabana authors                           *
 * All rights reserved.                                                    *
 *                                                                         *
 * This file is part of the Cabana library. Cabana is distributed under a  *
 * BSD 3-clause license. For the licensing terms see the LICENSE file in   *
 * the top-level directory.                                                *
 *                                                                         *
 * SPDX-License-Identifier: BSD-3-Clause                                   *
 ****************************************************************************/

#ifndef CABANA_MIGRATE_MPI_HPP
#define CABANA_MIGRATE_MPI_HPP

#include <Cabana_AoSoA.hpp>
#include <Cabana_Slice.hpp>

#include <Kokkos_Core.hpp>
#include <Kokkos_Profiling_ScopedRegion.hpp>

#include <mpi.h>

#include <exception>
#include <vector>

//---------------------------------------------------------------------------//
namespace Cabana
{

namespace Impl
{
//---------------------------------------------------------------------------//
// Synchronously move data between a source and destination AoSoA by executing
// the forward communication plan.
template <class ExecutionSpace, class Distributor_t, class AoSoA_t>
void migrateData(
    Mpi, ExecutionSpace, const Distributor_t& distributor, const AoSoA_t& src,
    AoSoA_t& dst,
    typename std::enable_if<( ( is_distributor<Distributor_t>::value ) &&
                              is_aosoa<AoSoA_t>::value ),
                            int>::type* = 0 )
{
    Kokkos::Profiling::ScopedRegion region( "Cabana::migrate" );

    static_assert( is_accessible_from<typename Distributor_t::memory_space,
                                      ExecutionSpace>{},
                   "" );

    // Get the MPI rank we are currently on.
    int my_rank = -1;
    MPI_Comm_rank( distributor.comm(), &my_rank );

    // Get the number of neighbors.
    int num_n = distributor.numNeighbor();

    // Calculate the number of elements that are staying on this rank and
    // therefore can be directly copied. If any of the neighbor ranks are this
    // rank it will be stored in first position (i.e. the first neighbor in
    // the local list is always yourself if you are sending to yourself).
    std::size_t num_stay =
        ( num_n > 0 && distributor.neighborRank( 0 ) == my_rank )
            ? distributor.numExport( 0 )
            : 0;
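    // For example, if this rank exported 4 elements to itself and 6 to other
    // ranks, num_stay would be 4 and only the remaining 6 tuples would be
    // packed into the MPI send buffer below.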

    // Allocate a send buffer.
    std::size_t num_send = distributor.totalNumExport() - num_stay;
    Kokkos::View<typename AoSoA_t::tuple_type*,
                 typename Distributor_t::memory_space>
        send_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_send_buffer" ),
                     num_send );

    // Allocate a receive buffer.
    Kokkos::View<typename AoSoA_t::tuple_type*,
                 typename Distributor_t::memory_space>
        recv_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_recv_buffer" ),
                     distributor.totalNumImport() );

    // Get the steering vector for the sends.
    auto steering = distributor.getExportSteering();

    // Gather the exports from the source AoSoA into the tuple-contiguous send
    // buffer or the receive buffer if the data is staying. We know that the
    // steering vector is ordered such that the data staying on this rank
    // comes first.
    auto build_send_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        auto tpl = src.getTuple( steering( i ) );
        if ( i < num_stay )
            recv_buffer( i ) = tpl;
        else
            send_buffer( i - num_stay ) = tpl;
    };
    Kokkos::RangePolicy<ExecutionSpace> build_send_buffer_policy(
        0, distributor.totalNumExport() );
    Kokkos::parallel_for( "Cabana::Impl::distributeData::build_send_buffer",
                          build_send_buffer_policy, build_send_buffer_func );
    Kokkos::fence();

    // The distributor has its own communication space so choose any tag.
    const int mpi_tag = 1234;

    // Post non-blocking receives.
    std::vector<MPI_Request> requests;
    requests.reserve( num_n );
    std::pair<std::size_t, std::size_t> recv_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        recv_range.second = recv_range.first + distributor.numImport( n );

        if ( ( distributor.numImport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            auto recv_subview = Kokkos::subview( recv_buffer, recv_range );

            requests.push_back( MPI_Request() );

            MPI_Irecv( recv_subview.data(),
                       recv_subview.size() *
                           sizeof( typename AoSoA_t::tuple_type ),
                       MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                       distributor.comm(), &( requests.back() ) );
        }

        recv_range.first = recv_range.second;
    }

    // Do blocking sends.
    std::pair<std::size_t, std::size_t> send_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        if ( ( distributor.numExport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            send_range.second = send_range.first + distributor.numExport( n );

            auto send_subview = Kokkos::subview( send_buffer, send_range );

            MPI_Send( send_subview.data(),
                      send_subview.size() *
                          sizeof( typename AoSoA_t::tuple_type ),
                      MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                      distributor.comm() );

            send_range.first = send_range.second;
        }
    }

    // Wait on non-blocking receives.
    std::vector<MPI_Status> status( requests.size() );
    const int ec =
        MPI_Waitall( requests.size(), requests.data(), status.data() );
    if ( MPI_SUCCESS != ec )
        throw std::logic_error(
            "Cabana::Distributor: Failed MPI Communication" );

    // Extract the receive buffer into the destination AoSoA.
    auto extract_recv_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        dst.setTuple( i, recv_buffer( i ) );
    };
    Kokkos::RangePolicy<ExecutionSpace> extract_recv_buffer_policy(
        0, distributor.totalNumImport() );
    Kokkos::parallel_for( "Cabana::Impl::distributeData::extract_recv_buffer",
                          extract_recv_buffer_policy,
                          extract_recv_buffer_func );
    Kokkos::fence();

    // Barrier before completing to ensure synchronization.
    MPI_Barrier( distributor.comm() );
}

//---------------------------------------------------------------------------//
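// Synchronously move data between a source and destination Slice by executing
// the forward communication plan.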
template <class ExecutionSpace, class Distributor_t, class Slice_t>
void migrateSlice(
    Mpi, ExecutionSpace, const Distributor_t& distributor, const Slice_t& src,
    Slice_t& dst,
    typename std::enable_if<( ( is_distributor<Distributor_t>::value ) &&
                              is_slice<Slice_t>::value ),
                            int>::type* = 0 )
{
    // Get the number of components in the slices.
    size_t num_comp = 1;
    for ( size_t d = 2; d < src.viewRank(); ++d )
        num_comp *= src.extent( d );
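    // For example, a slice of a double[3][3] member maps to a view of rank 4,
    // giving num_comp = 3 * 3 = 9 components per element.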

    // Get the raw slice data.
    auto src_data = src.data();
    auto dst_data = dst.data();

    // Get the MPI rank we are currently on.
    int my_rank = -1;
    MPI_Comm_rank( distributor.comm(), &my_rank );

    // Get the number of neighbors.
    int num_n = distributor.numNeighbor();

    // Calculate the number of elements that are staying on this rank and
    // therefore can be directly copied. If any of the neighbor ranks are this
    // rank it will be stored in first position (i.e. the first neighbor in
    // the local list is always yourself if you are sending to yourself).
    std::size_t num_stay =
        ( num_n > 0 && distributor.neighborRank( 0 ) == my_rank )
            ? distributor.numExport( 0 )
            : 0;

    // Allocate a send buffer. Note this one is layout right so the components
    // of each element are consecutive in memory.
    std::size_t num_send = distributor.totalNumExport() - num_stay;
    Kokkos::View<typename Slice_t::value_type**, Kokkos::LayoutRight,
                 typename Distributor_t::memory_space>
        send_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_send_buffer" ),
                     num_send, num_comp );

    // Allocate a receive buffer. Note this one is layout right so the
    // components of each element are consecutive in memory.
    Kokkos::View<typename Slice_t::value_type**, Kokkos::LayoutRight,
                 typename Distributor_t::memory_space>
        recv_buffer( Kokkos::ViewAllocateWithoutInitializing(
                         "distributor_recv_buffer" ),
                     distributor.totalNumImport(), num_comp );

    // Get the steering vector for the sends.
    auto steering = distributor.getExportSteering();

    // Gather from the source Slice into the contiguous send buffer or, if the
    // data is staying on this rank, stage it directly in the receive buffer.
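    // Each element index maps to a struct index s and an inner array index a;
    // within a struct, successive components of an element are separated by
    // the SoA vector length, hence the stride in the copies below.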
    auto build_send_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        auto s_src = Slice_t::index_type::s( steering( i ) );
        auto a_src = Slice_t::index_type::a( steering( i ) );
        std::size_t src_offset = s_src * src.stride( 0 ) + a_src;
        if ( i < num_stay )
            for ( std::size_t n = 0; n < num_comp; ++n )
                recv_buffer( i, n ) =
                    src_data[src_offset + n * Slice_t::vector_length];
        else
            for ( std::size_t n = 0; n < num_comp; ++n )
                send_buffer( i - num_stay, n ) =
                    src_data[src_offset + n * Slice_t::vector_length];
    };
    Kokkos::RangePolicy<ExecutionSpace> build_send_buffer_policy(
        0, distributor.totalNumExport() );
    Kokkos::parallel_for( "Cabana::migrate::build_send_buffer",
                          build_send_buffer_policy, build_send_buffer_func );
    Kokkos::fence();

    // The distributor has its own communication space so choose any tag.
    const int mpi_tag = 1234;

    // Post non-blocking receives.
    std::vector<MPI_Request> requests;
    requests.reserve( num_n );
    std::pair<std::size_t, std::size_t> recv_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        recv_range.second = recv_range.first + distributor.numImport( n );

        if ( ( distributor.numImport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            auto recv_subview =
                Kokkos::subview( recv_buffer, recv_range, Kokkos::ALL );

            requests.push_back( MPI_Request() );

            MPI_Irecv( recv_subview.data(),
                       recv_subview.size() *
                           sizeof( typename Slice_t::value_type ),
                       MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                       distributor.comm(), &( requests.back() ) );
        }

        recv_range.first = recv_range.second;
    }

    // Do blocking sends.
    std::pair<std::size_t, std::size_t> send_range = { 0, 0 };
    for ( int n = 0; n < num_n; ++n )
    {
        if ( ( distributor.numExport( n ) > 0 ) &&
             ( distributor.neighborRank( n ) != my_rank ) )
        {
            send_range.second = send_range.first + distributor.numExport( n );

            auto send_subview =
                Kokkos::subview( send_buffer, send_range, Kokkos::ALL );

            MPI_Send( send_subview.data(),
                      send_subview.size() *
                          sizeof( typename Slice_t::value_type ),
                      MPI_BYTE, distributor.neighborRank( n ), mpi_tag,
                      distributor.comm() );

            send_range.first = send_range.second;
        }
    }

    // Wait on non-blocking receives.
    std::vector<MPI_Status> status( requests.size() );
    const int ec =
        MPI_Waitall( requests.size(), requests.data(), status.data() );
    if ( MPI_SUCCESS != ec )
        throw std::logic_error( "Cabana::migrate: Failed MPI Communication" );

    // Extract the data from the receive buffer into the destination Slice.
    auto extract_recv_buffer_func = KOKKOS_LAMBDA( const std::size_t i )
    {
        auto s = Slice_t::index_type::s( i );
        auto a = Slice_t::index_type::a( i );
        std::size_t dst_offset = s * dst.stride( 0 ) + a;
        for ( std::size_t n = 0; n < num_comp; ++n )
            dst_data[dst_offset + n * Slice_t::vector_length] =
                recv_buffer( i, n );
    };
    Kokkos::RangePolicy<ExecutionSpace> extract_recv_buffer_policy(
        0, distributor.totalNumImport() );
    Kokkos::parallel_for( "Cabana::migrate::extract_recv_buffer",
                          extract_recv_buffer_policy,
                          extract_recv_buffer_func );
    Kokkos::fence();

    // Barrier before completing to ensure synchronization.
    MPI_Barrier( distributor.comm() );
}

//---------------------------------------------------------------------------//
} // end namespace Impl

} // end namespace Cabana

#endif // CABANA_MIGRATE_MPI_HPP
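
For reference, below is a minimal usage sketch of how these Impl routines are typically reached through the library's public interface rather than called directly. This is a sketch under assumptions, not part of this file: it presumes the public Cabana::Distributor and Cabana::migrate declarations from the wider library, and the member types, particle count, and the trivial "every element stays on its own rank" export pattern are illustrative choices only.

#include <Cabana_Core.hpp>
#include <Kokkos_Core.hpp>
#include <mpi.h>

int main( int argc, char* argv[] )
{
    MPI_Init( &argc, &argv );
    Kokkos::initialize( argc, argv );
    {
        // A small AoSoA of particles with a position and an id (illustrative).
        using MemorySpace = Kokkos::HostSpace;
        using DataTypes = Cabana::MemberTypes<double[3], int>;
        const int num_local = 100;
        Cabana::AoSoA<DataTypes, MemorySpace> particles( "particles",
                                                         num_local );

        // One destination rank per element. Here every element keeps its
        // current rank, so migration reduces to a local copy; a real
        // application would compute destinations from the particle data.
        int my_rank;
        MPI_Comm_rank( MPI_COMM_WORLD, &my_rank );
        Kokkos::View<int*, MemorySpace> export_ranks( "export_ranks",
                                                      num_local );
        Kokkos::deep_copy( export_ranks, my_rank );

        // Build the forward communication plan and redistribute in place.
        Cabana::Distributor<MemorySpace> distributor( MPI_COMM_WORLD,
                                                      export_ranks );
        Cabana::migrate( distributor, particles );
    }
    Kokkos::finalize();
    MPI_Finalize();
    return 0;
}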