Add 'archive-bundler.service.ts'
This commit is contained in:
parent
5eea47d63b
commit
7ef25e55e4
|
|
@ -0,0 +1,111 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Readable, Writable } from 'stream';
|
||||
import { S3Service } from '../s3/s3.service';
|
||||
import { BufferManager } from '../utils/buffer.manager';
|
||||
import * as archiver from 'archiver';
|
||||
import { CompletedPart } from '@aws-sdk/client-s3';
|
||||
|
||||
// 5 MiB per buffered chunk — matches S3's minimum multipart part size
// (every part except the last must be at least this large).
const MAX_CHUNK_SIZE = 5 * 1024 * 1024;
|
||||
|
||||
@Injectable()
|
||||
export class ArchiveBundlerService {
|
||||
|
||||
private readonly logger = new Logger(ArchiveBundlerService.name);
|
||||
|
||||
constructor(
|
||||
private readonly s3Service: S3Service,
|
||||
) { }
|
||||
|
||||
async generateZipFromKeys(bucket: string, keys: string[], archiveKey: string): Promise<void> {
|
||||
|
||||
const archiveTransformer = archiver('zip', {
|
||||
zlib: { level: 9 },
|
||||
});
|
||||
|
||||
const { UploadId } = await this.s3Service.createMultiPartUpload({
|
||||
Bucket: bucket,
|
||||
Key: archiveKey,
|
||||
});
|
||||
|
||||
let partNumber = 1;
|
||||
const completedParts: CompletedPart[] = [];
|
||||
const bufferManager = new BufferManager(MAX_CHUNK_SIZE);
|
||||
|
||||
const eventDataHandler = async (data: Buffer, final: boolean = false) => {
|
||||
|
||||
if (bufferManager.hasRoom(data) && !final) {
|
||||
bufferManager.append(data);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`Currently at ${process.memoryUsage().heapUsed / 1024 / 1024} MB RAM`);
|
||||
|
||||
const eventPartNumber = partNumber++;
|
||||
const Body = bufferManager.appendAndGetCompletedBuffer(data);
|
||||
|
||||
const { ETag } = await this.s3Service.uploadPart({
|
||||
UploadId,
|
||||
Body,
|
||||
Bucket: bucket,
|
||||
Key: archiveKey,
|
||||
PartNumber: eventPartNumber,
|
||||
ContentLength: Body.length,
|
||||
});
|
||||
|
||||
completedParts.push({
|
||||
ETag,
|
||||
PartNumber: eventPartNumber,
|
||||
});
|
||||
};
|
||||
|
||||
const writableStream = new Writable({
|
||||
write: async (record, _, callback) => {
|
||||
await eventDataHandler(record);
|
||||
callback();
|
||||
},
|
||||
});
|
||||
|
||||
archiveTransformer.pipe(writableStream);
|
||||
|
||||
archiveTransformer.on('warning', (err) => {
|
||||
this.logger.warn(err);
|
||||
});
|
||||
|
||||
for (const key of keys) {
|
||||
|
||||
const fileNameParts = key.split('/');
|
||||
const fileName = fileNameParts[fileNameParts.length - 1];
|
||||
|
||||
console.log(`Reading ${key} into ${fileName}`);
|
||||
|
||||
const { Body } = await this.s3Service.getObject({
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
});
|
||||
|
||||
const readStream = Body as Readable;
|
||||
|
||||
archiveTransformer.append(readStream, { name: fileName });
|
||||
}
|
||||
|
||||
archiveTransformer.finalize();
|
||||
|
||||
return await new Promise((resolve) => {
|
||||
writableStream.on('finish', async () => {
|
||||
|
||||
await eventDataHandler(Buffer.alloc(0), true);
|
||||
|
||||
await this.s3Service.completeMultiPartUpload({
|
||||
UploadId,
|
||||
Bucket: bucket,
|
||||
Key: archiveKey,
|
||||
MultipartUpload: {
|
||||
Parts: completedParts,
|
||||
},
|
||||
});
|
||||
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue