mutex_debug.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. //go:build mutex_debug
  2. // +build mutex_debug
  3. package nebula
  4. import (
  5. "fmt"
  6. "runtime"
  7. "sync"
  8. "github.com/clarkmcc/go-dag"
  9. "github.com/timandy/routine"
  10. )
  // mutexKey names a class of lock for ordering checks. It is an alias of
  // string, so plain string literals can be used wherever a key is expected.
  type mutexKey = string

  // allowedConcurrentLocks declares the permitted lock ordering. The value for
  // a key lists the lock keys that may already be held when that key is
  // acquired; keeping acquisition order fixed this way prevents lock-order
  // deadlocks.
  //
  // Only direct predecessors need to be listed: init panics if these
  // relationships contain a cycle and then replaces every entry with the keys
  // the ordering graph reports as reachable from it.
  var allowedConcurrentLocks = map[mutexKey][]mutexKey{
  	"connection-manager-in":         {"hostmap"},
  	"connection-manager-out":        {"connection-state-write", "connection-manager-in"},
  	"connection-manager-relay-used": {"handshake-hostinfo"},
  	"connection-manager-timer":      {"connection-manager-out"},
  	"connection-state-write":        {"hostmap"},

  	"handshake-manager":       {"hostmap"},
  	"handshake-manager-timer": {"handshake-manager"},

  	"firewall-conntrack": {"handshake-hostinfo"},
  	"hostmap":            {"handshake-hostinfo"},
  	"lighthouse":         {"handshake-manager"},
  	"relay-state":        {"hostmap", "connection-manager-relay-used"},
  	"remote-list":        {"lighthouse"},
  }
  // mutexValue is the call site recorded when a tracked lock is requested.
  type mutexValue struct {
  	file string // source file of the caller
  	line int    // line number within file
  }

  // String formats the call site as "file:line" for diagnostics.
  func (v mutexValue) String() string {
  	return fmt.Sprintf("%s:%d", v.file, v.line)
  }
  36. var threadLocal routine.ThreadLocal = routine.NewThreadLocalWithInitial(func() any { return map[mutexKey]mutexValue{} })
  37. var allowedDAG dag.AcyclicGraph
  38. // We build a directed acyclic graph to assert that the locks can only be
  39. // acquired in a determined order, If there are cycles in the DAG, then we
  40. // know that the locking order is not guaranteed.
  41. func init() {
  42. for k, v := range allowedConcurrentLocks {
  43. allowedDAG.Add(k)
  44. for _, t := range v {
  45. allowedDAG.Add(t)
  46. }
  47. }
  48. for k, v := range allowedConcurrentLocks {
  49. for _, t := range v {
  50. allowedDAG.Connect(dag.BasicEdge(k, t))
  51. }
  52. }
  53. if cycles := allowedDAG.Cycles(); len(cycles) > 0 {
  54. panic(fmt.Errorf("Cycles found in allowedConcurrentLocks: %v", cycles))
  55. }
  56. // Rebuild allowedConcurrentLocks as a flattened list of all possibilities
  57. for k := range allowedConcurrentLocks {
  58. ancestors, err := allowedDAG.Ancestors(k)
  59. if err != nil {
  60. panic(err)
  61. }
  62. var allowed []mutexKey
  63. for t := range ancestors {
  64. allowed = append(allowed, t.(mutexKey))
  65. }
  66. allowedConcurrentLocks[k] = allowed
  67. }
  68. }
  69. type syncRWMutex struct {
  70. sync.RWMutex
  71. mutexKey
  72. }
  73. type syncMutex struct {
  74. sync.Mutex
  75. mutexKey
  76. }
  77. func newSyncRWMutex(key mutexKey) syncRWMutex {
  78. return syncRWMutex{
  79. mutexKey: key,
  80. }
  81. }
  82. func newSyncMutex(key mutexKey) syncMutex {
  83. return syncMutex{
  84. mutexKey: key,
  85. }
  86. }
  // alertMutex reports a lock-ordering violation by panicking with err, so the
  // first violation stops the goroutine unless something recovers it.
  //
  // To collect every violation instead of stopping at the first, replace the
  // panic with a log call that includes the stack, for example:
  //
  //	log.Print(err, string(debug.Stack()))
  //
  // (that requires importing "log" and "runtime/debug").
  func alertMutex(err error) {
  	panic(err)
  }
  93. func checkMutex(state map[mutexKey]mutexValue, add mutexKey) {
  94. if add == "" {
  95. alertMutex(fmt.Errorf("mutex not initialized with mutexKey"))
  96. }
  97. allowedConcurrent := allowedConcurrentLocks[add]
  98. for k, v := range state {
  99. if add == k {
  100. alertMutex(fmt.Errorf("re-entrant lock: %s. previous allocation: %s", add, v))
  101. }
  102. // TODO use slices.Contains, but requires go1.21
  103. var found bool
  104. for _, a := range allowedConcurrent {
  105. if a == k {
  106. found = true
  107. break
  108. }
  109. }
  110. if !found {
  111. alertMutex(fmt.Errorf("grabbing %s lock and already have these locks: %s", add, state))
  112. }
  113. }
  114. }
  115. func (s *syncRWMutex) Lock() {
  116. m := threadLocal.Get().(map[mutexKey]mutexValue)
  117. checkMutex(m, s.mutexKey)
  118. v := mutexValue{}
  119. _, v.file, v.line, _ = runtime.Caller(1)
  120. m[s.mutexKey] = v
  121. s.RWMutex.Lock()
  122. }
  123. func (s *syncRWMutex) Unlock() {
  124. m := threadLocal.Get().(map[mutexKey]mutexValue)
  125. delete(m, s.mutexKey)
  126. s.RWMutex.Unlock()
  127. }
  128. func (s *syncRWMutex) RLock() {
  129. m := threadLocal.Get().(map[mutexKey]mutexValue)
  130. checkMutex(m, s.mutexKey)
  131. v := mutexValue{}
  132. _, v.file, v.line, _ = runtime.Caller(1)
  133. m[s.mutexKey] = v
  134. s.RWMutex.RLock()
  135. }
  136. func (s *syncRWMutex) RUnlock() {
  137. m := threadLocal.Get().(map[mutexKey]mutexValue)
  138. delete(m, s.mutexKey)
  139. s.RWMutex.RUnlock()
  140. }
  141. func (s *syncMutex) Lock() {
  142. m := threadLocal.Get().(map[mutexKey]mutexValue)
  143. checkMutex(m, s.mutexKey)
  144. v := mutexValue{}
  145. _, v.file, v.line, _ = runtime.Caller(1)
  146. m[s.mutexKey] = v
  147. s.Mutex.Lock()
  148. }
  149. func (s *syncMutex) Unlock() {
  150. m := threadLocal.Get().(map[mutexKey]mutexValue)
  151. delete(m, s.mutexKey)
  152. s.Mutex.Unlock()
  153. }