On allocations in the data path #79

eboasson opened this issue Dec 11, 2019 · 1 comment

Path traversed by a message/sample, annotated with memory allocation activity (the "local
subscription bypass" skips straight from write_sample_gc to rhc_store, which is a
bit of a lie when you look carefully at the code, but that way I get the allocations
correct):

rmw_publish
  |
  v
dds_write
  |
  |  RMW serdata: from_sample
  |    - allocates "serdata" and serializes into it
  |  key to instance_handle mapping (1)
  |    allocates for a unique key
  |    - tkmap_instance (entry in table)
  |    - "topicless serdata" for insertion in table
  |    - possibly resizes hash table
  |    (if no readers, undoes the above)
  |
  v
write_sample_gc
  |        \
  |         \
  |  whc_insert (for reliable data)
  |    - allocates whc_node (2)
  |    - inserts in seq# hash (which may grow hash table)
  |    - adds to seq# interval tree (which may require an interval tree node)
  |    - may push out old samples, which may cause frees
  |    - if keyed: insert in instance_handle hash
  |      - may grow hash table (1)
  |           |
  v           |
transmit_sample
  |           |
  |  allocate and initialise new "xmsg" (3)
  |    - large samples: needs many, for DATAFRAG and HEARTBEATFRAG
  |    - serialised data integrated by-reference
  |    - may also need one for a HEARTBEAT
  |           |
  |  xpack_add called for each "xmsg" to combine into RTPS messages
  |    - fills a (lazily allocated per writer) scatter/gather list
  |    - transmits the packet using sendmsg() when full/flushed
  |    - does release record of samples just sent to track the highest seq#
  |      that was actually transmitted, which may release samples if
  |      they were ACK'd already (unlikely, but possible) (3)
  |           |
  |           |
  |           |
  |           |
  |           |local
  |           |subscriptions
sendmsg       |bypass 
  .           |
  .           |
  .           |
  .           |
nn_rmsg_new   |
  |           |
  |  ensure receive buffer space is available (5)
  |    - may allocate new buffers: data is left in these buffers while
  |      defragmenting or dealing with samples received out-of-order
  |    - these buffers are huge in the default config to reduce number of allocations
  |           |
recvmsg       |
  |           |
  |           |
  v           |
"cracking" (5)|
  |           |
  |  term is an analogy to oil refining: breaking up a message into its constituents
  |  typically allocates memory, typically contiguous with received datagram by
  |  bumping a pointer
  |    - COW receiver state on state changes
  |           |
  |  DATA/DATAFRAG/GAP:
  |    - allocate message defragmenting state
  |    - allocate message reordering state
  |    - (typically GAP doesn't require the above)
  |    - may result in delivering data or discarding fragments, which may free memory
  |           |
  |  ACKNACK: |
  |    - may drop messages from WHC, freeing (2):
  |      - whc_node, interval tree entry, index entries, possibly serdata
  |      - possible "keyless serdata" and instance_handle index entry
  |  ACKNACK/NACKFRAG:
  |    - possibly queues retransmits, GAPs and HEARTBEATs
  |      - allocates "xmsg"s (like data path) (3)
  |      - allocates queue entries (4)
  |      - freed upon sending
  |           |
  |  HEARTBEAT:
  |    - may result in delivering data or discarding fragments, which may free memory
  |           |
  v           |
deliver_user_data
  |           |
  |  RMW serdata: from_ser
  |    - allocates a "serdata" and fills it with serialised data
  |      that is still stored in receive buffers
  |           |
  |  frees receive buffer claims after having created the "serdata" (5)
  |  typical synchronous delivery path without message loss:
  |    - resets receive buffer allocator pointer to what it was prior to processing
  |      datagram
  |    - (but typical is not so interesting in a worst-case analysis ...)
  |           |
  |  key to instance_handle mapping (1)
  |    allocates for a unique key
  |    - tkmap_instance (entry in table)
  |    - "topicless serdata" for insertion in table
  |    - possibly resizes hash table
  |    (if no readers, undoes the above)
  |           /
  |          /
  |         /
  v        /
rhc_store (once per reader)
  |
  |  - allocates new "instance" if instance handle not yet present in its hash table
  |    may grow instance hash table
  |  - allocates new sample (typically, though never for KEEP_LAST with depth 1 nor
  |    when pushing old samples out of the history)
  |  - may free RMW serdatas that were pushed out of the history
  |    this won't affect the instance_handle mappings nor the "topicless serdata"
  |    because overwriting data in the history doesn't change the set of keys
  |  - may require insertion of a "registration" in a hash table, which in turn
  |    may require allocating/growing the hash table (the "registration" itself
  |    is not allocated)
  |
  v
dds_take
  |
  |  - RMW serdata: to_sample, deserialises into ROS message provided by the upper
  |    layers
  |  - RMW free of the "serdata" if the refcount drops to zero
  |  - removes the sample from the reader history (possibly the instance as well)
  |    which may involve
  |  - freeing memory allocated for the instance handle mapping and the "topicless
  |    serdata" (1)
  |
  v
rmw_take

There are a number of points worth noting in the above:

  • Cyclone defers freeing memory in some cases, relying on a garbage collector, but this
    garbage collector is one in the sense of the garbage trucks that drive through the
    streets collecting the garbage that has been put on the sidewalk, rather than the
    stop-the-world/"thief in the night" model used in Java, C#, Haskell, &c.

    The deferral is there so that data can be used briefly without having to do reference
    counting for such short-lived dynamic references, and also, to some extent, so that the
    cost of freeing is not incurred at the point of use. This is currently used for:

    • mapping entries for key value to instance handle
    • all DDSI-level entities (writers, readers, participants, and their "proxy" variants
      for remote ones)
    • hash table buckets for the concurrent hash tables used to index the above

    Freeing these requires enqueueing them for the garbage collector; that in turn is
    currently implemented by allocating a "gcreq" queue entry.
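
    Freeing-by-enqueueing boils down to something like the following sketch (hypothetical
    names, not the actual gcreq code, which also has to synchronise with the threads that
    may still hold references):

      #include <stdlib.h>

      /* Objects are not freed at the point of use but handed to a collector thread;
         note that "freeing" therefore costs an allocation: the queue entry. */
      struct gcreq {
        struct gcreq *next;
        void (*free_fn) (void *obj);
        void *obj;
      };

      struct gcreq_queue { struct gcreq *head, *tail; };  /* plus mutex/condvar in reality */

      static void defer_free (struct gcreq_queue *q, void (*free_fn) (void *), void *obj)
      {
        struct gcreq *req = malloc (sizeof (*req));
        req->next = NULL; req->free_fn = free_fn; req->obj = obj;
        if (q->tail) q->tail->next = req; else q->head = req;
        q->tail = req;
      }

      /* The "garbage truck": once it is certain no thread can still be dereferencing
         the dead objects, it drains the queue and frees them. */
      static void gc_collect (struct gcreq_queue *q)
      {
        struct gcreq *req = q->head;
        q->head = q->tail = NULL;
        while (req) {
          struct gcreq *next = req->next;
          req->free_fn (req->obj);
          free (req);
          req = next;
        }
      }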

  • ROS2 only uses keyless topics (in its current versions, anyway) and so there is at most
    a single "instance" and consequently, at most a single instance handle and mapping entry
    at any one time. For administering these instance handles, the implementation reduces
    the sample to its key value and erases the topic from it (the "topicless serdata" in the
    above). This way, if different topics from the same underlying type implementation have
    the same key value, they get the same instance handle and the same mapping entry.

    The mapping is therefore affected only when the union of the contents of the writer
    history caches and the reader history caches for ROS2 messages transitions between an
    empty set and a non-empty set.

    As there is at most a single entry, it makes sense to force its existence. DDS'
    register_instance can be used to do this (but that thing hasn't really been used in
    Cyclone and I know that I haven't cleaned up the inheritance yet). That will eliminate
    allocations/frees marked (1).
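
    For concreteness, pinning that single instance at writer creation time would look
    roughly like the sketch below (untested, subject to the caveat above about
    register_instance not having seen much use in Cyclone, and assuming the
    dds_register_instance signature from dds/dds.h):

      #include "dds/dds.h"

      /* Sketch: force the (single, keyless) instance into existence for the lifetime of
         the writer, so the key-to-instance-handle mapping marked (1) is set up once,
         instead of being allocated on first write and freed when the caches empty out. */
      static dds_instance_handle_t pin_instance (dds_entity_t writer, const void *sample)
      {
        dds_instance_handle_t ih = 0;
        if (dds_register_instance (writer, &ih, sample) != DDS_RETCODE_OK)
          return 0;
        return ih;  /* keep it registered; unregistering would drop the mapping again */
      }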

  • The allocations of "whc_node" (marked (2)) for tracking individual samples in the
    writer history cache potentially happen at very high rates (> 1M/s for throughput tests
    with small samples) and I have seen the allocator become a bottleneck. Caching is the
    current trick to speed this up; the cache today is bounded by what can reasonably be
    expected to be needed.

    The interval tree is allocated/freed without caching, because there is far less churn
    for those (e.g., for a simple queue, there is only one interval needed).

    The number of samples in the writer history cache is, of course, bounded by history
    settings, and so these could just as easily be pre-allocated. But even better is to use
    the fact that the WHC has been abstracted in the code. A simple pre-allocated circular
    array is sufficient for implementing a queue with limited depth, and so using a
    different implementation is altogether more sensible.
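
    For illustration, such a fixed-depth queue-style WHC could be as simple as the
    following sketch (hypothetical types and names, not the real WHC interface, which
    also has to support lookups for ACK processing and retransmits):

      #include <stdint.h>

      /* Pre-allocated circular array holding the most recent WHC_DEPTH samples,
         indexed by sequence number; inserting never allocates. */
      #define WHC_DEPTH 16

      struct simple_whc {
        void     *sample[WHC_DEPTH];  /* refcounted serdata pointers in reality */
        uint64_t  min_seq;            /* oldest retained sequence number */
        uint64_t  next_seq;           /* sequence number of the next sample to insert */
      };

      static void simple_whc_insert (struct simple_whc *whc, void *serdata,
                                     void (*drop) (void *))
      {
        if (whc->next_seq - whc->min_seq == WHC_DEPTH) {
          /* window full: push the oldest sample out -- this is where the (possibly
             deferred) free of an old serdata would happen */
          drop (whc->sample[whc->min_seq % WHC_DEPTH]);
          whc->min_seq++;
        }
        whc->sample[whc->next_seq % WHC_DEPTH] = serdata;
        whc->next_seq++;
      }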

  • The "xmsg" (marked (3)) that represent RTPS submessages (or more precisely: groups
    of submessages that must be kept together, like an INFO_TS and the DATA it applies to)
    are cached for the same reason the WHC entries are cached. Unlike the latter, the
    lifetime of the "xmsg" is the actual sending of data. The number of "xmsg" that can be
    packed into a message is bounded, and certainly for the data path these can be
    preallocated.

    For generating responses to ACKNACKs and HEARTBEATs and queueing them, maintaining a
    pool with a sensible policy in case the pool is too small should be possible: not
    sending a message in response because no "xmsg" is available is equivalent to it getting
    lost on the network, and that's supported. The "sensible" part of the policy is that it
    may be better to prioritise some types over others, e.g., perhaps it would be better to
    prioritise acknowledgements over retransmits. That's something worth investigating.
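
    A sketch of what such a pool with a policy could look like (hypothetical names): give
    each class of message its own budget within a fixed set of pre-allocated entries, so
    that a burst of retransmit requests cannot starve acknowledgements:

      /* Fixed pool of pre-allocated "xmsg"-like buffers with a per-class budget;
         failing to get one simply means not sending, which is equivalent to the
         message getting lost on the network. */
      struct xmsg { struct xmsg *next_free; /* ... submessage payload ... */ };

      enum xmsg_class { XMSG_ACK, XMSG_RETRANSMIT, XMSG_OTHER, XMSG_NCLASSES };

      struct xmsg_pool {
        struct xmsg *freelist;             /* filled with pre-allocated entries at startup */
        unsigned in_use[XMSG_NCLASSES];
        unsigned budget[XMSG_NCLASSES];    /* e.g. give ACKs a larger share than retransmits */
      };

      static struct xmsg *xmsg_pool_get (struct xmsg_pool *pool, enum xmsg_class c)
      {
        if (pool->freelist == NULL || pool->in_use[c] >= pool->budget[c])
          return NULL;                     /* caller drops the response instead of sending */
        struct xmsg *m = pool->freelist;
        pool->freelist = m->next_free;
        pool->in_use[c]++;
        return m;
      }

      static void xmsg_pool_return (struct xmsg_pool *pool, enum xmsg_class c, struct xmsg *m)
      {
        m->next_free = pool->freelist;
        pool->freelist = m;
        pool->in_use[c]--;
      }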

  • Responses to received RTPS messages are not sent by the same thread, but rather by a
    separate thread (marked (4)). Retransmits (for example) are generated and queued by
    the thread handling the incoming packet, but (for example) ACKNACKs are generated by
    rescheduling a pre-existing event that then generates an "xmsg" in that separate thread
    and transmits it similarly to regular data transmission.

    Pre-allocating the queue entries (or better yet: turning the queue into a circular
    array!) has similar considerations to pre-allocating the "xmsg".

  • Receive buffers are troublesome (marked (5)): to support platforms without
    scatter/gather support in the kernel (are there still any left?) it allocates large
    chunks of memory so it can accept a full UDP datagram in it with some room to spare.
    That room to spare is then used to store data generated as a consequence of interpreting
    the packet: receiver state, sample information, administration for tracking fragments or
    samples that were received out of order. Data is the longest lived of all this, and so
    releasing the memory happens when all data in the buffer has been delivered (or
    discarded).

    If fragmented data is received or data is received out-of-order, that means the packets
    can hang around for a while ... The benefit (and primary reason why I made this
    experiment way back when) is that there is no copying of anything until the received
    data is turned into "serdata"s by from_ser (which is implemented in the RMW layer
    for ROS2 messages). Even at that stage, it is still possible to not copy the data but
    instead leave it in the receive buffer, but I have never actually tried that!

    The allocator used within the buffers is a simple bump allocator with an optimisation
    that it resets completely if no part of the packet remains live after processing all its
    submessages, and so for non-fragmented data with negligible packet loss, you're
    basically never allocating memory. Conversely, when data is fragmented or there is
    significant packet loss, it becomes a bit of a mess.

    Eliminating the allocating and freeing of these buffers needs some thought ... quite
    possibly the best way is to assume scatter/gather support and accept an additional copy
    if the data can't be delivered to the reader history immediately.
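
    The bump allocator described above amounts to something like this sketch (hypothetical
    names; the real receive-buffer code also reference-counts the live claims so a buffer
    is only recycled once everything in it has been delivered or discarded):

      #include <stddef.h>

      /* Large receive buffer: the datagram is read into the front, and state derived
         from interpreting it (receiver state, defrag/reorder admin, sample info) is
         bump-allocated behind it. */
      struct rbuf {
        size_t   size;        /* "huge" in the default configuration */
        size_t   used;        /* bump pointer */
        unsigned live;        /* outstanding claims on data in this buffer */
        unsigned char mem[];  /* datagram + derived state */
      };

      static void *rbuf_alloc (struct rbuf *rb, size_t n)
      {
        n = (n + 7) & ~(size_t) 7;     /* keep things aligned */
        if (rb->used + n > rb->size)
          return NULL;                 /* caller moves on to a fresh buffer */
        void *p = rb->mem + rb->used;
        rb->used += n;
        return p;
      }

      /* Fast path: if nothing from the packet stayed live (no fragments to keep, no
         out-of-order samples), rewind to where processing of the datagram started. */
      static void rbuf_reset_if_dead (struct rbuf *rb, size_t mark)
      {
        if (rb->live == 0)
          rb->used = mark;
      }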

  • The reader history cache is really vastly overcomplicated for ROS2 use ... similar to
    the writer history cache, it can be replaced by an alternate implementation, and that's
    probably the most straightforward path to eliminating allocations when using ROS2.
    Alternatively, one could pre-allocate instances and samples.

    The "registrations" that track which writer is writing which instances do not incur any
    allocations unless there are multiple writers writing the same instance. Cyclone
    follows the DDS spec in treating topics that have no key fields as topics having a
    single instance, and so multiple ROS2 publishers for the same topic will cause these to
    be tracked. As ROS2 has no concept of registrations and alive/not-alive instances, it
    is probably sensible to have a way of turning this off when using the default RHC
    implementation.
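
    To make the "pre-allocate instances and samples" option concrete for the keyless,
    single-instance case: a reduced reader history could be little more than a fixed ring
    of serdata pointers per reader (hypothetical sketch; the real RHC also has to support
    conditions, QoS changes, registrations, and so on):

      #include <stdint.h>

      /* Single-instance (keyless) reader history of fixed depth: store overwrites the
         oldest sample when full, take pops the oldest; neither ever allocates. */
      #define RHC_DEPTH 16

      struct simple_rhc {
        void    *sample[RHC_DEPTH];
        uint32_t oldest, count;
      };

      static void simple_rhc_store (struct simple_rhc *rhc, void *serdata, void (*drop) (void *))
      {
        if (rhc->count == RHC_DEPTH) {   /* KEEP_LAST semantics: overwrite the oldest */
          drop (rhc->sample[rhc->oldest]);
          rhc->oldest = (rhc->oldest + 1) % RHC_DEPTH;
          rhc->count--;
        }
        rhc->sample[(rhc->oldest + rhc->count) % RHC_DEPTH] = serdata;
        rhc->count++;
      }

      static void *simple_rhc_take (struct simple_rhc *rhc)
      {
        if (rhc->count == 0)
          return NULL;
        void *sd = rhc->sample[rhc->oldest];
        rhc->oldest = (rhc->oldest + 1) % RHC_DEPTH;
        rhc->count--;
        return sd;   /* caller deserialises into the ROS message and drops the reference */
      }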


rosbuild commented Feb 5, 2020

This issue has been mentioned on ROS. There might be relevant details there:

https://discourse.ros.org/t/ros-2-real-time-working-group-online-meeting-9-jan-22-2020-7am-pdt-utc-8/12623/1
