Fixed issue #2: [SQS or Terraform] Queues are not being created
This commit is contained in:
parent
e1db34e7c1
commit
8389db4367
|
|
@ -1,6 +1,6 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Repository } from 'typeorm';
|
||||
import { In, Repository } from 'typeorm';
|
||||
import { Attribute } from './attributes.entity';
|
||||
import { CreateAttributeDto } from './create-attribute.dto';
|
||||
|
||||
|
|
@ -22,6 +22,14 @@ export class AttributesService {
|
|||
return await this.repo.findOne({ where: { arn, name: ResourcePolicyName }});
|
||||
}
|
||||
|
||||
async getByArnAndName(arn: string, name: string): Promise<Attribute> {
|
||||
return await this.repo.findOne({ where: { arn, name }});
|
||||
}
|
||||
|
||||
async getByArnAndNames(arn: string, names: string[]): Promise<Attribute[]> {
|
||||
return await this.repo.find({ where: { arn, name: In(names) }});
|
||||
}
|
||||
|
||||
async createResourcePolicy(arn: string, value: string): Promise<Attribute> {
|
||||
return await this.create({arn, value, name: ResourcePolicyName });
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import { CommonConfig } from "./common-config.interface";
|
||||
|
||||
export default (): CommonConfig => ({
|
||||
AWS_ACCOUNT_ID: '123456789012',
|
||||
AWS_ACCOUNT_ID: '000000000000',
|
||||
AWS_REGION: 'us-east-1',
|
||||
// DB_DATABASE: ':memory:',
|
||||
DB_DATABASE: 'local-aws.sqlite',
|
||||
|
|
|
|||
|
|
@ -1,10 +1,7 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Repository } from 'typeorm';
|
||||
import { AbstractActionHandler, AwsProperties, Format } from '../abstract-action.handler';
|
||||
import { Action } from '../action.enum';
|
||||
import * as Joi from 'joi';
|
||||
import { Secret } from './secret.entity';
|
||||
import { SecretService } from './secret.service';
|
||||
|
||||
type QueryParams = {
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import { AbstractActionHandler, AwsProperties, Format } from '../abstract-action
|
|||
import { Action } from '../action.enum';
|
||||
import * as Joi from 'joi';
|
||||
import { Secret } from './secret.entity';
|
||||
import { TagsService } from '../aws-shared-entities/tags.service';
|
||||
import { AttributesService } from '../aws-shared-entities/attributes.service';
|
||||
|
||||
type QueryParams = {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
import { BadRequestException, Injectable } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Repository } from 'typeorm';
|
||||
import { AbstractActionHandler, AwsProperties, Format } from '../abstract-action.handler';
|
||||
|
|
@ -33,6 +33,11 @@ export class GetTopicAttributesHandler extends AbstractActionHandler {
|
|||
|
||||
const name = TopicArn.split(':').pop();
|
||||
const topic = await this.snsTopicRepo.findOne({ where: { name }});
|
||||
|
||||
if (!topic) {
|
||||
throw new BadRequestException();
|
||||
}
|
||||
|
||||
const attributes = await this.attributeService.getByArn(TopicArn);
|
||||
const attributeMap = attributes.reduce((m, a) => {
|
||||
m[a.name] = a.value;
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
import { BadRequestException, Injectable } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Repository } from 'typeorm';
|
||||
import { AbstractActionHandler, AwsProperties, Format } from '../abstract-action.handler';
|
||||
|
|
@ -8,6 +8,8 @@ import { TagsService } from '../aws-shared-entities/tags.service';
|
|||
import { AttributesService } from '../aws-shared-entities/attributes.service';
|
||||
import { SnsTopicSubscription } from './sns-topic-subscription.entity';
|
||||
import * as uuid from 'uuid';
|
||||
import { SqsQueueEntryService } from '../sqs/sqs-queue-entry.service';
|
||||
import { SqsQueue } from '../sqs/sqs-queue.entity';
|
||||
|
||||
type QueryParams = {
|
||||
TopicArn: string;
|
||||
|
|
@ -23,6 +25,7 @@ export class SubscribeHandler extends AbstractActionHandler<QueryParams> {
|
|||
private readonly snsTopicSubscription: Repository<SnsTopicSubscription>,
|
||||
private readonly tagsService: TagsService,
|
||||
private readonly attributeService: AttributesService,
|
||||
private readonly sqsQueueEntryService: SqsQueueEntryService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ export class CreateQueueHandler extends AbstractActionHandler<QueryParams> {
|
|||
const tags = TagsService.tagPairs(params);
|
||||
await this.tagsService.createMany(queue.arn, tags);
|
||||
|
||||
const attributes = AttributesService.attributePairs(params);
|
||||
const attributes = SqsQueue.attributePairs(params);
|
||||
await this.attributeService.createMany(queue.arn, attributes);
|
||||
|
||||
return { QueueUrl: queue.getUrl(awsProperties.host) };
|
||||
|
|
|
|||
|
|
@ -0,0 +1,58 @@
|
|||
import { BadRequestException, Injectable } from '@nestjs/common';
|
||||
import { AbstractActionHandler, AwsProperties, Format } from '../abstract-action.handler';
|
||||
import { Action } from '../action.enum';
|
||||
import * as Joi from 'joi';
|
||||
import { AttributesService } from '../aws-shared-entities/attributes.service';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { SqsQueue } from './sqs-queue.entity';
|
||||
import { Repository } from 'typeorm';
|
||||
import { SqsQueueEntryService } from './sqs-queue-entry.service';
|
||||
import { TagsService } from '../aws-shared-entities/tags.service';
|
||||
|
||||
type QueryParams = {
|
||||
QueueUrl?: string,
|
||||
__path: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class DeleteQueueHandler extends AbstractActionHandler<QueryParams> {
|
||||
|
||||
constructor(
|
||||
@InjectRepository(SqsQueue)
|
||||
private readonly sqsQueueRepo: Repository<SqsQueue>,
|
||||
private readonly tagsService: TagsService,
|
||||
private readonly attributeService: AttributesService,
|
||||
private readonly sqsQueueEntryService: SqsQueueEntryService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
format = Format.Xml;
|
||||
action = Action.SqsDeleteQueue;
|
||||
validator = Joi.object<QueryParams, true>({
|
||||
QueueUrl: Joi.string(),
|
||||
__path: Joi.string().required(),
|
||||
});
|
||||
|
||||
protected async handle(params: QueryParams, awsProperties: AwsProperties) {
|
||||
|
||||
const [accountId, name] = SqsQueue.tryGetAccountIdAndNameFromPathOrArn(params.QueueUrl ?? params.__path);
|
||||
const queue = await this.sqsQueueRepo.findOne({ where: { accountId , name } });
|
||||
|
||||
if(!queue) {
|
||||
throw new BadRequestException('ResourceNotFoundException');
|
||||
}
|
||||
|
||||
await this.sqsQueueEntryService.purge(accountId, name);
|
||||
await this.tagsService.deleteByArn(queue.arn);
|
||||
await this.attributeService.deleteByArn(queue.arn);
|
||||
await queue.remove();
|
||||
}
|
||||
|
||||
private async getAttributes(attributeNames: string[], queueArn: string) {
|
||||
if (attributeNames.length === 0 || attributeNames.length === 1 && attributeNames[0] === 'All') {
|
||||
return await this.attributeService.getByArn(queueArn);
|
||||
}
|
||||
return await this.attributeService.getByArnAndNames(queueArn, attributeNames);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,82 @@
|
|||
import { BadRequestException, Injectable } from '@nestjs/common';
|
||||
import { AbstractActionHandler, AwsProperties, Format } from '../abstract-action.handler';
|
||||
import { Action } from '../action.enum';
|
||||
import * as Joi from 'joi';
|
||||
import { AttributesService } from '../aws-shared-entities/attributes.service';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { SqsQueue } from './sqs-queue.entity';
|
||||
import { Repository } from 'typeorm';
|
||||
import { SqsQueueEntryService } from './sqs-queue-entry.service';
|
||||
|
||||
type QueryParams = {
|
||||
QueueUrl?: string,
|
||||
'AttributeName.1'?: string;
|
||||
__path: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class GetQueueAttributesHandler extends AbstractActionHandler<QueryParams> {
|
||||
|
||||
constructor(
|
||||
@InjectRepository(SqsQueue)
|
||||
private readonly sqsQueueRepo: Repository<SqsQueue>,
|
||||
private readonly attributeService: AttributesService,
|
||||
private readonly sqsQueueEntryService: SqsQueueEntryService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
format = Format.Xml;
|
||||
action = Action.SqsGetQueueAttributes;
|
||||
validator = Joi.object<QueryParams, true>({
|
||||
QueueUrl: Joi.string(),
|
||||
'AttributeName.1': Joi.string(),
|
||||
__path: Joi.string().required(),
|
||||
});
|
||||
|
||||
protected async handle(params: QueryParams, awsProperties: AwsProperties) {
|
||||
|
||||
const attributeNames = Object.keys(params).reduce((l, k) => {
|
||||
const [name, _] = k.split('.');
|
||||
if (name === 'AttributeName') {
|
||||
l.push(params[k]);
|
||||
}
|
||||
return l;
|
||||
}, []);
|
||||
|
||||
const [accountId, name] = SqsQueue.tryGetAccountIdAndNameFromPathOrArn(params.QueueUrl ?? params.__path);
|
||||
const queue = await this.sqsQueueRepo.findOne({ where: { accountId , name } });
|
||||
|
||||
if(!queue) {
|
||||
return;
|
||||
}
|
||||
|
||||
const queueMetrics = this.sqsQueueEntryService.metrics(queue.arn);
|
||||
const attributes = await this.getAttributes(attributeNames, queue.arn);
|
||||
const attributeMap = attributes.reduce((m, a) => {
|
||||
m[a.name] = a.value;
|
||||
return m;
|
||||
}, {});
|
||||
|
||||
const response = {
|
||||
...attributeMap,
|
||||
ApproximateNumberOfMessages: `${queueMetrics.total}`,
|
||||
ApproximateNumberOfMessagesNotVisible: `${queueMetrics.inFlight}`,
|
||||
CreatedTimestamp: `${new Date(queue.createdAt).getTime()}`,
|
||||
LastModifiedTimestamp: `${new Date(queue.updatedAt).getTime()}`,
|
||||
QueueArn: queue.arn,
|
||||
}
|
||||
return { Attribute: Object.keys(response).map(k => ({
|
||||
Name: k,
|
||||
Value: response[k],
|
||||
}))
|
||||
};
|
||||
}
|
||||
|
||||
private async getAttributes(attributeNames: string[], queueArn: string) {
|
||||
if (attributeNames.length === 0 || attributeNames.length === 1 && attributeNames[0] === 'All') {
|
||||
return await this.attributeService.getByArn(queueArn);
|
||||
}
|
||||
return await this.attributeService.getByArnAndNames(queueArn, attributeNames);
|
||||
}
|
||||
}
|
||||
|
|
@ -13,6 +13,8 @@ type QueueEntry = {
|
|||
createdAt: Date;
|
||||
}
|
||||
|
||||
type Metrics = { total: number, inFlight: number}
|
||||
|
||||
@Injectable()
|
||||
export class SqsQueueEntryService {
|
||||
|
||||
|
|
@ -24,6 +26,20 @@ export class SqsQueueEntryService {
|
|||
private readonly sqsQueueRepo: Repository<SqsQueue>,
|
||||
) {}
|
||||
|
||||
async findQueueByAccountIdAndName(accountId: string, name: string): Promise<SqsQueue> {
|
||||
return await this.sqsQueueRepo.findOne({ where: { accountId, name } });
|
||||
}
|
||||
|
||||
metrics(queueArn: string): Metrics {
|
||||
|
||||
const now = new Date();
|
||||
return this.getQueueList(queueArn).reduce<Metrics>((acc, e) => {
|
||||
acc.total += 1;
|
||||
acc.inFlight += e.inFlightReleaseDate > now ? 1 : 0;
|
||||
return acc;
|
||||
}, { total: 0, inFlight: 0 });
|
||||
}
|
||||
|
||||
async publish(accountId: string, queueName: string, message: string) {
|
||||
const queue = await this.sqsQueueRepo.findOne({ where: { accountId, name: queueName }});
|
||||
|
||||
|
|
@ -32,11 +48,7 @@ export class SqsQueueEntryService {
|
|||
return;
|
||||
}
|
||||
|
||||
if (this.queues) {
|
||||
this.queues[queue.arn] = [];
|
||||
}
|
||||
|
||||
this.queues[queue.arn].push({
|
||||
this.getQueueList(queue.arn).push({
|
||||
id: uuid.v4(),
|
||||
queueArn: queue.arn,
|
||||
senderId: accountId,
|
||||
|
|
@ -56,7 +68,7 @@ export class SqsQueueEntryService {
|
|||
const accessDate = new Date();
|
||||
const newInFlightReleaseDate = new Date(accessDate);
|
||||
newInFlightReleaseDate.setSeconds(accessDate.getSeconds() + visabilityTimeout);
|
||||
const records = this.queues[queue.arn]?.filter(e => e.inFlightReleaseDate <= accessDate).slice(0, maxNumberOfMessages - 1);
|
||||
const records = this.getQueueList(queue.arn).filter(e => e.inFlightReleaseDate <= accessDate).slice(0, maxNumberOfMessages - 1);
|
||||
records.forEach(e => e.inFlightReleaseDate = newInFlightReleaseDate);
|
||||
return records;
|
||||
}
|
||||
|
|
@ -65,4 +77,13 @@ export class SqsQueueEntryService {
|
|||
const queue = await this.sqsQueueRepo.findOne({ where: { accountId, name: queueName }});
|
||||
this.queues[queue.arn] = [];
|
||||
}
|
||||
|
||||
private getQueueList(arn: string): QueueEntry[] {
|
||||
|
||||
if (!this.queues[arn]) {
|
||||
this.queues[arn] = [];
|
||||
}
|
||||
|
||||
return this.queues[arn];
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,10 @@
|
|||
import { BaseEntity, Column, Entity, PrimaryColumn } from 'typeorm';
|
||||
import { BaseEntity, Column, CreateDateColumn, Entity, PrimaryColumn, UpdateDateColumn } from 'typeorm';
|
||||
import { getPathFromUrl } from '../util/get-path-from-url';
|
||||
|
||||
const attributeSlotMap = {
|
||||
'Name': 'key',
|
||||
'Value': 'value',
|
||||
}
|
||||
|
||||
@Entity('sqs_queue')
|
||||
export class SqsQueue extends BaseEntity {
|
||||
|
|
@ -12,6 +18,12 @@ export class SqsQueue extends BaseEntity {
|
|||
@Column({ name: 'region', nullable: false })
|
||||
region: string;
|
||||
|
||||
@CreateDateColumn()
|
||||
createdAt: string;
|
||||
|
||||
@UpdateDateColumn()
|
||||
updatedAt: string;
|
||||
|
||||
get arn(): string {
|
||||
return `arn:aws:sns:${this.region}:${this.accountId}:${this.name}`;
|
||||
}
|
||||
|
|
@ -33,9 +45,26 @@ export class SqsQueue extends BaseEntity {
|
|||
}
|
||||
|
||||
static tryGetAccountIdAndNameFromPathOrArn(pathOrArn: string): [string, string] {
|
||||
if (pathOrArn.split(':').length) {
|
||||
return SqsQueue.getAccountIdAndNameFromArn(pathOrArn);
|
||||
const workingString = getPathFromUrl(pathOrArn);
|
||||
if (workingString.split(':').length > 1) {
|
||||
return SqsQueue.getAccountIdAndNameFromArn(workingString);
|
||||
}
|
||||
return SqsQueue.getAccountIdAndNameFromPath(pathOrArn);
|
||||
return SqsQueue.getAccountIdAndNameFromPath(workingString);
|
||||
}
|
||||
|
||||
static attributePairs(queryParams: Record<string, string>): { key: string, value: string }[] {
|
||||
const pairs = [null];
|
||||
for (const param of Object.keys(queryParams)) {
|
||||
const [type, idx, slot] = param.split('.');
|
||||
if (type === 'Attribute') {
|
||||
if (!pairs[+idx]) {
|
||||
pairs[+idx] = { key: '', value: ''};
|
||||
}
|
||||
pairs[+idx][attributeSlotMap[slot]] = queryParams[param];
|
||||
}
|
||||
}
|
||||
|
||||
pairs.shift();
|
||||
return pairs;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@ import { AwsSharedEntitiesModule } from '../aws-shared-entities/aws-shared-entit
|
|||
import { DefaultActionHandlerProvider } from '../default-action-handler/default-action-handler.provider';
|
||||
import { ExistingActionHandlersProvider } from '../default-action-handler/existing-action-handlers.provider';
|
||||
import { CreateQueueHandler } from './create-queue.handler';
|
||||
import { DeleteQueueHandler } from './delete-queue.handler';
|
||||
import { GetQueueAttributesHandler } from './get-queue-attributes.handler';
|
||||
import { PurgeQueueHandler } from './purge-queue.handler';
|
||||
import { ReceiveMessageHandler } from './receive-message.handler';
|
||||
import { SetQueueAttributesHandler } from './set-queue-attributes.handler';
|
||||
|
|
@ -15,6 +17,8 @@ import { SqsHandlers } from './sqs.constants';
|
|||
|
||||
const handlers = [
|
||||
CreateQueueHandler,
|
||||
DeleteQueueHandler,
|
||||
GetQueueAttributesHandler,
|
||||
PurgeQueueHandler,
|
||||
ReceiveMessageHandler,
|
||||
SetQueueAttributesHandler,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,9 @@
|
|||
export const getPathFromUrl = (url: string) => {
|
||||
|
||||
try {
|
||||
const obj = new URL(url);
|
||||
return obj.pathname;
|
||||
} catch (err) {}
|
||||
|
||||
return url;
|
||||
}
|
||||
Loading…
Reference in New Issue