-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparse_simba.cpp
126 lines (108 loc) · 3.5 KB
/
parse_simba.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#include "length_extractors.h"
#include "network_headers.h"
#include "parsers.h"
#include "splitter.h"
#include "queue.h"
#include "thread_pool.h"
#include <cstddef>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <memory>
#include <span>
#include <string>
#include <utility>
#include <vector>
// Reads an input stream in fixed-size chunks, splits the accumulated bytes
// into datagrams and pushes them, tagged with a running index, onto a blocking
// queue that is drained by parser worker threads.
class SimbaFileProcessor {
public:
    // read_chunk_size  - number of bytes requested from the stream per read.
    // parsers_threads  - forwarded as the first argument of ThreadPool
    //                    (presumably the worker count; confirm in thread_pool.h).
    // output_filename  - forwarded to ThreadSafeSeqFileWriter in ProcessTraffic.
    //
    // The initializer list follows the member declaration order, which is the
    // order in which members are actually initialized.
    SimbaFileProcessor(size_t read_chunk_size, size_t parsers_threads, std::string output_filename)
        : read_chunk_size_(read_chunk_size)
        , splitter_(Splitter(std::make_unique<MdpLengthExtractor>()))
        , output_filename_(std::move(output_filename))
        , parsers_threads_(parsers_threads) {}

    // Runs the read -> split -> dispatch loop for as long as `infile` is in a
    // good state, then terminates the queue so blocked consumers can leave
    // their loop.
    //
    // Returns false if the stream ended with badbit set (an unrecoverable I/O
    // error); reaching end of file normally returns true.
    bool ProcessTraffic(std::ifstream& infile) {
        // The second writer argument is 64 * 1024 * 1024; presumably a buffer
        // size in bytes -- confirm against ThreadSafeSeqFileWriter.
        auto parser_chain = ParserChain(
            SBEParser(
                std::make_shared<ThreadSafeSeqFileWriter>(output_filename_, 1024 * 1024 * 64)
            )
        );

        // `pool` is declared after `parser_chain`, so it is destroyed first.
        // NOTE(review): this relies on ThreadPool's destructor waiting for the
        // workers, which still reference `parser_chain` and `queue_` -- confirm.
        ThreadPool pool(parsers_threads_, [this, &parser_chain]() {
            ConsumeDatagrams<decltype(parser_chain)>(queue_, parser_chain);
        });

        while (infile) {
            ReadChunkFromFile(infile);
            SplitPacketsFromBuffer();
            DispatchPacketsForParsing();
        }

        std::cout << "waiting" << std::endl;
        queue_.terminate();

        // eofbit/failbit are the expected way for the loop to end; only badbit
        // indicates that the input could not be read completely.
        return !infile.bad();
    }

private:
    size_t read_chunk_size_;
    Splitter splitter_;
    BlockingMPMCQueue<std::pair<size_t, std::vector<char>>> queue_;
    std::vector<char> buffer_;                  // bytes read but not yet handed to the splitter
    std::vector<std::vector<char>> datagrams_;  // result of the most recent split
    std::string output_filename_;
    size_t parsers_threads_;
    // Same type as the index stored in the queue element, so no signed ->
    // unsigned conversion happens and the counter cannot hit signed overflow.
    size_t datagram_counter_ = 0;

    // Appends up to read_chunk_size_ bytes from `infile` to the end of buffer_.
    void ReadChunkFromFile(std::ifstream& infile) {
        const size_t old_size = buffer_.size();
        buffer_.resize(old_size + read_chunk_size_);
        infile.read(buffer_.data() + old_size, static_cast<std::streamsize>(read_chunk_size_));
        // A short read (end of file or error) leaves part of the newly added
        // tail unfilled; shrink back to what was actually read.
        const auto bytes_read = static_cast<size_t>(infile.gcount());
        buffer_.resize(old_size + bytes_read);
    }

    // Replaces datagrams_ with whatever the splitter extracts from buffer_.
    // NOTE(review): reads are appended to buffer_, so SplitDatagrams presumably
    // removes the bytes it consumed and leaves any incomplete tail -- confirm
    // in splitter.h.
    void SplitPacketsFromBuffer() {
        datagrams_ = splitter_.SplitDatagrams(buffer_);
    }

    // Moves every datagram of the current batch into the queue together with
    // its running index. The moved-from vectors stay in datagrams_ until the
    // next split overwrites it.
    void DispatchPacketsForParsing() {
        for (auto& datagram : datagrams_) {
            queue_.push(std::make_pair(datagram_counter_++, std::move(datagram)));
        }
    }

    // Worker loop: pops items until pop() yields an empty result, and feeds
    // each one to the parser chain as an IndexedDatagram.
    // `parser_chain` is taken by value, so each call works on its own copy.
    template <typename ParserChainType>
    void ConsumeDatagrams(
        BlockingMPMCQueue<std::pair<size_t, std::vector<char>>>& queue,
        ParserChainType parser_chain
    ) {
        while (true) {
            auto data = queue.pop();
            if (!data) {
                // Is terminated
                break;
            }
            parser_chain.Parse(IndexedDatagram{
                data->first,
                std::span<char>(data->second.data(), data->second.size())
            });
        }
    }
};
// Command-line arguments of the program, as filled in by ParseArgs.
struct Args {
    // First positional argument: path of the file to read.
    std::string input_file;
    // Second positional argument: path handed to the processor as its output file.
    std::string output_file;
};
// Builds an Args from the first two positional command-line arguments.
// If fewer than two are supplied, prints the usage line to stderr and exits
// the process with status 1. Any arguments beyond the second are ignored.
Args ParseArgs(int argc, char* argv[]) {
    constexpr int kRequiredArgc = 3;  // program name + input + output
    if (argc < kRequiredArgc) {
        std::cerr << "Usage: parse_simba <input_file> <output_file>" << std::endl;
        std::exit(1);
    }
    return Args{argv[1], argv[2]};
}
int main(int argc, char* argv[]) {
const auto args = ParseArgs(argc, argv);
std::ifstream infile(args.input_file, std::ios::binary);
if (!infile.is_open()) {
std::cerr << "Could not open file: " << args.input_file << std::endl;
return 1;
}
// TODO: tweak chunk size, threads
auto processor = SimbaFileProcessor(
/*read_chunk_size=*/1024*1024*1, // 1 MB
/*parsers_threads=*/1,
args.output_file);
return static_cast<int>(!processor.ProcessTraffic(infile));
}