mutex_debug.go

//go:build mutex_debug
// +build mutex_debug

package nebula

import (
	"fmt"
	"runtime"
	"sync"

	"github.com/clarkmcc/go-dag"
	"github.com/timandy/routine"
)

type mutexKey = string

// For each key in this map, the value is the list of locks you are allowed to
// already hold when you want to grab that key. This ensures that locks are
// always acquired in the same order, to prevent deadlocks.
var allowedConcurrentLocks = map[mutexKey][]mutexKey{
	"connection-manager-in":         {"hostmap"},
	"connection-manager-out":        {"connection-manager-in", "handshake-hostinfo", "handshake-manager"},
	"connection-manager-relay-used": {"handshake-hostinfo"},
	"connection-manager-timer":      {"connection-manager-out"},
	// "connection-state-write": {"hostmap"},
	"firewall-conntrack":      {"handshake-hostinfo"},
	"handshake-manager":       {"handshake-hostinfo", "hostmap"},
	"handshake-manager-timer": {"handshake-manager"},
	"hostmap":                 {"lighthouse-query-chan", "handshake-hostinfo"},
	"lighthouse":              {"handshake-hostinfo"},
	"relay-state":             {"connection-manager-relay-used", "hostmap"},
	"remote-list":             {"lighthouse", "handshake-manager"},
	"lighthouse-query-chan":   {"handshake-hostinfo"},
}
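
// For example, the "connection-manager-in" entry above means a goroutine that
// already holds "hostmap" may grab "connection-manager-in". Grabbing the two
// in the opposite order, or grabbing a lock while holding one that is not
// listed for it, trips checkMutex below.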

type mutexValue struct {
	file string
	line int
}

func (m mutexValue) String() string {
	return fmt.Sprintf("%s:%d", m.file, m.line)
}
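
// threadLocal tracks, per goroutine, which of these debug locks are currently
// held and the file:line at which each one was acquired.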
var threadLocal = routine.NewThreadLocalWithInitial[map[mutexKey]mutexValue](func() map[mutexKey]mutexValue { return map[mutexKey]mutexValue{} })

var allowedDAG dag.AcyclicGraph

// We build a directed acyclic graph to assert that the locks can only be
// acquired in a determined order. If there are cycles in the DAG, then we
// know that the locking order is not guaranteed.
func init() {
	for k, v := range allowedConcurrentLocks {
		allowedDAG.Add(k)
		for _, t := range v {
			allowedDAG.Add(t)
		}
	}

	for k, v := range allowedConcurrentLocks {
		for _, t := range v {
			allowedDAG.Connect(dag.BasicEdge(k, t))
		}
	}

	if cycles := allowedDAG.Cycles(); len(cycles) > 0 {
		panic(fmt.Errorf("Cycles found in allowedConcurrentLocks: %v", cycles))
	}

	// Rebuild allowedConcurrentLocks as a flattened list of all possibilities
	for k := range allowedConcurrentLocks {
		ancestors, err := allowedDAG.Ancestors(k)
		if err != nil {
			panic(err)
		}

		var allowed []mutexKey
		for t := range ancestors {
			allowed = append(allowed, t.(mutexKey))
		}
		allowedConcurrentLocks[k] = allowed
	}
}
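
// After init, each entry in allowedConcurrentLocks holds the transitive
// closure of the table above. For example, "connection-manager-out" also ends
// up permitting "hostmap", because it permits "connection-manager-in", which
// in turn permits "hostmap".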

type syncRWMutex struct {
	sync.RWMutex
	mutexKey
}

type syncMutex struct {
	sync.Mutex
	mutexKey
}

func newSyncRWMutex(key mutexKey) syncRWMutex {
	return syncRWMutex{
		mutexKey: key,
	}
}

func newSyncMutex(key mutexKey) syncMutex {
	return syncMutex{
		mutexKey: key,
	}
}
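
// Illustrative usage (a sketch, not part of this file): a structure that takes
// part in the lock ordering embeds one of these wrappers in place of a bare
// sync mutex and tags it with its key, for example:
//
//	hm := struct {
//		syncRWMutex
//	}{syncRWMutex: newSyncRWMutex("hostmap")}
//	hm.Lock()
//	defer hm.Unlock()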

func alertMutex(err error) {
	panic(err)
	// NOTE: you could switch to this log line and remove the panic if you want
	// to log all failures instead of panicking on the first one
	//log.Print(err, string(debug.Stack()))
}

func checkMutex(state map[mutexKey]mutexValue, add mutexKey) {
	if add == "" {
		alertMutex(fmt.Errorf("mutex not initialized with mutexKey"))
	}

	allowedConcurrent := allowedConcurrentLocks[add]

	for k, v := range state {
		if add == k {
			alertMutex(fmt.Errorf("re-entrant lock: %s. previous allocation: %s", add, v))
		}

		// TODO: use slices.Contains, but it requires go1.21
		var found bool
		for _, a := range allowedConcurrent {
			if a == k {
				found = true
				break
			}
		}
		if !found {
			alertMutex(fmt.Errorf("grabbing %s lock and already have these locks: %s", add, state))
		}
	}
}
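
// chanDebugSend and chanDebugRecv extend the same ordering check to channel
// operations that take part in the lock order (the "lighthouse-query-chan" key
// above). chanDebugSend only verifies the ordering on the sending side, while
// chanDebugRecv also records the receive in the thread-local state, just like
// a lock acquisition.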

func chanDebugRecv(key mutexKey) {
	m := threadLocal.Get()
	checkMutex(m, key)
	v := mutexValue{}
	_, v.file, v.line, _ = runtime.Caller(1)
	m[key] = v
}

func chanDebugSend(key mutexKey) {
	m := threadLocal.Get()
	checkMutex(m, key)
}

// Lock and RLock verify the ordering rules and record the call site before
// delegating to the embedded mutex; Unlock and RUnlock clear the record.
func (s *syncRWMutex) Lock() {
	m := threadLocal.Get()
	checkMutex(m, s.mutexKey)
	v := mutexValue{}
	_, v.file, v.line, _ = runtime.Caller(1)
	m[s.mutexKey] = v
	s.RWMutex.Lock()
}

func (s *syncRWMutex) Unlock() {
	m := threadLocal.Get()
	delete(m, s.mutexKey)
	s.RWMutex.Unlock()
}

func (s *syncRWMutex) RLock() {
	m := threadLocal.Get()
	checkMutex(m, s.mutexKey)
	v := mutexValue{}
	_, v.file, v.line, _ = runtime.Caller(1)
	m[s.mutexKey] = v
	s.RWMutex.RLock()
}

func (s *syncRWMutex) RUnlock() {
	m := threadLocal.Get()
	delete(m, s.mutexKey)
	s.RWMutex.RUnlock()
}

func (s *syncMutex) Lock() {
	m := threadLocal.Get()
	checkMutex(m, s.mutexKey)
	v := mutexValue{}
	_, v.file, v.line, _ = runtime.Caller(1)
	m[s.mutexKey] = v
	s.Mutex.Lock()
}

func (s *syncMutex) Unlock() {
	m := threadLocal.Get()
	delete(m, s.mutexKey)
	s.Mutex.Unlock()
}
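
// When building without the mutex_debug tag, a companion file (presumably
// something like mutex_nodebug.go) is expected to supply zero-overhead
// stand-ins for these names, roughly along the lines of:
//
//	//go:build !mutex_debug
//
//	type mutexKey = string
//	type syncRWMutex = sync.RWMutex
//	type syncMutex = sync.Mutex
//
//	func newSyncRWMutex(mutexKey) syncRWMutex { return syncRWMutex{} }
//	func newSyncMutex(mutexKey) syncMutex     { return syncMutex{} }
//	func chanDebugRecv(mutexKey)              {}
//	func chanDebugSend(mutexKey)              {}
//
// so the ordering checks compile away entirely in normal builds.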