You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

311 lines
8.0 KiB
JavaScript

/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const events = require('events');
const types = require('./types');
const utils = require('./utils.js');
const FrameHeader = types.FrameHeader;
/**
* Contains the logic to write all the different types to the frame.
*/
class FrameWriter {
/**
* Creates a new instance of FrameWriter.
* @param {Number} opcode
*/
constructor(opcode) {
if (!opcode) {
throw new Error('Opcode not provided');
}
this.buffers = [];
this.opcode = opcode;
this.bodyLength = 0;
}
add(buf) {
this.buffers.push(buf);
this.bodyLength += buf.length;
}
writeShort(num) {
const buf = utils.allocBufferUnsafe(2);
buf.writeUInt16BE(num, 0);
this.add(buf);
}
writeInt(num) {
const buf = utils.allocBufferUnsafe(4);
buf.writeInt32BE(num, 0);
this.add(buf);
}
/** @param {Long} num */
writeLong(num) {
this.add(types.Long.toBuffer(num));
}
/**
* Writes bytes according to Cassandra <int byteLength><bytes>
* @param {Buffer|null|types.unset} bytes
*/
writeBytes(bytes) {
if (bytes === null) {
//Only the length buffer containing -1
this.writeInt(-1);
return;
}
if (bytes === types.unset) {
this.writeInt(-2);
return;
}
//Add the length buffer
this.writeInt(bytes.length);
//Add the actual buffer
this.add(bytes);
}
/**
* Writes a buffer according to Cassandra protocol: bytes.length (2) + bytes
* @param {Buffer} bytes
*/
writeShortBytes(bytes) {
if(bytes === null) {
//Only the length buffer containing -1
this.writeShort(-1);
return;
}
//Add the length buffer
this.writeShort(bytes.length);
//Add the actual buffer
this.add(bytes);
}
/**
* Writes a single byte
* @param {Number} num Value of the byte, a number between 0 and 255.
*/
writeByte(num) {
this.add(utils.allocBufferFromArray([num]));
}
writeString(str) {
if (typeof str === "undefined") {
throw new Error("can not write undefined");
}
const len = Buffer.byteLength(str, 'utf8');
const buf = utils.allocBufferUnsafe(2 + len);
buf.writeUInt16BE(len, 0);
buf.write(str, 2, buf.length-2, 'utf8');
this.add(buf);
}
writeLString(str) {
const len = Buffer.byteLength(str, 'utf8');
const buf = utils.allocBufferUnsafe(4 + len);
buf.writeInt32BE(len, 0);
buf.write(str, 4, buf.length-4, 'utf8');
this.add(buf);
}
writeStringList(values) {
this.writeShort(values.length);
values.forEach(this.writeString, this);
}
writeCustomPayload(payload) {
const keys = Object.keys(payload);
this.writeShort(keys.length);
keys.forEach(k => {
this.writeString(k);
this.writeBytes(payload[k]);
});
}
writeStringMap(map) {
const keys = [];
for (const k in map) {
if (map.hasOwnProperty(k)) {
keys.push(k);
}
}
this.writeShort(keys.length);
for(let i = 0; i < keys.length; i++) {
const key = keys[i];
this.writeString(key);
this.writeString(map[key]);
}
}
/**
* @param {Number} version
* @param {Number} streamId
* @param {Number} [flags] Header flags
* @returns {Buffer}
* @throws {TypeError}
*/
write(version, streamId, flags) {
const header = new FrameHeader(version, flags || 0, streamId, this.opcode, this.bodyLength);
const headerBuffer = header.toBuffer();
this.buffers.unshift(headerBuffer);
return Buffer.concat(this.buffers, headerBuffer.length + this.bodyLength);
}
}
/**
* Represents a queue that process one write at a time (FIFO).
* @extends {EventEmitter}
*/
class WriteQueue extends events.EventEmitter {
/**
* Creates a new WriteQueue instance.
* @param {Socket} netClient
* @param {Encoder} encoder
* @param {ClientOptions} options
*/
constructor(netClient, encoder, options) {
super();
this.netClient = netClient;
this.encoder = encoder;
this.isRunning = false;
/** @type {Array<{operation: OperationState, callback: Function}>} */
this.queue = [];
this.coalescingThreshold = options.socketOptions.coalescingThreshold;
this.error = null;
this.canWrite = true;
// Listen to drain event that is going to be fired once
// the underlying buffer is empty
netClient.on('drain', () => {
this.canWrite = true;
this.run();
});
}
/**
* Enqueues a new request
* @param {OperationState} operation
* @param {Function} callback The write callback.
*/
push(operation, callback) {
const self = this;
if (this.error) {
// There was a write error, there is no point in further trying to write to the socket.
return process.nextTick(function writePushError() {
callback(self.error);
});
}
this.queue.push({ operation: operation, callback: callback});
this.run();
}
run() {
if (!this.isRunning && this.canWrite) {
this.isRunning = true;
// Use nextTick to allow the queue to build up on the current phase
process.nextTick(() => this.process());
}
}
process() {
if (this.error) {
return;
}
const buffers = [];
const callbacks = [];
let totalLength = 0;
while (this.queue.length > 0 && totalLength < this.coalescingThreshold) {
const writeItem = this.queue.shift();
if (!writeItem.operation.canBeWritten()) {
// Invoke the write callback with an error that is not going to be yielded to user
// as the operation has timed out or was cancelled.
writeItem.callback(new Error('The operation was already cancelled or timeout elapsed'));
continue;
}
let data;
try {
data = writeItem.operation.request.write(this.encoder, writeItem.operation.streamId);
}
catch (err) {
writeItem.callback(err);
continue;
}
totalLength += data.length;
buffers.push(data);
callbacks.push(writeItem.callback);
}
if (totalLength === 0) {
this.isRunning = false;
return;
}
// We have to invoke the callbacks to avoid race conditions.
// There is a performance benefit from executing all of them in a loop
for (let i = 0; i < callbacks.length; i++) {
callbacks[i]();
}
// Concatenate buffers and write it to the socket
// Further writes will be throttled until flushed
this.canWrite = this.netClient.write(Buffer.concat(buffers, totalLength), err => {
if (err) {
this.setWriteError(err);
return;
}
if (this.queue.length === 0 || !this.canWrite) {
// It will start running once we get the next message or has drained
this.isRunning = false;
return;
}
// Allow IO between writes
setImmediate(() => this.process());
});
}
/**
* Emits the 'error' event and callbacks items that haven't been written and clears them from the queue.
* @param err
*/
setWriteError(err) {
err.isSocketError = true;
this.error = new types.DriverError('Socket was closed');
this.error.isSocketError = true;
// Use an special flag for items that haven't been written
this.error.requestNotWritten = true;
this.error.innerError = err;
const q = this.queue;
// Not more items can be added to the queue.
this.queue = utils.emptyArray;
for (let i = 0; i < q.length; i++) {
const item = q[i];
// Use the error marking that it was not written
item.callback(this.error);
}
}
}
module.exports = { FrameWriter, WriteQueue };