Skip to content

Commit

Permalink
readAll api - Implementation for chunked outputStreamType and unit tests for transform, categories, consistentSnapshot and onBatchError.
Browse files Browse the repository at this point in the history
  • Loading branch information
anu3990 committed Dec 21, 2021
1 parent 3d6146d commit f78da40
Show file tree
Hide file tree
Showing 2 changed files with 360 additions and 100 deletions.
190 changes: 117 additions & 73 deletions lib/documents.js
Original file line number Diff line number Diff line change
Expand Up @@ -2811,6 +2811,12 @@ function readAllDocumentsImpl(jobOptions) {
jobState.jobOptions.batchSize = 250;
}

if(jobState.jobOptions.outputStreamType){
if(jobState.jobOptions.outputStreamType === 'chunked' && jobState.jobOptions.categories){
throw new Error('categories not expected when outputStreamType is chunked.');
}
}

if(jobState.jobOptions.consistentSnapshot && typeof jobState.jobOptions.consistentSnapshot !== 'boolean'){
throw new Error('consistentSnapshot needs to be a boolean.');
}
Expand Down Expand Up @@ -2838,7 +2844,20 @@ function onReadAllInit(output) {
break;
}
jobState.startTime = Date.now();
for (let i = 0; i < maxRequesters; i++) {
jobState.maxRequesters = maxRequesters;

if(jobState.jobOptions.consistentSnapshot){
jobState.requesterCount++;
onReadAllDocs(jobState,0);
}
else {
spinReaderThreads(jobState,0);
}
}

function spinReaderThreads(jobState, readerId){

for(let i=readerId;i<jobState.maxRequesters; i++){
jobState.requesterCount++;
onReadAllDocs(jobState,i);
}
Expand Down Expand Up @@ -2887,10 +2906,7 @@ function onReadAllDocs(jobState, readerId) {
batchdef.transform = jobState.jobOptions.transform;
}
if(jobState.jobOptions.consistentSnapshot){
if(!jobState.consistentSnapshotTimestamp){
jobState.consistentSnapshotTimestamp = new Date();
}
batchdef.timestamp = jobState.consistentSnapshotTimestamp;
batchdef.timestamp = jobState.consistentSnapshotTimestamp? jobState.consistentSnapshotTimestamp:new mlutil.Timestamp();
}
if(jobState.jobOptions.categories){
batchdef.categories = jobState.jobOptions.categories;
Expand All @@ -2904,78 +2920,103 @@ function readDocs(jobState, val, batchdef, readBatchArray, readerId){
finishReader(jobState);
return;
}
readDocumentsImpl.call(jobState.docInstance, val, [batchdef])
.result((output) => {
jobState.docsReadSuccessfully+= readBatchArray.length;
output.forEach(item => {
if(jobState.readableObjectMode === 'chunked'){
jobState.outputStream.write(JSON.stringify(item));
const readDocumentsImplRequest = readDocumentsImpl.call(jobState.docInstance, val, [batchdef]);
if(jobState.readableObjectMode === 'chunked'){
readDocumentsImplRequest.stream('chunked')
.on('error', function(err){
readAllDocumentsErrorHandle(jobState, batchdef, readBatchArray, readerId, val, err);
})
.on('data', function(item){
if(jobState.jobOptions.consistentSnapshot && !jobState.consistentSnapshotTimestamp){
jobState.consistentSnapshotTimestamp = batchdef.timestamp;
spinReaderThreads(jobState,1);
}
jobState.outputStream.write(item);
})
.on('end', function(){
if(jobState.jobOptions.consistentSnapshot && !jobState.consistentSnapshotTimestamp){
jobState.consistentSnapshotTimestamp = batchdef.timestamp;
spinReaderThreads(jobState,1);
}
else {
jobState.outputStream.write(item);
}
});
if(jobState.lastBatch){
finishReader(jobState);
}
else {
onReadAllDocs(jobState, readerId);
}
})
});
}
else {
readDocumentsImplRequest.result((output) => {
if(jobState.jobOptions.consistentSnapshot && !jobState.consistentSnapshotTimestamp){
jobState.consistentSnapshotTimestamp = batchdef.timestamp;
spinReaderThreads(jobState,1);
}
jobState.docsReadSuccessfully+= readBatchArray.length;
output.forEach(item => {
jobState.outputStream.write(item);
});
if(jobState.lastBatch){
finishReader(jobState);
}
else {
onReadAllDocs(jobState, readerId);
}
})
.catch((err) => {
if(jobState.error){
finishReader(jobState);
return;
}
let errorDisposition = err;

if (jobState.jobOptions.onBatchError) {
const progressSoFar = {
docsReadSuccessfully: jobState.docsReadSuccessfully,
docsFailedToBeRead: readBatchArray.length,
timeElapsed: (Date.now()-jobState.startTime)
};
const documents = readBatchArray;
try {
errorDisposition = jobState.jobOptions.onBatchError(progressSoFar, documents, err);
} catch(err){
errorDisposition = err;
}
readAllDocumentsErrorHandle(jobState, batchdef, readBatchArray, readerId, val, err);
});
}
}

if(errorDisposition instanceof Error){
jobState.docsFailedToBeWritten += readBatchArray.length;
jobState.error = errorDisposition;
jobState.stream.emit('error', errorDisposition);
finishReader(jobState);
} else if(errorDisposition === null ||
(Array.isArray(errorDisposition) && errorDisposition.length === 0)) {
onReadAllDocs(jobState, readerId);
} else if(Array.isArray(errorDisposition)){
batchdef = {uris: errorDisposition};
readDocs(jobState, val, batchdef, errorDisposition, readerId);
} else {
const onBatchErrorFailure = new Error('onBatchError should return null, empty array or a replacement array.');
jobState.error = onBatchErrorFailure;
jobState.stream.emit('error', onBatchErrorFailure);
finishReader(jobState);
}
}
function readAllDocumentsErrorHandle(jobState, batchdef, readBatchArray, readerId, val, err){
if(jobState.error){
finishReader(jobState);
return;
}
let errorDisposition = err;
if (jobState.jobOptions.onBatchError) {
const progressSoFar = {
docsReadSuccessfully: jobState.docsReadSuccessfully,
docsFailedToBeRead: readBatchArray.length,
timeElapsed: (Date.now()-jobState.startTime)
};
const documents = readBatchArray;
try {
errorDisposition = jobState.jobOptions.onBatchError(progressSoFar, documents, err);
} catch(err){
errorDisposition = err;
}
if(errorDisposition instanceof Error){
jobState.docsFailedToBeWritten += readBatchArray.length;
jobState.error = errorDisposition;
jobState.stream.emit('error', errorDisposition);
finishReader(jobState);
} else if(errorDisposition === null ||
(Array.isArray(errorDisposition) && errorDisposition.length === 0)) {
jobState.docsFailedToBeRead += readBatchArray.length;
onReadAllDocs(jobState, readerId);
} else if(Array.isArray(errorDisposition)){
batchdef = {uris: errorDisposition};
readDocs(jobState, val, batchdef, errorDisposition, readerId);
return;
} else {
const onBatchErrorFailure = new Error('onBatchError should return null, empty array or a replacement array.');
jobState.error = onBatchErrorFailure;
jobState.outputStream.emit('error', onBatchErrorFailure);
finishReader(jobState);
}
}

switch(jobState.retryCount){
case 0: {
jobState.docsFailedToBeRead += readBatchArray.length;
jobState.error = new Error('RetryCount exceeded.');
jobState.stream.emit('error', jobState.error);
finishReader(jobState);
break;
}
default: {
jobState.retryCount--;
readDocs(jobState, val, batchdef,readBatchArray, readerId);
break;
}
}
});
switch(jobState.retryCount){
case 0: {
jobState.docsFailedToBeRead += readBatchArray.length;
jobState.error = new Error('RetryCount exceeded.');
jobState.outputStream.emit('error', jobState.error);
finishReader(jobState);
break;
}
default: {
jobState.retryCount--;
readDocs(jobState, val, batchdef,readBatchArray, readerId);
break;
}
}
}

function finishReader(jobState) {
Expand All @@ -2992,7 +3033,10 @@ function finishReader(jobState) {
summary.error = jobState.error.toString();
}
if(jobState.consistentSnapshotTimestamp){
summary.consistentSnapshotTimestamp = jobState.consistentSnapshotTimestamp;
const timestamp = (jobState.consistentSnapshotTimestamp.value.length>13) ?
(+jobState.consistentSnapshotTimestamp.value.substr(0, 13)):
jobState.consistentSnapshotTimestamp.value;
summary.consistentSnapshotTimestamp = new Date(timestamp);
}
jobState.jobOptions.onCompletion(summary);
}
Expand Down
Loading

0 comments on commit f78da40

Please sign in to comment.