Skip to content

Commit

Permalink
Email sending is now non-blocking. The blocking code for email sendin…
Browse files Browse the repository at this point in the history
…g is triggered and then immediately returned. (#1762)
  • Loading branch information
trishaanand committed Nov 18, 2020
1 parent 3e1c0a7 commit 2489754
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ public Mono<ResponseDTO<Boolean>> confirmInviteUser(@RequestBody User inviteUser
* @return List of new users who have been created/existing users who have been added to the organization.
*/
@PostMapping("/invite")
public Mono<ResponseDTO<List<User>>> inviteUser(@RequestBody InviteUsersDTO inviteUsersDTO, @RequestHeader("Origin") String originHeader) {
return service.inviteUser(inviteUsersDTO, originHeader).collectList()
public Mono<ResponseDTO<List<User>>> inviteUsers(@RequestBody InviteUsersDTO inviteUsersDTO, @RequestHeader("Origin") String originHeader) {
return service.inviteUsers(inviteUsersDTO, originHeader).collectList()
.map(users -> new ResponseDTO<>(HttpStatus.OK.value(), users, null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,30 @@ public EmailSender(JavaMailSender javaMailSender, EmailConfig emailConfig) {
REPLY_TO = makeReplyTo();
}

public Mono<String> sendMail(String to, String subject, String text, Map<String, String> params) {
return Mono
.fromSupplier(() -> {
public Mono<Boolean> sendMail(String to, String subject, String text, Map<String, String> params) {

/**
* Creating a publisher which sends email in a blocking fashion, subscribing on the bounded elastic
* scheduler and then subscribing to it so that the publisher starts emitting immediately. We do not
* wait for the blocking call of `sendMailSync` to finish. BoundedElastic scheduler would ensure that
* when the number of tasks go beyond the number of available threads, the tasks would be deferred till
* a thread becomes available without overloading the server.
*/
Mono.fromCallable(() -> {
try {
return replaceEmailTemplate(text, params);
} catch (IOException e) {
throw Exceptions.propagate(e);
}
})
// Sending email is a high cost I/O operation. Schedule the same on non-netty threads
// to implement a fire-and-forget strategy.
// CAUTION : We may run into scenarios where too many tasks have been created and queued and the master tasks have already exited with success.
.doOnNext(emailBody ->
Mono.fromRunnable(() -> sendMailSync(to, subject, emailBody))
// Scheduling using boundedElastic because the number of active tasks are capped
// and hence not allowing the background threads to grow indefinitely
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
);
.doOnNext(emailBody -> {
sendMailSync(to, subject, emailBody);
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();

// Creating a hot source which would be created, emitted, and returned immediately.
return Mono.just(Boolean.TRUE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,15 +300,11 @@ public Mono<UserRole> updateRoleForMember(String orgId, UserRole userRole, Strin
params.put("inviteUrl", originHeader);
params.put("user_role_name", userRole.getRoleName());

Mono<String> emailMono = emailSender.sendMail(user.getEmail(),
Mono<Boolean> emailMono = emailSender.sendMail(user.getEmail(),
"Appsmith: Your Role has been changed",
UPDATE_ROLE_EXISTING_USER_TEMPLATE, params);
return emailMono
.thenReturn(addedOrganization)
.onErrorResume(error -> {
log.error("Unable to send role change email to {}. Cause: ", user.getEmail(), error);
return Mono.just(addedOrganization);
});
.thenReturn(addedOrganization);
});
} else {
// If the roleName was not present, then it implies that the user is being removed from the org.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public interface UserService extends CrudService<User, String> {

Mono<User> userCreate(User user);

Flux<User> inviteUser(InviteUsersDTO inviteUsersDTO, String originHeader);
Flux<User> inviteUsers(InviteUsersDTO inviteUsersDTO, String originHeader);
}
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ public Mono<User> userCreate(User user) {
* platform. This flow also ensures that a default organization name is created for the user. The new user is then
* given admin permissions to the default organization.
* <p>
* For new user invite flow, please {@link UserService#inviteUser(InviteUsersDTO, String)}
* For new user invite flow, please {@link UserService#inviteUsers(InviteUsersDTO, String)}
*
* @param user User object representing the user to be created/enabled.
* @return Publishes the user object, after having been saved.
Expand Down Expand Up @@ -568,7 +568,7 @@ public Mono<User> update(String id, User userUpdate) {
* @return Publishes the invited users, after being saved with the new organization ID.
*/
@Override
public Flux<User> inviteUser(InviteUsersDTO inviteUsersDTO, String originHeader) {
public Flux<User> inviteUsers(InviteUsersDTO inviteUsersDTO, String originHeader) {

if (originHeader == null || originHeader.isBlank()) {
return Flux.error(new AppsmithException(AppsmithError.INVALID_PARAMETER, FieldName.ORIGIN));
Expand Down Expand Up @@ -633,16 +633,12 @@ public Flux<User> inviteUser(InviteUsersDTO inviteUsersDTO, String originHeader)
log.debug("Going to send email to user {} informing that the user has been added to new organization {}",
existingUser.getEmail(), organization.getName());
params.put("inviteUrl", originHeader);
Mono<String> emailMono = emailSender.sendMail(existingUser.getEmail(),
Mono<Boolean> emailMono = emailSender.sendMail(existingUser.getEmail(),
"Appsmith: You have been added to a new organization",
USER_ADDED_TO_ORGANIZATION_EMAIL_TEMPLATE, params);

return emailMono
.thenReturn(existingUser)
.onErrorResume(error -> {
log.error("Unable to send invite user email to {}. Cause: ", existingUser.getEmail(), error);
return Mono.just(existingUser);
});
.thenReturn(existingUser);
})
.switchIfEmpty(createNewUserAndSendInviteEmail(username, originHeader, params));
})
Expand Down Expand Up @@ -705,15 +701,11 @@ private Mono<User> createNewUserAndSendInviteEmail(String email, String originHe
);

params.put("inviteUrl", inviteUrl);
Mono<String> emailMono = emailSender.sendMail(createdUser.getEmail(), "Invite for Appsmith", INVITE_USER_EMAIL_TEMPLATE, params);
Mono<Boolean> emailMono = emailSender.sendMail(createdUser.getEmail(), "Invite for Appsmith", INVITE_USER_EMAIL_TEMPLATE, params);

// We have sent out the emails. Just send back the saved user.
return emailMono
.thenReturn(createdUser)
.onErrorResume(error -> {
log.error("Unable to send invite user email to {}. Cause: ", createdUser.getEmail(), error);
return Mono.just(createdUser);
});
.thenReturn(createdUser);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public void addExistingUserToOrganizationAsAdmin() {
inviteUsersDTO.setOrgId(organization1.getId());
inviteUsersDTO.setRoleName(AppsmithRole.ORGANIZATION_ADMIN.getName());

return userService.inviteUser(inviteUsersDTO, "http://localhost:8080")
return userService.inviteUsers(inviteUsersDTO, "http://localhost:8080")
.collectList();
})
.cache();
Expand Down Expand Up @@ -395,7 +395,7 @@ public void addNewUserToOrganizationAsAdmin() {
inviteUsersDTO.setOrgId(organization1.getId());
inviteUsersDTO.setRoleName(AppsmithRole.ORGANIZATION_ADMIN.getName());

return userService.inviteUser(inviteUsersDTO, "http://localhost:8080")
return userService.inviteUsers(inviteUsersDTO, "http://localhost:8080")
.collectList();
})
.cache();
Expand Down Expand Up @@ -468,7 +468,7 @@ public void addNewUserToOrganizationAsViewer() {
inviteUsersDTO.setOrgId(organization1.getId());
inviteUsersDTO.setRoleName(AppsmithRole.ORGANIZATION_VIEWER.getName());

return userService.inviteUser(inviteUsersDTO, "http://localhost:8080")
return userService.inviteUsers(inviteUsersDTO, "http://localhost:8080")
.collectList();
})
.cache();
Expand Down Expand Up @@ -857,7 +857,7 @@ public void addNewUsersBulkToOrganizationAsViewer() {
inviteUsersDTO.setOrgId(organization1.getId());
inviteUsersDTO.setRoleName(AppsmithRole.ORGANIZATION_VIEWER.getName());

return userService.inviteUser(inviteUsersDTO, "http://localhost:8080")
return userService.inviteUsers(inviteUsersDTO, "http://localhost:8080")
.collectList();
})
.cache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public void signUpAfterBeingInvitedToAppsmithOrganization() {
inviteUsersDTO.setOrgId(organization1.getId());
inviteUsersDTO.setRoleName(AppsmithRole.ORGANIZATION_VIEWER.getName());

return userService.inviteUser(inviteUsersDTO, "http://localhost:8080")
return userService.inviteUsers(inviteUsersDTO, "http://localhost:8080")
.collectList();
}).block();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ public void setup() {
emails.add("[email protected]");
inviteUsersDTO.setUsernames(emails);
inviteUsersDTO.setRoleName(AppsmithRole.ORGANIZATION_ADMIN.getName());
userService.inviteUser(inviteUsersDTO, "http://localhost:8080").blockLast();
userService.inviteUsers(inviteUsersDTO, "http://localhost:8080").blockLast();

emails.clear();

// Invite Developer
emails.add("[email protected]");
inviteUsersDTO.setUsernames(emails);
inviteUsersDTO.setRoleName(AppsmithRole.ORGANIZATION_DEVELOPER.getName());
userService.inviteUser(inviteUsersDTO, "http://localhost:8080").blockLast();
userService.inviteUsers(inviteUsersDTO, "http://localhost:8080").blockLast();
}

@Test
Expand Down

0 comments on commit 2489754

Please sign in to comment.