You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
583 lines
18 KiB
JavaScript
583 lines
18 KiB
JavaScript
/*
|
|
* Copyright DataStax, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
'use strict';
|
|
|
|
const util = require('util');
|
|
const { Transform, Writable } = require('stream');
|
|
|
|
const types = require('./types');
|
|
const utils = require('./utils');
|
|
const errors = require('./errors');
|
|
const { FrameHeader } = types;
|
|
const { FrameReader } = require('./readers');
|
|
|
|
/**
|
|
* Transforms chunks, emits data objects {header, chunk}
|
|
* @param options Stream options
|
|
* @extends Transform
|
|
*/
|
|
function Protocol (options) {
|
|
Transform.call(this, options);
|
|
this.header = null;
|
|
this.bodyLength = 0;
|
|
this.clearHeaderChunks();
|
|
this.version = 0;
|
|
this.headerSize = 0;
|
|
}
|
|
|
|
util.inherits(Protocol, Transform);
|
|
|
|
Protocol.prototype._transform = function (chunk, encoding, callback) {
|
|
let error = null;
|
|
try {
|
|
this.readItems(chunk);
|
|
}
|
|
catch (err) {
|
|
error = err;
|
|
}
|
|
callback(error);
|
|
};
|
|
|
|
/**
|
|
* Parses the chunk into frames (header and body).
|
|
* Emits (push) complete frames or frames with incomplete bodies. Following chunks containing the rest of the body will
|
|
* be emitted using the same frame.
|
|
* It buffers incomplete headers.
|
|
* @param {Buffer} chunk
|
|
*/
|
|
Protocol.prototype.readItems = function (chunk) {
|
|
if (!chunk || chunk.length === 0) {
|
|
return;
|
|
}
|
|
if (this.version === 0) {
|
|
//The server replies the first message with the max protocol version supported
|
|
this.version = FrameHeader.getProtocolVersion(chunk);
|
|
this.headerSize = FrameHeader.size(this.version);
|
|
}
|
|
let offset = 0;
|
|
let currentHeader = this.header;
|
|
this.header = null;
|
|
if (this.headerChunks.byteLength !== 0) {
|
|
//incomplete header was buffered try to read the header from the buffered chunks
|
|
this.headerChunks.parts.push(chunk);
|
|
if (this.headerChunks.byteLength + chunk.length < this.headerSize) {
|
|
this.headerChunks.byteLength += chunk.length;
|
|
return;
|
|
}
|
|
currentHeader = FrameHeader.fromBuffer(Buffer.concat(this.headerChunks.parts, this.headerSize));
|
|
offset = this.headerSize - this.headerChunks.byteLength;
|
|
this.clearHeaderChunks();
|
|
}
|
|
const items = [];
|
|
while (true) {
|
|
if (!currentHeader) {
|
|
if (this.headerSize > chunk.length - offset) {
|
|
if (chunk.length - offset <= 0) {
|
|
break;
|
|
}
|
|
//the header is incomplete, buffer it until the next chunk
|
|
const headerPart = chunk.slice(offset, chunk.length);
|
|
this.headerChunks.parts.push(headerPart);
|
|
this.headerChunks.byteLength = headerPart.length;
|
|
break;
|
|
}
|
|
//read header
|
|
currentHeader = FrameHeader.fromBuffer(chunk, offset);
|
|
offset += this.headerSize;
|
|
}
|
|
//parse body
|
|
const remaining = chunk.length - offset;
|
|
if (currentHeader.bodyLength <= remaining + this.bodyLength) {
|
|
items.push({ header: currentHeader, chunk: chunk, offset: offset, frameEnded: true });
|
|
offset += currentHeader.bodyLength - this.bodyLength;
|
|
//reset the body length
|
|
this.bodyLength = 0;
|
|
}
|
|
else if (remaining >= 0) {
|
|
//the body is not fully contained in this chunk
|
|
//will continue later
|
|
this.header = currentHeader;
|
|
this.bodyLength += remaining;
|
|
if (remaining > 0) {
|
|
//emit if there is at least a byte to emit
|
|
items.push({ header: currentHeader, chunk: chunk, offset: offset, frameEnded: false });
|
|
}
|
|
break;
|
|
}
|
|
currentHeader = null;
|
|
}
|
|
for (let i = 0; i < items.length; i++) {
|
|
this.push(items[i]);
|
|
}
|
|
};
|
|
|
|
Protocol.prototype.clearHeaderChunks = function () {
|
|
this.headerChunks = { byteLength: 0, parts: [] };
|
|
};
|
|
|
|
/**
|
|
* A stream that gets reads header + body chunks and transforms them into header + (row | error)
|
|
* @param {Object} streamOptions Node.js Stream options
|
|
* @param {Encoder} encoder Encoder instance for the parser to use
|
|
* @extends Transform
|
|
*/
|
|
function Parser (streamOptions, encoder) {
|
|
Transform.call(this, streamOptions);
|
|
//frames that are streaming, indexed by id
|
|
this.frames = {};
|
|
this.encoder = encoder;
|
|
}
|
|
|
|
util.inherits(Parser, Transform);
|
|
|
|
Parser.prototype._transform = function (item, encoding, callback) {
|
|
const frameInfo = this.frameState(item);
|
|
|
|
let error = null;
|
|
try {
|
|
this.parseBody(frameInfo, item);
|
|
}
|
|
catch (err) {
|
|
error = err;
|
|
}
|
|
callback(error);
|
|
|
|
if (item.frameEnded) {
|
|
if (frameInfo.cellBuffer) {
|
|
//Frame was being streamed but an error force it to buffer the result
|
|
this.push({
|
|
header: frameInfo.header,
|
|
error: new errors.DriverInternalError('There was an problem while parsing streaming frame, opcode ' +
|
|
frameInfo.header.opcode)
|
|
});
|
|
}
|
|
//all the parsing finished and it was streamed down
|
|
//emit an item that signals it
|
|
this.push({ header: frameInfo.header, frameEnded: true});
|
|
}
|
|
};
|
|
|
|
/**
|
|
* @param frameInfo
|
|
* @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
|
|
*/
|
|
Parser.prototype.parseBody = function (frameInfo, item) {
|
|
frameInfo.isStreaming = frameInfo.byRow && item.header.opcode === types.opcodes.result;
|
|
if (!this.handleFrameBuffers(frameInfo, item)) {
|
|
// Frame isn't complete and we are not streaming the frame
|
|
return;
|
|
}
|
|
const reader = new FrameReader(item.header, item.chunk, item.offset);
|
|
// Check that flags have not been parsed yet for this frame
|
|
if (frameInfo.flagsInfo === undefined) {
|
|
const originalOffset = reader.offset;
|
|
try {
|
|
frameInfo.flagsInfo = reader.readFlagsInfo();
|
|
}
|
|
catch (e) {
|
|
return this.handleParsingError(e, frameInfo, reader, originalOffset);
|
|
}
|
|
}
|
|
|
|
//All the body for most operations is already buffered at this stage
|
|
//Except for RESULT
|
|
switch (item.header.opcode) {
|
|
case types.opcodes.result:
|
|
return this.parseResult(frameInfo, reader);
|
|
case types.opcodes.ready:
|
|
case types.opcodes.authSuccess:
|
|
return this.push({ header: frameInfo.header, ready: true });
|
|
case types.opcodes.authChallenge:
|
|
return this.push({ header: frameInfo.header, authChallenge: true, token: reader.readBytes()});
|
|
case types.opcodes.authenticate:
|
|
return this.push({ header: frameInfo.header, mustAuthenticate: true, authenticatorName: reader.readString()});
|
|
case types.opcodes.error:
|
|
return this.push({ header: frameInfo.header, error: reader.readError()});
|
|
case types.opcodes.supported:
|
|
return this.push({ header: frameInfo.header, supported: reader.readStringMultiMap()});
|
|
case types.opcodes.event:
|
|
return this.push({ header: frameInfo.header, event: reader.readEvent()});
|
|
default:
|
|
return this.push({ header: frameInfo.header, error: new Error('Received invalid opcode: ' + item.header.opcode) });
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Buffers if needed and returns true if it has all the necessary data to continue parsing the frame.
|
|
* @param frameInfo
|
|
* @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
|
|
* @returns {Boolean}
|
|
*/
|
|
Parser.prototype.handleFrameBuffers = function (frameInfo, item) {
|
|
if (!frameInfo.isStreaming) {
|
|
// Handle buffering for complete frame bodies
|
|
const currentLength = (frameInfo.bufferLength || 0) + item.chunk.length - item.offset;
|
|
if (currentLength < item.header.bodyLength) {
|
|
//buffer until the frame is completed
|
|
this.addFrameBuffer(frameInfo, item);
|
|
return false;
|
|
}
|
|
//We have received the full frame body
|
|
if (frameInfo.buffers) {
|
|
item.chunk = this.getFrameBuffer(frameInfo, item);
|
|
item.offset = 0;
|
|
}
|
|
return true;
|
|
}
|
|
if (frameInfo.cellBuffer) {
|
|
// Handle buffering for frame cells (row cells or metadata cells)
|
|
if (item.offset !== 0) {
|
|
throw new errors.DriverInternalError('Following chunks can not have an offset greater than zero');
|
|
}
|
|
frameInfo.cellBuffer.parts.push(item.chunk);
|
|
if (!frameInfo.cellBuffer.expectedLength) {
|
|
//Its a buffer outside a row cell (metadata or other)
|
|
if (frameInfo.cellBuffer.parts.length !== 2) {
|
|
throw new errors.DriverInternalError('Buffer for streaming frame can not contain more than 1 item');
|
|
}
|
|
item.chunk = Buffer.concat(frameInfo.cellBuffer.parts, frameInfo.cellBuffer.byteLength + item.chunk.length);
|
|
frameInfo.cellBuffer = null;
|
|
return true;
|
|
}
|
|
if (frameInfo.cellBuffer.expectedLength > frameInfo.cellBuffer.byteLength + item.chunk.length) {
|
|
//We still haven't got the cell data
|
|
frameInfo.cellBuffer.byteLength += item.chunk.length;
|
|
return false;
|
|
}
|
|
item.chunk = Buffer.concat(frameInfo.cellBuffer.parts, frameInfo.cellBuffer.byteLength + item.chunk.length);
|
|
frameInfo.cellBuffer = null;
|
|
}
|
|
return true;
|
|
};
|
|
|
|
/**
|
|
* Adds this chunk to the frame buffers.
|
|
* @param frameInfo
|
|
* @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
|
|
*/
|
|
Parser.prototype.addFrameBuffer = function (frameInfo, item) {
|
|
if (!frameInfo.buffers) {
|
|
frameInfo.buffers = [ item.chunk.slice(item.offset) ];
|
|
frameInfo.bufferLength = item.chunk.length - item.offset;
|
|
return;
|
|
}
|
|
if (item.offset > 0) {
|
|
throw new errors.DriverInternalError('Following chunks can not have an offset greater than zero');
|
|
}
|
|
frameInfo.buffers.push(item.chunk);
|
|
frameInfo.bufferLength += item.chunk.length;
|
|
};
|
|
|
|
/**
|
|
* Adds the last chunk and concatenates the frame buffers
|
|
* @param frameInfo
|
|
* @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
|
|
*/
|
|
Parser.prototype.getFrameBuffer = function (frameInfo, item) {
|
|
frameInfo.buffers.push(item.chunk);
|
|
const result = Buffer.concat(frameInfo.buffers, frameInfo.bodyLength);
|
|
frameInfo.buffers = null;
|
|
return result;
|
|
};
|
|
|
|
/**
|
|
* Tries to read the result in the body of a message
|
|
* @param frameInfo Frame information, header / metadata
|
|
* @param {FrameReader} reader
|
|
*/
|
|
Parser.prototype.parseResult = function (frameInfo, reader) {
|
|
let result;
|
|
// As we might be streaming and the frame buffer might not be complete,
|
|
// read the metadata and different types of result values in a try-catch.
|
|
// Store the reader position
|
|
const originalOffset = reader.offset;
|
|
try {
|
|
if (!frameInfo.meta) {
|
|
frameInfo.kind = reader.readInt();
|
|
// Spec 4.2.5
|
|
switch (frameInfo.kind) {
|
|
case types.resultKind.voidResult:
|
|
result = { header: frameInfo.header, flags: frameInfo.flagsInfo };
|
|
break;
|
|
case types.resultKind.rows:
|
|
// Parse the rows metadata, the rest of the response is going to be parsed afterwards
|
|
frameInfo.meta = reader.readMetadata(frameInfo.kind);
|
|
break;
|
|
case types.resultKind.setKeyspace:
|
|
result = { header: frameInfo.header, keyspaceSet: reader.readString(), flags: frameInfo.flagsInfo };
|
|
break;
|
|
case types.resultKind.prepared:
|
|
{
|
|
const preparedId = utils.copyBuffer(reader.readShortBytes());
|
|
frameInfo.meta = reader.readMetadata(frameInfo.kind);
|
|
result = { header: frameInfo.header, id: preparedId, meta: frameInfo.meta, flags: frameInfo.flagsInfo };
|
|
break;
|
|
}
|
|
case types.resultKind.schemaChange:
|
|
result = { header: frameInfo.header, schemaChange: reader.parseSchemaChange(), flags: frameInfo.flagsInfo };
|
|
break;
|
|
default:
|
|
throw errors.DriverInternalError('Unexpected result kind: ' + frameInfo.kind);
|
|
}
|
|
}
|
|
}
|
|
catch (e) {
|
|
return this.handleParsingError(e, frameInfo, reader, originalOffset);
|
|
}
|
|
if (result) {
|
|
if (frameInfo.emitted) {
|
|
// It may contain additional metadata and info that it's not being parsed
|
|
return;
|
|
}
|
|
frameInfo.emitted = true;
|
|
return this.push(result);
|
|
}
|
|
if (reader.remainingLength() > 0) {
|
|
this.parseRows(frameInfo, reader);
|
|
}
|
|
};
|
|
|
|
/**
|
|
* @param frameInfo
|
|
* @param {FrameReader} reader
|
|
*/
|
|
Parser.prototype.parseRows = function (frameInfo, reader) {
|
|
if (frameInfo.parsingError) {
|
|
//No more processing on this frame
|
|
return;
|
|
}
|
|
if (frameInfo.rowLength === undefined) {
|
|
try {
|
|
frameInfo.rowLength = reader.readInt();
|
|
}
|
|
catch (e) {
|
|
return this.handleParsingError(e, frameInfo, reader);
|
|
}
|
|
}
|
|
if (frameInfo.rowLength === 0) {
|
|
return this.push({
|
|
header: frameInfo.header,
|
|
result: { rows: utils.emptyArray, meta: frameInfo.meta, flags: frameInfo.flagsInfo }
|
|
});
|
|
}
|
|
const meta = frameInfo.meta;
|
|
frameInfo.rowIndex = frameInfo.rowIndex || 0;
|
|
for (let i = frameInfo.rowIndex; i < frameInfo.rowLength; i++) {
|
|
const rowOffset = reader.offset;
|
|
const row = new types.Row(meta.columns);
|
|
let cellBuffer;
|
|
for (let j = 0; j < meta.columns.length; j++ ) {
|
|
const c = meta.columns[j];
|
|
try {
|
|
cellBuffer = reader.readBytes();
|
|
}
|
|
catch (e) {
|
|
return this.handleParsingError(e, frameInfo, reader, rowOffset, i);
|
|
}
|
|
try {
|
|
row[c.name] = this.encoder.decode(cellBuffer, c.type);
|
|
}
|
|
catch (e) {
|
|
//Something went wrong while decoding, we are not going to be able to recover
|
|
return this.handleParsingError(e, frameInfo, null);
|
|
}
|
|
}
|
|
this.push({
|
|
header: frameInfo.header,
|
|
row: row,
|
|
meta: frameInfo.meta,
|
|
byRow: frameInfo.byRow,
|
|
length: frameInfo.rowLength,
|
|
flags: frameInfo.flagsInfo
|
|
});
|
|
}
|
|
if (frameInfo.byRow) {
|
|
// Use an event item to identify that all the streaming rows have finished processing
|
|
this.push({
|
|
header: frameInfo.header,
|
|
byRowCompleted: true,
|
|
meta: frameInfo.meta,
|
|
length: frameInfo.rowLength,
|
|
flags: frameInfo.flagsInfo
|
|
});
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Sets parser options (ie: how to yield the results as they are parsed)
|
|
* @param {Number} id Id of the stream
|
|
* @param options
|
|
*/
|
|
Parser.prototype.setOptions = function (id, options) {
|
|
if (this.frames[id.toString()]) {
|
|
throw new types.DriverError('There was already state for this frame');
|
|
}
|
|
this.frames[id.toString()] = options;
|
|
};
|
|
|
|
/**
|
|
* Manually clears the frame options.
|
|
* This class already clears the provided options when the frame ends, so it's usually not required to invoke this
|
|
* method.
|
|
* When manually setting the options for continuous paging, it's possible that the frame options are set while
|
|
* it's being cancelled.
|
|
* @param {Number} id The streamId
|
|
*/
|
|
Parser.prototype.clearOptions = function (id) {
|
|
delete this.frames[id.toString()];
|
|
};
|
|
|
|
/**
|
|
* Gets the frame info from the internal state.
|
|
* In case it is not there, it creates it.
|
|
* In case the frame ended
|
|
*/
|
|
Parser.prototype.frameState = function (item) {
|
|
let frameInfo = this.frames[item.header.streamId];
|
|
if (!frameInfo) {
|
|
frameInfo = {};
|
|
if (!item.frameEnded) {
|
|
//store it in the frames
|
|
this.frames[item.header.streamId] = frameInfo;
|
|
}
|
|
}
|
|
else if (item.frameEnded) {
|
|
//if it was already stored, remove it
|
|
delete this.frames[item.header.streamId];
|
|
}
|
|
frameInfo.header = item.header;
|
|
return frameInfo;
|
|
};
|
|
|
|
/**
|
|
* Handles parsing error: pushing an error if its unexpected or buffer the cell if its streaming
|
|
* @param {Error} e
|
|
* @param frameInfo
|
|
* @param {FrameReader} reader
|
|
* @param {Number} [originalOffset]
|
|
* @param {Number} [rowIndex]
|
|
*/
|
|
Parser.prototype.handleParsingError = function (e, frameInfo, reader, originalOffset, rowIndex) {
|
|
if (reader && frameInfo.isStreaming && (e instanceof RangeError)) {
|
|
//A controlled error, buffer from offset and move on
|
|
return this.bufferResultCell(frameInfo, reader, originalOffset, rowIndex, e.expectedLength);
|
|
}
|
|
frameInfo.parsingError = true;
|
|
frameInfo.cellBuffer = null;
|
|
this.push({ header: frameInfo.header, error: e });
|
|
};
|
|
|
|
/**
|
|
* When streaming, it buffers data since originalOffset.
|
|
* @param frameInfo
|
|
* @param {FrameReader} reader
|
|
* @param {Number} [originalOffset]
|
|
* @param {Number} [rowIndex]
|
|
* @param {Number} [expectedLength]
|
|
*/
|
|
Parser.prototype.bufferResultCell = function (frameInfo, reader, originalOffset, rowIndex, expectedLength) {
|
|
if (!originalOffset && originalOffset !== 0) {
|
|
originalOffset = reader.offset;
|
|
}
|
|
frameInfo.rowIndex = rowIndex;
|
|
const buffer = reader.slice(originalOffset);
|
|
frameInfo.cellBuffer = {
|
|
parts: [ buffer ],
|
|
byteLength: buffer.length,
|
|
expectedLength: expectedLength
|
|
};
|
|
};
|
|
|
|
/**
|
|
* Represents a writable streams that emits results
|
|
*/
|
|
function ResultEmitter(options) {
|
|
Writable.call(this, options);
|
|
/**
|
|
* Stores the rows for frames that needs to be yielded as one result with many rows
|
|
*/
|
|
this.rowBuffer = {};
|
|
}
|
|
|
|
util.inherits(ResultEmitter, Writable);
|
|
|
|
ResultEmitter.prototype._write = function (item, encoding, callback) {
|
|
let error = null;
|
|
try {
|
|
this.each(item);
|
|
}
|
|
catch (err) {
|
|
error = err;
|
|
}
|
|
callback(error);
|
|
};
|
|
|
|
|
|
/**
|
|
* Analyzes the item and emit the corresponding event
|
|
*/
|
|
ResultEmitter.prototype.each = function (item) {
|
|
if (item.error || item.result) {
|
|
//Its either an error or an empty array rows
|
|
//no transformation needs to be made
|
|
return this.emit('result', item.header, item.error, item.result);
|
|
}
|
|
if (item.frameEnded) {
|
|
return this.emit('frameEnded', item.header);
|
|
}
|
|
if (item.lastContinuousPage) {
|
|
return this.emit('lastContinuousPage', item.header);
|
|
}
|
|
if (item.byRowCompleted) {
|
|
return this.emit('byRowCompleted', item.header, item.row, item.meta, item.flags);
|
|
}
|
|
if (item.byRow) {
|
|
//it should be yielded by row
|
|
return this.emit('row', item.header, item.row, item.meta, item.length, item.flags);
|
|
}
|
|
if (item.row) {
|
|
//it should be yielded as a result
|
|
//it needs to be buffered to an array of rows
|
|
return this.bufferAndEmit(item);
|
|
}
|
|
if (item.event) {
|
|
//its an event from Cassandra
|
|
return this.emit('nodeEvent', item.header, item.event);
|
|
}
|
|
//its a raw response (object with flags)
|
|
return this.emit('result', item.header, null, item);
|
|
};
|
|
|
|
/**
|
|
* Buffers the rows until the result set is completed and emits the result event.
|
|
*/
|
|
ResultEmitter.prototype.bufferAndEmit = function (item) {
|
|
let rows = this.rowBuffer[item.header.streamId];
|
|
if (!rows) {
|
|
rows = this.rowBuffer[item.header.streamId] = [];
|
|
}
|
|
rows.push(item.row);
|
|
if (rows.length === item.length) {
|
|
this.emit('result', item.header, null, { rows: rows, meta: item.meta, flags: item.flags});
|
|
delete this.rowBuffer[item.header.streamId];
|
|
}
|
|
};
|
|
|
|
exports.Protocol = Protocol;
|
|
exports.Parser = Parser;
|
|
exports.ResultEmitter = ResultEmitter;
|