Skip to content

Commit f0daa9e

Browse files
heterogeneous-tcp
1 parent 496cecd commit f0daa9e

File tree

13 files changed

+575
-14
lines changed

13 files changed

+575
-14
lines changed

mooncake-common/common.cmake

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ option(USE_TCP "option for using TCP transport" ON)
6464
option(USE_ASCEND "option for using npu with HCCL" OFF)
6565
option(USE_ASCEND_DIRECT "option for using ascend npu with adxl engine" OFF)
6666
option(USE_ASCEND_HETEROGENEOUS "option for transferring between ascend npu and gpu" OFF)
67+
option(USE_ASCEND_HETEROGENEOUS_TCP "Option to use TCP for transmission between Ascend NPU and GPU" OFF)
6768
option(USE_MNNVL "option for using Multi-Node NVLink transport" OFF)
6869
option(USE_CXL "option for using CXL protocol" OFF)
6970
option(USE_ETCD "option for enable etcd as metadata server" OFF)
@@ -175,11 +176,16 @@ if (USE_ASCEND_DIRECT)
175176
add_compile_definitions(USE_ASCEND_DIRECT)
176177
endif()
177178

178-
if (USE_ASCEND_HETEROGENEOUS)
179+
if (USE_ASCEND_HETEROGENEOUS OR USE_ASCEND_HETEROGENEOUS_TCP)
179180
file(GLOB ASCEND_TOOLKIT_ROOT "/usr/local/Ascend/ascend-toolkit/latest/*-linux")
180181
set(ASCEND_LIB_DIR "${ASCEND_TOOLKIT_ROOT}/lib64")
181182
set(ASCEND_INCLUDE_DIR "${ASCEND_TOOLKIT_ROOT}/include")
182-
add_compile_definitions(USE_ASCEND_HETEROGENEOUS)
183+
if (USE_ASCEND_HETEROGENEOUS)
184+
add_compile_definitions(USE_ASCEND_HETEROGENEOUS)
185+
endif()
186+
if (USE_ASCEND_HETEROGENEOUS_TCP)
187+
add_compile_definitions(USE_ASCEND_HETEROGENEOUS_TCP)
188+
endif()
183189
include_directories(/usr/local/include /usr/include ${ASCEND_INCLUDE_DIR})
184190
link_directories(${ASCEND_LIB_DIR})
185191
endif()

mooncake-integration/transfer_engine/transfer_engine_py.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ int TransferEnginePy::initializeExt(const char *local_hostname,
131131

132132
free_list_.resize(kSlabSizeKBTabLen);
133133
#if !defined(USE_ASCEND) && !defined(USE_ASCEND_DIRECT) && \
134-
!defined(USE_ASCEND_HETEROGENEOUS)
134+
!defined(USE_ASCEND_HETEROGENEOUS) && !defined(USE_ASCEND_HETEROGENEOUS_TCP)
135135
doBuddyAllocate(kMaxClassId);
136136
#endif
137137
return 0;

mooncake-transfer-engine/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ if (USE_ASCEND)
3030
)
3131
endif()
3232

33-
if (USE_ASCEND_HETEROGENEOUS)
33+
if (USE_ASCEND_HETEROGENEOUS OR USE_ASCEND_HETEROGENEOUS_TCP)
3434
file(GLOB ASCEND_TOOLKIT_ROOT "/usr/local/Ascend/ascend-toolkit/latest/*-linux")
3535
set(ASCEND_INCLUDE_DIR "${ASCEND_TOOLKIT_ROOT}/include")
3636
include_directories(/usr/local/include

mooncake-transfer-engine/example/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ if (USE_ASCEND_DIRECT)
2323
target_link_libraries(transfer_engine_ascend_direct_perf PUBLIC transfer_engine)
2424
endif()
2525

26+
if (USE_ASCEND_HETEROGENEOUS OR USE_ASCEND_HETEROGENEOUS_TCP)
27+
add_executable(transfer_engine_heterogeneous_ascend_perf_initiator transfer_engine_heterogeneous_ascend_perf_initiator.cpp)
28+
target_link_libraries(transfer_engine_heterogeneous_ascend_perf_initiator PUBLIC transfer_engine)
29+
endif()
30+
2631
if (USE_ASCEND_HETEROGENEOUS)
2732
add_executable(transfer_engine_heterogeneous_ascend_perf_initiator transfer_engine_heterogeneous_ascend_perf_initiator.cpp)
2833
target_link_libraries(transfer_engine_heterogeneous_ascend_perf_initiator PUBLIC transfer_engine)
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright 2025 Huawei Technologies Co., Ltd
2+
// Copyright 2024 KVCache.AI
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
#ifndef HETEROGENEOUS_TCP_TRANSPORT_H_
17+
#define HETEROGENEOUS_TCP_TRANSPORT_H_
18+
19+
#include "transport/tcp_transport/tcp_transport.h"
20+
#include "acl/acl.h"
21+
#include <atomic>
22+
#include <new>
23+
#include <condition_variable>
24+
25+
#define HUGE_HOST_SIZE (3ULL * 1024 * 1024 * 1024)
26+
#define HUGE_DEVICE_SIZE (8 * 1024 * 1024)
27+
#define HUGE_DEVICE_NUM 4
28+
29+
namespace mooncake {
30+
31+
class HeterogeneousTcpTransport : public Transport {
32+
public:
33+
HeterogeneousTcpTransport();
34+
35+
~HeterogeneousTcpTransport();
36+
37+
int install(std::string &local_server_name,
38+
std::shared_ptr<TransferMetadata> meta,
39+
std::shared_ptr<Topology> topo) override;
40+
41+
const char *getName() const override { return "ascend"; }
42+
43+
int registerLocalMemory(void *addr, size_t length,
44+
const std::string &location, bool remote_accessible,
45+
bool update_metadata) override;
46+
47+
int unregisterLocalMemory(void *addr, bool update_metadata = true) override;
48+
49+
int registerLocalMemoryBatch(const std::vector<BufferEntry> &buffer_list,
50+
const std::string &location) override;
51+
52+
int unregisterLocalMemoryBatch(
53+
const std::vector<void *> &addr_list) override;
54+
55+
int createStream();
56+
57+
Status submitTransfer(BatchID batch_id,
58+
const std::vector<TransferRequest> &entries) override;
59+
60+
Status submitTransferTask(
61+
const std::vector<TransferTask *> &task_list) override;
62+
63+
Status getTransferStatus(BatchID batch_id, size_t task_id,
64+
TransferStatus &status) override;
65+
std::unique_ptr<TcpTransport> transport_{};
66+
67+
private:
68+
void transferLoop();
69+
70+
private:
71+
struct TransferTaskTCP {
72+
std::vector<TransferTask *> tasks;
73+
uint64_t total_length;
74+
uint64_t devId;
75+
76+
TransferTaskTCP(TransferTaskTCP &&) = default;
77+
TransferTaskTCP &operator=(TransferTaskTCP &&) = default;
78+
79+
TransferTaskTCP(const TransferTaskTCP &) = delete;
80+
TransferTaskTCP &operator=(const TransferTaskTCP &) = delete;
81+
82+
TransferTaskTCP(std::vector<TransferTask *> taskList, uint64_t len,
83+
uint64_t id)
84+
: tasks(std::move(taskList)), total_length(len), devId(id) {}
85+
};
86+
bool running_ = false;
87+
aclrtStream stream_;
88+
void *hostAddr_ = nullptr;
89+
void *devAddr_ = nullptr;
90+
std::vector<void *> hugeDevAddrs;
91+
int deviceLogicId_;
92+
bool firstSubmit_ = true;
93+
std::mutex memcpy_mutex_;
94+
uint64_t offset_ = 0;
95+
std::thread transferThread_;
96+
std::queue<TransferTaskTCP> transferQueues_;
97+
std::mutex transfer_mutex_;
98+
std::condition_variable transfer_cond_;
99+
std::atomic<int> transfer_counter_{0};
100+
int devId_ = 0;
101+
std::array<bool, HUGE_DEVICE_NUM> mem_blocks = {false, false, false, false};
102+
std::mutex dev_mtx_;
103+
std::condition_variable dev_cv_;
104+
};
105+
106+
using TransferRequest = Transport::TransferRequest;
107+
using TransferStatus = Transport::TransferStatus;
108+
using TransferStatusEnum = Transport::TransferStatusEnum;
109+
using SegmentID = Transport::SegmentID;
110+
using BatchID = Transport::BatchID;
111+
112+
} // namespace mooncake
113+
114+
#endif // HETEROGENEOUS_TCP_TRANSPORT_H_

mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ class TcpTransport : public Transport {
5555
Status getTransferStatus(BatchID batch_id, size_t task_id,
5656
TransferStatus &status) override;
5757

58-
private:
5958
int install(std::string &local_server_name,
6059
std::shared_ptr<TransferMetadata> meta,
6160
std::shared_ptr<Topology> topo);

mooncake-transfer-engine/include/transport/transport.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,8 +283,7 @@ class Transport {
283283
#endif
284284

285285
// record the origin request
286-
#ifdef USE_ASCEND_HETEROGENEOUS
287-
// need to modify the request's source address, changing it from an NPU
286+
#if defined(USE_ASCEND_HETEROGENEOUS) || defined(USE_ASCEND_HETEROGENEOUS_TCP) // need to modify the request's source address, changing it from an NPU
288287
// address to a CPU address.
289288
TransferRequest *request = nullptr;
290289
#else

mooncake-transfer-engine/src/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ if (USE_ASCEND_DIRECT)
7777
endif()
7878
endif()
7979

80-
if (USE_ASCEND_HETEROGENEOUS)
80+
if (USE_ASCEND_HETEROGENEOUS OR USE_ASCEND_HETEROGENEOUS_TCP)
8181
file(GLOB ASCEND_TOOLKIT_ROOT "/usr/local/Ascend/ascend-toolkit/latest/*-linux")
8282
set(ASCEND_LIB_DIR "${ASCEND_TOOLKIT_ROOT}/lib64")
8383
link_directories(${ASCEND_LIB_DIR})

mooncake-transfer-engine/src/multi_transport.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
#ifdef USE_ASCEND_HETEROGENEOUS
3434
#include "transport/ascend_transport/heterogeneous_rdma_transport.h"
3535
#endif
36+
#ifdef USE_ASCEND_HETEROGENEOUS_TCP
37+
#include "transport/ascend_transport/heterogeneous_tcp_transport.h"
38+
#endif
3639
#ifdef USE_MNNVL
3740
#include "transport/nvlink_transport/nvlink_transport.h"
3841
#endif
@@ -101,7 +104,7 @@ Status MultiTransport::submitTransfer(
101104
assert(transport);
102105
auto &task = batch_desc.task_list[task_id];
103106
task.batch_id = batch_id;
104-
#ifdef USE_ASCEND_HETEROGENEOUS
107+
#if defined(USE_ASCEND_HETEROGENEOUS) || defined(USE_ASCEND_HETEROGENEOUS_TCP)
105108
task.request = const_cast<Transport::TransferRequest *>(&request);
106109
#else
107110
task.request = &request;
@@ -227,6 +230,11 @@ Transport *MultiTransport::installTransport(const std::string &proto,
227230
transport = new HeterogeneousRdmaTransport();
228231
}
229232
#endif
233+
#ifdef USE_ASCEND_HETEROGENEOUS_TCP
234+
else if (std::string(proto) == "ascend") {
235+
transport = new HeterogeneousTcpTransport();
236+
}
237+
#endif
230238
#ifdef USE_MNNVL
231239
else if (std::string(proto) == "nvlink") {
232240
transport = new NvlinkTransport();
@@ -260,11 +268,11 @@ Status MultiTransport::selectTransport(const TransferRequest &entry,
260268
std::to_string(entry.target_id));
261269
}
262270
auto proto = target_segment_desc->protocol;
263-
#ifdef USE_ASCEND_HETEROGENEOUS
271+
#if defined(USE_ASCEND_HETEROGENEOUS) || defined(USE_ASCEND_HETEROGENEOUS_TCP)
264272
// When USE_ASCEND_HETEROGENEOUS is enabled:
265273
// - Target side directly reuses RDMA Transport
266274
// - Initiator side uses heterogeneous_rdma_transport
267-
if (target_segment_desc->protocol == "rdma") {
275+
if (target_segment_desc->protocol == "rdma" || target_segment_desc->protocol == "tcp") {
268276
proto = "ascend";
269277
}
270278
#endif

mooncake-transfer-engine/src/transfer_engine.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ int TransferEngine::init(const std::string &metadata_conn_string,
170170
#else
171171

172172
#if defined(USE_CXL) && !defined(USE_ASCEND) && \
173-
!defined(USE_ASCEND_HETEROGENEOUS)
173+
!defined(USE_ASCEND_HETEROGENEOUS) && !defined(USE_ASCEND_HETEROGENEOUS_TCP)
174174
if (std::getenv("MC_CXL_DEV_PATH") != nullptr) {
175175
Transport *cxl_transport =
176176
multi_transports_->installTransport("cxl", local_topology_);
@@ -200,7 +200,7 @@ int TransferEngine::init(const std::string &metadata_conn_string,
200200
LOG(INFO) << "Topology discovery complete. Found "
201201
<< local_topology_->getHcaList().size() << " HCAs.";
202202

203-
#ifdef USE_ASCEND_HETEROGENEOUS
203+
#if defined(USE_ASCEND_HETEROGENEOUS) || defined(USE_ASCEND_HETEROGENEOUS_TCP)
204204
Transport *ascend_transport =
205205
multi_transports_->installTransport("ascend", local_topology_);
206206
if (!ascend_transport) {

0 commit comments

Comments
 (0)