Added support for sqs delete message
This commit is contained in:
parent
bdfab94810
commit
e1aaeaa90e
|
|
@ -0,0 +1,35 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
import { AbstractActionHandler, AwsProperties, Format } from '../abstract-action.handler';
|
||||
import { Action } from '../action.enum';
|
||||
import * as Joi from 'joi';
|
||||
import { SqsQueue } from './sqs-queue.entity';
|
||||
import { SqsQueueEntryService } from './sqs-queue-entry.service';
|
||||
|
||||
type QueryParams = {
|
||||
QueueUrl: string;
|
||||
ReceiptHandle: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class DeleteMessageHandler extends AbstractActionHandler<QueryParams> {
|
||||
|
||||
constructor(
|
||||
private readonly sqsQueueEntryService: SqsQueueEntryService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
audit = false;
|
||||
format = Format.Xml;
|
||||
action = Action.SqsDeleteMessage;
|
||||
validator = Joi.object<QueryParams, true>({
|
||||
QueueUrl: Joi.string().required(),
|
||||
ReceiptHandle: Joi.string().required(),
|
||||
});
|
||||
|
||||
protected async handle({ QueueUrl, ReceiptHandle }: QueryParams, awsProperties: AwsProperties) {
|
||||
|
||||
const [accountId, name] = SqsQueue.tryGetAccountIdAndNameFromPathOrArn(QueueUrl);
|
||||
await this.sqsQueueEntryService.deleteMessage(accountId, name, ReceiptHandle);
|
||||
}
|
||||
}
|
||||
|
|
@ -33,7 +33,7 @@ export class ReceiveMessageHandler extends AbstractActionHandler<QueryParams> {
|
|||
protected async handle({ QueueUrl, MaxNumberOfMessages, VisibilityTimeout }: QueryParams, awsProperties: AwsProperties) {
|
||||
|
||||
const [accountId, name] = SqsQueue.tryGetAccountIdAndNameFromPathOrArn(QueueUrl);
|
||||
const records = await this.sqsQueueEntryService.recieveMessages(accountId, name, MaxNumberOfMessages, VisibilityTimeout);
|
||||
const records = await this.sqsQueueEntryService.receiveMessages(accountId, name, MaxNumberOfMessages, VisibilityTimeout);
|
||||
return {
|
||||
Message: records.map(r => ({
|
||||
MessageId: r.id,
|
||||
|
|
|
|||
|
|
@ -62,8 +62,33 @@ export class SqsQueueEntryService {
|
|||
});
|
||||
}
|
||||
|
||||
async recieveMessages(accountId: string, queueName: string, maxNumberOfMessages = 10, visabilityTimeout = 0): Promise<QueueEntry[]> {
|
||||
async receiveMessages(accountId: string, queueName: string, maxNumberOfMessages = 10, visabilityTimeout = 0): Promise<QueueEntry[]> {
|
||||
|
||||
const queue = await this.getQueueHelper(accountId, queueName);
|
||||
|
||||
const accessDate = new Date();
|
||||
const newInFlightReleaseDate = new Date(accessDate);
|
||||
newInFlightReleaseDate.setSeconds(accessDate.getSeconds() + visabilityTimeout);
|
||||
const records = this.getQueueList(queue.arn).filter(e => e.inFlightReleaseDate <= accessDate).slice(0, maxNumberOfMessages - 1);
|
||||
records.forEach(e => e.inFlightReleaseDate = newInFlightReleaseDate);
|
||||
return records;
|
||||
}
|
||||
|
||||
async deleteMessage(accountId: string, queueName: string, id: string): Promise<void> {
|
||||
|
||||
const queue = await this.getQueueHelper(accountId, queueName);
|
||||
|
||||
const records = this.getQueueList(queue.arn);
|
||||
const loc = records.findIndex(r => r.id === id);
|
||||
records.splice(loc, 1);
|
||||
}
|
||||
|
||||
async purge(accountId: string, queueName: string) {
|
||||
const queue = await this.sqsQueueRepo.findOne({ where: { accountId, name: queueName }});
|
||||
this.queues[queue.arn] = [];
|
||||
}
|
||||
|
||||
private async getQueueHelper(accountId: string, queueName: string): Promise<SqsQueue> {
|
||||
if (!this.queueObjectCache[`${accountId}/${queueName}`] || this.queueObjectCache[`${accountId}/${queueName}`][0] < new Date()) {
|
||||
this.queueObjectCache[`${accountId}/${queueName}`] = [new Date(Date.now() + FIFTEEN_SECONDS), await this.sqsQueueRepo.findOne({ where: { accountId, name: queueName }})];
|
||||
}
|
||||
|
|
@ -74,17 +99,7 @@ export class SqsQueueEntryService {
|
|||
throw new BadRequestException('Queue not found');
|
||||
}
|
||||
|
||||
const accessDate = new Date();
|
||||
const newInFlightReleaseDate = new Date(accessDate);
|
||||
newInFlightReleaseDate.setSeconds(accessDate.getSeconds() + visabilityTimeout);
|
||||
const records = this.getQueueList(queue.arn).filter(e => e.inFlightReleaseDate <= accessDate).slice(0, maxNumberOfMessages - 1);
|
||||
records.forEach(e => e.inFlightReleaseDate = newInFlightReleaseDate);
|
||||
return records;
|
||||
}
|
||||
|
||||
async purge(accountId: string, queueName: string) {
|
||||
const queue = await this.sqsQueueRepo.findOne({ where: { accountId, name: queueName }});
|
||||
this.queues[queue.arn] = [];
|
||||
return queue;
|
||||
}
|
||||
|
||||
private getQueueList(arn: string): QueueEntry[] {
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import { AwsSharedEntitiesModule } from '../aws-shared-entities/aws-shared-entit
|
|||
import { DefaultActionHandlerProvider } from '../default-action-handler/default-action-handler.provider';
|
||||
import { ExistingActionHandlersProvider } from '../default-action-handler/existing-action-handlers.provider';
|
||||
import { CreateQueueHandler } from './create-queue.handler';
|
||||
import { DeleteMessageHandler } from './delete-message.handler';
|
||||
import { DeleteQueueHandler } from './delete-queue.handler';
|
||||
import { GetQueueAttributesHandler } from './get-queue-attributes.handler';
|
||||
import { ListQueuesHandler } from './list-queues.handler';
|
||||
|
|
@ -18,6 +19,7 @@ import { SqsHandlers } from './sqs.constants';
|
|||
|
||||
const handlers = [
|
||||
CreateQueueHandler,
|
||||
DeleteMessageHandler,
|
||||
DeleteQueueHandler,
|
||||
GetQueueAttributesHandler,
|
||||
ListQueuesHandler,
|
||||
|
|
|
|||
Loading…
Reference in New Issue