Queue bug fixes

This commit is contained in:
2023-03-29 11:04:57 -04:00
parent a6524d7f65
commit 2c78be1b3f
14 changed files with 450 additions and 59 deletions

View File

@@ -15,6 +15,7 @@ export enum Format {
export abstract class AbstractActionHandler<T = Record<string, string | number | boolean>> {
audit = true;
abstract format: Format;
abstract action: Action;
abstract validator: Joi.ObjectSchema<T>;

View File

@@ -23,7 +23,6 @@ import { configValidator } from './config/config.validator';
ConfigModule.forRoot({
load: [localConfig],
isGlobal: true,
validationSchema: configValidator,
}),
TypeOrmModule.forRootAsync({
inject: [ConfigService],

View File

@@ -1,11 +1,10 @@
import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
import { CallHandler, ExecutionContext, Inject, Injectable, NestInterceptor } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Observable, tap } from 'rxjs';
import { Repository } from 'typeorm';
import { Audit } from './audit.entity';
import * as uuid from 'uuid';
import { ConfigService } from '@nestjs/config';
import { CommonConfig } from '../config/common-config.interface';
import { ActionHandlers } from '../app.constants';
@Injectable()
export class AuditInterceptor<T> implements NestInterceptor<T, Response> {
@@ -13,14 +12,12 @@ export class AuditInterceptor<T> implements NestInterceptor<T, Response> {
constructor(
@InjectRepository(Audit)
private readonly auditRepo: Repository<Audit>,
private readonly configService: ConfigService<CommonConfig>,
@Inject(ActionHandlers)
private readonly handlers: ActionHandlers,
) {}
intercept(context: ExecutionContext, next: CallHandler<T>): Observable<any> {
if (!this.configService.get('AUDIT')) {
return next.handle();
}
const requestId = uuid.v4();
const httpContext = context.switchToHttp();
@@ -33,14 +30,26 @@ export class AuditInterceptor<T> implements NestInterceptor<T, Response> {
response.header('x-amzn-RequestId', requestId);
if (!this.handlers[action]?.audit) {
return next.handle();
}
return next.handle().pipe(
tap(async (data) => {
await this.auditRepo.create({
tap({
next: async (data) => await this.auditRepo.create({
id: requestId,
action,
request: JSON.stringify({ __path: request.path, ...request.headers, ...request.body }),
response: JSON.stringify(data),
}).save();
}).save(),
error: async (error) => await this.auditRepo.create({
id: requestId,
action,
request: JSON.stringify({ __path: request.path, ...request.headers, ...request.body }),
response: JSON.stringify(error),
}).save(),
})
);
}

View File

@@ -1,5 +1,4 @@
export interface CommonConfig {
AUDIT: boolean;
AWS_ACCOUNT_ID: string;
AWS_REGION: string;
DB_DATABASE: string;

View File

@@ -2,13 +2,12 @@ import * as Joi from 'joi';
import { CommonConfig } from './common-config.interface';
export const configValidator = Joi.object<CommonConfig, true>({
AUDIT: Joi.boolean().default(false),
AWS_ACCOUNT_ID: Joi.string().default('000000000000'),
AWS_REGION: Joi.string().default('us-east-1'),
DB_DATABASE: Joi.string().default(':memory:'),
DB_LOGGING: Joi.boolean().default(false),
DB_SYNCHRONIZE: Joi.boolean().default(true),
HOST: Joi.string().default('localhost'),
PORT: Joi.number().default(8081),
PROTO: Joi.string().valid('http', 'https').default('http'),
AWS_ACCOUNT_ID: Joi.string().required(),
AWS_REGION: Joi.string().required(),
DB_DATABASE: Joi.string().required(),
DB_LOGGING: Joi.boolean().required(),
DB_SYNCHRONIZE: Joi.boolean().valid(true).required(),
HOST: Joi.string().required(),
PORT: Joi.number().required(),
PROTO: Joi.string().valid('http', 'https').required(),
});

View File

@@ -1,13 +1,22 @@
import { CommonConfig } from "./common-config.interface";
import { configValidator } from './config.validator';
export default (): CommonConfig => ({
AUDIT: process.env.DEBUG ? true : false,
AWS_ACCOUNT_ID: process.env.AWS_ACCOUNT_ID,
AWS_REGION: process.env.AWS_REGION,
DB_DATABASE: process.env.PERSISTANCE,
DB_LOGGING: process.env.DEBUG ? true : false,
DB_SYNCHRONIZE: true,
HOST: process.env.HOST,
PROTO: process.env.PROTOCOL,
PORT: Number(process.env.PORT),
});
export default (): CommonConfig => {
const { error, value } = configValidator.validate({
AWS_ACCOUNT_ID: process.env.AWS_ACCOUNT_ID ?? '000000000000',
AWS_REGION: process.env.AWS_REGION ?? 'us-east-1',
DB_DATABASE: process.env.PERSISTANCE ?? ':memory:',
DB_LOGGING: process.env.DEBUG ? true : false,
DB_SYNCHRONIZE: true,
HOST: process.env.HOST ?? 'localhost',
PROTO: process.env.PROTOCOL ?? 'http',
PORT: process.env.PORT as any ?? 8081,
}, { abortEarly: false });
if (error) {
throw error;
}
return value;
}

View File

@@ -39,7 +39,7 @@ export class GetSecretValueHandler extends AbstractActionHandler {
return {
ARN: secret.arn,
CreatedDate: secret.createdAt,
CreatedDate: new Date(secret.createdAt).valueOf(),
Name: secret.name,
SecretString: secret.secretString,
VersionId: secret.versionId,

View File

@@ -22,12 +22,12 @@ export class ListTopicsHandler extends AbstractActionHandler {
format = Format.Xml;
action = Action.SnsListTopics;
validator = Joi.object<QueryParams, true>({ NextToken: Joi.number().default(0) });
validator = Joi.object<QueryParams, true>({ NextToken: Joi.number().default(0) });
protected async handle({ NextToken: skip }: QueryParams, awsProperties: AwsProperties) {
const [ topics, total ] = await this.snsTopicRepo.findAndCount({ order: { name: 'DESC' }, take: 100, skip });
const response = { Topics: topics.map(t => ({ Topic: { TopicArn: t.topicArn } }))};
const response = { Topics: { member: topics.map(t => ({ TopicArn: t.topicArn } ))} };
if (total >= 100) {
return {

View File

@@ -4,7 +4,7 @@ import { Action } from '../action.enum';
import * as Joi from 'joi';
import { SqsQueue } from './sqs-queue.entity';
import { SqsQueueEntryService } from './sqs-queue-entry.service';
import crypto from 'crypto';
import * as crypto from 'crypto';
type QueryParams = {
QueueUrl: string;
@@ -21,6 +21,7 @@ export class ReceiveMessageHandler extends AbstractActionHandler<QueryParams> {
super();
}
audit = false;
format = Format.Xml;
action = Action.SqsReceiveMessage;
validator = Joi.object<QueryParams, true>({
@@ -33,19 +34,19 @@ export class ReceiveMessageHandler extends AbstractActionHandler<QueryParams> {
const [accountId, name] = SqsQueue.tryGetAccountIdAndNameFromPathOrArn(QueueUrl);
const records = await this.sqsQueueEntryService.recieveMessages(accountId, name, MaxNumberOfMessages, VisibilityTimeout);
return records.map(r => ({
Message: {
return {
Message: records.map(r => ({
MessageId: r.id,
ReceiptHandle: r.id,
MD5OfBody: crypto.createHash('md5').update(r.message).digest("hex"),
Body: r.message,
'#': [
{ Attribute: { Name: 'SenderId', Value: r.senderId }},
{ Attribute: { Name: 'SentTimestamp', Value: r.createdAt.getSeconds() }},
{ Attribute: { Name: 'ApproximateReceiveCount', Value: 1 }},
{ Attribute: { Name: 'ApproximateFirstReceiveTimestamp', Value: r.createdAt.getSeconds() }},
]
}
}));
Attribute: [
{ Name: 'SenderId', Value: r.senderId },
{ Name: 'SentTimestamp', Value: r.createdAt.valueOf() },
{ Name: 'ApproximateReceiveCount', Value: 1 },
{ Name: 'ApproximateFirstReceiveTimestamp', Value: r.createdAt.valueOf() },
],
})),
}
}
}

View File

@@ -15,12 +15,16 @@ type QueueEntry = {
type Metrics = { total: number, inFlight: number}
const FIFTEEN_SECONDS = 15 * 1000;
@Injectable()
export class SqsQueueEntryService {
// Heavy use may require event-driven locking implementation
private queues: Record<string, QueueEntry[]> = {};
private queueObjectCache: Record<string, [Date, SqsQueue]> = {};
constructor(
@InjectRepository(SqsQueue)
private readonly sqsQueueRepo: Repository<SqsQueue>,
@@ -59,10 +63,15 @@ export class SqsQueueEntryService {
}
async recieveMessages(accountId: string, queueName: string, maxNumberOfMessages = 10, visabilityTimeout = 0): Promise<QueueEntry[]> {
const queue = await this.sqsQueueRepo.findOne({ where: { accountId, name: queueName }});
if (!this.queueObjectCache[`${accountId}/${queueName}`] || this.queueObjectCache[`${accountId}/${queueName}`][0] < new Date()) {
this.queueObjectCache[`${accountId}/${queueName}`] = [new Date(Date.now() + FIFTEEN_SECONDS), await this.sqsQueueRepo.findOne({ where: { accountId, name: queueName }})];
}
const [_, queue] = this.queueObjectCache[`${accountId}/${queueName}`];
if (!queue) {
throw new BadRequestException();
throw new BadRequestException('Queue not found');
}
const accessDate = new Date();

View File

@@ -25,7 +25,7 @@ export class SqsQueue extends BaseEntity {
updatedAt: string;
get arn(): string {
return `arn:aws:sns:${this.region}:${this.accountId}:${this.name}`;
return `arn:aws:sqs:${this.region}:${this.accountId}:${this.name}`;
}
getUrl(host: string): string {