local-aws/src/sqs/sqs-queue-entry.service.ts

136 lines
4.0 KiB
TypeScript

import { BadRequestException, Injectable } from '@nestjs/common';
import { Prisma, SqsQueueMessage } from '@prisma/client';
import { randomUUID } from 'crypto';
import { PrismaService } from '../_prisma/prisma.service';
import { SqsQueue } from './sqs-queue.entity';
/**
 * Plain description of a single queued message.
 *
 * NOTE(review): nothing in the visible part of this file references this type;
 * confirm whether other modules rely on it before removing it.
 */
type QueueEntry = {
  /** Unique identifier of the entry. */
  id: string;
  /** ARN of the queue the entry belongs to. */
  queueArn: string;
  /** Identifier of the sender that produced the entry. */
  senderId: string;
  /** Raw message body. */
  message: string;
  /** Instant associated with the entry's in-flight release. */
  inFlightReleaseDate: Date;
  /** Creation timestamp of the entry. */
  createdAt: Date;
};

/** Message counters for a queue: all stored messages and those currently in flight. */
type Metrics = {
  total: number;
  inFlight: number;
};

/** Fifteen seconds expressed in milliseconds. */
const FIFTEEN_SECONDS = 15_000;
/**
 * Persistence service for SQS queues and their messages, backed by Prisma.
 *
 * A message is considered visible to consumers when its `inFlightRelease`
 * timestamp is not in the future; receiving a message pushes that timestamp
 * forward by the requested visibility timeout.
 */
@Injectable()
export class SqsQueueEntryService {
  /**
   * Short-lived queue lookup cache keyed by `${accountId}/${queueName}`.
   * Each value holds the instant the entry expires and the cached queue.
   */
  private readonly queueObjectCache = new Map<string, [Date, SqsQueue]>();

  constructor(
    private readonly prismaService: PrismaService,
  ) {}

  /**
   * Looks up a queue by owning account and name.
   *
   * @returns The matching queue wrapped in a `SqsQueue`, or `null` when none exists.
   */
  async findQueueByAccountIdAndName(accountId: string, name: string): Promise<SqsQueue | null> {
    const record = await this.prismaService.sqsQueue.findFirst({ where: { accountId, name } });
    return record ? new SqsQueue(record) : null;
  }

  /**
   * Persists a new queue.
   *
   * @param data Prisma create input describing the queue.
   * @returns The created queue wrapped in a `SqsQueue`.
   */
  async createQueue(data: Prisma.SqsQueueCreateInput): Promise<SqsQueue> {
    const record = await this.prismaService.sqsQueue.create({ data });
    return new SqsQueue(record);
  }

  /**
   * Deletes the queue with the given id and drops every cached lookup that
   * points at it, so later calls do not resolve a queue that no longer exists.
   *
   * @param id Primary key of the queue to delete.
   */
  async deleteQueue(id: number): Promise<void> {
    try {
      await this.prismaService.sqsQueue.delete({ where: { id } });
    } finally {
      // Evict even when the delete fails: the worst case is one extra lookup,
      // whereas a stale entry would keep serving a removed queue.
      for (const [key, [, cachedQueue]] of this.queueObjectCache) {
        if (cachedQueue.id === id) {
          this.queueObjectCache.delete(key);
        }
      }
    }
  }

  /**
   * Counts the messages stored for a queue.
   *
   * @param queueId Primary key of the queue.
   * @returns `total` — every stored message; `inFlight` — messages whose
   *   `inFlightRelease` is still in the future.
   */
  async metrics(queueId: number): Promise<Metrics> {
    const now = new Date();
    const [total, inFlight] = await Promise.all([
      this.prismaService.sqsQueueMessage.count({ where: { queueId } }),
      this.prismaService.sqsQueueMessage.count({ where: { queueId, inFlightRelease: { gt: now } } }),
    ]);
    return { total, inFlight };
  }

  /**
   * Stores a message on the named queue, immediately visible to consumers.
   *
   * When the queue does not exist a warning is logged and the message is
   * dropped; no error is raised.
   *
   * @param accountId Account that owns the queue; also recorded as the sender id.
   * @param queueName Name of the target queue.
   * @param message Raw message body.
   */
  async publish(accountId: string, queueName: string, message: string): Promise<void> {
    const queue = await this.findQueueByAccountIdAndName(accountId, queueName);
    if (!queue) {
      console.warn(`Warning bad subscription to ${queueName}`);
      return;
    }
    await this.prismaService.sqsQueueMessage.create({
      data: {
        id: randomUUID(),
        queueId: queue.id,
        senderId: accountId,
        message,
        // Released "now", so the message can be received right away.
        inFlightRelease: new Date(),
      },
    });
  }

  /**
   * Fetches currently visible messages from a queue and marks them in flight.
   *
   * NOTE(review): the select and the update are two separate statements, so
   * concurrent receivers could be handed the same messages — confirm whether
   * callers need stronger guarantees.
   *
   * @param accountId Account that owns the queue.
   * @param queueName Name of the queue to read from.
   * @param maxNumberOfMessages Upper bound on the number of messages returned.
   * @param visabilityTimeout Whole seconds the returned messages stay hidden
   *   from subsequent receives.
   * @returns The selected messages, each carrying the new `inFlightRelease`.
   * @throws BadRequestException when the queue does not exist.
   */
  async receiveMessages(accountId: string, queueName: string, maxNumberOfMessages = 10, visabilityTimeout = 0): Promise<SqsQueueMessage[]> {
    const queue = await this.getQueueHelper(accountId, queueName);
    const accessDate = new Date();
    // Epoch arithmetic rather than local-time field arithmetic, so the result
    // does not depend on the host time zone or DST transitions.
    const newInFlightReleaseDate = new Date(accessDate.getTime() + Math.trunc(visabilityTimeout) * 1000);

    const records = await this.prismaService.sqsQueueMessage.findMany({
      where: {
        queueId: queue.id,
        inFlightRelease: { lte: accessDate },
      },
      take: maxNumberOfMessages,
    });
    if (records.length === 0) {
      // Nothing visible: skip the pointless update round trip.
      return [];
    }

    await this.prismaService.sqsQueueMessage.updateMany({
      data: { inFlightRelease: newInFlightReleaseDate },
      where: { id: { in: records.map((r) => r.id) } },
    });
    return records.map((r) => ({ ...r, inFlightRelease: newInFlightReleaseDate }));
  }

  /**
   * Deletes a single message by id.
   *
   * NOTE(review): this uses Prisma's single-record `delete`; presumably it
   * rejects when no message has this id — verify against callers.
   */
  async deleteMessage(id: string): Promise<void> {
    await this.prismaService.sqsQueueMessage.delete({ where: { id } });
  }

  /**
   * Removes every message from the named queue. Does nothing when the queue
   * does not exist.
   */
  async purge(accountId: string, queueName: string): Promise<void> {
    const queue = await this.findQueueByAccountIdAndName(accountId, queueName);
    if (!queue) {
      return;
    }
    await this.prismaService.sqsQueueMessage.deleteMany({ where: { queueId: queue.id } });
  }

  /**
   * Resolves a queue through the lookup cache, refreshing the entry from the
   * database when it is missing or expired. Fresh entries live for
   * `FIFTEEN_SECONDS`.
   *
   * @throws BadRequestException when the queue does not exist.
   */
  private async getQueueHelper(accountId: string, queueName: string): Promise<SqsQueue> {
    const cacheKey = `${accountId}/${queueName}`;
    const cached = this.queueObjectCache.get(cacheKey);
    if (cached && cached[0].getTime() >= Date.now()) {
      return cached[1];
    }

    const queue = await this.findQueueByAccountIdAndName(accountId, queueName);
    if (!queue) {
      // Drop any expired entry so the cache does not retain removed queues.
      this.queueObjectCache.delete(cacheKey);
      throw new BadRequestException('Queue not found');
    }
    this.queueObjectCache.set(cacheKey, [new Date(Date.now() + FIFTEEN_SECONDS), queue]);
    return queue;
  }
}