///
module timingwheels.timingwheels_impl;

import std.datetime;
import std.exception;
import std.typecons;
import std.format;
import std.traits;
import std.range;
import std.algorithm;
import std.experimental.logger;

import std.experimental.allocator;
import std.experimental.allocator.mallocator: Mallocator;

import core.thread;
import core.memory;

import ikod.containers.hashmap;
import automem;

version(twtesting)
{
    import unit_threaded;
}


version(twtesting)
{
    // Minimal timer used only by the unit tests: every instance takes the
    // next value of a static counter as its id.
    private class Timer
    {
        static ulong _current_id;
        private
        {
            ulong _id;
        }
        this() @safe @nogc
        {
            // take the current counter value, then bump it for the next timer
            _id = _current_id++;
        }
        ~this() @safe @nogc
        {

        }
        ulong id() @safe @nogc
        {
            return _id;
        }
        override string toString()
        {
            return format("%d", _id);
        }
    }
}
///
/// Scheduling error: thrown by schedule() when ticks == 0 or when the timer is already scheduled.
///
///
class ScheduleTimerError: Exception
{
    ///
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}
///
/// Cancel timer error occurs if you try to cancel timer which is not scheduled.
///
class CancelTimerError: Exception
{
    ///
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}
///
/// Advancing error occurs if number of ticks for advance not in range 0<t<=256
///
class AdvanceWheelError: Exception
{
    ///
    ///
    ///
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}

debug(timingwheels) @safe @nogc nothrow
{
    // Best-effort trace helper: prefixes the message with file:line and
    // swallows any exception so that it can be called from nothrow code.
    package
    void safe_tracef(A...)(string f, scope A args, string file = __FILE__, int line = __LINE__) @safe @nogc nothrow
    {
        bool onOsx;
        bool onLdc;
        version(OSX)
        {
            onOsx = true;
        }
        version(LDC)
        {
            onLdc = true;
        }
        debug (timingwheels) try
        {
            // this can fail on pair ldc2/osx, see https://github.com/ldc-developers/ldc/issues/3240
            if (!(onOsx && onLdc))
            {
                () @trusted @nogc {tracef("%s:%d " ~ f, file, line, args);}();
            }
        }
        catch(Exception e)
        {
        }
    }
}

// Insert `le` into the circular doubly linked list `*head`; `le` becomes the new head.
pragma(inline)
private void dl_insertFront(L)(L *le, L** head)
{
    auto first = *head;
    if (first is null)
    {
        // single element list: the element links to itself in both directions
        le.prev = le;
        le.next = le;
    }
    else
    {
        auto last = first.prev;
        le.prev = last;
        le.next = first;
        last.next = le;
        first.prev = le;
    }
    *head = le;
}

// Remove `le` from the circular list `*head`; the head becomes null when the list empties.
pragma(inline)
private void dl_unlink(L)(L *le, L** head)
in(*head != null)
{
    immutable bool isHead = (*head == le);
    if (isHead && le.next == le)
    {
        // `le` was the only element
        *head = null;
        return;
    }
    if (isHead)
    {
        *head = le.next;
    }
    le.next.prev = le.prev;
    le.prev.next = le.next;
}
// Visit every element of the circular list once (no action is performed on the elements).
pragma(inline)
private void dl_walk(L)(L** head)
{
    auto start = *head;
    if (start is null)
    {
        return;
    }
    for (auto cursor = start.next; cursor != start; cursor = cursor.next)
    {
    }
}
// Move `le` from list `*head_from` to the front of list `*head_to`.
pragma(inline)
private void dl_relink(L)(L* le, L** head_from, L** head_to)
in(le.prev !is null && le.next !is null)
{
    dl_unlink(le, head_from);
    dl_insertFront(le, head_to);
}

@("dl")
unittest
{
    globalLogLevel = LogLevel.info;
    struct LE
    {
        int p;
        LE *next;
        LE *prev;
    }
    LE* head1 = null;
    LE* head2 = null;
    auto le1 = new LE(1);
    auto le2 = new LE(2);

    // single element: insert, then unlink
    dl_insertFront(le1, &head1);
    assert(head1 != null);
    dl_unlink(le1, &head1);
    assert(head1 == null);

    // two elements: unlink the tail first, then the head
    dl_insertFront(le1, &head1);
    assert(head1 != null);
    dl_insertFront(le2, &head1);
    dl_unlink(le1, &head1);
    assert(head1 != null);
    dl_unlink(le2, &head1);
    assert(head1 == null);

    // two elements: unlink the head first, then the tail
    dl_insertFront(le1, &head1);
    assert(head1 != null);
    dl_insertFront(le2, &head1);
    dl_unlink(le2, &head1);
    assert(head1 != null);
    dl_unlink(le1, &head1);
    assert(head1 == null);

    // relink moves the element between lists
    dl_insertFront(le1, &head1);
    dl_relink(le1, &head1, &head2);
    assert(head1 == null);
    assert(head2 != null);
}
///
/// This structure implements scheme 6.2 from the
/// $(LINK http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf)
/// and supports several primitives:
/// $(UL
/// $(LI schedule timer in the future.)
/// $(LI cancel timer.)
/// $(LI time step (advance) - all timers expired at current time tick are extracted from wheels.)
/// )
/// Each operation take O(1) time.
///
struct TimingWheels(T)
{
    import core.bitop: bsr;

    private
    {
        alias TimerIdType = ReturnType!(T.id);
        alias allocator = Mallocator.instance;

        enum MASK = 0xff;
        enum LEVELS = 8;
        enum LEVEL_MAX = LEVELS - 1;
        enum SLOTS = 256;
        // number of tick bits consumed by each level (SLOTS == 1 << BITS_PER_LEVEL)
        enum BITS_PER_LEVEL = 8;
        enum FreeListMaxLen = 100;

        struct ListElement(T)
        {
            private
            {
                T               timer;
                ulong           scheduled_at;   // absolute level-0 tick at which the timer expires
                ushort          position;       // (level << 8) | slot, 0xffff while on the free list
                ListElement!T*  prev, next;
            }
        }
        struct Slot
        {
            ListElement!T* head;
        }
        struct Level
        {
            // now is counter of ticks processed on this level
            ulong       now;
            Slot[SLOTS] slots;
        }

        Level[LEVELS]   levels;
        ListElement!T*  freeList;       // singly linked (via next) cache of unused list elements
        int             freeListLen;
        HashMap!(TimerIdType, ListElement!T*)
                        ptrs;           // timer id -> list element currently holding it
        long            startedAt;      // Clock.currStdTime captured by init()
    }
    invariant
    {
        assert(freeListLen>=0);
    }
    alias Ticks = ulong; // ticks are 64 bit unsigned integers.

    // hashing ticks to slots
    // 8 levels, each level 256 slots, width of slot on each level is 256 times
    // the width of slot on the previous level
    // translate ticks to level
    // 0x00_00_00_00_00_00_00_00 <- ticks
    //    ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓
    //    □  □  □  □  □  □  □  □ \
    //    □  □  □  □  □  □  □  □ |
    //    .  .  .  .  .  .  .  . | 256 slots
    //    .  .  .  .  .  .  .  . |
    //    □  □  □  □  □  □  □  □ /
    //    7  6  5  4  3  2  1  0
    //                          <- 8 levels
    // each slot - double linked list of timers

    // ticks to level = bsr(ticks)/8
    pragma(inline) private pure int t2l(ulong t) @safe @nogc nothrow
    {
        if (t == 0)
        {
            return 0;
        }
        // divide by the number of bits per level (the original divided by
        // LEVELS, which only worked because both constants happen to be 8)
        return bsr(t) / BITS_PER_LEVEL;
    }
    // ticks to slot = ticks >> (level*8)
    pragma(inline) private pure int t2s(ulong t, int l) @safe @nogc nothrow
    {
        return (t >> (l<<3)) & MASK;
    }
    // level to ticks
    // l[0] -> 256
    // l[1] -> 256*256
    // ...
    // NOTE: for l == LEVEL_MAX the span is 2^64 and wraps to 0 in ulong.
    pragma(inline) private pure ulong l2t(int l) @safe @nogc nothrow
    {
        // each level is 8 bits wider than the previous one, so shift by l*8
        // (the original shifted by l, giving 512 instead of 65536 for l == 1)
        return cast(ulong)SLOTS << (l * BITS_PER_LEVEL);
    }
    ~this()
    {
        ptrs.clear;
        // release every list element still linked into some slot ...
        for(int l=0;l<=LEVEL_MAX;l++)
            for(int s=0; s<SLOTS; s++)
            {
                while(levels[l].slots[s].head)
                {
                    auto le = levels[l].slots[s].head;
                    dl_unlink(le, &levels[l].slots[s].head);
                    () @trusted {
                        GC.removeRange(le);
                        dispose(allocator, le);
                    }();
                }
            }
        // ... and every element cached on the free list
        while(freeList)
        {
            assert(freeListLen>0);
            auto n = freeList.next;
            () @trusted {
                GC.removeRange(freeList);
                dispose(allocator, freeList);
            }();
            freeListLen--;
            freeList = n;
        }
    }

    // Take a list element from the free list or allocate a new one.
    // Newly allocated elements are registered as GC ranges because they
    // store the timer value outside of GC managed memory.
    private ListElement!T* getOrCreate()
    {
        ListElement!T* result;
        if (freeList !is null)
        {
            result = freeList;
            freeList = freeList.next;
            freeListLen--;
            return result;
        }
        result = make!(ListElement!T)(allocator);
        () @trusted {
            GC.addRange(result, (*result).sizeof);
        }();
        return result;
    }
    // Return an unlinked list element to the free list, or dispose it when
    // the free list already holds FreeListMaxLen elements.
    private void returnToFreeList(ListElement!T* le)
    {
        if ( freeListLen >= FreeListMaxLen )
        {
            // this can be safely disposed as we do not leak ListElements outside this module
            () @trusted {
                GC.removeRange(le);
                dispose(allocator, le);
            }();
        }
        else
        {
            // drop the stored timer: the element lives in a GC-scanned range,
            // a stale reference would keep the timer alive while cached here
            le.timer = T.init;
            le.position = 0xffff;
            le.next = freeList;
            freeList = le;
            freeListLen++;
        }
    }
    /// Remember current wall clock time as the moment wheels were started.
    /// Must be called before $(B currStdTime), $(B ticksToCatchUp) or $(B timeUntilNextEvent).
    void init()
    {
        startedAt = Clock.currStdTime;
    }
    /++
     + Return internal view on current time - it is time at the call to $(B init)
     + plus total number of steps multiplied by $(B tick) duration.
     + Params:
     +   tick = tick duration
     +/
    auto currStdTime(Duration tick)
    {
        return startedAt + levels[0].now * tick.split!"hnsecs".hnsecs;
    }
    ///
    /// Schedule timer to $(B ticks) ticks forward from internal 'now'.
    ///Params: 
    /// timer = timer to schedule;
    /// ticks = ticks in the future to schedule timer. (0 < ticks < ulong.max);
    ///Returns:
    ///  void
    ///Throws: 
    /// ScheduleTimerError
    ///   when ticks == 0
    ///   or when timer already scheduled
    ///
    void schedule(T)(T timer, const ulong ticks)
    {
        if (ticks == 0)
        {
            throw new ScheduleTimerError("ticks can't be 0");
        }
        auto timer_id = timer.id();
        if (ptrs.contains(timer_id))
        {
            throw new ScheduleTimerError("Timer already scheduled");
        }
        size_t level_index = 0;
        // unsigned arithmetic: ticks above long.max must not turn negative
        ulong t = ticks;
        ulong s = 1;        // width of the slot in ticks on level
        uint  shift = 0;
        // Stop at the last level: without this bound `s << 8` overflows to 0
        // on level 7 and the loop walks past the end of `levels`.
        // The subtraction can't underflow: it removes at most 256*s == s<<8 < t.
        while(level_index < LEVEL_MAX && t > (s << 8)) // while t > slots on level
        {
            t -= (SLOTS - (levels[level_index].now & MASK)) * s;
            level_index++;
            s = s << 8;
            shift += 8;
        }
        auto level = &levels[level_index];
        immutable mask = s - 1;
        // round up to the next slot if t is not aligned to the slot width
        size_t slot_index = (level.now + (t>>shift) + ((t&mask)>0?1:0)) & MASK;
        auto slot = &levels[level_index].slots[slot_index];
        debug(timingwheels) safe_tracef("use level/slot %d/%d, level now: %d", level_index, slot_index, level.now);
        auto le = getOrCreate();
        le.timer = timer;
        le.position = ((level_index << 8 ) | slot_index) & 0xffff;
        le.scheduled_at = levels[0].now + ticks;
        dl_insertFront(le, &slot.head);
        ptrs[timer_id] = le;
        debug(timingwheels) safe_tracef("scheduled timer id: %s, ticks: %s, now: %d, scheduled at: %s to level: %s, slot %s",
            timer_id, ticks, levels[0].now, le.scheduled_at, level_index, slot_index);
    }
    /// Cancel timer
    ///Params:
    /// timer = timer to cancel
    ///Returns:
    /// void
    ///Throws: 
    /// CancelTimerError
    ///  if timer not in wheel
    void cancel(T)(T timer)
    {
        auto timer_id = timer.id();
        // get list element pointer
        auto v = ptrs.fetch(timer_id);
        if ( !v.ok )
        {
            throw new CancelTimerError("Cant find timer to cancel");
        }
        auto le = v.value;
        // position encodes (level << 8) | slot, see schedule()
        immutable level_index = le.position>>8;
        immutable slot_index  = le.position & 0xff;
        assert(timer is le.timer);
        debug(timingwheels) safe_tracef("cancel timer, l:%d, s:%d", level_index, slot_index);
        dl_unlink(le, &levels[level_index].slots[slot_index].head);
        returnToFreeList(le);
        ptrs.remove(timer_id);
    }
    /// Number of ticks to rotate wheels until internal wheel 'now'
    /// catch up with real world realTime.
    /// Calculation based on time when wheels were started and total 
    /// number of ticks passed.
    ///Params: 
    /// tick = your tick length (Duration)
    /// realTime = current real world now (Clock.currStdTime)
    ///Returns: ticks to advance so that we catch up real world current time
    ///  (at most 256; 0 when the wheel is not behind realTime).
    int ticksToCatchUp(Duration tick, ulong realTime)
    {
        immutable tickLength = tick.split!"hnsecs".hnsecs;
        assert(tickLength > 0, "tick duration must be positive");
        auto c = startedAt + tickLength * levels[0].now;
        if ( realTime <= c )
        {
            // wheel time is already at or ahead of realTime; the unsigned
            // subtraction below would wrap around and report 256 ticks
            return 0;
        }
        auto v = (realTime - c) / tickLength;
        if ( v > 256 )
        {
            return 256;
        }
        return cast(int)v;
    }
    /// Time until next scheduled timer event.
    /// You provide tick size and current real world time.
    /// This function find ticks until next event and use time of the start and
    /// total steps executed to calculate time delta from $(B realNow) to next event.
    ///Params: 
    /// tick = your accepted tick duration.
    /// realNow = real world now, result of Clock.currStdTime
    ///Returns: time until next event. Can be zero or negative in case you have already expired events.
    ///
    Duration timeUntilNextEvent(const Duration tick, ulong realNow)
    {
        assert(startedAt>0, "Forgot to call init()?");
        immutable n = ticksUntilNextEvent();
        immutable target = startedAt + (levels[0].now + n) * tick.split!"hnsecs".hnsecs;
        // unsigned difference is reinterpreted as signed hnsecs, so a target
        // in the past yields a negative Duration
        auto delta = (target - realNow).hnsecs;
        debug(timingwheels) safe_tracef("ticksUntilNextEvent=%s, tick=%s, startedAt=%s", n, tick, SysTime(startedAt));
        return delta;
    }

    ///
    /// Advance wheel and return all timers expired during wheel turn.
    //
    /// Params:
    ///   ticks = how many ticks to advance. Must be in range 0 < ticks <= 256
    /// Returns: list of expired timers
    /// Throws:
    ///   AdvanceWheelError when ticks == 0 or ticks > 256
    ///
    auto advance(this W)(ulong ticks)
    {
        struct ExpiredTimers
        {
            HashMap!(TimerIdType, T)    _map;
            auto timers()
            {
                return _map.byValue;
            }
            auto length()
            {
                return _map.length();
            }
            bool contains(TimerIdType id)
            {
                return _map.contains(id);
            }
            void remove(TimerIdType id)
            {
                _map.remove(id);
            }
        }
        alias AdvanceResult = automem.RefCounted!(ExpiredTimers, Mallocator);
        if (ticks > l2t(0))
        {
            throw new AdvanceWheelError("You can't advance that much");
        }
        if (ticks == 0)
        {
            throw new AdvanceWheelError("ticks must be > 0");
        }
        debug(timingwheels) safe_tracef("advancing %d ticks", ticks);
        auto      result = AdvanceResult(ExpiredTimers());
        auto      level  = &levels[0];

        while(ticks)
        {
            ticks--;
            immutable now           = ++level.now;
            immutable slot_index    = now & MASK;
            auto      slot          = &level.slots[slot_index];
            //debug(timingwheels) safe_tracef("level 0, now=%s", now);
            // everything linked into the current level-0 slot has expired
            while(slot.head)
            {
                auto le = slot.head;
                auto timer = le.timer;
                auto timer_id = timer.id();
                assert(!result._map.contains(timer_id), "Something wrong: we try to return same timer twice");
                debug(timingwheels) safe_tracef("return timer: %s, scheduled at %s", timer, le.scheduled_at);
                result._map[timer_id] = timer;
                dl_unlink(le, &slot.head);
                returnToFreeList(le);
                ptrs.remove(timer_id);
            }
            if (slot_index == 0)
            {
                // level 0 completed full rotation - cascade timers from upper levels
                advance_level(1);
            }
        }
        return result;
    }
    /// Number of timers currently scheduled.
    auto totalTimers() pure @safe @nogc
    {
        return ptrs.length();
    }
    /// Snapshot (id -> timer) of all timers currently scheduled.
    auto allTimers() @safe @nogc
    {
        struct AllTimers
        {
            HashMap!(TimerIdType, T)    _map;
            auto timers()
            {
                return _map.byValue;
            }
            auto length()
            {
                return _map.length();
            }
            bool contains(TimerIdType id)
            {
                return _map.contains(id);
            }
        }
        AllTimers result;
        foreach (p; ptrs.byPair)
        {
            result._map[p.key] = p.value.timer;
        }
        return result;
    }
    //
    // ticks until next event on level 0 or until next wheel rotation
    // If you have empty ticks it is safe to sleep - you will not miss anything, just wake up
    // at the time when next timer have to be processed.
    //Returns: number of safe "sleep" ticks.
    //
    private int ticksUntilNextEvent()
    out(r; r<=256)
    {
        int result = 1;
        auto level = &levels[0];
        immutable uint now = levels[0].now & MASK;
        auto slot = (now + 1) & MASK;
        //assert(level.slots[now].head == null);
        do
        {
            if (level.slots[slot].head != null)
            {
                break;
            }
            result++;
            slot = (slot + 1) & MASK;
        }
        while(slot != now);

        // never sleep past the next level-0 rotation: upper levels have to be
        // cascaded at that point
        return min(result, 256-now);
    }

    // Advance level `level_index` by one slot and move (cascade) all timers
    // from that slot to lower levels according to their remaining ticks.
    private void advance_level(int level_index)
    in(level_index>0)
    {
        debug(timingwheels) safe_tracef("running advance on level %d", level_index);
        immutable now0 = levels[0].now;
        auto      level = &levels[level_index];
        immutable now   = ++level.now;
        immutable slot_index = now & MASK;
        debug(timingwheels) safe_tracef("level %s, now=%s", level_index, now);
        auto slot = &level.slots[slot_index];
        debug(timingwheels) safe_tracef("haldle l%s:s%s timers", level_index, slot_index);
        while(slot.head)
        {
            auto listElement = slot.head;

            // remaining ticks until expiration; same level/slot search as in schedule()
            immutable delta = listElement.scheduled_at - now0;
            size_t lower_level_index = 0;
            long t = delta;
            size_t s = 1; // width of the slot in ticks on level
            size_t shift = 0;
            while(t > s<<8) // while t > slots on level
            {
                t -= (SLOTS - (levels[lower_level_index].now & MASK)) * s;
                lower_level_index++;
                s = s << 8;
                shift += 8;
            }
            auto mask = s - 1;
            size_t lower_level_slot_index = (levels[lower_level_index].now + (t>>shift) + ((t&mask)>0?1:0)) & MASK;
            debug(timingwheels) safe_tracef("move timer id: %s, scheduledAt; %d to level %s, slot: %s (delta=%s)",
                listElement.timer.id(), listElement.scheduled_at, lower_level_index, lower_level_slot_index, delta);
            listElement.position = ((lower_level_index<<8) | lower_level_slot_index) & 0xffff;
            dl_relink(listElement, &slot.head, &levels[lower_level_index].slots[lower_level_slot_index].head);
        }
        if (slot_index == 0 && level_index < LEVEL_MAX)
        {
            // this level completed full rotation too - continue with the next one
            advance_level(level_index+1);
        }
    }
}

version(twtesting):

@("TimingWheels")
unittest
{
    import std.stdio;
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    assert(w.t2l(1) == 0);
    assert(w.t2s(1, 0) == 1);
    immutable t = 0x00_00_00_11_00_00_00_77;
    immutable level = w.t2l(t);
    assert(level==4);
    immutable slot = w.t2s(t, level);
    assert(slot == 0x11);
    // level span in ticks
    assert(w.l2t(0) == 256);
    assert(w.l2t(1) == 256*256);
    assert(w.l2t(2) == 256*256*256);
    auto timer = new Timer();
    () @nogc @safe {
        w.schedule(timer, 2);
        bool thrown;
        // check that you can't schedule same timer twice
        try
        {
            w.schedule(timer, 5);
        }
        catch(ScheduleTimerError e)
        {
            thrown = true;
        }
        assert(thrown);
        thrown = false;
        try
        {
            w.advance(1024);
        }
        catch(AdvanceWheelError e)
        {
            thrown = true;
        }
        assert(thrown);
        thrown = false;
        w.cancel(timer);
        w.advance(1);
    }();
    w = TimingWheels!Timer();
    w.init();
    w.schedule(timer, 1);
    auto r = w.advance(1);
    assert(r.timers.count == 1);
    w.schedule(timer, 256);
    r = w.advance(255);
    assert(r.timers.count == 0);
    r = w.advance(1);
    assert(r.timers.count == 1);
    w.schedule(timer, 256*256);
    int c;
    for(int i=0;i<256;i++)
    {
        r = w.advance(256);
        c += r.timers.count;
    }
    assert(c==1);
}
@("ticksToCatchUp")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    immutable tick = 5.msecs;                   // 50_000 hnsecs
    immutable wheelNow = w.currStdTime(tick);
    assert(w.ticksToCatchUp(tick, wheelNow) == 0);
    // real time behind the wheel time must not be reported as 256 ticks
    assert(w.ticksToCatchUp(tick, wheelNow - 1) == 0);
    assert(w.ticksToCatchUp(tick, wheelNow + 3 * 50_000) == 3);
    assert(w.ticksToCatchUp(tick, wheelNow + 1000 * 50_000) == 256);
}
@("farFuture")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    auto timer = new Timer();
    // must land on the last level instead of running past it
    w.schedule(timer, ulong.max - 1);
    assert(w.totalTimers == 1);
    w.cancel(timer);
    assert(w.totalTimers == 0);
}
@("rt")
@Tags("noauto")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    Duration Tick = 5.msecs;
    w.init();
    ulong now = Clock.currStdTime;
    assert(now - w.currStdTime(Tick) < 5*10_000);
    Thread.sleep(2*Tick);
    now = Clock.currStdTime;
    assert((now - w.currStdTime(Tick))/10_000 - (2*Tick).split!"msecs".msecs < 10);
    auto toCatchUp = w.ticksToCatchUp(Tick, now);
    toCatchUp.shouldEqual(2);
    auto t = w.advance(toCatchUp);
    toCatchUp = w.ticksToCatchUp(Tick, now);
    toCatchUp.shouldEqual(0);
}
@("cancel")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    Timer timer0 = new Timer();
    Timer timer1 = new Timer();
    w.schedule(timer0, 256);
    w.schedule(timer1, 256+128);
    auto r = w.advance(255);
    assert(r.timers.count == 0);
    w.cancel(timer0);
    r = w.advance(1);
    assert(r.timers.count == 0);
    assertThrown!CancelTimerError(w.cancel(timer0));
    w.cancel(timer1);
}
@("ticksUntilNextEvent")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    auto s = w.ticksUntilNextEvent;
    assert(s==256);
    auto r = w.advance(s);
    assert(r.timers.count == 0);
    Timer t = new Timer();
    w.schedule(t, 50);
    s = w.ticksUntilNextEvent;
    assert(s==50);
    r = w.advance(s);
    assert(r.timers.count == 1);
}

@("load")
@Serial
unittest
{
    import std.array:array;
    globalLogLevel = LogLevel.info;
    enum TIMERS = 100_000;
    Timer._current_id = 1;
    auto w = TimingWheels!Timer();
    w.init();
    for(int i=1;i<=TIMERS;i++)
    {
        auto t = new Timer();
        w.schedule(t, i);
    }
    int counter;
    for(int i=1;i<=TIMERS;i++)
    {
        auto r = w.advance(1);
        auto timers = r.timers;
        auto t = timers.array()[0];
        assert(t.id == i, "expected t.id=%s, got %s".format(i, t.id));
        assert(timers.count == 1);
        counter++;
    }
    assert(counter == TIMERS, "expected %d timers, got %d".format(TIMERS, counter));

    for(int i=1;i<=TIMERS;i++)
    {
        auto t = new Timer();
        w.schedule(t, i);
    }
    counter = 0;
    for(int i=TIMERS+1;i<=2*TIMERS;i++)
    {
        auto r = w.advance(1);
        auto timers = r.timers;
        auto t = timers.array()[0];
        assert(t.id == i, "expected t.id=%s, got %s".format(i, t.id));
        assert(timers.count == 1);
        counter++;
    }
    assert(counter == TIMERS, "expected %d timers, got %d".format(TIMERS, counter));

}
// @("cornercase")
// @Serial
// unittest
// {
//     Timer._current_id = 1;
//     auto w = TimingWheels!Timer();
//     globalLogLevel = LogLevel.trace;
//     w.advance(254);
//     auto t = new Timer();
//     w.schedule(t, 511);
//     for(int i=0; i<511; i++)
//     {
//         w.advance(1);
//     }
// }

///
///
///
@("example")
@Tags("noauto")
@Values(1.msecs,2.msecs,3.msecs,4.msecs,5.msecs,6.msecs,7.msecs,8.msecs, 9.msecs,10.msecs)
@Serial
unittest
{
    import std;
    globalLogLevel = LogLevel.info;
    auto rnd = Random(142);
    auto Tick = getValue!Duration();
    /// track execution
    int  counter;
    SysTime last;

    /// this is our Timer
    class Timer
    {
        static ulong __id;
        private ulong _id;
        private string _name;
        this(string name)
        {
            _id = __id++;
            _name = name;
        }
        /// must provide id() method
        ulong id()
        {
            return _id;
        }
    }

    enum IOWakeUpInterval = 100; // to simulate random IO wakeups in interval 0 - 100.msecs

    // each tick spans Tick (taken from @Values) - this is our link with time in reality
    TimingWheels!Timer w;
    w.init();
    auto durationToTicks(Duration d)
    {
        // we have to adjust w.now and realtime 'now' before scheduling timer
        auto real_now = Clock.currStdTime;
        auto tw_now = w.currStdTime(Tick);
        auto delay = (real_now - tw_now).hnsecs;
        return (d + delay)/Tick;
    }
    void process_timer(Timer t)
    {
        switch(t._name)
        {
            case "periodic":
                if ( last.stdTime == 0)
                {
                    // initialize tracking
                    last = Clock.currTime - 50.msecs;
                }
                auto delta = Clock.currTime - last;
                assert(delta - 50.msecs <= max(Tick + Tick/20, 5.msecs), "delta-50.msecs=%s".format(delta-50.msecs));
                writefln("@ %s - delta: %sms (should be 50ms)", t._name, (Clock.currTime - last).split!"msecs".msecs);
                last = Clock.currTime;
                counter++;
                w.schedule(t, durationToTicks(50.msecs)); // rearm
                break;
            default:
                writefln("@ %s", t._name);
                break;
        }
    }
    // emulate some random initial delay
    auto randomInitialDelay = uniform(0, 500, rnd).msecs;
    Thread.sleep(randomInitialDelay);
    //
    // start one arbitrary timer and one periodic timer
    //
    auto some_timer = new Timer("some");
    auto periodic_timer = new Timer("periodic");
    w.schedule(some_timer, durationToTicks(32.msecs));
    w.schedule(periodic_timer, durationToTicks(50.msecs));

    while(counter < 10)
    {
        auto realNow = Clock.currStdTime;
        auto randomIoInterval = uniform(0, IOWakeUpInterval, rnd).msecs;
        auto nextTimerEvent = max(w.timeUntilNextEvent(Tick, realNow), 0.msecs);
        // wait for what should happen earlier
        auto time_to_sleep = min(randomIoInterval, nextTimerEvent);
        writefln("* sleep until timer event or random I/O for %s", time_to_sleep);
        Thread.sleep(time_to_sleep);
        // make steps if required
        int ticks = w.ticksToCatchUp(Tick, Clock.currStdTime);
        if (ticks > 0)
        {
            auto wr = w.advance(ticks);
            foreach(t; wr.timers)
            {
                process_timer(t);
            }
        }
        // emulate some random processing time
        Thread.sleep(uniform(0, 5, rnd).msecs);
    }
}