diff --git a/src/sqs/delete-message.handler.ts b/src/sqs/delete-message.handler.ts new file mode 100644 index 0000000..f73787c --- /dev/null +++ b/src/sqs/delete-message.handler.ts @@ -0,0 +1,35 @@ +import { Injectable } from '@nestjs/common'; +import { AbstractActionHandler, AwsProperties, Format } from '../abstract-action.handler'; +import { Action } from '../action.enum'; +import * as Joi from 'joi'; +import { SqsQueue } from './sqs-queue.entity'; +import { SqsQueueEntryService } from './sqs-queue-entry.service'; + +type QueryParams = { + QueueUrl: string; + ReceiptHandle: string; +} + +@Injectable() +export class DeleteMessageHandler extends AbstractActionHandler { + + constructor( + private readonly sqsQueueEntryService: SqsQueueEntryService, + ) { + super(); + } + + audit = false; + format = Format.Xml; + action = Action.SqsDeleteMessage; + validator = Joi.object({ + QueueUrl: Joi.string().required(), + ReceiptHandle: Joi.string().required(), + }); + + protected async handle({ QueueUrl, ReceiptHandle }: QueryParams, awsProperties: AwsProperties) { + + const [accountId, name] = SqsQueue.tryGetAccountIdAndNameFromPathOrArn(QueueUrl); + await this.sqsQueueEntryService.deleteMessage(accountId, name, ReceiptHandle); + } +} diff --git a/src/sqs/receive-message.handler.ts b/src/sqs/receive-message.handler.ts index 5ae2463..4436e48 100644 --- a/src/sqs/receive-message.handler.ts +++ b/src/sqs/receive-message.handler.ts @@ -33,7 +33,7 @@ export class ReceiveMessageHandler extends AbstractActionHandler { protected async handle({ QueueUrl, MaxNumberOfMessages, VisibilityTimeout }: QueryParams, awsProperties: AwsProperties) { const [accountId, name] = SqsQueue.tryGetAccountIdAndNameFromPathOrArn(QueueUrl); - const records = await this.sqsQueueEntryService.recieveMessages(accountId, name, MaxNumberOfMessages, VisibilityTimeout); + const records = await this.sqsQueueEntryService.receiveMessages(accountId, name, MaxNumberOfMessages, VisibilityTimeout); return { Message: records.map(r => ({ MessageId: r.id, diff --git a/src/sqs/sqs-queue-entry.service.ts b/src/sqs/sqs-queue-entry.service.ts index 2c5ccad..a05b1d5 100644 --- a/src/sqs/sqs-queue-entry.service.ts +++ b/src/sqs/sqs-queue-entry.service.ts @@ -62,8 +62,33 @@ export class SqsQueueEntryService { }); } - async recieveMessages(accountId: string, queueName: string, maxNumberOfMessages = 10, visabilityTimeout = 0): Promise { + async receiveMessages(accountId: string, queueName: string, maxNumberOfMessages = 10, visabilityTimeout = 0): Promise { + const queue = await this.getQueueHelper(accountId, queueName); + + const accessDate = new Date(); + const newInFlightReleaseDate = new Date(accessDate); + newInFlightReleaseDate.setSeconds(accessDate.getSeconds() + visabilityTimeout); + const records = this.getQueueList(queue.arn).filter(e => e.inFlightReleaseDate <= accessDate).slice(0, maxNumberOfMessages - 1); + records.forEach(e => e.inFlightReleaseDate = newInFlightReleaseDate); + return records; + } + + async deleteMessage(accountId: string, queueName: string, id: string): Promise { + + const queue = await this.getQueueHelper(accountId, queueName); + + const records = this.getQueueList(queue.arn); + const loc = records.findIndex(r => r.id === id); + records.splice(loc, 1); + } + + async purge(accountId: string, queueName: string) { + const queue = await this.sqsQueueRepo.findOne({ where: { accountId, name: queueName }}); + this.queues[queue.arn] = []; + } + + private async getQueueHelper(accountId: string, queueName: string): Promise { if (!this.queueObjectCache[`${accountId}/${queueName}`] || this.queueObjectCache[`${accountId}/${queueName}`][0] < new Date()) { this.queueObjectCache[`${accountId}/${queueName}`] = [new Date(Date.now() + FIFTEEN_SECONDS), await this.sqsQueueRepo.findOne({ where: { accountId, name: queueName }})]; } @@ -74,17 +99,7 @@ export class SqsQueueEntryService { throw new BadRequestException('Queue not found'); } - const accessDate = new Date(); - const newInFlightReleaseDate = new Date(accessDate); - newInFlightReleaseDate.setSeconds(accessDate.getSeconds() + visabilityTimeout); - const records = this.getQueueList(queue.arn).filter(e => e.inFlightReleaseDate <= accessDate).slice(0, maxNumberOfMessages - 1); - records.forEach(e => e.inFlightReleaseDate = newInFlightReleaseDate); - return records; - } - - async purge(accountId: string, queueName: string) { - const queue = await this.sqsQueueRepo.findOne({ where: { accountId, name: queueName }}); - this.queues[queue.arn] = []; + return queue; } private getQueueList(arn: string): QueueEntry[] { diff --git a/src/sqs/sqs.module.ts b/src/sqs/sqs.module.ts index a2ea813..4502f18 100644 --- a/src/sqs/sqs.module.ts +++ b/src/sqs/sqs.module.ts @@ -6,6 +6,7 @@ import { AwsSharedEntitiesModule } from '../aws-shared-entities/aws-shared-entit import { DefaultActionHandlerProvider } from '../default-action-handler/default-action-handler.provider'; import { ExistingActionHandlersProvider } from '../default-action-handler/existing-action-handlers.provider'; import { CreateQueueHandler } from './create-queue.handler'; +import { DeleteMessageHandler } from './delete-message.handler'; import { DeleteQueueHandler } from './delete-queue.handler'; import { GetQueueAttributesHandler } from './get-queue-attributes.handler'; import { ListQueuesHandler } from './list-queues.handler'; @@ -18,6 +19,7 @@ import { SqsHandlers } from './sqs.constants'; const handlers = [ CreateQueueHandler, + DeleteMessageHandler, DeleteQueueHandler, GetQueueAttributesHandler, ListQueuesHandler,