Node JS version
This commit is contained in:
310
node_modules/cassandra-driver/lib/writers.js
generated
vendored
Normal file
310
node_modules/cassandra-driver/lib/writers.js
generated
vendored
Normal file
@@ -0,0 +1,310 @@
|
||||
/*
|
||||
* Copyright DataStax, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
'use strict';
|
||||
const events = require('events');
|
||||
|
||||
const types = require('./types');
|
||||
const utils = require('./utils.js');
|
||||
const FrameHeader = types.FrameHeader;
|
||||
|
||||
/**
|
||||
* Contains the logic to write all the different types to the frame.
|
||||
*/
|
||||
class FrameWriter {
|
||||
/**
|
||||
* Creates a new instance of FrameWriter.
|
||||
* @param {Number} opcode
|
||||
*/
|
||||
constructor(opcode) {
|
||||
if (!opcode) {
|
||||
throw new Error('Opcode not provided');
|
||||
}
|
||||
this.buffers = [];
|
||||
this.opcode = opcode;
|
||||
this.bodyLength = 0;
|
||||
}
|
||||
|
||||
add(buf) {
|
||||
this.buffers.push(buf);
|
||||
this.bodyLength += buf.length;
|
||||
}
|
||||
|
||||
writeShort(num) {
|
||||
const buf = utils.allocBufferUnsafe(2);
|
||||
buf.writeUInt16BE(num, 0);
|
||||
this.add(buf);
|
||||
}
|
||||
|
||||
writeInt(num) {
|
||||
const buf = utils.allocBufferUnsafe(4);
|
||||
buf.writeInt32BE(num, 0);
|
||||
this.add(buf);
|
||||
}
|
||||
|
||||
/** @param {Long} num */
|
||||
writeLong(num) {
|
||||
this.add(types.Long.toBuffer(num));
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes bytes according to Cassandra <int byteLength><bytes>
|
||||
* @param {Buffer|null|types.unset} bytes
|
||||
*/
|
||||
writeBytes(bytes) {
|
||||
if (bytes === null) {
|
||||
//Only the length buffer containing -1
|
||||
this.writeInt(-1);
|
||||
return;
|
||||
}
|
||||
if (bytes === types.unset) {
|
||||
this.writeInt(-2);
|
||||
return;
|
||||
}
|
||||
//Add the length buffer
|
||||
this.writeInt(bytes.length);
|
||||
//Add the actual buffer
|
||||
this.add(bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a buffer according to Cassandra protocol: bytes.length (2) + bytes
|
||||
* @param {Buffer} bytes
|
||||
*/
|
||||
writeShortBytes(bytes) {
|
||||
if(bytes === null) {
|
||||
//Only the length buffer containing -1
|
||||
this.writeShort(-1);
|
||||
return;
|
||||
}
|
||||
//Add the length buffer
|
||||
this.writeShort(bytes.length);
|
||||
//Add the actual buffer
|
||||
this.add(bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a single byte
|
||||
* @param {Number} num Value of the byte, a number between 0 and 255.
|
||||
*/
|
||||
writeByte(num) {
|
||||
this.add(utils.allocBufferFromArray([num]));
|
||||
}
|
||||
|
||||
writeString(str) {
|
||||
if (typeof str === "undefined") {
|
||||
throw new Error("can not write undefined");
|
||||
}
|
||||
const len = Buffer.byteLength(str, 'utf8');
|
||||
const buf = utils.allocBufferUnsafe(2 + len);
|
||||
buf.writeUInt16BE(len, 0);
|
||||
buf.write(str, 2, buf.length-2, 'utf8');
|
||||
this.add(buf);
|
||||
}
|
||||
|
||||
writeLString(str) {
|
||||
const len = Buffer.byteLength(str, 'utf8');
|
||||
const buf = utils.allocBufferUnsafe(4 + len);
|
||||
buf.writeInt32BE(len, 0);
|
||||
buf.write(str, 4, buf.length-4, 'utf8');
|
||||
this.add(buf);
|
||||
}
|
||||
|
||||
writeStringList(values) {
|
||||
this.writeShort(values.length);
|
||||
values.forEach(this.writeString, this);
|
||||
}
|
||||
|
||||
writeCustomPayload(payload) {
|
||||
const keys = Object.keys(payload);
|
||||
this.writeShort(keys.length);
|
||||
keys.forEach(k => {
|
||||
this.writeString(k);
|
||||
this.writeBytes(payload[k]);
|
||||
});
|
||||
}
|
||||
|
||||
writeStringMap(map) {
|
||||
const keys = [];
|
||||
for (const k in map) {
|
||||
if (map.hasOwnProperty(k)) {
|
||||
keys.push(k);
|
||||
}
|
||||
}
|
||||
|
||||
this.writeShort(keys.length);
|
||||
|
||||
for(let i = 0; i < keys.length; i++) {
|
||||
const key = keys[i];
|
||||
this.writeString(key);
|
||||
this.writeString(map[key]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Number} version
|
||||
* @param {Number} streamId
|
||||
* @param {Number} [flags] Header flags
|
||||
* @returns {Buffer}
|
||||
* @throws {TypeError}
|
||||
*/
|
||||
write(version, streamId, flags) {
|
||||
const header = new FrameHeader(version, flags || 0, streamId, this.opcode, this.bodyLength);
|
||||
const headerBuffer = header.toBuffer();
|
||||
this.buffers.unshift(headerBuffer);
|
||||
return Buffer.concat(this.buffers, headerBuffer.length + this.bodyLength);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a queue that process one write at a time (FIFO).
|
||||
* @extends {EventEmitter}
|
||||
*/
|
||||
class WriteQueue extends events.EventEmitter {
|
||||
/**
|
||||
* Creates a new WriteQueue instance.
|
||||
* @param {Socket} netClient
|
||||
* @param {Encoder} encoder
|
||||
* @param {ClientOptions} options
|
||||
*/
|
||||
constructor(netClient, encoder, options) {
|
||||
super();
|
||||
this.netClient = netClient;
|
||||
this.encoder = encoder;
|
||||
this.isRunning = false;
|
||||
/** @type {Array<{operation: OperationState, callback: Function}>} */
|
||||
this.queue = [];
|
||||
this.coalescingThreshold = options.socketOptions.coalescingThreshold;
|
||||
this.error = null;
|
||||
this.canWrite = true;
|
||||
|
||||
// Listen to drain event that is going to be fired once
|
||||
// the underlying buffer is empty
|
||||
netClient.on('drain', () => {
|
||||
this.canWrite = true;
|
||||
this.run();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueues a new request
|
||||
* @param {OperationState} operation
|
||||
* @param {Function} callback The write callback.
|
||||
*/
|
||||
push(operation, callback) {
|
||||
const self = this;
|
||||
|
||||
if (this.error) {
|
||||
// There was a write error, there is no point in further trying to write to the socket.
|
||||
return process.nextTick(function writePushError() {
|
||||
callback(self.error);
|
||||
});
|
||||
}
|
||||
|
||||
this.queue.push({ operation: operation, callback: callback});
|
||||
this.run();
|
||||
}
|
||||
|
||||
run() {
|
||||
if (!this.isRunning && this.canWrite) {
|
||||
this.isRunning = true;
|
||||
// Use nextTick to allow the queue to build up on the current phase
|
||||
process.nextTick(() => this.process());
|
||||
}
|
||||
}
|
||||
|
||||
process() {
|
||||
if (this.error) {
|
||||
return;
|
||||
}
|
||||
|
||||
const buffers = [];
|
||||
const callbacks = [];
|
||||
let totalLength = 0;
|
||||
|
||||
while (this.queue.length > 0 && totalLength < this.coalescingThreshold) {
|
||||
const writeItem = this.queue.shift();
|
||||
if (!writeItem.operation.canBeWritten()) {
|
||||
// Invoke the write callback with an error that is not going to be yielded to user
|
||||
// as the operation has timed out or was cancelled.
|
||||
writeItem.callback(new Error('The operation was already cancelled or timeout elapsed'));
|
||||
continue;
|
||||
}
|
||||
let data;
|
||||
try {
|
||||
data = writeItem.operation.request.write(this.encoder, writeItem.operation.streamId);
|
||||
}
|
||||
catch (err) {
|
||||
writeItem.callback(err);
|
||||
continue;
|
||||
}
|
||||
totalLength += data.length;
|
||||
buffers.push(data);
|
||||
callbacks.push(writeItem.callback);
|
||||
}
|
||||
|
||||
if (totalLength === 0) {
|
||||
this.isRunning = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// We have to invoke the callbacks to avoid race conditions.
|
||||
// There is a performance benefit from executing all of them in a loop
|
||||
for (let i = 0; i < callbacks.length; i++) {
|
||||
callbacks[i]();
|
||||
}
|
||||
|
||||
// Concatenate buffers and write it to the socket
|
||||
// Further writes will be throttled until flushed
|
||||
this.canWrite = this.netClient.write(Buffer.concat(buffers, totalLength), err => {
|
||||
if (err) {
|
||||
this.setWriteError(err);
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.queue.length === 0 || !this.canWrite) {
|
||||
// It will start running once we get the next message or has drained
|
||||
this.isRunning = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// Allow IO between writes
|
||||
setImmediate(() => this.process());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits the 'error' event and callbacks items that haven't been written and clears them from the queue.
|
||||
* @param err
|
||||
*/
|
||||
setWriteError(err) {
|
||||
err.isSocketError = true;
|
||||
this.error = new types.DriverError('Socket was closed');
|
||||
this.error.isSocketError = true;
|
||||
// Use an special flag for items that haven't been written
|
||||
this.error.requestNotWritten = true;
|
||||
this.error.innerError = err;
|
||||
const q = this.queue;
|
||||
// Not more items can be added to the queue.
|
||||
this.queue = utils.emptyArray;
|
||||
for (let i = 0; i < q.length; i++) {
|
||||
const item = q[i];
|
||||
// Use the error marking that it was not written
|
||||
item.callback(this.error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = { FrameWriter, WriteQueue };
|
Reference in New Issue
Block a user