Skip to content

Commit

Permalink
Prevent overriding of message flags with RequestOption flags (https:/…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Feb 8, 2024
1 parent 24e87c7 commit b6b036d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
30 changes: 20 additions & 10 deletions src/org/jgroups/BaseMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public BaseMessage(Address dest) {
public Message setFlag(Flag... flags) {
if(flags != null) {
short tmp=this.flags;
for(Flag flag : flags) {
for(Flag flag: flags) {
if(flag != null)
tmp|=flag.value();
}
Expand All @@ -84,14 +84,24 @@ public Message setFlag(TransientFlag... flags) {
return this;
}


public Message setFlag(short flag, boolean transient_flags) {
short tmp=transient_flags? this.transient_flags : this.flags;
tmp|=flag;
if(transient_flags)
this.transient_flags=(byte)tmp;
else
this.flags=tmp;
@Override
public Message setFlag(short flag, boolean transient_flags, boolean xor) {
if(transient_flags) {
if(xor) {
byte tmp=this.transient_flags;
this.transient_flags=(byte)(tmp | (byte)flag);
}
else
this.transient_flags=(byte)flag;
}
else {
if(xor) {
short tmp=this.flags;
this.flags=(short)(tmp | flag);
}
else
this.flags=flag;
}
return this;
}

Expand Down Expand Up @@ -133,7 +143,7 @@ public Message clearFlag(TransientFlag... flags) {
/**
* Checks if a given flag is set
* @param flag The flag
* @return Whether or not the flag is currently set
* @return Whether the flag is currently set
*/
public boolean isFlagSet(Flag flag) {
return Util.isFlagSet(flags, flag);
Expand Down
15 changes: 12 additions & 3 deletions src/org/jgroups/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,26 @@ public interface Message extends SizeStreamable, Constructable<Message> {
/** Returns a pretty-printed string of the headers */
String printHeaders();

/** Sets one or more flags */
/** Sets one or more flags (xor-ing existing flags) */
Message setFlag(Flag... flags);


/** Sets the flags as a short; this way, multiple flags can be set in one operation
* @param flag The flag to be set (as a short). Overrides existing flags (no xor-ing)
* @param transient_flags True if the flag is transient, false otherwise
*/
Message setFlag(short flag, boolean transient_flags);
default Message setFlag(short flag, boolean transient_flags) {
return setFlag(flag, transient_flags, false);
}

/** Sets the flags as a short; this way, multiple flags can be set in one operation
* @param flag The flag to be set (as a short). Overrides existing flags (no xor-ing)
* @param transient_flags True if the flag is transient, false otherwise
* @param xor When true, existing flags will be xor-ed with flag, otherwise not
*/
Message setFlag(short flag, boolean transient_flags, boolean xor);

/** Sets one or more transient flags. Transient flags are not marshalled */
/** Sets one or more transient flags (xor-ing). Transient flags are not marshalled */
Message setFlag(TransientFlag... flags);

/** Atomically sets a transient flag if not set. Returns true if the flags was set, else false (already set) */
Expand Down
9 changes: 5 additions & 4 deletions src/org/jgroups/blocks/RequestCorrelator.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public <T> void sendMulticastRequest(Collection<Address> dest_mbrs, Message msg,
: new Header(Header.REQ, 0, this.corr_id);

msg.putHeader(this.corr_id, hdr)
.setFlag(opts.flags(), false).setFlag(opts.transientFlags(), true);
.setFlag(opts.flags(), false, true)
.setFlag(opts.transientFlags(), true, true);

if(req != null) // sync
addEntry(req, hdr, false);
Expand All @@ -141,8 +142,8 @@ public <T> void sendMulticastRequest(Collection<Address> dest_mbrs, Message msg,
public <T> void sendUnicastRequest(Message msg, Request<T> req, RequestOptions opts) throws Exception {
Address dest=msg.getDest();
Header hdr=new Header(Header.REQ, 0, this.corr_id);
msg.putHeader(this.corr_id, hdr).setFlag(opts.flags(), false)
.setFlag(opts.transientFlags(), true);
msg.putHeader(this.corr_id, hdr).setFlag(opts.flags(), false, true)
.setFlag(opts.transientFlags(), true, true);

if(req != null) // sync RPC
addEntry(req, hdr, true);
Expand Down Expand Up @@ -419,7 +420,7 @@ protected void handleResponse(Message rsp, Header hdr) {


protected void sendReply(final Message req, final long req_id, Object reply, boolean is_exception) {
Message rsp=makeReply(req).setFlag(req.getFlags(false), false)
Message rsp=makeReply(req).setFlag(req.getFlags(false), false, true)
.setPayload(reply)
.clearFlag(Message.Flag.RSVP); // JGRP-1940
sendResponse(rsp, req_id, is_exception);
Expand Down

0 comments on commit b6b036d

Please sign in to comment.