From 7ef25e55e4bf2bca44f48c8fdd07e71a414045f5 Mon Sep 17 00:00:00 2001 From: Matthew Bessette Date: Mon, 14 Feb 2022 06:23:39 +0000 Subject: [PATCH] Add 'archive-bundler.service.ts' --- archive-bundler.service.ts | 111 +++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 archive-bundler.service.ts diff --git a/archive-bundler.service.ts b/archive-bundler.service.ts new file mode 100644 index 0000000..5925905 --- /dev/null +++ b/archive-bundler.service.ts @@ -0,0 +1,111 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Readable, Writable } from 'stream'; +import { S3Service } from '../s3/s3.service'; +import { BufferManager } from '../utils/buffer.manager'; +import * as archiver from 'archiver'; +import { CompletedPart } from '@aws-sdk/client-s3'; + +const MAX_CHUNK_SIZE = 1024 * 1024 * 5; + +@Injectable() +export class ArchiveBundlerService { + + private readonly logger = new Logger(ArchiveBundlerService.name); + + constructor( + private readonly s3Service: S3Service, + ) { } + + async generateZipFromKeys(bucket: string, keys: string[], archiveKey: string): Promise { + + const archiveTransformer = archiver('zip', { + zlib: { level: 9 }, + }); + + const { UploadId } = await this.s3Service.createMultiPartUpload({ + Bucket: bucket, + Key: archiveKey, + }); + + let partNumber = 1; + const completedParts: CompletedPart[] = []; + const bufferManager = new BufferManager(MAX_CHUNK_SIZE); + + const eventDataHandler = async (data: Buffer, final: boolean = false) => { + + if (bufferManager.hasRoom(data) && !final) { + bufferManager.append(data); + return; + } + + console.log(`Currently at ${process.memoryUsage().heapUsed / 1024 / 1024} MB RAM`); + + const eventPartNumber = partNumber++; + const Body = bufferManager.appendAndGetCompletedBuffer(data); + + const { ETag } = await this.s3Service.uploadPart({ + UploadId, + Body, + Bucket: bucket, + Key: archiveKey, + PartNumber: eventPartNumber, + ContentLength: Body.length, + }); + + completedParts.push({ + ETag, + PartNumber: eventPartNumber, + }); + }; + + const writableStream = new Writable({ + write: async (record, _, callback) => { + await eventDataHandler(record); + callback(); + }, + }); + + archiveTransformer.pipe(writableStream); + + archiveTransformer.on('warning', (err) => { + this.logger.warn(err); + }); + + for (const key of keys) { + + const fileNameParts = key.split('/'); + const fileName = fileNameParts[fileNameParts.length - 1]; + + console.log(`Reading ${key} into ${fileName}`); + + const { Body } = await this.s3Service.getObject({ + Bucket: bucket, + Key: key, + }); + + const readStream = Body as Readable; + + archiveTransformer.append(readStream, { name: fileName }); + } + + archiveTransformer.finalize(); + + return await new Promise((resolve) => { + writableStream.on('finish', async () => { + + await eventDataHandler(Buffer.alloc(0), true); + + await this.s3Service.completeMultiPartUpload({ + UploadId, + Bucket: bucket, + Key: archiveKey, + MultipartUpload: { + Parts: completedParts, + }, + }); + + resolve(); + }); + }); + } +}