| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 | package nebulaimport (	"sync"	"time")// How many timer objects should be cachedconst timerCacheMax = 50000type TimerWheel[T any] struct {	// Current tick	current int	// Cheat on finding the length of the wheel	wheelLen int	// Last time we ticked, since we are lazy ticking	lastTick *time.Time	// Durations of a tick and the entire wheel	tickDuration  time.Duration	wheelDuration time.Duration	// The actual wheel which is just a set of singly linked lists, head/tail pointers	wheel []*TimeoutList[T]	// Singly linked list of items that have timed out of the wheel	expired *TimeoutList[T]	// Item cache to avoid garbage collect	itemCache   *TimeoutItem[T]	itemsCached int}type LockingTimerWheel[T any] struct {	m sync.Mutex	t *TimerWheel[T]}// TimeoutList Represents a tick in the wheeltype TimeoutList[T any] struct {	Head *TimeoutItem[T]	Tail *TimeoutItem[T]}// TimeoutItem Represents an item within a ticktype TimeoutItem[T any] struct {	Item T	Next *TimeoutItem[T]}// NewTimerWheel Builds a timer wheel and identifies the tick duration and wheel duration from the provided values// Purge must be called once per entry to actually remove anything// The TimerWheel does not handle concurrency on its own.// Locks around access to it must be used if multiple routines are manipulating it.func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {	//TODO provide an error	//if min >= max {	//	return nil	//}	// Round down and add 2 so we can have the smallest # of ticks in the wheel and still account for a full	// max duration, even if our current tick is at the maximum position and the next item to be added is at maximum	// timeout	wLen := int((max / min) + 2)	tw := TimerWheel[T]{		wheelLen:      wLen,		wheel:         make([]*TimeoutList[T], wLen),		tickDuration:  min,		wheelDuration: max,		expired:       &TimeoutList[T]{},	}	for i := range tw.wheel {		tw.wheel[i] = &TimeoutList[T]{}	}	return &tw}// NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penaltyfunc NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] {	return &LockingTimerWheel[T]{		t: NewTimerWheel[T](min, max),	}}// Add will add an item to the wheel in its proper timeout.// Caller should Advance the wheel prior to ensure the proper slot is used.func (tw *TimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {	i := tw.findWheel(timeout)	// Try to fetch off the cache	ti := tw.itemCache	if ti != nil {		tw.itemCache = ti.Next		tw.itemsCached--		ti.Next = nil	} else {		ti = &TimeoutItem[T]{}	}	// Relink and return	ti.Item = v	if tw.wheel[i].Tail == nil {		tw.wheel[i].Head = ti		tw.wheel[i].Tail = ti	} else {		tw.wheel[i].Tail.Next = ti		tw.wheel[i].Tail = ti	}	return ti}// Purge removes and returns the first available expired item from the wheel and the 2nd argument is true.// If no item is available then an empty T is returned and the 2nd argument is false.func (tw *TimerWheel[T]) Purge() (T, bool) {	if tw.expired.Head == nil {		var na T		return na, false	}	ti := tw.expired.Head	tw.expired.Head = ti.Next	if tw.expired.Head == nil {		tw.expired.Tail = nil	}	// Clear out the items references	ti.Next = nil	// Maybe cache it for later	if tw.itemsCached < timerCacheMax {		ti.Next = tw.itemCache		tw.itemCache = ti		tw.itemsCached++	}	return ti.Item, true}// findWheel find the next position in the wheel for the provided timeout given the current tickfunc (tw *TimerWheel[T]) findWheel(timeout time.Duration) (i int) {	if timeout < tw.tickDuration {		// Can't track anything below the set resolution		timeout = tw.tickDuration	} else if timeout > tw.wheelDuration {		// We aren't handling timeouts greater than the wheels duration		timeout = tw.wheelDuration	}	// Find the next highest, rounding up	tick := int(((timeout - 1) / tw.tickDuration) + 1)	// Add another tick since the current tick may almost be over then map it to the wheel from our	// current position	tick += tw.current + 1	if tick >= tw.wheelLen {		tick -= tw.wheelLen	}	return tick}// Advance will move the wheel forward by the appropriate number of ticks for the provided time and all items// passed over will be moved to the expired list. Calling Purge is necessary to remove them entirely.func (tw *TimerWheel[T]) Advance(now time.Time) {	if tw.lastTick == nil {		tw.lastTick = &now	}	// We want to round down	ticks := int(now.Sub(*tw.lastTick) / tw.tickDuration)	adv := ticks	if ticks > tw.wheelLen {		ticks = tw.wheelLen	}	for i := 0; i < ticks; i++ {		tw.current++		if tw.current >= tw.wheelLen {			tw.current = 0		}		if tw.wheel[tw.current].Head != nil {			// We need to append the expired items as to not starve evicting the oldest ones			if tw.expired.Tail == nil {				tw.expired.Head = tw.wheel[tw.current].Head				tw.expired.Tail = tw.wheel[tw.current].Tail			} else {				tw.expired.Tail.Next = tw.wheel[tw.current].Head				tw.expired.Tail = tw.wheel[tw.current].Tail			}			tw.wheel[tw.current].Head = nil			tw.wheel[tw.current].Tail = nil		}	}	// Advance the tick based on duration to avoid losing some accuracy	newTick := tw.lastTick.Add(tw.tickDuration * time.Duration(adv))	tw.lastTick = &newTick}func (lw *LockingTimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {	lw.m.Lock()	defer lw.m.Unlock()	return lw.t.Add(v, timeout)}func (lw *LockingTimerWheel[T]) Purge() (T, bool) {	lw.m.Lock()	defer lw.m.Unlock()	return lw.t.Purge()}func (lw *LockingTimerWheel[T]) Advance(now time.Time) {	lw.m.Lock()	defer lw.m.Unlock()	lw.t.Advance(now)}
 |