import { Injectable, Logger } from '@nestjs/common';
import { Readable, Writable, pipeline } from 'stream';
import { S3Service } from '../s3/s3.service';
import { BufferManager } from '../utils/buffer.manager';
import * as archiver from 'archiver';
import { CompletedPart } from '@aws-sdk/client-s3';

/**
 * Capacity (5 MiB) handed to BufferManager. When `hasRoom` reports that the next chunk does not
 * fit, the buffered bytes are uploaded as one multipart part. S3 requires every part except the
 * last to be at least 5 MiB.
 */
const MAX_CHUNK_SIZE = 1024 * 1024 * 5;

/**
 * Builds a ZIP archive from existing S3 objects and streams it back to S3 as a multipart upload,
 * buffering at most roughly one part's worth of archive output at a time.
 */
@Injectable()
export class ArchiveBundlerService {
  private readonly logger = new Logger(ArchiveBundlerService.name);

  constructor(private readonly s3Service: S3Service) {}

  /**
   * Zips the objects at `keys` in `bucket` and uploads the result to `bucket`/`archiveKey`.
   *
   * Each entry is named after the last `/`-separated segment of its key.
   * NOTE(review): keys that share a basename under different prefixes produce duplicate entry
   * names inside the archive.
   *
   * @param bucket - bucket holding the source objects; the archive is written to it as well
   * @param keys - object keys to include, appended to the archive in this order
   * @param archiveKey - key the finished archive is uploaded to
   * @returns resolves once the multipart upload has been completed
   * @throws if S3 returns no UploadId or an object without a body, or if fetching, archiving or
   *   uploading fails. The multipart upload is NOT aborted in that case.
   *   TODO(review): abort it to avoid paying for orphaned parts.
   */
  async generateZipFromKeys(bucket: string, keys: string[], archiveKey: string): Promise<void> {
    const archiveTransformer = archiver('zip', {
      zlib: { level: 9 },
    });

    const { UploadId } = await this.s3Service.createMultiPartUpload({
      Bucket: bucket,
      Key: archiveKey,
    });
    if (!UploadId) {
      throw new Error(`S3 returned no UploadId for multipart upload of ${archiveKey}`);
    }

    let partNumber = 1;
    const completedParts: CompletedPart[] = [];
    const bufferManager = new BufferManager(MAX_CHUNK_SIZE);

    // Buffers archive output. It uploads the buffer (plus `data`) as the next part once the buffer
    // has no room for `data`, or unconditionally when `final` is set to flush the tail.
    const eventDataHandler = async (data: Buffer, final = false): Promise<void> => {
      if (bufferManager.hasRoom(data) && !final) {
        bufferManager.append(data);
        return;
      }
      const Body = bufferManager.appendAndGetCompletedBuffer(data);
      // The final flush may find nothing left to send; skip an empty part unless it would be the only one.
      if (Body.length === 0 && completedParts.length > 0) {
        return;
      }
      this.logger.debug(
        `Currently at ${(process.memoryUsage().heapUsed / 1024 / 1024).toFixed(1)} MB RAM`,
      );
      const eventPartNumber = partNumber++;
      const { ETag } = await this.s3Service.uploadPart({
        UploadId,
        Body,
        Bucket: bucket,
        Key: archiveKey,
        PartNumber: eventPartNumber,
        ContentLength: Body.length,
      });
      completedParts.push({
        ETag,
        PartNumber: eventPartNumber,
      });
    };

    const writableStream = new Writable({
      // Writable delivers chunks one at a time and waits for `callback` before the next one,
      // so parts are uploaded sequentially and `completedParts` stays in ascending order.
      write: (chunk: Buffer, _encoding, callback) => {
        eventDataHandler(chunk).then(
          () => callback(),
          // Report upload failures to the stream instead of leaving an unhandled rejection.
          (err: unknown) => callback(err instanceof Error ? err : new Error(String(err))),
        );
      },
    });

    archiveTransformer.on('warning', (err) => {
      this.logger.warn(err);
    });

    // pipeline() forwards errors from either stream and destroys both, so an archive or upload
    // failure rejects `streamed` instead of leaving this method pending forever.
    const streamed = new Promise<void>((resolve, reject) => {
      pipeline(archiveTransformer, writableStream, (err) => (err ? reject(err) : resolve()));
    });
    // Mark the rejection as handled up front: a failure can happen while sources are still being
    // fetched, before `streamed` is awaited below. Awaiting it later still observes the error.
    streamed.catch(() => undefined);

    const openedSources: Readable[] = [];
    try {
      for (const key of keys) {
        const fileName = key.split('/').pop() ?? key;
        this.logger.log(`Reading ${key} into ${fileName}`);
        const { Body } = await this.s3Service.getObject({
          Bucket: bucket,
          Key: key,
        });
        if (!Body) {
          throw new Error(`S3 object ${key} has no body`);
        }
        const readStream = Body as Readable;
        openedSources.push(readStream);
        archiveTransformer.append(readStream, { name: fileName });
      }
      // finalize() returns a promise that rejects on archive errors; awaiting it alongside
      // `streamed` keeps that rejection handled and waits until every part has been uploaded.
      await Promise.all([archiveTransformer.finalize(), streamed]);
    } catch (err) {
      // Stop archiving and release any S3 response streams that were fetched but not fully read.
      archiveTransformer.destroy();
      openedSources.forEach((source) => source.destroy());
      throw err;
    }

    // Upload whatever is still buffered as the last part, then stitch all parts together.
    await eventDataHandler(Buffer.alloc(0), true);
    await this.s3Service.completeMultiPartUpload({
      UploadId,
      Bucket: bucket,
      Key: archiveKey,
      MultipartUpload: {
        Parts: completedParts,
      },
    });
  }
}