Skip to content

Commit

Permalink
Ensure that all buffers used inside HTTP client will follow original …
Browse files Browse the repository at this point in the history
…buffer size.

Ensure that buffer size cannot be lower than default size.
  • Loading branch information
cheatfate committed Apr 3, 2024
1 parent 402914f commit 151b591
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
18 changes: 12 additions & 6 deletions chronos/apps/http/httpclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ proc new(
tls =
try:
newTLSClientAsyncStream(treader, twriter, ha.hostname,
flags = session.flags.getTLSFlags())
flags = session.flags.getTLSFlags(),
bufferSize = session.connectionBufferSize)
except TLSStreamInitError as exc:
return err(exc.msg)

Expand Down Expand Up @@ -1327,13 +1328,18 @@ proc getBodyReader*(response: HttpClientResponseRef): HttpBodyReader {.
let reader =
case response.bodyFlag
of HttpClientBodyFlag.Sized:
let bstream = newBoundedStreamReader(response.connection.reader,
response.contentLength)
newHttpBodyReader(bstream)
newHttpBodyReader(
newBoundedStreamReader(
response.connection.reader, response.contentLength,
bufferSize = response.session.connectionBufferSize))
of HttpClientBodyFlag.Chunked:
newHttpBodyReader(newChunkedStreamReader(response.connection.reader))
newHttpBodyReader(
newChunkedStreamReader(
response.connection.reader,
bufferSize = response.session.connectionBufferSize))
of HttpClientBodyFlag.Custom:
newHttpBodyReader(newAsyncStreamReader(response.connection.reader))
newHttpBodyReader(
newAsyncStreamReader(response.connection.reader))
response.connection.state = HttpClientConnectionState.ResponseBodyReceiving
response.reader = reader
response.reader
Expand Down
6 changes: 4 additions & 2 deletions chronos/streams/asyncstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,8 @@ proc init*(child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
child.readerLoop = loop
child.rsource = rsource
child.tsource = rsource.tsource
child.buffer = AsyncBufferRef.new(bufferSize)
let size = max(AsyncStreamDefaultBufferSize, bufferSize)
child.buffer = AsyncBufferRef.new(size)
trackCounter(AsyncStreamReaderTrackerName)
child.startReader()

Expand All @@ -958,7 +959,8 @@ proc init*[T](child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
child.readerLoop = loop
child.rsource = rsource
child.tsource = rsource.tsource
child.buffer = AsyncBufferRef.new(bufferSize)
let size = max(AsyncStreamDefaultBufferSize, bufferSize)
child.buffer = AsyncBufferRef.new(size)
if not isNil(udata):
GC_ref(udata)
child.udata = cast[pointer](udata)
Expand Down
12 changes: 8 additions & 4 deletions chronos/transports/stream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,8 @@ when defined(windows):
udata: cast[pointer](transp))
transp.wovl.data = CompletionData(cb: writeStreamLoop,
udata: cast[pointer](transp))
transp.buffer = BipBuffer.init(bufsize)
let size = max(bufsize, DefaultStreamBufferSize)
transp.buffer = BipBuffer.init(size)
transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]()
transp.future = Future[void].Raising([]).init(
Expand All @@ -606,7 +607,8 @@ when defined(windows):
udata: cast[pointer](transp))
transp.wovl.data = CompletionData(cb: writeStreamLoop,
udata: cast[pointer](transp))
transp.buffer = BipBuffer.init(bufsize)
let size = max(bufsize, DefaultStreamBufferSize)
transp.buffer = BipBuffer.init(size)
transp.flags = flags
transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]()
Expand Down Expand Up @@ -1452,7 +1454,8 @@ else:
transp = StreamTransport(kind: TransportKind.Socket)

transp.fd = sock
transp.buffer = BipBuffer.init(bufsize)
let size = max(bufsize, DefaultStreamBufferSize)
transp.buffer = BipBuffer.init(size)
transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]()
transp.future = Future[void].Raising([]).init(
Expand All @@ -1469,7 +1472,8 @@ else:
transp = StreamTransport(kind: TransportKind.Pipe)

transp.fd = fd
transp.buffer = BipBuffer.init(bufsize)
let size = max(bufsize, DefaultStreamBufferSize)
transp.buffer = BipBuffer.init(size)
transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]()
transp.future = Future[void].Raising([]).init(
Expand Down

0 comments on commit 151b591

Please sign in to comment.