import passport from "passport" import redis from "redis" import redisReadStream from "redis-rstream" import createError from "http-errors" import path from "path" import util from "util" import config from "config" import autobind from "autobind-decorator" import Buffer from "safe-buffer" import B64 from "b64" import { PassThrough } from "stream" import { catchAll } from "." function pipeToGridFS(readable, gfsWriteable, decoder) { const promise = new Promise((resolve, reject) => { readable.on("error", (error) => { reject(error) }) gfsWriteable.on("error", (error) => { reject(error) }) gfsWriteable.on("close", (file) => { resolve(file) }) }) readable.pipe(decoder).pipe(gfsWriteable) return promise } @autobind export class AssetRoutes { static rangeRegex = /^(byte|base64) (\d+)/ constructor(container) { const app = container.app this.log = container.log this.db = container.db this.rs = container.rs this.uploadTimeout = config.get("api.uploadTimout") app .route("/assets/:_id") .get( passport.authenticate("bearer", { session: false }), catchAll(this.getAsset) ) .delete( passport.authenticate("bearer", { session: false }), this.deleteAsset ) app .route("/assets/upload") .post( passport.authenticate("bearer", { session: false }), catchAll(this.beginAssetUpload) ) app .route("/assets/upload/:_id") .post( passport.authenticate("bearer", { session: false }), catchAll(this.continueAssetUpload) ) } async getAsset(req, res, next) { const assetId = req.params._id const file = await this.db.gridfs.findOneAsync({ _id: assetId }) if (!file) { throw createError.NotFound(`Asset ${assetId} was not found`) } const ifNoneMatch = req.get("If-None-Match") if (ifNoneMatch && ifNoneMatch === file.md5) { res .status(304) .set({ ETag: file.md5, "Cache-Control": "private,max-age=86400", }) .end() return } res.status(200).set({ "Content-Type": file.contentType, "Content-Length": file.length, ETag: file.md5, }) this.db.gridfs.createReadStream({ _id: file._id }).pipe(res) } async deleteAsset(req, res, next) { const assetId = req.params._id await this.db.gridfs.removeAsync({ _id: assetId }) res.json({}) } async beginAssetUpload(req, res, next) { const uploadId = this.db.newObjectId() let { fileName, uploadSize, numberOfChunks, contentType, chunkContentType, } = req.body if (!fileName || !uploadSize || !numberOfChunks || !contentType) { throw createError.BadRequest( "Must specify fileName, uploadSize, numberOfChunks, contentType" ) } fileName = uploadId + "-" + path.basename(fileName) if (chunkContentType) { if ( chunkContentType !== "application/octet-stream" && chunkContentType !== "application/base64" ) { throw createError.BadRequest( "chunkContentType must be application/octet-stream or application/base64" ) } } else { chunkContentType = "application/octet-stream" } await this.rs.setAsync( uploadId, JSON.stringify({ fileName, uploadSize, numberOfChunks, contentType, chunkContentType, }), "EX", this.uploadTimeout ) res.json({ uploadId }) } async continueAssetUpload(req, res, next) { const uploadId = req.params._id const uploadCountId = uploadId + "$#" const uploadDataId = uploadId + "$@" const content = await this.rs.getAsync(uploadId) const uploadData = JSON.parse(content) const contentType = req.get("Content-Type") const contentRange = req.get("Content-Range") const contentLength = req.get("Content-Length") console.log(uploadData) if (contentType !== uploadData.chunkContentType) { throw createError.BadRequest( `Content-Type ${contentType} does not match chunk type ${ uploadData.chunkContentType }` ) } if (parseInt(contentLength, 10) !== req.body.length) { throw createError.BadRequest( "Must supply Content-Length header matching length of request body" ) } let match = contentRange.match(AssetRoutes.rangeRegex) if (!match || match.length !== 3) { throw createError.BadRequest( "Content-Range header must be supplied and of form '[byte|base64] '" ) } const [, contentOffsetUnit, contentOffset] = match if ( (uploadData.chunkContentType === "application/octet-stream" && contentOffsetUnit !== "byte") || (uploadData.chunkContentType === "application/base64" && contentOffsetUnit !== "base64") ) { throw createError.BadRequest( `Content-Range offset unit must be ${ uploadData.chunkContentType === "application/base64" ? "base64" : "byte" }` ) } let offset = Number.parseInt(contentOffset) if (offset < 0 || offset + req.body.length > uploadData.uploadSize) { throw createError.BadRequest( `Illegal Content-Range ${contentOffsetType} ${contentOffset} and Content-Length ${contentLength} for upload size ${ uploadData.uploadSize }` ) } try { const [uploadedChunks] = await Promise.all([ this.rs.setrangeAsync(uploadDataId, offset, req.body), this.rs.incrAsync(uploadCountId), ]) const chunkInfo = { numberOfChunks: uploadData.numberOfChunks, uploadedChunks, } if (uploadedChunks >= uploadData.numberOfChunks) { let readable = redisReadStream(this.rs.client, uploadDataId) let writeable = this.db.gridfs.createWriteStream({ _id: uploadId, filename: uploadData.fileName, content_type: uploadData.contentType, }) const decoder = uploadData.chunkContentType === "application/base64" ? new B64.Decoder() : new PassThrough() const file = await pipeToGridFS(readable, writeable, decoder) await Promise.all([ this.rs.del(uploadId), this.rs.del(uploadCountId), this.rs.del(uploadDataId), ]) res.json({ assetId: file._id, fileName: file.filename, contentType: file.contentType, uploadDate: file.uploadDate, md5: file.md5, ...chunkInfo, }) } else { await Promise.all([ this.rs.expireAsync(uploadId, this.uploadTimeout), this.rs.expireAsync(uploadCountId, this.uploadTimeout), this.rs.expireAsync(uploadDataId, this.uploadTimeout), ]) res.json(chunkInfo) } } catch (error) { this.rs.del(uploadId) this.rs.del(uploadCountId) this.rs.del(uploadDataId) this.log.error(error.message) throw error } } }