Skip to content

Commit eda3897

Browse files
committed
rpc : use ggml logging facilities
Add --verbose to rpc-server and configure the ggml logger to print debug messages only if set. Add helper macro LOG_DBG() which does an early check of the verbose flag before calling GGML_LOG_DEBUG(). Make sure we log a debug message for every server function.
1 parent 459c0c2 commit eda3897

File tree

3 files changed

+66
-50
lines changed

3 files changed

+66
-50
lines changed

ggml/include/ggml-rpc.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ GGML_BACKEND_API ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(const c
2121
GGML_BACKEND_API void ggml_backend_rpc_get_device_memory(const char * endpoint, size_t * free, size_t * total);
2222

2323
GGML_BACKEND_API void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint,
24-
const char * cache_dir,
24+
const char * cache_dir, bool verbose,
2525
size_t free_mem, size_t total_mem);
2626

2727
GGML_BACKEND_API ggml_backend_reg_t ggml_backend_rpc_reg(void);

ggml/src/ggml-rpc/ggml-rpc.cpp

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ struct socket_t {
4747
sockfd_t fd;
4848
socket_t(sockfd_t fd) : fd(fd) {}
4949
~socket_t() {
50-
GGML_PRINT_DEBUG("[%s] closing socket %d\n", __func__, this->fd);
50+
GGML_LOG_DEBUG("[%s] closing socket %d\n", __func__, this->fd);
5151
#ifdef _WIN32
5252
closesocket(this->fd);
5353
#else
@@ -265,14 +265,14 @@ static std::shared_ptr<socket_t> socket_connect(const char * host, int port) {
265265
return nullptr;
266266
}
267267
if (!set_no_delay(sockfd)) {
268-
fprintf(stderr, "Failed to set TCP_NODELAY\n");
268+
GGML_LOG_ERROR("Failed to set TCP_NODELAY\n");
269269
return nullptr;
270270
}
271271
addr.sin_family = AF_INET;
272272
addr.sin_port = htons(port);
273273
struct hostent * server = gethostbyname(host);
274274
if (server == NULL) {
275-
fprintf(stderr, "Cannot resolve host '%s'\n", host);
275+
GGML_LOG_ERROR("Cannot resolve host '%s'\n", host);
276276
return nullptr;
277277
}
278278
memcpy(&addr.sin_addr.s_addr, server->h_addr, server->h_length);
@@ -289,7 +289,7 @@ static std::shared_ptr<socket_t> socket_accept(sockfd_t srv_sockfd) {
289289
return nullptr;
290290
}
291291
if (!set_no_delay(client_socket_fd)) {
292-
fprintf(stderr, "Failed to set TCP_NODELAY\n");
292+
GGML_LOG_ERROR("Failed to set TCP_NODELAY\n");
293293
return nullptr;
294294
}
295295
return client_socket;
@@ -302,11 +302,11 @@ static std::shared_ptr<socket_t> create_server_socket(const char * host, int por
302302
return nullptr;
303303
}
304304
if (!set_reuse_addr(sockfd)) {
305-
fprintf(stderr, "Failed to set SO_REUSEADDR\n");
305+
GGML_LOG_ERROR("Failed to set SO_REUSEADDR\n");
306306
return nullptr;
307307
}
308308
if (inet_addr(host) == INADDR_NONE) {
309-
fprintf(stderr, "Invalid host address: %s\n", host);
309+
GGML_LOG_ERROR("Invalid host address: %s\n", host);
310310
return nullptr;
311311
}
312312
struct sockaddr_in serv_addr;
@@ -349,7 +349,7 @@ static bool recv_data(sockfd_t sockfd, void * data, size_t size) {
349349
return false;
350350
}
351351
if (n == 0) {
352-
GGML_LOG_ERROR("recv returned 0 (peer closed?)\n");
352+
GGML_LOG_DEBUG("recv returned 0 (peer closed?)\n");
353353
return false;
354354
}
355355
bytes_recv += (size_t)n;
@@ -383,7 +383,7 @@ static bool recv_msg(sockfd_t sockfd, std::vector<uint8_t> & input) {
383383
try {
384384
input.resize(size);
385385
} catch (const std::bad_alloc & e) {
386-
fprintf(stderr, "Failed to allocate input buffer of size %" PRIu64 "\n", size);
386+
GGML_LOG_ERROR("Failed to allocate input buffer of size %" PRIu64 "\n", size);
387387
return false;
388388
}
389389
return recv_data(sockfd, input.data(), size);
@@ -443,11 +443,11 @@ static bool check_server_version(const std::shared_ptr<socket_t> & sock) {
443443
bool status = send_rpc_cmd(sock, RPC_CMD_HELLO, nullptr, 0, &response, sizeof(response));
444444
RPC_STATUS_ASSERT(status);
445445
if (response.major != RPC_PROTO_MAJOR_VERSION || response.minor > RPC_PROTO_MINOR_VERSION) {
446-
fprintf(stderr, "RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
446+
GGML_LOG_ERROR("RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
447447
return false;
448448
}
449449
if (response.minor != RPC_PROTO_MINOR_VERSION || response.patch != RPC_PROTO_PATCH_VERSION) {
450-
fprintf(stderr, "WARNING: RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
450+
GGML_LOG_INFO("WARNING: RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
451451
}
452452
return true;
453453
}
@@ -488,7 +488,7 @@ static std::shared_ptr<socket_t> get_socket(const std::string & endpoint) {
488488
if (!check_server_version(sock)) {
489489
return nullptr;
490490
}
491-
GGML_PRINT_DEBUG("[%s] connected to %s, sockfd=%d\n", __func__, endpoint.c_str(), sock->fd);
491+
GGML_LOG_DEBUG("[%s] connected to %s, sockfd=%d\n", __func__, endpoint.c_str(), sock->fd);
492492
sockets[endpoint] = sock;
493493
return sock;
494494
}
@@ -809,7 +809,7 @@ ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(const char * endpoint) {
809809
}
810810
auto sock = get_socket(endpoint);
811811
if (sock == nullptr) {
812-
fprintf(stderr, "Failed to connect to %s\n", endpoint);
812+
GGML_LOG_ERROR("Failed to connect to %s\n", endpoint);
813813
return nullptr;
814814
}
815815
size_t alignment = get_alignment(sock);
@@ -871,8 +871,8 @@ void ggml_backend_rpc_get_device_memory(const char * endpoint, size_t * free, si
871871

872872
class rpc_server {
873873
public:
874-
rpc_server(ggml_backend_t backend, const char * cache_dir)
875-
: backend(backend), cache_dir(cache_dir) {
874+
rpc_server(ggml_backend_t backend, const char * cache_dir, bool verbose)
875+
: backend(backend), cache_dir(cache_dir), verbose(verbose) {
876876
}
877877
~rpc_server();
878878

@@ -902,14 +902,18 @@ class rpc_server {
902902

903903
ggml_backend_t backend;
904904
const char * cache_dir;
905+
bool verbose;
905906
std::unordered_set<ggml_backend_buffer_t> buffers;
906907
};
907908

909+
#define LOG_DBG(msg, ...) \
910+
do { if (verbose) GGML_LOG_DEBUG(msg, ##__VA_ARGS__); } while (0)
911+
908912
void rpc_server::hello(rpc_msg_hello_rsp & response) {
909913
response.major = RPC_PROTO_MAJOR_VERSION;
910914
response.minor = RPC_PROTO_MINOR_VERSION;
911915
response.patch = RPC_PROTO_PATCH_VERSION;
912-
GGML_PRINT_DEBUG("[%s] version: %d.%d.%d\n", __func__, response.major, response.minor, response.patch);
916+
LOG_DBG("[%s] version: %d.%d.%d\n", __func__, response.major, response.minor, response.patch);
913917
}
914918

915919
bool rpc_server::get_alloc_size(const rpc_msg_get_alloc_size_req & request, rpc_msg_get_alloc_size_rsp & response) {
@@ -929,15 +933,15 @@ bool rpc_server::get_alloc_size(const rpc_msg_get_alloc_size_req & request, rpc_
929933
GGML_LOG_ERROR("Null tensor pointer passed to server get_alloc_size function.\n");
930934
return false;
931935
}
932-
936+
LOG_DBG("[%s] buffer: %p, data: %p\n", __func__, (void*)tensor->buffer, tensor->data);
933937
if (tensor->buffer == nullptr) {
934938
//No buffer allocated.
935939
buft = ggml_backend_get_default_buffer_type(backend);
936940
} else {
937941
buft = tensor->buffer->buft;
938942
}
939943

940-
response.alloc_size = ggml_backend_buft_get_alloc_size(buft,tensor);
944+
response.alloc_size = ggml_backend_buft_get_alloc_size(buft, tensor);
941945

942946
return true;
943947
}
@@ -950,29 +954,29 @@ void rpc_server::alloc_buffer(const rpc_msg_alloc_buffer_req & request, rpc_msg_
950954
if (buffer != nullptr) {
951955
response.remote_ptr = reinterpret_cast<uint64_t>(buffer);
952956
response.remote_size = buffer->size;
953-
GGML_PRINT_DEBUG("[%s] size: %" PRIu64 " -> remote_ptr: %" PRIx64 ", remote_size: %" PRIu64 "\n", __func__, request.size, response.remote_ptr, response.remote_size);
957+
LOG_DBG("[%s] size: %" PRIu64 " -> remote_ptr: %" PRIx64 ", remote_size: %" PRIu64 "\n", __func__, request.size, response.remote_ptr, response.remote_size);
954958
buffers.insert(buffer);
955959
} else {
956-
GGML_LOG_ERROR("[%s] size: %" PRIu64 " -> failed\n", __func__, request.size);
960+
LOG_DBG("[%s] size: %" PRIu64 " -> failed\n", __func__, request.size);
957961
}
958962
}
959963

960964
void rpc_server::get_alignment(rpc_msg_get_alignment_rsp & response) {
961965
ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type(backend);
962966
size_t alignment = ggml_backend_buft_get_alignment(buft);
963-
GGML_PRINT_DEBUG("[%s] alignment: %lu\n", __func__, alignment);
967+
LOG_DBG("[%s] alignment: %lu\n", __func__, alignment);
964968
response.alignment = alignment;
965969
}
966970

967971
void rpc_server::get_max_size(rpc_msg_get_max_size_rsp & response) {
968972
ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type(backend);
969973
size_t max_size = ggml_backend_buft_get_max_size(buft);
970-
GGML_PRINT_DEBUG("[%s] max_size: %lu\n", __func__, max_size);
974+
LOG_DBG("[%s] max_size: %lu\n", __func__, max_size);
971975
response.max_size = max_size;
972976
}
973977

974978
bool rpc_server::buffer_get_base(const rpc_msg_buffer_get_base_req & request, rpc_msg_buffer_get_base_rsp & response) {
975-
GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
979+
LOG_DBG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
976980
ggml_backend_buffer_t buffer = reinterpret_cast<ggml_backend_buffer_t>(request.remote_ptr);
977981
if (buffers.find(buffer) == buffers.end()) {
978982
GGML_LOG_ERROR("[%s] buffer not found\n", __func__);
@@ -984,7 +988,7 @@ bool rpc_server::buffer_get_base(const rpc_msg_buffer_get_base_req & request, rp
984988
}
985989

986990
bool rpc_server::free_buffer(const rpc_msg_free_buffer_req & request) {
987-
GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
991+
LOG_DBG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
988992
ggml_backend_buffer_t buffer = reinterpret_cast<ggml_backend_buffer_t>(request.remote_ptr);
989993
if (buffers.find(buffer) == buffers.end()) {
990994
GGML_LOG_ERROR("[%s] buffer not found\n", __func__);
@@ -996,7 +1000,7 @@ bool rpc_server::free_buffer(const rpc_msg_free_buffer_req & request) {
9961000
}
9971001

9981002
bool rpc_server::buffer_clear(const rpc_msg_buffer_clear_req & request) {
999-
GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 ", value: %u\n", __func__, request.remote_ptr, request.value);
1003+
LOG_DBG("[%s] remote_ptr: %" PRIx64 ", value: %u\n", __func__, request.remote_ptr, request.value);
10001004
ggml_backend_buffer_t buffer = reinterpret_cast<ggml_backend_buffer_t>(request.remote_ptr);
10011005
if (buffers.find(buffer) == buffers.end()) {
10021006
GGML_LOG_ERROR("[%s] buffer not found\n", __func__);
@@ -1073,7 +1077,7 @@ bool rpc_server::set_tensor(const std::vector<uint8_t> & input) {
10731077
GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__);
10741078
return false;
10751079
}
1076-
GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu\n", __func__, (void*)tensor->buffer, tensor->data, offset, size);
1080+
LOG_DBG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu\n", __func__, (void*)tensor->buffer, tensor->data, offset, size);
10771081

10781082
// sanitize tensor->data
10791083
{
@@ -1096,7 +1100,7 @@ bool rpc_server::set_tensor(const std::vector<uint8_t> & input) {
10961100
fs::path cache_file = fs::path(cache_dir) / hash_str;
10971101
std::ofstream ofs(cache_file, std::ios::binary);
10981102
ofs.write((const char *)data, size);
1099-
printf("[%s] saved to '%s'\n", __func__, cache_file.c_str());
1103+
GGML_LOG_INFO("[%s] saved to '%s'\n", __func__, cache_file.c_str());
11001104
}
11011105
ggml_backend_tensor_set(tensor, data, offset, size);
11021106
return true;
@@ -1142,8 +1146,8 @@ bool rpc_server::set_tensor_hash(const rpc_msg_set_tensor_hash_req & request, rp
11421146
GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__);
11431147
return false;
11441148
}
1145-
GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu, hash: %" PRIx64 "\n",
1146-
__func__, (void*)tensor->buffer, tensor->data, request.offset, size, request.hash);
1149+
LOG_DBG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu, hash: %" PRIx64 "\n",
1150+
__func__, (void*)tensor->buffer, tensor->data, request.offset, size, request.hash);
11471151

11481152
// sanitize tensor->data
11491153
{
@@ -1177,7 +1181,7 @@ bool rpc_server::init_tensor(const rpc_msg_init_tensor_req & request) {
11771181
GGML_LOG_ERROR("Null tensor pointer passed to server init_tensor function.\n");
11781182
return false;
11791183
}
1180-
1184+
LOG_DBG("[%s] buffer: %p, data: %p\n", __func__, (void*)tensor->buffer, tensor->data);
11811185
// Call the backend's buffer_init_tensor function
11821186
ggml_backend_buffer_t buffer = tensor->buffer;
11831187
if (buffer && buffer->iface.init_tensor) {
@@ -1210,7 +1214,7 @@ bool rpc_server::get_tensor(const rpc_msg_get_tensor_req & request, std::vector<
12101214
GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__);
12111215
return false;
12121216
}
1213-
GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %" PRIu64 "\n", __func__, (void*)tensor->buffer, tensor->data, request.offset, request.size);
1217+
LOG_DBG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %" PRIu64 "\n", __func__, (void*)tensor->buffer, tensor->data, request.offset, request.size);
12141218

12151219
// sanitize tensor->data
12161220
{
@@ -1254,7 +1258,7 @@ bool rpc_server::copy_tensor(const rpc_msg_copy_tensor_req & request, rpc_msg_co
12541258
uint64_t dst_buf_sz = (uint64_t) ggml_backend_buffer_get_size(dst->buffer);
12551259

12561260
if (dst_data + src_size > dst_base + dst_buf_sz) {
1257-
GGML_PRINT_DEBUG("[%s] out-of-bounds write in rpc_server::copy_tensor:\n"
1261+
GGML_LOG_ERROR("[%s] out-of-bounds write in rpc_server::copy_tensor:\n"
12581262
" write range : [0x%" PRIx64 ", 0x%" PRIx64 "]\n"
12591263
" buffer base: [0x%" PRIx64 ", 0x%" PRIx64 "]\n",
12601264
__func__,
@@ -1265,8 +1269,8 @@ bool rpc_server::copy_tensor(const rpc_msg_copy_tensor_req & request, rpc_msg_co
12651269
return false;
12661270
}
12671271

1268-
GGML_PRINT_DEBUG("[%s] src->buffer: %p, dst->buffer: %p\n",
1269-
__func__, (void*) src->buffer, (void*) dst->buffer);
1272+
LOG_DBG("[%s] src->buffer: %p, dst->buffer: %p\n",
1273+
__func__, (void*) src->buffer, (void*) dst->buffer);
12701274

12711275
response.result = ggml_backend_buffer_copy_tensor(src, dst);
12721276
return true;
@@ -1342,7 +1346,7 @@ bool rpc_server::graph_compute(const std::vector<uint8_t> & input, rpc_msg_graph
13421346
return false;
13431347
}
13441348
const rpc_tensor * tensors = (const rpc_tensor *)(input.data() + sizeof(n_nodes) + n_nodes*sizeof(uint64_t) + sizeof(n_tensors));
1345-
GGML_PRINT_DEBUG("[%s] n_nodes: %u, n_tensors: %u\n", __func__, n_nodes, n_tensors);
1349+
LOG_DBG("[%s] n_nodes: %u, n_tensors: %u\n", __func__, n_nodes, n_tensors);
13461350

13471351
size_t buf_size = ggml_tensor_overhead()*(n_nodes + n_tensors) + ggml_graph_overhead_custom(n_nodes, false);
13481352

@@ -1385,16 +1389,16 @@ rpc_server::~rpc_server() {
13851389
}
13861390
}
13871391

1388-
static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir,
1392+
static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir, bool verbose,
13891393
sockfd_t sockfd, size_t free_mem, size_t total_mem) {
1390-
rpc_server server(backend, cache_dir);
1394+
rpc_server server(backend, cache_dir, verbose);
13911395
uint8_t cmd;
13921396
if (!recv_data(sockfd, &cmd, 1)) {
13931397
return;
13941398
}
13951399
// the first command sent by the client must be HELLO
13961400
if (cmd != RPC_CMD_HELLO) {
1397-
fprintf(stderr, "Expected HELLO command, update client\n");
1401+
GGML_LOG_ERROR("Expected HELLO command, update client\n");
13981402
return;
13991403
}
14001404
if (!recv_msg(sockfd, nullptr, 0)) {
@@ -1411,7 +1415,7 @@ static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir,
14111415
}
14121416
if (cmd >= RPC_CMD_COUNT) {
14131417
// fail fast if the command is invalid
1414-
fprintf(stderr, "Unknown command: %d\n", cmd);
1418+
GGML_LOG_ERROR("Unknown command: %d\n", cmd);
14151419
break;
14161420
}
14171421
switch (cmd) {
@@ -1599,15 +1603,15 @@ static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir,
15991603
break;
16001604
}
16011605
default: {
1602-
fprintf(stderr, "Unknown command: %d\n", cmd);
1606+
GGML_LOG_ERROR("Unknown command: %d\n", cmd);
16031607
return;
16041608
}
16051609
}
16061610
}
16071611
}
16081612

16091613
void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint,
1610-
const char * cache_dir,
1614+
const char * cache_dir, bool verbose,
16111615
size_t free_mem, size_t total_mem) {
16121616
printf("Starting RPC server v%d.%d.%d\n",
16131617
RPC_PROTO_MAJOR_VERSION,
@@ -1645,7 +1649,7 @@ void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint
16451649
}
16461650
printf("Accepted client connection, free_mem=%zu, total_mem=%zu\n", free_mem, total_mem);
16471651
fflush(stdout);
1648-
rpc_serve_client(backend, cache_dir, client_socket->fd, free_mem, total_mem);
1652+
rpc_serve_client(backend, cache_dir, verbose, client_socket->fd, free_mem, total_mem);
16491653
printf("Client connection closed\n");
16501654
fflush(stdout);
16511655
}

0 commit comments

Comments
 (0)