1 /// 2 module timingwheels.timingwheels_impl; 3 4 import std.datetime; 5 import std.exception; 6 import std.typecons; 7 import std.format; 8 import std.traits; 9 import std.range; 10 import std.algorithm; 11 import std.experimental.logger; 12 13 import std.experimental.allocator; 14 import std.experimental.allocator.mallocator: Mallocator; 15 16 import core.thread; 17 import core.memory; 18 19 import ikod.containers.hashmap; 20 import automem; 21 22 version(twtesting) 23 { 24 import unit_threaded; 25 } 26 27 28 version(twtesting) 29 { 30 private class Timer 31 { 32 static ulong _current_id; 33 private 34 { 35 ulong _id; 36 } 37 this() @safe @nogc 38 { 39 _id = _current_id; 40 _current_id++; 41 } 42 ~this() @safe @nogc 43 { 44 45 } 46 ulong id() @safe @nogc 47 { 48 return _id; 49 } 50 override string toString() 51 { 52 return "%d:%s".format(_id); 53 } 54 } 55 } 56 /// 57 /// scheduling error occurs at schedule() when ticks == 0 or timer already scheduled. 58 /// 59 /// 60 class ScheduleTimerError: Exception 61 { 62 /// 63 this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe 64 { 65 super(msg, file, line); 66 } 67 } 68 /// 69 /// Cancel timer error occurs if you try to cancel timer which is not scheduled. 70 /// 71 class CancelTimerError: Exception 72 { 73 /// 74 this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe 75 { 76 super(msg, file, line); 77 } 78 } 79 /// 80 /// Advancing error occurs if number of ticks for advance not in range 0<t<=256 81 /// 82 class AdvanceWheelError: Exception 83 { 84 /// 85 /// 86 /// 87 this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe 88 { 89 super(msg, file, line); 90 } 91 } 92 93 debug(timingwheels) @safe @nogc nothrow 94 { 95 package 96 void safe_tracef(A...)(string f, scope A args, string file = __FILE__, int line = __LINE__) @safe @nogc nothrow 97 { 98 bool osx,ldc; 99 version(OSX) 100 { 101 osx = true; 102 } 103 version(LDC) 104 { 105 ldc = true; 106 } 107 debug (timingwheels) try 108 { 109 // this can fail on pair ldc2/osx, see https://github.com/ldc-developers/ldc/issues/3240 110 if (!osx || !ldc) 111 { 112 () @trusted @nogc {tracef("%s:%d " ~ f, file, line, args);}(); 113 } 114 } 115 catch(Exception e) 116 { 117 } 118 } 119 } 120 121 pragma(inline) 122 private void dl_insertFront(L)(L *le, L** head) 123 { 124 if ( *head == null) 125 { 126 le.next = le.prev = le; 127 } 128 else 129 { 130 auto curr_head = *head; 131 le.prev = curr_head.prev; 132 le.next = curr_head; 133 curr_head.prev.next = le; 134 curr_head.prev = le; 135 } 136 *head = le; 137 } 138 139 pragma(inline) 140 private void dl_unlink(L)(L *le, L** head) 141 in(*head != null) 142 { 143 if (le.next == le && *head == le) 144 { 145 *head = null; 146 return; 147 } 148 if (le == *head) 149 { 150 *head = le.next; 151 } 152 le.next.prev = le.prev; 153 le.prev.next = le.next; 154 } 155 pragma(inline) 156 private void dl_walk(L)(L** head) 157 { 158 if (*head == null) 159 { 160 return; 161 } 162 auto le = *head; 163 do 164 { 165 le = le.next; 166 } while (le != *head); 167 } 168 pragma(inline) 169 private void dl_relink(L)(L* le, L** head_from, L** head_to) 170 in(le.prev !is null && le.next !is null) 171 { 172 dl_unlink(le, head_from); 173 dl_insertFront(le, head_to); 174 } 175 176 @("dl") 177 unittest 178 { 179 globalLogLevel = LogLevel.info; 180 struct LE 181 { 182 int p; 183 LE *next; 184 LE *prev; 185 } 186 LE* head1 = null; 187 LE* head2 = null; 188 auto le1 = new LE(1); 189 auto le2 = new LE(2); 190 dl_insertFront(le1, &head1); 191 assert(head1 != null); 192 dl_unlink(le1, &head1); 193 assert(head1 == null); 194 195 dl_insertFront(le1, &head1); 196 assert(head1 != null); 197 dl_insertFront(le2, &head1); 198 dl_unlink(le1, &head1); 199 assert(head1 != null); 200 dl_unlink(le2, &head1); 201 assert(head1 == null); 202 203 dl_insertFront(le1, &head1); 204 assert(head1 != null); 205 dl_insertFront(le2, &head1); 206 dl_unlink(le2, &head1); 207 assert(head1 != null); 208 dl_unlink(le1, &head1); 209 assert(head1 == null); 210 211 dl_insertFront(le1, &head1); 212 dl_relink(le1, &head1, &head2); 213 assert(head1 == null); 214 assert(head2 != null); 215 } 216 /// 217 /// This structure implements scheme 6.2 thom the 218 /// http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf 219 /// It supports several primitives: 220 /// 1. schedule timer in the future. 221 /// 2. cancel timer. 222 /// 3. time step (advance) - all timers expired at current time tick are extracted from wheels. 223 /// Each operation take O(1) time. 224 /// 225 struct TimingWheels(T) 226 { 227 import core.bitop: bsr; 228 229 private 230 { 231 alias TimerIdType = ReturnType!(T.id); 232 alias allocator = Mallocator.instance; 233 234 enum MASK = 0xff; 235 enum LEVELS = 8; 236 enum LEVEL_MAX = LEVELS - 1; 237 enum SLOTS = 256; 238 enum FreeListMaxLen = 100; 239 240 struct ListElement(T) 241 { 242 private 243 { 244 T timer; 245 ulong scheduled_at; 246 ushort position; 247 ListElement!T* prev, next; 248 } 249 } 250 struct Slot 251 { 252 ListElement!T* head; 253 } 254 struct Level 255 { 256 // now if counter of ticks processed on this level 257 ulong now; 258 Slot[SLOTS] slots; 259 } 260 261 Level[LEVELS] levels; 262 ListElement!T* freeList; 263 int freeListLen; 264 HashMap!(TimerIdType, ListElement!T*) 265 ptrs; 266 long startedAt; 267 } 268 invariant 269 { 270 assert(freeListLen>=0); 271 } 272 alias Ticks = ulong; // ticks are 64 bit unsigned integers. 273 274 // hashing ticks to slots 275 // 8 levels, each level 256 slots, with of slot on each level 256 times 276 // translate ticks to level 277 // 0x00_00_00_00_00_00_00_00 <- ticks 278 // ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ 279 // □ □ □ □ □ □ □ □ \ 280 // □ □ □ □ □ □ □ □ | 281 // . . . . . . . . | 256 slots 282 // . . . . . . . . | 283 // □ □ □ □ □ □ □ □ / 284 // 7 6 5 4 3 2 1 0 285 // <- 8 levels 286 // each slot - double linked list of timers 287 288 // ticks to level = bsr(ticks)/8 289 pragma(inline) private pure int t2l(ulong t) @safe @nogc nothrow 290 { 291 if (t == 0) 292 { 293 return 0; 294 } 295 return bsr(t)/LEVELS; 296 } 297 // ticks to slot = ticks >> (level*8) 298 pragma(inline) private pure int t2s(ulong t, int l) @safe @nogc nothrow 299 { 300 return (t >> (l<<3)) & MASK; 301 } 302 // level to ticks 303 // l[0] -> 256 304 // l[1] -> 256*256 305 // ... 306 pragma(inline) private pure ulong l2t(int l) @safe @nogc nothrow 307 { 308 return SLOTS<<l; 309 } 310 ~this() 311 { 312 ptrs.clear; 313 for(int l=0;l<=LEVEL_MAX;l++) 314 for(int s=0; s<SLOTS; s++) 315 { 316 while(levels[l].slots[s].head) 317 { 318 auto le = levels[l].slots[s].head; 319 dl_unlink(le, &levels[l].slots[s].head); 320 () @trusted { 321 GC.removeRange(le); 322 dispose(allocator, le); 323 }(); 324 } 325 } 326 while(freeList) 327 { 328 assert(freeListLen>0); 329 auto n = freeList.next; 330 () @trusted { 331 GC.removeRange(freeList); 332 dispose(allocator, freeList); 333 }(); 334 freeListLen--; 335 freeList = n; 336 } 337 } 338 339 private ListElement!T* getOrCreate() 340 { 341 ListElement!T* result; 342 if (freeList !is null) 343 { 344 result = freeList; 345 freeList = freeList.next; 346 freeListLen--; 347 return result; 348 } 349 result = make!(ListElement!T)(allocator); 350 () @trusted { 351 GC.addRange(result, (*result).sizeof); 352 }(); 353 return result; 354 } 355 private void returnToFreeList(ListElement!T* le) 356 { 357 if ( freeListLen >= FreeListMaxLen ) 358 { 359 // this can be safely disposed as we do not leak ListElements outide this module 360 () @trusted { 361 GC.removeRange(le); 362 dispose(allocator, le); 363 }(); 364 } 365 else 366 { 367 le.position = 0xffff; 368 le.next = freeList; 369 freeList = le; 370 freeListLen++; 371 } 372 } 373 /// 374 /// Schedule timer to future 375 ///Params: 376 /// timer = timer to schedule; 377 /// ticks = ticks in the future to schedule timer. (0 < ticks < ulong.max); 378 ///Returns: 379 /// void 380 ///Throws: 381 /// ScheduleTimerError 382 /// when thicks == 0 383 /// or when timer already scheduled 384 /// 385 void schedule(T)(T timer, const ulong ticks) 386 { 387 if (ticks == 0) 388 { 389 throw new ScheduleTimerError("ticks can't be 0"); 390 } 391 auto timer_id = timer.id(); 392 if (ptrs.contains(timer_id)) 393 { 394 throw new ScheduleTimerError("Timer already scheduled"); 395 } 396 if (startedAt == 0) 397 { 398 startedAt = Clock.currStdTime; 399 } 400 auto level_index = t2l(ticks); 401 auto level = &levels[level_index]; 402 auto slot_index = (level.now + t2s(ticks, level_index)) & MASK; 403 auto slot = &levels[level_index].slots[slot_index]; 404 auto le = getOrCreate(); 405 le.timer = timer; 406 le.position = ((level_index << 8 ) | slot_index) & 0xffff; 407 le.scheduled_at = levels[0].now + ticks; 408 dl_insertFront(le, &slot.head); 409 ptrs[timer_id] = le; 410 debug(timingwheels) safe_tracef("scheduled timer id: %s, ticks: %s, level: %s, slot %s", 411 timer_id, ticks, level_index, slot_index); 412 } 413 /// Cancel timer 414 ///Params: 415 /// timer = timer to cancel 416 ///Returns: 417 /// void 418 ///Throws: 419 /// CancelTimerError 420 /// if timer not in wheel 421 void cancel(T)(T timer) 422 { 423 // get list element pointer 424 auto v = ptrs.fetch(timer.id()); 425 if ( !v.ok ) 426 { 427 throw new CancelTimerError("Cant find timer to cancel"); 428 } 429 auto le = v.value; 430 immutable level_index = le.position>>8; 431 immutable slot_index = le.position & 0xff; 432 assert(timer is le.timer); 433 debug(timingwheels) safe_tracef("cancel timer, l:%d, s:%d", level_index, slot_index); 434 dl_unlink(le, &levels[level_index].slots[slot_index].head); 435 returnToFreeList(le); 436 ptrs.remove(timer.id()); 437 } 438 /// 439 /// count "empty" ticks - slots without events. 440 /// If you have empty ticks it is safe to sleep - you will not miss anything, just wake up 441 /// at the time when next timer have to be processed. 442 ///Returns: number of empty ticks. 443 /// 444 int ticksUntilNextEvent() 445 out(r; r<=256) 446 { 447 int result = 1; 448 auto level = &levels[0]; 449 immutable now = levels[0].now & MASK; 450 auto slot = (now + 1) & MASK; 451 assert(level.slots[now].head == null); 452 do 453 { 454 if (level.slots[slot].head != null) 455 { 456 break; 457 } 458 result++; 459 slot = (slot + 1) & MASK; 460 } 461 while(slot != now); 462 return result; 463 } 464 /// Time until next scheduled timer event. 465 ///Params: 466 /// tick = your accepted tick duration. 467 ///Returns: msecs until next event. Can be zero or negative in case you have already expired events. 468 /// 469 Duration timeUntilNextEvent(const Duration tick) 470 { 471 if (startedAt == 0) 472 { 473 return Duration.max; 474 } 475 immutable n = ticksUntilNextEvent(); 476 immutable target = startedAt + n * tick.split!"hnsecs".hnsecs; 477 auto delta = (target - Clock.currStdTime).hnsecs; 478 return delta; 479 } 480 /// 481 /// Adnvance wheel and return all timers expired during wheel turn. 482 // 483 /// Params: 484 /// ticks = how many ticks to advance. Must be in range 0 <= 256 485 /// Returns: list of expired timers 486 /// 487 auto advance(this W)(ulong ticks) 488 { 489 struct ExpiredTimers 490 { 491 HashMap!(TimerIdType, T) _map; 492 auto timers() 493 { 494 return _map.byValue; 495 } 496 } 497 alias AdvanceResult = automem.RefCounted!(ExpiredTimers, Mallocator); 498 if (ticks > l2t(0)) 499 { 500 throw new AdvanceWheelError("You can't advance that much"); 501 } 502 if (ticks == 0) 503 { 504 throw new AdvanceWheelError("ticks must be > 0"); 505 } 506 auto result = AdvanceResult(ExpiredTimers()); 507 auto level = &levels[0]; 508 509 scope(exit) 510 { 511 startedAt = Clock.currStdTime; 512 } 513 514 while(ticks) 515 { 516 ticks--; 517 immutable now = ++level.now; 518 immutable slot_index = now & MASK; 519 auto slot = &level.slots[slot_index]; 520 debug(timingwheels) safe_tracef("level 0, now=%s", now); 521 if (slot_index == 0) 522 { 523 advance_level(1); 524 } 525 while(slot.head) 526 { 527 auto le = slot.head; 528 auto timer = le.timer; 529 auto timer_id = timer.id(); 530 assert(!result._map.contains(timer_id), "Something wrong: we try to return same timer twice"); 531 debug(timingwheels) safe_tracef("return timer id: %s, scheduled at %s", timer_id, le.scheduled_at); 532 result._map[timer_id] = timer; 533 dl_unlink(le, &slot.head); 534 returnToFreeList(le); 535 ptrs.remove(timer.id()); 536 } 537 } 538 return result; 539 } 540 541 private void advance_level(int level_index) 542 in(level_index>0) 543 { 544 debug(timingwheels) safe_tracef("running advance on level %d", level_index); 545 immutable now0 = levels[0].now; 546 auto level = &levels[level_index]; 547 immutable now = ++level.now; 548 immutable slot_index = now & MASK; 549 debug(timingwheels) safe_tracef("level %s, now=%s", level_index, now); 550 if (slot_index == 0 && level_index < LEVEL_MAX) 551 { 552 advance_level(level_index+1); 553 } 554 auto slot = &level.slots[slot_index]; 555 debug(timingwheels) safe_tracef("haldle l%s:s%s timers", level_index, slot_index); 556 while(slot.head) 557 { 558 auto listElement = slot.head; 559 560 immutable delta = listElement.scheduled_at - now0; 561 immutable lower_level_index = t2l(delta); 562 immutable lower_level_slot_index = t2s(delta, lower_level_index); 563 debug(timingwheels) safe_tracef("move timer id: %s to level %s, slot: %s", 564 listElement.timer.id(), lower_level_index, lower_level_slot_index, delta); 565 listElement.position = ((lower_level_index<<8) | lower_level_slot_index) & 0xffff; 566 dl_relink(listElement, &slot.head, &levels[lower_level_index].slots[lower_level_slot_index].head); 567 } 568 } 569 } 570 571 572 @("TimingWheels") 573 unittest 574 { 575 globalLogLevel = LogLevel.info; 576 TimingWheels!Timer w; 577 assert(w.t2l(1) == 0); 578 assert(w.t2s(1, 0) == 1); 579 immutable t = 0x00_00_00_11_00_00_00_77; 580 immutable level = w.t2l(t); 581 assert(level==4); 582 immutable slot = w.t2s(t, level); 583 assert(slot == 0x11); 584 auto timer = new Timer(); 585 () @nogc @safe { 586 w.schedule(timer, 2); 587 bool thrown; 588 // check that you can't schedule same timer twice 589 try 590 { 591 w.schedule(timer, 5); 592 } 593 catch(ScheduleTimerError e) 594 { 595 thrown = true; 596 } 597 assert(thrown); 598 thrown = false; 599 try 600 { 601 w.advance(1024); 602 } 603 catch(AdvanceWheelError e) 604 { 605 thrown = true; 606 } 607 assert(thrown); 608 thrown = false; 609 w.cancel(timer); 610 w.advance(1); 611 }(); 612 w = TimingWheels!Timer(); 613 w.schedule(timer, 1); 614 auto r = w.advance(1); 615 assert(r.timers.count == 1); 616 w.schedule(timer, 256); 617 r = w.advance(255); 618 assert(r.timers.count == 0); 619 r = w.advance(1); 620 assert(r.timers.count == 1); 621 w.schedule(timer, 256*256); 622 int c; 623 for(int i=0;i<256;i++) 624 { 625 r = w.advance(256); 626 c += r.timers.count; 627 } 628 assert(c==1); 629 } 630 @("cancel") 631 unittest 632 { 633 globalLogLevel = LogLevel.info; 634 TimingWheels!Timer w; 635 Timer timer0 = new Timer(); 636 Timer timer1 = new Timer(); 637 w.schedule(timer0, 256); 638 w.schedule(timer1, 256+128); 639 auto r = w.advance(255); 640 assert(r.timers.count == 0); 641 w.cancel(timer0); 642 r = w.advance(1); 643 assert(r.timers.count == 0); 644 assertThrown!CancelTimerError(w.cancel(timer0)); 645 w.cancel(timer1); 646 } 647 @("ticksUntilNextEvent") 648 unittest 649 { 650 globalLogLevel = LogLevel.info; 651 TimingWheels!Timer w; 652 auto s = w.ticksUntilNextEvent; 653 assert(s==256); 654 auto r = w.advance(s); 655 assert(r.timers.count == 0); 656 Timer t = new Timer(); 657 w.schedule(t, 50); 658 s = w.ticksUntilNextEvent; 659 assert(s==50); 660 r = w.advance(s); 661 assert(r.timers.count == 1); 662 } 663 @("load") 664 unittest 665 { 666 import std.array:array; 667 globalLogLevel = LogLevel.info; 668 enum TIMERS = 100_000; 669 Timer._current_id = 1; 670 auto w = TimingWheels!Timer(); 671 for(int i=1;i<=TIMERS;i++) 672 { 673 auto t = new Timer(); 674 w.schedule(t, i); 675 } 676 int counter; 677 for(int i=1;i<=TIMERS;i++) 678 { 679 auto r = w.advance(1); 680 auto timers = r.timers; 681 auto t = timers.array()[0]; 682 assert(t.id == i); 683 assert(timers.count == 1); 684 counter++; 685 } 686 assert(counter == TIMERS, "expected 100 timers, got %d".format(counter)); 687 } 688 689 /// 690 /// 691 /// 692 @("example") 693 unittest 694 { 695 import std; 696 globalLogLevel = LogLevel.info; 697 auto rnd = Random(142); 698 699 /// track execution 700 int counter; 701 SysTime last = Clock.currTime; 702 703 /// this is our Timer 704 class Timer 705 { 706 static ulong __id; 707 private ulong _id; 708 private string _name; 709 this(string name) 710 { 711 _id = __id++; 712 _name = name; 713 } 714 /// must provide id() method 715 ulong id() 716 { 717 return _id; 718 } 719 } 720 721 enum IOWakeUpInterval = 100; // to simulate random IO wakeups in interval 0 - 100.msecs 722 723 // each tick span 5 msecs - this is our link with time in reality 724 enum Tick = 5.msecs; 725 TimingWheels!Timer w; 726 727 auto durationToTicks(Duration d) 728 { 729 return d/Tick; 730 } 731 void process_timer(Timer t) 732 { 733 switch(t._name) 734 { 735 case "periodic": 736 writefln("@ %s - delta: %sms (should be 50ms)", t._name, (Clock.currTime - last).split!"msecs".msecs); 737 last = Clock.currTime; 738 counter++; 739 w.schedule(t, durationToTicks(50.msecs)); // rearm 740 break; 741 default: 742 writefln("@ %s", t._name); 743 break; 744 } 745 } 746 747 // 748 // start one arbitrary timer and one periodic timer 749 // 750 auto some_timer = new Timer("some"); 751 auto periodic_timer = new Timer("periodic"); 752 w.schedule(some_timer, durationToTicks(32.msecs)); 753 w.schedule(periodic_timer, durationToTicks(50.msecs)); 754 755 while(counter < 10) 756 { 757 auto randomIoInterval = uniform(0, IOWakeUpInterval, rnd).msecs; 758 auto nextTimerEvent = max(w.timeUntilNextEvent(Tick), 0.msecs); 759 // wait for what should happen earlier 760 auto time_to_sleep = min(randomIoInterval, nextTimerEvent); 761 writefln("* sleep until timer event or random I/O for %s", time_to_sleep); 762 Thread.sleep(time_to_sleep); 763 // if we waked up early by the IO event then timeUntilNextEvent will be positive 764 // otherwise it will be <= 0 and we have something to process. 765 while(w.timeUntilNextEvent(Tick) <= 0.msecs) 766 { 767 auto ticks = w.ticksUntilNextEvent(); 768 auto wr = w.advance(ticks); 769 foreach(t; wr.timers) 770 { 771 process_timer(t); 772 } 773 } 774 // some random processing time 775 Thread.sleep(uniform(0, 5, rnd).msecs); 776 } 777 }