1 ///
2 module timingwheels.timingwheels_impl;
3 
4 import std.datetime;
5 import std.exception;
6 import std.typecons;
7 import std.format;
8 import std.traits;
9 import std.range;
10 import std.algorithm;
11 import std.experimental.logger;
12 
13 import std.experimental.allocator;
14 import std.experimental.allocator.mallocator: Mallocator;
15 
16 import core.thread;
17 import core.memory;
18 
19 import ikod.containers.hashmap;
20 import automem;
21 
22 version(twtesting)
23 {
24     import unit_threaded;
25 }
26 
27 
28 version(twtesting)
29 {
30     private class Timer
31     {
32         static ulong _current_id;
33         private
34         {
35             ulong   _id;
36         }
37         this() @safe @nogc
38         {
39             _id = _current_id;
40             _current_id++;
41         }
42         ~this() @safe @nogc
43         {
44 
45         }
46         ulong id() @safe @nogc
47         {
48             return _id;
49         }
50         override string toString()
51         {
52             return "%d:%s".format(_id);
53         }
54     }
55 }
56 ///
57 /// scheduling error occurs at schedule() when ticks == 0 or timer already scheduled.
58 ///
59 ///
60 class ScheduleTimerError: Exception
61 {
62     ///
63     this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
64     {
65         super(msg, file, line);
66     }
67 }
68 ///
69 /// Cancel timer error occurs if you try to cancel timer which is not scheduled.
70 ///
71 class CancelTimerError: Exception
72 {
73     ///
74     this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
75     {
76         super(msg, file, line);
77     }
78 }
79 ///
80 /// Advancing error occurs if number of ticks for advance not in range 0<t<=256
81 ///
82 class AdvanceWheelError: Exception
83 {
84     ///
85     ///
86     ///
87     this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
88     {
89         super(msg, file, line);
90     }
91 }
92 
93 debug(timingwheels) @safe @nogc nothrow
94 {
95     package
96     void safe_tracef(A...)(string f, scope A args, string file = __FILE__, int line = __LINE__) @safe @nogc nothrow
97     {
98         bool osx,ldc;
99         version(OSX)
100         {
101             osx = true;
102         }
103         version(LDC)
104         {
105             ldc = true;
106         }
107         debug (timingwheels) try
108         {
109             // this can fail on pair ldc2/osx, see https://github.com/ldc-developers/ldc/issues/3240
110             if (!osx || !ldc)
111             {
112                 () @trusted @nogc {tracef("%s:%d " ~ f, file, line, args);}();
113             }
114         }
115         catch(Exception e)
116         {
117         }
118     }    
119 }
120 
121 pragma(inline)
122 private void dl_insertFront(L)(L *le, L** head)
123 {
124     if ( *head == null)
125     {
126         le.next = le.prev = le;
127     }
128     else
129     {
130         auto curr_head = *head;
131         le.prev = curr_head.prev;
132         le.next = curr_head;
133         curr_head.prev.next = le;
134         curr_head.prev = le;
135     }
136     *head = le;
137 }
138 
139 pragma(inline)
140 private void dl_unlink(L)(L *le, L** head)
141 in(*head != null)
142 {
143     if (le.next == le && *head == le)
144     {
145         *head = null;
146         return;
147     }
148     if (le == *head)
149     {
150         *head = le.next;        
151     }
152     le.next.prev = le.prev;
153     le.prev.next = le.next;
154 }
155 pragma(inline)
156 private void dl_walk(L)(L** head)
157 {
158     if (*head == null)
159     {
160         return;
161     }
162     auto le = *head;
163     do
164     {
165         le = le.next;
166     } while (le != *head);
167 }
168 pragma(inline)
169 private void dl_relink(L)(L* le, L** head_from, L** head_to)
170 in(le.prev !is null && le.next !is null)
171 {
172     dl_unlink(le, head_from);
173     dl_insertFront(le, head_to);
174 }
175 
176 @("dl")
177 unittest
178 {
179     globalLogLevel = LogLevel.info;
180     struct LE
181     {
182         int p;
183         LE *next;
184         LE *prev;
185     }
186     LE* head1 = null;
187     LE* head2 = null;
188     auto le1 = new LE(1);
189     auto le2 = new LE(2);
190     dl_insertFront(le1, &head1);
191     assert(head1 != null);
192     dl_unlink(le1, &head1);
193     assert(head1 == null);
194 
195     dl_insertFront(le1, &head1);
196     assert(head1 != null);
197     dl_insertFront(le2, &head1);
198     dl_unlink(le1, &head1);
199     assert(head1 != null);
200     dl_unlink(le2, &head1);
201     assert(head1 == null);
202 
203     dl_insertFront(le1, &head1);
204     assert(head1 != null);
205     dl_insertFront(le2, &head1);
206     dl_unlink(le2, &head1);
207     assert(head1 != null);
208     dl_unlink(le1, &head1);
209     assert(head1 == null);
210 
211     dl_insertFront(le1, &head1);
212     dl_relink(le1, &head1, &head2);
213     assert(head1 == null);
214     assert(head2 != null);
215 }
216 ///
217 /// This structure implements scheme 6.2 thom the
218 /// http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf 
219 /// It supports several primitives:
220 /// 1. schedule timer in the future.
221 /// 2. cancel timer.
222 /// 3. time step (advance) - all timers expired at current time tick are extracted from wheels.
223 /// Each operation take O(1) time.
224 /// 
225 struct TimingWheels(T)
226 {
227     import core.bitop: bsr;
228 
229     private
230     {
231         alias TimerIdType = ReturnType!(T.id);
232         alias allocator = Mallocator.instance;
233 
234         enum MASK = 0xff;
235         enum LEVELS = 8;
236         enum LEVEL_MAX = LEVELS - 1;
237         enum SLOTS  = 256;
238         enum FreeListMaxLen = 100;
239 
240         struct ListElement(T)
241         {
242             private
243             {
244                 T               timer;
245                 ulong           scheduled_at;
246                 ushort          position;
247                 ListElement!T*  prev, next;
248             }
249         }
250         struct Slot
251         {
252             ListElement!T* head;
253         }
254         struct Level
255         {
256             // now if counter of ticks processed on this level
257             ulong       now;
258             Slot[SLOTS] slots;
259         }
260 
261         Level[LEVELS]   levels;
262         ListElement!T*  freeList;
263         int             freeListLen;
264         HashMap!(TimerIdType, ListElement!T*)
265                         ptrs;
266         long            startedAt;
267     }
268     invariant
269     {
270         assert(freeListLen>=0);
271     }
272     alias Ticks = ulong; // ticks are 64 bit unsigned integers.
273 
274     // hashing ticks to slots
275     // 8 levels, each level 256 slots, with of slot on each level 256 times
276     // translate ticks to level
277     // 0x00_00_00_00_00_00_00_00 <- ticks
278     //   ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓
279     //   □  □  □  □  □  □  □  □ \
280     //   □  □  □  □  □  □  □  □ |
281     //   .  .  .  .  .  .  .  . | 256 slots
282     //   .  .  .  .  .  .  .  . |
283     //   □  □  □  □  □  □  □  □ /
284     //   7  6  5  4  3  2  1  0
285     //                          <- 8 levels
286     // each slot - double linked list of timers
287 
288     // ticks to level = bsr(ticks)/8
289     pragma(inline) private pure int t2l(ulong t) @safe @nogc nothrow
290     {
291         if (t == 0)
292         {
293             return 0;
294         }
295         return bsr(t)/LEVELS;
296     }
297     // ticks to slot  = ticks >> (level*8)
298     pragma(inline) private pure int t2s(ulong t, int l) @safe @nogc nothrow
299     {
300         return (t >> (l<<3)) & MASK;
301     }
302     // level to ticks
303     // l[0] -> 256
304     // l[1] -> 256*256
305     // ...
306     pragma(inline) private pure ulong l2t(int l) @safe @nogc nothrow
307     {
308         return SLOTS<<l;
309     }
310     ~this()
311     {
312         ptrs.clear;
313         for(int l=0;l<=LEVEL_MAX;l++)
314             for(int s=0; s<SLOTS; s++)
315             {
316                 while(levels[l].slots[s].head)
317                 {
318                     auto le = levels[l].slots[s].head;
319                     dl_unlink(le, &levels[l].slots[s].head);
320                     () @trusted {
321                         GC.removeRange(le);
322                         dispose(allocator, le);
323                     }();
324                 }
325             }
326         while(freeList)
327         {
328             assert(freeListLen>0);
329             auto n = freeList.next;
330             () @trusted {
331                 GC.removeRange(freeList);
332                 dispose(allocator, freeList);
333             }();
334             freeListLen--;
335             freeList = n;
336         }
337     }
338 
339     private ListElement!T* getOrCreate()
340     {
341         ListElement!T* result;
342         if (freeList !is null)
343         {
344             result = freeList;
345             freeList = freeList.next;
346             freeListLen--;
347             return result;
348         }
349         result = make!(ListElement!T)(allocator);
350         () @trusted {
351             GC.addRange(result, (*result).sizeof);
352         }();
353         return result;
354     }
355     private void returnToFreeList(ListElement!T* le)
356     {
357         if ( freeListLen >= FreeListMaxLen )
358         {
359             // this can be safely disposed as we do not leak ListElements outide this module
360             () @trusted {
361                 GC.removeRange(le);
362                 dispose(allocator, le);
363             }();
364         }
365         else
366         {
367             le.position = 0xffff;
368             le.next = freeList;
369             freeList = le;
370             freeListLen++;
371         }
372     }
373     ///
374     /// Schedule timer to future
375     ///Params: 
376     /// timer = timer to schedule;
377     /// ticks = ticks in the future to schedule timer. (0 < ticks < ulong.max);
378     ///Returns:
379     ///  void
380     ///Throws: 
381     /// ScheduleTimerError
382     ///   when thicks == 0
383     ///   or when timer already scheduled
384     ///
385     void schedule(T)(T timer, const ulong ticks)
386     {
387         if (ticks == 0)
388         {
389             throw new ScheduleTimerError("ticks can't be 0");
390         }
391         auto timer_id = timer.id();
392         if (ptrs.contains(timer_id))
393         {
394             throw new ScheduleTimerError("Timer already scheduled");
395         }
396         if (startedAt == 0)
397         {
398             startedAt = Clock.currStdTime;
399         }
400         auto level_index = t2l(ticks);
401         auto level = &levels[level_index];
402         auto slot_index  = (level.now + t2s(ticks, level_index)) & MASK;
403         auto slot        = &levels[level_index].slots[slot_index];
404         auto le = getOrCreate();
405         le.timer = timer;
406         le.position = ((level_index << 8 ) | slot_index) & 0xffff;
407         le.scheduled_at = levels[0].now + ticks;
408         dl_insertFront(le, &slot.head);
409         ptrs[timer_id] = le;
410         debug(timingwheels) safe_tracef("scheduled timer id: %s, ticks: %s, level: %s, slot %s",
411             timer_id, ticks, level_index, slot_index);
412     }
413     /// Cancel timer
414     ///Params: 
415     /// timer = timer to cancel
416     ///Returns: 
417     /// void
418     ///Throws: 
419     /// CancelTimerError
420     ///  if timer not in wheel
421     void cancel(T)(T timer)
422     {
423         // get list element pointer
424         auto v = ptrs.fetch(timer.id());
425         if ( !v.ok )
426         {
427             throw new CancelTimerError("Cant find timer to cancel");
428         }
429         auto le = v.value;
430         immutable level_index = le.position>>8;
431         immutable slot_index  = le.position & 0xff;
432         assert(timer is le.timer);
433         debug(timingwheels) safe_tracef("cancel timer, l:%d, s:%d", level_index, slot_index);
434         dl_unlink(le, &levels[level_index].slots[slot_index].head);
435         returnToFreeList(le);
436         ptrs.remove(timer.id());
437     }
438     ///
439     /// count "empty" ticks - slots without events.
440     /// If you have empty ticks it is safe to sleep - you will not miss anything, just wake up
441     /// at the time when next timer have to be processed.
442     ///Returns: number of empty ticks.
443     ///
444     int ticksUntilNextEvent()
445     out(r; r<=256)
446     {
447         int result = 1;
448         auto level = &levels[0];
449         immutable now = levels[0].now & MASK;
450         auto slot = (now + 1) & MASK;
451         assert(level.slots[now].head == null);
452         do
453         {
454             if (level.slots[slot].head != null)
455             {
456                 break;
457             }
458             result++;
459             slot = (slot + 1) & MASK;
460         }
461         while(slot != now);
462         return result;
463     }
464     /// Time until next scheduled timer event.
465     ///Params: 
466     /// tick = your accepted tick duration. 
467     ///Returns: msecs until next event. Can be zero or negative in case you have already expired events.
468     ///
469     Duration timeUntilNextEvent(const Duration tick)
470     {
471         if (startedAt == 0)
472         {
473             return Duration.max;
474         }
475         immutable n = ticksUntilNextEvent();
476         immutable target = startedAt + n * tick.split!"hnsecs".hnsecs;
477         auto delta =  (target - Clock.currStdTime).hnsecs;
478         return delta;
479     }
480     ///
481     /// Adnvance wheel and return all timers expired during wheel turn.
482     //
483     /// Params:
484     ///   ticks = how many ticks to advance. Must be in range 0 <= 256
485     /// Returns: list of expired timers
486     ///
487     auto advance(this W)(ulong ticks)
488     {
489         struct ExpiredTimers
490         {
491             HashMap!(TimerIdType, T)    _map;
492             auto timers()
493             {
494                 return _map.byValue;
495             }
496         }
497         alias AdvanceResult = automem.RefCounted!(ExpiredTimers, Mallocator);
498         if (ticks > l2t(0))
499         {
500             throw new AdvanceWheelError("You can't advance that much");
501         }
502         if (ticks == 0)
503         {
504             throw new AdvanceWheelError("ticks must be > 0");
505         }
506         auto      result = AdvanceResult(ExpiredTimers());
507         auto      level  = &levels[0];
508 
509         scope(exit)
510         {
511             startedAt = Clock.currStdTime;
512         }
513 
514         while(ticks)
515         {
516             ticks--;
517             immutable now           = ++level.now;
518             immutable slot_index    = now & MASK;
519             auto      slot = &level.slots[slot_index];
520             debug(timingwheels) safe_tracef("level 0, now=%s", now);
521             if (slot_index == 0)
522             {
523                 advance_level(1);
524             }
525             while(slot.head)
526             {
527                 auto le = slot.head;
528                 auto timer = le.timer;
529                 auto timer_id = timer.id();
530                 assert(!result._map.contains(timer_id), "Something wrong: we try to return same timer twice");
531                 debug(timingwheels) safe_tracef("return timer id: %s, scheduled at %s", timer_id, le.scheduled_at);
532                 result._map[timer_id] = timer;
533                 dl_unlink(le, &slot.head);
534                 returnToFreeList(le);
535                 ptrs.remove(timer.id());
536             }
537         }
538         return result;
539     }
540 
541     private void advance_level(int level_index)
542     in(level_index>0)
543     {
544         debug(timingwheels) safe_tracef("running advance on level %d", level_index);
545         immutable now0 = levels[0].now;
546         auto      level  = &levels[level_index];
547         immutable now    = ++level.now;
548         immutable slot_index = now & MASK;
549         debug(timingwheels) safe_tracef("level %s, now=%s", level_index, now);
550         if (slot_index == 0 && level_index < LEVEL_MAX)
551         {
552             advance_level(level_index+1);
553         }
554         auto slot = &level.slots[slot_index];
555         debug(timingwheels) safe_tracef("haldle l%s:s%s timers", level_index, slot_index);
556         while(slot.head)
557         {
558             auto listElement = slot.head;
559 
560             immutable delta = listElement.scheduled_at - now0;
561             immutable lower_level_index = t2l(delta);
562             immutable lower_level_slot_index  = t2s(delta, lower_level_index);
563             debug(timingwheels) safe_tracef("move timer id: %s to level %s, slot: %s",
564                 listElement.timer.id(), lower_level_index, lower_level_slot_index, delta);
565             listElement.position = ((lower_level_index<<8) | lower_level_slot_index) & 0xffff;
566             dl_relink(listElement, &slot.head, &levels[lower_level_index].slots[lower_level_slot_index].head);
567         }
568     }
569 }
570 
571 
572 @("TimingWheels")
573 unittest
574 {
575     globalLogLevel = LogLevel.info;
576     TimingWheels!Timer w;
577     assert(w.t2l(1) == 0);
578     assert(w.t2s(1, 0) == 1);
579     immutable t = 0x00_00_00_11_00_00_00_77;
580     immutable level = w.t2l(t);
581     assert(level==4);
582     immutable slot = w.t2s(t, level);
583     assert(slot == 0x11);
584     auto timer = new Timer();
585     () @nogc @safe {
586         w.schedule(timer, 2);
587         bool thrown;
588         // check that you can't schedule same timer twice
589         try
590         {
591             w.schedule(timer, 5);
592         }
593         catch(ScheduleTimerError e)
594         {
595             thrown = true;
596         }
597         assert(thrown);
598         thrown = false;
599         try
600         {
601             w.advance(1024);
602         }
603         catch(AdvanceWheelError e)
604         {
605             thrown = true;
606         }
607         assert(thrown);
608         thrown = false;
609         w.cancel(timer);
610         w.advance(1);
611     }();
612     w = TimingWheels!Timer();
613     w.schedule(timer, 1);
614     auto r = w.advance(1);
615     assert(r.timers.count == 1);
616     w.schedule(timer, 256);
617     r = w.advance(255);
618     assert(r.timers.count == 0);
619     r = w.advance(1);
620     assert(r.timers.count == 1);
621     w.schedule(timer, 256*256);
622     int c;
623     for(int i=0;i<256;i++)
624     {
625         r = w.advance(256);
626         c += r.timers.count;
627     }
628     assert(c==1);
629 }
630 @("cancel")
631 unittest
632 {
633     globalLogLevel = LogLevel.info;
634     TimingWheels!Timer w;
635     Timer timer0 = new Timer();
636     Timer timer1 = new Timer();
637     w.schedule(timer0, 256);
638     w.schedule(timer1, 256+128);
639     auto r = w.advance(255);
640     assert(r.timers.count == 0);
641     w.cancel(timer0);
642     r = w.advance(1);
643     assert(r.timers.count == 0);
644     assertThrown!CancelTimerError(w.cancel(timer0));
645     w.cancel(timer1);
646 }
647 @("ticksUntilNextEvent")
648 unittest
649 {
650     globalLogLevel = LogLevel.info;
651     TimingWheels!Timer w;
652     auto s = w.ticksUntilNextEvent;
653     assert(s==256);
654     auto r = w.advance(s);
655     assert(r.timers.count == 0);
656     Timer t = new Timer();
657     w.schedule(t, 50);
658     s = w.ticksUntilNextEvent;
659     assert(s==50);
660     r = w.advance(s);
661     assert(r.timers.count == 1);
662 }
663 @("load")
664 unittest
665 {
666     import std.array:array;
667     globalLogLevel = LogLevel.info;
668     enum TIMERS = 100_000;
669     Timer._current_id = 1;
670     auto w = TimingWheels!Timer();
671     for(int i=1;i<=TIMERS;i++)
672     {
673         auto t = new Timer();
674         w.schedule(t, i);
675     }
676     int counter;
677     for(int i=1;i<=TIMERS;i++)
678     {
679         auto r = w.advance(1);
680         auto timers = r.timers;
681         auto t = timers.array()[0];
682         assert(t.id == i);
683         assert(timers.count == 1);
684         counter++;
685     }
686     assert(counter == TIMERS, "expected 100 timers, got %d".format(counter));
687 }
688 
689 ///
690 ///
691 ///
692 @("example")
693 unittest
694 {
695     import std;
696     globalLogLevel = LogLevel.info;
697     auto rnd = Random(142);
698 
699     /// track execution
700     int  counter;
701     SysTime last = Clock.currTime;
702 
703     /// this is our Timer
704     class Timer
705     {
706         static ulong __id;
707         private ulong _id;
708         private string _name;
709         this(string name)
710         {
711             _id = __id++;
712             _name = name;
713         }
714         /// must provide id() method
715         ulong id()
716         {
717             return _id;
718         }
719     }
720 
721     enum IOWakeUpInterval = 100; // to simulate random IO wakeups in interval 0 - 100.msecs
722 
723     // each tick span 5 msecs - this is our link with time in reality
724     enum Tick = 5.msecs;
725     TimingWheels!Timer w;
726 
727     auto durationToTicks(Duration d)
728     {
729         return d/Tick;
730     }
731     void process_timer(Timer t)
732     {
733         switch(t._name)
734         {
735             case "periodic":
736                 writefln("@ %s - delta: %sms (should be 50ms)", t._name, (Clock.currTime - last).split!"msecs".msecs);
737                 last = Clock.currTime;
738                 counter++;
739                 w.schedule(t, durationToTicks(50.msecs)); // rearm
740                 break;
741             default:
742                 writefln("@ %s", t._name);
743                 break;
744         }
745     }
746 
747     //
748     // start one arbitrary timer and one periodic timer
749     //
750     auto some_timer = new Timer("some");
751     auto periodic_timer = new Timer("periodic");
752     w.schedule(some_timer, durationToTicks(32.msecs));
753     w.schedule(periodic_timer, durationToTicks(50.msecs));
754 
755     while(counter < 10)
756     {
757         auto randomIoInterval = uniform(0, IOWakeUpInterval, rnd).msecs;
758         auto nextTimerEvent = max(w.timeUntilNextEvent(Tick), 0.msecs);
759         // wait for what should happen earlier
760         auto time_to_sleep = min(randomIoInterval, nextTimerEvent);
761         writefln("* sleep until timer event or random I/O for %s", time_to_sleep);
762         Thread.sleep(time_to_sleep);
763         // if we waked up early by the IO event then timeUntilNextEvent will be positive
764         // otherwise it will be <= 0 and we have something to process.
765         while(w.timeUntilNextEvent(Tick) <= 0.msecs)
766         {
767             auto ticks = w.ticksUntilNextEvent();
768             auto wr = w.advance(ticks);
769             foreach(t; wr.timers)
770             {
771                 process_timer(t);
772             }
773         }
774         // some random processing time
775         Thread.sleep(uniform(0, 5, rnd).msecs);
776     }
777 }