timeout.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package nebula
  2. import (
  3. "time"
  4. "github.com/slackhq/nebula/firewall"
  5. )
  6. // How many timer objects should be cached
  7. const timerCacheMax = 50000
  8. var emptyFWPacket = firewall.Packet{}
  9. type TimerWheel struct {
  10. // Current tick
  11. current int
  12. // Cheat on finding the length of the wheel
  13. wheelLen int
  14. // Last time we ticked, since we are lazy ticking
  15. lastTick *time.Time
  16. // Durations of a tick and the entire wheel
  17. tickDuration time.Duration
  18. wheelDuration time.Duration
  19. // The actual wheel which is just a set of singly linked lists, head/tail pointers
  20. wheel []*TimeoutList
  21. // Singly linked list of items that have timed out of the wheel
  22. expired *TimeoutList
  23. // Item cache to avoid garbage collect
  24. itemCache *TimeoutItem
  25. itemsCached int
  26. }
  27. // Represents a tick in the wheel
  28. type TimeoutList struct {
  29. Head *TimeoutItem
  30. Tail *TimeoutItem
  31. }
  32. // Represents an item within a tick
  33. type TimeoutItem struct {
  34. Packet firewall.Packet
  35. Next *TimeoutItem
  36. }
  37. // Builds a timer wheel and identifies the tick duration and wheel duration from the provided values
  38. // Purge must be called once per entry to actually remove anything
  39. func NewTimerWheel(min, max time.Duration) *TimerWheel {
  40. //TODO provide an error
  41. //if min >= max {
  42. // return nil
  43. //}
  44. // Round down and add 1 so we can have the smallest # of ticks in the wheel and still account for a full
  45. // max duration
  46. wLen := int((max / min) + 1)
  47. tw := TimerWheel{
  48. wheelLen: wLen,
  49. wheel: make([]*TimeoutList, wLen),
  50. tickDuration: min,
  51. wheelDuration: max,
  52. expired: &TimeoutList{},
  53. }
  54. for i := range tw.wheel {
  55. tw.wheel[i] = &TimeoutList{}
  56. }
  57. return &tw
  58. }
  59. // Add will add a firewall.Packet to the wheel in it's proper timeout
  60. func (tw *TimerWheel) Add(v firewall.Packet, timeout time.Duration) *TimeoutItem {
  61. // Check and see if we should progress the tick
  62. tw.advance(time.Now())
  63. i := tw.findWheel(timeout)
  64. // Try to fetch off the cache
  65. ti := tw.itemCache
  66. if ti != nil {
  67. tw.itemCache = ti.Next
  68. tw.itemsCached--
  69. ti.Next = nil
  70. } else {
  71. ti = &TimeoutItem{}
  72. }
  73. // Relink and return
  74. ti.Packet = v
  75. if tw.wheel[i].Tail == nil {
  76. tw.wheel[i].Head = ti
  77. tw.wheel[i].Tail = ti
  78. } else {
  79. tw.wheel[i].Tail.Next = ti
  80. tw.wheel[i].Tail = ti
  81. }
  82. return ti
  83. }
  84. func (tw *TimerWheel) Purge() (firewall.Packet, bool) {
  85. if tw.expired.Head == nil {
  86. return emptyFWPacket, false
  87. }
  88. ti := tw.expired.Head
  89. tw.expired.Head = ti.Next
  90. if tw.expired.Head == nil {
  91. tw.expired.Tail = nil
  92. }
  93. // Clear out the items references
  94. ti.Next = nil
  95. // Maybe cache it for later
  96. if tw.itemsCached < timerCacheMax {
  97. ti.Next = tw.itemCache
  98. tw.itemCache = ti
  99. tw.itemsCached++
  100. }
  101. return ti.Packet, true
  102. }
  103. // advance will move the wheel forward by proper number of ticks. The caller _should_ lock the wheel before calling this
  104. func (tw *TimerWheel) findWheel(timeout time.Duration) (i int) {
  105. if timeout < tw.tickDuration {
  106. // Can't track anything below the set resolution
  107. timeout = tw.tickDuration
  108. } else if timeout > tw.wheelDuration {
  109. // We aren't handling timeouts greater than the wheels duration
  110. timeout = tw.wheelDuration
  111. }
  112. // Find the next highest, rounding up
  113. tick := int(((timeout - 1) / tw.tickDuration) + 1)
  114. // Add another tick since the current tick may almost be over then map it to the wheel from our
  115. // current position
  116. tick += tw.current + 1
  117. if tick >= tw.wheelLen {
  118. tick -= tw.wheelLen
  119. }
  120. return tick
  121. }
  122. // advance will lock and move the wheel forward by proper number of ticks.
  123. func (tw *TimerWheel) advance(now time.Time) {
  124. if tw.lastTick == nil {
  125. tw.lastTick = &now
  126. }
  127. // We want to round down
  128. ticks := int(now.Sub(*tw.lastTick) / tw.tickDuration)
  129. adv := ticks
  130. if ticks > tw.wheelLen {
  131. ticks = tw.wheelLen
  132. }
  133. for i := 0; i < ticks; i++ {
  134. tw.current++
  135. if tw.current >= tw.wheelLen {
  136. tw.current = 0
  137. }
  138. if tw.wheel[tw.current].Head != nil {
  139. // We need to append the expired items as to not starve evicting the oldest ones
  140. if tw.expired.Tail == nil {
  141. tw.expired.Head = tw.wheel[tw.current].Head
  142. tw.expired.Tail = tw.wheel[tw.current].Tail
  143. } else {
  144. tw.expired.Tail.Next = tw.wheel[tw.current].Head
  145. tw.expired.Tail = tw.wheel[tw.current].Tail
  146. }
  147. tw.wheel[tw.current].Head = nil
  148. tw.wheel[tw.current].Tail = nil
  149. }
  150. }
  151. // Advance the tick based on duration to avoid losing some accuracy
  152. newTick := tw.lastTick.Add(tw.tickDuration * time.Duration(adv))
  153. tw.lastTick = &newTick
  154. }