Skip to content

Commit

Permalink
Fix user stream reading to carry out multiple handler calls when needed
Browse files Browse the repository at this point in the history
  • Loading branch information
nirosys committed Feb 22, 2023
1 parent f058655 commit 621e379
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 40 deletions.
82 changes: 43 additions & 39 deletions ionc/ion_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1764,7 +1764,7 @@ iERR _ion_stream_fread( ION_STREAM *stream, BYTE *dst, BYTE *end, SIZE *p_bytes_
iENTER;
ION_STREAM_PAGED *paged = PAGED_STREAM(stream);
struct _ion_user_stream *user_stream = NULL;
SIZE local_bytes_read, needed, bytes_read = 0;
SIZE local_bytes_read = 0, bytes_read = 0;

ASSERT(stream);
ASSERT(_ion_stream_is_paged(stream));
Expand Down Expand Up @@ -1797,55 +1797,59 @@ iERR _ion_stream_fread( ION_STREAM *stream, BYTE *dst, BYTE *end, SIZE *p_bytes_
// read from the user managed old style stream
//
user_stream = &(((ION_STREAM_USER_PAGED *)stream)->_user_stream);
SIZE needed = (SIZE)(end - dst);

// first check to see if we have any pending bytes, from a previous read
if (user_stream->curr == NULL
|| user_stream->limit == NULL
if (user_stream->curr == NULL || user_stream->limit == NULL
|| ((bytes_read = (SIZE)(user_stream->limit - user_stream->curr)) < 1)
) {
// if we didn't have anything, call the handler to get more bytes
switch((err = (*(user_stream->handler))(user_stream))) {
case IERR_OK:
bytes_read = (SIZE)(user_stream->limit - user_stream->curr);
break;
case IERR_EOF:
// if we didn't have anything, call the handler to get more bytes
bytes_read = 0;
break;
default:
bytes_read = READ_ERROR_LENGTH;
break;
}
}
// now, if we have bytes and no error, copy over what we got, or a page full
// whichever is shorter (if we don't copy the whole page we'll get more on our next pass)
if (bytes_read > 0) {
needed = (SIZE)(end - dst);
if (bytes_read > needed) {
bytes_read = needed;
}
memcpy(dst, user_stream->curr, bytes_read);
user_stream->curr += bytes_read; // move the handlers cursor forward
while (bytes_read < needed) {
err = (*(user_stream->handler))(user_stream);
if (err == IERR_OK) {
local_bytes_read = (SIZE)(user_stream->limit - user_stream->curr);
int to_copy = (local_bytes_read + bytes_read > needed) ? (needed - bytes_read) : local_bytes_read;
if (to_copy > 0) {
memcpy(dst, user_stream->curr, to_copy);
dst += to_copy;
user_stream->curr += to_copy;
bytes_read += to_copy;
} else { // No new data, so we break out.
break;
}
} else {
// Break in the case of error, or EOF.
break;
}
}
} else { // We have bytes from a previous read.
int to_copy = (bytes_read > needed) ? needed : bytes_read;
memcpy(dst, user_stream->curr, to_copy);
user_stream->curr += to_copy;
bytes_read += to_copy;
}
}
else {
//
// read a page from the underlying FILE*
//
local_bytes_read = (SIZE)(end - dst);
if (_ion_stream_is_fd_backed(stream)) {
bytes_read = (SIZE)READ((int)stream->_fp, dst, local_bytes_read);
if (bytes_read < 0) {
bytes_read = READ_ERROR_LENGTH;
}
else if (bytes_read == 0) {
bytes_read = READ_EOF_LENGTH;
}
}
else {
bytes_read = (SIZE)fread(dst, sizeof(BYTE), local_bytes_read, stream->_fp);
if (ferror(stream->_fp)) {
bytes_read = READ_ERROR_LENGTH;
}
}
if (_ion_stream_is_fd_backed(stream)) {
bytes_read = (SIZE)READ((int)stream->_fp, dst, local_bytes_read);
if (bytes_read < 0) {
bytes_read = READ_ERROR_LENGTH;
}
else if (bytes_read == 0) {
bytes_read = READ_EOF_LENGTH;
}
}
else {
bytes_read = (SIZE)fread(dst, sizeof(BYTE), local_bytes_read, stream->_fp);
if (ferror(stream->_fp)) {
bytes_read = READ_ERROR_LENGTH;
}
}
}

*p_bytes_read = bytes_read;
Expand Down
2 changes: 1 addition & 1 deletion test/test_ion_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ TEST(IonStream, ContinuesOverPageBoundary) {
ION_ASSERT_OK(ion_reader_close(reader));
ION_ASSERT_OK(ion_stream_close(stream));

ASSERT_EQ(2, context.number_of_handler_invocations);
ASSERT_EQ(3, context.number_of_handler_invocations);
ION_ASSERT_OK(ion_timestamp_parse(&expected, (char *)expected_str, (SIZE)strlen(expected_str), &chars_used, &g_IonEventDecimalContext));
ION_ASSERT_OK(ion_timestamp_equals(&expected, &ts, &is_equal, &g_IonEventDecimalContext));
ASSERT_TRUE(is_equal);
Expand Down

0 comments on commit 621e379

Please sign in to comment.