You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
585 lines
16 KiB
JavaScript
585 lines
16 KiB
JavaScript
/*
|
|
* Copyright DataStax, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
'use strict';
|
|
|
|
const types = require('./types');
|
|
const token = require('./token');
|
|
const utils = require('./utils');
|
|
const MutableLong = require('./types/mutable-long');
|
|
const { Integer } = types;
|
|
|
|
// Murmur3 constants
|
|
//-0x783C846EEEBDAC2B
|
|
const mconst1 = new MutableLong(0x53d5, 0x1142, 0x7b91, 0x87c3);
|
|
//0x4cf5ad432745937f
|
|
const mconst2 = new MutableLong(0x937f, 0x2745, 0xad43, 0x4cf5);
|
|
const mlongFive = MutableLong.fromNumber(5);
|
|
//0xff51afd7ed558ccd
|
|
const mconst3 = new MutableLong(0x8ccd, 0xed55, 0xafd7, 0xff51);
|
|
//0xc4ceb9fe1a85ec53
|
|
const mconst4 = new MutableLong(0xec53, 0x1a85, 0xb9fe, 0xc4ce);
|
|
const mconst5 = MutableLong.fromNumber(0x52dce729);
|
|
const mconst6 = MutableLong.fromNumber(0x38495ab5);
|
|
|
|
/**
|
|
* Represents a set of methods that are able to generate and parse tokens for the C* partitioner.
|
|
* @abstract
|
|
*/
|
|
class Tokenizer {
|
|
constructor() {
|
|
|
|
}
|
|
|
|
/**
|
|
* Creates a token based on the Buffer value provided
|
|
* @abstract
|
|
* @param {Buffer|Array} value
|
|
* @returns {Token} Computed token
|
|
*/
|
|
hash(value) {
|
|
throw new Error('You must implement a hash function for the tokenizer');
|
|
}
|
|
|
|
/**
|
|
* Parses a token string and returns a representation of the token
|
|
* @abstract
|
|
* @param {String} value
|
|
*/
|
|
parse(value) {
|
|
throw new Error('You must implement a parse function for the tokenizer');
|
|
}
|
|
|
|
minToken() {
|
|
throw new Error('You must implement a minToken function for the tokenizer');
|
|
}
|
|
|
|
/**
|
|
* Splits the range specified by start and end into numberOfSplits equal parts.
|
|
* @param {Token} start Starting token
|
|
* @param {Token} end End token
|
|
* @param {Number} numberOfSplits Number of splits to make.
|
|
*/
|
|
split(start, end, numberOfSplits) {
|
|
throw new Error('You must implement a split function for the tokenizer');
|
|
}
|
|
|
|
/**
|
|
* Common implementation for splitting token ranges when start is in
|
|
* a shared Integer format.
|
|
*
|
|
* @param {Integer} start Starting token
|
|
* @param {Integer} range How large the range of the split is
|
|
* @param {Integer} ringEnd The end point of the ring so we know where to wrap
|
|
* @param {Integer} ringLength The total size of the ring
|
|
* @param {Number} numberOfSplits The number of splits to make
|
|
* @returns {Array<Integer>} The evenly-split points on the range
|
|
*/
|
|
splitBase(start, range, ringEnd, ringLength, numberOfSplits) {
|
|
const numberOfSplitsInt = Integer.fromInt(numberOfSplits);
|
|
const divider = range.divide(numberOfSplitsInt);
|
|
let remainder = range.modulo(numberOfSplitsInt);
|
|
|
|
const results = [];
|
|
let current = start;
|
|
const dividerPlusOne = divider.add(Integer.ONE);
|
|
|
|
for(let i = 1; i < numberOfSplits; i++) {
|
|
if (remainder.greaterThan(Integer.ZERO)) {
|
|
current = current.add(dividerPlusOne);
|
|
} else {
|
|
current = current.add(divider);
|
|
}
|
|
if (ringLength && current.greaterThan(ringEnd)) {
|
|
current = current.subtract(ringLength);
|
|
}
|
|
results.push(current);
|
|
remainder = remainder.subtract(Integer.ONE);
|
|
}
|
|
return results;
|
|
}
|
|
|
|
/**
|
|
* Return internal string based representation of a Token.
|
|
* @param {Token} token
|
|
*/
|
|
stringify(token) {
|
|
return token.getValue().toString();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Uniformly distributes data across the cluster based on Cassandra flavored Murmur3 hashed values.
|
|
*/
|
|
class Murmur3Tokenizer extends Tokenizer {
|
|
|
|
constructor() {
|
|
super();
|
|
}
|
|
|
|
/**
|
|
* @param {Buffer} value
|
|
* @return {Murmur3Token}
|
|
*/
|
|
hash(value) {
|
|
// This is an adapted version of the MurmurHash.hash3_x64_128 from Cassandra used
|
|
// for M3P. Compared to that methods, there's a few inlining of arguments and we
|
|
// only return the first 64-bits of the result since that's all M3 partitioner uses.
|
|
|
|
const data = value;
|
|
let offset = 0;
|
|
const length = data.length;
|
|
|
|
const nblocks = length >> 4; // Process as 128-bit blocks.
|
|
|
|
const h1 = new MutableLong();
|
|
const h2 = new MutableLong();
|
|
let k1 = new MutableLong();
|
|
let k2 = new MutableLong();
|
|
|
|
for (let i = 0; i < nblocks; i++) {
|
|
k1 = this.getBlock(data, offset, i * 2);
|
|
k2 = this.getBlock(data, offset, i * 2 + 1);
|
|
|
|
k1.multiply(mconst1);
|
|
this.rotl64(k1, 31);
|
|
k1.multiply(mconst2);
|
|
|
|
h1.xor(k1);
|
|
this.rotl64(h1, 27);
|
|
h1.add(h2);
|
|
h1.multiply(mlongFive).add(mconst5);
|
|
|
|
k2.multiply(mconst2);
|
|
this.rotl64(k2, 33);
|
|
k2.multiply(mconst1);
|
|
h2.xor(k2);
|
|
this.rotl64(h2, 31);
|
|
h2.add(h1);
|
|
h2.multiply(mlongFive).add(mconst6);
|
|
}
|
|
//----------
|
|
// tail
|
|
|
|
// Advance offset to the unprocessed tail of the data.
|
|
offset += nblocks * 16;
|
|
|
|
k1 = new MutableLong();
|
|
k2 = new MutableLong();
|
|
|
|
/* eslint-disable no-fallthrough */
|
|
switch(length & 15) {
|
|
case 15:
|
|
k2.xor(fromSignedByte(data[offset+14]).shiftLeft(48));
|
|
case 14:
|
|
k2.xor(fromSignedByte(data[offset+13]).shiftLeft(40));
|
|
case 13:
|
|
k2.xor(fromSignedByte(data[offset+12]).shiftLeft(32));
|
|
case 12:
|
|
k2.xor(fromSignedByte(data[offset+11]).shiftLeft(24));
|
|
case 11:
|
|
k2.xor(fromSignedByte(data[offset+10]).shiftLeft(16));
|
|
case 10:
|
|
k2.xor(fromSignedByte(data[offset+9]).shiftLeft(8));
|
|
case 9:
|
|
k2.xor(fromSignedByte(data[offset+8]));
|
|
k2.multiply(mconst2);
|
|
this.rotl64(k2, 33);
|
|
k2.multiply(mconst1);
|
|
h2.xor(k2);
|
|
case 8:
|
|
k1.xor(fromSignedByte(data[offset+7]).shiftLeft(56));
|
|
case 7:
|
|
k1.xor(fromSignedByte(data[offset+6]).shiftLeft(48));
|
|
case 6:
|
|
k1.xor(fromSignedByte(data[offset+5]).shiftLeft(40));
|
|
case 5:
|
|
k1.xor(fromSignedByte(data[offset+4]).shiftLeft(32));
|
|
case 4:
|
|
k1.xor(fromSignedByte(data[offset+3]).shiftLeft(24));
|
|
case 3:
|
|
k1.xor(fromSignedByte(data[offset+2]).shiftLeft(16));
|
|
case 2:
|
|
k1.xor(fromSignedByte(data[offset+1]).shiftLeft(8));
|
|
case 1:
|
|
k1.xor(fromSignedByte(data[offset]));
|
|
k1.multiply(mconst1);
|
|
this.rotl64(k1,31);
|
|
k1.multiply(mconst2);
|
|
h1.xor(k1);
|
|
}
|
|
/* eslint-enable no-fallthrough */
|
|
|
|
h1.xor(MutableLong.fromNumber(length));
|
|
h2.xor(MutableLong.fromNumber(length));
|
|
|
|
h1.add(h2);
|
|
h2.add(h1);
|
|
|
|
|
|
this.fmix(h1);
|
|
this.fmix(h2);
|
|
|
|
h1.add(h2);
|
|
|
|
return new token.Murmur3Token(h1);
|
|
}
|
|
|
|
/**
|
|
*
|
|
* @param {Array<Number>} key
|
|
* @param {Number} offset
|
|
* @param {Number} index
|
|
* @return {MutableLong}
|
|
*/
|
|
getBlock(key, offset, index) {
|
|
const i8 = index << 3;
|
|
const blockOffset = offset + i8;
|
|
return new MutableLong(
|
|
(key[blockOffset]) | (key[blockOffset + 1] << 8),
|
|
(key[blockOffset + 2]) | (key[blockOffset + 3] << 8),
|
|
(key[blockOffset + 4]) | (key[blockOffset + 5] << 8),
|
|
(key[blockOffset + 6]) | (key[blockOffset + 7] << 8)
|
|
);
|
|
}
|
|
|
|
/**
|
|
* @param {MutableLong} v
|
|
* @param {Number} n
|
|
*/
|
|
rotl64(v, n) {
|
|
const left = v.clone().shiftLeft(n);
|
|
v.shiftRightUnsigned(64 - n).or(left);
|
|
}
|
|
|
|
/** @param {MutableLong} k */
|
|
fmix(k) {
|
|
k.xor(new MutableLong(k.getUint16(2) >>> 1 | ((k.getUint16(3) << 15) & 0xffff), k.getUint16(3) >>> 1, 0, 0));
|
|
k.multiply(mconst3);
|
|
const other = new MutableLong(
|
|
(k.getUint16(2) >>> 1) | ((k.getUint16(3) << 15) & 0xffff),
|
|
k.getUint16(3) >>> 1,
|
|
0,
|
|
0
|
|
);
|
|
k.xor(other);
|
|
k.multiply(mconst4);
|
|
k.xor(new MutableLong(k.getUint16(2) >>> 1 | (k.getUint16(3) << 15 & 0xffff), k.getUint16(3) >>> 1, 0, 0));
|
|
}
|
|
|
|
/**
|
|
* Parses a int64 decimal string representation into a MutableLong.
|
|
* @param {String} value
|
|
* @returns {Murmur3Token}
|
|
*/
|
|
parse(value) {
|
|
return new token.Murmur3Token(MutableLong.fromString(value));
|
|
}
|
|
|
|
minToken() {
|
|
if (!this._minToken) {
|
|
// minimum long value.
|
|
this._minToken = this.parse('-9223372036854775808');
|
|
}
|
|
return this._minToken;
|
|
}
|
|
|
|
maxToken() {
|
|
if (!this._maxToken) {
|
|
this._maxToken = this.parse('9223372036854775807');
|
|
}
|
|
return this._maxToken;
|
|
}
|
|
|
|
maxValue() {
|
|
if (!this._maxValue) {
|
|
this._maxValue = Integer.fromString('9223372036854775807');
|
|
}
|
|
return this._maxValue;
|
|
}
|
|
|
|
minValue() {
|
|
if (!this._minValue) {
|
|
this._minValue = Integer.fromString('-9223372036854775808');
|
|
}
|
|
return this._minValue;
|
|
}
|
|
|
|
ringLength() {
|
|
if (!this._ringLength) {
|
|
this._ringLength = this.maxValue().subtract(this.minValue());
|
|
}
|
|
return this._ringLength;
|
|
}
|
|
|
|
split(start, end, numberOfSplits) {
|
|
// ]min, min] means the whole ring.
|
|
if (start.equals(end) && start.equals(this.minToken())) {
|
|
end = this.maxToken();
|
|
}
|
|
|
|
const startVal = Integer.fromString(start.getValue().toString());
|
|
const endVal = Integer.fromString(end.getValue().toString());
|
|
|
|
let range = endVal.subtract(startVal);
|
|
if (range.isNegative()) {
|
|
range = range.add(this.ringLength());
|
|
}
|
|
|
|
const values = this.splitBase(startVal, range, this.maxValue(), this.ringLength(), numberOfSplits);
|
|
return values.map(v => this.parse(v.toString()));
|
|
}
|
|
|
|
stringify(token) {
|
|
// Get the underlying MutableLong
|
|
const value = token.getValue();
|
|
// We need a way to uniquely represent a token, it doesn't have to be the decimal string representation
|
|
// Using the uint16 avoids divisions and other expensive operations on the longs
|
|
return value.getUint16(0) + ',' + value.getUint16(1) + ',' + value.getUint16(2) + ',' + value.getUint16(3);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Uniformly distributes data across the cluster based on MD5 hash values.
|
|
*/
|
|
class RandomTokenizer extends Tokenizer {
|
|
constructor() {
|
|
super();
|
|
// eslint-disable-next-line
|
|
this._crypto = require('crypto');
|
|
}
|
|
|
|
/**
|
|
* @param {Buffer|Array} value
|
|
* @returns {RandomToken}
|
|
*/
|
|
hash(value) {
|
|
if (Array.isArray(value)) {
|
|
value = utils.allocBufferFromArray(value);
|
|
}
|
|
const hashedValue = this._crypto.createHash('md5').update(value).digest();
|
|
return new token.RandomToken(Integer.fromBuffer(hashedValue).abs());
|
|
}
|
|
|
|
/**
|
|
* @returns {Token}
|
|
*/
|
|
parse(value) {
|
|
return new token.RandomToken(Integer.fromString(value));
|
|
}
|
|
|
|
minToken() {
|
|
if (!this._minToken) {
|
|
this._minToken = this.parse('-1');
|
|
}
|
|
return this._minToken;
|
|
}
|
|
|
|
maxValue() {
|
|
if (!this._maxValue) {
|
|
this._maxValue = Integer.fromNumber(Math.pow(2, 127));
|
|
}
|
|
return this._maxValue;
|
|
}
|
|
|
|
maxToken() {
|
|
if (!this._maxToken) {
|
|
this._maxToken = new token.RandomToken(this.maxValue());
|
|
}
|
|
return this._maxToken;
|
|
}
|
|
|
|
ringLength() {
|
|
if (!this._ringLength) {
|
|
this._ringLength = this.maxValue().add(Integer.ONE);
|
|
}
|
|
return this._ringLength;
|
|
}
|
|
|
|
split(start, end, numberOfSplits) {
|
|
// ]min, min] means the whole ring.
|
|
if (start.equals(end) && start.equals(this.minToken())) {
|
|
end = this.maxToken();
|
|
}
|
|
|
|
const startVal = start.getValue();
|
|
const endVal = end.getValue();
|
|
|
|
let range = endVal.subtract(startVal);
|
|
if (range.lessThan(Integer.ZERO)) {
|
|
range = range.add(this.ringLength());
|
|
}
|
|
|
|
const values = this.splitBase(startVal, range, this.maxValue(), this.ringLength(), numberOfSplits);
|
|
return values.map(v => new token.RandomToken(v));
|
|
}
|
|
}
|
|
|
|
class ByteOrderedTokenizer extends Tokenizer {
|
|
constructor() {
|
|
super();
|
|
}
|
|
|
|
/**
|
|
* @param {Buffer} value
|
|
* @returns {ByteOrderedToken}
|
|
*/
|
|
hash(value) {
|
|
// strip any trailing zeros as tokens with trailing zeros are equivalent
|
|
// to those who don't have them.
|
|
if (Array.isArray(value)) {
|
|
value = utils.allocBufferFromArray(value);
|
|
}
|
|
let zeroIndex = value.length;
|
|
for(let i = value.length - 1; i > 0; i--) {
|
|
if(value[i] === 0) {
|
|
zeroIndex = i;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
return new token.ByteOrderedToken(value.slice(0, zeroIndex));
|
|
}
|
|
|
|
stringify(token) {
|
|
return token.getValue().toString('hex');
|
|
}
|
|
|
|
parse(value) {
|
|
return this.hash(utils.allocBufferFromString(value, 'hex'));
|
|
}
|
|
|
|
minToken() {
|
|
if (!this._minToken) {
|
|
this._minToken = this.hash([]);
|
|
}
|
|
return this._minToken;
|
|
}
|
|
|
|
_toNumber(buffer, significantBytes) {
|
|
// Convert a token's byte array to a number in order to perform computations.
|
|
// This depends on the number of significant bytes that is used to normalize all tokens
|
|
// to the same size. For example if the token is 0x01 but significant bytes is 2, the
|
|
// result is 0x0100.
|
|
let target = buffer;
|
|
if(buffer.length !== significantBytes) {
|
|
target = Buffer.alloc(significantBytes);
|
|
buffer.copy(target);
|
|
}
|
|
|
|
// similar to Integer.fromBuffer except we force the sign to 0.
|
|
const bits = new Array(Math.ceil(target.length / 4));
|
|
for (let i = 0; i < bits.length; i++) {
|
|
let offset = target.length - ((i + 1) * 4);
|
|
let value;
|
|
if (offset < 0) {
|
|
//The buffer length is not multiple of 4
|
|
offset = offset + 4;
|
|
value = 0;
|
|
for (let j = 0; j < offset; j++) {
|
|
const byte = target[j];
|
|
value = value | (byte << (offset - j - 1) * 8);
|
|
}
|
|
}
|
|
else {
|
|
value = target.readInt32BE(offset);
|
|
}
|
|
bits[i] = value;
|
|
}
|
|
return new Integer(bits, 0);
|
|
}
|
|
|
|
_toBuffer(number, significantBytes) {
|
|
// Convert numeric representation back to a buffer.
|
|
const buffer = Integer.toBuffer(number);
|
|
if (buffer.length === significantBytes) {
|
|
return buffer;
|
|
}
|
|
|
|
// if first byte is a sign byte, skip it.
|
|
let start, length;
|
|
if (buffer[0] === 0) {
|
|
start = 1;
|
|
length = buffer.length - 1;
|
|
} else {
|
|
start = 0;
|
|
length = buffer.length;
|
|
}
|
|
|
|
const target = Buffer.alloc(significantBytes);
|
|
buffer.copy(target, significantBytes - length, start, length + start);
|
|
return target;
|
|
}
|
|
|
|
split(start, end, numberOfSplits) {
|
|
const tokenOrder = start.compare(end);
|
|
|
|
if (tokenOrder === 0 && start.equals(this.minToken())) {
|
|
throw new Error("Cannot split whole ring with ordered partitioner");
|
|
}
|
|
|
|
let startVal, endVal, range, ringLength, ringEnd;
|
|
const intNumberOfSplits = Integer.fromNumber(numberOfSplits);
|
|
// Since tokens are compared lexicographically, convert to numbers using the
|
|
// largest length (i.e. given 0x0A and 0x0BCD, switch to 0x0A00 and 0x0BCD)
|
|
let significantBytes = Math.max(start.getValue().length, end.getValue().length);
|
|
if (tokenOrder < 0) {
|
|
let addedBytes = 0;
|
|
while (true) {
|
|
startVal = this._toNumber(start.getValue(), significantBytes);
|
|
endVal = this._toNumber(end.getValue(), significantBytes);
|
|
range = endVal.subtract(startVal);
|
|
if (addedBytes === 4 || range.compare(intNumberOfSplits) >= 0) {
|
|
break;
|
|
}
|
|
significantBytes += 1;
|
|
addedBytes += 1;
|
|
}
|
|
} else {
|
|
let addedBytes = 0;
|
|
while (true) {
|
|
startVal = this._toNumber(start.getValue(), significantBytes);
|
|
endVal = this._toNumber(end.getValue(), significantBytes);
|
|
ringLength = Integer.fromNumber(Math.pow(2, significantBytes * 8));
|
|
ringEnd = ringLength.subtract(Integer.ONE);
|
|
range = endVal.subtract(startVal).add(ringLength);
|
|
if (addedBytes === 4 || range.compare(intNumberOfSplits) >= 0) {
|
|
break;
|
|
}
|
|
significantBytes += 1;
|
|
addedBytes += 1;
|
|
}
|
|
}
|
|
|
|
const values = this.splitBase(startVal, range, ringEnd, ringLength, numberOfSplits);
|
|
return values.map(v => new token.ByteOrderedToken(this._toBuffer(v, significantBytes)));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @param {Number} value
|
|
* @return {MutableLong}
|
|
*/
|
|
function fromSignedByte(value) {
|
|
if (value < 128) {
|
|
return new MutableLong(value, 0, 0, 0);
|
|
}
|
|
return new MutableLong((value - 256) & 0xffff, 0xffff, 0xffff, 0xffff);
|
|
}
|
|
|
|
exports.Murmur3Tokenizer = Murmur3Tokenizer;
|
|
exports.RandomTokenizer = RandomTokenizer;
|
|
exports.ByteOrderedTokenizer = ByteOrderedTokenizer;
|