Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINFICPP-2243 - In ListenHTTP process incoming request only in onTrigger #1826

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

adamdebreceni
Copy link
Contributor

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically main)?

  • Is your initial contribution a single, squashed commit?

For code changes:

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file?
  • If applicable, have you updated the NOTICE file?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.

Comment on lines +274 to +275
size_t offset_{0};
std::optional<size_t> size_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found these a bit confusing, whose offsets are these. I realized that the network stream, so I'd rename them.

Suggested change
size_t offset_{0};
std::optional<size_t> size_;
size_t netstream_offset_{0}; // how much has been read from conn_
std::optional<size_t> netstream_size_limit_; // how much can we read from conn_

Comment on lines +264 to +268
const auto mg_read_return = mg_read(conn_, out_buffer.data(), std::min(out_buffer.size(), size_.value_or(std::numeric_limits<size_t>::max()) - offset_));
if (mg_read_return < 0) {
return io::STREAM_ERROR;
}
offset_ += gsl::narrow<size_t>(mg_read_return);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const auto mg_read_return = mg_read(conn_, out_buffer.data(), std::min(out_buffer.size(), size_.value_or(std::numeric_limits<size_t>::max()) - offset_));
if (mg_read_return < 0) {
return io::STREAM_ERROR;
}
offset_ += gsl::narrow<size_t>(mg_read_return);
const auto read_size_limit = netstream_size_limit_.value_or(std::numeric_limits<size_t>::max()) - netstream_offset_;
const auto limited_out_buf = out_buffer.subspan(0, std::min(out_buffer.size(), read_size_limit);
const auto mg_read_return = mg_read(conn_, limited_out_buf.data(), limited_out_buf.size());
if (mg_read_return < 0) {
return io::STREAM_ERROR;
}
offset_ += gsl::narrow<size_t>(mg_read_return);


namespace org::apache::nifi::minifi::processors {

class ListenHTTP : public core::Processor {
private:
static constexpr std::string_view DEFAULT_BUFFER_SIZE_STR = "20000";
static constexpr std::string_view DEFAULT_BUFFER_SIZE_STR = "5";
static const core::Relationship Self;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static const core::Relationship Self;
static constexpr core::RelationshipDefinition Self{"__self__", "Marks the FlowFile to be owned by this processor"};

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I committed this in 4ab6514. Feel free to remove it if you don't like the change.

Comment on lines +170 to +178
struct RequestValue {
std::reference_wrapper<core::ProcessSession> session;
std::promise<void> ret;
};
struct FailureValue {
FailureReason reason;
std::promise<void> ret;
};
using Request = std::promise<nonstd::expected<RequestValue, FailureValue>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have promise in promise here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants