Skip to content

Commit f814abe

Browse files
committed
Merge branch 'rlamb/sdk-1502/handle-null-payload' into rlamb/hooks-contract-tests
2 parents 7eb5a2e + b6ebec5 commit f814abe

File tree

9 files changed

+348
-12
lines changed

9 files changed

+348
-12
lines changed

.github/workflows/server.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ jobs:
3232
with:
3333
# Inform the test harness of test service's port.
3434
test_service_port: ${{ env.TEST_SERVICE_PORT }}
35-
extra_params: '-skip-from ./contract-tests/server-contract-tests/test-suppressions.txt'
3635
token: ${{ secrets.GITHUB_TOKEN }}
3736

3837
contract-tests-curl:
@@ -54,7 +53,6 @@ jobs:
5453
with:
5554
# Inform the test harness of test service's port.
5655
test_service_port: ${{ env.TEST_SERVICE_PORT }}
57-
extra_params: '-skip-from ./contract-tests/server-contract-tests/test-suppressions.txt'
5856
token: ${{ secrets.GITHUB_TOKEN }}
5957

6058
build-test-server:

contract-tests/server-contract-tests/test-suppressions.txt

Lines changed: 0 additions & 7 deletions
This file was deleted.

libs/server-sdk/src/data_systems/background_sync/sources/streaming/event_handler.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ tl::expected<DataSourceEventHandler::Patch, JsonError> Patch(
4141
if (!data.has_value()) {
4242
return tl::unexpected(JsonError::kSchemaFailure);
4343
}
44+
// Check if the optional is empty (indicates null data)
45+
if (!data->has_value()) {
46+
return tl::unexpected(JsonError::kSchemaFailure);
47+
}
4448
return DataSourceEventHandler::Patch{
4549
TStreamingDataKind::Key(path),
4650
data_model::ItemDescriptor<TData>(data->value())};

libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,18 @@ void StreamingDataSource::StartAsync(
127127

128128
client_builder.receiver([weak_self](launchdarkly::sse::Event const& event) {
129129
if (auto self = weak_self.lock()) {
130-
self->event_handler_->HandleMessage(event.type(), event.data());
131-
// TODO: Use the result of handle message to restart the
132-
// event source if we got bad data. sc-204387
130+
auto status =
131+
self->event_handler_->HandleMessage(event.type(), event.data());
132+
if (status == DataSourceEventHandler::MessageStatus::kInvalidMessage) {
133+
// Invalid data received - restart the connection with backoff
134+
// to get a fresh stream. The backoff mechanism prevents rapid
135+
// reconnection attempts.
136+
LD_LOG(self->logger_, LogLevel::kWarn)
137+
<< "Received invalid data from stream, restarting connection";
138+
if (self->client_) {
139+
self->client_->async_restart("invalid data in stream");
140+
}
141+
}
133142
}
134143
});
135144

libs/server-sdk/tests/data_source_event_handler_test.cpp

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,3 +203,276 @@ TEST(DataSourceEventHandlerTests, HandlesDeleteSegment) {
203203

204204
ASSERT_FALSE(store->GetSegment("segmentA")->item);
205205
}
206+
207+
TEST(DataSourceEventHandlerTests, HandlesPatchWithNullDataForFlag) {
208+
auto logger = launchdarkly::logging::NullLogger();
209+
auto store = std::make_shared<MemoryStore>();
210+
DataSourceStatusManager manager;
211+
DataSourceEventHandler event_handler(*store, logger, manager);
212+
213+
// Initialize the store
214+
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");
215+
216+
// Null data should be treated as invalid, not crash the application
217+
auto res = event_handler.HandleMessage(
218+
"patch", R"({"path": "/flags/flagA", "data": null})");
219+
220+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
221+
// The error should be recorded, but we stay in Valid state after a previous successful PUT
222+
EXPECT_EQ(DataSourceStatus::DataSourceState::kValid,
223+
manager.Status().State());
224+
ASSERT_TRUE(manager.Status().LastError().has_value());
225+
EXPECT_EQ(DataSourceStatus::ErrorInfo::ErrorKind::kInvalidData,
226+
manager.Status().LastError()->Kind());
227+
}
228+
229+
TEST(DataSourceEventHandlerTests, HandlesPatchWithNullDataForSegment) {
230+
auto logger = launchdarkly::logging::NullLogger();
231+
auto store = std::make_shared<MemoryStore>();
232+
DataSourceStatusManager manager;
233+
DataSourceEventHandler event_handler(*store, logger, manager);
234+
235+
// Initialize the store
236+
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");
237+
238+
// Null data should be treated as invalid, not crash the application
239+
auto res = event_handler.HandleMessage(
240+
"patch", R"({"path": "/segments/segmentA", "data": null})");
241+
242+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
243+
// The error should be recorded, but we stay in Valid state after a previous successful PUT
244+
EXPECT_EQ(DataSourceStatus::DataSourceState::kValid,
245+
manager.Status().State());
246+
ASSERT_TRUE(manager.Status().LastError().has_value());
247+
EXPECT_EQ(DataSourceStatus::ErrorInfo::ErrorKind::kInvalidData,
248+
manager.Status().LastError()->Kind());
249+
}
250+
251+
TEST(DataSourceEventHandlerTests, HandlesPatchWithMissingDataField) {
252+
auto logger = launchdarkly::logging::NullLogger();
253+
auto store = std::make_shared<MemoryStore>();
254+
DataSourceStatusManager manager;
255+
DataSourceEventHandler event_handler(*store, logger, manager);
256+
257+
// Initialize the store
258+
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");
259+
260+
// Missing data field should also be treated as invalid
261+
auto res = event_handler.HandleMessage(
262+
"patch", R"({"path": "/flags/flagA"})");
263+
264+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
265+
}
266+
267+
TEST(DataSourceEventHandlerTests, HandlesPutWithNullData) {
268+
auto logger = launchdarkly::logging::NullLogger();
269+
auto store = std::make_shared<MemoryStore>();
270+
DataSourceStatusManager manager;
271+
DataSourceEventHandler event_handler(*store, logger, manager);
272+
273+
// PUT with null data should also be handled safely
274+
auto res = event_handler.HandleMessage(
275+
"put", R"({"path":"/", "data": null})");
276+
277+
// PUT handles this differently - it may succeed with empty data
278+
// The important thing is it doesn't crash
279+
ASSERT_TRUE(res == DataSourceEventHandler::MessageStatus::kMessageHandled ||
280+
res == DataSourceEventHandler::MessageStatus::kInvalidMessage);
281+
}
282+
283+
// Tests for wrong data types (schema validation errors)
284+
285+
TEST(DataSourceEventHandlerTests, HandlesPatchWithBooleanData) {
286+
auto logger = launchdarkly::logging::NullLogger();
287+
auto store = std::make_shared<MemoryStore>();
288+
DataSourceStatusManager manager;
289+
DataSourceEventHandler event_handler(*store, logger, manager);
290+
291+
// Initialize the store
292+
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");
293+
294+
// Boolean data instead of object should be treated as invalid
295+
auto res = event_handler.HandleMessage(
296+
"patch", R"({"path": "/flags/flagA", "data": true})");
297+
298+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
299+
}
300+
301+
TEST(DataSourceEventHandlerTests, HandlesPatchWithStringData) {
302+
auto logger = launchdarkly::logging::NullLogger();
303+
auto store = std::make_shared<MemoryStore>();
304+
DataSourceStatusManager manager;
305+
DataSourceEventHandler event_handler(*store, logger, manager);
306+
307+
// Initialize the store
308+
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");
309+
310+
// String data instead of object should be treated as invalid
311+
auto res = event_handler.HandleMessage(
312+
"patch", R"({"path": "/flags/flagA", "data": "not an object"})");
313+
314+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
315+
}
316+
317+
TEST(DataSourceEventHandlerTests, HandlesPatchWithArrayData) {
318+
auto logger = launchdarkly::logging::NullLogger();
319+
auto store = std::make_shared<MemoryStore>();
320+
DataSourceStatusManager manager;
321+
DataSourceEventHandler event_handler(*store, logger, manager);
322+
323+
// Initialize the store
324+
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");
325+
326+
// Array data instead of object should be treated as invalid
327+
auto res = event_handler.HandleMessage(
328+
"patch", R"({"path": "/flags/flagA", "data": []})");
329+
330+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
331+
}
332+
333+
TEST(DataSourceEventHandlerTests, HandlesPatchWithNumberData) {
334+
auto logger = launchdarkly::logging::NullLogger();
335+
auto store = std::make_shared<MemoryStore>();
336+
DataSourceStatusManager manager;
337+
DataSourceEventHandler event_handler(*store, logger, manager);
338+
339+
// Initialize the store
340+
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");
341+
342+
// Number data instead of object should be treated as invalid
343+
auto res = event_handler.HandleMessage(
344+
"patch", R"({"path": "/flags/flagA", "data": 42})");
345+
346+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
347+
}
348+
349+
TEST(DataSourceEventHandlerTests, HandlesDeleteWithStringVersion) {
350+
auto logger = launchdarkly::logging::NullLogger();
351+
auto store = std::make_shared<MemoryStore>();
352+
DataSourceStatusManager manager;
353+
DataSourceEventHandler event_handler(*store, logger, manager);
354+
355+
// Initialize the store
356+
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");
357+
358+
// String version instead of number should be treated as invalid
359+
auto res = event_handler.HandleMessage(
360+
"delete", R"({"path": "/flags/flagA", "version": "not a number"})");
361+
362+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
363+
}
364+
365+
TEST(DataSourceEventHandlerTests, HandlesPutWithInvalidFlagsType) {
366+
auto logger = launchdarkly::logging::NullLogger();
367+
auto store = std::make_shared<MemoryStore>();
368+
DataSourceStatusManager manager;
369+
DataSourceEventHandler event_handler(*store, logger, manager);
370+
371+
// Flags should be an object, not a boolean
372+
auto res = event_handler.HandleMessage(
373+
"put", R"({"path": "/", "data": {"flags": true, "segments": {}}})");
374+
375+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
376+
}
377+
378+
TEST(DataSourceEventHandlerTests, HandlesPutWithInvalidSegmentsType) {
379+
auto logger = launchdarkly::logging::NullLogger();
380+
auto store = std::make_shared<MemoryStore>();
381+
DataSourceStatusManager manager;
382+
DataSourceEventHandler event_handler(*store, logger, manager);
383+
384+
// Segments should be an object, not an array
385+
auto res = event_handler.HandleMessage(
386+
"put", R"({"path": "/", "data": {"flags": {}, "segments": []}})");
387+
388+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
389+
}
390+
391+
// Tests for additional malformed JSON variants
392+
393+
TEST(DataSourceEventHandlerTests, HandlesUnterminatedString) {
394+
auto logger = launchdarkly::logging::NullLogger();
395+
auto store = std::make_shared<MemoryStore>();
396+
DataSourceStatusManager manager;
397+
DataSourceEventHandler event_handler(*store, logger, manager);
398+
399+
// Unterminated string should be treated as malformed JSON
400+
auto res = event_handler.HandleMessage(
401+
"patch", R"({"path": "/flags/x", "data": "unterminated)");
402+
403+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
404+
}
405+
406+
TEST(DataSourceEventHandlerTests, HandlesTrailingComma) {
407+
auto logger = launchdarkly::logging::NullLogger();
408+
auto store = std::make_shared<MemoryStore>();
409+
DataSourceStatusManager manager;
410+
DataSourceEventHandler event_handler(*store, logger, manager);
411+
412+
// Trailing comma should be treated as malformed JSON
413+
auto res = event_handler.HandleMessage(
414+
"patch", R"({"path": "/flags/x", "data": {},})");
415+
416+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
417+
}
418+
419+
// Tests for missing required fields
420+
421+
TEST(DataSourceEventHandlerTests, HandlesDeleteWithMissingPath) {
422+
auto logger = launchdarkly::logging::NullLogger();
423+
auto store = std::make_shared<MemoryStore>();
424+
DataSourceStatusManager manager;
425+
DataSourceEventHandler event_handler(*store, logger, manager);
426+
427+
// Initialize the store
428+
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");
429+
430+
// Missing path field should be treated as invalid
431+
auto res = event_handler.HandleMessage(
432+
"delete", R"({"version": 1})");
433+
434+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
435+
}
436+
437+
TEST(DataSourceEventHandlerTests, HandlesDeleteWithMissingVersion) {
438+
auto logger = launchdarkly::logging::NullLogger();
439+
auto store = std::make_shared<MemoryStore>();
440+
DataSourceStatusManager manager;
441+
DataSourceEventHandler event_handler(*store, logger, manager);
442+
443+
// Initialize the store
444+
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");
445+
446+
// Missing version field should be treated as invalid
447+
auto res = event_handler.HandleMessage(
448+
"delete", R"({"path": "/flags/flagA"})");
449+
450+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
451+
}
452+
453+
TEST(DataSourceEventHandlerTests, HandlesPutWithMissingPath) {
454+
auto logger = launchdarkly::logging::NullLogger();
455+
auto store = std::make_shared<MemoryStore>();
456+
DataSourceStatusManager manager;
457+
DataSourceEventHandler event_handler(*store, logger, manager);
458+
459+
// Missing/empty path is treated as unrecognized (safely ignored)
460+
// This provides forward compatibility
461+
auto res = event_handler.HandleMessage(
462+
"put", R"({"data": {}})");
463+
464+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kMessageHandled, res);
465+
}
466+
467+
TEST(DataSourceEventHandlerTests, HandlesEmptyJsonObject) {
468+
auto logger = launchdarkly::logging::NullLogger();
469+
auto store = std::make_shared<MemoryStore>();
470+
DataSourceStatusManager manager;
471+
DataSourceEventHandler event_handler(*store, logger, manager);
472+
473+
// Empty JSON object with missing path is treated as unrecognized (safely ignored)
474+
// This provides forward compatibility with future event types
475+
auto res = event_handler.HandleMessage("patch", "{}");
476+
477+
ASSERT_EQ(DataSourceEventHandler::MessageStatus::kMessageHandled, res);
478+
}

libs/server-sent-events/include/launchdarkly/sse/client.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,15 @@ class Client {
207207
virtual ~Client() = default;
208208
virtual void async_connect() = 0;
209209
virtual void async_shutdown(std::function<void()> completion) = 0;
210+
211+
/**
212+
* Restart the connection with exponential backoff. This should be called
213+
* when the SDK detects invalid data from the stream and needs to
214+
* reconnect. The backoff mechanism prevents rapid reconnection attempts
215+
* that could overload the service.
216+
* @param reason A description of why the restart was triggered (for logging)
217+
*/
218+
virtual void async_restart(std::string const& reason) = 0;
210219
};
211220

212221
} // namespace launchdarkly::sse

libs/server-sent-events/src/client.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,29 @@ class FoxyClient : public Client,
161161
beast::bind_front_handler(&FoxyClient::do_run, shared_from_this()));
162162
}
163163

164+
void async_restart(std::string const& reason) override {
165+
boost::asio::post(
166+
session_->get_executor(),
167+
beast::bind_front_handler(&FoxyClient::do_restart,
168+
shared_from_this(), reason));
169+
}
170+
171+
void do_restart(std::string const& reason) {
172+
// Cancel any ongoing read operations
173+
try {
174+
if (session_->stream.is_ssl()) {
175+
session_->stream.ssl().next_layer().cancel();
176+
} else {
177+
session_->stream.plain().cancel();
178+
}
179+
} catch (boost::system::system_error const& err) {
180+
logger_("exception canceling stream during restart: " +
181+
std::string(err.what()));
182+
}
183+
// Trigger backoff and reconnect
184+
async_backoff(reason);
185+
}
186+
164187
void do_run() {
165188
session_->async_connect(
166189
host_, port_,

libs/server-sent-events/src/curl_client.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,17 @@ void CurlClient::async_connect() {
7676
[self = shared_from_this()]() { self->do_run(); });
7777
}
7878

79+
void CurlClient::async_restart(std::string const& reason) {
80+
boost::asio::post(backoff_timer_.get_executor(),
81+
[self = shared_from_this(), reason]() {
82+
// Close the socket to abort the current transfer.
83+
// CURL will detect the error and call the completion
84+
// handler, which will trigger backoff and reconnection.
85+
self->log_message("async_restart: aborting transfer due to " + reason);
86+
self->request_context_->abort_transfer();
87+
});
88+
}
89+
7990
void CurlClient::do_run() {
8091
if (request_context_->is_shutting_down()) {
8192
return;

0 commit comments

Comments
 (0)