Commit ed26503f authored by libnewton

add changes tracking back

parent da3e833e
1 merge request: !1 chore: sync upstream
Pipeline #210542 passed
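Taken together, the hunks below disable the S3 archiving path in the track-changes service: the aws-sdk and s3-streams requires are commented out, createStream no longer builds an S3 stream, and the MongoAWS archive entry points return early instead of streaming packs to or from S3. A condensed, illustrative sketch of the resulting behaviour (not part of the diff itself; all names are taken from the code below, and the commented-out bodies are omitted):

// Condensed sketch of MongoAWS after this commit; the real module keeps the
// original bodies as comments so they can be restored later.
let MongoAWS

// No S3 client is constructed any more; the function simply returns undefined.
const createStream = function (streamConstructor, projectId, docId, packId) {
  return
}

module.exports = MongoAWS = {
  // Only the packId guard remains; the upload body is commented out, so for a
  // valid packId the callback is never invoked.
  archivePack(projectId, docId, packId, callback) {
    if (packId == null) {
      return callback(new Error('invalid pack id'))
    }
  },

  // Downloads are disabled as well; callers always receive an error.
  readArchivedPack(projectId, docId, packId, callback) {
    return callback(new Error('invalid pack id'))
  },

  // Un-archiving is short-circuited the same way.
  unArchivePack(projectId, docId, packId, callback) {
    return callback(new Error('invalid pack id'))
  },
}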
@@ -11,11 +11,10 @@
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let MongoAWS
const settings = require('@overleaf/settings')
const logger = require('@overleaf/logger')
const AWS = require('aws-sdk')
const S3S = require('s3-streams')
// const AWS = require('aws-sdk')
// const S3S = require('s3-streams')
const { db, ObjectId } = require('./mongodb')
const JSONStream = require('JSONStream')
const ReadlineStream = require('byline')
@@ -31,11 +30,11 @@ const createStream = function (streamConstructor, projectId, docId, packId) {
endpoint: settings.trackchanges.s3.endpoint,
s3ForcePathStyle: settings.trackchanges.s3.pathStyle,
}
return streamConstructor(new AWS.S3(AWS_CONFIG), {
Bucket: settings.trackchanges.stores.doc_history,
Key: projectId + '/changes-' + docId + '/pack-' + packId,
})
return
// return streamConstructor(new AWS.S3(AWS_CONFIG), {
// Bucket: settings.trackchanges.stores.doc_history,
// Key: projectId + '/changes-' + docId + '/pack-' + packId,
// })
}
module.exports = MongoAWS = {
@@ -62,51 +61,52 @@ module.exports = MongoAWS = {
if (packId == null) {
return callback(new Error('invalid pack id'))
}
logger.debug({ projectId, docId, packId }, 'uploading data to s3')
const upload = createStream(S3S.WriteStream, projectId, docId, packId)
return db.docHistory.findOne(query, function (err, result) {
if (err != null) {
return callback(err)
}
if (result == null) {
return callback(new Error('cannot find pack to send to s3'))
}
if (result.expiresAt != null) {
return callback(new Error('refusing to send pack with TTL to s3'))
}
const uncompressedData = JSON.stringify(result)
if (uncompressedData.indexOf('\u0000') !== -1) {
const error = new Error('null bytes found in upload')
logger.error({ err: error, projectId, docId, packId }, error.message)
return callback(error)
}
return zlib.gzip(uncompressedData, function (err, buf) {
logger.debug(
{
projectId,
docId,
packId,
origSize: uncompressedData.length,
newSize: buf.length,
},
'compressed pack'
)
if (err != null) {
return callback(err)
}
upload.on('error', err => callback(err))
upload.on('finish', function () {
Metrics.inc('archive-pack')
logger.debug({ projectId, docId, packId }, 'upload to s3 completed')
return callback(null)
})
upload.write(buf)
return upload.end()
})
})
// logger.debug({ projectId, docId, packId }, 'uploading data to s3')
// const upload = createStream(S3S.WriteStream, projectId, docId, packId)
// return db.docHistory.findOne(query, function (err, result) {
// if (err != null) {
// return callback(err)
// }
// if (result == null) {
// return callback(new Error('cannot find pack to send to s3'))
// }
// if (result.expiresAt != null) {
// return callback(new Error('refusing to send pack with TTL to s3'))
// }
// const uncompressedData = JSON.stringify(result)
// if (uncompressedData.indexOf('\u0000') !== -1) {
// const error = new Error('null bytes found in upload')
// logger.error({ err: error, projectId, docId, packId }, error.message)
// return callback(error)
// }
// return zlib.gzip(uncompressedData, function (err, buf) {
// logger.debug(
// {
// projectId,
// docId,
// packId,
// origSize: uncompressedData.length,
// newSize: buf.length,
// },
// 'compressed pack'
// )
// if (err != null) {
// return callback(err)
// }
// upload.on('error', err => callback(err))
// upload.on('finish', function () {
// Metrics.inc('archive-pack')
// logger.debug({ projectId, docId, packId }, 'upload to s3 completed')
// return callback(null)
// })
// upload.write(buf)
// return upload.end()
// })
// })
},
readArchivedPack(projectId, docId, packId, _callback) {
@@ -129,69 +129,71 @@ module.exports = MongoAWS = {
}
logger.debug({ projectId, docId, packId }, 'downloading data from s3')
return callback(new Error('invalid pack id'))
const download = createStream(S3S.ReadStream, projectId, docId, packId)
const inputStream = download
.on('open', obj => 1)
.on('error', err => callback(err))
const gunzip = zlib.createGunzip()
gunzip.setEncoding('utf8')
gunzip.on('error', function (err) {
logger.debug(
{ projectId, docId, packId, err },
'error uncompressing gzip stream'
)
return callback(err)
})
const outputStream = inputStream.pipe(gunzip)
const parts = []
outputStream.on('error', err => callback(err))
outputStream.on('end', function () {
let object
logger.debug({ projectId, docId, packId }, 'download from s3 completed')
try {
object = JSON.parse(parts.join(''))
} catch (e) {
return callback(e)
}
object._id = ObjectId(object._id)
object.doc_id = ObjectId(object.doc_id)
object.project_id = ObjectId(object.project_id)
for (const op of Array.from(object.pack)) {
if (op._id != null) {
op._id = ObjectId(op._id)
}
}
return callback(null, object)
})
return outputStream.on('data', data => parts.push(data))
// const download = createStream(S3S.ReadStream, projectId, docId, packId)
// const inputStream = download
// .on('open', obj => 1)
// .on('error', err => callback(err))
// const gunzip = zlib.createGunzip()
// gunzip.setEncoding('utf8')
// gunzip.on('error', function (err) {
// logger.debug(
// { projectId, docId, packId, err },
// 'error uncompressing gzip stream'
// )
// return callback(err)
// })
// const outputStream = inputStream.pipe(gunzip)
// const parts = []
// outputStream.on('error', err => callback(err))
// outputStream.on('end', function () {
// let object
// logger.debug({ projectId, docId, packId }, 'download from s3 completed')
// try {
// object = JSON.parse(parts.join(''))
// } catch (e) {
// return callback(e)
// }
// object._id = ObjectId(object._id)
// object.doc_id = ObjectId(object.doc_id)
// object.project_id = ObjectId(object.project_id)
// for (const op of Array.from(object.pack)) {
// if (op._id != null) {
// op._id = ObjectId(op._id)
// }
// }
// return callback(null, object)
// })
// return outputStream.on('data', data => parts.push(data))
},
unArchivePack(projectId, docId, packId, callback) {
if (callback == null) {
callback = function () {}
}
return MongoAWS.readArchivedPack(
projectId,
docId,
packId,
function (err, object) {
if (err != null) {
return callback(err)
}
Metrics.inc('unarchive-pack')
// allow the object to expire, we can always retrieve it again
object.expiresAt = new Date(Date.now() + 7 * DAYS)
logger.debug({ projectId, docId, packId }, 'inserting object from s3')
return db.docHistory.insertOne(object, (err, confirmation) => {
if (err) return callback(err)
object._id = confirmation.insertedId
callback(null, object)
})
}
)
return callback(new Error('invalid pack id'))
// return MongoAWS.readArchivedPack(
// projectId,
// docId,
// packId,
// function (err, object) {
// if (err != null) {
// return callback(err)
// }
// Metrics.inc('unarchive-pack')
// // allow the object to expire, we can always retrieve it again
// object.expiresAt = new Date(Date.now() + 7 * DAYS)
// logger.debug({ projectId, docId, packId }, 'inserting object from s3')
// return db.docHistory.insertOne(object, (err, confirmation) => {
// if (err) return callback(err)
// object._id = confirmation.insertedId
// callback(null, object)
// })
// }
// )
},
}
@@ -528,7 +528,7 @@ module.exports = PackManager = {
return callback(err)
}
if (pack == null) {
return MongoAWS.unArchivePack(projectId, docId, packId, callback)
// return MongoAWS.unArchivePack(projectId, docId, packId, callback)
} else if (pack.expiresAt != null && pack.temporary === false) {
// we only need to touch the TTL when listing the changes in the project
// because diffs on individual documents are always done after that
@@ -814,26 +814,27 @@ module.exports = PackManager = {
return cb()
}
}
return async.series(
[
cb =>
PackManager.checkArchiveNotInProgress(projectId, docId, packId, cb),
cb =>
PackManager.markPackAsArchiveInProgress(projectId, docId, packId, cb),
cb =>
MongoAWS.archivePack(projectId, docId, packId, err =>
clearFlagOnError(err, cb)
),
cb =>
PackManager.checkArchivedPack(projectId, docId, packId, err =>
clearFlagOnError(err, cb)
),
cb => PackManager.markPackAsArchived(projectId, docId, packId, cb),
cb =>
PackManager.setTTLOnArchivedPack(projectId, docId, packId, callback),
],
callback
)
return []
// return async.series(
// [
// cb =>
// PackManager.checkArchiveNotInProgress(projectId, docId, packId, cb),
// cb =>
// PackManager.markPackAsArchiveInProgress(projectId, docId, packId, cb),
// cb =>
// MongoAWS.archivePack(projectId, docId, packId, err =>
// clearFlagOnError(err, cb)
// ),
// cb =>
// PackManager.checkArchivedPack(projectId, docId, packId, err =>
// clearFlagOnError(err, cb)
// ),
// cb => PackManager.markPackAsArchived(projectId, docId, packId, cb),
// cb =>
// PackManager.setTTLOnArchivedPack(projectId, docId, packId, callback),
// ],
// callback
// )
},
checkArchivedPack(projectId, docId, packId, callback) {
......