Fixed Issue #1: [SNS] Subscriptions all sub to the same topic arn
This commit is contained in:
parent
ee0babc2e3
commit
e1db34e7c1
|
|
@ -30,6 +30,10 @@ export class AttributesService {
|
||||||
return await this.repo.save(dto);
|
return await this.repo.save(dto);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async deleteByArn(arn: string) {
|
||||||
|
await this.repo.delete({ arn });
|
||||||
|
}
|
||||||
|
|
||||||
async deleteByArnAndName(arn: string, name: string) {
|
async deleteByArnAndName(arn: string, name: string) {
|
||||||
await this.repo.delete({ arn, name });
|
await this.repo.delete({ arn, name });
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,10 @@ export class TagsService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async deleteByArn(arn: string) {
|
||||||
|
await this.repo.delete({ arn });
|
||||||
|
}
|
||||||
|
|
||||||
async deleteByArnAndName(arn: string, name: string) {
|
async deleteByArnAndName(arn: string, name: string) {
|
||||||
await this.repo.delete({ arn, name });
|
await this.repo.delete({ arn, name });
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,8 +28,13 @@ export class GetSubscriptionAttributesHandler extends AbstractActionHandler {
|
||||||
|
|
||||||
protected async handle({ SubscriptionArn }: QueryParams, awsProperties: AwsProperties) {
|
protected async handle({ SubscriptionArn }: QueryParams, awsProperties: AwsProperties) {
|
||||||
|
|
||||||
const id = SubscriptionArn.split(':')[-1];
|
const id = SubscriptionArn.split(':').pop();
|
||||||
const subscription = await this.snsTopicSubscriptionRepo.findOne({ where: { id }});
|
const subscription = await this.snsTopicSubscriptionRepo.findOne({ where: { id }});
|
||||||
|
|
||||||
|
if (!subscription) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const attributes = await this.attributeService.getByArn(SubscriptionArn);
|
const attributes = await this.attributeService.getByArn(SubscriptionArn);
|
||||||
const attributeMap = attributes.reduce((m, a) => {
|
const attributeMap = attributes.reduce((m, a) => {
|
||||||
m[a.name] = a.value;
|
m[a.name] = a.value;
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import { Action } from '../action.enum';
|
||||||
import { SnsTopic } from './sns-topic.entity';
|
import { SnsTopic } from './sns-topic.entity';
|
||||||
import * as Joi from 'joi';
|
import * as Joi from 'joi';
|
||||||
import { AttributesService } from '../aws-shared-entities/attributes.service';
|
import { AttributesService } from '../aws-shared-entities/attributes.service';
|
||||||
|
import { SnsTopicSubscription } from './sns-topic-subscription.entity';
|
||||||
|
|
||||||
type QueryParams = {
|
type QueryParams = {
|
||||||
TopicArn: string;
|
TopicArn: string;
|
||||||
|
|
@ -17,6 +18,8 @@ export class GetTopicAttributesHandler extends AbstractActionHandler {
|
||||||
constructor(
|
constructor(
|
||||||
@InjectRepository(SnsTopic)
|
@InjectRepository(SnsTopic)
|
||||||
private readonly snsTopicRepo: Repository<SnsTopic>,
|
private readonly snsTopicRepo: Repository<SnsTopic>,
|
||||||
|
@InjectRepository(SnsTopicSubscription)
|
||||||
|
private readonly snsTopicSubscriptionRepo: Repository<SnsTopicSubscription>,
|
||||||
private readonly attributeService: AttributesService,
|
private readonly attributeService: AttributesService,
|
||||||
) {
|
) {
|
||||||
super();
|
super();
|
||||||
|
|
@ -28,7 +31,7 @@ export class GetTopicAttributesHandler extends AbstractActionHandler {
|
||||||
|
|
||||||
protected async handle({ TopicArn }: QueryParams, awsProperties: AwsProperties) {
|
protected async handle({ TopicArn }: QueryParams, awsProperties: AwsProperties) {
|
||||||
|
|
||||||
const name = TopicArn.split(':')[-1];
|
const name = TopicArn.split(':').pop();
|
||||||
const topic = await this.snsTopicRepo.findOne({ where: { name }});
|
const topic = await this.snsTopicRepo.findOne({ where: { name }});
|
||||||
const attributes = await this.attributeService.getByArn(TopicArn);
|
const attributes = await this.attributeService.getByArn(TopicArn);
|
||||||
const attributeMap = attributes.reduce((m, a) => {
|
const attributeMap = attributes.reduce((m, a) => {
|
||||||
|
|
@ -36,10 +39,12 @@ export class GetTopicAttributesHandler extends AbstractActionHandler {
|
||||||
return m;
|
return m;
|
||||||
}, {});
|
}, {});
|
||||||
|
|
||||||
|
const subscriptionCount = await this.snsTopicSubscriptionRepo.count({ where: { topicArn: TopicArn } });
|
||||||
|
|
||||||
const response = {
|
const response = {
|
||||||
DisplayName: topic.name,
|
DisplayName: topic.name,
|
||||||
Owner: topic.accountId,
|
Owner: topic.accountId,
|
||||||
SubscriptionsConfirmed: '0',
|
SubscriptionsConfirmed: `${subscriptionCount}`,
|
||||||
SubscriptionsDeleted: '0',
|
SubscriptionsDeleted: '0',
|
||||||
SubscriptionsPending: '0',
|
SubscriptionsPending: '0',
|
||||||
TopicArn: topic.topicArn,
|
TopicArn: topic.topicArn,
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ import { SnsTopicSubscription } from './sns-topic-subscription.entity';
|
||||||
import { SnsTopic } from './sns-topic.entity';
|
import { SnsTopic } from './sns-topic.entity';
|
||||||
import { SnsHandlers } from './sns.constants';
|
import { SnsHandlers } from './sns.constants';
|
||||||
import { SubscribeHandler } from './subscribe.handler';
|
import { SubscribeHandler } from './subscribe.handler';
|
||||||
|
import { UnsubscribeHandler } from './unsubscribe.handler';
|
||||||
|
|
||||||
const handlers = [
|
const handlers = [
|
||||||
CreateTopicHandler,
|
CreateTopicHandler,
|
||||||
|
|
@ -30,6 +31,7 @@ const handlers = [
|
||||||
SetSubscriptionAttributesHandler,
|
SetSubscriptionAttributesHandler,
|
||||||
SetTopicAttributesHandler,
|
SetTopicAttributesHandler,
|
||||||
SubscribeHandler,
|
SubscribeHandler,
|
||||||
|
UnsubscribeHandler,
|
||||||
];
|
];
|
||||||
|
|
||||||
const actions = [
|
const actions = [
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,43 @@
|
||||||
|
import { Injectable } from '@nestjs/common';
|
||||||
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
|
import { Repository } from 'typeorm';
|
||||||
|
import { AbstractActionHandler, AwsProperties, Format } from '../abstract-action.handler';
|
||||||
|
import { Action } from '../action.enum';
|
||||||
|
import * as Joi from 'joi';
|
||||||
|
import { TagsService } from '../aws-shared-entities/tags.service';
|
||||||
|
import { AttributesService } from '../aws-shared-entities/attributes.service';
|
||||||
|
import { SnsTopicSubscription } from './sns-topic-subscription.entity';
|
||||||
|
import * as uuid from 'uuid';
|
||||||
|
|
||||||
|
type QueryParams = {
|
||||||
|
SubscriptionArn: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class UnsubscribeHandler extends AbstractActionHandler<QueryParams> {
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
@InjectRepository(SnsTopicSubscription)
|
||||||
|
private readonly snsTopicSubscription: Repository<SnsTopicSubscription>,
|
||||||
|
private readonly tagsService: TagsService,
|
||||||
|
private readonly attributeService: AttributesService,
|
||||||
|
) {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
format = Format.Xml;
|
||||||
|
action = Action.SnsUnsubscribe;
|
||||||
|
validator = Joi.object<QueryParams, true>({
|
||||||
|
SubscriptionArn: Joi.string().required(),
|
||||||
|
});
|
||||||
|
|
||||||
|
protected async handle(params: QueryParams, awsProperties: AwsProperties) {
|
||||||
|
|
||||||
|
const id = params.SubscriptionArn.split(':').pop();
|
||||||
|
const subscription = await this.snsTopicSubscription.findOne({ where: { id } });
|
||||||
|
|
||||||
|
await this.tagsService.deleteByArn(subscription.arn);
|
||||||
|
await this.attributeService.deleteByArn(subscription.arn);
|
||||||
|
await this.snsTopicSubscription.delete({ id });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -7,7 +7,7 @@ import { SqsQueueEntryService } from './sqs-queue-entry.service';
|
||||||
import crypto from 'crypto';
|
import crypto from 'crypto';
|
||||||
|
|
||||||
type QueryParams = {
|
type QueryParams = {
|
||||||
__path: string;
|
QueueUrl: string;
|
||||||
MaxNumberOfMessages?: number;
|
MaxNumberOfMessages?: number;
|
||||||
VisibilityTimeout?: number;
|
VisibilityTimeout?: number;
|
||||||
}
|
}
|
||||||
|
|
@ -24,14 +24,14 @@ export class ReceiveMessageHandler extends AbstractActionHandler<QueryParams> {
|
||||||
format = Format.Xml;
|
format = Format.Xml;
|
||||||
action = Action.SqsReceiveMessage;
|
action = Action.SqsReceiveMessage;
|
||||||
validator = Joi.object<QueryParams, true>({
|
validator = Joi.object<QueryParams, true>({
|
||||||
__path: Joi.string().required(),
|
QueueUrl: Joi.string().required(),
|
||||||
MaxNumberOfMessages: Joi.number(),
|
MaxNumberOfMessages: Joi.number(),
|
||||||
VisibilityTimeout: Joi.number(),
|
VisibilityTimeout: Joi.number(),
|
||||||
});
|
});
|
||||||
|
|
||||||
protected async handle({ __path, MaxNumberOfMessages, VisibilityTimeout }: QueryParams, awsProperties: AwsProperties) {
|
protected async handle({ QueueUrl, MaxNumberOfMessages, VisibilityTimeout }: QueryParams, awsProperties: AwsProperties) {
|
||||||
|
|
||||||
const [accountId, name] = SqsQueue.getAccountIdAndNameFromPath(__path);
|
const [accountId, name] = SqsQueue.tryGetAccountIdAndNameFromPathOrArn(QueueUrl);
|
||||||
const records = await this.sqsQueueEntryService.recieveMessages(accountId, name, MaxNumberOfMessages, VisibilityTimeout);
|
const records = await this.sqsQueueEntryService.recieveMessages(accountId, name, MaxNumberOfMessages, VisibilityTimeout);
|
||||||
return records.map(r => ({
|
return records.map(r => ({
|
||||||
Message: {
|
Message: {
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import { DefaultActionHandlerProvider } from '../default-action-handler/default-
|
||||||
import { ExistingActionHandlersProvider } from '../default-action-handler/existing-action-handlers.provider';
|
import { ExistingActionHandlersProvider } from '../default-action-handler/existing-action-handlers.provider';
|
||||||
import { CreateQueueHandler } from './create-queue.handler';
|
import { CreateQueueHandler } from './create-queue.handler';
|
||||||
import { PurgeQueueHandler } from './purge-queue.handler';
|
import { PurgeQueueHandler } from './purge-queue.handler';
|
||||||
|
import { ReceiveMessageHandler } from './receive-message.handler';
|
||||||
import { SetQueueAttributesHandler } from './set-queue-attributes.handler';
|
import { SetQueueAttributesHandler } from './set-queue-attributes.handler';
|
||||||
import { SqsQueueEntryService } from './sqs-queue-entry.service';
|
import { SqsQueueEntryService } from './sqs-queue-entry.service';
|
||||||
import { SqsQueue } from './sqs-queue.entity';
|
import { SqsQueue } from './sqs-queue.entity';
|
||||||
|
|
@ -15,6 +16,7 @@ import { SqsHandlers } from './sqs.constants';
|
||||||
const handlers = [
|
const handlers = [
|
||||||
CreateQueueHandler,
|
CreateQueueHandler,
|
||||||
PurgeQueueHandler,
|
PurgeQueueHandler,
|
||||||
|
ReceiveMessageHandler,
|
||||||
SetQueueAttributesHandler,
|
SetQueueAttributesHandler,
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue