Collector Software Architecture

[Diagram: OpenLI collector architecture]

The image on this page depicts the general architecture of the OpenLI collector software. Each small box with rounded corners (e.g. "Perpkt 1", "Forwarder", etc.) represents a thread within the collector process.

Note that, for purposes of simplifying the diagram, output from the Perpkt threads is only depicted for Perpkt 2 -- in fact, all Perpkt threads will be sending data to the Sync, Sync VOIP and Seqtracker threads. Similarly, the Sync thread has a communication channel to send IP intercepts and Known Servers to each Perpkt thread and the Sync VOIP thread does the same for RTP Intercepts.

There is also a separate set of threads for handling email interception. Again, to avoid cluttering up the diagram, this portion of the architecture has been drawn as a separate diagram and is shown further down this page.

Perpkt threads

Key source code files:

  • src/collector/collector.c
  • src/collector/collector_push_messaging.c

Perpkt is short for "per packet" -- these threads examine every packet received via the packet capture interface and determine whether it needs to be intercepted, passed on to a Sync thread for further examination, or simply ignored.

The perpkt threads are created and managed by the underlying libtrace library, which does the actual packet capture and distributes the captured packets to the perpkt threads on our behalf. In this diagram, libtrace is shown managing either a DPDK or pcap interface, but in fact OpenLI can be used with any capture method that is supported by libtrace, including AF_XDP, PFRING, Endace DAG, and many others.
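
For readers unfamiliar with parallel libtrace, the following minimal sketch (not OpenLI code) shows the general shape of a program that spreads capture across several processing threads. The capture URI, thread count and callback body are placeholders; the collector's real callbacks in src/collector/collector.c do considerably more work.

/* Minimal parallel libtrace sketch -- illustrative only, not OpenLI code.
 * The capture URI and thread count are placeholders. */
#include <libtrace_parallel.h>
#include <stdio.h>

static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *t,
        void *global, void *tls, libtrace_packet_t *packet) {
    /* ...examine the packet here... */
    /* Returning the packet hands it straight back to libtrace for reuse;
     * returning NULL would mean this thread keeps ownership of it. */
    return packet;
}

int main(void) {
    libtrace_t *trace = trace_create("pcapint:eth0");     /* placeholder URI */
    libtrace_callback_set_t *cbs = trace_create_callback_set();

    trace_set_packet_cb(cbs, per_packet);
    trace_set_perpkt_threads(trace, 4);      /* four "perpkt" threads */

    if (trace_pstart(trace, NULL, cbs, NULL) == -1) {
        trace_perror(trace, "trace_pstart");
        return 1;
    }
    trace_join(trace);
    trace_destroy(trace);
    trace_destroy_callback_set(cbs);
    return 0;
}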

Each perpkt thread has access to a structure containing global variables for the entire collector process (collector_global_t) and a structure containing local variables which are specific to that perpkt thread (colthread_local_t). These structures are defined in src/collector/collector.h, while various key sub-structures can be found in src/collector/collector_base.h.

Perpkt threads receive instructions from the Sync threads that tell them which packets are relevant for interception purposes. This includes the IP addresses and ports being used by RADIUS, GTP and SIP servers (so the session management traffic can be forwarded back to the Sync threads for further processing), as well as the IP addresses (and ports, in the case of RTP interception) that are known to be in use by any active intercept targets (along with key details about the intercept itself, such as the LIID). IPs and ports for email servers are also included, as traffic to and from these servers will need to be passed on to the email worker threads.

Using that information, the Perpkt thread examines each captured packet and checks:

  • if it matches a known RADIUS or GTP server -- if so, copy the packet and forward it to the Sync thread
  • if it matches a known email server -- if so, copy the packet and forward it to an email worker thread
  • if it matches a known SIP server -- if so, copy the packet and forward it to the Sync VOIP thread
  • if it matches the IP address of a known IP intercept target -- if so, copy the packet and put it in an IPCC job to be passed directly to a Seqtracker thread
  • if it matches the IP address + port of a known VOIP session for a VOIP intercept target -- if so, copy the packet and put it in an IPMMCC job to be passed directly to a Seqtracker thread
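
As a rough illustration of this dispatch logic, the per-packet processing can be thought of as something like the sketch below. The matching helpers and forwarding queues named here are hypothetical stand-ins rather than the real code in collector.c, which also has to cope with VLANs, tunnels, fragmentation and per-thread state.

/* Hypothetical sketch of per-packet dispatch -- the matching helpers and
 * forwarding queues are invented names, not the actual OpenLI functions. */
static libtrace_packet_t *perpkt_dispatch(colthread_local_t *loc,
        libtrace_packet_t *pkt) {

    if (matches_known_radius_or_gtp_server(loc, pkt)) {
        /* copy, so the original can go straight back to libtrace */
        forward_to_ip_sync(loc, trace_copy_packet(pkt));
    }
    if (matches_known_email_server(loc, pkt)) {
        forward_to_email_worker(loc, trace_copy_packet(pkt));
    }
    if (matches_known_sip_server(loc, pkt)) {
        forward_to_voip_sync(loc, trace_copy_packet(pkt));
    }
    if (matches_active_ip_intercept(loc, pkt)) {
        enqueue_ipcc_job(loc, trace_copy_packet(pkt));      /* -> Seqtracker */
    }
    if (matches_active_rtp_stream(loc, pkt)) {
        enqueue_ipmmcc_job(loc, trace_copy_packet(pkt));    /* -> Seqtracker */
    }

    /* returning the packet releases it back to libtrace immediately, which
     * is why the copies above are necessary */
    return pkt;
}

Note how only copies are ever forwarded and the original packet is always handed straight back to libtrace, in line with the two performance principles described below.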

To maximise performance, the Perpkt threads must do two things:

  1. do as little processing work as possible on each packet
  2. return the original captured packet structure to libtrace as soon as possible, so that it can be reused to hold further incoming packets -- this is why we copy any packets that we are going to forward on, even though such copying would normally be discouraged.

Any changes to the Perpkt thread code must keep those two principles in mind.

Sync Threads

Key source code files:

  • src/collector/collector_sync.c
  • src/collector/collector_sync_voip.c
  • src/collector/sipparsing.c
  • src/collector/internetaccess.c
  • src/collector/accessplugins/*

The primary purpose of the Sync threads is to use the information contained within session management traffic to maintain the current state of all active user->IP sessions and VOIP calls, and to update the Perpkt threads whenever a change in session/call state is observed for an intercept target.

There are two Sync threads used by an OpenLI collector: the IP sync thread which handles user->IP session mappings, and the VOIP sync thread which tracks VOIP calls.

In a conventional deployment, OpenLI uses RADIUS protocol traffic to track IP session state for all users (even those that are not currently subject to an intercept -- this is to ensure that we are prepared in the event that a user does become an intercept target later on). For mobile data intercepts, the GTP protocol serves the same purpose. The specific code for parsing these protocols and tracking session state for each user can be found in the src/collector/accessplugins/ directory. A modular system is used here to allow for relatively easy extension to support other session management protocols that may be required in the future (e.g. DIAMETER, DHCP, etc.).
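
To give a feel for what a modular access plugin involves, the sketch below shows a hypothetical function-pointer interface for a session management protocol. The struct and member names are invented for illustration; the real interface is defined in src/collector/internetaccess.c and the accessplugins/ modules and differs in its details.

/* Hypothetical access-plugin interface -- invented for illustration, not
 * the actual OpenLI definition. */
#include <libtrace.h>

typedef struct access_session access_session_t;   /* opaque per-user session */

typedef struct access_proto_plugin {
    const char *name;                              /* e.g. "radius", "gtp" */

    /* parse one captured session-management packet into a protocol-specific
     * representation */
    void *(*parse_packet)(void *pluginstate, libtrace_packet_t *pkt);

    /* create or update the session that the parsed message belongs to,
     * reporting whether the user's assigned IP address(es) changed */
    access_session_t *(*update_session)(void *pluginstate, void *parsed,
            int *ips_changed);

    /* decide whether this message should produce an IRI for an active
     * intercept on that user */
    int (*generate_iri)(void *pluginstate, access_session_t *sess,
            void *parsed);

    void (*destroy_parsed)(void *pluginstate, void *parsed);
} access_proto_plugin_t;

A new protocol (e.g. DIAMETER) could then, in principle, be supported by adding another module that fills in an equivalent set of callbacks.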

Any changes in session state for a user that is an active intercept target (such as being assigned a new address, or the user's session ending) will be communicated to each Perpkt thread immediately so that the threads can start (or stop) intercepting traffic using the corresponding IP address(es). The specifics on how session state is derived using each protocol are too complex to explain here -- these will receive their own documentation pages at some point.

The VOIP sync thread performs a similar role for tracking active VOIP calls, using SIP traffic to identify calls that are made by or to an active intercept target, and then extracting the IP addresses and ports for the corresponding RTP traffic from the SDP content within those SIP packets. The IP addresses and ports are then sent to each Perpkt thread so that they can intercept any RTP traffic for the ongoing call. Once the call ends (as seen in the SIP signalling), the Perpkt threads are told to cease interception of that RTP stream. Unlike IP sessions, SIP sessions for users that are not currently active intercept targets are ignored.
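
As a simplified illustration of the SDP step: the RTP endpoint for a call leg is advertised in the SDP body's c= (connection address) and m=audio (media port) lines, so the extraction boils down to something like the sketch below. The real SIP and SDP handling in src/collector/sipparsing.c and collector_sync_voip.c is considerably more involved (multiple media sections, IPv6, re-INVITEs and so on).

/* Simplified sketch: pull the RTP address and audio port out of an SDP
 * body.  Ignores IPv6, per-media c= lines and multiple media sections. */
#include <stdio.h>
#include <string.h>

static int extract_rtp_endpoint(const char *sdp, char addr[64], int *port) {
    const char *c = strstr(sdp, "c=IN IP4 ");
    const char *m = strstr(sdp, "m=audio ");

    if (c == NULL || m == NULL) {
        return -1;
    }
    if (sscanf(c, "c=IN IP4 %63s", addr) != 1) {
        return -1;
    }
    if (sscanf(m, "m=audio %d", port) != 1) {
        return -1;
    }
    return 0;   /* e.g. addr = "192.0.2.15", port = 49170 */
}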

Both Sync threads also perform the task of identifying when IRIs should be generated for an active intercept and creating IRI jobs to be passed on to the Seqtracker thread. Note that there are many situations where an IRI must be generated that does not directly impact the session state (and therefore won't require a corresponding update being sent to the Perpkt threads). One example is a RADIUS Accounting-Update for an intercept target: the Sync thread must generate an IRI-Report message for that intercept whenever it sees one of these messages, but since there is no change to the IP assigned to the user then there is no need to inform the Perpkt threads about it.

The Sync IP thread also has another role -- it receives the interception instructions from the OpenLI provisioner as the intercept configuration changes. Information about new or removed IP intercepts is retained within the Sync IP thread and used to determine which existing IP sessions need to be intercepted by the Perpkt threads. If a new intercept is added and that user already has an active session, then the session IP(s) are immediately passed on to the Perpkt threads for interception. If an intercept is removed, then that will trigger a message to each Perpkt thread to cease interception on any IPs assigned to that user. Interception instructions for VOIP intercepts are passed on to the Sync VOIP thread via a dedicated messaging channel, and the Sync VOIP thread will use those messages to update its own view of which calls need to be intercepted. Any intercepts that are withdrawn while a VOIP call is ongoing will also result in an update from the Sync VOIP thread to the Perpkt threads to cease interception of that call.

The other key information that is received by the Sync IP thread from the provisioner is the IP addresses and ports that are being used by RADIUS, email, GTP and SIP servers (i.e. how to identify traffic that needs to be forwarded to the Sync threads). Any changes to this list of "known" servers are immediately forwarded on to the Perpkt threads so they can siphon off that traffic correctly.

The two Sync threads may become a bottleneck if they have to handle significant quantities of session management traffic and/or are inefficient in how they process that traffic and maintain session state. In practice, this hasn't been observed as a problem (yet), but we should try to stress test this at some point to identify potential optimisations and figure out what our likely maximum throughput is.

Email Worker Threads and the Email Ingestor

[Diagram: email intercept architecture]

Key source code files:

  • src/collector/email_worker.c
  • src/collector/email_ingest_service.c
  • src/collector/emailprotocols/smtp.c
  • src/collector/emailprotocols/imap.c
  • src/collector/emailprotocols/pop3.c

Email interception works somewhat differently in the collector from other intercept types, as the processing of email intercepts is handled by a separate set of email worker threads, as depicted in the diagram above. The email worker threads communicate with the Sync, Perpkt and Seqtracker threads seen in the architecture diagram at the top of this page, so you should interpret the email architecture as an "off-shoot" that is linked to those threads in the main architecture.

Another key difference is that we have TWO methods for getting interceptable email traffic into the collector.

The first method is the conventional packet mirroring and capture approach that is used for all other intercept types. In this scenario, each perpkt thread will compare a captured packet against its known list of IMAP, SMTP and POP3 servers (provided by the sync thread, which in turn receives the server lists from the provisioner). If the packet is recognised as email traffic, then it will be copied and that copy will be passed on to one of the email worker threads for further processing.

The second way of receiving email data is the email ingestion service: an HTTP service that runs on the collector and allows OpenLI users to upload application layer payloads directly from their mail server via POST requests. For instance, for SMTP interception, you could write a milter program for your MTA that generates the corresponding ingestion messages whenever an intercept target sends an email and POSTs them to the ingestion service on a collector.

More details on the expected format of the ingestion messages can be found on this wiki.

The ingestion service runs as a separate thread, listening for HTTP connections on its designated port. The service receives ingestion messages via HTTP POST and converts the contents of each ingestion message into an internal structure (called openli_email_captured_t) and passes that structure on to an email worker thread for processing.

The email worker threads are told which email intercepts are active by the sync thread. These threads then check for input from both the ingestion service thread and all of the perpkt threads. The perpkt threads provide email traffic as captured TCP packets, so the worker will first convert each packet by extracting the relevant properties into an openli_email_captured_t structure. The ingestion service has already generated an openli_email_captured_t instance for us, so no conversion is required in that case.

An email session is defined by the 5-tuple (email protocol, server IP, client IP, server port, client port), but the specifics of how a session progresses between states (and therefore when a CC or IRI should be generated) are different for each of the three email protocols. Therefore, we have implemented separate code modules that contain all of the protocol parsing and state tracking code for SMTP, IMAP, and POP3. When a packet or ingestion message is received by an email worker thread, it is first matched to its corresponding session, and then the "update session" method from the appropriate protocol module is called.
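
For illustration, a session key built from that 5-tuple might look like the hedged sketch below; the real structures in src/collector/email_worker.c will differ, but any captured packet or ingestion message that yields the same five values is treated as part of the same email session.

/* Illustrative 5-tuple key for an email session -- the field names and
 * layout are invented and do not match the actual collector structures. */
#include <stdint.h>
#include <netinet/in.h>

typedef enum { EMAIL_PROTO_SMTP, EMAIL_PROTO_IMAP, EMAIL_PROTO_POP3 } email_proto_t;

typedef struct email_session_key {
    email_proto_t protocol;
    struct sockaddr_storage server_ip;
    struct sockaddr_storage client_ip;
    uint16_t server_port;
    uint16_t client_port;
} email_session_key_t;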

The protocol modules are then responsible for determining when to emit an IRI or CC job, and will construct those jobs and forward them on to the Seqtracker thread.

The core logic of an email worker thread therefore looks something like:

do (until told to stop)
 - get next "captured" input
   - convert from packet to openli_email_captured_t, if required
 - match message to email session
   - create session if none exists
 - if session is smtp: call update_smtp_session(session, captured)
 - if session is imap: call update_imap_session(session, captured)
 - if session is pop3: call update_pop3_session(session, captured)
 - timeout any email sessions that have been inactive for a long time

Common utility methods, such as adding a recipient to an email session or finding the sender's email address from within the body of an email, are defined in the email_worker.c file and made available to the mail protocol modules.

Seqtracker Threads

Key source code files:

  • src/collector/collector_seqtracker.c

The ETSI standards require that all intercepted records include a sequence number for the purposes of detecting records that have been lost in transit from the collection point to the receiving agency. The rules for sequence numbering can be summarised as:

  • Sequence numbers must strictly increment, starting at zero and increasing by one for each subsequent record.
  • Separate sequencing must be used for IRIs and CCs; i.e. the first IRI must have sequence number 0 and the first CC will also have sequence number 0.
  • Each distinct intercepted IP session or call will use its own independent numbering sequence, even if those sessions/calls are part of the same intercept (i.e. the sequence numbers will reset to 0 when the target changes IP address or when a new VOIP call begins). Specifically in ETSI parlance, sequencing is distinct for each observed LIID + CIN combination.

Because packets for a given interception may be spread across multiple Perpkt threads, OpenLI needs a central point at which intercepted packets can be assigned an appropriate sequence number that follows the above rules. This is the purpose of the Seqtracker thread -- to remember which sequence number was last assigned to an IRI and CC for any given intercepted call or IP session and then assign the correct sequence number to each incoming IRI and CC job before passing it on to be encoded into a valid ETSI record.
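
In essence, the Seqtracker keeps a pair of counters for every LIID + CIN combination it has seen, along the lines of the sketch below (the type and field names are illustrative, not those used in collector_seqtracker.c).

/* Illustrative per-(LIID, CIN) sequencing state -- not the real structures
 * from collector_seqtracker.c. */
#include <stdint.h>

typedef struct seq_state {
    char *liid;              /* lawful intercept identifier */
    uint32_t cin;            /* communication identity number */
    int64_t next_iri_seq;    /* next IRI sequence number, starts at 0 */
    int64_t next_cc_seq;     /* next CC sequence number, starts at 0 */
} seq_state_t;

/* assign the next sequence number to an incoming IRI or CC job */
static int64_t assign_seqno(seq_state_t *state, int is_iri) {
    return is_iri ? state->next_iri_seq++ : state->next_cc_seq++;
}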

While this job is a relatively low workload for each intercepted packet, it is still an obvious potential bottleneck. In theory, adding multiple Seqtracker threads to spread the load would be possible but we would need to ensure that packets for the same intercept all end up reaching a consistent Seqtracker thread.

Encoder Threads

Key source code files:

  • src/collector/encoder_worker.c
  • src/etsili_core.c
  • src/collector/ipcc.c
  • src/collector/ipiri.c
  • src/collector/ipmmcc.c
  • src/collector/ipmmiri.c
  • src/collector/umtscc.c
  • src/collector/umtsiri.c
  • src/collector/emailcc.c
  • src/collector/emailiri.c

Encoder threads take the "jobs" that have passed through the Seqtracker thread and construct ETSI-compliant IRI and CC records from them. Each job contains the relevant parameters that are required to build an ETSI record (such as sequence number, timestamp, IRI type, LIID, CIN, etc.); for CCs and VOIP IRIs, the job will also include the intercepted packet itself, which will be included in the resulting record.

The ETSI encoding performed by OpenLI is based on the Distinguished Encoding Rules (DER) method for ASN.1. Because the data structures in ASN.1 fully wrap any data fields or structures contained within them, we have to encode each record using an inside-out strategy -- i.e. we need to encode any inner members of a data structure before we can encode the structure itself (specifically, we need to know the exact size of the structure's contents). This can be a computationally expensive exercise, especially given that integers in DER are not encoded using a fixed size field. A single integer deep within a series of nested structures can potentially change the size of not only that structure, but all of its parent structures as well. This makes it difficult to reuse existing encoded records by replacing any fields that may have changed; if an integer field requires a different amount of space to encode, then a significant portion of the record may end up changing.
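
The variable-length nature of DER integers is easy to see in a small, self-contained example: the value is stored using the minimum number of two's-complement octets, so simply incrementing a counter from 127 to 128 grows the encoded field (and every structure enclosing it) by a byte. The sketch below encodes a non-negative INTEGER according to the standard DER rules; it is illustrative only and is not the encoding code OpenLI actually uses.

/* DER encoding of a non-negative INTEGER (tag 0x02), showing why the
 * encoded size depends on the value.  Assumes the short-form length
 * (fewer than 128 content octets), which always holds here. */
#include <stddef.h>
#include <stdint.h>

static size_t der_encode_uint(uint64_t value, unsigned char *out) {
    unsigned char content[9];
    int n = 0;

    /* emit the minimum number of octets, least significant first */
    do {
        content[n++] = value & 0xff;
        value >>= 8;
    } while (value != 0);

    /* a leading 1 bit would make the value look negative, so prepend 0x00 */
    if (content[n - 1] & 0x80) {
        content[n++] = 0x00;
    }

    out[0] = 0x02;                 /* INTEGER tag */
    out[1] = (unsigned char)n;     /* short-form length */
    for (int i = 0; i < n; i++) {
        out[2 + i] = content[n - 1 - i];    /* reverse into big-endian order */
    }
    return 2 + (size_t)n;          /* 127 -> 3 bytes, 128 -> 4 bytes */
}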

Fortunately, an individual encoding job is independent of any others so we can use multiple encoding threads to spread the workload. However, adding too many encoding threads can actually reduce throughput, as the forwarder that receives the encoded ETSI records must still collate them and ensure that they are emitted in the correct order. More encoding threads creates a greater likelihood of records arriving out of order at the forwarder, which means the forwarder spends more time reordering and less time forwarding. Two encoder threads seems to be the sweet spot in our testing environment (assuming 1 Seqtracker and 1 Forwarder).

The ETSI records produced by the encoder threads are pre-pended with the LIID of the intercept that the record belongs to [1] -- this is to allow the mediator to quickly identify which agency should receive the ETSI record without having to decode the record itself. The LIID is encoded as a 2 byte length field, followed by the LIID itself as a sequence of characters of the described length.

[1]: At the time of writing (OpenLI 1.0.10), the LIID pre-pending actually happens in the forwarder thread, but this will be moved to the encoder threads in all future releases to reduce workload in the forwarder thread.
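
A minimal sketch of that pre-pending step is shown below. The on-wire byte order of the 2-byte length field is an assumption made for this example (network order); check the OpenLI source for the authoritative layout.

/* Sketch of pre-pending the LIID to an encoded ETSI record: a 2-byte
 * length followed by the LIID characters.  Network byte order for the
 * length field is an assumption made for this example. */
#include <arpa/inet.h>
#include <stdint.h>
#include <string.h>

static size_t prepend_liid(const char *liid, const unsigned char *record,
        size_t reclen, unsigned char *out) {
    uint16_t liidlen = (uint16_t)strlen(liid);
    uint16_t wirelen = htons(liidlen);

    memcpy(out, &wirelen, sizeof(wirelen));        /* 2-byte length field */
    memcpy(out + 2, liid, liidlen);                /* the LIID itself */
    memcpy(out + 2 + liidlen, record, reclen);     /* the encoded record */
    return 2 + liidlen + reclen;
}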

Forwarder Threads

Key source code files:

  • src/collector/collector_forwarder.c
  • src/export_buffer.c
  • src/collector/reassembler.c

The role of the forwarder thread is fairly simple: take the encoded ETSI records produced by the encoder threads, put them back into their correct sequence number order and then send them on to the appropriate OpenLI Mediator.

Reordering: for each LIID + CIN, the forwarder must record the next expected sequence number for both IRIs and CCs. If a record arrives that has a sequence number after the expected one, then the record is retained in memory until the missing record arrives. Records that do have the expected sequence number are written into an "export buffer" for the mediator that the record is destined for. If a received record resolves a sequence gap, then any retained records that are now in sequence will also be pushed onto the export buffer.
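
Conceptually, the per LIID + CIN reordering state (kept separately for IRIs and CCs) behaves like the sketch below; the helper functions and the pending-record map are hypothetical, and the real implementation in collector_forwarder.c uses its own data structures.

/* Illustrative reordering logic for a single LIID+CIN record stream --
 * not the actual collector_forwarder.c code.  The "pending" map and its
 * helpers are hypothetical. */
#include <stdint.h>

typedef struct reorder_stream {
    int64_t expected_seqno;   /* next sequence number to emit */
    void *pending;            /* held-back records, keyed by seqno */
} reorder_stream_t;

/* hypothetical helpers assumed to exist elsewhere */
extern void push_to_export_buffer(const void *record);
extern const void *pending_take(void *pending, int64_t seqno);  /* NULL if absent */
extern void pending_store(void *pending, int64_t seqno, const void *record);

static void handle_encoded_record(reorder_stream_t *st, int64_t seqno,
        const void *record) {
    if (seqno != st->expected_seqno) {
        pending_store(st->pending, seqno, record);   /* arrived early; hold it */
        return;
    }

    push_to_export_buffer(record);
    st->expected_seqno++;

    /* a gap has just been filled -- flush anything now in sequence */
    const void *next;
    while ((next = pending_take(st->pending, st->expected_seqno)) != NULL) {
        push_to_export_buffer(next);
        st->expected_seqno++;
    }
}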

Once the forwarder has sufficient data to send to a mediator, and the socket for that mediator is able to send data, then data is written from the export buffer to the mediator. OpenLI always tries to start and end transmissions on a record boundary to counteract potential issues when a mediator is restarted and the forwarder attempts to resume sending from the middle of a record.

Transport Methods

Transmission to the mediator can use a number of different transports:

  • TCP socket without SSL
  • TCP socket with SSL
  • RabbitMQ without SSL
  • RabbitMQ with SSL

SSL will encrypt the records prior to sending them to the mediator -- this will be necessary if the path between your collector and mediator is not entirely trusted and secure (e.g. it traverses a network that you do not fully control). However, enabling SSL will slightly decrease the throughput of your forwarder compared with a non-SSL transport.

Using RabbitMQ will allow your forwarder to buffer ETSI records on disk if the connection to the mediator goes down for a non-trivial period of time. This can provide extra resilience in the event of a longer mediator or network failure -- the intercepted records will simply reside on disk until the mediator comes back online. However, RabbitMQ will require more available CPU cores on your collector host and probably some configuration tweaks to ensure that it avoids using the same cores that your OpenLI collector threads are relying on to achieve good performance.

By contrast, a forwarder that is relying solely on TCP and the export buffer will only be able to buffer records in RAM which gives you much less time to resolve the issue before records are dropped or the collector is forced to halt due to lack of available memory.

The user will need to decide which level of resilience and security is required for their particular deployment, based on the locations of their collector and mediator and what they know about the network path between them, as well as their particular risk profile. Ideally, all deployments would choose RabbitMQ + SSL for maximum security and resilience, but this is also the most complex method to deploy as well as having the largest impact on interception throughput so there are definitely trade-offs to be made.

Extra Information

Vendor Mirroring

Key source files:

  • src/collector/alushim_parser.c
  • src/collector/jmirror_parser.c

Some networking hardware vendors offer a "Lawful Intercept" mode which can be used to mirror traffic for specific subscribers to a specified endpoint (typically via UDP). The mirrored traffic is encapsulated in a simple shim header that identifies which intercept the mirrored packet belongs to and not much else. In that form, the traffic still needs to be properly encoded according to the ETSI standards before it can be accepted by LEAs.

OpenLI collectors are able to support translating these vendor "intercepts" into ETSI-compliant ones. Currently, we have support for the shim wrappers used by Alcatel-Lucent / Nokia and Juniper, but others may be added if requested.

Vendor intercept traffic is identified by configuring the OpenLI collector with the IP addresses and ports being used to sink the mirrored traffic. Ideally, the sink would be running on the same host as your OpenLI collector so that traffic is sent to the host directly.

Individual IP intercepts for vendor mirrored traffic can be configured with the vendmirrorid option -- the value of the option should match the intercept ID number that will be placed in the shim header by the vendor hardware. When vendor intercept traffic is observed by a collector, the shim header is parsed to determine if the intercept ID number within matches a known vendor mirror intercept -- if so, the shim header is stripped and the remaining packet is processed as an intercepted CC for the OpenLI intercept(s) that have that ID number as their vendmirrorid value.
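
As a purely hypothetical illustration (the real Alcatel-Lucent and Juniper shim layouts differ and are parsed in the files listed above), matching a mirrored packet to an OpenLI intercept amounts to reading the intercept ID out of the shim and comparing it against the configured vendmirrorid values:

/* Purely hypothetical shim layout and lookup -- the real ALU/Juniper
 * headers are handled in alushim_parser.c and jmirror_parser.c and do
 * not look like this. */
#include <arpa/inet.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

struct fake_mirror_shim {
    uint32_t interceptid;      /* compared against configured vendmirrorid */
};

/* hypothetical lookup of intercepts configured with this vendmirrorid */
extern void *lookup_vendmirror_intercepts(uint32_t interceptid);

static const uint8_t *strip_shim_and_match(const uint8_t *udp_payload,
        size_t plen, void **intercepts) {
    uint32_t id;

    if (plen < sizeof(struct fake_mirror_shim)) {
        return NULL;
    }
    memcpy(&id, udp_payload, sizeof(id));
    *intercepts = lookup_vendmirror_intercepts(ntohl(id));
    if (*intercepts == NULL) {
        return NULL;            /* not a known vendmirrorid */
    }
    /* the rest of the payload is the subscriber's packet, to be processed
     * as a CC for the matching intercept(s) */
    return udp_payload + sizeof(struct fake_mirror_shim);
}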
