88#include " ../utils/ether.h"
99#include " ../utils/ip.h"
1010#include " ../utils/udp.h"
11+ #include " ../utils/common.h"
12+ #include " ../utils/codel.h"
13+
14+ using bess::utils::Codel;
15+ using bess::pb::DRRArg;
1116
1217uint32_t RoundToPowerTwo (uint32_t v) {
1318 v--;
@@ -31,7 +36,8 @@ DRR::DRR()
3136 max_queue_size_(kFlowQueueMax ),
3237 max_number_flows_(kDefaultNumFlows ),
3338 flow_ring_(nullptr ),
34- current_flow_(nullptr ) {}
39+ current_flow_(nullptr ),
40+ generator_(NULL ) {}
3541
3642DRR::~DRR () {
3743 for (auto it = flows_.begin (); it != flows_.end ();) {
@@ -41,7 +47,7 @@ DRR::~DRR() {
4147 std::free (flow_ring_);
4248}
4349
44- CommandResponse DRR::Init (const bess::pb:: DRRArg& arg) {
50+ CommandResponse DRR::Init (const DRRArg& arg) {
4551 CommandResponse err;
4652 task_id_t tid;
4753
@@ -70,11 +76,37 @@ CommandResponse DRR::Init(const bess::pb::DRRArg& arg) {
7076 }
7177
7278 int err_num = 0 ;
73- flow_ring_ = AddQueue (max_number_flows_, &err_num );
79+ flow_ring_ = new LockLessQueue<Flow*> (max_number_flows_, true , true );
7480 if (err_num != 0 ) {
7581 return CommandFailure (-err_num);
7682 }
7783
84+ /* get flow queue generator */
85+ if (arg.type () == DRRArg::CODEL) {
86+ const DRRArg::Codel& codel = arg.codel ();
87+ uint64_t target;
88+ uint64_t window;
89+ if (!(target = codel.target ())) {
90+ target = Codel<bess::Packet*>::kDefaultTarget ;
91+ }
92+
93+ if (!(window = codel.window ())) {
94+ window = Codel<bess::Packet*>::kDefaultWindow ;
95+ }
96+
97+ generator_ =
98+ Codel<bess::Packet*>::Factory (bess::Packet::Free, max_queue_size_, target, window);
99+ } else {
100+ const DRRArg::Llring& ring = arg.ring ();
101+ size_t size;
102+ if (!(size = ring.size ())) {
103+ size = LockLessQueue<bess::Packet*>::kDefaultRingSize ;
104+ }
105+
106+ generator_ = LockLessQueue<bess::Packet*>::Factory (
107+ size, !ring.multiple_producer (), !ring.multiple_consumer ());
108+ }
109+
78110 return CommandSuccess ();
79111}
80112
@@ -101,7 +133,7 @@ void DRR::ProcessBatch(bess::PacketBatch* batch) {
101133 // if the Flow doesn't exist create one
102134 // and add the packet to the new Flow
103135 if (it == nullptr ) {
104- if (llring_full ( flow_ring_)) {
136+ if (flow_ring_-> Full ( )) {
105137 bess::Packet::Free (pkt);
106138 } else {
107139 AddNewFlow (pkt, id, &err);
@@ -140,7 +172,7 @@ struct task_result DRR::RunTask(void*) {
140172uint32_t DRR::GetNextBatch (bess::PacketBatch* batch, int * err) {
141173 Flow* f;
142174 uint32_t total_bytes = 0 ;
143- uint32_t count = llring_count ( flow_ring_);
175+ uint32_t count = flow_ring_-> Size ( );
144176 if (current_flow_) {
145177 count++;
146178 }
@@ -155,33 +187,30 @@ uint32_t DRR::GetNextBatch(bess::PacketBatch* batch, int* err) {
155187 if (batch_size == batch->cnt ()) {
156188 break ;
157189 } else {
158- count = llring_count ( flow_ring_);
190+ count = flow_ring_-> Size ( );
159191 batch_size = batch->cnt ();
160192 }
161193 }
162194 count--;
163195
164196 f = GetNextFlow (err);
165- if (*err != 0 ) {
197+ if (*err) {
166198 return total_bytes;
167199 } else if (f == nullptr ) {
168200 continue ;
169201 }
170202
171- uint32_t bytes = GetNextPackets (batch, f, err );
203+ uint32_t bytes = GetNextPackets (batch, f);
172204 total_bytes += bytes;
173- if (*err != 0 ) {
174- return total_bytes;
175- }
176205
177- if (llring_empty ( f->queue ) && !f->next_packet ) {
206+ if (f->queue -> Empty ( ) && !f->next_packet ) {
178207 f->deficit = 0 ;
179208 }
180209
181210 // if the flow doesn't have any more packets to give, reenqueue it
182211 if (!f->next_packet || f->next_packet ->total_len () > f->deficit ) {
183- *err = llring_enqueue ( flow_ring_, f);
184- if (*err != 0 ) {
212+ *err = flow_ring_-> Push ( f);
213+ if (*err) {
185214 return total_bytes;
186215 }
187216 } else {
@@ -198,18 +227,18 @@ DRR::Flow* DRR::GetNextFlow(int* err) {
198227 double now = get_epoch_time ();
199228
200229 if (!current_flow_) {
201- *err = llring_dequeue ( flow_ring_, reinterpret_cast < void **>(&f) );
202- if (*err < 0 ) {
230+ *err = flow_ring_-> Pop (f );
231+ if (*err) {
203232 return nullptr ;
204233 }
205234
206- if (llring_empty ( f->queue ) && !f->next_packet ) {
235+ if (f->queue -> Empty ( ) && !f->next_packet ) {
207236 // if the flow expired, remove it
208237 if (now - f->timer > kTtl ) {
209238 RemoveFlow (f);
210239 } else {
211- *err = llring_enqueue ( flow_ring_, f);
212- if (*err < 0 ) {
240+ *err = flow_ring_-> Push ( f);
241+ if (*err) {
213242 return nullptr ;
214243 }
215244 }
@@ -224,15 +253,15 @@ DRR::Flow* DRR::GetNextFlow(int* err) {
224253 return f;
225254}
226255
227- uint32_t DRR::GetNextPackets (bess::PacketBatch* batch, Flow* f, int * err ) {
256+ uint32_t DRR::GetNextPackets (bess::PacketBatch* batch, Flow* f) {
228257 uint32_t total_bytes = 0 ;
229258 bess::Packet* pkt;
230259
231- while (!batch->full () && (!llring_empty ( f->queue ) || f->next_packet )) {
260+ while (!batch->full () && (!f->queue -> Empty ( ) || f->next_packet )) {
232261 // makes sure there isn't already a packet at the front
233262 if (!f->next_packet ) {
234- * err = llring_dequeue ( f->queue , reinterpret_cast < void **>(& pkt) );
235- if (* err < 0 ) {
263+ int err = f->queue -> Pop ( pkt);
264+ if (err) {
236265 return total_bytes;
237266 }
238267 } else {
@@ -274,21 +303,21 @@ void DRR::AddNewFlow(bess::Packet* pkt, FlowId id, int* err) {
274303 Flow* f = new Flow (id);
275304
276305 // TODO(joshua) do proper error checking
277- f->queue = AddQueue ( static_cast < int >( kFlowQueueSize ), err );
306+ f->queue = generator_ ( );
278307
279- if (*err != 0 ) {
308+ if (*err) {
280309 return ;
281310 }
282311
283312 flows_.Insert (id, f);
284313
285314 Enqueue (f, pkt, err);
286- if (*err != 0 ) {
315+ if (*err) {
287316 return ;
288317 }
289318
290319 // puts flow in round robin
291- *err = llring_enqueue ( flow_ring_, f);
320+ *err = flow_ring_-> Push ( f);
292321}
293322
294323void DRR::RemoveFlow (Flow* f) {
@@ -299,77 +328,34 @@ void DRR::RemoveFlow(Flow* f) {
299328 delete f;
300329}
301330
302- llring* DRR::AddQueue (uint32_t slots, int * err) {
303- int bytes = llring_bytes_with_slots (slots);
304- int ret;
305-
306- llring* queue = static_cast <llring*>(aligned_alloc (alignof (llring), bytes));
307- if (!queue) {
308- *err = -ENOMEM;
309- return nullptr ;
310- }
311-
312- ret = llring_init (queue, slots, 1 , 1 );
313- if (ret) {
314- std::free (queue);
315- *err = -EINVAL;
316- return nullptr ;
317- }
318- return queue;
319- }
320331
321332void DRR::Enqueue (Flow* f, bess::Packet* newpkt, int * err) {
322333 // if the queue is full. drop the packet.
323- if (llring_count ( f->queue ) >= max_queue_size_) {
334+ if (f->queue -> Size ( ) >= max_queue_size_) {
324335 bess::Packet::Free (newpkt);
325336 return ;
326337 }
327338
328339 // creates a new queue if there is not enough space for the new packet
329340 // in the old queue
330- if (llring_full ( f->queue )) {
341+ if (f->queue -> Full ( )) {
331342 uint32_t slots =
332- RoundToPowerTwo (llring_count ( f->queue ) * kQueueGrowthFactor );
333- f-> queue = ResizeQueue ( f->queue , slots, err );
334- if (*err != 0 ) {
343+ RoundToPowerTwo (f->queue -> Size ( ) * kQueueGrowthFactor );
344+ *err = f->queue -> Resize ( slots);
345+ if (*err) {
335346 bess::Packet::Free (newpkt);
336347 return ;
337348 }
338349 }
339350
340- *err = llring_enqueue ( f->queue , reinterpret_cast < void *> (newpkt) );
341- if (*err == 0 ) {
351+ *err = f->queue -> Push (newpkt);
352+ if (! *err) {
342353 f->timer = get_epoch_time ();
343354 } else {
344355 bess::Packet::Free (newpkt);
345356 }
346357}
347358
348- llring* DRR::ResizeQueue (llring* old_queue, uint32_t new_size, int * err) {
349- llring* new_queue = AddQueue (new_size, err);
350- if (*err != 0 ) {
351- return nullptr ;
352- }
353-
354- // migrates packets from the old queue
355- if (old_queue) {
356- bess::Packet* pkt;
357-
358- while (llring_dequeue (old_queue, reinterpret_cast <void **>(&pkt)) == 0 ) {
359- *err = llring_enqueue (new_queue, pkt);
360- if (*err == -LLRING_ERR_NOBUF) {
361- bess::Packet::Free (pkt);
362- *err = 0 ;
363- } else if (*err != 0 ) {
364- std::free (new_queue);
365- return nullptr ;
366- }
367- }
368-
369- std::free (old_queue);
370- }
371- return new_queue;
372- }
373359
374360CommandResponse DRR::SetQuantumSize (uint32_t size) {
375361 if (size == 0 ) {
0 commit comments