|
| 1 | +#ifndef BESS_UTILS_CODEL_H_ |
| 2 | +#define BESS_UTILS_CODEL_H_ |
| 3 | + |
| 4 | +#include <cmath> |
| 5 | +#include <deque> |
| 6 | + |
| 7 | +#include <glog/logging.h> |
| 8 | + |
| 9 | +#include "time.h" |
| 10 | +#include "queue.h" |
| 11 | + |
| 12 | + |
| 13 | +namespace bess { |
| 14 | +namespace utils { |
| 15 | +// Codel(Controlled Delay Management) is an Queue controller based on this |
| 16 | +// article http://queue.acm.org/detail.cfm?id=2209336 |
| 17 | + |
| 18 | +// It provides an active queue management to help prevent bufferbloat by dropping |
| 19 | +// queue entries at an increasing rate if the delay in the queue is above the |
| 20 | +// target queue delay. The equation used to calculate drop intervals is based on TCP |
| 21 | +// throughput response to drop probability. |
| 22 | + |
| 23 | +// template argument T is the type that is going to be enqueued/dequeued. |
| 24 | +template <typename T> |
| 25 | +class Codel final: public Queue<T> { |
| 26 | + public: |
| 27 | + // default delay target for codel |
| 28 | + static const uint64_t kDefaultTarget = 5000000; |
| 29 | + // default window size for codel |
| 30 | + static const uint64_t kDefaultWindow = 100000000; |
| 31 | + // default number of slots in the codel queue |
| 32 | + static const int kDefaultSlots = 4096; |
| 33 | + |
| 34 | + typedef std::pair<uint64_t, T> Wrapper; |
| 35 | + |
| 36 | + // Takes a drop function which is a function that should take a dropped object |
| 37 | + // and handle it removing the object potentially including freeing the object. |
| 38 | + // If there is no need to handle a dropped object, NULL can be passed instead. |
| 39 | + // target is the target delay in nanoseconds and the window is the buffer time u |
| 40 | + // in nanosecond before changing into drop state. |
| 41 | + Codel(void (*drop_func)(T)= NULL, size_t max_entries=0, uint64_t target = kDefaultTarget, |
| 42 | + uint64_t window = kDefaultWindow) |
| 43 | + : delay_target_(target), |
| 44 | + window_(window), |
| 45 | + time_above_target_(0), |
| 46 | + next_drop_time_(NanoSecondTime() + window), |
| 47 | + drop_count_(0), |
| 48 | + dropping_(0), |
| 49 | + max_size_(max_entries), |
| 50 | + queue_(), |
| 51 | + drop_func_(drop_func) { } |
| 52 | + |
| 53 | + virtual ~Codel() { |
| 54 | + Wrapper w; |
| 55 | + while (!queue_.empty()) { |
| 56 | + Drop(queue_.front()); |
| 57 | + queue_.pop_front(); |
| 58 | + } |
| 59 | + } |
| 60 | + |
| 61 | + int Push(T obj) override { |
| 62 | + if (max_size_ != 0 && queue_.size() >= max_size_) { |
| 63 | + return -1; |
| 64 | + } |
| 65 | + Wrapper w(NanoSecondTime(), obj); |
| 66 | + queue_.push_back(w); |
| 67 | + return 0; |
| 68 | + } |
| 69 | + |
| 70 | + int Push(T* ptr, size_t count) override { |
| 71 | + size_t i = 0; |
| 72 | + for (; i < count; i++) { |
| 73 | + if (Push(ptr[i])) { |
| 74 | + break; |
| 75 | + } |
| 76 | + } |
| 77 | + return i; |
| 78 | + } |
| 79 | + |
| 80 | + // Retrieves the next entry from the queue and in the process, potentially drops |
| 81 | + // objects as well as changes between dropping state and not dropping state. |
| 82 | + int Pop(T &obj) override { |
| 83 | + bool drop = false; |
| 84 | + Wrapper w; |
| 85 | + int err = RingDequeue(w, drop); |
| 86 | + if (err != 0) { |
| 87 | + dropping_ = 0; |
| 88 | + return -2; |
| 89 | + } |
| 90 | + |
| 91 | + uint64_t now = NanoSecondTime(); |
| 92 | + if (dropping_) { |
| 93 | + // if in dropping state, drop object until next drop time is greater |
| 94 | + // than the current time. |
| 95 | + err = DropDequeue(w, drop); |
| 96 | + } else if (drop && ((now - next_drop_time_ < window_) || |
| 97 | + (now - time_above_target_ >= window_))) { |
| 98 | + // if not in dropping state, determine whether to enter drop state and if |
| 99 | + // so, drop current object, get a new object and reset the drop counter. |
| 100 | + Drop(w); |
| 101 | + err = RingDequeue(w, drop); |
| 102 | + |
| 103 | + if (err == 0) { |
| 104 | + dropping_ = 1; |
| 105 | + if (now - next_drop_time_ < window_ && drop_count_ > 2) { |
| 106 | + drop_count_ -= 2; |
| 107 | + } else { |
| 108 | + drop_count_ = 1; |
| 109 | + } |
| 110 | + next_drop_time_ = NextDrop(now); |
| 111 | + } |
| 112 | + } |
| 113 | + |
| 114 | + // if there was a wrapper to dequeue, set the parameter object to the |
| 115 | + // wrapper's object |
| 116 | + if (err == 0) { |
| 117 | + obj = w.second; |
| 118 | + } |
| 119 | + return err; |
| 120 | + } |
| 121 | + |
| 122 | + // Retrieves the next count entries from the queue and in the process, potentially |
| 123 | + // drops objects as well as changes between dropping state and not dropping state. |
| 124 | + // Does not necessarily return count if there are count present but some are dropped. |
| 125 | + int Pop(T* objs, size_t count) override { |
| 126 | + size_t i = 0; |
| 127 | + T next_obj; |
| 128 | + for (; i < count; i++) { |
| 129 | + int err = Pop(next_obj); |
| 130 | + if (err != 0) { |
| 131 | + break; |
| 132 | + } |
| 133 | + objs[i] = next_obj; |
| 134 | + } |
| 135 | + return i; |
| 136 | + } |
| 137 | + |
| 138 | + size_t Capacity() override { return queue_.max_size(); } |
| 139 | + |
| 140 | + bool Empty() override { return queue_.empty(); } |
| 141 | + |
| 142 | + bool Full() override { |
| 143 | + if (max_size_ != 0) { |
| 144 | + return max_size_ == queue_.size(); |
| 145 | + } |
| 146 | + return queue_.size() == queue_.max_size(); |
| 147 | + } |
| 148 | + |
| 149 | + size_t Size() override { return queue_.size(); } |
| 150 | + |
| 151 | + private: |
| 152 | + // Calls the drop_func on the object if the drop function exists |
| 153 | + void Drop(Wrapper w) { |
| 154 | + if (drop_func_ != NULL) { |
| 155 | + drop_func_(w.second); |
| 156 | + } |
| 157 | + } |
| 158 | + |
| 159 | + // Takes the relative time to determine the next time to drop. |
| 160 | + // returns the next time to drop a object. |
| 161 | + uint64_t NextDrop(uint64_t cur_time) { |
| 162 | + return cur_time + window_ * pow(drop_count_, -.5); |
| 163 | + } |
| 164 | + |
| 165 | + // Gets the next object from the queue and determines based on current state, |
| 166 | + // whether set the passed drop boolean to true(to tell the calling function to |
| 167 | + // drop it). Takes a Wrapper to set to the next entry in the queue and a boolean |
| 168 | + // to set if the entry should be dropped. Returns 0 on success. |
| 169 | + int RingDequeue(Wrapper &w, bool &drop) { |
| 170 | + if (!queue_.empty()) { |
| 171 | + w = queue_.front(); |
| 172 | + queue_.pop_front(); |
| 173 | + } else { |
| 174 | + return -1; |
| 175 | + } |
| 176 | + |
| 177 | + uint64_t now = NanoSecondTime(); |
| 178 | + uint64_t delay_time = now - w.first; |
| 179 | + |
| 180 | + // determine whether object should be dropped or to change state |
| 181 | + if (delay_time < delay_target_) { |
| 182 | + time_above_target_ = 0; |
| 183 | + } else { |
| 184 | + if (time_above_target_ == 0) { |
| 185 | + time_above_target_ = now + window_; |
| 186 | + } else if (now >= time_above_target_) { |
| 187 | + drop = true; |
| 188 | + } |
| 189 | + } |
| 190 | + return 0; |
| 191 | + } |
| 192 | + |
| 193 | + // Called while Codel is in drop state to determine whether to drop the current |
| 194 | + // entries and dequeue the next entry. Will continue to drop entries until the |
| 195 | + // next drop is greater than the current time. Takes a Wrapper which is the next |
| 196 | + // entry in the queue which will potentially be replaced and a boolean determing |
| 197 | + // if the entry should be dropped. Returns 0 on success. |
| 198 | + int DropDequeue(Wrapper &w, bool &drop) { |
| 199 | + uint64_t now = NanoSecondTime(); |
| 200 | + if (!drop) { |
| 201 | + dropping_ = 0; |
| 202 | + } else if (now >= next_drop_time_) { |
| 203 | + while (now >= next_drop_time_ && dropping_) { |
| 204 | + Drop(w); |
| 205 | + drop_count_++; |
| 206 | + int err = RingDequeue(w, drop); |
| 207 | + |
| 208 | + if (err != 0 || !drop) { |
| 209 | + dropping_ = 0; |
| 210 | + return err; |
| 211 | + } |
| 212 | + next_drop_time_ = NextDrop(next_drop_time_); |
| 213 | + } |
| 214 | + } |
| 215 | + return 0; |
| 216 | + } |
| 217 | + |
| 218 | + // Returns the current time in microseconds. |
| 219 | + uint64_t NanoSecondTime() { |
| 220 | + return tsc_to_ns(rdtsc()); |
| 221 | + } |
| 222 | + |
| 223 | + uint64_t delay_target_; // the delay that codel will adjust for |
| 224 | + uint64_t window_; // minimum time before changing state |
| 225 | + |
| 226 | + // the time at which codel will change state to above target(0 if below) |
| 227 | + uint64_t time_above_target_; |
| 228 | + uint64_t next_drop_time_; // the next time codel will drop |
| 229 | + |
| 230 | + // the number of objects dropped while delay has been above target |
| 231 | + uint32_t drop_count_; |
| 232 | + uint8_t dropping_; // whether in dropping state(above target for window) |
| 233 | + size_t max_size_; |
| 234 | + std::deque<Wrapper> queue_; // queue |
| 235 | + void (*drop_func_)(T); // the function to call to drop a value |
| 236 | +}; |
| 237 | + |
| 238 | +} // namespace utils |
| 239 | +} // namespace bess |
| 240 | + |
| 241 | +#endif // BESS_UTILS_CODEL_H_ |
0 commit comments