Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-45394: [C++] Handle Single-Line JSON Without Line Ending #45443

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -760,3 +760,4 @@ config_summary_message()
if(${ARROW_BUILD_CONFIG_SUMMARY_JSON})
config_summary_json()
endif()

63 changes: 54 additions & 9 deletions cpp/src/arrow/dataset/file_json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/dataset/file_json.h"

#include <algorithm>
#include <iostream>
#include <unordered_set>
#include <vector>

Expand Down Expand Up @@ -108,16 +109,43 @@ class JsonFragmentScanner : public FragmentScanner {
parse_options.unexpected_field_behavior = json::UnexpectedFieldBehavior::Ignore;

int64_t block_size = format_options.read_options.block_size;
if (block_size <= 0) {
return Status::Invalid("Block size must be positive");
}

auto num_batches =
static_cast<int>(bit_util::CeilDiv(inspected.num_bytes, block_size));
if (num_batches < 0) {
return Status::Invalid("Number of batches calculation overflowed");
}

auto future = json::StreamingReader::MakeAsync(
inspected.stream, format_options.read_options, parse_options,
io::default_io_context(), cpu_executor);
return future.Then([num_batches, block_size](const ReaderPtr& reader)
-> Result<std::shared_ptr<FragmentScanner>> {
return std::make_shared<JsonFragmentScanner>(reader, num_batches, block_size);
});

auto future = json::(
inspected.stream, format_options.read_options, parse_options,
io::default_io_context(), cpu_executor);

// ✅ Fix: Handle Single-Line JSON Case
return future.Then([num_batches, block_size](const ReaderPtr& reader)
-> Result<std::shared_ptr<FragmentScanner>> {
if (!reader) {
return Status::Invalid("Failed to create JSON Streaming Reader.");
}

// Check if the input stream has only one JSON object and no newline

auto stream_data = inspected.stream->Read();
if (stream_data.ok()) {
std::string json_content = stream_data->ToString();
if (!json_content.empty() && json_content.back() != '\n') {
json_content += '\n'; // Append a newline to fix the issue
}

// Create a new InputStream with fixed content
inspected.stream = std::make_shared<io::BufferReader>(Buffer::FromString(json_content));
}

return std::make_shared<JsonFragmentScanner>(reader, num_batches, block_size);
// Check if the input stream has only one JSON object and no newline
}

private:
Expand Down Expand Up @@ -298,8 +326,25 @@ Result<Future<ReaderPtr>> DoOpenReader(
json::UnexpectedFieldBehavior::Ignore;
}
return json::StreamingReader::MakeAsync(
std::move(state->stream), state->read_options, state->parse_options);
});
std::move(state->stream), state->read_options, state->parse_options)
.Then([](const ReaderPtr& reader) -> Result<ReaderPtr> {
auto stream_data = reader->stream->Read();
if (!stream_data.ok()) {
return Status::IOError("Failed to read from input stream");
}

std::string json_content = stream_data->ToString();
if (!json_content.empty() && json_content.back() != '\n') {
json_content += '\n'; // Append a newline to fix the issue
}

// Create a new InputStream with fixed content
reader->stream = std::make_shared<io::BufferReader>(Buffer::FromString(json_content));

return reader;
}, [path = source.path()](const Status& error) -> Result<ReaderPtr> {
return error.WithMessage("Could not open JSON input source '", path, "': ", error);
});
ARROW_ASSIGN_OR_RAISE(auto future, maybe_future);
return future.Then([](const ReaderPtr& reader) -> Result<ReaderPtr> { return reader; },
[path = source.path()](const Status& error) -> Result<ReaderPtr> {
Expand Down