|
36 | 36 | namespace mooncake { |
37 | 37 | namespace { |
38 | 38 | constexpr size_t kMemcpyBatchLimit = 4096; |
39 | | -} |
| 39 | +constexpr int32_t kMaxAdxlConnectRetries = 3; |
| 40 | +} // namespace |
40 | 41 | AscendDirectTransport::AscendDirectTransport() : running_(false) {} |
41 | 42 |
|
42 | 43 | AscendDirectTransport::~AscendDirectTransport() { |
@@ -78,6 +79,7 @@ AscendDirectTransport::~AscendDirectTransport() { |
78 | 79 | } |
79 | 80 | } |
80 | 81 | addr_to_mem_handle_.clear(); |
| 82 | + adxl_->Finalize(); |
81 | 83 | } |
82 | 84 |
|
83 | 85 | int AscendDirectTransport::install(std::string &local_server_name, |
@@ -582,10 +584,14 @@ void AscendDirectTransport::processSliceList( |
582 | 584 | << "us"; |
583 | 585 | return; |
584 | 586 | } |
| 587 | + return connectAndTransfer(target_adxl_engine_name, operation, slice_list); |
| 588 | +} |
| 589 | + |
| 590 | +void AscendDirectTransport::connectAndTransfer( |
| 591 | + const std::string &target_adxl_engine_name, adxl::TransferOp operation, |
| 592 | + const std::vector<Slice *> &slice_list, int32_t times) { |
585 | 593 | int ret = checkAndConnect(target_adxl_engine_name); |
586 | 594 | if (ret != 0) { |
587 | | - LOG(ERROR) << "Failed to connect to segment: " |
588 | | - << target_segment_desc->name; |
589 | 595 | for (auto &slice : slice_list) { |
590 | 596 | slice->markFailed(); |
591 | 597 | } |
@@ -613,6 +619,14 @@ void AscendDirectTransport::processSliceList( |
613 | 619 | std::chrono::steady_clock::now() - start) |
614 | 620 | .count() |
615 | 621 | << " us"; |
| 622 | + } else if (status == adxl::NOT_CONNECTED) { |
| 623 | + LOG(INFO) << "Connection reset by backend, retry times:" << times; |
| 624 | + disconnect(target_adxl_engine_name, 0, true); |
| 625 | + if (times < kMaxAdxlConnectRetries) { |
| 626 | + return connectAndTransfer(target_adxl_engine_name, operation, |
| 627 | + slice_list, times + 1); |
| 628 | + } |
| 629 | + return; |
616 | 630 | } else { |
617 | 631 | if (status == adxl::TIMEOUT) { |
618 | 632 | LOG(ERROR) << "Transfer timeout to: " << target_adxl_engine_name |
@@ -844,21 +858,24 @@ int AscendDirectTransport::checkAndConnect( |
844 | 858 | } |
845 | 859 |
|
846 | 860 | int AscendDirectTransport::disconnect( |
847 | | - const std::string &target_adxl_engine_name, int32_t timeout_in_millis) { |
| 861 | + const std::string &target_adxl_engine_name, int32_t timeout_in_millis, |
| 862 | + bool force) { |
848 | 863 | std::lock_guard<std::mutex> lock(connection_mutex_); |
849 | 864 | auto it = connected_segments_.find(target_adxl_engine_name); |
850 | 865 | if (it == connected_segments_.end()) { |
851 | 866 | LOG(INFO) << "Target adxl engine: " << target_adxl_engine_name |
852 | 867 | << " is not connected."; |
853 | 868 | return 0; |
854 | 869 | } |
855 | | - auto status = |
856 | | - adxl_->Disconnect(target_adxl_engine_name.c_str(), timeout_in_millis); |
857 | | - if (status != adxl::SUCCESS) { |
858 | | - LOG(ERROR) << "Failed to disconnect to: " << target_adxl_engine_name |
859 | | - << ", status: " << status; |
860 | | - connected_segments_.erase(target_adxl_engine_name); |
861 | | - return -1; |
| 870 | + if (!force) { |
| 871 | + auto status = adxl_->Disconnect(target_adxl_engine_name.c_str(), |
| 872 | + timeout_in_millis); |
| 873 | + if (status != adxl::SUCCESS) { |
| 874 | + LOG(ERROR) << "Failed to disconnect to: " << target_adxl_engine_name |
| 875 | + << ", status: " << status; |
| 876 | + connected_segments_.erase(target_adxl_engine_name); |
| 877 | + return -1; |
| 878 | + } |
862 | 879 | } |
863 | 880 | connected_segments_.erase(target_adxl_engine_name); |
864 | 881 | return 0; |
|
0 commit comments