diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index 20d8960a9c..6809ab9db9 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -42,7 +42,6 @@ jobs: -DMPIEXEC_PREFLAGS='--bind-to;none;--allow-run-as-root' -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/install -DTTG_EXAMPLES=ON - -DCMAKE_CXX_STANDARD=20 steps: - uses: actions/checkout@v4 @@ -50,7 +49,7 @@ jobs: - name: Install prerequisite MacOS packages if: ${{ matrix.os == 'macos-latest' }} run: | - brew install ninja boost eigen open-mpi bison ccache + brew install ninja eigen open-mpi bison ccache echo "MPIEXEC=/opt/homebrew/bin/mpiexec" >> $GITHUB_ENV - name: Install prerequisites Ubuntu packages @@ -59,7 +58,7 @@ jobs: wget -O - https://apt.kitware.com/keys/kitware-archive-latest.asc 2>/dev/null | gpg --dearmor - | sudo tee /etc/apt/trusted.gpg.d/kitware.gpg >/dev/null sudo apt-add-repository "deb https://apt.kitware.com/ubuntu/ $(lsb_release -cs) main" sudo apt-get update - sudo apt-get -y install ninja-build g++-12 liblapack-dev libboost-dev libboost-serialization-dev libboost-random-dev libeigen3-dev openmpi-bin libopenmpi-dev libtbb-dev ccache flex bison cmake doxygen + sudo apt-get -y install ninja-build g++-12 liblapack-dev libeigen3-dev openmpi-bin libopenmpi-dev libtbb-dev ccache flex bison cmake doxygen echo "MPIEXEC=/usr/bin/mpiexec" >> $GITHUB_ENV - name: Install extra dependencies @@ -69,7 +68,7 @@ jobs: sudo dpkg -i cuda-keyring_1.1-1_all.deb sudo apt update sudo apt install -y cuda-toolkit - echo "CUDA_BUILD_OPTS=-DENABLE_CUDA=ON -DTTG_ENABLE_CUDA=ON -DCUDA_TOOLKIT_ROOT_DIR=/usr/local/cuda-12.6 -DCMAKE_CUDA_COMPILER=/usr/local/cuda-12.6/bin/nvcc -DCMAKE_CUDA_HOST_COMPILER=${{ matrix.cxx }}" >> $GITHUB_ENV + echo "CUDA_BUILD_OPTS=-DENABLE_CUDA=ON -DTTG_ENABLE_CUDA=ON -DCUDA_TOOLKIT_ROOT_DIR=/usr/local/cuda-12 -DCMAKE_CUDA_COMPILER=/usr/local/cuda-12/bin/nvcc -DCMAKE_CUDA_HOST_COMPILER=${{ matrix.cxx }}" >> $GITHUB_ENV - name: Create Build Environment # Some projects don't allow in-source building, so create a separate build directory diff --git a/CMakeLists.txt b/CMakeLists.txt index 230c231242..e790d09036 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -52,7 +52,7 @@ set(CMAKE_INSTALL_CMAKEDIR "lib/cmake/ttg" ######################################## #### user-defined configuration options ######################################## -option(TTG_PARSEC_USE_BOOST_SERIALIZATION "Whether to select Boost serialization methods in PaRSEC backend" ON) +option(TTG_PARSEC_USE_BOOST_SERIALIZATION "Whether to select Boost serialization methods in PaRSEC backend" OFF) option(TTG_ENABLE_CUDA "Whether to TTG will look for CUDA" OFF) option(TTG_ENABLE_HIP "Whether to TTG will look for HIP" OFF) option(TTG_ENABLE_LEVEL_ZERO "Whether to TTG will look for Intel oneAPI Level Zero" OFF) diff --git a/cmake/modules/ExternalDependenciesVersions.cmake b/cmake/modules/ExternalDependenciesVersions.cmake index dd7347c254..b408fac581 100644 --- a/cmake/modules/ExternalDependenciesVersions.cmake +++ b/cmake/modules/ExternalDependenciesVersions.cmake @@ -1,10 +1,10 @@ # for each dependency track both current and previous id (the variable for the latter must contain PREVIOUS) # to be able to auto-update them -set(TTG_TRACKED_VG_CMAKE_KIT_TAG 878654d0cb1904049fbd2c37b37d5385ae897658) # provides FindOrFetchLinalgPP and "real" FindOrFetchBoost +set(TTG_TRACKED_VG_CMAKE_KIT_TAG cda539db32be6e8171f5cbebdb1a7c38d5ab4b34) # provides FindOrFetchLinalgPP and "real" FindOrFetchBoost set(TTG_TRACKED_CATCH2_VERSION 3.5.0) set(TTG_TRACKED_MADNESS_TAG 93a9a5cec2a8fa87fba3afe8056607e6062a9058) -set(TTG_TRACKED_PARSEC_TAG 58f8f3089ecad2e8ee50e80a9586e05ce8873b1c) +set(TTG_TRACKED_PARSEC_TAG 996dda4c0ff3120bc65385f86e999befd4b3fe7a) set(TTG_TRACKED_BTAS_TAG c25b0a11d2a76190bfb13fa72f9e9dc3e57c3c2f) set(TTG_TRACKED_TILEDARRAY_TAG 5944bdba3266a3fa19f1809c8e2accf3dad4d815) diff --git a/cmake/modules/FindOrFetchBoost.cmake b/cmake/modules/FindOrFetchBoost.cmake index 8817413a2d..4c6422400f 100644 --- a/cmake/modules/FindOrFetchBoost.cmake +++ b/cmake/modules/FindOrFetchBoost.cmake @@ -24,6 +24,8 @@ if (TTG_PARSEC_USE_BOOST_SERIALIZATION) serialization iostreams ) +else() + list(APPEND BOOST_EXCLUDE_LIBRARIES iostreams) # install of this library fails unless it's already built endif() if (BUILD_EXAMPLES) list(APPEND optional_components diff --git a/cmake/modules/FindOrFetchPARSEC.cmake b/cmake/modules/FindOrFetchPARSEC.cmake index b3fd5faa3a..7b164019fe 100644 --- a/cmake/modules/FindOrFetchPARSEC.cmake +++ b/cmake/modules/FindOrFetchPARSEC.cmake @@ -17,7 +17,7 @@ if (NOT TARGET PaRSEC::parsec) FetchContent_Declare( PARSEC - GIT_REPOSITORY https://github.com/ICLDisco/parsec.git + GIT_REPOSITORY https://github.com/devreal/parsec-1.git GIT_TAG ${TTG_TRACKED_PARSEC_TAG} ) FetchContent_MakeAvailable(PARSEC) diff --git a/examples/matrixtile.h b/examples/matrixtile.h index 203c58bf29..4288d0aa4e 100644 --- a/examples/matrixtile.h +++ b/examples/matrixtile.h @@ -41,8 +41,6 @@ inline void allocator_fini() { } template > class MatrixTile : public ttg::TTValue> { public: - using metadata_t = typename std::tuple; - using buffer_t = typename ttg::Buffer; using ttvalue_type = ttg::TTValue>; @@ -62,6 +60,10 @@ class MatrixTile : public ttg::TTValue> { #endif // DEBUG_TILES_VALUES } + struct non_owning_deleter { + void operator()(T* ptr) { } + }; + public: MatrixTile() {} @@ -73,19 +75,13 @@ class MatrixTile : public ttg::TTValue> { , _lda(lda) { } - MatrixTile(const metadata_t& metadata) - : MatrixTile(std::get<0>(metadata), std::get<1>(metadata), std::get<2>(metadata)) {} - - MatrixTile(const metadata_t& metadata, T* data) - : MatrixTile(std::get<0>(metadata), std::get<1>(metadata), std::forward(data), std::get<2>(metadata)) {} - /** * Constructor with outside memory. The tile will *not* delete this memory * upon destruction. */ MatrixTile(std::size_t rows, std::size_t cols, T* data, std::size_t lda) : ttvalue_type() - , _buffer(data, lda*cols) + , _buffer(std::unique_ptr(data, non_owning_deleter{}), lda*cols) , _rows(rows) , _cols(cols) , _lda(lda) @@ -121,15 +117,6 @@ class MatrixTile : public ttg::TTValue> { return *this; } - void set_metadata(metadata_t meta) { - _rows = std::get<0>(meta); - _cols = std::get<1>(meta); - _lda = std::get<2>(meta); - this->realloc(); - } - - metadata_t get_metadata(void) const { return metadata_t{_rows, _cols, _lda}; } - // Accessing the raw data T* data() { return _buffer.host_ptr(); } @@ -187,46 +174,21 @@ class MatrixTile : public ttg::TTValue> { o << " } "; return o; } -}; - -namespace ttg { - template - struct SplitMetadataDescriptor> { - auto get_metadata(const MatrixTile& t) { return t.get_metadata(); } - - auto get_data(MatrixTile& t) { return std::array({t.size() * sizeof(T), t.data()}); } - - auto create_from_metadata(const typename MatrixTile::metadata_t& meta) { return MatrixTile(meta); } - }; + template + void serialize(Archive& ar, const unsigned int version) { + serialize(ar); + } -} // namespace ttg + template + void serialize(Archive& ar) { + ar & _rows & _cols & _lda; + ar & buffer(); + } +}; #ifdef TTG_SERIALIZATION_SUPPORTS_MADNESS -namespace madness { - namespace archive { - template - struct ArchiveStoreImpl> { - static inline void store(const Archive& ar, const MatrixTile& tile) { - ar << tile.rows() << tile.cols() << tile.lda(); - ar << wrap(tile.data(), tile.rows() * tile.cols()); - } - }; - - template - struct ArchiveLoadImpl> { - static inline void load(const Archive& ar, MatrixTile& tile) { - std::size_t rows, cols, lda; - ar >> rows >> cols >> lda; - tile = MatrixTile(rows, cols, lda); - ar >> wrap(tile.data(), tile.rows() * tile.cols()); // MatrixTile(bm.rows(), bm.cols()); - } - }; - } // namespace archive -} // namespace madness - static_assert(madness::is_serializable_v>); - #endif // TTG_SERIALIZATION_SUPPORTS_MADNESS #endif // TTG_EXAMPLES_MATRIX_TILE_H diff --git a/examples/potrf/testing_dlauum.cc b/examples/potrf/testing_dlauum.cc index bab6d676cf..0ff569dffa 100644 --- a/examples/potrf/testing_dlauum.cc +++ b/examples/potrf/testing_dlauum.cc @@ -71,8 +71,6 @@ int main(int argc, char **argv) int P = std::sqrt(world.size()); int Q = (world.size() + P - 1)/P; - static_assert(ttg::has_split_metadata>::value); - std::cout << "Creating 2D block cyclic matrix with NB " << NB << " N " << N << " M " << M << " P " << P << std::endl; parsec_matrix_sym_block_cyclic_t dcA; diff --git a/examples/potrf/testing_dpoinv.cc b/examples/potrf/testing_dpoinv.cc index 646f2477e6..ba8892e039 100644 --- a/examples/potrf/testing_dpoinv.cc +++ b/examples/potrf/testing_dpoinv.cc @@ -113,8 +113,6 @@ int main(int argc, char **argv) Q = (world.size() + P - 1)/P; } - static_assert(ttg::has_split_metadata>::value); - if(verbose) { std::cout << "Creating 2D block cyclic matrix with NB " << NB << " N " << N << " M " << M << " P " << P << std::endl; } diff --git a/examples/potrf/testing_dpotrf.cc b/examples/potrf/testing_dpotrf.cc index 781cc417ca..2200f30ef1 100644 --- a/examples/potrf/testing_dpotrf.cc +++ b/examples/potrf/testing_dpotrf.cc @@ -92,8 +92,6 @@ int main(int argc, char **argv) check = false; } - static_assert(ttg::has_split_metadata>::value); - if (world.rank() == 0) { std::cout << "Creating 2D block cyclic matrix with NB " << NB << " N " << N << " M " << M << " P " << P << " Q " << Q << std::endl; } diff --git a/examples/potrf/testing_dtrtri.cc b/examples/potrf/testing_dtrtri.cc index 3124ab2be0..41ff385e5b 100644 --- a/examples/potrf/testing_dtrtri.cc +++ b/examples/potrf/testing_dtrtri.cc @@ -92,8 +92,6 @@ int main(int argc, char **argv) check = false; } - static_assert(ttg::has_split_metadata>::value); - std::cout << "Creating 2D block cyclic matrix with NB " << NB << " N " << N << " M " << M << " P " << P << std::endl; parsec_matrix_sym_block_cyclic_t dcA; diff --git a/examples/spmm/devicetensor.h b/examples/spmm/devicetensor.h index d5a0f6d9fe..ce896b6457 100644 --- a/examples/spmm/devicetensor.h +++ b/examples/spmm/devicetensor.h @@ -20,6 +20,18 @@ #if defined(BTAS_IS_USABLE) +namespace detail { + template + struct mohndle_type; + + template + struct mohndle_type> : std::integral_constant + { }; + + template + constexpr btas::Handle mohndle_type_v = mohndle_type::value; +} // namespace detail + /** * Derives from btas::Tensor and wraps a ttg::Buffer * to enable device support in SPMM. The ttg::Buffer @@ -38,6 +50,9 @@ struct DeviceTensor : public ttg::TTValue> using storage_type = typename tensor_type::storage_type; using range_type = typename tensor_type::range_type; + static_assert(detail::mohndle_type_v<_Storage> == btas::Handle::shared_ptr, + "DeviceTensor only supports shared_ptr"); + public: DeviceTensor() = default; @@ -48,7 +63,7 @@ struct DeviceTensor : public ttg::TTValue> explicit DeviceTensor(const size_type& first, const _args&... rest) : ttvalue_type() , tensor_type(first, rest...) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { } /// construct from \c range, allocate data, but not initialized @@ -56,7 +71,7 @@ struct DeviceTensor : public ttg::TTValue> explicit DeviceTensor(const Range& range, typename std::enable_if::value>::type* = 0) : ttvalue_type() , tensor_type(range) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { } /// construct from \c range object, set all elements to \c v @@ -64,7 +79,7 @@ struct DeviceTensor : public ttg::TTValue> DeviceTensor(const Range& range, value_type v, typename std::enable_if::value>::type* = 0) : ttvalue_type() , tensor_type(range) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { } /// construct from \c range object, copy elements from \c vec @@ -72,7 +87,7 @@ struct DeviceTensor : public ttg::TTValue> DeviceTensor(const Range& range, U* vec, typename std::enable_if::value>::type* = 0) : ttvalue_type() , tensor_type(range, vec) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { } /// construct from \c range and \c storage @@ -82,28 +97,28 @@ struct DeviceTensor : public ttg::TTValue> not std::is_same::value>::type* = 0) : ttvalue_type() , tensor_type(range, storage) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { } /// copy-copy-construct from \c range and \c storage DeviceTensor(const range_type& range, const storage_type& storage) : ttvalue_type() , tensor_type(range, storage) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { } /// copy-move-construct from \c range and \c storage DeviceTensor(const range_type& range, storage_type&& storage) : ttvalue_type() , tensor_type(range, std::forward(storage)) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { } /// move-construct from \c range and \c storage DeviceTensor(range_type&& range, storage_type&& storage) : ttvalue_type() , tensor_type(std::forward(range), std::forward(storage)) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { } /// Construct an evaluated tensor @@ -125,7 +140,7 @@ struct DeviceTensor : public ttg::TTValue> typename std::enable_if::value>::type* = 0) : ttvalue_type() , tensor_type(range, it, op) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { } /// copy constructor @@ -134,7 +149,7 @@ struct DeviceTensor : public ttg::TTValue> DeviceTensor(const _Tensor& x) noexcept : ttvalue_type() , tensor_type(x.clone()) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { //std::cout << "DeviceTensor tensor_type copy ctor" << std::endl; } @@ -143,7 +158,7 @@ struct DeviceTensor : public ttg::TTValue> DeviceTensor(const DeviceTensor& x) noexcept : ttvalue_type(x) , tensor_type(x.clone()) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { //std::cout << "DeviceTensor copy ctor" << std::endl; } @@ -152,7 +167,7 @@ struct DeviceTensor : public ttg::TTValue> DeviceTensor(tensor_type&& x) noexcept : ttvalue_type() , tensor_type(std::move(x)) - , b(this->size() ? this->data() : nullptr, this->size()) + , b(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()) { //std::cout << "DeviceTensor tensor_type move ctor" << std::endl; } @@ -172,7 +187,7 @@ struct DeviceTensor : public ttg::TTValue> not std::is_same::value>::type> DeviceTensor& operator=(const _Tensor& x) noexcept { tensor_type::operator=(x.clone()); - b.reset(this->size() ? this->data() : nullptr, this->size()); + b.reset(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()); //std::cout << "DeviceTensor tensor_type copy operator" << std::endl; return *this; } @@ -183,7 +198,7 @@ struct DeviceTensor : public ttg::TTValue> std::is_same::value>::type> DeviceTensor& operator=(const _Tensor& x) noexcept { tensor_type::operator=(x.clone()); - b.reset(this->size() ? this->data() : nullptr, this->size()); + b.reset(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()); //std::cout << "DeviceTensor tensor_type copy operator" << std::endl; return *this; } @@ -192,7 +207,7 @@ struct DeviceTensor : public ttg::TTValue> DeviceTensor& operator=(const DeviceTensor& x) noexcept { ttvalue_type::operator=(x); tensor_type::operator=(x.clone()); - b.reset(this->size() ? this->data() : nullptr, this->size()); + b.reset(std::shared_ptr<_T[]>(pointer(), this->size() ? this->data() : nullptr), this->size()); //std::cout << "DeviceTensor copy operator" << std::endl; return *this; } @@ -206,13 +221,41 @@ struct DeviceTensor : public ttg::TTValue> return *this; } + auto pointer() { + return std::get(detail::mohndle_type_v)>(this->storage().base()); + } + using tensor_type::begin; using tensor_type::cbegin; using tensor_type::end; using tensor_type::cend; - }; +namespace madness { + namespace archive { + template + struct ArchiveLoadImpl> { + static inline void load(const Archive& ar, + DeviceTensor<_T, _Range, _Store>& t) { + _Range range{}; + ar& range; + t = DeviceTensor<_T, _Range, _Store>(std::move(range)); + /* pick the buffer out of the archive + * this should not change the buffer since it's + * already been allocated above */ + ar & t.b; + } + }; + + template + struct ArchiveStoreImpl> { + static inline void store(const Archive& ar, + const DeviceTensor<_T, _Range, _Store>& t) { + ar& t.range() & t.b; + } + }; + } // namespace archive +} // namespace madness #endif // defined(BTAS_IS_USABLE) #endif // HAVE_DEVICETENSOR_H \ No newline at end of file diff --git a/examples/task-benchmarks/chain-ttg-dev.cc b/examples/task-benchmarks/chain-ttg-dev.cc index 80f14bff44..66e15adbff 100644 --- a/examples/task-benchmarks/chain-ttg-dev.cc +++ b/examples/task-benchmarks/chain-ttg-dev.cc @@ -18,13 +18,10 @@ using namespace ttg; std::atomic task_counter = 0; struct A : public ttg::TTValue { - // TODO: allocate pinned memory - int v = 0; ttg::Buffer b; - A() : b(&v, 1) { } + A() : b(1) { } A(A&& a) = default; - A(const A& a) : v(a.v), b(&v, 1) { } template void serialize(Archive& ar) { @@ -38,11 +35,11 @@ struct A : public ttg::TTValue { }; template -auto make_ttg(bool do_move); +auto make_ttg(); // flows task ids via values template <> -auto make_ttg<1>(bool do_move) { +auto make_ttg<1>() { Edge I2N, N2N; Edge N2S; @@ -57,11 +54,7 @@ auto make_ttg<1>(bool do_move) { //++task_counter; co_await ttg::device::select(value.b); if (key < NUM_TASKS) { - if (do_move) { - co_await ttg::device::forward(ttg::device::send<0>(key+1, std::move(value))); - } else { - co_await ttg::device::forward(ttg::device::send<0>(key+1, value)); - } + co_await ttg::device::forward(ttg::device::send<0>(key+1, std::move(value))); } else { } } , edges(fuse(I2N, N2N)), edges(N2N)); @@ -70,7 +63,7 @@ auto make_ttg<1>(bool do_move) { } template <> -auto make_ttg<2>(bool do_move) { +auto make_ttg<2>() { Edge I2N1, I2N2; Edge N2N1, N2N2; Edge N2S1, N2S2; @@ -83,13 +76,8 @@ auto make_ttg<2>(bool do_move) { auto next = make_tt([=](const int &key, A&& v1, A&& v2) -> ttg::device::Task { co_await ttg::device::select(v1.b, v2.b); if (key < NUM_TASKS) { - if (do_move) { - co_await ttg::device::forward(ttg::device::send<0>(key+1, std::move(v1)), - ttg::device::send<1>(key+1, std::move(v2))); - } else { - co_await ttg::device::forward(ttg::device::send<0>(key+1, v1), - ttg::device::send<1>(key+1, v2)); - } + co_await ttg::device::forward(ttg::device::send<0>(key+1, std::move(v1)), + ttg::device::send<1>(key+1, std::move(v2))); } } , edges(fuse(I2N1, N2N1), fuse(I2N2, N2N2)), edges(N2N1, N2N2)); @@ -97,7 +85,7 @@ auto make_ttg<2>(bool do_move) { } template <> -auto make_ttg<4>(bool do_move) { +auto make_ttg<4>() { Edge I2N1, I2N2, I2N3, I2N4; Edge N2N1, N2N2, N2N3, N2N4; Edge N2S1, N2S2, N2S3, N2S4; @@ -113,17 +101,10 @@ auto make_ttg<4>(bool do_move) { auto next = make_tt([=](const int &key, A&& v1, A&& v2, A&& v3, A&& v4) -> ttg::device::Task { co_await ttg::device::select(v1.b, v2.b, v3.b, v4.b); if (key < NUM_TASKS) { - if (do_move) { - co_await ttg::device::forward(ttg::device::send<0>(key+1, std::move(v1)), - ttg::device::send<1>(key+1, std::move(v2)), - ttg::device::send<2>(key+1, std::move(v3)), - ttg::device::send<3>(key+1, std::move(v4))); - } else { - co_await ttg::device::forward(ttg::device::send<0>(key+1, v1), - ttg::device::send<1>(key+1, v2), - ttg::device::send<2>(key+1, v3), - ttg::device::send<3>(key+1, v4)); - } + co_await ttg::device::forward(ttg::device::send<0>(key+1, std::move(v1)), + ttg::device::send<1>(key+1, std::move(v2)), + ttg::device::send<2>(key+1, std::move(v3)), + ttg::device::send<3>(key+1, std::move(v4))); } }, edges(fuse(I2N1, N2N1), fuse(I2N2, N2N2), fuse(I2N3, N2N3), fuse(I2N4, N2N4)), @@ -133,7 +114,7 @@ auto make_ttg<4>(bool do_move) { } template <> -auto make_ttg<8>(bool do_move) { +auto make_ttg<8>() { Edge I2N1, I2N2, I2N3, I2N4, I2N5, I2N6, I2N7, I2N8; Edge N2N1, N2N2, N2N3, N2N4, N2N5, N2N6, N2N7, N2N8; Edge N2S1, N2S2, N2S3, N2S4, N2S5, N2S6, N2S7, N2S8; @@ -153,25 +134,14 @@ auto make_ttg<8>(bool do_move) { auto next = make_tt([=](const int &key, auto&& v1, auto&& v2, auto&& v3, auto&& v4, auto&& v5, auto&& v6, auto&& v7, auto&& v8) -> ttg::device::Task { co_await ttg::device::select(v1.b, v2.b, v3.b, v4.b, v5.b, v6.b, v7.b, v8.b); if (key < NUM_TASKS) { - if (do_move) { - co_await ttg::device::forward(ttg::device::send<0>(key+1, std::move(v1)), - ttg::device::send<1>(key+1, std::move(v2)), - ttg::device::send<2>(key+1, std::move(v3)), - ttg::device::send<3>(key+1, std::move(v4)), - ttg::device::send<4>(key+1, std::move(v5)), - ttg::device::send<5>(key+1, std::move(v6)), - ttg::device::send<6>(key+1, std::move(v7)), - ttg::device::send<7>(key+1, std::move(v8))); - } else { - co_await ttg::device::forward(ttg::device::send<0>(key+1, v1), - ttg::device::send<1>(key+1, v2), - ttg::device::send<2>(key+1, v3), - ttg::device::send<3>(key+1, v4), - ttg::device::send<4>(key+1, v5), - ttg::device::send<5>(key+1, v6), - ttg::device::send<6>(key+1, v7), - ttg::device::send<7>(key+1, v8)); - } + co_await ttg::device::forward(ttg::device::send<0>(key+1, std::move(v1)), + ttg::device::send<1>(key+1, std::move(v2)), + ttg::device::send<2>(key+1, std::move(v3)), + ttg::device::send<3>(key+1, std::move(v4)), + ttg::device::send<4>(key+1, std::move(v5)), + ttg::device::send<5>(key+1, std::move(v6)), + ttg::device::send<6>(key+1, std::move(v7)), + ttg::device::send<7>(key+1, std::move(v8))); } }, edges(fuse(I2N1, N2N1), fuse(I2N2, N2N2), fuse(I2N3, N2N3), fuse(I2N4, N2N4), fuse(I2N5, N2N5), fuse(I2N6, N2N6), fuse(I2N7, N2N7), fuse(I2N8, N2N8)), edges(N2N1, N2N2, N2N3, N2N4, N2N5, N2N6, N2N7, N2N8)); @@ -181,7 +151,7 @@ auto make_ttg<8>(bool do_move) { // flows task ids via keys template <> -auto make_ttg<0>(bool do_move) { +auto make_ttg<0>() { Edge I2N, N2N; Edge N2S; @@ -198,9 +168,9 @@ auto make_ttg<0>(bool do_move) { } template -void run_bench(bool do_move) +void run_bench() { - auto [init, next] = make_ttg(do_move); + auto [init, next] = make_ttg(); auto connected = make_graph_executable(init.get()); assert(connected); @@ -225,23 +195,18 @@ void run_bench(bool do_move) int main(int argc, char* argv[]) { int num_flows = 0; - int do_move = 1; ttg_initialize(argc, argv, -1); if (argc > 1) { num_flows = std::atoi(argv[1]); } - if (argc > 2) { - do_move = std::atoi(argv[2]); - } - switch(num_flows) { - case 0: run_bench<0>(do_move); break; - case 1: run_bench<1>(do_move); break; - case 2: run_bench<2>(do_move); break; - case 4: run_bench<4>(do_move); break; - case 8: run_bench<8>(do_move); break; + case 0: run_bench<0>(); break; + case 1: run_bench<1>(); break; + case 2: run_bench<2>(); break; + case 4: run_bench<4>(); break; + case 8: run_bench<8>(); break; default: std::cout << "Unsupported number of flows: " << NUM_TASKS << std::endl; } diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index a33bc3c942..c0d056dfa2 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -31,8 +31,10 @@ add_ttg_executable(serialization serialization.cc unit_main.cpp COMPILE_DEFINITIONS $<$:TTG_HAS_BTAS=1>) # Boost serialization test: checks low-level codegen -add_ttg_executable(serialization_boost serialization_boost.cc - LINK_LIBRARIES ttg-serialization-boost RUNTIMES "parsec") +if (TTG_PARSEC_USE_BOOST_SERIALIZATION) + add_ttg_executable(serialization_boost serialization_boost.cc + LINK_LIBRARIES ttg-serialization-boost RUNTIMES "parsec") +endif(TTG_PARSEC_USE_BOOST_SERIALIZATION) # TODO: convert into unit test #if (TARGET MADworld) diff --git a/tests/unit/device_coro.cc b/tests/unit/device_coro.cc index 8ed015aadf..a8eabdaf03 100644 --- a/tests/unit/device_coro.cc +++ b/tests/unit/device_coro.cc @@ -12,21 +12,72 @@ struct value_t { template void serialize(Archive& ar, const unsigned int version) { + serialize(ar); + } + + template + void serialize(Archive& ar) { ar& quark; - ar& db; // input: + ar& db; + } +}; + +struct nested_value_t { + value_t v; + ttg::Buffer db; + + template + void serialize(Archive& ar, const unsigned int version) { + serialize(ar); } + + template + void serialize(Archive& ar) { + ar& v; + ar& db; + } +}; + +struct derived_value_t { + nested_value_t v; }; #ifdef TTG_SERIALIZATION_SUPPORTS_MADNESS -/* devicebuf is non-POD so provide serialization - * information for members not a devicebuf */ -namespace madness::archive { - template - struct ArchiveSerializeImpl { - static inline void serialize(const Archive& ar, value_t& obj) { ar& obj.quark & obj.db; }; - }; -} // namespace madness::archive -#endif // TTG_SERIALIZATION_SUPPORTS_MADNESS +namespace madness { + namespace archive { + + template + struct ArchiveLoadImpl { + static inline void load(const Archive& ar, derived_value_t& v) { + ar& v.v; + } + }; + + template + struct ArchiveStoreImpl { + static inline void store(const Archive& ar, const derived_value_t& v) { + ar& v.v; + } + }; + } // namespace archive +} // namespace madness + +static_assert(madness::is_serializable_v, derived_value_t>); +static_assert(ttg::detail::has_buffer_apply_v); + + +TEST_CASE("Device", "coro") { + SECTION("buffer-inspection") { + value_t v1; + std::size_t i = 0; + ttg::detail::buffer_apply(v1, [&](const ttg::Buffer& b){ i++; }); + CHECK(i == 1); + + nested_value_t v2; + i = 0; + ttg::detail::buffer_apply(v2, [&](const ttg::Buffer& b){ i++; }); + } +} #if defined(TTG_HAVE_DEVICE) && defined(TTG_IMPL_DEVICE_SUPPORT) @@ -449,3 +500,5 @@ TEST_CASE("Device", "coro") { } #endif // TTG_IMPL_DEVICE_SUPPORT + +#endif // TTG_SERIALIZATION_SUPPORTS_MADNESS diff --git a/ttg/CMakeLists.txt b/ttg/CMakeLists.txt index 27cb08526d..64f167c8f8 100644 --- a/ttg/CMakeLists.txt +++ b/ttg/CMakeLists.txt @@ -171,7 +171,7 @@ if (TARGET MADworld) list(APPEND ttg-serialization-deps MADworld) list(APPEND ttg-serialization-compile-definitions TTG_SERIALIZATION_SUPPORTS_MADNESS=1) endif(TARGET MADworld) -if (TARGET Boost::serialization) +if (TTG_PARSEC_USE_BOOST_SERIALIZATION AND TARGET Boost::serialization) list(APPEND ttg-serialization-deps Boost::serialization) list(APPEND ttg-serialization-boost-deps Boost::serialization) if (TARGET Boost::iostreams) # using modularized Boost? @@ -179,7 +179,7 @@ if (TARGET Boost::serialization) list(APPEND ttg-serialization-boost-deps Boost::iostreams) endif() list(APPEND ttg-serialization-compile-definitions TTG_SERIALIZATION_SUPPORTS_BOOST=1) -endif (TARGET Boost::serialization) +endif (TTG_PARSEC_USE_BOOST_SERIALIZATION AND TARGET Boost::serialization) add_ttg_library(ttg-serialization "${ttg-serialization-sources}" @@ -195,13 +195,13 @@ if (TARGET MADworld) COMPILE_DEFINITIONS "TTG_SERIALIZATION_SUPPORTS_MADNESS=1") endif(TARGET MADworld) # make boost-only serialization target -if (TARGET Boost::serialization) +if (TTG_PARSEC_USE_BOOST_SERIALIZATION AND TARGET Boost::serialization) add_ttg_library(ttg-serialization-boost "${ttg-serialization-sources}" PUBLIC_HEADER "${ttg-serialization-headers}" LINK_LIBRARIES "${ttg-serialization-boost-deps}" COMPILE_DEFINITIONS "TTG_SERIALIZATION_SUPPORTS_BOOST=1") -endif(TARGET Boost::serialization) +endif(TTG_PARSEC_USE_BOOST_SERIALIZATION AND TARGET Boost::serialization) ######################### ####### MADNESS-specific @@ -237,6 +237,7 @@ if (TARGET PaRSEC::parsec) ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/import.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/ptr.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/parsec-ext.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/parsec_data.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/task.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/thread_local.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/ttg.h diff --git a/ttg/ttg/buffer.h b/ttg/ttg/buffer.h index 59a9264155..8f664215f7 100644 --- a/ttg/ttg/buffer.h +++ b/ttg/ttg/buffer.h @@ -5,6 +5,8 @@ #include "ttg/fwd.h" #include "ttg/util/meta.h" +#include "ttg/serialization.h" +#include namespace ttg { @@ -30,6 +32,130 @@ namespace meta { } // namespace meta +namespace detail { + /** + * Type traits to check whether we can use serialization + * to inspect the buffers owned by an object passing + * through a task graph. + */ + template + struct has_buffer_apply_helper : std::false_type + { }; + + template + struct has_buffer_apply : has_buffer_apply_helper + { }; + + template + constexpr const bool has_buffer_apply_v = has_buffer_apply::value; + +} // namespace detail + } // namespace ttg -#endif // TTG_buffer_H \ No newline at end of file + + +#ifdef TTG_SERIALIZATION_SUPPORTS_MADNESS +#include + +namespace madness { + namespace archive { + template + struct BufferVisitorArchive : public madness::archive::BaseOutputArchive { + private: + Fn m_fn; + + public: + template + BufferVisitorArchive(_Fn&& fn) + : m_fn(fn) + { } + + /// Stores (counts) data into the memory buffer. + + /// The function only appears (due to \c enable_if) if \c T is + /// serializable. + /// \tparam T Type of the data to be stored (counted). + /// \param[in] t Pointer to the data to be stored (counted). + /// \param[in] n Size of data to be stored (counted). + template + void store(const ttg::Buffer* t, long n) const { + /* invoke the function on each buffer */ + for (std::size_t i = 0; i < n; ++i) { + m_fn(t[i]); + } + } + + template + void store(const T* t, long n) const { + /* nothing to be done for other types */ + } + + /// Open a buffer with a specific size. + void open(std::size_t /*hint*/) {} + + /// Close the archive. + void close() {} + + /// Flush the archive. + void flush() {} + + /// Return the amount of data stored (counted) in the buffer. + /// \return The amount of data stored (counted) in the buffer (zero). + std::size_t size() const { + return 0; + }; + }; + + /* deduction guide */ + template + BufferVisitorArchive(Fn&&) -> BufferVisitorArchive; + } // namespace archive + + template + struct is_archive> : std::true_type {}; + + template + struct is_output_archive> : std::true_type {}; + + template + struct is_default_serializable_helper, T, + std::enable_if_t::value>> + : std::true_type {}; + + template + struct is_default_serializable_helper, ttg::Buffer> + : std::true_type {}; +} // namespace madness + +namespace ttg::detail { + template + requires(madness::is_serializable_v, std::decay>) + void buffer_apply(T&& t, Fn&& fn) { + madness::archive::BufferVisitorArchive ar(std::forward(fn)); + ar & t; + } + + /* dummy function type used to check whether buffer_apply is available */ + using buffer_apply_dummy_fn = decltype([](const ttg::Buffer&){}); + + template + struct has_buffer_apply_helper, std::decay_t>>> + : std::true_type + { }; + +} // namespace ttg::detail + +#else + +namespace ttg::detail { + template + void buffer_apply(T&& t, Fn&& fn) { + static_assert(ttg::meta::is_void_v, "Types using ttg::Buffer must be MADNESS serializable."); + } + +} // namespace ttg::detail + +#endif // TTG_SERIALIZATION_SUPPORTS_MADNESS + +#endif // TTG_buffer_H diff --git a/ttg/ttg/devicescope.h b/ttg/ttg/devicescope.h index 594e6db0b3..7184dafff9 100644 --- a/ttg/ttg/devicescope.h +++ b/ttg/ttg/devicescope.h @@ -5,6 +5,7 @@ namespace ttg { enum class scope { Allocate = 0x0, //< memory allocated as scratch, but not moved in or out SyncIn = 0x2, //< memory allocated as scratch and data transferred to device + Invalid = 0xF, //< invalid scope }; } // namespace ttg diff --git a/ttg/ttg/edge.h b/ttg/ttg/edge.h index 9121a503f2..8eed2e3367 100644 --- a/ttg/ttg/edge.h +++ b/ttg/ttg/edge.h @@ -178,6 +178,14 @@ namespace ttg { typedef std::tuple type; }; + template + struct edges_to_output_value_types; + + template + struct edges_to_output_value_types> { + typedef std::tuple type; + }; + namespace detail { template struct edges_tuple; diff --git a/ttg/ttg/madness/buffer.h b/ttg/ttg/madness/buffer.h index fba1ee9e15..a532d9965d 100644 --- a/ttg/ttg/madness/buffer.h +++ b/ttg/ttg/madness/buffer.h @@ -4,6 +4,7 @@ #include "ttg/serialization/traits.h" #include "ttg/device/device.h" +#include namespace ttg_madness { @@ -23,8 +24,8 @@ struct Buffer : private Allocator { private: using delete_fn_t = std::function; - using host_data_ptr = std::add_pointer_t; + std::shared_ptr m_sptr; // to capture smart pointers host_data_ptr m_host_data = nullptr; std::size_t m_count = 0; bool m_owned= false; @@ -48,7 +49,7 @@ struct Buffer : private Allocator { Buffer() : Buffer(nullptr, 0) { } - Buffer(std::size_t n) + Buffer(std::size_t n, ttg::scope scope = ttg::scope::SyncIn) : allocator_type() , m_host_data(allocate(n)) , m_count(n) @@ -58,9 +59,22 @@ struct Buffer : private Allocator { /* Constructing a buffer using application-managed memory. * The memory pointed to by ptr must be accessible during * the life-time of the buffer. */ - Buffer(element_type* ptr, std::size_t n = 1) + template + Buffer(std::unique_ptr ptr, std::size_t n, ttg::scope scope = ttg::scope::SyncIn) + : allocator_type() + , m_sptr(std::move(ptr)) + , m_host_data(m_sptr.get()) + , m_count(n) + , m_owned(false) + { } + + /* Constructing a buffer using application-managed memory. + * The memory pointed to by ptr must be accessible during + * the life-time of the buffer. */ + Buffer(std::shared_ptr ptr, std::size_t n, ttg::scope scope = ttg::scope::SyncIn) : allocator_type() - , m_host_data(ptr) + , m_sptr(std::move(ptr)) + , m_host_data(m_sptr.get()) , m_count(n) , m_owned(false) { } @@ -227,7 +241,7 @@ struct Buffer : private Allocator { } /* Reallocate the buffer with count elements */ - void reset(std::size_t n) { + void reset(std::size_t n, ttg::scope scope = ttg::scope::SyncIn) { if (m_owned) { deallocate(); @@ -244,46 +258,27 @@ struct Buffer : private Allocator { m_count = n; } - /* Reset the buffer to use the ptr to count elements */ - void reset(T* ptr, std::size_t n = 1) { - /* TODO: can we resize if count is smaller than m_count? */ - if (n == m_count) { - return; - } + /* Reallocate the buffer with count elements */ + void reset(std::shared_ptr ptr, std::size_t n, ttg::scope scope = ttg::scope::SyncIn) { if (m_owned) { deallocate(); - } - - if (nullptr == ptr) { - m_host_data = nullptr; - m_count = 0; - m_owned = false; - } else { - m_host_data = ptr; - m_count = n; m_owned = false; } - } - /* serialization support */ + m_sptr = std::move(ptr); + m_host_data = m_sptr.get(); + m_count = n; + } -#if defined(TTG_SERIALIZATION_SUPPORTS_BOOST) && 0 - template - void serialize(Archive& ar, const unsigned int version) { - if constexpr (ttg::detail::is_output_archive_v) { - std::size_t s = size(); - ar& s; - /* TODO: how to serialize the array? */ - } else { - std::size_t s; - ar & s; - /* initialize internal pointers and then reset */ - reset(s); - /* TODO: how to deserialize the array? */ - } + /** + * Resets the scope of the buffer. Ignored in MADNESS. + */ + void reset_scope(ttg::scope scope) { + /* nothing to do here */ } -#endif // TTG_SERIALIZATION_SUPPORTS_BOOST + + /* serialization support */ #if defined(TTG_SERIALIZATION_SUPPORTS_MADNESS) template @@ -293,12 +288,12 @@ struct Buffer : private Allocator { if constexpr (ttg::detail::is_output_archive_v) { std::size_t s = size(); ar& s; - ar << wrap(host_ptr(), s); + ar << madness::archive::wrap(host_ptr(), s); } else { std::size_t s; ar & s; reset(s); - ar >> wrap(host_ptr(), s); // MatrixTile(bm.rows(), bm.cols()); + ar >> madness::archive::wrap(host_ptr(), s); // MatrixTile(bm.rows(), bm.cols()); } } #endif // TTG_SERIALIZATION_SUPPORTS_MADNESS diff --git a/ttg/ttg/madness/devicefunc.h b/ttg/ttg/madness/devicefunc.h index 563b433cf4..23bd167ad5 100644 --- a/ttg/ttg/madness/devicefunc.h +++ b/ttg/ttg/madness/devicefunc.h @@ -1,6 +1,8 @@ #ifndef TTG_MAD_DEVICEFUNC_H #define TTG_MAD_DEVICEFUNC_H +#include "ttg/devicescope.h" + #include "ttg/madness/buffer.h" namespace ttg_madness { diff --git a/ttg/ttg/parsec/buffer.h b/ttg/ttg/parsec/buffer.h index 7bcccbfd54..62be4270ce 100644 --- a/ttg/ttg/parsec/buffer.h +++ b/ttg/ttg/parsec/buffer.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -24,6 +25,155 @@ namespace detail { // fwd decl template parsec_data_t* get_parsec_data(const ttg_parsec::Buffer& db); + + template + struct empty_allocator { + + using value_type = std::decay_t; + + value_type* allocate(std::size_t size) { + throw std::runtime_error("Allocate on empty allocator!"); + } + + void deallocate(value_type* ptr, std::size_t size) { + /* nothing to be done; will be called from ~data_copy_type() */ + } + }; + + /* overloads for pointers and smart pointers */ + template + inline T* to_address(T* ptr) { + return ptr; + } + + template + inline auto to_address(T&& ptr) { + return ptr.get(); // smart pointer + } + + /** + * Wrapper type to carry the Allocator into types that are using + * the PaRSEC object system. + */ + template + struct ttg_parsec_data_types { + using allocator_traits = std::allocator_traits; + using allocator_type = typename allocator_traits::allocator_type; + using value_type = typename allocator_traits::value_type; + + /* used as a hook into the PaRSEC object management system + * so we can release the memory back to the allocator once + * data copy is destroyed */ + struct data_copy_type : public parsec_data_copy_t + { + private: + [[no_unique_address]] + allocator_type m_allocator; + PtrT m_ptr; // keep a reference if PtrT is a shared_ptr + std::size_t m_size; + + void allocate(std::size_t size) { + if constexpr (std::is_pointer_v) { + m_ptr = allocator_traits::allocate(m_allocator, size); + } + this->device_private = m_ptr; + m_size = size; + } + + void deallocate() { + allocator_traits::deallocate(m_allocator, static_cast(this->device_private), this->m_size); + this->device_private = nullptr; + this->m_size = 0; + } + + public: + + /* default construction and move, but not copy */ + data_copy_type() = default; + data_copy_type(const data_copy_type&) = delete; + data_copy_type(data_copy_type&&) = default; + data_copy_type& operator=(const data_copy_type&) = delete; + data_copy_type& operator=(data_copy_type&&) = default; + + void construct(PtrT ptr, std::size_t size) { + m_allocator = allocator_type{}; + constexpr const bool is_empty_allocator = std::is_same_v>; + assert(is_empty_allocator); + m_ptr = std::move(ptr); + this->device_private = const_cast(to_address(m_ptr)); + } + + void construct(std::size_t size, + const allocator_type& alloc = allocator_type()) { + constexpr const bool is_empty_allocator = std::is_same_v>; + assert(!is_empty_allocator); + m_allocator = alloc; + allocate(size); + this->device_private = m_ptr; + } + + ~data_copy_type() { + this->deallocate(); + } + }; + + /** + * Create the PaRSEC object infrastructure for the data copy type + */ + static void data_copy_construct(data_copy_type* obj) + { + /* placement new */ + new(obj)(data_copy_type); + } + + static void data_copy_destruct(data_copy_type* obj) + { + obj->~data_copy_type(); // call destructor + } + + inline static PARSEC_OBJ_CLASS_INSTANCE(data_copy_type, parsec_data_copy_t, + data_copy_construct, + data_copy_destruct); + + static parsec_data_t * create_data(std::size_t size, ttg::scope scope, + const allocator_type& allocator = allocator_type()) { + parsec_data_t *data = PARSEC_OBJ_NEW(parsec_data_t); + data->owner_device = 0; + data->nb_elts = size*sizeof(value_type); + + /* create the host copy and allocate host memory */ + data_copy_type *copy = PARSEC_OBJ_NEW(data_copy_type); + copy->construct(size, allocator); + parsec_data_copy_attach(data, copy, 0); + + /* adjust data flags */ + data->device_copies[0]->flags |= PARSEC_DATA_FLAG_PARSEC_MANAGED; + data->device_copies[0]->coherency_state = PARSEC_DATA_COHERENCY_SHARED; + /* setting version to 0 causes data not to be sent to the device */ + data->device_copies[0]->version = (scope == ttg::scope::SyncIn) ? 1 : 0; + + return data; + } + + static parsec_data_t * create_data(PtrT& ptr, std::size_t size, ttg::scope scope) { + parsec_data_t *data = PARSEC_OBJ_NEW(parsec_data_t); + data->owner_device = 0; + data->nb_elts = size*sizeof(value_type); + + /* create the host copy and allocate host memory */ + data_copy_type *copy = PARSEC_OBJ_NEW(data_copy_type); + copy->construct(std::move(ptr), size); + parsec_data_copy_attach(data, copy, 0); + + /* adjust data flags */ + data->device_copies[0]->flags |= PARSEC_DATA_FLAG_PARSEC_MANAGED; + data->device_copies[0]->coherency_state = PARSEC_DATA_COHERENCY_SHARED; + /* setting version to 0 causes data not to be sent to the device */ + data->device_copies[0]->version = (scope == ttg::scope::SyncIn) ? 1 : 0; + + return data; + } + }; } // namespace detail /** @@ -38,8 +188,7 @@ namespace detail { * tracking of the containing object. */ template -struct Buffer : public detail::ttg_parsec_data_wrapper_t - , private Allocator { +struct Buffer { /* TODO: add overloads for T[]? */ using value_type = std::remove_all_extents_t; @@ -47,110 +196,68 @@ struct Buffer : public detail::ttg_parsec_data_wrapper_t using const_pointer_type = const std::remove_const_t*; using element_type = std::decay_t; - using allocator_traits = std::allocator_traits; - using allocator_type = typename allocator_traits::allocator_type; - static_assert(std::is_trivially_copyable_v, "Only trivially copyable types are supported for devices."); private: - using delete_fn_t = std::function; - - using host_data_ptr = std::add_pointer_t; - host_data_ptr m_host_data = nullptr; + parsec_data_t *m_data = nullptr; std::size_t m_count = 0; - bool m_owned= false; - - static void delete_non_owned(element_type *ptr) { - // nothing to be done, we don't own the memory - } - friend parsec_data_t* detail::get_parsec_data(const ttg_parsec::Buffer&); + friend parsec_data_t* detail::get_parsec_data(const ttg_parsec::Buffer&); - allocator_type& get_allocator_reference() { return static_cast(*this); } - - element_type* allocate(std::size_t n) { - return allocator_traits::allocate(get_allocator_reference(), n); - } - void deallocate() { - allocator_traits::deallocate(get_allocator_reference(), m_host_data, m_count); + void release_data() { + if (nullptr == m_data) return; + /* discard the parsec data so it can be collected by the runtime + * and the buffer be free'd in the parsec_data_copy_t destructor */ + parsec_data_discard(m_data); + /* set data to null so we don't repeat the above */ + m_data = nullptr; } public: - Buffer() - : ttg_parsec_data_wrapper_t() - , allocator_type() - , m_host_data(nullptr) - , m_count(0) - , m_owned(false) - { } + Buffer() = default; - /** - * Allocates n elements, unitialized - * By default, data is synchronized to the device, allowing codes - * to fill the buffer before making it available on the device. - * Passing ttg::scope::Allocate will prevent the initial synchronization. - * Subsequent data transfers behave as expected (i.e., data is transferred - * to the host and other devices as needed). - */ Buffer(std::size_t n, ttg::scope scope = ttg::scope::SyncIn) - : ttg_parsec_data_wrapper_t() - , allocator_type() - , m_host_data(allocate(n)) + : m_data(detail::ttg_parsec_data_types::create_data(n, scope)) , m_count(n) - , m_owned(true) - { - //std::cout << "buffer " << this << " ctor count " - // << m_count << "(" << m_host_data << ") ttg_copy " - // << m_ttg_copy - // << " parsec_data " << m_data.get() << std::endl; - this->reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn)); - } + { } /** * Constructing a buffer using application-managed memory. - * The memory pointed to by ptr must be accessible during - * the life-time of the buffer. - * - * Passing ttg::scope::Allocate will prevent the initial synchronization. - * Subsequent data transfers behave as expected (i.e., data is transferred - * to the host and other devices as needed). + * The shared_ptr will ensure that the memory is not free'd before + * the runtime has released all of its references. */ - Buffer(pointer_type ptr, std::size_t n = 1, ttg::scope scope = ttg::scope::SyncIn) - : ttg_parsec_data_wrapper_t() - , allocator_type() - , m_host_data(const_cast(ptr)) + Buffer(std::shared_ptr ptr, std::size_t n, + ttg::scope scope = ttg::scope::SyncIn) + : m_data(detail::ttg_parsec_data_types, + detail::empty_allocator> + ::create_data(ptr, n, scope)) , m_count(n) - , m_owned(false) - { - //std::cout << "buffer " << this << " ctor ptr " << ptr << "count " - // << m_count << "(" << m_host_data << ") ttg_copy " - // << m_ttg_copy - // << " parsec_data " << m_data.get() << std::endl; - this->reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn)); - } + { } + + template + Buffer(std::unique_ptr ptr, std::size_t n, + ttg::scope scope = ttg::scope::SyncIn) + : m_data(detail::ttg_parsec_data_types, + detail::empty_allocator> + ::create_data(ptr, n, scope)) + , m_count(n) + { } virtual ~Buffer() { - if (m_owned) { - deallocate(); - m_owned = false; - } unpin(); // make sure the copies are not pinned + release_data(); } /* allow moving device buffers */ Buffer(Buffer&& db) - : ttg_parsec_data_wrapper_t(std::move(db)) - , allocator_type(std::move(db)) - , m_host_data(db.m_host_data) + : m_data(db.m_data) , m_count(db.m_count) - , m_owned(db.m_owned) { - db.m_host_data = nullptr; + db.m_data = nullptr; db.m_count = 0; - db.m_owned = false; } /* explicitly disable copying of buffers @@ -160,11 +267,8 @@ struct Buffer : public detail::ttg_parsec_data_wrapper_t /* allow moving device buffers */ Buffer& operator=(Buffer&& db) { - ttg_parsec_data_wrapper_t::operator=(std::move(db)); - allocator_type::operator=(std::move(db)); - std::swap(m_host_data, db.m_host_data); + std::swap(m_data, db.m_data); std::swap(m_count, db.m_count); - std::swap(m_owned, db.m_owned); //std::cout << "buffer " << this << " other " << &db << " mv op ttg_copy " << m_ttg_copy << std::endl; //std::cout << "buffer::move-assign from " << &db << " ttg-copy " << db.m_ttg_copy // << " to " << this << " ttg-copy " << m_ttg_copy @@ -252,7 +356,7 @@ struct Buffer : public detail::ttg_parsec_data_wrapper_t assert(is_valid()); if (empty()) return nullptr; int device_id = detail::ttg_device_to_parsec_device(device); - return static_cast(parsec_data_get_ptr(m_data.get(), device_id)); + return static_cast(parsec_data_get_ptr(m_data, device_id)); } /* get the device pointer at the given device @@ -261,23 +365,21 @@ struct Buffer : public detail::ttg_parsec_data_wrapper_t assert(is_valid()); if (empty()) return nullptr; int device_id = detail::ttg_device_to_parsec_device(device); - return static_cast(parsec_data_get_ptr(m_data.get(), device_id)); + return static_cast(parsec_data_get_ptr(m_data, device_id)); } pointer_type host_ptr() { - if (empty()) return nullptr; - return static_cast(parsec_data_get_ptr(m_data.get(), 0)); + return static_cast(parsec_data_get_ptr(m_data, 0)); } const_pointer_type host_ptr() const { - if (empty()) return nullptr; - return static_cast(parsec_data_get_ptr(m_data.get(), 0)); + return static_cast(parsec_data_get_ptr(m_data, 0)); } bool is_valid_on(const ttg::device::Device& device) const { assert(is_valid()); int device_id = detail::ttg_device_to_parsec_device(device); - return (parsec_data_get_ptr(m_data.get(), device_id) != nullptr); + return (parsec_data_get_ptr(m_data, device_id) != nullptr); } void allocate_on(const ttg::device::Device& device_id) { @@ -332,51 +434,19 @@ struct Buffer : public detail::ttg_parsec_data_wrapper_t /* Reallocate the buffer with count elements */ void reset(std::size_t n, ttg::scope scope = ttg::scope::SyncIn) { - /* TODO: can we resize if count is smaller than m_count? */ - - if (m_owned) { - deallocate(); - m_owned = false; - } - - if (n == 0) { - m_host_data = nullptr; - m_owned = false; - } else { - m_host_data = allocate(n); - m_owned = true; - } - reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn)); - //std::cout << "buffer::reset(" << count << ") ptr " << m_host_data.get() - // << " ttg_copy " << m_ttg_copy - // << " parsec_data " << m_data.get() << std::endl; + if (n == m_count) return; + release_data(); + m_data = detail::ttg_parsec_data_types::create_data(n, scope); m_count = n; } - /* Reset the buffer to use the ptr to count elements */ - void reset(pointer_type ptr, std::size_t n = 1, ttg::scope scope = ttg::scope::SyncIn) { - /* TODO: can we resize if count is smaller than m_count? */ - if (n == m_count) { - return; - } - - if (m_owned) { - deallocate(); - } - - if (nullptr == ptr) { - m_host_data = nullptr; - m_count = 0; - m_owned = false; - } else { - m_host_data = ptr; - m_count = n; - m_owned = false; - } - reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn)); - //std::cout << "buffer::reset(" << ptr << ", " << count << ") ptr " << m_host_data.get() - // << " ttg_copy " << m_ttg_copy - // << " parsec_data " << m_data.get() << std::endl; + /* Reallocate the buffer with count elements */ + void reset(std::shared_ptr ptr, std::size_t n, ttg::scope scope = ttg::scope::SyncIn) { + release_data(); + m_data = detail::ttg_parsec_data_types, + detail::empty_allocator> + ::create_data(ptr, n, scope); + m_count = n; } /** @@ -403,14 +473,15 @@ struct Buffer : public detail::ttg_parsec_data_wrapper_t ttg::scope scope() const { /* if the host owns the data and has a version of zero we only have to allocate data */ + if (nullptr != m_data) return ttg::scope::Invalid; return (m_data->device_copies[0]->version == 0 && m_data->owner_device == 0) ? ttg::scope::Allocate : ttg::scope::SyncIn; } void prefer_device(ttg::device::Device dev) { /* only set device if the host has the latest copy as otherwise we might end up with a stale copy */ - if (dev.is_device() && this->parsec_data()->owner_device == 0) { - parsec_advise_data_on_device(this->parsec_data(), detail::ttg_device_to_parsec_device(dev), + if (dev.is_device() && m_data->owner_device == 0) { + parsec_advise_data_on_device(m_data, detail::ttg_device_to_parsec_device(dev), PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE); } } @@ -422,7 +493,7 @@ struct Buffer : public detail::ttg_parsec_data_wrapper_t add_copy(detail::ttg_device_to_parsec_device(dev), ptr); if (is_current) { // mark the data as being current on the new device - parsec_data()->owner_device = detail::ttg_device_to_parsec_device(dev); + m_data->owner_device = detail::ttg_device_to_parsec_device(dev); } } @@ -434,15 +505,11 @@ struct Buffer : public detail::ttg_parsec_data_wrapper_t if constexpr (ttg::detail::is_output_archive_v) { std::size_t s = size(); ar& s; - assert(m_ttg_copy != nullptr); // only tracked objects allowed - m_ttg_copy->iovec_add(ttg::iovec{s*sizeof(T), current_device_ptr()}); } else { std::size_t s; ar & s; /* initialize internal pointers and then reset */ reset(s); - assert(m_ttg_copy != nullptr); // only tracked objects allowed - m_ttg_copy->iovec_add(ttg::iovec{s*sizeof(T), current_device_ptr()}); } } #endif // TTG_SERIALIZATION_SUPPORTS_BOOST @@ -455,21 +522,19 @@ struct Buffer : public detail::ttg_parsec_data_wrapper_t if constexpr (ttg::detail::is_output_archive_v) { std::size_t s = size(); ar& s; - assert(m_ttg_copy != nullptr); // only tracked objects allowed - /* transfer from the current device - * note: if the transport layer (MPI) does not support device transfers - * the data will have been pushed out */ - m_ttg_copy->iovec_add(ttg::iovec{s*sizeof(T), current_device_ptr()}); } else { std::size_t s; ar & s; - //std::cout << "serialize(IN) buffer " << this << " size " << s << std::endl; - /* initialize internal pointers and then reset */ - reset(s); - assert(m_ttg_copy != nullptr); // only tracked objects allowed - /* transfer to the current device - * TODO: how can we make sure the device copy is not evicted? */ - m_ttg_copy->iovec_add(ttg::iovec{s*sizeof(T), current_device_ptr()}); + /* if we have been initialized already we just make sure the size matches */ + if (m_data != nullptr) { + if (s != size()) { + throw std::runtime_error("Buffer size mismatch in deserialization!"); + } + } else { + //std::cout << "serialize(IN) buffer " << this << " size " << s << std::endl; + /* initialize internal pointers and then reset */ + reset(s); + } } } #endif // TTG_SERIALIZATION_SUPPORTS_MADNESS @@ -478,10 +543,10 @@ struct Buffer : public detail::ttg_parsec_data_wrapper_t namespace detail { template parsec_data_t* get_parsec_data(const ttg_parsec::Buffer& db) { - return const_cast(db.m_data.get()); + return const_cast(db.m_data); } } // namespace detail } // namespace ttg_parsec -#endif // TTG_PARSEC_BUFFER_H \ No newline at end of file +#endif // TTG_PARSEC_BUFFER_H diff --git a/ttg/ttg/parsec/devicescratch.h b/ttg/ttg/parsec/devicescratch.h index e2c3743aa3..085d92caea 100644 --- a/ttg/ttg/parsec/devicescratch.h +++ b/ttg/ttg/parsec/devicescratch.h @@ -50,19 +50,6 @@ struct devicescratch { return data; } - void remove_from_flow() { - /* remove the scratch from the gpu-task flow */ - assert(nullptr != detail::parsec_ttg_caller); - parsec_task_t *parsec_task = &detail::parsec_ttg_caller->parsec_task; - parsec_flow_t *flows = detail::parsec_ttg_caller->dev_ptr->flows; - for (int i = 0; i < MAX_PARAM_COUNT; ++i) { - if (nullptr != parsec_task->data[i].data_in && parsec_task->data[i].data_in->original == m_data) { - flows[i].flow_flags = PARSEC_FLOW_ACCESS_NONE; // disable this flow - break; - } - } - } - friend parsec_data_t* detail::get_parsec_data(const ttg_parsec::devicescratch&); public: @@ -94,14 +81,9 @@ struct devicescratch { ~devicescratch() { /* remove data from flow */ - //remove_from_flow(); if (nullptr != m_data) { - //parsec_data_destroy(m_data); - //parsec_data_copy_detach(m_data, parsec_data_get_copy(m_data, 0), 0); - //auto *copy = parsec_data_get_copy(m_data, 0); - //PARSEC_OBJ_RELEASE(copy); + parsec_data_discard(m_data); } - //parsec_data_destroy(m_data); m_data = nullptr; } diff --git a/ttg/ttg/parsec/parsec_data.h b/ttg/ttg/parsec/parsec_data.h new file mode 100644 index 0000000000..ab9dba219a --- /dev/null +++ b/ttg/ttg/parsec/parsec_data.h @@ -0,0 +1,22 @@ +#ifndef TTG_PARSEC_PARSEC_DATA_H +#define TTG_PARSEC_PARSEC_DATA_H + +#include "ttg/parsec/buffer.h" +#include "ttg/buffer.h" + +namespace ttg_parsec::detail { + template + void foreach_parsec_data(Value&& value, Fn&& fn) { + /* protect for non-serializable types, allowed if the TT has no device op */ + if constexpr (ttg::detail::has_buffer_apply_v) { + ttg::detail::buffer_apply(value, [&](B&& b){ + parsec_data_t *data = detail::get_parsec_data(b); + if (nullptr != data) { + fn(data); + } + }); + } + } +} // namespace ttg_parsec::detail + +#endif // TTG_PARSEC_PARSEC_DATA_H \ No newline at end of file diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index c21df816f3..ae0bf557b7 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -48,6 +48,7 @@ #include "ttg/parsec/devicefunc.h" #include "ttg/parsec/ttvalue.h" #include "ttg/device/task.h" +#include "ttg/parsec/parsec_data.h" #include #include @@ -120,8 +121,6 @@ #include "ttg/device/device.h" -#include - #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES /* PaRSEC function declarations */ @@ -744,18 +743,24 @@ namespace ttg_parsec { } #endif // 0 - template - inline void transfer_ownership_impl(ttg_data_copy_t *copy, int device) { - if constexpr(!std::is_const_v>) { - copy->transfer_ownership(PARSEC_FLOW_ACCESS_RW, device); - copy->inc_current_version(); + template + inline void transfer_ownership_impl(T&& arg, int device) { + if constexpr(!std::is_const_v>) { + detail::foreach_parsec_data(arg, [&](parsec_data_t *data){ + parsec_data_transfer_ownership_to_copy(data, device, PARSEC_FLOW_ACCESS_RW); + /* make sure we increment the version since we will modify the data */ + data->device_copies[0]->version++; + }); } } template inline void transfer_ownership(parsec_ttg_task_t *me, int device, std::index_sequence) { /* transfer ownership of each data */ - int junk[] = {0, (transfer_ownership_impl(me->copies[Is], device), 0)...}; + int junk[] = {0, + (transfer_ownership_impl( + *reinterpret_cast> *>( + me->copies[Is]->get_ptr()), device), 0)...}; junk[0]++; } @@ -1035,7 +1040,7 @@ namespace ttg_parsec { } } else { - throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id>().pretty_name() + " but the type is not copyable"); + throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + typeid(std::decay_t).name() + " but the type is not copyable"); } } return copy_res; @@ -1218,25 +1223,35 @@ namespace ttg_parsec { public: /// @return true if derivedT::have_cuda_op exists and is defined to true + template static constexpr bool derived_has_cuda_op() { return Space == ttg::ExecutionSpace::CUDA; } /// @return true if derivedT::have_hip_op exists and is defined to true + template static constexpr bool derived_has_hip_op() { return Space == ttg::ExecutionSpace::HIP; } /// @return true if derivedT::have_hip_op exists and is defined to true + template static constexpr bool derived_has_level_zero_op() { return Space == ttg::ExecutionSpace::L0; } /// @return true if the TT supports device execution + template static constexpr bool derived_has_device_op() { - return (derived_has_cuda_op() || derived_has_hip_op() || derived_has_level_zero_op()); + return (derived_has_cuda_op() || + derived_has_hip_op() || + derived_has_level_zero_op()); } + static_assert(!derived_has_device_op() || ttg::meta::probe_all_v>, + "Data sent from a device-capable template task must be serializable."); + using ttT = TT; using key_type = keyT; using input_terminals_type = ttg::detail::input_terminals_tuple_t; @@ -1919,11 +1934,6 @@ namespace ttg_parsec { uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy = nullptr) { using dd_t = ttg::default_data_descriptor>; uint64_t payload_size = dd_t::payload_size(&obj); - if (copy) { - /* reset any tracked data, we don't care about the packing from the payload size */ - copy->iovec_reset(); - } - if constexpr (!dd_t::serialize_size_is_const) { pos = ttg::default_data_descriptor::pack_payload(&payload_size, sizeof(uint64_t), pos, bytes); } @@ -2016,7 +2026,7 @@ namespace ttg_parsec { if (std::size(keylist) == 1) set_arg_local_impl(key, std::move(*reinterpret_cast(copy->get_ptr())), copy, &task_ring); else { - throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id>().pretty_name() + " but the type is not copyable"); + throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + typeid(std::decay_t).name() + " but the type is not copyable"); } } } @@ -2112,8 +2122,6 @@ namespace ttg_parsec { #endif // 0 /* unpack the object, potentially discovering iovecs */ pos = unpack(*static_cast(copy->get_ptr()), msg->bytes, pos); - //std::cout << "set_arg_from_msg iovec_begin num_iovecs " << num_iovecs << " distance " << std::distance(copy->iovec_begin(), copy->iovec_end()) << std::endl; - assert(std::distance(copy->iovec_begin(), copy->iovec_end()) == num_iovecs); } if (num_iovecs == 0) { @@ -2130,67 +2138,79 @@ namespace ttg_parsec { bool inline_data = msg->tt_id.inline_data; int nv = 0; + parsec_ce_tag_t cbtag; /* start the RMA transfers */ - auto handle_iovecs_fn = - [&](auto&& iovecs) { - - if (inline_data) { - /* unpack the data from the message */ - for (auto &&iov : iovecs) { - ++nv; - std::memcpy(iov.data, msg->bytes + pos, iov.num_bytes); - pos += iov.num_bytes; - } - } else { - /* extract the callback tag */ - parsec_ce_tag_t cbtag; - std::memcpy(&cbtag, msg->bytes + pos, sizeof(cbtag)); - pos += sizeof(cbtag); - - /* create the value from the metadata */ - auto activation = new detail::rma_delayed_activate( - std::move(keylist), copy, num_iovecs, [this](std::vector &&keylist, detail::ttg_data_copy_t *copy) { - set_arg_from_msg_keylist(keylist, copy); - this->world.impl().decrement_inflight_msg(); - }); - - using ActivationT = std::decay_t; - - for (auto &&iov : iovecs) { - ++nv; - parsec_ce_mem_reg_handle_t rreg; - int32_t rreg_size_i; - std::memcpy(&rreg_size_i, msg->bytes + pos, sizeof(rreg_size_i)); - pos += sizeof(rreg_size_i); - rreg = static_cast(msg->bytes + pos); - pos += rreg_size_i; - // std::intptr_t *fn_ptr = reinterpret_cast(msg->bytes + pos); - // pos += sizeof(*fn_ptr); - std::intptr_t fn_ptr; - std::memcpy(&fn_ptr, msg->bytes + pos, sizeof(fn_ptr)); - pos += sizeof(fn_ptr); - - /* register the local memory */ - parsec_ce_mem_reg_handle_t lreg; - size_t lreg_size; - parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t, - iov.num_bytes, &lreg, &lreg_size); - world.impl().increment_inflight_msg(); - /* TODO: PaRSEC should treat the remote callback as a tag, not a function pointer! */ - //std::cout << "set_arg_from_msg: get rreg " << rreg << " remote " << remote << std::endl; - parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iov.num_bytes, remote, - &detail::get_complete_cb, activation, - /*world.impl().parsec_ttg_rma_tag()*/ - cbtag, &fn_ptr, sizeof(std::intptr_t)); - } - } + auto create_activation_fn = [&]() { + /* extract the callback tag */ + std::memcpy(&cbtag, msg->bytes + pos, sizeof(cbtag)); + pos += sizeof(cbtag); + + /* create the value from the metadata */ + auto activation = new detail::rma_delayed_activate( + std::move(keylist), copy, num_iovecs, [this](std::vector &&keylist, detail::ttg_data_copy_t *copy) { + set_arg_from_msg_keylist(keylist, copy); + this->world.impl().decrement_inflight_msg(); + }); + return activation; + }; + auto read_inline_data = [&](auto&& iovec){ + /* unpack the data from the message */ + ++nv; + std::memcpy(iovec.data, msg->bytes + pos, iovec.num_bytes); + pos += iovec.num_bytes; + }; + auto handle_iovec_fn = [&](auto&& iovec, auto activation) { + using ActivationT = std::decay_t; + + ++nv; + parsec_ce_mem_reg_handle_t rreg; + int32_t rreg_size_i; + std::memcpy(&rreg_size_i, msg->bytes + pos, sizeof(rreg_size_i)); + pos += sizeof(rreg_size_i); + rreg = static_cast(msg->bytes + pos); + pos += rreg_size_i; + // std::intptr_t *fn_ptr = reinterpret_cast(msg->bytes + pos); + // pos += sizeof(*fn_ptr); + std::intptr_t fn_ptr; + std::memcpy(&fn_ptr, msg->bytes + pos, sizeof(fn_ptr)); + pos += sizeof(fn_ptr); + + /* register the local memory */ + parsec_ce_mem_reg_handle_t lreg; + size_t lreg_size; + parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t, + iovec.num_bytes, &lreg, &lreg_size); + world.impl().increment_inflight_msg(); + /* TODO: PaRSEC should treat the remote callback as a tag, not a function pointer! */ + //std::cout << "set_arg_from_msg: get rreg " << rreg << " remote " << remote << std::endl; + parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iovec.num_bytes, remote, + &detail::get_complete_cb, activation, + /*world.impl().parsec_ttg_rma_tag()*/ + cbtag, &fn_ptr, sizeof(std::intptr_t)); }; if constexpr (ttg::has_split_metadata::value) { ttg::SplitMetadataDescriptor descr; - handle_iovecs_fn(descr.get_data(val)); + if (inline_data) { + for (auto&& iov : descr.get_data(val)) { + read_inline_data(iov); + } + } else { + auto activation = create_activation_fn(); + for (auto&& iov : descr.get_data(val)) { + handle_iovec_fn(iov, activation); + } + } } else if constexpr (!ttg::has_split_metadata::value) { - handle_iovecs_fn(copy->iovec_span()); - copy->iovec_reset(); + if (inline_data) { + detail::foreach_parsec_data(val, [&](parsec_data_t* data){ + read_inline_data(ttg::iovec{data->nb_elts, data->device_copies[data->owner_device]->device_private}); + }); + } else { + auto activation = create_activation_fn(); + detail::foreach_parsec_data(val, [&](parsec_data_t* data){ + handle_iovec_fn(ttg::iovec{data->nb_elts, data->device_copies[data->owner_device]->device_private}, activation); + }); + } } assert(num_iovecs == nv); @@ -2719,6 +2739,11 @@ namespace ttg_parsec { template bool can_inline_data(Value* value_ptr, detail::ttg_data_copy_t *copy, const Key& key, std::size_t num_keys) { + if constexpr (derived_has_device_op()) { + /* don't inline if data is possibly on the device */ + return false; + } + /* non-device data */ using decvalueT = std::decay_t; bool inline_data = false; /* check whether to send data in inline */ @@ -2734,8 +2759,7 @@ namespace ttg_parsec { } else { /* TODO: how can we query the iovecs of the buffers here without actually packing the data? */ metadata_size = ttg::default_data_descriptor>::payload_size(value_ptr); - iov_size = std::accumulate(copy->iovec_begin(), copy->iovec_end(), 0, - [](std::size_t s, auto& iov){ return s + iov.num_bytes; }); + detail::foreach_parsec_data(*value_ptr, [&](parsec_data_t* data){ iov_size += data->nb_elts; }); } /* key is packed at the end */ std::size_t key_pack_size = ttg::default_data_descriptor::payload_size(&key); @@ -2803,55 +2827,54 @@ namespace ttg_parsec { bool inline_data = can_inline_data(value_ptr, copy, key, 1); msg->tt_id.inline_data = inline_data; - auto handle_iovec_fn = [&](auto&& iovecs){ - - if (inline_data) { - /* inline data is packed right after the tt_id in the message */ - for (auto &&iov : iovecs) { - std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes); - pos += iov.num_bytes; - } - } else { - + auto write_header_fn = [&]() { + if (!inline_data) { /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a * raw function pointer instead of a preregistered AM tag, so play that game. * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */ parsec_ce_tag_t cbtag = reinterpret_cast(&detail::get_remote_complete_cb); std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag)); pos += sizeof(cbtag); + } + }; + auto handle_iovec_fn = [&](auto&& iovec){ + + if (inline_data) { + /* inline data is packed right after the tt_id in the message */ + std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes); + pos += iovec.num_bytes; + } else { /** * register the generic iovecs and pack the registration handles * memory layout: [, ...] */ - for (auto &&iov : iovecs) { - copy = detail::register_data_copy(copy, nullptr, true); - parsec_ce_mem_reg_handle_t lreg; - size_t lreg_size; - /* TODO: only register once when we can broadcast the data! */ - parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t, - iov.num_bytes, &lreg, &lreg_size); - auto lreg_ptr = std::shared_ptr{lreg, [](void *ptr) { - parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)ptr; - parsec_ce.mem_unregister(&memreg); - }}; - int32_t lreg_size_i = lreg_size; - std::memcpy(msg->bytes + pos, &lreg_size_i, sizeof(lreg_size_i)); - pos += sizeof(lreg_size_i); - std::memcpy(msg->bytes + pos, lreg, lreg_size); - pos += lreg_size; - //std::cout << "set_arg_impl lreg " << lreg << std::endl; - /* TODO: can we avoid the extra indirection of going through std::function? */ - std::function *fn = new std::function([=]() mutable { - /* shared_ptr of value and registration captured by value so resetting - * them here will eventually release the memory/registration */ - detail::release_data_copy(copy); - lreg_ptr.reset(); - }); - std::intptr_t fn_ptr{reinterpret_cast(fn)}; - std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr)); - pos += sizeof(fn_ptr); - } + copy = detail::register_data_copy(copy, nullptr, true); + parsec_ce_mem_reg_handle_t lreg; + size_t lreg_size; + /* TODO: only register once when we can broadcast the data! */ + parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t, + iovec.num_bytes, &lreg, &lreg_size); + auto lreg_ptr = std::shared_ptr{lreg, [](void *ptr) { + parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)ptr; + parsec_ce.mem_unregister(&memreg); + }}; + int32_t lreg_size_i = lreg_size; + std::memcpy(msg->bytes + pos, &lreg_size_i, sizeof(lreg_size_i)); + pos += sizeof(lreg_size_i); + std::memcpy(msg->bytes + pos, lreg, lreg_size); + pos += lreg_size; + //std::cout << "set_arg_impl lreg " << lreg << std::endl; + /* TODO: can we avoid the extra indirection of going through std::function? */ + std::function *fn = new std::function([=]() mutable { + /* shared_ptr of value and registration captured by value so resetting + * them here will eventually release the memory/registration */ + detail::release_data_copy(copy); + lreg_ptr.reset(); + }); + std::intptr_t fn_ptr{reinterpret_cast(fn)}; + std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr)); + pos += sizeof(fn_ptr); } }; @@ -2863,16 +2886,20 @@ namespace ttg_parsec { auto metadata = descr.get_metadata(*const_cast(value_ptr)); pos = pack(metadata, msg->bytes, pos); //std::cout << "set_arg_impl splitmd num_iovecs " << num_iovecs << std::endl; - handle_iovec_fn(iovs); + write_header_fn(); + for (auto&& iov : iovs) { + handle_iovec_fn(iov); + } } else if constexpr (!ttg::has_split_metadata>::value) { /* serialize the object */ - //std::cout << "PRE pack num_iovecs " << std::distance(copy->iovec_begin(), copy->iovec_end()) << std::endl; pos = pack(*value_ptr, msg->bytes, pos, copy); - num_iovecs = std::distance(copy->iovec_begin(), copy->iovec_end()); + detail::foreach_parsec_data(value, [&](parsec_data_t *data){ ++num_iovecs; }); //std::cout << "POST pack num_iovecs " << num_iovecs << std::endl; /* handle any iovecs contained in it */ - handle_iovec_fn(copy->iovec_span()); - copy->iovec_reset(); + write_header_fn(); + detail::foreach_parsec_data(value, [&](parsec_data_t *data){ + handle_iovec_fn(ttg::iovec{data->nb_elts, data->device_copies[data->owner_device]->device_private}); + }); } msg->tt_id.num_iovecs = num_iovecs; @@ -2995,39 +3022,36 @@ namespace ttg_parsec { msg->tt_id.inline_data = inline_data; std::vector>> memregs; - auto handle_iovs_fn = [&](auto&& iovs){ - - if (inline_data) { - /* inline data is packed right after the tt_id in the message */ - for (auto &&iov : iovs) { - std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes); - pos += iov.num_bytes; - } - } else { - + auto write_iov_header = [&](){ + if (!inline_data) { /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a * raw function pointer instead of a preregistered AM tag, so play that game. * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */ parsec_ce_tag_t cbtag = reinterpret_cast(&detail::get_remote_complete_cb); std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag)); pos += sizeof(cbtag); - - for (auto &&iov : iovs) { - parsec_ce_mem_reg_handle_t lreg; - size_t lreg_size; - parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t, - iov.num_bytes, &lreg, &lreg_size); - /* TODO: use a static function for deregistration here? */ - memregs.push_back(std::make_pair(static_cast(lreg_size), - /* TODO: this assumes that parsec_ce_mem_reg_handle_t is void* */ - std::shared_ptr{lreg, [](void *ptr) { - parsec_ce_mem_reg_handle_t memreg = - (parsec_ce_mem_reg_handle_t)ptr; - //std::cout << "broadcast_arg memunreg lreg " << memreg << std::endl; - parsec_ce.mem_unregister(&memreg); - }})); - //std::cout << "broadcast_arg memreg lreg " << lreg << std::endl; - } + } + }; + auto handle_iov_fn = [&](auto&& iovec){ + if (inline_data) { + /* inline data is packed right after the tt_id in the message */ + std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes); + pos += iovec.num_bytes; + } else { + parsec_ce_mem_reg_handle_t lreg; + size_t lreg_size; + parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t, + iovec.num_bytes, &lreg, &lreg_size); + /* TODO: use a static function for deregistration here? */ + memregs.push_back(std::make_pair(static_cast(lreg_size), + /* TODO: this assumes that parsec_ce_mem_reg_handle_t is void* */ + std::shared_ptr{lreg, [](void *ptr) { + parsec_ce_mem_reg_handle_t memreg = + (parsec_ce_mem_reg_handle_t)ptr; + //std::cout << "broadcast_arg memunreg lreg " << memreg << std::endl; + parsec_ce.mem_unregister(&memreg); + }})); + //std::cout << "broadcast_arg memreg lreg " << lreg << std::endl; } }; @@ -3039,14 +3063,21 @@ namespace ttg_parsec { auto iovs = descr.get_data(*const_cast(&value)); num_iovs = std::distance(std::begin(iovs), std::end(iovs)); memregs.reserve(num_iovs); - handle_iovs_fn(iovs); + write_iov_header(); + for (auto &&iov : iovs) { + handle_iov_fn(iov); + } //std::cout << "broadcast_arg splitmd num_iovecs " << num_iovs << std::endl; } else if constexpr (!ttg::has_split_metadata>::value) { /* serialize the object once */ pos = pack(value, msg->bytes, pos, copy); - num_iovs = std::distance(copy->iovec_begin(), copy->iovec_end()); - handle_iovs_fn(copy->iovec_span()); - copy->iovec_reset(); + detail::foreach_parsec_data(value, [&](parsec_data_t *data){ ++num_iovs; }); + memregs.reserve(num_iovs); + write_iov_header(); + detail::foreach_parsec_data(value, [&](parsec_data_t *data){ + handle_iov_fn(ttg::iovec{data->nb_elts, + data->device_copies[data->owner_device]->device_private}); + }); } msg->tt_id.num_iovecs = num_iovs; @@ -3407,7 +3438,8 @@ namespace ttg_parsec { } } - void copy_mark_pushout(detail::ttg_data_copy_t *copy) { + template + void copy_mark_pushout(const Value& value) { assert(detail::parsec_ttg_caller->dev_ptr && detail::parsec_ttg_caller->dev_ptr->gpu_task); parsec_gpu_task_t *gpu_task = detail::parsec_ttg_caller->dev_ptr->gpu_task; @@ -3431,12 +3463,15 @@ namespace ttg_parsec { detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in = data->device_copies[0]; gpu_task->flow_nb_elts[flowidx] = data->nb_elts; } - /* need to mark the flow WRITE to convince PaRSEC that the data changed */ - ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_WRITE; + /* need to mark the flow RW to make PaRSEC happy */ + ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_RW; gpu_task->pushout |= 1<foreach_parsec_data(check_parsec_data); + detail::foreach_parsec_data(value, + [&](parsec_data_t* data){ + check_parsec_data(data); + }); } @@ -3468,7 +3503,7 @@ namespace ttg_parsec { auto &reducer = std::get(input_reducers); if (reducer) { /* reductions are currently done only on the host so push out */ - copy_mark_pushout(copy); + copy_mark_pushout(value); caller->data_flags |= detail::ttg_parsec_data_flags::MARKED_PUSHOUT; return; } @@ -3531,7 +3566,7 @@ namespace ttg_parsec { } if (need_pushout) { - copy_mark_pushout(copy); + copy_mark_pushout(value); caller->data_flags |= detail::ttg_parsec_data_flags::MARKED_PUSHOUT; } } @@ -3629,7 +3664,7 @@ namespace ttg_parsec { set_arg(value); } else { - throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + boost::typeindex::type_id().pretty_name() + " which is not copy constructible, std::move datum into send/broadcast statement"); + throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + typeid(std::decay_t).name() + " which is not copy constructible, std::move datum into send/broadcast statement"); } }; auto setsize_callback = [this](std::size_t size) { set_argstream_size(size); }; @@ -3777,22 +3812,6 @@ namespace ttg_parsec { parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash}; - template - inline static void increment_data_version_impl(task_t *task) { - if constexpr (!std::is_const_v>) { - if (task->copies[I] != nullptr){ - task->copies[I]->inc_current_version(); - } - } - } - - template - inline static void increment_data_versions(task_t *task, std::index_sequence) { - /* increment version of each mutable data */ - int junk[] = {0, (increment_data_version_impl(task), 0)...}; - junk[0]++; - } - static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) { //std::cout << "complete_task_and_release: task " << parsec_task << std::endl; @@ -3805,10 +3824,6 @@ namespace ttg_parsec { assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid); #ifdef TTG_HAVE_DEVICE if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) { - /* increment versions of all data we might have modified - * this must happen before we issue the sends */ - //increment_data_versions(task, std::make_index_sequence>{}); - // get the device task from the coroutine handle auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); @@ -4353,7 +4368,7 @@ namespace ttg_parsec { /// @arg pm a function that provides a hint on which device the task should execute. template void set_devicemap(Devicemap&& dm) { - static_assert(derived_has_device_op(), "Device map only allowed on device-enabled TT!"); + //static_assert(derived_has_device_op(), "Device map only allowed on device-enabled TT!"); if constexpr (std::is_same_v()))>) { // dm returns a Device devicemap = std::forward(dm); diff --git a/ttg/ttg/parsec/ttg_data_copy.h b/ttg/ttg/parsec/ttg_data_copy.h index fbd58c64ec..b79f794a2f 100644 --- a/ttg/ttg/parsec/ttg_data_copy.h +++ b/ttg/ttg/parsec/ttg_data_copy.h @@ -26,91 +26,6 @@ namespace ttg_parsec { // fwd-decl struct ttg_data_copy_t; - /* Wrapper managing the relationship between a ttg data copy and the parsec_data_t object */ - struct ttg_parsec_data_wrapper_t { - - protected: - using parsec_data_ptr = std::unique_ptr; - - ttg_data_copy_t *m_ttg_copy = nullptr; - parsec_data_ptr m_data; - - friend ttg_data_copy_t; - - static parsec_data_t* create_parsec_data(void *ptr, size_t size, bool sync_to_device) { - parsec_data_t *data = parsec_data_create_with_type(nullptr, 0, ptr, size, - parsec_datatype_int8_t); - data->device_copies[0]->flags |= PARSEC_DATA_FLAG_PARSEC_MANAGED; - data->device_copies[0]->coherency_state = PARSEC_DATA_COHERENCY_SHARED; - // if we don't want to synchronize data to the device we set the version to 0 - data->device_copies[0]->version = (sync_to_device) ? 1 : 0; - return data; - } - - parsec_data_t* parsec_data() { - return m_data.get(); - } - - const parsec_data_t* parsec_data() const { - return m_data.get(); - } - - static void delete_parsec_data(parsec_data_t *data) { -#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) - if (data->device_copies[0]->flags & TTG_PARSEC_DATA_FLAG_REGISTERED) { - // register the memory for faster access - cudaError_t status; - status = cudaHostUnregister(data->device_copies[0]->device_private); - assert(cudaSuccess == status); - data->device_copies[0]->flags ^= TTG_PARSEC_DATA_FLAG_REGISTERED; - } -#endif // PARSEC_HAVE_DEV_CUDA_SUPPORT - assert(data->device_copies[0] != nullptr); - auto copy = data->device_copies[0]; - parsec_data_copy_detach(data, data->device_copies[0], 0); - PARSEC_OBJ_RELEASE(copy); - PARSEC_OBJ_RELEASE(data); - } - - static void delete_null_parsec_data(parsec_data_t *) { - // nothing to be done, only used for nullptr - } - - protected: - - /* remove the data from the owning data copy */ - void remove_from_owner(); - - /* add the data to the owning data copy */ - void reset_parsec_data(void *ptr, size_t size, bool sync_to_device); - - ttg_parsec_data_wrapper_t(); - - ttg_parsec_data_wrapper_t(const ttg_parsec_data_wrapper_t& other) = delete; - - ttg_parsec_data_wrapper_t(ttg_parsec_data_wrapper_t&& other); - - ttg_parsec_data_wrapper_t& operator=(const ttg_parsec_data_wrapper_t& other) = delete; - - ttg_parsec_data_wrapper_t& operator=(ttg_parsec_data_wrapper_t&& other); - - virtual ~ttg_parsec_data_wrapper_t(); - - /* set a new owning data copy object */ - void set_owner(ttg_data_copy_t& new_copy) { - m_ttg_copy = &new_copy; - } - - /* add a new copy to the data on the give device backed by ptr */ - void add_copy(int parsec_dev, void *ptr) { - parsec_data_copy_t* copy = parsec_data_copy_new(m_data.get(), parsec_dev, - parsec_datatype_int8_t, - PARSEC_DATA_FLAG_PARSEC_MANAGED); - copy->device_private = ptr; - } - }; - - /* templated to break cyclic dependency with ttg_data_copy_container */ template struct ttg_data_copy_container_setter { @@ -162,17 +77,8 @@ namespace ttg_parsec { , m_next_task(c.m_next_task) , m_readers(c.m_readers) , m_refs(c.m_refs.load(std::memory_order_relaxed)) - , m_dev_data(std::move(c.m_dev_data)) - , m_single_dev_data(c.m_single_dev_data) - , m_num_dev_data(c.m_num_dev_data) { - c.m_num_dev_data = 0; c.m_readers = 0; - c.m_single_dev_data = nullptr; - - foreach_wrapper([&](ttg_parsec_data_wrapper_t* data){ - data->set_owner(*this); - }); } ttg_data_copy_t& operator=(ttg_data_copy_t&& c) @@ -183,16 +89,6 @@ namespace ttg_parsec { c.m_readers = 0; m_refs.store(c.m_refs.load(std::memory_order_relaxed), std::memory_order_relaxed); c.m_refs.store(0, std::memory_order_relaxed); - m_dev_data = std::move(c.m_dev_data); - m_single_dev_data = c.m_single_dev_data; - c.m_single_dev_data = nullptr; - m_num_dev_data = c.m_num_dev_data; - c.m_num_dev_data = 0; - - /* move all data to the new owner */ - foreach_wrapper([&](ttg_parsec_data_wrapper_t* data){ - data->set_owner(*this); - }); return *this; } @@ -287,147 +183,6 @@ namespace ttg_parsec { return m_refs.load(std::memory_order_relaxed); } - /* increment the version of the current copy */ - void inc_current_version() { - //std::cout << "data-copy " << this << " inc_current_version " << " count " << m_num_dev_data << std::endl; - foreach_parsec_data([](parsec_data_t* data){ - assert(data->device_copies[0] != nullptr); - data->device_copies[0]->version++; - }); - } - - void transfer_ownership(int access, int device = 0) { - foreach_parsec_data([&](parsec_data_t* data){ - parsec_data_transfer_ownership_to_copy(data, device, access); - }); - } - - /* manage device copies owned by this object - * we only touch the vector if we have more than one copies to track - * and otherwise use the single-element member. - */ - using iterator = ttg_parsec_data_wrapper_t**; - - void add_device_data(ttg_parsec_data_wrapper_t* data) { - switch (m_num_dev_data) { - case 0: - m_single_dev_data = data; - break; - case 1: - /* move single copy into vector and add new copy below */ - m_dev_data.push_back(m_single_dev_data); - m_single_dev_data = nullptr; - /* fall-through */ - default: - /* store in multi-copy vector */ - m_dev_data.push_back(data); - break; - } - //std::cout << "data-copy " << this << " add data " << data << " count " << m_num_dev_data << std::endl; - m_num_dev_data++; - } - - void remove_device_data(ttg_parsec_data_wrapper_t* data) { - //std::cout << "data-copy " << this << " remove data " << data << " count " << m_num_dev_data << std::endl; - if (m_num_dev_data == 0) { - /* this may happen if we're integrated into the object and have been moved */ - return; - } - if (m_num_dev_data == 1) { - assert(m_single_dev_data == data); - m_single_dev_data = nullptr; - } else if (m_num_dev_data > 1) { - auto it = std::find(m_dev_data.begin(), m_dev_data.end(), data); - if (it != m_dev_data.end()) { - m_dev_data.erase(it); - } - } - --m_num_dev_data; - /* make single-entry if needed */ - if (m_num_dev_data == 1) { - m_single_dev_data = m_dev_data[0]; - m_dev_data.clear(); - } - } - - int num_dev_data() const { - return m_num_dev_data; - } - - template - void foreach_wrapper(Fn&& fn) { - if (m_num_dev_data == 1) { - fn(m_single_dev_data); - } else if (m_num_dev_data > 1) { - std::for_each(m_dev_data.begin(), m_dev_data.end(), fn); - } - } - - template - void foreach_parsec_data(Fn&& fn) { - if (m_num_dev_data == 1) { - if (m_single_dev_data->parsec_data()) { - fn(m_single_dev_data->parsec_data()); - } - } else if (m_num_dev_data > 1) { - std::for_each(m_dev_data.begin(), m_dev_data.end(), - [&](ttg_parsec_data_wrapper_t* data){ - if (data->parsec_data()) { - fn(data->parsec_data()); - } - } - ); - } - } - - -#if 0 - iterator begin() { - switch(m_num_dev_data) { - // no device copies - case 0: return end(); - case 1: return &m_single_dev_data; - default: return m_dev_data.data(); - } - } - - iterator end() { - switch(m_num_dev_data) { - case 0: - case 1: - return &(m_single_dev_data) + 1; - default: - return m_dev_data.data() + m_dev_data.size(); - } - } -#endif // 0 - - using iovec_iterator = typename std::vector::iterator; - - iovec_iterator iovec_begin() { - return m_iovecs.begin(); - } - - iovec_iterator iovec_end() { - return m_iovecs.end(); - } - - void iovec_reset() { - m_iovecs.clear(); - } - - void iovec_add(const ttg::iovec& iov) { - m_iovecs.push_back(iov); - } - - ttg::span iovec_span() { - return ttg::span(m_iovecs.data(), m_iovecs.size()); - } - - std::size_t iovec_count() const { - return m_iovecs.size(); - } - #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND) int64_t size; int64_t uid; @@ -436,13 +191,6 @@ namespace ttg_parsec { parsec_task_t *m_next_task = nullptr; int32_t m_readers = 1; std::atomic m_refs = 1; //< number of entities referencing this copy (TTGs, external) - - std::vector m_iovecs; - - std::vector m_dev_data; //< used if there are multiple device copies - // that belong to this object - ttg_parsec_data_wrapper_t *m_single_dev_data; //< used if there is a single device copy - int m_num_dev_data = 0; //< number of device copies }; @@ -521,77 +269,6 @@ namespace ttg_parsec { return &m_value; } }; - - /** - * definition of ttg_parsec_data_wrapper_t members that depend on ttg_data_copy_t - */ - - inline - void ttg_parsec_data_wrapper_t::remove_from_owner() { - if (nullptr != m_ttg_copy) { - m_ttg_copy->remove_device_data(this); - m_ttg_copy = nullptr; - } - } - - inline - void ttg_parsec_data_wrapper_t::reset_parsec_data(void *ptr, size_t size, bool sync_to_device) { - if (ptr == m_data.get()) return; - - if (nullptr == ptr) { - m_data = parsec_data_ptr(nullptr, &delete_null_parsec_data); - } else { - m_data = parsec_data_ptr(create_parsec_data(ptr, size, sync_to_device), &delete_parsec_data); - } - } - - inline - ttg_parsec_data_wrapper_t::ttg_parsec_data_wrapper_t() - : m_data(nullptr, delete_null_parsec_data) - , m_ttg_copy(detail::ttg_data_copy_container()) - { - if (m_ttg_copy) { - m_ttg_copy->add_device_data(this); - } - } - - inline - ttg_parsec_data_wrapper_t::ttg_parsec_data_wrapper_t(ttg_parsec_data_wrapper_t&& other) - : m_data(std::move(other.m_data)) - , m_ttg_copy(detail::ttg_data_copy_container()) - { - // try to remove the old buffer from the *old* ttg_copy - other.remove_from_owner(); - - // register with the new ttg_copy - if (nullptr != m_ttg_copy) { - m_ttg_copy->add_device_data(this); - } - } - - inline - ttg_parsec_data_wrapper_t& ttg_parsec_data_wrapper_t::operator=(ttg_parsec_data_wrapper_t&& other) { - m_data = std::move(other.m_data); - /* remove from old ttg copy */ - other.remove_from_owner(); - - if (nullptr != m_ttg_copy) { - /* register with the new ttg_copy */ - m_ttg_copy->add_device_data(this); - } - return *this; - } - - - inline - ttg_parsec_data_wrapper_t::~ttg_parsec_data_wrapper_t() { - if (nullptr != m_ttg_copy) { - m_ttg_copy->remove_device_data(this); - m_ttg_copy = nullptr; - } - } - - } // namespace detail } // namespace ttg_parsec