Son CV dans un terminal web en Javascript!
https://terminal-cv.gregandev.fr
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1357 lines
44 KiB
1357 lines
44 KiB
1 year ago
|
const fs = require('fs')
|
||
|
const { extname, basename, dirname} = require('path')
|
||
|
const { ArrayLikeIterable } = require('./util/ArrayLikeIterable')
|
||
|
const when = require('./util/when')
|
||
|
const EventEmitter = require('events')
|
||
|
Object.assign(exports, require('node-gyp-build')(__dirname))
|
||
|
const { Env, Cursor, Compression, getBufferForAddress, getAddress, keyValueToBuffer, bufferToKeyValue } = exports
|
||
|
const { CachingStore, setGetLastVersion } = require('./caching')
|
||
|
const { writeKey, readKey } = require('ordered-binary')
|
||
|
const os = require('os')
|
||
|
setGetLastVersion(getLastVersion)
|
||
|
// Cached reference to the copying TypedArray slice. Buffer overrides slice() to return a view
// over the same memory, so copies are made with Uint8ArraySlice.call(buffer, start, end).
// Declared with const: the original assignment created an implicit global (and throws in strict mode).
const Uint8ArraySlice = Uint8Array.prototype.slice
// Scratch buffer that keys are serialized into; it is also handed to the environment as
// options.keyBuffer in open().
const keyBuffer = Buffer.allocUnsafeSlow(2048)
const keyBufferView = new DataView(keyBuffer.buffer, 0, 2048) // max key size is actually 1978
// Buffers indexed by the buffer index read from keyBufferView.
const buffers = []
|
||
|
|
||
|
const DEFAULT_SYNC_BATCH_THRESHOLD = 200000000 // 200MB
const DEFAULT_IMMEDIATE_BATCH_THRESHOLD = 10000000 // 10MB
// Initial value of store.commitDelay
const DEFAULT_COMMIT_DELAY = 0
// Options passed to env.beginTxn() when starting the shared read transaction
const READING_TNX = {
	readOnly: true
}
// Sentinel value: a transaction callback that returns ABORT causes the transaction to be aborted
const ABORT = {}
|
||
|
|
||
|
// Registry of every opened store, keyed by env name (plus '-' + db name for named databases)
const allDbs = exports.allDbs = new Map()
// Pre-resolved promises returned by put()/remove() when they ran synchronously inside a write
// transaction; isSync lets callers distinguish them from real pending commits.
const SYNC_PROMISE_RESULT = Promise.resolve(true)
const SYNC_PROMISE_FAIL = Promise.resolve(false)
SYNC_PROMISE_RESULT.isSync = true
SYNC_PROMISE_FAIL.isSync = true
const LAST_KEY = String.fromCharCode(0xffff)
const LAST_BUFFER_KEY = Buffer.from([255, 255, 255, 255])
const FIRST_BUFFER_KEY = Buffer.from([0])
// Shared "finished" result returned by range iterators
const ITERATOR_DONE = { done: true, value: undefined }
|
||
|
/**
 * Writes `key` as a little-endian unsigned 32-bit integer into `target` at offset `start`.
 * A DataView over the target is created lazily and cached on `target.dataView`.
 * The view honours `target.byteOffset`, so targets that are sub-views of a larger
 * ArrayBuffer are written at the right position (the original always started at byte 0
 * of the backing ArrayBuffer).
 * @param {number} key - value to store
 * @param {Uint8Array} target - destination buffer
 * @param {number} start - byte offset within `target`
 * @returns {number} the offset just past the written key (start + 4)
 */
const writeUint32Key = (key, target, start) => {
	const view = target.dataView ||
		(target.dataView = new DataView(target.buffer, target.byteOffset, target.byteLength))
	view.setUint32(start, key, true)
	return start + 4
}
|
||
|
/**
 * Reads a little-endian unsigned 32-bit integer key from `target` at offset `start`.
 * Uses (and lazily creates) the DataView cached on `target.dataView`; the view honours
 * `target.byteOffset` so sub-views of a larger ArrayBuffer are read at the right position.
 * @param {Uint8Array} target - buffer holding the key
 * @param {number} start - byte offset within `target`
 * @returns {number} the decoded key
 */
const readUint32Key = (target, start) => {
	const view = target.dataView ||
		(target.dataView = new DataView(target.buffer, target.byteOffset, target.byteLength))
	return view.getUint32(start, true)
}
|
||
|
/**
 * Copies a binary key into `target` at offset `start`.
 * @param {Uint8Array} key - key bytes
 * @param {Uint8Array} target - destination buffer
 * @param {number} start - byte offset within `target`
 * @returns {number} the offset just past the written key
 * @throws {Error} if the key is longer than 1978 bytes
 */
const writeBufferKey = (key, target, start) => {
	if (key.length > 1978)
		throw new Error('Key buffer is too long')
	target.set(key, start)
	return key.length + start
}
|
||
|
/**
 * Reads a binary key from `target` as an independent copy of bytes [start, end).
 * Uint8Array's slice is used (rather than the target's own slice method) so that the
 * result is a copy even when `target` is a Buffer.
 * @param {Uint8Array} target - buffer holding the key
 * @param {number} start - first byte of the key
 * @param {number} end - offset just past the key
 * @returns {Uint8Array} copy of the key bytes
 */
const readBufferKey = (target, start, end) => {
	return Uint8ArraySlice.call(target, start, end)
}
|
||
|
let env // NOTE(review): open() declares its own local `env`; this module-level one is not assigned in the visible code
// Compression instance shared by every environment opened with `compression: true`
let defaultCompression
// Size of the most recently read value (0xffffffff is used as the "not found" code), plus related read state
let lastSize, lastOffset, lastVersion
const MDB_SET_KEY = 0, MDB_SET_RANGE = 1, MDB_GET_BOTH_RANGE = 2, MDB_GET_CURRENT = 3, MDB_FIRST = 4, MDB_LAST = 5, MDB_NEXT = 6, MDB_NEXT_NODUP = 7, MDB_NEXT_DUP = 8, MDB_PREV = 9, MDB_PREV_NODUP = 10, MDB_PREV_DUP = 11
exports.open = open
exports.ABORT = ABORT
// Ensures the "can not abort inside another transaction" warning is only printed once
let abortedNonChildTransactionWarn
|
||
|
function open(path, options) {
|
||
|
// ---- Per-environment state, shared through this closure by every store opened from it ----
let env = new Env()
let committingWrites
let scheduledTransactions
let scheduledOperations
let asyncTransactionAfter = true, asyncTransactionStrictOrder
let transactionWarned
let readTxn, writeTxn, pendingBatch, currentCommit, runNextBatch, readTxnRenewed, cursorTxns = []
let renewId = 1
// Support the open(options) call form, where the path is given as options.path
if (typeof path == 'object' && !options) {
	options = path
	path = options.path
}
let extension = extname(path)
let name = basename(path, extension)
let is32Bit = os.arch().endsWith('32')
let remapChunks = (options && options.remapChunks) || ((options && options.mapSize) ?
	(is32Bit && options.mapSize > 0x100000000) : // larger than fits in address space, must use dynamic maps
	is32Bit) // without a known map size, we default to being able to handle large data correctly/well*/
options = Object.assign({
	path,
	noSubdir: Boolean(extension), // a path with a file extension is treated as the database file itself
	isRoot: true,
	maxDbs: 12,
	remapChunks,
	keyBuffer,
	// default map size limit of 4 exabytes when using remapChunks, since it is not preallocated and we can
	// make it super huge.
	mapSize: remapChunks ? 0x10000000000000 :
		0x20000, // Otherwise we start small with 128KB
}, options)
if (options.asyncTransactionOrder == 'before') {
	console.warn('asyncTransactionOrder: "before" is deprecated')
	asyncTransactionAfter = false
} else if (options.asyncTransactionOrder == 'strict') {
	asyncTransactionStrictOrder = true
	asyncTransactionAfter = false
}
// Clear any existing database BEFORE the directory is (re)created below.
// The original called fs.removeSync, which does not exist on Node's built-in fs module
// (it is an fs-extra API), so clearOnStart always threw a TypeError; it also ran after
// the directory had been created, which would have removed it again.
if (options.clearOnStart) {
	console.info('Removing', path)
	if (fs.rmSync)
		fs.rmSync(path, { recursive: true, force: true })
	else if (fs.existsSync(path)) {
		// fallback for Node versions that predate fs.rmSync
		if (fs.statSync(path).isDirectory())
			fs.rmdirSync(path, { recursive: true })
		else
			fs.unlinkSync(path)
	}
	console.info('Removed', path)
}
let directory = options.noSubdir ? dirname(path) : path
if (!fs.existsSync(directory))
	fs.mkdirSync(directory, { recursive: true })
if (options.compression) {
	if (options.compression == true) {
		// `compression: true` shares one default Compression instance across environments
		if (defaultCompression)
			options.compression = defaultCompression
		else
			defaultCompression = options.compression = new Compression({
				threshold: 1000,
				dictionary: fs.readFileSync(require.resolve('./dict/dict.txt')),
			})
	} else if (!(options.compression instanceof Compression))
		// User-supplied settings are merged over the defaults. (The original closed the
		// Object.assign call too early, so the user settings never reached the merged object.)
		options.compression = new Compression(Object.assign({
			threshold: 1000,
			dictionary: fs.readFileSync(require.resolve('./dict/dict.txt')),
		}, options.compression))
}

let useWritemap = options.useWritemap
try {
	env.open(options)
} catch(error) {
	if (error.message.startsWith('MDB_INVALID')) {
		// run the upgrade utility on the existing database, then retry with a fresh Env
		require('./util/upgrade-lmdb').upgrade(path, options, open)
		env = new Env()
		env.open(options)
	} else
		throw error
}
/*	let filePath = noSubdir ? path : (path + '/data.mdb')
	if (fs.statSync(filePath).size == env.info().mapSize && !options.remapChunks) {
		// if the file size is identical to the map size, that means the OS is taking full disk space for
		// mapping and we need to revert back to remapChunks
		env.close()
		options.remapChunks = true
		env.open(options)
	}*/
env.readerCheck() // clear out any stale entries
|
||
|
/**
 * Makes the shared read transaction usable for the current event turn: renews the existing
 * one, or begins a new read-only transaction if none exists. Also schedules resetReadTxn()
 * via setImmediate; the returned handle doubles as the "currently renewed" flag.
 * @returns the active read transaction
 */
function renewReadTxn() {
	if (readTxn)
		readTxn.renew()
	else
		readTxn = env.beginTxn(READING_TNX)
	readTxnRenewed = setImmediate(resetReadTxn)
	return readTxn
}
|
||
|
/**
 * Releases the shared read transaction if it is currently renewed (no-op otherwise).
 * If cursors other than "renewing" ones are still open on it, the transaction is handed
 * over to those cursors (flagged onlyCursor, tracked in cursorTxns) and a new read
 * transaction will be created on the next read; otherwise it is simply reset.
 */
function resetReadTxn() {
	if (readTxnRenewed) {
		renewId++ // lets renewing cursors detect that their transaction has changed
		readTxnRenewed = null
		if (readTxn.cursorCount - (readTxn.renewingCursorCount || 0) > 0) {
			readTxn.onlyCursor = true
			cursorTxns.push(readTxn)
			readTxn = null
		}
		else
			readTxn.reset()
	}
}
|
||
|
let stores = []
|
||
|
class LMDBStore extends EventEmitter {
|
||
|
constructor(dbName, dbOptions) {
|
||
|
super()
|
||
|
if (dbName === undefined)
|
||
|
throw new Error('Database name must be supplied in name property (may be null for root database)')
|
||
|
|
||
|
const openDB = () => {
|
||
|
try {
|
||
|
this.db = env.openDbi(Object.assign({
|
||
|
name: dbName,
|
||
|
create: true,
|
||
|
txn: writeTxn,
|
||
|
}, dbOptions))
|
||
|
this.db.name = dbName || null
|
||
|
} catch(error) {
|
||
|
handleError(error, null, null, openDB)
|
||
|
}
|
||
|
}
|
||
|
if (dbOptions.compression && !(dbOptions.compression instanceof Compression)) {
|
||
|
if (dbOptions.compression == true && options.compression)
|
||
|
dbOptions.compression = options.compression // use the parent compression if available
|
||
|
else
|
||
|
dbOptions.compression = new Compression(Object.assign({
|
||
|
threshold: 1000,
|
||
|
dictionary: fs.readFileSync(require.resolve('./dict/dict.txt')),
|
||
|
}), dbOptions.compression)
|
||
|
}
|
||
|
|
||
|
if (dbOptions.dupSort && (dbOptions.useVersions || dbOptions.cache)) {
|
||
|
throw new Error('The dupSort flag can not be combined with versions or caching')
|
||
|
}
|
||
|
openDB()
|
||
|
resetReadTxn() // a read transaction becomes invalid after opening another db
|
||
|
this.name = dbName
|
||
|
this.env = env
|
||
|
this.reads = 0
|
||
|
this.writes = 0
|
||
|
this.transactions = 0
|
||
|
this.averageTransactionTime = 5
|
||
|
if (dbOptions.syncBatchThreshold)
|
||
|
console.warn('syncBatchThreshold is no longer supported')
|
||
|
if (dbOptions.immediateBatchThreshold)
|
||
|
console.warn('immediateBatchThreshold is no longer supported')
|
||
|
this.commitDelay = DEFAULT_COMMIT_DELAY
|
||
|
Object.assign(this, { // these are the options that are inherited
|
||
|
path: options.path,
|
||
|
encoding: options.encoding,
|
||
|
strictAsyncOrder: options.strictAsyncOrder,
|
||
|
}, dbOptions)
|
||
|
if (!this.encoding || this.encoding == 'msgpack' || this.encoding == 'cbor') {
|
||
|
this.encoder = this.decoder = new (this.encoding == 'cbor' ? require('cbor-x').Encoder : require('msgpackr').Encoder)
|
||
|
(Object.assign(this.sharedStructuresKey ?
|
||
|
this.setupSharedStructures() : {
|
||
|
copyBuffers: true // need to copy any embedded buffers that are found since we use unsafe buffers
|
||
|
}, options, dbOptions))
|
||
|
} else if (this.encoding == 'json') {
|
||
|
this.encoder = {
|
||
|
encode: JSON.stringify,
|
||
|
}
|
||
|
} else if (this.encoding == 'ordered-binary') {
|
||
|
this.encoder = this.decoder = {
|
||
|
encode(value) {
|
||
|
if (savePosition > 6200)
|
||
|
allocateSaveBuffer()
|
||
|
let start = savePosition
|
||
|
savePosition = writeKey(value, saveBuffer, start)
|
||
|
let buffer = saveBuffer.subarray(start, savePosition)
|
||
|
savePosition = (savePosition + 7) & 0xfffff8
|
||
|
return buffer
|
||
|
},
|
||
|
decode(buffer, end) { return readKey(buffer, 0, end) },
|
||
|
writeKey,
|
||
|
readKey,
|
||
|
}
|
||
|
}
|
||
|
if (this.keyIsUint32) {
|
||
|
this.writeKey = writeUint32Key
|
||
|
this.readKey = readUint32Key
|
||
|
} else if (this.keyIsBuffer) {
|
||
|
this.writeKey = writeBufferKey
|
||
|
this.readKey = readBufferKey
|
||
|
} else {
|
||
|
this.writeKey = writeKey
|
||
|
this.readKey = readKey
|
||
|
}
|
||
|
allDbs.set(dbName ? name + '-' + dbName : name, this)
|
||
|
stores.push(this)
|
||
|
}
|
||
|
openDB(dbName, dbOptions) {
|
||
|
if (typeof dbName == 'object' && !dbOptions) {
|
||
|
dbOptions = dbName
|
||
|
dbName = options.name
|
||
|
} else
|
||
|
dbOptions = dbOptions || {}
|
||
|
try {
|
||
|
return dbOptions.cache ?
|
||
|
new (CachingStore(LMDBStore))(dbName, dbOptions) :
|
||
|
new LMDBStore(dbName, dbOptions)
|
||
|
} catch(error) {
|
||
|
if (error.message.indexOf('MDB_DBS_FULL') > -1) {
|
||
|
error.message += ' (increase your maxDbs option)'
|
||
|
}
|
||
|
throw error
|
||
|
}
|
||
|
}
|
||
|
transactionAsync(callback, asChild) {
|
||
|
if (writeTxn) {
|
||
|
// already nested in a transaction, just execute and return
|
||
|
return callback()
|
||
|
}
|
||
|
let lastOperation
|
||
|
let after, strictOrder
|
||
|
if (scheduledOperations) {
|
||
|
lastOperation = asyncTransactionAfter ? scheduledOperations.appendAsyncTxn :
|
||
|
scheduledOperations[asyncTransactionStrictOrder ? scheduledOperations.length - 1 : 0]
|
||
|
} else {
|
||
|
scheduledOperations = []
|
||
|
scheduledOperations.bytes = 0
|
||
|
}
|
||
|
let transactionSet
|
||
|
let transactionSetIndex
|
||
|
if (lastOperation === true) { // continue last set of transactions
|
||
|
transactionSetIndex = scheduledTransactions.length - 1
|
||
|
transactionSet = scheduledTransactions[transactionSetIndex]
|
||
|
} else {
|
||
|
// for now we signify transactions as a true
|
||
|
if (asyncTransactionAfter) // by default we add a flag to put transactions after other operations
|
||
|
scheduledOperations.appendAsyncTxn = true
|
||
|
else if (asyncTransactionStrictOrder)
|
||
|
scheduledOperations.push(true)
|
||
|
else // in before mode, we put all the async transaction at the beginning
|
||
|
scheduledOperations.unshift(true)
|
||
|
if (!scheduledTransactions) {
|
||
|
scheduledTransactions = []
|
||
|
}
|
||
|
transactionSetIndex = scheduledTransactions.push(transactionSet = []) - 1
|
||
|
}
|
||
|
let index = (transactionSet.push(asChild ?
|
||
|
{asChild, callback } : callback) - 1) << 1
|
||
|
return this.scheduleCommit().results.then((results) => {
|
||
|
let transactionResults = results.transactionResults[transactionSetIndex]
|
||
|
let error = transactionResults[index]
|
||
|
if (error)
|
||
|
throw error
|
||
|
return transactionResults[index + 1]
|
||
|
})
|
||
|
}
|
||
|
childTransaction(callback) {
|
||
|
if (useWritemap)
|
||
|
throw new Error('Child transactions are not supported in writemap mode')
|
||
|
if (writeTxn) {
|
||
|
let parentTxn = writeTxn
|
||
|
let childTxn = writeTxn = env.beginTxn(null, parentTxn)
|
||
|
try {
|
||
|
return when(callback(), (result) => {
|
||
|
writeTxn = parentTxn
|
||
|
if (result === ABORT)
|
||
|
childTxn.abort()
|
||
|
else
|
||
|
childTxn.commit()
|
||
|
return result
|
||
|
}, (error) => {
|
||
|
writeTxn = parentTxn
|
||
|
childTxn.abort()
|
||
|
throw error
|
||
|
})
|
||
|
} catch(error) {
|
||
|
writeTxn = parentTxn
|
||
|
childTxn.abort()
|
||
|
throw error
|
||
|
}
|
||
|
}
|
||
|
return this.transactionAsync(callback, true)
|
||
|
}
|
||
|
transaction(callback, abort) {
|
||
|
if (!transactionWarned) {
|
||
|
console.warn('transaction is deprecated, use transactionSync if you want a synchronous transaction or transactionAsync for asynchronous transaction. In this future this will always call transactionAsync.')
|
||
|
transactionWarned = true
|
||
|
}
|
||
|
let result = this.transactionSync(callback, abort)
|
||
|
return abort ? ABORT : result
|
||
|
}
|
||
|
transactionSync(callback, abort) {
|
||
|
if (writeTxn) {
|
||
|
if (!useWritemap && !this.cache)
|
||
|
// already nested in a transaction, execute as child transaction (if possible) and return
|
||
|
return this.childTransaction(callback)
|
||
|
let result = callback() // else just run in current transaction
|
||
|
if (result == ABORT && !abortedNonChildTransactionWarn) {
|
||
|
console.warn('Can not abort a transaction inside another transaction with ' + (this.cache ? 'caching enabled' : 'useWritemap enabled'))
|
||
|
abortedNonChildTransactionWarn = true
|
||
|
}
|
||
|
return result
|
||
|
}
|
||
|
let txn
|
||
|
try {
|
||
|
this.transactions++
|
||
|
txn = writeTxn = env.beginTxn()
|
||
|
/*if (scheduledOperations && runNextBatch) {
|
||
|
runNextBatch((operations, callback) => {
|
||
|
try {
|
||
|
callback(null, this.commitBatchNow(operations))
|
||
|
} catch (error) {
|
||
|
callback(error)
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
TODO: To reenable forced sequential writes, we need to re-execute the operations if we get an env resize
|
||
|
*/
|
||
|
return when(callback(), (result) => {
|
||
|
try {
|
||
|
if (result === ABORT)
|
||
|
txn.abort()
|
||
|
else {
|
||
|
txn.commit()
|
||
|
resetReadTxn()
|
||
|
}
|
||
|
writeTxn = null
|
||
|
return result
|
||
|
} catch(error) {
|
||
|
if (error.message == 'The transaction is already closed.') {
|
||
|
return result
|
||
|
}
|
||
|
return handleError(error, this, txn, () => this.transaction(callback))
|
||
|
}
|
||
|
}, (error) => {
|
||
|
return handleError(error, this, txn, () => this.transaction(callback))
|
||
|
})
|
||
|
} catch(error) {
|
||
|
return handleError(error, this, txn, () => this.transaction(callback))
|
||
|
}
|
||
|
}
|
||
|
getSharedBufferForGet(id) {
|
||
|
let txn = (writeTxn || (readTxnRenewed ? readTxn : renewReadTxn()))
|
||
|
lastSize = this.keyIsCompatibility ? txn.getBinaryShared(id) : this.db.get(this.writeKey(id, keyBuffer, 0))
|
||
|
if (lastSize === 0xffffffff) { // not found code
|
||
|
return //undefined
|
||
|
}
|
||
|
return lastSize
|
||
|
lastSize = keyBufferView.getUint32(0, true)
|
||
|
let bufferIndex = keyBufferView.getUint32(12, true)
|
||
|
lastOffset = keyBufferView.getUint32(8, true)
|
||
|
let buffer = buffers[bufferIndex]
|
||
|
let startOffset
|
||
|
if (!buffer || lastOffset < (startOffset = buffer.startOffset) || (lastOffset + lastSize > startOffset + 0x100000000)) {
|
||
|
if (buffer)
|
||
|
env.detachBuffer(buffer.buffer)
|
||
|
startOffset = (lastOffset >>> 16) * 0x10000
|
||
|
console.log('make buffer for address', bufferIndex * 0x100000000 + startOffset)
|
||
|
buffer = buffers[bufferIndex] = Buffer.from(getBufferForAddress(bufferIndex * 0x100000000 + startOffset))
|
||
|
buffer.startOffset = startOffset
|
||
|
}
|
||
|
lastOffset -= startOffset
|
||
|
return buffer
|
||
|
return buffer.slice(lastOffset, lastOffset + lastSize)/*Uint8ArraySlice.call(buffer, lastOffset, lastOffset + lastSize)*/
|
||
|
}
|
||
|
|
||
|
getSizeBinaryFast(id) {
|
||
|
(writeTxn || (readTxnRenewed ? readTxn : renewReadTxn()))
|
||
|
lastSize = this.keyIsCompatibility ? this.db.getByPrimitive(id) : this.db.getByBinary(this.writeKey(id, keyBuffer, 0))
|
||
|
}
|
||
|
getString(id) {
|
||
|
let string = (writeTxn || (readTxnRenewed ? readTxn : renewReadTxn()))
|
||
|
.getUtf8(this.db, id)
|
||
|
if (string)
|
||
|
lastSize = string.length
|
||
|
return string
|
||
|
}
|
||
|
getBinaryFast(id) {
|
||
|
this.getSizeBinaryFast(id)
|
||
|
return lastSize === 0xffffffff ? undefined : this.db.unsafeBuffer.subarray(0, lastSize)
|
||
|
}
|
||
|
getBinary(id) {
|
||
|
this.getSizeBinaryFast(id)
|
||
|
return lastSize === 0xffffffff ? undefined : Uint8ArraySlice.call(this.db.unsafeBuffer, 0, lastSize)
|
||
|
}
|
||
|
get(id) {
|
||
|
if (this.decoder) {
|
||
|
this.getSizeBinaryFast(id)
|
||
|
return lastSize === 0xffffffff ? undefined : this.decoder.decode(this.db.unsafeBuffer, lastSize)
|
||
|
}
|
||
|
if (this.encoding == 'binary')
|
||
|
return this.getBinary(id)
|
||
|
|
||
|
let result = this.getString(id)
|
||
|
if (result) {
|
||
|
if (this.encoding == 'json')
|
||
|
return JSON.parse(result)
|
||
|
}
|
||
|
return result
|
||
|
}
|
||
|
getEntry(id) {
|
||
|
let value = this.get(id)
|
||
|
if (value !== undefined) {
|
||
|
if (this.useVersions)
|
||
|
return {
|
||
|
value,
|
||
|
version: getLastVersion(),
|
||
|
//size: lastSize
|
||
|
}
|
||
|
else
|
||
|
return {
|
||
|
value,
|
||
|
//size: lastSize
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
resetReadTxn() {
|
||
|
resetReadTxn()
|
||
|
}
|
||
|
ifNoExists(key, callback) {
|
||
|
return this.ifVersion(key, null, callback)
|
||
|
}
|
||
|
ifVersion(key, version, callback) {
|
||
|
if (typeof version != 'number') {
|
||
|
if (version == null) {
|
||
|
if (version === null)
|
||
|
version = -4.2434325325532E-199 // NO_EXIST_VERSION
|
||
|
else {// if undefined, just do callback without any condition being added
|
||
|
callback()
|
||
|
// TODO: if we are inside another ifVersion, use that promise, or use ANY_VERSION
|
||
|
return pendingBatch ? pendingBatch.unconditionalResults : Promise.resolve(true) // be consistent in returning a promise, indicate success
|
||
|
}
|
||
|
} else {
|
||
|
throw new Error('Version must be a number or null')
|
||
|
}
|
||
|
}
|
||
|
let scheduledOperations = this.getScheduledOperations()
|
||
|
let index = scheduledOperations.push([key, version]) - 1
|
||
|
try {
|
||
|
callback()
|
||
|
let commit = this.scheduleCommit()
|
||
|
return commit.results.then((writeResults) => {
|
||
|
if (writeResults[index] === 0)
|
||
|
return true
|
||
|
if (writeResults[index] === 3) {
|
||
|
throw new Error('The key size was 0 or too large')
|
||
|
}
|
||
|
return false
|
||
|
})
|
||
|
} finally {
|
||
|
scheduledOperations.push(false) // reset condition
|
||
|
}
|
||
|
}
|
||
|
doesExist(key, versionOrValue) {
|
||
|
let txn
|
||
|
try {
|
||
|
if (writeTxn) {
|
||
|
txn = writeTxn
|
||
|
} else {
|
||
|
txn = readTxnRenewed ? readTxn : renewReadTxn()
|
||
|
}
|
||
|
if (versionOrValue === undefined) {
|
||
|
this.getSizeBinaryFast(key)
|
||
|
return lastSize !== 0xffffffff
|
||
|
}
|
||
|
else if (this.useVersions) {
|
||
|
this.getSizeBinaryFast(key)
|
||
|
return lastSize !== 0xffffffff && matches(getLastVersion(), versionOrValue)
|
||
|
}
|
||
|
else {
|
||
|
let cursor = new Cursor(txn, this.db)
|
||
|
if (this.encoder) {
|
||
|
versionOrValue = this.encoder.encode(versionOrValue)
|
||
|
}
|
||
|
if (typeof versionOrValue == 'string')
|
||
|
versionOrValue = Buffer.from(versionOrValue)
|
||
|
let result = cursor.goToDup(key, versionOrValue) !== undefined
|
||
|
cursor.close()
|
||
|
return result
|
||
|
}
|
||
|
} catch(error) {
|
||
|
return handleError(error, this, txn, () => this.doesExist(key, versionOrValue))
|
||
|
}
|
||
|
}
|
||
|
getScheduledOperations() {
|
||
|
if (!scheduledOperations) {
|
||
|
scheduledOperations = []
|
||
|
scheduledOperations.bytes = 0
|
||
|
}
|
||
|
if (scheduledOperations.store != this) {
|
||
|
// issue action to switch dbs
|
||
|
scheduledOperations.store = this
|
||
|
scheduledOperations.push(this.db)
|
||
|
}
|
||
|
return scheduledOperations
|
||
|
}
|
||
|
putToBinary(id, value, version, ifVersion) {
|
||
|
let operations = this.getScheduledOperations()
|
||
|
let position = writeBuffer.position || 0
|
||
|
writeUint32Array[(position++) << 1] = 0 // write the operation
|
||
|
writeFloat64Array[position++] = version
|
||
|
writeFloat64Array[position++] = ifVersion
|
||
|
let keySize = this.writeKey(id, writeBuffer, (position + 2) << 3)
|
||
|
writeUint32Array[(position << 1) - 3] = keySize
|
||
|
if (this.encoder) {
|
||
|
//if (!(value instanceof Uint8Array)) TODO: in a future version, directly store buffers that are provided
|
||
|
value = this.encoder.encode(value)
|
||
|
}
|
||
|
writeUint32Array[(position++ << 1) - 2] = keySize
|
||
|
writeUint32Array[(position++) << 1] = value.length
|
||
|
writeFloat64Array[position] = 0
|
||
|
position += ((keySize - 1) >> 3) + 1
|
||
|
writeBuffer.position = position
|
||
|
}
|
||
|
put(id, value, version, ifVersion) {
|
||
|
if (id.length > 1978) {
|
||
|
throw new Error('Key is larger than maximum key size (1978)')
|
||
|
}
|
||
|
this.writes++
|
||
|
if (writeTxn) {
|
||
|
if (ifVersion !== undefined) {
|
||
|
this.get(id)
|
||
|
let previousVersion = this.get(id) ? getLastVersion() : null
|
||
|
if (!matches(previousVersion, ifVersion)) {
|
||
|
return SYNC_PROMISE_FAIL
|
||
|
}
|
||
|
}
|
||
|
putSync.call(this, id, value, version)
|
||
|
return SYNC_PROMISE_RESULT
|
||
|
}
|
||
|
if (this.encoder) {
|
||
|
//if (!(value instanceof Uint8Array)) TODO: in a future version, directly store buffers that are provided
|
||
|
value = this.encoder.encode(value)
|
||
|
} else if (typeof value != 'string' && !(value instanceof Uint8Array))
|
||
|
throw new Error('Invalid value to put in database ' + value + ' (' + (typeof value) +'), consider using encoder')
|
||
|
let operations = this.getScheduledOperations()
|
||
|
let index = operations.push(ifVersion == null ? version == null ? [id, value] : [id, value, version] : [id, value, version, ifVersion]) - 1
|
||
|
// track the size of the scheduled operations (and include the approx size of the array structure too)
|
||
|
operations.bytes += (id.length || 6) + (value && value.length || 0) + 100
|
||
|
let commit = this.scheduleCommit()
|
||
|
return ifVersion === undefined ? commit.unconditionalResults : // TODO: Technically you can get a bad key if an array is passed in there is no ifVersion and still fail
|
||
|
commit.results.then((writeResults) => {
|
||
|
if (writeResults[index] === 0)
|
||
|
return true
|
||
|
if (writeResults[index] === 3) {
|
||
|
throw new Error('The key size was 0 or too large')
|
||
|
}
|
||
|
return false
|
||
|
})
|
||
|
}
|
||
|
putSync(id, value, version) {
|
||
|
if (id.length > 1978) {
|
||
|
throw new Error('Key is larger than maximum key size (1978)')
|
||
|
}
|
||
|
let localTxn, hadWriteTxn = writeTxn
|
||
|
try {
|
||
|
this.writes++
|
||
|
if (!writeTxn)
|
||
|
localTxn = writeTxn = env.beginTxn()
|
||
|
if (this.encoder)
|
||
|
value = this.encoder.encode(value)
|
||
|
if (typeof value == 'string') {
|
||
|
writeTxn.putUtf8(this.db, id, value, version)
|
||
|
} else {
|
||
|
if (!(value instanceof Uint8Array)) {
|
||
|
throw new Error('Invalid value type ' + typeof value + ' used ' + value)
|
||
|
}
|
||
|
writeTxn.putBinary(this.db, id, value, version)
|
||
|
}
|
||
|
if (localTxn) {
|
||
|
writeTxn.commit()
|
||
|
writeTxn = null
|
||
|
resetReadTxn()
|
||
|
}
|
||
|
} catch(error) {
|
||
|
if (hadWriteTxn)
|
||
|
throw error // if we are in a transaction, the whole transaction probably needs to restart
|
||
|
return handleError(error, this, localTxn, () => this.putSync(id, value, version))
|
||
|
}
|
||
|
}
|
||
|
removeSync(id, ifVersionOrValue) {
|
||
|
if (id.length > 1978) {
|
||
|
throw new Error('Key is larger than maximum key size (1978)')
|
||
|
}
|
||
|
let localTxn, hadWriteTxn = writeTxn
|
||
|
try {
|
||
|
if (!writeTxn)
|
||
|
localTxn = writeTxn = env.beginTxn()
|
||
|
let deleteValue
|
||
|
if (ifVersionOrValue !== undefined) {
|
||
|
if (this.useVersions) {
|
||
|
let previousVersion = this.get(id) ? getLastVersion() : null
|
||
|
if (!matches(previousVersion, ifVersionOrValue))
|
||
|
return false
|
||
|
} else if (this.encoder)
|
||
|
deleteValue = this.encoder.encode(ifVersionOrValue)
|
||
|
else
|
||
|
deleteValue = ifVersionOrValue
|
||
|
}
|
||
|
this.writes++
|
||
|
let result
|
||
|
if (deleteValue)
|
||
|
result = writeTxn.del(this.db, id, deleteValue)
|
||
|
else
|
||
|
result = writeTxn.del(this.db, id)
|
||
|
if (localTxn) {
|
||
|
writeTxn.commit()
|
||
|
writeTxn = null
|
||
|
resetReadTxn()
|
||
|
}
|
||
|
return result // object found and deleted
|
||
|
} catch(error) {
|
||
|
if (hadWriteTxn)
|
||
|
throw error // if we are in a transaction, the whole transaction probably needs to restart
|
||
|
return handleError(error, this, localTxn, () => this.removeSync(id))
|
||
|
}
|
||
|
}
|
||
|
remove(id, ifVersionOrValue) {
|
||
|
if (id.length > 1978) {
|
||
|
throw new Error('Key is larger than maximum key size (1978)')
|
||
|
}
|
||
|
this.writes++
|
||
|
if (writeTxn) {
|
||
|
if (removeSync.call(this, id, ifVersionOrValue) === false)
|
||
|
return SYNC_PROMISE_FAIL
|
||
|
return SYNC_PROMISE_RESULT
|
||
|
}
|
||
|
let scheduledOperations = this.getScheduledOperations()
|
||
|
let operation
|
||
|
if (ifVersionOrValue === undefined)
|
||
|
operation = [id]
|
||
|
else if (this.useVersions)
|
||
|
operation = [id, undefined, undefined, ifVersionOrValue] // version condition
|
||
|
else {
|
||
|
if (this.encoder)
|
||
|
operation = [id, this.encoder.encode(ifVersionOrValue), true]
|
||
|
else
|
||
|
operation = [id, ifVersionOrValue, true]
|
||
|
}
|
||
|
let index = scheduledOperations.push(operation) - 1 // remove specific values
|
||
|
scheduledOperations.bytes += (id.length || 6) + 100
|
||
|
let commit = this.scheduleCommit()
|
||
|
return ifVersionOrValue === undefined ? commit.unconditionalResults :
|
||
|
commit.results.then((writeResults) => {
|
||
|
if (writeResults[index] === 0)
|
||
|
return true
|
||
|
if (writeResults[index] === 3) {
|
||
|
throw new Error('The key size was 0 or too large')
|
||
|
}
|
||
|
return false
|
||
|
})
|
||
|
}
|
||
|
getValues(key, options) {
|
||
|
let defaultOptions = {
|
||
|
key,
|
||
|
valuesForKey: true
|
||
|
}
|
||
|
if (options && options.snapshot === false)
|
||
|
throw new Error('Can not disable snapshots for getValues')
|
||
|
return this.getRange(options ? Object.assign(defaultOptions, options) : defaultOptions)
|
||
|
}
|
||
|
getKeys(options) {
|
||
|
if (!options)
|
||
|
options = {}
|
||
|
options.values = false
|
||
|
return this.getRange(options)
|
||
|
}
|
||
|
getCount(options) {
|
||
|
if (!options)
|
||
|
options = {}
|
||
|
options.onlyCount = true
|
||
|
return this.getRange(options)[Symbol.iterator]()
|
||
|
}
|
||
|
getKeysCount(options) {
|
||
|
if (!options)
|
||
|
options = {}
|
||
|
options.onlyCount = true
|
||
|
options.values = false
|
||
|
return this.getRange(options)[Symbol.iterator]()
|
||
|
}
|
||
|
getValuesCount(key, options) {
|
||
|
if (!options)
|
||
|
options = {}
|
||
|
options.key = key
|
||
|
options.valuesForKey = true
|
||
|
options.onlyCount = true
|
||
|
return this.getRange(options)[Symbol.iterator]()
|
||
|
}
|
||
|
getRange(options) {
|
||
|
let iterable = new ArrayLikeIterable()
|
||
|
if (!options)
|
||
|
options = {}
|
||
|
let includeValues = options.values !== false
|
||
|
let includeVersions = options.versions
|
||
|
let valuesForKey = options.valuesForKey
|
||
|
let limit = options.limit
|
||
|
let db = this.db
|
||
|
let snapshot = options.snapshot
|
||
|
iterable[Symbol.iterator] = () => {
|
||
|
let currentKey = valuesForKey ? options.key : options.start
|
||
|
const reverse = options.reverse
|
||
|
let count = 0
|
||
|
let cursor, cursorRenewId
|
||
|
let txn
|
||
|
let flags = (includeValues ? 0x100 : 0) | (reverse ? 0x400 : 0) | (valuesForKey ? 0x800 : 0)
|
||
|
function resetCursor() {
|
||
|
try {
|
||
|
if (cursor)
|
||
|
finishCursor()
|
||
|
|
||
|
txn = writeTxn || (readTxnRenewed ? readTxn : renewReadTxn())
|
||
|
cursor = !writeTxn && db.availableCursor
|
||
|
if (cursor) {
|
||
|
db.availableCursor = null
|
||
|
if (db.cursorTxn != txn)
|
||
|
cursor.renew(txn)
|
||
|
else// if (db.currentRenewId != renewId)
|
||
|
flags |= 0x2000
|
||
|
} else {
|
||
|
cursor = new Cursor(txn, db)
|
||
|
}
|
||
|
txn.cursorCount = (txn.cursorCount || 0) + 1 // track transaction so we always use the same one
|
||
|
if (snapshot === false) {
|
||
|
cursorRenewId = renewId // use shared read transaction
|
||
|
txn.renewingCursorCount = (txn.renewingCursorCount || 0) + 1 // need to know how many are renewing cursors
|
||
|
}
|
||
|
} catch(error) {
|
||
|
if (cursor) {
|
||
|
try {
|
||
|
cursor.close()
|
||
|
} catch(error) { }
|
||
|
}
|
||
|
return handleError(error, this, txn, resetCursor)
|
||
|
}
|
||
|
}
|
||
|
resetCursor()
|
||
|
let store = this
|
||
|
if (options.onlyCount) {
|
||
|
flags |= 0x1000
|
||
|
let count = position(options.offset)
|
||
|
finishCursor()
|
||
|
return count
|
||
|
}
|
||
|
function position(offset) {
|
||
|
let keySize = store.writeKey(currentKey, keyBuffer, 0)
|
||
|
let endAddress
|
||
|
if (valuesForKey) {
|
||
|
if (options.start === undefined && options.end === undefined)
|
||
|
endAddress = 0
|
||
|
else {
|
||
|
let startAddress
|
||
|
if (store.encoder.writeKey) {
|
||
|
startAddress = BigInt(saveKey(options.start, store.encoder.writeKey, iterable))
|
||
|
keyBufferView.setBigUint64(2000, startAddress, true)
|
||
|
endAddress = saveKey(options.end, store.encoder.writeKey, iterable)
|
||
|
} else {
|
||
|
throw new Error('Only key-based encoding is supported for start/end values')
|
||
|
let encoded = store.encoder.encode(options.start)
|
||
|
let bufferAddress = encoded.buffer.address || (encoded.buffer.address = getAddress(encoded) - encoded.byteOffset)
|
||
|
startAddress = bufferAddress + encoded.byteOffset
|
||
|
}
|
||
|
}
|
||
|
} else
|
||
|
endAddress = saveKey(options.end, store.writeKey, iterable)
|
||
|
return cursor.position(flags, offset || 0, keySize, endAddress)
|
||
|
}
|
||
|
|
||
|
function finishCursor() {
|
||
|
if (txn.isAborted)
|
||
|
return
|
||
|
if (cursorRenewId)
|
||
|
txn.renewingCursorCount--
|
||
|
if (--txn.cursorCount <= 0 && txn.onlyCursor) {
|
||
|
cursor.close()
|
||
|
let index = cursorTxns.indexOf(txn)
|
||
|
if (index > -1)
|
||
|
cursorTxns.splice(index, 1)
|
||
|
txn.abort() // this is no longer main read txn, abort it now that we are done
|
||
|
txn.isAborted = true
|
||
|
} else {
|
||
|
if (db.availableCursor || txn != readTxn)
|
||
|
cursor.close()
|
||
|
else {// try to reuse it
|
||
|
db.availableCursor = cursor
|
||
|
db.cursorTxn = txn
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return {
|
||
|
next() {
|
||
|
let keySize
|
||
|
if (cursorRenewId && cursorRenewId != renewId) {
|
||
|
resetCursor()
|
||
|
keySize = position(0)
|
||
|
}
|
||
|
if (count === 0) { // && includeValues) // on first entry, get current value if we need to
|
||
|
keySize = position(options.offset)
|
||
|
} else
|
||
|
keySize = cursor.iterate()
|
||
|
if (keySize === 0 ||
|
||
|
(count++ >= limit)) {
|
||
|
finishCursor()
|
||
|
return ITERATOR_DONE
|
||
|
}
|
||
|
if (includeValues) // TODO: Can we do this after readKey, ran into issues with this before
|
||
|
lastSize = keyBufferView.getUint32(0, true)
|
||
|
if (!valuesForKey || snapshot === false)
|
||
|
currentKey = store.readKey(keyBuffer, 32, keySize + 32)
|
||
|
if (includeValues) {
|
||
|
let value
|
||
|
if (store.decoder) {
|
||
|
value = store.decoder.decode(db.unsafeBuffer, lastSize)
|
||
|
} else if (store.encoding == 'binary')
|
||
|
value = Uint8ArraySlice.call(db.unsafeBuffer, 0, lastSize)
|
||
|
else {
|
||
|
value = store.db.unsafeBuffer.toString('utf8', 0, lastSize)
|
||
|
if (store.encoding == 'json' && value)
|
||
|
value = JSON.parse(value)
|
||
|
}
|
||
|
if (includeVersions)
|
||
|
return {
|
||
|
value: {
|
||
|
key: currentKey,
|
||
|
value,
|
||
|
version: getLastVersion()
|
||
|
}
|
||
|
}
|
||
|
else if (valuesForKey)
|
||
|
return {
|
||
|
value
|
||
|
}
|
||
|
else
|
||
|
return {
|
||
|
value: {
|
||
|
key: currentKey,
|
||
|
value,
|
||
|
}
|
||
|
}
|
||
|
} else if (includeVersions) {
|
||
|
return {
|
||
|
value: {
|
||
|
key: currentKey,
|
||
|
version: getLastVersion()
|
||
|
}
|
||
|
}
|
||
|
} else {
|
||
|
return {
|
||
|
value: currentKey
|
||
|
}
|
||
|
}
|
||
|
},
|
||
|
return() {
|
||
|
finishCursor()
|
||
|
return ITERATOR_DONE
|
||
|
},
|
||
|
throw() {
|
||
|
finishCursor()
|
||
|
return ITERATOR_DONE
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return iterable
|
||
|
}
|
||
|
scheduleCommit() {
|
||
|
if (!pendingBatch) {
|
||
|
// pendingBatch promise represents the completion of the transaction
|
||
|
let whenCommitted = new Promise((resolve, reject) => {
|
||
|
runNextBatch = (sync) => {
|
||
|
if (!whenCommitted)
|
||
|
return
|
||
|
runNextBatch = null
|
||
|
if (pendingBatch) {
|
||
|
for (const store of stores) {
|
||
|
store.emit('beforecommit', { scheduledOperations })
|
||
|
}
|
||
|
}
|
||
|
clearTimeout(timeout)
|
||
|
currentCommit = whenCommitted
|
||
|
whenCommitted = null
|
||
|
pendingBatch = null
|
||
|
if (scheduledOperations || scheduledTransactions) {
|
||
|
// operations to perform, collect them as an array and start doing them
|
||
|
let operations = scheduledOperations || []
|
||
|
let transactions = scheduledTransactions
|
||
|
if (operations.appendAsyncTxn) {
|
||
|
operations.push(true)
|
||
|
}
|
||
|
scheduledOperations = null
|
||
|
scheduledTransactions = null
|
||
|
const writeBatch = () => {
|
||
|
let start = Date.now()
|
||
|
let results = Buffer.alloc(operations.length)
|
||
|
let continuedWriteTxn
|
||
|
let transactionResults
|
||
|
let transactionSetIndex = 0
|
||
|
let callback = async (error) => {
|
||
|
if (error === true) {
|
||
|
// resume batch transaction
|
||
|
if (!transactionResults) {
|
||
|
// get the transaction we will use
|
||
|
continuedWriteTxn = env.beginTxn(true)
|
||
|
transactionResults = new Array(transactions.length)
|
||
|
results.transactionResults = transactionResults
|
||
|
}
|
||
|
let transactionSet = transactions[transactionSetIndex]
|
||
|
let transactionSetResults = transactionResults[transactionSetIndex++] = []
|
||
|
let promises
|
||
|
for (let i = 0, l = transactionSet.length; i < l; i++) {
|
||
|
let userTxn = transactionSet[i]
|
||
|
let asChild = userTxn.asChild
|
||
|
if (asChild) {
|
||
|
if (promises) {
|
||
|
// must complete any outstanding transactions before proceeding
|
||
|
await Promise.all(promises)
|
||
|
promises = null
|
||
|
}
|
||
|
let childTxn = writeTxn = env.beginTxn(null, continuedWriteTxn)
|
||
|
try {
|
||
|
let result = userTxn.callback()
|
||
|
if (result && result.then) {
|
||
|
await result
|
||
|
}
|
||
|
if (result === ABORT)
|
||
|
childTxn.abort()
|
||
|
else
|
||
|
childTxn.commit()
|
||
|
transactionSetResults[(i << 1) + 1] = result
|
||
|
} catch(error) {
|
||
|
childTxn.abort()
|
||
|
if (!txnError(error, i))
|
||
|
return
|
||
|
}
|
||
|
} else {
|
||
|
writeTxn = continuedWriteTxn
|
||
|
try {
|
||
|
let result = userTxn()
|
||
|
if (result && result.then) {
|
||
|
if (!promises)
|
||
|
promises = []
|
||
|
transactionSetResults[(i << 1) + 1] = result
|
||
|
promises.push(result.catch(() => {
|
||
|
txnError(error, i)
|
||
|
}))
|
||
|
} else
|
||
|
transactionSetResults[(i << 1) + 1] = result
|
||
|
} catch(error) {
|
||
|
if (!txnError(error, i))
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
if (promises) { // finish any outstanding commit functions
|
||
|
await Promise.all(promises)
|
||
|
}
|
||
|
writeTxn = null
|
||
|
return env.continueBatch(0)
|
||
|
function txnError(error, i) {
|
||
|
if (error.message.startsWith('MDB_MAP_FULL')) {
|
||
|
env.continueBatch(-30792)
|
||
|
writeTxn = null
|
||
|
return false
|
||
|
}
|
||
|
if (error.message.startsWith('MDB_MAP_RESIZED')) {
|
||
|
env.continueBatch(-30785)
|
||
|
writeTxn = null
|
||
|
return false
|
||
|
}
|
||
|
// user exception
|
||
|
transactionSetResults[i << 1] = error
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
let duration = Date.now() - start
|
||
|
this.averageTransactionTime = (this.averageTransactionTime * 3 + duration) / 4
|
||
|
//console.log('did batch', (duration) + 'ms', name, operations.length/*map(o => o[1].toString('binary')).join(',')*/)
|
||
|
resetReadTxn()
|
||
|
if (error) {
|
||
|
if (error.message == 'Interrupted batch')
|
||
|
// if the batch was interrupted by a sync transaction request we just have to restart it
|
||
|
return writeBatch()
|
||
|
try {
|
||
|
// see if we can recover from recoverable error (like full map with a resize)
|
||
|
handleError(error, this, null, writeBatch)
|
||
|
} catch(error) {
|
||
|
currentCommit = null
|
||
|
for (const store of stores) {
|
||
|
store.emit('aftercommit', { operations })
|
||
|
}
|
||
|
reject(error)
|
||
|
}
|
||
|
} else {
|
||
|
currentCommit = null
|
||
|
for (const store of stores) {
|
||
|
store.emit('aftercommit', { operations, results })
|
||
|
}
|
||
|
resolve(results)
|
||
|
}
|
||
|
}
|
||
|
try {
|
||
|
if (sync === true) {
|
||
|
env.batchWrite(operations, results)
|
||
|
callback()
|
||
|
} else
|
||
|
env.batchWrite(operations, results, callback)
|
||
|
} catch (error) {
|
||
|
callback(error)
|
||
|
}
|
||
|
}
|
||
|
try {
|
||
|
writeBatch()
|
||
|
} catch(error) {
|
||
|
reject(error)
|
||
|
}
|
||
|
} else {
|
||
|
resolve([])
|
||
|
}
|
||
|
}
|
||
|
let timeout
|
||
|
if (this.commitDelay > 0) {
|
||
|
timeout = setTimeout(() => {
|
||
|
when(currentCommit, () => whenCommitted && runNextBatch(), () => whenCommitted && runNextBatch())
|
||
|
}, this.commitDelay)
|
||
|
} else {
|
||
|
timeout = runNextBatch.immediate = setImmediate(() => {
|
||
|
when(currentCommit, () => whenCommitted && runNextBatch(), () => whenCommitted && runNextBatch())
|
||
|
})
|
||
|
}
|
||
|
})
|
||
|
pendingBatch = {
|
||
|
results: whenCommitted,
|
||
|
unconditionalResults: whenCommitted.then(() => true) // for returning from non-conditional operations
|
||
|
}
|
||
|
}
|
||
|
return pendingBatch
|
||
|
}
|
||
|
|
||
|
batch(operations) {
|
||
|
/*if (writeTxn) {
|
||
|
this.commitBatchNow(operations.map(operation => [this.db, operation.key, operation.value]))
|
||
|
return Promise.resolve(true)
|
||
|
}*/
|
||
|
let scheduledOperations = this.getScheduledOperations()
|
||
|
for (let operation of operations) {
|
||
|
let value = operation.value
|
||
|
scheduledOperations.push([operation.key, value])
|
||
|
scheduledOperations.bytes += operation.key.length + (value && value.length || 0) + 200
|
||
|
}
|
||
|
return this.scheduleCommit().unconditionalResults
|
||
|
}
|
||
|
backup(path) {
|
||
|
return new Promise((resolve, reject) => env.copy(path, false, (error) => {
|
||
|
if (error) {
|
||
|
reject(error)
|
||
|
} else {
|
||
|
resolve()
|
||
|
}
|
||
|
}))
|
||
|
}
|
||
|
close() {
|
||
|
this.db.close()
|
||
|
if (this.isRoot) {
|
||
|
if (readTxn) {
|
||
|
try {
|
||
|
readTxn.abort()
|
||
|
} catch(error) {}
|
||
|
}
|
||
|
readTxnRenewed = null
|
||
|
env.close()
|
||
|
}
|
||
|
}
|
||
|
getStats() {
|
||
|
try {
|
||
|
let stats = this.db.stat(readTxnRenewed ? readTxn : renewReadTxn())
|
||
|
return stats
|
||
|
}
|
||
|
catch(error) {
|
||
|
return handleError(error, this, readTxn, () => this.getStats())
|
||
|
}
|
||
|
}
|
||
|
sync(callback) {
|
||
|
return env.sync(callback || function(error) {
|
||
|
if (error) {
|
||
|
console.error(error)
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
deleteDB() {
|
||
|
try {
|
||
|
this.db.drop({
|
||
|
justFreePages: false,
|
||
|
txn: writeTxn,
|
||
|
})
|
||
|
} catch(error) {
|
||
|
handleError(error, this, null, () => this.deleteDB())
|
||
|
}
|
||
|
}
|
||
|
clear() {
|
||
|
try {
|
||
|
this.db.drop({
|
||
|
justFreePages: true,
|
||
|
txn: writeTxn,
|
||
|
})
|
||
|
} catch(error) {
|
||
|
handleError(error, this, null, () => this.clear())
|
||
|
}
|
||
|
if (this.encoder && this.encoder.structures)
|
||
|
this.encoder.structures = []
|
||
|
|
||
|
}
|
||
|
readerCheck() {
|
||
|
return env.readerCheck()
|
||
|
}
|
||
|
readerList() {
|
||
|
return env.readerList().join('')
|
||
|
}
|
||
|
setupSharedStructures() {
|
||
|
const getStructures = () => {
|
||
|
let lastVersion // because we are doing a read here, we may need to save and restore the lastVersion from the last read
|
||
|
if (this.useVersions)
|
||
|
lastVersion = getLastVersion()
|
||
|
try {
|
||
|
let buffer = this.getBinary(this.sharedStructuresKey)
|
||
|
if (this.useVersions)
|
||
|
setLastVersion(lastVersion)
|
||
|
return buffer ? this.encoder.decode(buffer) : []
|
||
|
} catch(error) {
|
||
|
return handleError(error, this, null, getStructures)
|
||
|
}
|
||
|
}
|
||
|
return {
|
||
|
saveStructures: (structures, previousLength) => {
|
||
|
return this.transactionSync(() => {
|
||
|
let existingStructuresBuffer = this.getBinary(this.sharedStructuresKey)
|
||
|
let existingStructures = existingStructuresBuffer ? this.encoder.decode(existingStructuresBuffer) : []
|
||
|
if (existingStructures.length != previousLength)
|
||
|
return false // it changed, we need to indicate that we couldn't update
|
||
|
writeTxn.putBinary(this.db, this.sharedStructuresKey, this.encoder.encode(structures))
|
||
|
})
|
||
|
},
|
||
|
getStructures,
|
||
|
copyBuffers: true // need to copy any embedded buffers that are found since we use unsafe buffers
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
// if caching class overrides putSync, don't want to double call the caching code
|
||
|
const putSync = LMDBStore.prototype.putSync
|
||
|
const removeSync = LMDBStore.prototype.removeSync
|
||
|
return options.cache ?
|
||
|
new (CachingStore(LMDBStore))(options.name || null, options) :
|
||
|
new LMDBStore(options.name || null, options)
|
||
|
// Central error handler for store operations.
//  - aborts and clears any current write transaction
//  - for MDB_* errors other than MDB_KEYEXIST / MDB_NOTFOUND, or an already-closed
//    transaction, resets and aborts the shared read transaction
//  - for MDB_MAP_FULL / MDB_MAP_RESIZED, emits 'remap' on every store, resizes the
//    environment map and returns the result of retry()
//  - otherwise prefixes the message with the database name and rethrows the error
function handleError(error, store, txn, retry) {
	try {
		if (writeTxn)
			writeTxn.abort()
	} catch(abortError) {}
	if (writeTxn)
		writeTxn = null

	const message = error.message
	const messageStartsWith = (prefix) => message.startsWith(prefix)

	const isSeriousMdbError = messageStartsWith('MDB_') &&
		!messageStartsWith('MDB_KEYEXIST') && !messageStartsWith('MDB_NOTFOUND')
	if (isSeriousMdbError || message == 'The transaction is already closed.') {
		resetReadTxn() // separate out cursor-based read txns
		try {
			if (readTxn) {
				readTxn.abort()
				readTxn.isAborted = true
			}
		} catch(abortError) {}
		readTxn = null
	}
	if (messageStartsWith('MDB_PROBLEM'))
		console.error(error)
	//if (error.message == 'The transaction is already closed.')
	//	return handleError(error, store, null, retry)
	const mapIsFull = messageStartsWith('MDB_MAP_FULL')
	if (mapIsFull || messageStartsWith('MDB_MAP_RESIZED')) {
		const oldSize = env.info().mapSize
		let newSize
		if (mapIsFull) {
			// increase size, more rapidly at first, and round to nearest 1 MB
			newSize = Math.floor(((1.08 + 3000 / Math.sqrt(oldSize)) * oldSize) / 0x100000) * 0x100000
		} else {
			// for resized notifications, grow by a fixed 0x2000 bytes
			newSize = oldSize + 0x2000
		}
		for (const openStore of stores) {
			openStore.emit('remap')
		}
		try {
			env.resize(newSize)
		} catch(resizeError) {
			throw new Error(resizeError.message + ' trying to set map size to ' + newSize)
		}
		return retry()
	}
	error.message = 'In database ' + name + ': ' + message
	throw error
}
|
||
|
}
|
||
|
|
||
|
// Tells whether a stored version satisfies a conditional version.
// With a truthy previousVersion the two must be loosely equal (and ifVersion truthy);
// with a falsy previousVersion the condition holds only when ifVersion is falsy too.
function matches(previousVersion, ifVersion) {
	if (!previousVersion)
		return !ifVersion
	return ifVersion ? previousVersion == ifVersion : false
}
|
||
|
|
||
|
// Orders two keys, returning a negative number, 0 or a positive number.
// compare with type consistency that matches ordered-binary:
//  - null sorts before everything except null/undefined
//  - objects with a compare() method (e.g. Buffers) are compared with it and sort before
//    any non-null value lacking compare()
//  - arrays are compared element by element; an array versus a non-array compares its
//    first element and sorts after on a tie when it has more than one element
//  - values of the same primitive type use < / === (symbols via Symbol.keyFor)
//  - different primitive types are ranked through the typeOrder table
function compareKey(a, b) {
	const typeOfA = typeof a
	if (typeOfA == 'object') {
		if (!a)
			return b == null ? 0 : -1
		if (a.compare) {
			if (b == null)
				return 1
			return b.compare ? a.compare(b) : -1
		}
		if (b instanceof Array) {
			// keep walking while elements are equal and the index has not passed a.length
			for (let i = 0; ; i++) {
				const elementComparison = compareKey(a[i], b[i])
				if (elementComparison != 0 || !(i <= a.length))
					return elementComparison
			}
		}
		const headComparison = compareKey(a[0], b)
		return headComparison == 0 && a.length > 1 ? 1 : headComparison
	}
	const typeOfB = typeof b
	if (typeOfA == typeOfB) {
		if (typeOfA === 'symbol') {
			a = Symbol.keyFor(a)
			b = Symbol.keyFor(b)
		}
		if (a < b)
			return -1
		return a === b ? 0 : 1
	}
	if (typeOfB == 'object')
		return b instanceof Array ? -compareKey(b, a) : 1
	return typeOrder[typeOfA] < typeOrder[typeOfB] ? -1 : 1
}
|
||
|
// Simple holder for a value together with its version and the db it belongs to.
// ifSamePut / ifSameRemove are empty placeholders.
class Entry {
	constructor(value, version, db) {
		Object.assign(this, { value, version, db })
	}
	ifSamePut() {}
	ifSameRemove() {}
}
|
||
|
exports.compareKey = compareKey
// rank of each primitive type, consulted by compareKey when two keys have different types
const typeOrder = {
	'symbol': 0,
	'undefined': 1,
	'boolean': 2,
	'number': 3,
	'string': 4
}
|
||
|
exports.getLastEntrySize = function() {
|
||
|
return lastSize
|
||
|
}
|
||
|
// Reads the version stored as a little-endian float64 at byte offset 16 of the shared key buffer.
function getLastVersion() {
	const littleEndian = true
	return keyBufferView.getFloat64(16, littleEndian)
}
|
||
|
|
||
|
// Writes version as a little-endian float64 at byte offset 16 of the shared key buffer
// (the slot getLastVersion reads); returns the result of DataView#setFloat64.
function setLastVersion(version) {
	const littleEndian = true
	return keyBufferView.setFloat64(16, version, littleEndian)
}
|
||
|
// state of the buffer that saveKey writes into; populated by allocateSaveBuffer
let saveBuffer
let saveDataView
let saveDataAddress
// starts above saveKey's 6200 threshold so the first saveKey call allocates a buffer
let savePosition = 8000
|
||
|
// Allocates a fresh zero-filled 8192-byte save buffer, attaches a DataView over it,
// records its address (and the address of its underlying ArrayBuffer) and rewinds
// the write position to 0.
function allocateSaveBuffer() {
	const freshBuffer = Buffer.alloc(8192)
	const freshView = new DataView(freshBuffer.buffer, freshBuffer.byteOffset, freshBuffer.byteLength)
	saveBuffer = freshBuffer
	saveBuffer.dataView = saveDataView = freshView
	saveDataAddress = getAddress(freshBuffer)
	freshBuffer.buffer.address = saveDataAddress - freshBuffer.byteOffset
	savePosition = 0
}
|
||
|
// Serializes key into the shared save buffer as a 4-byte little-endian length followed
// by the key bytes, and returns the address of that record.
//   key: the key to serialize
//   writeKey: function(key, buffer, position) that writes the key and returns the end position
//   saveTo: object that receives a reference to the buffer in use (as saveTo.saveBuffer)
function saveKey(key, writeKey, saveTo) {
	// start a new buffer once the write position has passed 6200
	if (savePosition > 6200)
		allocateSaveBuffer()
	const lengthOffset = savePosition
	const keyStart = lengthOffset + 4
	savePosition = writeKey(key, saveBuffer, keyStart)
	saveDataView.setUint32(lengthOffset, savePosition - keyStart, true)
	saveTo.saveBuffer = saveBuffer
	// round the next write position up to a multiple of 8
	savePosition = (savePosition + 7) & 0xfffff8
	return lengthOffset + saveDataAddress
}
|
||
|
// expose the version accessors on the module's exports
Object.assign(exports, { getLastVersion, setLastVersion })
|
||
|
|