Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api-service): validate recipients in event request parsing #7758

Open
wants to merge 11 commits into
base: next
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion apps/api/src/app/events/events.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
StorageHelperService,
} from '@novu/application-generic';

import { CommunityOrganizationRepository } from '@novu/dal';
import { EventsController } from './events.controller';
import { USE_CASES } from './usecases';

Expand All @@ -22,7 +23,12 @@ import { TenantModule } from '../tenant/tenant.module';
import { BridgeModule } from '../bridge';
import { SubscribersV1Module } from '../subscribers/subscribersV1.module';

const PROVIDERS = [GetNovuProviderCredentials, StorageHelperService, EventsDistributedLockService];
const PROVIDERS = [
GetNovuProviderCredentials,
StorageHelperService,
EventsDistributedLockService,
CommunityOrganizationRepository,
];

@Module({
imports: [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import { Injectable, Logger, UnprocessableEntityException } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { addBreadcrumb } from '@sentry/node';
import { randomBytes } from 'crypto';
import { merge } from 'lodash';
import { v4 as uuidv4 } from 'uuid';

import {
ExecuteBridgeRequest,
ExecuteBridgeRequestCommand,
ExecuteBridgeRequestDto,
FeatureFlagsService,
Instrument,
InstrumentUsecase,
IWorkflowDataDto,
Expand All @@ -17,28 +13,33 @@ import {
WorkflowQueueService,
} from '@novu/application-generic';
import {
CommunityOrganizationRepository,
EnvironmentEntity,
EnvironmentRepository,
NotificationTemplateEntity,
NotificationTemplateRepository,
OrganizationEntity,
TenantEntity,
TenantRepository,
UserEntity,
WorkflowOverrideEntity,
WorkflowOverrideRepository,
} from '@novu/dal';
import { DiscoverWorkflowOutput, GetActionEnum } from '@novu/framework/internal';
import {
FeatureFlagsKeysEnum,
ReservedVariablesMap,
SUBSCRIBER_ID_REGEX,
TriggerContextTypeEnum,
TriggerEventStatusEnum,
TriggerRecipient,
TriggerRecipients,
TriggerRecipientsPayload,
WorkflowOriginEnum,
} from '@novu/shared';

import { addBreadcrumb } from '@sentry/node';
import { randomBytes } from 'crypto';
import { merge } from 'lodash';
import { v4 as uuidv4 } from 'uuid';
import { ApiException } from '../../../shared/exceptions/api.exception';
import { RecipientSchema, RecipientsSchema } from '../../utils/trigger-recipient-validation';
import { VerifyPayload, VerifyPayloadCommand } from '../verify-payload';
import {
ParseEventRequestBroadcastCommand,
Expand All @@ -53,13 +54,15 @@ export class ParseEventRequest {
constructor(
private notificationTemplateRepository: NotificationTemplateRepository,
private environmentRepository: EnvironmentRepository,
private communityOrganizationRepository: CommunityOrganizationRepository,
private verifyPayload: VerifyPayload,
private storageHelperService: StorageHelperService,
private workflowQueueService: WorkflowQueueService,
private tenantRepository: TenantRepository,
private workflowOverrideRepository: WorkflowOverrideRepository,
private executeBridgeRequest: ExecuteBridgeRequest,
private logger: PinoLogger,
private featureFlagService: FeatureFlagsService,
protected moduleRef: ModuleRef
) {}

Expand All @@ -68,19 +71,35 @@ export class ParseEventRequest {
this.logger.info(command, 'TriggerEventUseCase - START');
const transactionId = command.transactionId || uuidv4();

const { environment, statelessWorkflowAllowed } = await this.isStatelessWorkflowAllowed(
command.environmentId,
command.bridgeUrl
);
const [environment, organization] = await Promise.all([
this.environmentRepository.findOne({ _id: command.environmentId }),
this.communityOrganizationRepository.findOne({ _id: command.organizationId }),
]);

if (environment && statelessWorkflowAllowed) {
if (!environment) {
throw new UnprocessableEntityException('Environment not found');
}

if (!organization) {
throw new UnprocessableEntityException('Organization not found');
}

const statelessWorkflowAllowed = this.isStatelessWorkflowAllowed(command.bridgeUrl);

if (statelessWorkflowAllowed) {
const discoveredWorkflow = await this.queryDiscoverWorkflow(command);

if (!discoveredWorkflow) {
throw new UnprocessableEntityException('workflow_not_found');
}

return await this.dispatchEventToWorkflowQueue(command, transactionId, discoveredWorkflow);
return await this.dispatchEventToWorkflowQueue({
command,
transactionId,
discoveredWorkflow,
environment,
organization,
});
}

const template = await this.getNotificationTemplateByTriggerIdentifier({
Expand Down Expand Up @@ -172,7 +191,7 @@ export class ParseEventRequest {
// eslint-disable-next-line no-param-reassign
command.payload = merge({}, defaultPayload, command.payload);

const result = await this.dispatchEventToWorkflowQueue(command, transactionId);
const result = await this.dispatchEventToWorkflowQueue({ command, transactionId, environment, organization });

return result;
}
Expand All @@ -194,15 +213,54 @@ export class ParseEventRequest {
return discover?.workflows?.find((findWorkflow) => findWorkflow.workflowId === command.identifier) || null;
}

private async dispatchEventToWorkflowQueue(
command: ParseEventRequestMulticastCommand | ParseEventRequestBroadcastCommand,
private async dispatchEventToWorkflowQueue({
command,
transactionId,
discoveredWorkflow?: DiscoverWorkflowOutput | null
) {
discoveredWorkflow,
environment,
organization,
}: {
command: ParseEventRequestMulticastCommand | ParseEventRequestBroadcastCommand;
transactionId: string;
discoveredWorkflow?: DiscoverWorkflowOutput | null;
environment?: EnvironmentEntity;
organization?: OrganizationEntity;
}) {
const commandArgs = {
...command,
};

const isDryRun = await this.featureFlagService.getFlag({
environment,
organization,
user: { _id: command.userId } as UserEntity,
key: FeatureFlagsKeysEnum.IS_SUBSCRIBER_ID_VALIDATION_DRY_RUN_ENABLED,
defaultValue: true,
});

if ('to' in commandArgs) {
const { validRecipients, invalidRecipients } = this.parseRecipients(commandArgs.to);

if (invalidRecipients.length > 0 && isDryRun) {
Logger.warn(
`[Dry run] Invalid recipients: ${invalidRecipients.map((recipient) => JSON.stringify(recipient)).join(', ')}`,
'ParseEventRequest'
);
}

if (!validRecipients && !isDryRun) {
return {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to show in the error the invalid subscribers and why they are invalid. The error must be actionable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs more work,
users will receive the invalid_recipients status when all of them are invalid
but lets say if out of 5 only 2 were invalid we need to add that in the activity feed

acknowledged: true,
status: TriggerEventStatusEnum.INVALID_RECIPIENTS,
transactionId,
};
}

if (!isDryRun && validRecipients) {
commandArgs.to = validRecipients as TriggerRecipientsPayload;
}
}

const jobData: IWorkflowDataDto = {
...commandArgs,
actor: command.actor,
Expand All @@ -223,21 +281,12 @@ export class ParseEventRequest {
};
}

private async isStatelessWorkflowAllowed(
environmentId: string,
bridgeUrl: string | undefined
): Promise<{ environment: EnvironmentEntity | null; statelessWorkflowAllowed: boolean }> {
private isStatelessWorkflowAllowed(bridgeUrl: string | undefined) {
if (!bridgeUrl) {
return { environment: null, statelessWorkflowAllowed: false };
}

const environment = await this.environmentRepository.findOne({ _id: environmentId });

if (!environment) {
throw new UnprocessableEntityException('Environment not found');
return false;
}

return { environment, statelessWorkflowAllowed: true };
return true;
}

@Instrument()
Expand Down Expand Up @@ -299,43 +348,57 @@ export class ParseEventRequest {
return reservedVariables?.map((reservedVariable) => reservedVariable.type) || [];
}

private isValidId(subscriberId: string) {
if (subscriberId.trim().match(SUBSCRIBER_ID_REGEX)) {
return subscriberId.trim();
}
}

private removeInvalidRecipients(payload: TriggerRecipientsPayload): TriggerRecipientsPayload | null {
if (!payload) return null;
/**
* Validates a single Parent item.
* @param item - The item to validate
* @param invalidValues - Array to collect invalid values
* @returns The valid item or null if invalid
*/
private validateItem(item: unknown, invalidValues: unknown[]) {
const result = RecipientSchema.safeParse(item);
if (result.success) {
return result.data;
} else {
invalidValues.push(item);

if (!Array.isArray(payload)) {
return this.filterValidRecipient(payload) as TriggerRecipientsPayload;
return null;
}

const filteredRecipients: TriggerRecipients = payload
.map((subscriber) => this.filterValidRecipient(subscriber))
.filter((subscriber): subscriber is TriggerRecipient => subscriber !== null);

return filteredRecipients.length > 0 ? filteredRecipients : null;
}

private filterValidRecipient(subscriber: TriggerRecipient): TriggerRecipient | null {
if (typeof subscriber === 'string') {
return this.isValidId(subscriber) ? subscriber : null;
/**
* Parses and validates the recipients from the given input.
*
* The input can be a single recipient or an array of recipients. Each recipient can be:
* - A string that matches the `SUBSCRIBER_ID_REGEX`
* - An object with a `subscriberId` property that matches the `SUBSCRIBER_ID_REGEX`
* - An object with a `topicKey` property that matches the `SUBSCRIBER_ID_REGEX`
*
* If the input is valid, it returns the parsed data. If the input is an array, it returns an object
* containing arrays of valid and invalid values. If the input is a single item, it returns an object
* containing the valid item and an array of invalid values.
*
* @param input - The input to parse and validate. Can be a single recipient or an array of recipients.
* @returns The object containing valid and invalid values.
*/
private parseRecipients(input: unknown) {
const invalidValues: unknown[] = [];

// Try to validate the whole input first
const parsed = RecipientsSchema.safeParse(input);
if (parsed.success) {
return { validRecipients: parsed.data, invalidRecipients: [] };
}

if (typeof subscriber === 'object' && subscriber !== null) {
if ('topicKey' in subscriber) {
return subscriber;
}

if ('subscriberId' in subscriber) {
const subscriberId = this.isValidId(subscriber.subscriberId);
// If input is an array, validate each item
if (Array.isArray(input)) {
const validValues = input.map((item) => this.validateItem(item, invalidValues)).filter(Boolean);

return subscriberId ? { ...subscriber, subscriberId } : null;
}
return { validRecipients: validValues, invalidRecipients: invalidValues };
}

return null;
// If input is a single item
const validItem = this.validateItem(input, invalidValues);

return { validRecipients: validItem, invalidRecipients: invalidValues };
}
}
8 changes: 8 additions & 0 deletions apps/api/src/app/events/utils/trigger-recipient-validation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { SUBSCRIBER_ID_REGEX } from '@novu/shared';
import { z } from 'zod';

export const subscriberIdSchema = z.string().trim().regex(SUBSCRIBER_ID_REGEX).or(z.string().trim().email());
export const subscriberObjectSchema = z.object({ subscriberId: subscriberIdSchema }).passthrough();
export const topicSchema = z.object({ topicKey: subscriberIdSchema }).passthrough();
export const RecipientSchema = z.union([subscriberIdSchema, subscriberObjectSchema, topicSchema]);
export const RecipientsSchema = z.union([RecipientSchema, z.array(RecipientSchema)]);
4 changes: 4 additions & 0 deletions apps/api/src/app/subscribers-v2/dtos/create-subscriber.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ export class CreateSubscriberRequestDto {
description: 'Unique identifier of the subscriber',
})
@IsString()
@Matches(SUBSCRIBER_ID_REGEX, {
message:
'SubscriberId must be a valid email address or a string of alphanumeric characters, hyphens, and underscores',
})
@IsDefined()
@IsNotEmpty({
message: 'SubscriberId is required',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,7 @@ import {
Matches,
ValidateNested,
} from 'class-validator';
import {
ChatProviderIdEnum,
IChannelCredentials,
PushProviderIdEnum,
SUBSCRIBER_ID_REGEX,
SubscriberCustomData,
} from '@novu/shared';
import { ChatProviderIdEnum, IChannelCredentials, PushProviderIdEnum, SubscriberCustomData } from '@novu/shared';
import { Type } from 'class-transformer';

export class ChannelCredentialsDto implements IChannelCredentials {
Expand Down
6 changes: 5 additions & 1 deletion apps/dashboard/src/components/workflow-editor/schema.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as z from 'zod';
import { type JSONSchemaDefinition, ChannelTypeEnum } from '@novu/shared';
import { type JSONSchemaDefinition, ChannelTypeEnum, SUBSCRIBER_ID_REGEX } from '@novu/shared';

export const MAX_TAG_ELEMENTS = 16;
export const MAX_TAG_LENGTH = 32;
Expand Down Expand Up @@ -51,6 +51,10 @@ export const buildDynamicFormSchema = ({
if (value.type === 'string') {
zodValue = z.string().min(1);

if (key === 'subscriberId') {
zodValue = zodValue.regex(SUBSCRIBER_ID_REGEX, "Only alphanumeric characters and !#$%&'*+-.@_ are allowed");
}

if (value.format === 'email') {
zodValue = zodValue.email();
}
Expand Down
2 changes: 1 addition & 1 deletion packages/shared/src/consts/subscriberIdRegex.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const SUBSCRIBER_ID_REGEX = /^[a-zA-Z0-9_-]+$/;
export const SUBSCRIBER_ID_REGEX = /^(?:[a-zA-Z0-9_-]+|\S+@\S+\.\S+)$/;
1 change: 1 addition & 0 deletions packages/shared/src/types/feature-flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export enum FeatureFlagsKeysEnum {
IS_WORKFLOW_LIMIT_ENABLED = 'IS_WORKFLOW_LIMIT_ENABLED',
IS_MAX_WORKFLOW_LIMIT_ENABLED = 'IS_MAX_WORKFLOW_LIMIT_ENABLED',
IS_INBOX_V3_ENABLED = 'IS_INBOX_V3_ENABLED',
IS_SUBSCRIBER_ID_VALIDATION_DRY_RUN_ENABLED = 'IS_SUBSCRIBER_ID_VALIDATION_DRY_RUN_ENABLED',

// Numeric flags
MAX_WORKFLOW_LIMIT_NUMBER = 'MAX_WORKFLOW_LIMIT_NUMBER',
Expand Down
Loading