-
Notifications
You must be signed in to change notification settings - Fork 1
/
packet_manager.py
91 lines (71 loc) · 2.93 KB
/
packet_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#
# Copyright(c) 2023 Swisscom (Schweiz) AG
# Authors: Marco Tollini, Leonardo Rodoni
# Distributed under the MIT License (http://opensource.org/licenses/MIT)
#
# External Libraries
import logging
from scapy.all import PcapReader, rdpcap
# Internal Libraries
from report import report
from proto import Proto
class PacketWithMetadata:
    """Bundle a packet with its sequence number and the protocol it matched."""

    def __init__(self, number, packet, typ: Proto) -> None:
        """Store the three fields unchanged.

        number: sequence number assigned to the packet by the caller
        packet: the packet object itself
        typ:    protocol tag of the packet, exposed as the ``type`` attribute
        """
        self.number, self.packet, self.type = number, packet, typ
class PacketManager:
    """Iterate over the packets of a pcap file, keeping only selected protocols.

    ``selectors`` maps a protocol key to a predicate that is called with a
    packet. The first key whose predicate returns a truthy value becomes the
    packet's type; packets matching no predicate are discarded and counted
    through ``report.pkt_noproto_countup()``.

    Two reading modes are supported:
      * ``preload=True``  - ``__iter__`` loads the whole capture with
        ``rdpcap``, filters it once, and ``__next__`` serves the kept packets
        from an in-memory list.
      * ``preload=False`` - packets are read one at a time with ``PcapReader``
        and filtered on the fly.

    Packet numbers are 1-based positions in the pcap file (discarded packets
    still consume a number).
    """

    def __init__(self, pcap_path, selectors, preload=True):
        """Remember the configuration; no file is opened until ``__iter__``."""
        self.pcap_path = pcap_path
        self.preload = preload
        self.selectors = selectors

        self.reader = None        # PcapReader (streaming) or list (preload)
        self.reader_i = 0         # read cursor into the preloaded list
        self.current_packet = 0   # number of packets read from the pcap so far

    def _release_reader(self):
        """Close the current packet source, if it can be closed, and drop it.

        An empty source is left behind so that further ``next()`` calls keep
        raising ``StopIteration`` instead of touching a closed file.
        """
        close = getattr(self.reader, "close", None)
        if callable(close):
            close()
        self.reader = [] if self.preload else iter(())

    def _transform(self, packet):
        """Return ``packet`` wrapped with metadata, or ``None`` if no selector matches."""
        for proto in self.selectors:
            if self.selectors[proto](packet):
                return PacketWithMetadata(number=self.current_packet, packet=packet, typ=proto)

        # Lazy %-formatting: the message is only built when DEBUG is enabled,
        # which matters because this runs once per discarded packet.
        logging.debug("[%s] discarded as proto was not selected", self.current_packet)
        report.pkt_noproto_countup()
        return None

    def __iter__(self):
        """Start a new pass over the pcap file and return ``self``.

        Any reader left open by a previous (possibly unfinished) pass is
        closed first so repeated iteration does not leak file handles.
        """
        self._release_reader()
        self.current_packet = 0
        self.reader_i = 0

        # if not preload, use scapy's PcapReader, which reads packet by packet
        if not self.preload:
            logging.info('Preload is disabled')
            self.reader = iter(PcapReader(self.pcap_path))
            return self

        # if preload, load, filter, and give type to all packets
        # TODO: modify here & explain better or remove...
        #   --> simply filter based on specified port
        #   --> is meant for increasing speed
        logging.info('Preload is enabled - loading, filtering and preprocessing all packets - this might take a while')
        packets = rdpcap(self.pcap_path)
        logging.info('preload process: pcap loaded into the memory')

        total = len(packets)
        kept = []
        for packet in packets:
            if self.current_packet % 10000 == 0:
                logging.info('preload process: processed %s/%s packets', self.current_packet, total)
            self.current_packet += 1
            packetwm = self._transform(packet)  # None if packet is filtered
            if packetwm is not None:
                kept.append(packetwm)

        self.reader = kept
        logging.info('Preload done: %s packets after filters', len(kept))
        return self

    def __next__(self) -> "PacketWithMetadata":
        """Return the next selected packet.

        Raises:
            StopIteration: when the packet source is exhausted. In streaming
                mode the underlying reader is closed at that point.
        """
        if self.preload:
            if self.reader_i >= len(self.reader):
                raise StopIteration
            packetwm = self.reader[self.reader_i]
            self.reader_i += 1
            return packetwm

        while True:
            try:
                packet = next(self.reader)
            except StopIteration:
                # End of capture: release the file handle before propagating.
                self._release_reader()
                raise
            # Count only packets that were actually read, so the counter is
            # not one too high once the capture is exhausted.
            self.current_packet += 1
            packetwm = self._transform(packet)  # None if packet is filtered
            if packetwm is not None:
                return packetwm