1 /// 2 module timingwheels.timingwheels_impl; 3 4 import std.datetime; 5 import std.exception; 6 import std.typecons; 7 import std.format; 8 import std.traits; 9 import std.range; 10 import std.algorithm; 11 import std.experimental.logger; 12 13 import std.experimental.allocator; 14 import std.experimental.allocator.mallocator: Mallocator; 15 16 import core.thread; 17 import core.memory; 18 19 import ikod.containers.hashmap; 20 import automem; 21 22 version(twtesting) 23 { 24 import unit_threaded; 25 } 26 27 28 version(twtesting) 29 { 30 private class Timer 31 { 32 static ulong _current_id; 33 private 34 { 35 ulong _id; 36 } 37 this() @safe @nogc 38 { 39 _id = _current_id; 40 _current_id++; 41 } 42 ~this() @safe @nogc 43 { 44 45 } 46 ulong id() @safe @nogc 47 { 48 return _id; 49 } 50 override string toString() 51 { 52 return "%d".format(_id); 53 } 54 } 55 } 56 /// 57 /// scheduling error occurs at schedule() when ticks == 0 or timer already scheduled. 58 /// 59 /// 60 class ScheduleTimerError: Exception 61 { 62 /// 63 this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe 64 { 65 super(msg, file, line); 66 } 67 } 68 /// 69 /// Cancel timer error occurs if you try to cancel timer which is not scheduled. 70 /// 71 class CancelTimerError: Exception 72 { 73 /// 74 this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe 75 { 76 super(msg, file, line); 77 } 78 } 79 /// 80 /// Advancing error occurs if number of ticks for advance not in range 0<t<=256 81 /// 82 class AdvanceWheelError: Exception 83 { 84 /// 85 /// 86 /// 87 this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe 88 { 89 super(msg, file, line); 90 } 91 } 92 93 debug(timingwheels) @safe @nogc nothrow 94 { 95 package 96 void safe_tracef(A...)(string f, scope A args, string file = __FILE__, int line = __LINE__) @safe @nogc nothrow 97 { 98 bool osx,ldc; 99 version(OSX) 100 { 101 osx = true; 102 } 103 version(LDC) 104 { 105 ldc = true; 106 } 107 debug (timingwheels) try 108 { 109 // this can fail on pair ldc2/osx, see https://github.com/ldc-developers/ldc/issues/3240 110 if (!osx || !ldc) 111 { 112 () @trusted @nogc {tracef("%s:%d " ~ f, file, line, args);}(); 113 } 114 } 115 catch(Exception e) 116 { 117 } 118 } 119 } 120 121 pragma(inline) 122 private void dl_insertFront(L)(L *le, L** head) 123 { 124 if ( *head == null) 125 { 126 le.next = le.prev = le; 127 } 128 else 129 { 130 auto curr_head = *head; 131 le.prev = curr_head.prev; 132 le.next = curr_head; 133 curr_head.prev.next = le; 134 curr_head.prev = le; 135 } 136 *head = le; 137 } 138 139 pragma(inline) 140 private void dl_unlink(L)(L *le, L** head) 141 in(*head != null) 142 { 143 if (le.next == le && *head == le) 144 { 145 *head = null; 146 return; 147 } 148 if (le == *head) 149 { 150 *head = le.next; 151 } 152 le.next.prev = le.prev; 153 le.prev.next = le.next; 154 } 155 pragma(inline) 156 private void dl_walk(L)(L** head) 157 { 158 if (*head == null) 159 { 160 return; 161 } 162 auto le = *head; 163 do 164 { 165 le = le.next; 166 } while (le != *head); 167 } 168 pragma(inline) 169 private void dl_relink(L)(L* le, L** head_from, L** head_to) 170 in(le.prev !is null && le.next !is null) 171 { 172 dl_unlink(le, head_from); 173 dl_insertFront(le, head_to); 174 } 175 176 @("dl") 177 unittest 178 { 179 globalLogLevel = LogLevel.info; 180 struct LE 181 { 182 int p; 183 LE *next; 184 LE *prev; 185 } 186 LE* head1 = null; 187 LE* head2 = null; 188 auto le1 = new LE(1); 189 auto le2 = new LE(2); 190 dl_insertFront(le1, &head1); 191 assert(head1 != null); 192 dl_unlink(le1, &head1); 193 assert(head1 == null); 194 195 dl_insertFront(le1, &head1); 196 assert(head1 != null); 197 dl_insertFront(le2, &head1); 198 dl_unlink(le1, &head1); 199 assert(head1 != null); 200 dl_unlink(le2, &head1); 201 assert(head1 == null); 202 203 dl_insertFront(le1, &head1); 204 assert(head1 != null); 205 dl_insertFront(le2, &head1); 206 dl_unlink(le2, &head1); 207 assert(head1 != null); 208 dl_unlink(le1, &head1); 209 assert(head1 == null); 210 211 dl_insertFront(le1, &head1); 212 dl_relink(le1, &head1, &head2); 213 assert(head1 == null); 214 assert(head2 != null); 215 } 216 /// 217 /// This structure implements scheme 6.2 thom the 218 /// $(LINK http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf) 219 /// and supports several primitives: 220 /// $(UL 221 /// $(LI schedule timer in the future.) 222 /// $(LI cancel timer.) 223 /// $(LI time step (advance) - all timers expired at current time tick are extracted from wheels.) 224 /// ) 225 /// Each operation take O(1) time. 226 /// 227 struct TimingWheels(T) 228 { 229 import core.bitop: bsr; 230 231 private 232 { 233 alias TimerIdType = ReturnType!(T.id); 234 alias allocator = Mallocator.instance; 235 236 enum MASK = 0xff; 237 enum LEVELS = 8; 238 enum LEVEL_MAX = LEVELS - 1; 239 enum SLOTS = 256; 240 enum FreeListMaxLen = 100; 241 242 struct ListElement(T) 243 { 244 private 245 { 246 T timer; 247 ulong scheduled_at; 248 ushort position; 249 ListElement!T* prev, next; 250 } 251 } 252 struct Slot 253 { 254 ListElement!T* head; 255 } 256 struct Level 257 { 258 // now if counter of ticks processed on this level 259 ulong now; 260 Slot[SLOTS] slots; 261 } 262 263 Level[LEVELS] levels; 264 ListElement!T* freeList; 265 int freeListLen; 266 HashMap!(TimerIdType, ListElement!T*) 267 ptrs; 268 long startedAt; 269 } 270 invariant 271 { 272 assert(freeListLen>=0); 273 } 274 alias Ticks = ulong; // ticks are 64 bit unsigned integers. 275 276 // hashing ticks to slots 277 // 8 levels, each level 256 slots, with of slot on each level 256 times 278 // translate ticks to level 279 // 0x00_00_00_00_00_00_00_00 <- ticks 280 // ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ 281 // □ □ □ □ □ □ □ □ \ 282 // □ □ □ □ □ □ □ □ | 283 // . . . . . . . . | 256 slots 284 // . . . . . . . . | 285 // □ □ □ □ □ □ □ □ / 286 // 7 6 5 4 3 2 1 0 287 // <- 8 levels 288 // each slot - double linked list of timers 289 290 // ticks to level = bsr(ticks)/8 291 pragma(inline) private pure int t2l(ulong t) @safe @nogc nothrow 292 { 293 if (t == 0) 294 { 295 return 0; 296 } 297 return bsr(t)/LEVELS; 298 } 299 // ticks to slot = ticks >> (level*8) 300 pragma(inline) private pure int t2s(ulong t, int l) @safe @nogc nothrow 301 { 302 return (t >> (l<<3)) & MASK; 303 } 304 // level to ticks 305 // l[0] -> 256 306 // l[1] -> 256*256 307 // ... 308 pragma(inline) private pure ulong l2t(int l) @safe @nogc nothrow 309 { 310 return SLOTS<<l; 311 } 312 ~this() 313 { 314 ptrs.clear; 315 for(int l=0;l<=LEVEL_MAX;l++) 316 for(int s=0; s<SLOTS; s++) 317 { 318 while(levels[l].slots[s].head) 319 { 320 auto le = levels[l].slots[s].head; 321 dl_unlink(le, &levels[l].slots[s].head); 322 () @trusted { 323 GC.removeRange(le); 324 dispose(allocator, le); 325 }(); 326 } 327 } 328 while(freeList) 329 { 330 assert(freeListLen>0); 331 auto n = freeList.next; 332 () @trusted { 333 GC.removeRange(freeList); 334 dispose(allocator, freeList); 335 }(); 336 freeListLen--; 337 freeList = n; 338 } 339 } 340 341 private ListElement!T* getOrCreate() 342 { 343 ListElement!T* result; 344 if (freeList !is null) 345 { 346 result = freeList; 347 freeList = freeList.next; 348 freeListLen--; 349 return result; 350 } 351 result = make!(ListElement!T)(allocator); 352 () @trusted { 353 GC.addRange(result, (*result).sizeof); 354 }(); 355 return result; 356 } 357 private void returnToFreeList(ListElement!T* le) 358 { 359 if ( freeListLen >= FreeListMaxLen ) 360 { 361 // this can be safely disposed as we do not leak ListElements outide this module 362 () @trusted { 363 GC.removeRange(le); 364 dispose(allocator, le); 365 }(); 366 } 367 else 368 { 369 le.position = 0xffff; 370 le.next = freeList; 371 freeList = le; 372 freeListLen++; 373 } 374 } 375 void init() 376 { 377 startedAt = Clock.currStdTime; 378 } 379 /++ 380 + Return internal view on current time - it is time at the call to $(B init) 381 + plus total number of steps multiplied by $(B tick) duration. 382 + Params: 383 + tick = tick duration 384 +/ 385 auto currStdTime(Duration tick) 386 { 387 return startedAt + levels[0].now * tick.split!"hnsecs".hnsecs; 388 } 389 /// 390 /// Schedule timer to $(B ticks) ticks forward from internal 'now'. 391 ///Params: 392 /// timer = timer to schedule; 393 /// ticks = ticks in the future to schedule timer. (0 < ticks < ulong.max); 394 ///Returns: 395 /// void 396 ///Throws: 397 /// ScheduleTimerError 398 /// when thicks == 0 399 /// or when timer already scheduled 400 /// 401 void schedule(T)(T timer, const ulong ticks) 402 { 403 if (ticks == 0) 404 { 405 throw new ScheduleTimerError("ticks can't be 0"); 406 } 407 auto timer_id = timer.id(); 408 if (ptrs.contains(timer_id)) 409 { 410 throw new ScheduleTimerError("Timer already scheduled"); 411 } 412 size_t level_index = 0; 413 long t = ticks; 414 long s = 1; // width of the slot in ticks on level 415 long shift = 0; 416 while(t > s<<8) // while t > slots on level 417 { 418 t -= (SLOTS - (levels[level_index].now & MASK)) * s; 419 level_index++; 420 s = s << 8; 421 shift += 8; 422 } 423 auto level = &levels[level_index]; 424 auto mask = s - 1; 425 size_t slot_index = (level.now + (t>>shift) + ((t&mask)>0?1:0)) & MASK; 426 auto slot = &levels[level_index].slots[slot_index]; 427 debug(timingwheels) safe_tracef("use level/slot %d/%d, level now: %d", level_index, slot_index, level.now); 428 auto le = getOrCreate(); 429 le.timer = timer; 430 le.position = ((level_index << 8 ) | slot_index) & 0xffff; 431 le.scheduled_at = levels[0].now + ticks; 432 dl_insertFront(le, &slot.head); 433 ptrs[timer_id] = le; 434 debug(timingwheels) safe_tracef("scheduled timer id: %s, ticks: %s, now: %d, scheduled at: %s to level: %s, slot %s", 435 timer_id, ticks, levels[0].now, le.scheduled_at, level_index, slot_index); 436 } 437 /// Cancel timer 438 ///Params: 439 /// timer = timer to cancel 440 ///Returns: 441 /// void 442 ///Throws: 443 /// CancelTimerError 444 /// if timer not in wheel 445 void cancel(T)(T timer) 446 { 447 // get list element pointer 448 auto v = ptrs.fetch(timer.id()); 449 if ( !v.ok ) 450 { 451 throw new CancelTimerError("Cant find timer to cancel"); 452 } 453 auto le = v.value; 454 immutable level_index = le.position>>8; 455 immutable slot_index = le.position & 0xff; 456 assert(timer is le.timer); 457 debug(timingwheels) safe_tracef("cancel timer, l:%d, s:%d", level_index, slot_index); 458 dl_unlink(le, &levels[level_index].slots[slot_index].head); 459 returnToFreeList(le); 460 ptrs.remove(timer.id()); 461 } 462 /// Number of ticks to rotate wheels until internal wheel 'now' 463 /// catch up with real world realTime. 464 /// Calculation based on time when wheels were stared and total 465 /// numer of ticks pasded. 466 ///Params: 467 /// tick = your tick length (Duration) 468 /// realTime = current real world now (Clock.currStdTime) 469 ///Returns: ticks to advance so that we catch up real world current time 470 int ticksToCatchUp(Duration tick, ulong realTime) 471 { 472 auto c = startedAt + tick.split!"hnsecs".hnsecs * levels[0].now; 473 auto v = (realTime - c) / tick.split!"hnsecs".hnsecs; 474 if ( v > 256 ) 475 { 476 return 256; 477 } 478 return cast(int)v; 479 } 480 /// Time until next scheduled timer event. 481 /// You provide tick size and current real world time. 482 /// This function find ticks until next event and use time of the start and 483 /// total steps executed to calculate time delta from $(B realNow) to next event. 484 ///Params: 485 /// tick = your accepted tick duration. 486 /// realNow = real world now, result of Clock.currStdTime 487 ///Returns: time until next event. Can be zero or negative in case you have already expired events. 488 /// 489 Duration timeUntilNextEvent(const Duration tick, ulong realNow) 490 { 491 assert(startedAt>0, "Forgot to call init()?"); 492 immutable n = ticksUntilNextEvent(); 493 immutable target = startedAt + (levels[0].now + n) * tick.split!"hnsecs".hnsecs; 494 auto delta = (target - realNow).hnsecs; 495 debug(timingwheels) safe_tracef("ticksUntilNextEvent=%s, tick=%s, startedAt=%s", n, tick, SysTime(startedAt)); 496 return delta; 497 } 498 499 /// 500 /// Adnvance wheel and return all timers expired during wheel turn. 501 // 502 /// Params: 503 /// ticks = how many ticks to advance. Must be in range 0 <= 256 504 /// Returns: list of expired timers 505 /// 506 auto advance(this W)(ulong ticks) 507 { 508 struct ExpiredTimers 509 { 510 HashMap!(TimerIdType, T) _map; 511 auto timers() 512 { 513 return _map.byValue; 514 } 515 auto length() 516 { 517 return _map.length(); 518 } 519 } 520 alias AdvanceResult = automem.RefCounted!(ExpiredTimers, Mallocator); 521 if (ticks > l2t(0)) 522 { 523 throw new AdvanceWheelError("You can't advance that much"); 524 } 525 if (ticks == 0) 526 { 527 throw new AdvanceWheelError("ticks must be > 0"); 528 } 529 debug(timingwheels) safe_tracef("advancing %d ticks", ticks); 530 auto result = AdvanceResult(ExpiredTimers()); 531 auto level = &levels[0]; 532 533 while(ticks) 534 { 535 ticks--; 536 immutable now = ++level.now; 537 immutable slot_index = now & MASK; 538 auto slot = &level.slots[slot_index]; 539 //debug(timingwheels) safe_tracef("level 0, now=%s", now); 540 while(slot.head) 541 { 542 auto le = slot.head; 543 auto timer = le.timer; 544 auto timer_id = timer.id(); 545 assert(!result._map.contains(timer_id), "Something wrong: we try to return same timer twice"); 546 debug(timingwheels) safe_tracef("return timer: %s, scheduled at %s", timer, le.scheduled_at); 547 result._map[timer_id] = timer; 548 dl_unlink(le, &slot.head); 549 returnToFreeList(le); 550 ptrs.remove(timer.id()); 551 } 552 if (slot_index == 0) 553 { 554 advance_level(1); 555 } 556 } 557 return result; 558 } 559 560 // 561 // ticks until next event on level 0 or until next wheel rotation 562 // If you have empty ticks it is safe to sleep - you will not miss anything, just wake up 563 // at the time when next timer have to be processed. 564 //Returns: number of safe "sleep" ticks. 565 // 566 private int ticksUntilNextEvent() 567 out(r; r<=256) 568 { 569 int result = 1; 570 auto level = &levels[0]; 571 immutable uint now = levels[0].now & MASK; 572 auto slot = (now + 1) & MASK; 573 //assert(level.slots[now].head == null); 574 do 575 { 576 if (level.slots[slot].head != null) 577 { 578 break; 579 } 580 result++; 581 slot = (slot + 1) & MASK; 582 } 583 while(slot != now); 584 585 return min(result, 256-now); 586 } 587 588 private void advance_level(int level_index) 589 in(level_index>0) 590 { 591 debug(timingwheels) safe_tracef("running advance on level %d", level_index); 592 immutable now0 = levels[0].now; 593 auto level = &levels[level_index]; 594 immutable now = ++level.now; 595 immutable slot_index = now & MASK; 596 debug(timingwheels) safe_tracef("level %s, now=%s", level_index, now); 597 auto slot = &level.slots[slot_index]; 598 debug(timingwheels) safe_tracef("haldle l%s:s%s timers", level_index, slot_index); 599 while(slot.head) 600 { 601 auto listElement = slot.head; 602 603 immutable delta = listElement.scheduled_at - now0; 604 size_t lower_level_index = 0; 605 long t = delta; 606 size_t s = 1; // width of the slot in ticks on level 607 size_t shift = 0; 608 while(t > s<<8) // while t > slots on level 609 { 610 t -= (SLOTS - (levels[lower_level_index].now & MASK)) * s; 611 lower_level_index++; 612 s = s << 8; 613 shift += 8; 614 } 615 auto mask = s - 1; 616 size_t lower_level_slot_index = (levels[lower_level_index].now + (t>>shift) + ((t&mask)>0?1:0)) & MASK; 617 debug(timingwheels) safe_tracef("move timer id: %s, scheduledAt; %d to level %s, slot: %s (delta=%s)", 618 listElement.timer.id(), listElement.scheduled_at, lower_level_index, lower_level_slot_index, delta); 619 listElement.position = ((lower_level_index<<8) | lower_level_slot_index) & 0xffff; 620 dl_relink(listElement, &slot.head, &levels[lower_level_index].slots[lower_level_slot_index].head); 621 } 622 if (slot_index == 0 && level_index < LEVEL_MAX) 623 { 624 advance_level(level_index+1); 625 } 626 } 627 } 628 629 version(twtesting): 630 631 @("TimingWheels") 632 unittest 633 { 634 import std.stdio; 635 globalLogLevel = LogLevel.info; 636 TimingWheels!Timer w; 637 w.init(); 638 assert(w.t2l(1) == 0); 639 assert(w.t2s(1, 0) == 1); 640 immutable t = 0x00_00_00_11_00_00_00_77; 641 immutable level = w.t2l(t); 642 assert(level==4); 643 immutable slot = w.t2s(t, level); 644 assert(slot == 0x11); 645 auto timer = new Timer(); 646 () @nogc @safe { 647 w.schedule(timer, 2); 648 bool thrown; 649 // check that you can't schedule same timer twice 650 try 651 { 652 w.schedule(timer, 5); 653 } 654 catch(ScheduleTimerError e) 655 { 656 thrown = true; 657 } 658 assert(thrown); 659 thrown = false; 660 try 661 { 662 w.advance(1024); 663 } 664 catch(AdvanceWheelError e) 665 { 666 thrown = true; 667 } 668 assert(thrown); 669 thrown = false; 670 w.cancel(timer); 671 w.advance(1); 672 }(); 673 w = TimingWheels!Timer(); 674 w.init(); 675 w.schedule(timer, 1); 676 auto r = w.advance(1); 677 assert(r.timers.count == 1); 678 w.schedule(timer, 256); 679 r = w.advance(255); 680 assert(r.timers.count == 0); 681 r = w.advance(1); 682 assert(r.timers.count == 1); 683 w.schedule(timer, 256*256); 684 int c; 685 for(int i=0;i<256;i++) 686 { 687 r = w.advance(256); 688 c += r.timers.count; 689 } 690 assert(c==1); 691 } 692 @("rt") 693 @Tags("noauto") 694 unittest 695 { 696 globalLogLevel = LogLevel.info; 697 TimingWheels!Timer w; 698 Duration Tick = 5.msecs; 699 w.init(); 700 ulong now = Clock.currStdTime; 701 assert(now - w.currStdTime(Tick) < 5*10_000); 702 Thread.sleep(2*Tick); 703 now = Clock.currStdTime; 704 assert((now - w.currStdTime(Tick))/10_000 - (2*Tick).split!"msecs".msecs < 10); 705 auto toCatchUp = w.ticksToCatchUp(Tick, now); 706 toCatchUp.shouldEqual(2); 707 auto t = w.advance(toCatchUp); 708 toCatchUp = w.ticksToCatchUp(Tick, now); 709 toCatchUp.shouldEqual(0); 710 } 711 @("cancel") 712 unittest 713 { 714 globalLogLevel = LogLevel.info; 715 TimingWheels!Timer w; 716 w.init(); 717 Timer timer0 = new Timer(); 718 Timer timer1 = new Timer(); 719 w.schedule(timer0, 256); 720 w.schedule(timer1, 256+128); 721 auto r = w.advance(255); 722 assert(r.timers.count == 0); 723 w.cancel(timer0); 724 r = w.advance(1); 725 assert(r.timers.count == 0); 726 assertThrown!CancelTimerError(w.cancel(timer0)); 727 w.cancel(timer1); 728 } 729 @("ticksUntilNextEvent") 730 unittest 731 { 732 globalLogLevel = LogLevel.info; 733 TimingWheels!Timer w; 734 w.init(); 735 auto s = w.ticksUntilNextEvent; 736 assert(s==256); 737 auto r = w.advance(s); 738 assert(r.timers.count == 0); 739 Timer t = new Timer(); 740 w.schedule(t, 50); 741 s = w.ticksUntilNextEvent; 742 assert(s==50); 743 r = w.advance(s); 744 assert(r.timers.count == 1); 745 } 746 747 @("load") 748 @Serial 749 unittest 750 { 751 import std.array:array; 752 globalLogLevel = LogLevel.info; 753 enum TIMERS = 100_000; 754 Timer._current_id = 1; 755 auto w = TimingWheels!Timer(); 756 w.init(); 757 for(int i=1;i<=TIMERS;i++) 758 { 759 auto t = new Timer(); 760 w.schedule(t, i); 761 } 762 int counter; 763 for(int i=1;i<=TIMERS;i++) 764 { 765 auto r = w.advance(1); 766 auto timers = r.timers; 767 auto t = timers.array()[0]; 768 assert(t.id == i, "expected t.id=%s, got %s".format(t.id, i)); 769 assert(timers.count == 1); 770 counter++; 771 } 772 assert(counter == TIMERS, "expected 100 timers, got %d".format(counter)); 773 774 for(int i=1;i<=TIMERS;i++) 775 { 776 auto t = new Timer(); 777 w.schedule(t, i); 778 } 779 counter = 0; 780 for(int i=TIMERS+1;i<=2*TIMERS;i++) 781 { 782 auto r = w.advance(1); 783 auto timers = r.timers; 784 auto t = timers.array()[0]; 785 assert(t.id == i, "expected t.id=%s, got %s".format(t.id, i)); 786 assert(timers.count == 1); 787 counter++; 788 } 789 assert(counter == TIMERS, "expected 100 timers, got %d".format(counter)); 790 791 } 792 // @("cornercase") 793 // @Serial 794 // unittest 795 // { 796 // Timer._current_id = 1; 797 // auto w = TimingWheels!Timer(); 798 // globalLogLevel = LogLevel.trace; 799 // w.advance(254); 800 // auto t = new Timer(); 801 // w.schedule(t, 511); 802 // for(int i=0; i<511; i++) 803 // { 804 // w.advance(1); 805 // } 806 // } 807 808 /// 809 /// 810 /// 811 @("example") 812 @Tags("noauto") 813 @Values(1.msecs,2.msecs,3.msecs,4.msecs,5.msecs,6.msecs,7.msecs,8.msecs, 9.msecs,10.msecs) 814 @Serial 815 unittest 816 { 817 import std; 818 globalLogLevel = LogLevel.info; 819 auto rnd = Random(142); 820 auto Tick = getValue!Duration(); 821 /// track execution 822 int counter; 823 SysTime last; 824 825 /// this is our Timer 826 class Timer 827 { 828 static ulong __id; 829 private ulong _id; 830 private string _name; 831 this(string name) 832 { 833 _id = __id++; 834 _name = name; 835 } 836 /// must provide id() method 837 ulong id() 838 { 839 return _id; 840 } 841 } 842 843 enum IOWakeUpInterval = 100; // to simulate random IO wakeups in interval 0 - 100.msecs 844 845 // each tick span 5 msecs - this is our link with time in reality 846 TimingWheels!Timer w; 847 w.init(); 848 auto durationToTicks(Duration d) 849 { 850 // we have to adjust w.now and realtime 'now' before scheduling timer 851 auto real_now = Clock.currStdTime; 852 auto tw_now = w.currStdTime(Tick); 853 auto delay = (real_now - tw_now).hnsecs; 854 return (d + delay)/Tick; 855 } 856 void process_timer(Timer t) 857 { 858 switch(t._name) 859 { 860 case "periodic": 861 if ( last.stdTime == 0) 862 { 863 // initialize tracking 864 last = Clock.currTime - 50.msecs; 865 } 866 auto delta = Clock.currTime - last; 867 assert(delta - 50.msecs <= max(Tick + Tick/20, 5.msecs), "delta-50.msecs=%s".format(delta-50.msecs)); 868 writefln("@ %s - delta: %sms (should be 50ms)", t._name, (Clock.currTime - last).split!"msecs".msecs); 869 last = Clock.currTime; 870 counter++; 871 w.schedule(t, durationToTicks(50.msecs)); // rearm 872 break; 873 default: 874 writefln("@ %s", t._name); 875 break; 876 } 877 } 878 // emulate some random initial delay 879 auto randomInitialDelay = uniform(0, 500, rnd).msecs; 880 Thread.sleep(randomInitialDelay); 881 // 882 // start one arbitrary timer and one periodic timer 883 // 884 auto some_timer = new Timer("some"); 885 auto periodic_timer = new Timer("periodic"); 886 w.schedule(some_timer, durationToTicks(32.msecs)); 887 w.schedule(periodic_timer, durationToTicks(50.msecs)); 888 889 while(counter < 10) 890 { 891 auto realNow = Clock.currStdTime; 892 auto randomIoInterval = uniform(0, IOWakeUpInterval, rnd).msecs; 893 auto nextTimerEvent = max(w.timeUntilNextEvent(Tick, realNow), 0.msecs); 894 // wait for what should happen earlier 895 auto time_to_sleep = min(randomIoInterval, nextTimerEvent); 896 writefln("* sleep until timer event or random I/O for %s", time_to_sleep); 897 Thread.sleep(time_to_sleep); 898 // make steps if required 899 int ticks = w.ticksToCatchUp(Tick, Clock.currStdTime); 900 if (ticks > 0) 901 { 902 auto wr = w.advance(ticks); 903 foreach(t; wr.timers) 904 { 905 process_timer(t); 906 } 907 } 908 // emulate some random processing time 909 Thread.sleep(uniform(0, 5, rnd).msecs); 910 } 911 }