diff --git a/apps/api/src/app/events/events.module.ts b/apps/api/src/app/events/events.module.ts index d195f9f4489..059bca82723 100644 --- a/apps/api/src/app/events/events.module.ts +++ b/apps/api/src/app/events/events.module.ts @@ -7,6 +7,7 @@ import { StorageHelperService, } from '@novu/application-generic'; +import { CommunityOrganizationRepository } from '@novu/dal'; import { EventsController } from './events.controller'; import { USE_CASES } from './usecases'; @@ -22,7 +23,12 @@ import { TenantModule } from '../tenant/tenant.module'; import { BridgeModule } from '../bridge'; import { SubscribersV1Module } from '../subscribers/subscribersV1.module'; -const PROVIDERS = [GetNovuProviderCredentials, StorageHelperService, EventsDistributedLockService]; +const PROVIDERS = [ + GetNovuProviderCredentials, + StorageHelperService, + EventsDistributedLockService, + CommunityOrganizationRepository, +]; @Module({ imports: [ diff --git a/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts b/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts index ebdc022faff..0f941640649 100644 --- a/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts +++ b/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts @@ -1,14 +1,10 @@ import { Injectable, Logger, UnprocessableEntityException } from '@nestjs/common'; import { ModuleRef } from '@nestjs/core'; -import { addBreadcrumb } from '@sentry/node'; -import { randomBytes } from 'crypto'; -import { merge } from 'lodash'; -import { v4 as uuidv4 } from 'uuid'; - import { ExecuteBridgeRequest, ExecuteBridgeRequestCommand, ExecuteBridgeRequestDto, + FeatureFlagsService, Instrument, InstrumentUsecase, IWorkflowDataDto, @@ -17,28 +13,33 @@ import { WorkflowQueueService, } from '@novu/application-generic'; import { + CommunityOrganizationRepository, EnvironmentEntity, EnvironmentRepository, NotificationTemplateEntity, NotificationTemplateRepository, + OrganizationEntity, TenantEntity, TenantRepository, + UserEntity, WorkflowOverrideEntity, WorkflowOverrideRepository, } from '@novu/dal'; import { DiscoverWorkflowOutput, GetActionEnum } from '@novu/framework/internal'; import { + FeatureFlagsKeysEnum, ReservedVariablesMap, - SUBSCRIBER_ID_REGEX, TriggerContextTypeEnum, TriggerEventStatusEnum, - TriggerRecipient, - TriggerRecipients, TriggerRecipientsPayload, WorkflowOriginEnum, } from '@novu/shared'; - +import { addBreadcrumb } from '@sentry/node'; +import { randomBytes } from 'crypto'; +import { merge } from 'lodash'; +import { v4 as uuidv4 } from 'uuid'; import { ApiException } from '../../../shared/exceptions/api.exception'; +import { RecipientSchema, RecipientsSchema } from '../../utils/trigger-recipient-validation'; import { VerifyPayload, VerifyPayloadCommand } from '../verify-payload'; import { ParseEventRequestBroadcastCommand, @@ -53,6 +54,7 @@ export class ParseEventRequest { constructor( private notificationTemplateRepository: NotificationTemplateRepository, private environmentRepository: EnvironmentRepository, + private communityOrganizationRepository: CommunityOrganizationRepository, private verifyPayload: VerifyPayload, private storageHelperService: StorageHelperService, private workflowQueueService: WorkflowQueueService, @@ -60,6 +62,7 @@ export class ParseEventRequest { private workflowOverrideRepository: WorkflowOverrideRepository, private executeBridgeRequest: ExecuteBridgeRequest, private logger: PinoLogger, + private featureFlagService: FeatureFlagsService, protected moduleRef: ModuleRef ) {} @@ -68,19 +71,35 @@ export class ParseEventRequest { this.logger.info(command, 'TriggerEventUseCase - START'); const transactionId = command.transactionId || uuidv4(); - const { environment, statelessWorkflowAllowed } = await this.isStatelessWorkflowAllowed( - command.environmentId, - command.bridgeUrl - ); + const [environment, organization] = await Promise.all([ + this.environmentRepository.findOne({ _id: command.environmentId }), + this.communityOrganizationRepository.findOne({ _id: command.organizationId }), + ]); - if (environment && statelessWorkflowAllowed) { + if (!environment) { + throw new UnprocessableEntityException('Environment not found'); + } + + if (!organization) { + throw new UnprocessableEntityException('Organization not found'); + } + + const statelessWorkflowAllowed = this.isStatelessWorkflowAllowed(command.bridgeUrl); + + if (statelessWorkflowAllowed) { const discoveredWorkflow = await this.queryDiscoverWorkflow(command); if (!discoveredWorkflow) { throw new UnprocessableEntityException('workflow_not_found'); } - return await this.dispatchEventToWorkflowQueue(command, transactionId, discoveredWorkflow); + return await this.dispatchEventToWorkflowQueue({ + command, + transactionId, + discoveredWorkflow, + environment, + organization, + }); } const template = await this.getNotificationTemplateByTriggerIdentifier({ @@ -172,7 +191,7 @@ export class ParseEventRequest { // eslint-disable-next-line no-param-reassign command.payload = merge({}, defaultPayload, command.payload); - const result = await this.dispatchEventToWorkflowQueue(command, transactionId); + const result = await this.dispatchEventToWorkflowQueue({ command, transactionId, environment, organization }); return result; } @@ -194,15 +213,54 @@ export class ParseEventRequest { return discover?.workflows?.find((findWorkflow) => findWorkflow.workflowId === command.identifier) || null; } - private async dispatchEventToWorkflowQueue( - command: ParseEventRequestMulticastCommand | ParseEventRequestBroadcastCommand, + private async dispatchEventToWorkflowQueue({ + command, transactionId, - discoveredWorkflow?: DiscoverWorkflowOutput | null - ) { + discoveredWorkflow, + environment, + organization, + }: { + command: ParseEventRequestMulticastCommand | ParseEventRequestBroadcastCommand; + transactionId: string; + discoveredWorkflow?: DiscoverWorkflowOutput | null; + environment?: EnvironmentEntity; + organization?: OrganizationEntity; + }) { const commandArgs = { ...command, }; + const isDryRun = await this.featureFlagService.getFlag({ + environment, + organization, + user: { _id: command.userId } as UserEntity, + key: FeatureFlagsKeysEnum.IS_SUBSCRIBER_ID_VALIDATION_DRY_RUN_ENABLED, + defaultValue: true, + }); + + if ('to' in commandArgs) { + const { validRecipients, invalidRecipients } = this.parseRecipients(commandArgs.to); + + if (invalidRecipients.length > 0 && isDryRun) { + Logger.warn( + `[Dry run] Invalid recipients: ${invalidRecipients.map((recipient) => JSON.stringify(recipient)).join(', ')}`, + 'ParseEventRequest' + ); + } + + if (!validRecipients && !isDryRun) { + return { + acknowledged: true, + status: TriggerEventStatusEnum.INVALID_RECIPIENTS, + transactionId, + }; + } + + if (!isDryRun && validRecipients) { + commandArgs.to = validRecipients as TriggerRecipientsPayload; + } + } + const jobData: IWorkflowDataDto = { ...commandArgs, actor: command.actor, @@ -223,21 +281,12 @@ export class ParseEventRequest { }; } - private async isStatelessWorkflowAllowed( - environmentId: string, - bridgeUrl: string | undefined - ): Promise<{ environment: EnvironmentEntity | null; statelessWorkflowAllowed: boolean }> { + private isStatelessWorkflowAllowed(bridgeUrl: string | undefined) { if (!bridgeUrl) { - return { environment: null, statelessWorkflowAllowed: false }; - } - - const environment = await this.environmentRepository.findOne({ _id: environmentId }); - - if (!environment) { - throw new UnprocessableEntityException('Environment not found'); + return false; } - return { environment, statelessWorkflowAllowed: true }; + return true; } @Instrument() @@ -299,43 +348,57 @@ export class ParseEventRequest { return reservedVariables?.map((reservedVariable) => reservedVariable.type) || []; } - private isValidId(subscriberId: string) { - if (subscriberId.trim().match(SUBSCRIBER_ID_REGEX)) { - return subscriberId.trim(); - } - } - - private removeInvalidRecipients(payload: TriggerRecipientsPayload): TriggerRecipientsPayload | null { - if (!payload) return null; + /** + * Validates a single Parent item. + * @param item - The item to validate + * @param invalidValues - Array to collect invalid values + * @returns The valid item or null if invalid + */ + private validateItem(item: unknown, invalidValues: unknown[]) { + const result = RecipientSchema.safeParse(item); + if (result.success) { + return result.data; + } else { + invalidValues.push(item); - if (!Array.isArray(payload)) { - return this.filterValidRecipient(payload) as TriggerRecipientsPayload; + return null; } - - const filteredRecipients: TriggerRecipients = payload - .map((subscriber) => this.filterValidRecipient(subscriber)) - .filter((subscriber): subscriber is TriggerRecipient => subscriber !== null); - - return filteredRecipients.length > 0 ? filteredRecipients : null; } - private filterValidRecipient(subscriber: TriggerRecipient): TriggerRecipient | null { - if (typeof subscriber === 'string') { - return this.isValidId(subscriber) ? subscriber : null; + /** + * Parses and validates the recipients from the given input. + * + * The input can be a single recipient or an array of recipients. Each recipient can be: + * - A string that matches the `SUBSCRIBER_ID_REGEX` + * - An object with a `subscriberId` property that matches the `SUBSCRIBER_ID_REGEX` + * - An object with a `topicKey` property that matches the `SUBSCRIBER_ID_REGEX` + * + * If the input is valid, it returns the parsed data. If the input is an array, it returns an object + * containing arrays of valid and invalid values. If the input is a single item, it returns an object + * containing the valid item and an array of invalid values. + * + * @param input - The input to parse and validate. Can be a single recipient or an array of recipients. + * @returns The object containing valid and invalid values. + */ + private parseRecipients(input: unknown) { + const invalidValues: unknown[] = []; + + // Try to validate the whole input first + const parsed = RecipientsSchema.safeParse(input); + if (parsed.success) { + return { validRecipients: parsed.data, invalidRecipients: [] }; } - if (typeof subscriber === 'object' && subscriber !== null) { - if ('topicKey' in subscriber) { - return subscriber; - } - - if ('subscriberId' in subscriber) { - const subscriberId = this.isValidId(subscriber.subscriberId); + // If input is an array, validate each item + if (Array.isArray(input)) { + const validValues = input.map((item) => this.validateItem(item, invalidValues)).filter(Boolean); - return subscriberId ? { ...subscriber, subscriberId } : null; - } + return { validRecipients: validValues, invalidRecipients: invalidValues }; } - return null; + // If input is a single item + const validItem = this.validateItem(input, invalidValues); + + return { validRecipients: validItem, invalidRecipients: invalidValues }; } } diff --git a/apps/api/src/app/events/utils/trigger-recipient-validation.ts b/apps/api/src/app/events/utils/trigger-recipient-validation.ts new file mode 100644 index 00000000000..cc429b4cd90 --- /dev/null +++ b/apps/api/src/app/events/utils/trigger-recipient-validation.ts @@ -0,0 +1,8 @@ +import { SUBSCRIBER_ID_REGEX } from '@novu/shared'; +import { z } from 'zod'; + +export const subscriberIdSchema = z.string().trim().regex(SUBSCRIBER_ID_REGEX).or(z.string().trim().email()); +export const subscriberObjectSchema = z.object({ subscriberId: subscriberIdSchema }).passthrough(); +export const topicSchema = z.object({ topicKey: subscriberIdSchema }).passthrough(); +export const RecipientSchema = z.union([subscriberIdSchema, subscriberObjectSchema, topicSchema]); +export const RecipientsSchema = z.union([RecipientSchema, z.array(RecipientSchema)]); diff --git a/apps/api/src/app/subscribers-v2/dtos/create-subscriber.dto.ts b/apps/api/src/app/subscribers-v2/dtos/create-subscriber.dto.ts index c7a4164edcf..00fef34a270 100644 --- a/apps/api/src/app/subscribers-v2/dtos/create-subscriber.dto.ts +++ b/apps/api/src/app/subscribers-v2/dtos/create-subscriber.dto.ts @@ -20,6 +20,10 @@ export class CreateSubscriberRequestDto { description: 'Unique identifier of the subscriber', }) @IsString() + @Matches(SUBSCRIBER_ID_REGEX, { + message: + 'SubscriberId must be a valid email address or a string of alphanumeric characters, hyphens, and underscores', + }) @IsDefined() @IsNotEmpty({ message: 'SubscriberId is required', diff --git a/apps/api/src/app/subscribers/dtos/create-subscriber-request.dto.ts b/apps/api/src/app/subscribers/dtos/create-subscriber-request.dto.ts index 9655e70cdcc..5df238912f9 100644 --- a/apps/api/src/app/subscribers/dtos/create-subscriber-request.dto.ts +++ b/apps/api/src/app/subscribers/dtos/create-subscriber-request.dto.ts @@ -13,13 +13,7 @@ import { Matches, ValidateNested, } from 'class-validator'; -import { - ChatProviderIdEnum, - IChannelCredentials, - PushProviderIdEnum, - SUBSCRIBER_ID_REGEX, - SubscriberCustomData, -} from '@novu/shared'; +import { ChatProviderIdEnum, IChannelCredentials, PushProviderIdEnum, SubscriberCustomData } from '@novu/shared'; import { Type } from 'class-transformer'; export class ChannelCredentialsDto implements IChannelCredentials { diff --git a/apps/dashboard/src/components/workflow-editor/schema.ts b/apps/dashboard/src/components/workflow-editor/schema.ts index 16fce9dc8d3..03782e31f07 100644 --- a/apps/dashboard/src/components/workflow-editor/schema.ts +++ b/apps/dashboard/src/components/workflow-editor/schema.ts @@ -1,5 +1,5 @@ import * as z from 'zod'; -import { type JSONSchemaDefinition, ChannelTypeEnum } from '@novu/shared'; +import { type JSONSchemaDefinition, ChannelTypeEnum, SUBSCRIBER_ID_REGEX } from '@novu/shared'; export const MAX_TAG_ELEMENTS = 16; export const MAX_TAG_LENGTH = 32; @@ -51,6 +51,10 @@ export const buildDynamicFormSchema = ({ if (value.type === 'string') { zodValue = z.string().min(1); + if (key === 'subscriberId') { + zodValue = zodValue.regex(SUBSCRIBER_ID_REGEX, "Only alphanumeric characters and !#$%&'*+-.@_ are allowed"); + } + if (value.format === 'email') { zodValue = zodValue.email(); } diff --git a/packages/shared/src/consts/subscriberIdRegex.ts b/packages/shared/src/consts/subscriberIdRegex.ts index 328f3e19137..0d9d70a2cf4 100644 --- a/packages/shared/src/consts/subscriberIdRegex.ts +++ b/packages/shared/src/consts/subscriberIdRegex.ts @@ -1 +1 @@ -export const SUBSCRIBER_ID_REGEX = /^[a-zA-Z0-9_-]+$/; +export const SUBSCRIBER_ID_REGEX = /^(?:[a-zA-Z0-9_-]+|\S+@\S+\.\S+)$/; diff --git a/packages/shared/src/types/feature-flags.ts b/packages/shared/src/types/feature-flags.ts index 46bbc555f73..3eb32615717 100644 --- a/packages/shared/src/types/feature-flags.ts +++ b/packages/shared/src/types/feature-flags.ts @@ -49,6 +49,7 @@ export enum FeatureFlagsKeysEnum { IS_WORKFLOW_LIMIT_ENABLED = 'IS_WORKFLOW_LIMIT_ENABLED', IS_MAX_WORKFLOW_LIMIT_ENABLED = 'IS_MAX_WORKFLOW_LIMIT_ENABLED', IS_INBOX_V3_ENABLED = 'IS_INBOX_V3_ENABLED', + IS_SUBSCRIBER_ID_VALIDATION_DRY_RUN_ENABLED = 'IS_SUBSCRIBER_ID_VALIDATION_DRY_RUN_ENABLED', // Numeric flags MAX_WORKFLOW_LIMIT_NUMBER = 'MAX_WORKFLOW_LIMIT_NUMBER',