/*
 * Copyright DataStax, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
'use strict';
const util = require('util');
const { Transform, Writable } = require('stream');
const types = require('./types');
const utils = require('./utils');
const errors = require('./errors');
const { FrameHeader } = types;
const { FrameReader } = require('./readers');

/**
 * Transforms chunks, emits data objects {header, chunk}
 * @param options Stream options
 * @extends Transform
 */
function Protocol (options) {
  Transform.call(this, options);
  // Header of the frame whose body is still being received (null between frames)
  this.header = null;
  // Amount of body bytes already consumed for the in-flight frame
  this.bodyLength = 0;
  this.clearHeaderChunks();
  // Protocol version is unknown (0) until the first server response arrives
  this.version = 0;
  this.headerSize = 0;
}

util.inherits(Protocol, Transform);

Protocol.prototype._transform = function (chunk, encoding, callback) {
  // Delegate all parsing; surface any parsing failure through the stream callback
  try {
    this.readItems(chunk);
  }
  catch (err) {
    return callback(err);
  }
  callback(null);
};
/**
 * Parses the chunk into frames (header and body).
 * Emits (push) complete frames or frames with incomplete bodies. Following chunks containing the rest of the body
 * will be emitted using the same frame.
 * It buffers incomplete headers.
 * @param {Buffer} chunk
 */
Protocol.prototype.readItems = function (chunk) {
  if (!chunk || chunk.length === 0) {
    return;
  }
  if (this.version === 0) {
    // The server replies the first message with the max protocol version supported
    this.version = FrameHeader.getProtocolVersion(chunk);
    this.headerSize = FrameHeader.size(this.version);
  }
  let offset = 0;
  let currentHeader = this.header;
  this.header = null;
  if (this.headerChunks.byteLength !== 0) {
    // An incomplete header was buffered: try to read the header from the buffered chunks
    this.headerChunks.parts.push(chunk);
    if (this.headerChunks.byteLength + chunk.length < this.headerSize) {
      // Still not enough bytes to form a header: keep buffering
      this.headerChunks.byteLength += chunk.length;
      return;
    }
    currentHeader = FrameHeader.fromBuffer(Buffer.concat(this.headerChunks.parts, this.headerSize));
    // First body byte lies this far into the current chunk
    offset = this.headerSize - this.headerChunks.byteLength;
    this.clearHeaderChunks();
  }
  const items = [];
  while (true) {
    if (!currentHeader) {
      if (this.headerSize > chunk.length - offset) {
        if (chunk.length - offset <= 0) {
          break;
        }
        // The header is incomplete, buffer it until the next chunk
        const headerPart = chunk.slice(offset, chunk.length);
        this.headerChunks.parts.push(headerPart);
        this.headerChunks.byteLength = headerPart.length;
        break;
      }
      // Read the header
      currentHeader = FrameHeader.fromBuffer(chunk, offset);
      offset += this.headerSize;
    }
    // Parse the body
    const remaining = chunk.length - offset;
    if (currentHeader.bodyLength <= remaining + this.bodyLength) {
      // The rest of the frame body is contained in this chunk
      items.push({ header: currentHeader, chunk: chunk, offset: offset, frameEnded: true });
      offset += currentHeader.bodyLength - this.bodyLength;
      // Reset the body length
      this.bodyLength = 0;
    }
    else if (remaining >= 0) {
      // The body is not fully contained in this chunk
      // Will continue later
      this.header = currentHeader;
      this.bodyLength += remaining;
      if (remaining > 0) {
        // Emit if there is at least a byte to emit
        items.push({ header: currentHeader, chunk: chunk, offset: offset, frameEnded: false });
      }
      break;
    }
    currentHeader = null;
  }
  for (let i = 0; i < items.length; i++) {
    this.push(items[i]);
  }
};

/**
 * Resets the buffer of partial header bytes.
 */
Protocol.prototype.clearHeaderChunks = function () {
  this.headerChunks = { byteLength: 0, parts: [] };
};

/**
 * A stream that gets reads header + body chunks and transforms them into header + (row | error)
 * @param {Object} streamOptions Node.js Stream options
 * @param {Encoder} encoder Encoder instance for the parser to use
 * @extends Transform
 */
function Parser (streamOptions, encoder) {
  Transform.call(this, streamOptions);
  // Frames that are streaming, indexed by id
  this.frames = {};
  this.encoder = encoder;
}

util.inherits(Parser, Transform);

Parser.prototype._transform = function (item, encoding, callback) {
  const frameInfo = this.frameState(item);
  let error = null;
  try {
    this.parseBody(frameInfo, item);
  }
  catch (err) {
    error = err;
  }
  callback(error);
  if (item.frameEnded) {
    if (frameInfo.cellBuffer) {
      // Frame was being streamed but an error force it to buffer the result
      this.push({
        header: frameInfo.header,
        error: new errors.DriverInternalError('There was an problem while parsing streaming frame, opcode ' +
          frameInfo.header.opcode)
      });
    }
    // All the parsing finished and it was streamed down
    // Emit an item that signals it
    this.push({ header: frameInfo.header, frameEnded: true});
  }
};

/**
 * Parses the frame body, dispatching on the opcode of the response.
 * @param frameInfo
 * @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
 */
Parser.prototype.parseBody = function (frameInfo, item) {
  frameInfo.isStreaming = frameInfo.byRow && item.header.opcode === types.opcodes.result;
  if (!this.handleFrameBuffers(frameInfo, item)) {
    // Frame isn't complete and we are not streaming the frame
    return;
  }
  const reader = new FrameReader(item.header, item.chunk, item.offset);
  // Check that flags have not been parsed yet for this frame
  if (frameInfo.flagsInfo === undefined) {
    const originalOffset = reader.offset;
    try {
      frameInfo.flagsInfo = reader.readFlagsInfo();
    }
    catch (e) {
      return this.handleParsingError(e, frameInfo, reader, originalOffset);
    }
  }
  // All the body for most operations is already buffered at this stage
  // Except for RESULT
  switch (item.header.opcode) {
    case types.opcodes.result:
      return this.parseResult(frameInfo, reader);
    case types.opcodes.ready:
    case types.opcodes.authSuccess:
      return this.push({ header: frameInfo.header, ready: true });
    case types.opcodes.authChallenge:
      return this.push({ header: frameInfo.header, authChallenge: true, token: reader.readBytes()});
    case types.opcodes.authenticate:
      return this.push({ header: frameInfo.header, mustAuthenticate: true, authenticatorName: reader.readString()});
    case types.opcodes.error:
      return this.push({ header: frameInfo.header, error: reader.readError()});
    case types.opcodes.supported:
      return this.push({ header: frameInfo.header, supported: reader.readStringMultiMap()});
    case types.opcodes.event:
      return this.push({ header: frameInfo.header, event: reader.readEvent()});
    default:
      return this.push({ header: frameInfo.header, error: new Error('Received invalid opcode: ' + item.header.opcode) });
  }
};
/**
 * Buffers if needed and returns true if it has all the necessary data to continue parsing the frame.
 * @param frameInfo
 * @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
 * @returns {Boolean}
 */
Parser.prototype.handleFrameBuffers = function (frameInfo, item) {
  if (!frameInfo.isStreaming) {
    // Handle buffering for complete frame bodies
    const currentLength = (frameInfo.bufferLength || 0) + item.chunk.length - item.offset;
    if (currentLength < item.header.bodyLength) {
      // Buffer until the frame is completed
      this.addFrameBuffer(frameInfo, item);
      return false;
    }
    // We have received the full frame body
    if (frameInfo.buffers) {
      item.chunk = this.getFrameBuffer(frameInfo, item);
      item.offset = 0;
    }
    return true;
  }
  if (frameInfo.cellBuffer) {
    // Handle buffering for frame cells (row cells or metadata cells)
    if (item.offset !== 0) {
      throw new errors.DriverInternalError('Following chunks can not have an offset greater than zero');
    }
    frameInfo.cellBuffer.parts.push(item.chunk);
    if (!frameInfo.cellBuffer.expectedLength) {
      // Its a buffer outside a row cell (metadata or other)
      if (frameInfo.cellBuffer.parts.length !== 2) {
        throw new errors.DriverInternalError('Buffer for streaming frame can not contain more than 1 item');
      }
      item.chunk = Buffer.concat(frameInfo.cellBuffer.parts, frameInfo.cellBuffer.byteLength + item.chunk.length);
      frameInfo.cellBuffer = null;
      return true;
    }
    if (frameInfo.cellBuffer.expectedLength > frameInfo.cellBuffer.byteLength + item.chunk.length) {
      // We still haven't got the cell data
      frameInfo.cellBuffer.byteLength += item.chunk.length;
      return false;
    }
    item.chunk = Buffer.concat(frameInfo.cellBuffer.parts, frameInfo.cellBuffer.byteLength + item.chunk.length);
    frameInfo.cellBuffer = null;
  }
  return true;
};
/**
 * Adds this chunk to the frame buffers.
 * @param frameInfo
 * @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
 */
Parser.prototype.addFrameBuffer = function (frameInfo, item) {
  if (!frameInfo.buffers) {
    // First buffered piece: only the bytes after the offset belong to this frame body
    frameInfo.buffers = [ item.chunk.slice(item.offset) ];
    frameInfo.bufferLength = item.chunk.length - item.offset;
    return;
  }
  if (item.offset > 0) {
    throw new errors.DriverInternalError('Following chunks can not have an offset greater than zero');
  }
  frameInfo.buffers.push(item.chunk);
  frameInfo.bufferLength += item.chunk.length;
};

/**
 * Adds the last chunk and concatenates the frame buffers.
 * @param frameInfo
 * @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
 * @returns {Buffer} A buffer containing exactly the frame body bytes.
 */
Parser.prototype.getFrameBuffer = function (frameInfo, item) {
  frameInfo.buffers.push(item.chunk);
  // Truncate to the body length declared in the header: the last chunk may already
  // contain the beginning of the next frame.
  // Fix: this previously read `frameInfo.bodyLength`, a property that is never set
  // (only `bufferLength` and `header` are), so Buffer.concat received undefined and
  // skipped the truncation, leaking trailing bytes into the reassembled body.
  const result = Buffer.concat(frameInfo.buffers, frameInfo.header.bodyLength);
  frameInfo.buffers = null;
  return result;
};

/**
 * Tries to read the result in the body of a message
 * @param frameInfo Frame information, header / metadata
 * @param {FrameReader} reader
 */
Parser.prototype.parseResult = function (frameInfo, reader) {
  let result;
  // As we might be streaming and the frame buffer might not be complete,
  // read the metadata and different types of result values in a try-catch.
  // Store the reader position
  const originalOffset = reader.offset;
  try {
    if (!frameInfo.meta) {
      frameInfo.kind = reader.readInt();
      // Spec 4.2.5
      switch (frameInfo.kind) {
        case types.resultKind.voidResult:
          result = { header: frameInfo.header, flags: frameInfo.flagsInfo };
          break;
        case types.resultKind.rows:
          // Parse the rows metadata, the rest of the response is going to be parsed afterwards
          frameInfo.meta = reader.readMetadata(frameInfo.kind);
          break;
        case types.resultKind.setKeyspace:
          result = { header: frameInfo.header, keyspaceSet: reader.readString(), flags: frameInfo.flagsInfo };
          break;
        case types.resultKind.prepared: {
          const preparedId = utils.copyBuffer(reader.readShortBytes());
          frameInfo.meta = reader.readMetadata(frameInfo.kind);
          result = { header: frameInfo.header, id: preparedId, meta: frameInfo.meta, flags: frameInfo.flagsInfo };
          break;
        }
        case types.resultKind.schemaChange:
          result = { header: frameInfo.header, schemaChange: reader.parseSchemaChange(), flags: frameInfo.flagsInfo };
          break;
        default:
          // Fix: `new` was missing here; in strict mode calling the constructor as a plain
          // function throws a TypeError instead of the intended DriverInternalError.
          throw new errors.DriverInternalError('Unexpected result kind: ' + frameInfo.kind);
      }
    }
  }
  catch (e) {
    return this.handleParsingError(e, frameInfo, reader, originalOffset);
  }
  if (result) {
    if (frameInfo.emitted) {
      // It may contain additional metadata and info that it's not being parsed
      return;
    }
    frameInfo.emitted = true;
    return this.push(result);
  }
  if (reader.remainingLength() > 0) {
    this.parseRows(frameInfo, reader);
  }
};

/**
 * Reads the rows from the body of a RESULT response, decoding each cell and pushing
 * either individual rows (when streaming by row) or buffered row items downstream.
 * @param frameInfo
 * @param {FrameReader} reader
 */
Parser.prototype.parseRows = function (frameInfo, reader) {
  if (frameInfo.parsingError) {
    // No more processing on this frame
    return;
  }
  if (frameInfo.rowLength === undefined) {
    try {
      frameInfo.rowLength = reader.readInt();
    }
    catch (e) {
      return this.handleParsingError(e, frameInfo, reader);
    }
  }
  if (frameInfo.rowLength === 0) {
    return this.push({
      header: frameInfo.header,
      result: { rows: utils.emptyArray, meta: frameInfo.meta, flags: frameInfo.flagsInfo }
    });
  }
  const meta = frameInfo.meta;
  // Resume from the last fully-parsed row when the previous chunk ended mid-row
  frameInfo.rowIndex = frameInfo.rowIndex || 0;
  for (let i = frameInfo.rowIndex; i < frameInfo.rowLength; i++) {
    const rowOffset = reader.offset;
    const row = new types.Row(meta.columns);
    let cellBuffer;
    for (let j = 0; j < meta.columns.length; j++) {
      const c = meta.columns[j];
      try {
        cellBuffer = reader.readBytes();
      }
      catch (e) {
        // Probably an incomplete cell: buffer from the start of the row and retry later
        return this.handleParsingError(e, frameInfo, reader, rowOffset, i);
      }
      try {
        row[c.name] = this.encoder.decode(cellBuffer, c.type);
      }
      catch (e) {
        // Something went wrong while decoding, we are not going to be able to recover
        return this.handleParsingError(e, frameInfo, null);
      }
    }
    this.push({
      header: frameInfo.header,
      row: row,
      meta: frameInfo.meta,
      byRow: frameInfo.byRow,
      length: frameInfo.rowLength,
      flags: frameInfo.flagsInfo
    });
  }
  if (frameInfo.byRow) {
    // Use an event item to identify that all the streaming rows have finished processing
    this.push({
      header: frameInfo.header,
      byRowCompleted: true,
      meta: frameInfo.meta,
      length: frameInfo.rowLength,
      flags: frameInfo.flagsInfo
    });
  }
};

/**
 * Sets parser options (ie: how to yield the results as they are parsed)
 * @param {Number} id Id of the stream
 * @param options
 */
Parser.prototype.setOptions = function (id, options) {
  if (this.frames[id.toString()]) {
    throw new types.DriverError('There was already state for this frame');
  }
  this.frames[id.toString()] = options;
};

/**
 * Manually clears the frame options.
 * This class already clears the provided options when the frame ends, so it's usually not required to invoke this
 * method.
 * When manually setting the options for continuous paging, it's possible that the frame options are set while
 * it's being cancelled.
 * @param {Number} id The streamId
 */
Parser.prototype.clearOptions = function (id) {
  delete this.frames[id.toString()];
};
/**
 * Gets the frame info from the internal state, creating it when it is not there.
 * When the frame has ended, the stored state is removed so it does not leak.
 * @param {{header: FrameHeader, frameEnded: Boolean}} item
 * @returns {Object} The frame state for the item's stream id.
 */
Parser.prototype.frameState = function (item) {
  const id = item.header.streamId;
  let frameInfo = this.frames[id];
  if (!frameInfo) {
    frameInfo = {};
    // Only keep state around for frames that are still in-flight
    if (!item.frameEnded) {
      this.frames[id] = frameInfo;
    }
  }
  else if (item.frameEnded) {
    // The frame is complete: drop the stored state
    delete this.frames[id];
  }
  frameInfo.header = item.header;
  return frameInfo;
};

/**
 * Handles parsing error: pushing an error if its unexpected or buffer the cell if its streaming
 * @param {Error} e
 * @param frameInfo
 * @param {FrameReader} reader
 * @param {Number} [originalOffset]
 * @param {Number} [rowIndex]
 */
Parser.prototype.handleParsingError = function (e, frameInfo, reader, originalOffset, rowIndex) {
  const isRecoverable = Boolean(reader) && frameInfo.isStreaming && e instanceof RangeError;
  if (isRecoverable) {
    // A controlled error (not enough bytes yet): buffer from offset and move on
    return this.bufferResultCell(frameInfo, reader, originalOffset, rowIndex, e.expectedLength);
  }
  // Unexpected failure: mark the frame as broken and surface the error downstream
  frameInfo.parsingError = true;
  frameInfo.cellBuffer = null;
  this.push({ header: frameInfo.header, error: e });
};
/**
 * When streaming, it buffers data since originalOffset.
 * @param frameInfo
 * @param {FrameReader} reader
 * @param {Number} [originalOffset] Position to start buffering from; defaults to the reader's current offset.
 * @param {Number} [rowIndex]
 * @param {Number} [expectedLength]
 */
Parser.prototype.bufferResultCell = function (frameInfo, reader, originalOffset, rowIndex, expectedLength) {
  // Fall back to the reader position when no offset was provided (0 is a valid offset)
  const startOffset = (!originalOffset && originalOffset !== 0) ? reader.offset : originalOffset;
  frameInfo.rowIndex = rowIndex;
  const part = reader.slice(startOffset);
  frameInfo.cellBuffer = {
    parts: [ part ],
    byteLength: part.length,
    expectedLength: expectedLength
  };
};

/**
 * Represents a writable streams that emits results
 * @param {Object} options Node.js Stream options
 * @extends Writable
 */
function ResultEmitter(options) {
  Writable.call(this, options);
  /**
   * Stores the rows for frames that needs to be yielded as one result with many rows
   */
  this.rowBuffer = {};
}

util.inherits(ResultEmitter, Writable);

ResultEmitter.prototype._write = function (item, encoding, callback) {
  // Route each parsed item to the matching event, reporting failures via the callback
  try {
    this.each(item);
  }
  catch (err) {
    return callback(err);
  }
  callback(null);
};

/**
 * Analyzes the item and emit the corresponding event
 */
ResultEmitter.prototype.each = function (item) {
  const header = item.header;
  if (item.error || item.result) {
    // Its either an error or an empty array rows
    // No transformation needs to be made
    return this.emit('result', header, item.error, item.result);
  }
  if (item.frameEnded) {
    return this.emit('frameEnded', header);
  }
  if (item.lastContinuousPage) {
    return this.emit('lastContinuousPage', header);
  }
  if (item.byRowCompleted) {
    return this.emit('byRowCompleted', header, item.row, item.meta, item.flags);
  }
  if (item.byRow) {
    // It should be yielded by row
    return this.emit('row', header, item.row, item.meta, item.length, item.flags);
  }
  if (item.row) {
    // It should be yielded as a result: buffer into an array of rows first
    return this.bufferAndEmit(item);
  }
  if (item.event) {
    // Its an event from Cassandra
    return this.emit('nodeEvent', header, item.event);
  }
  // Its a raw response (object with flags)
  return this.emit('result', header, null, item);
};
/**
 * Buffers the rows until the result set is completed and emits the result event.
 */
ResultEmitter.prototype.bufferAndEmit = function (item) {
  const id = item.header.streamId;
  let rows = this.rowBuffer[id];
  if (!rows) {
    rows = this.rowBuffer[id] = [];
  }
  rows.push(item.row);
  // item.length carries the total row count of the result set
  if (rows.length === item.length) {
    this.emit('result', item.header, null, { rows: rows, meta: item.meta, flags: item.flags});
    delete this.rowBuffer[id];
  }
};

exports.Protocol = Protocol;
exports.Parser = Parser;
exports.ResultEmitter = ResultEmitter;