import { BadRequestException, Injectable } from '@nestjs/common'; import * as Joi from 'joi'; import * as uuid from 'uuid'; import { PrismaService } from '../_prisma/prisma.service'; import { AbstractActionHandler, AwsProperties, Format } from '../abstract-action.handler'; import { Action } from '../action.enum'; import { AttributesService } from '../aws-shared-entities/attributes.service'; import { SqsQueueEntryService } from '../sqs/sqs-queue-entry.service'; import { SqsQueue } from '../sqs/sqs-queue.entity'; import { ArnUtil } from '../util/arn-util.static'; type QueryParams = { TopicArn: string; TargetArn: string; Subject?: string; Message: string; } @Injectable() export class PublishHandler extends AbstractActionHandler { constructor( private readonly prismaService: PrismaService, private readonly sqsQueueEntryService: SqsQueueEntryService, private readonly attributeService: AttributesService, ) { super(); } format = Format.Xml; action = Action.SnsPublish; validator = Joi.object({ TargetArn: Joi.string(), TopicArn: Joi.string(), Subject: Joi.string().allow(null, ''), Message: Joi.string().required(), }); protected async handle({ TopicArn, TargetArn, Message, Subject }: QueryParams, awsProperties: AwsProperties) { const arn = TopicArn ?? TargetArn; if (!arn) { throw new BadRequestException(); } const MessageId = uuid.v4(); const subscriptions = await this.prismaService.snsTopicSubscription.findMany({ where: { topicArn: arn } }); const topicAttributes = await this.attributeService.getByArn(arn); for (const sub of subscriptions) { const subArn = ArnUtil.fromTopicSub(sub); const attributes = await this.attributeService.getByArn(subArn); if (sub.protocol === 'sqs' && sub.endpoint) { const { value: isRaw } = attributes.find(a => a.name === 'RawMessageDelivery') ?? {}; const [queueAccountId, queueName] = SqsQueue.tryGetAccountIdAndNameFromPathOrArn(sub.endpoint); const message = isRaw === 'true' ? Message : JSON.stringify({ Type: "Notification", MessageId, TopicArn: arn, Subject, Message, Timestamp: new Date().toISOString(), SignatureVersion: topicAttributes.find(a => a.name === 'SignatureVersion')?.value ?? '1', Signature: '', SigningCertURL: '', UnsubscribeURL: `${awsProperties.host}/?Action=Unsubscribe&SubscriptionArn=${subArn}`, }); await this.sqsQueueEntryService.publish(queueAccountId, queueName, message); } } return { MessageId }; } }