flows.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package controllers
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "github.com/gorilla/mux"
  10. ch "github.com/gravitl/netmaker/clickhouse"
  11. "github.com/gravitl/netmaker/database"
  12. "github.com/gravitl/netmaker/logic"
  13. proLogic "github.com/gravitl/netmaker/pro/logic"
  14. )
  15. func FlowHandlers(r *mux.Router) {
  16. r.HandleFunc("/api/v1/flows", logic.SecurityCheck(true, http.HandlerFunc(handleListFlows))).Methods(http.MethodGet)
  17. }
  18. const (
  19. querySelect = `
  20. SELECT
  21. flow_id, host_id, host_name, network_id,
  22. protocol, src_port, dst_port,
  23. icmp_type, icmp_code, direction,
  24. src_ip, src_type, src_entity_id, src_entity_name,
  25. dst_ip, dst_type, dst_entity_id, dst_entity_name,
  26. start_ts, end_ts,
  27. bytes_sent, bytes_recv,
  28. packets_sent, packets_recv,
  29. status, version
  30. FROM flows`
  31. queryOrder = `
  32. ORDER BY version DESC
  33. LIMIT ? OFFSET ?`
  34. )
  35. // FlowRow represents a single flow log entry
  36. type FlowRow struct {
  37. FlowID string `ch:"flow_id" json:"flow_id"`
  38. HostID string `ch:"host_id" json:"host_id"`
  39. HostName string `ch:"host_name" json:"host_name"`
  40. NetworkID string `ch:"network_id" json:"network_id"`
  41. Protocol uint16 `ch:"protocol" json:"protocol"`
  42. SrcPort uint16 `ch:"src_port" json:"src_port"`
  43. DstPort uint16 `ch:"dst_port" json:"dst_port"`
  44. ICMPType uint8 `ch:"icmp_type" json:"icmp_type"`
  45. ICMPCode uint8 `ch:"icmp_code" json:"icmp_code"`
  46. Direction string `ch:"direction" json:"direction"`
  47. SrcIP string `ch:"src_ip" json:"src_ip"`
  48. SrcType string `ch:"src_type" json:"src_type"`
  49. SrcEntityID string `ch:"src_entity_id" json:"src_entity_id"`
  50. SrcEntityName string `ch:"src_entity_name" json:"src_entity_name"`
  51. DstIP string `ch:"dst_ip" json:"dst_ip"`
  52. DstType string `ch:"dst_type" json:"dst_type"`
  53. DstEntityID string `ch:"dst_entity_id" json:"dst_entity_id"`
  54. DstEntityName string `ch:"dst_entity_name" json:"dst_entity_name"`
  55. StartTs time.Time `ch:"start_ts" json:"start_ts"`
  56. EndTs time.Time `ch:"end_ts" json:"end_ts"`
  57. BytesSent uint64 `ch:"bytes_sent" json:"bytes_sent"`
  58. BytesRecv uint64 `ch:"bytes_recv" json:"bytes_recv"`
  59. PacketsSent uint64 `ch:"packets_sent" json:"packets_sent"`
  60. PacketsRecv uint64 `ch:"packets_recv" json:"packets_recv"`
  61. Status uint32 `ch:"status" json:"status"`
  62. Version time.Time `ch:"version" json:"version"`
  63. }
  64. // @Summary List flow logs
  65. // @Router /api/v1/flows [get]
  66. // @Tags Traffic Logs
  67. // @Security oauth
  68. // @Produce json
  69. // @Param network_id query string false "Filter by network ID"
  70. // @Param from query string false "Start time in RFC3339 format"
  71. // @Param to query string false "End time in RFC3339 format"
  72. // @Param src_type query string false "Source type filter"
  73. // @Param src_entity_id query string false "Source entity ID filter"
  74. // @Param dst_type query string false "Destination type filter"
  75. // @Param dst_entity_id query string false "Destination entity ID filter"
  76. // @Param protocol query string false "Protocol filter"
  77. // @Param node_id query string false "Node ID filter"
  78. // @Param username query string false "Username filter"
  79. // @Param page query int false "Page number"
  80. // @Param per_page query int false "Items per page (max 1000)"
  81. // @Success 200 {array} FlowRow
  82. // @Failure 400 {object} models.ErrorResponse
  83. // @Failure 500 {object} models.ErrorResponse
  84. func handleListFlows(w http.ResponseWriter, r *http.Request) {
  85. if !proLogic.GetFeatureFlags().EnableFlowLogs || !logic.GetServerSettings().EnableFlowLogs {
  86. logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("flow logs not enabled"), logic.Forbidden))
  87. return
  88. }
  89. q := r.URL.Query()
  90. // TODO: handle query filters better
  91. var (
  92. whereParts []string
  93. args []any
  94. )
  95. // 0. Network filter.
  96. networkID := q.Get("network_id")
  97. if networkID != "" {
  98. whereParts = append(whereParts, "network_id = ?")
  99. args = append(args, networkID)
  100. }
  101. // 1. Time filtering (version: UInt64 timestamp in ms)
  102. fromStr := q.Get("from")
  103. toStr := q.Get("to")
  104. if fromStr != "" {
  105. fromVal, err := time.Parse(time.RFC3339, fromStr)
  106. if err != nil {
  107. logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("invalid 'from' timestamp: %v", err), logic.BadReq))
  108. return
  109. }
  110. whereParts = append(whereParts, "version >= ?")
  111. args = append(args, fromVal)
  112. }
  113. if toStr != "" {
  114. toVal, err := time.Parse(time.RFC3339, toStr)
  115. if err != nil {
  116. logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("invalid 'to' timestamp: %v", err), logic.BadReq))
  117. return
  118. }
  119. whereParts = append(whereParts, "version <= ?")
  120. args = append(args, toVal)
  121. }
  122. // 2. Source filters
  123. srcTypeStr := q.Get("src_type")
  124. if srcTypeStr != "" {
  125. whereParts = append(whereParts, "src_type = ?")
  126. args = append(args, srcTypeStr)
  127. }
  128. srcEntity := q.Get("src_entity_id")
  129. if srcEntity != "" {
  130. whereParts = append(whereParts, "src_entity_id = ?")
  131. args = append(args, srcEntity)
  132. }
  133. // 3. Destination filters
  134. dstTypeStr := q.Get("dst_type")
  135. if dstTypeStr != "" {
  136. whereParts = append(whereParts, "dst_type = ?")
  137. args = append(args, dstTypeStr)
  138. }
  139. dstEntity := q.Get("dst_entity_id")
  140. if dstEntity != "" {
  141. whereParts = append(whereParts, "dst_entity_id = ?")
  142. args = append(args, dstEntity)
  143. }
  144. // 4. Protocol filter
  145. protoStr := q.Get("protocol")
  146. if protoStr != "" {
  147. whereParts = append(whereParts, "protocol = ?")
  148. args = append(args, protoStr)
  149. }
  150. // 5. Node filter
  151. nodeID := q.Get("node_id")
  152. if nodeID != "" {
  153. node, err := logic.GetNodeByID(nodeID)
  154. if err != nil {
  155. errType := logic.Internal
  156. if database.IsEmptyRecord(err) {
  157. errType = logic.BadReq
  158. }
  159. logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("error fetching node with id %s: %v", nodeID, err), errType))
  160. return
  161. }
  162. if networkID == "" {
  163. whereParts = append(whereParts, "network_id = ?")
  164. args = append(args, node.Network)
  165. } else {
  166. if networkID != node.Network {
  167. logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("node with id %s does not belong to network %s", nodeID, networkID), logic.BadReq))
  168. return
  169. }
  170. }
  171. whereParts = append(whereParts, "host_id = ?")
  172. args = append(args, node.HostID)
  173. }
  174. // 6. User filter
  175. username := q.Get("username")
  176. if username != "" {
  177. if srcTypeStr != "" || dstTypeStr != "" ||
  178. srcEntity != "" || dstEntity != "" {
  179. logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("cannot provide username filter along with src/dst type and id filters"), logic.BadReq))
  180. return
  181. }
  182. srcTypeStr = "user"
  183. srcEntity = username
  184. dstTypeStr = "user"
  185. dstEntity = username
  186. whereParts = append(whereParts, "((src_type = ? AND src_entity_id = ?) OR (dst_type = ? AND dst_entity_id = ?))")
  187. args = append(args, srcTypeStr, srcEntity, dstTypeStr, dstEntity)
  188. }
  189. // Pagination
  190. page := parseIntOrDefault(q.Get("page"), 1)
  191. perPage := parseIntOrDefault(q.Get("per_page"), 100)
  192. if perPage > 1000 {
  193. perPage = 1000
  194. }
  195. offset := (page - 1) * perPage
  196. whereSQL := ""
  197. if len(whereParts) > 0 {
  198. whereSQL = "WHERE " + strings.Join(whereParts, " AND ")
  199. }
  200. query := querySelect + "\n" + whereSQL + "\n" + queryOrder
  201. args = append(args, perPage, offset)
  202. conn, err := ch.FromContext(r.Context())
  203. if err != nil {
  204. logic.ReturnErrorResponse(w, r,
  205. logic.FormatError(fmt.Errorf("clickhouse connection not available: %v", err), logic.Internal))
  206. return
  207. }
  208. rows, err := conn.Query(r.Context(), query, args...)
  209. if err != nil {
  210. logic.ReturnErrorResponse(w, r,
  211. logic.FormatError(fmt.Errorf("error fetching flows: %v", err), logic.Internal))
  212. return
  213. }
  214. defer rows.Close()
  215. result := make([]FlowRow, 0, 1000)
  216. for rows.Next() {
  217. var fr FlowRow
  218. if err := rows.ScanStruct(&fr); err != nil {
  219. logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("error fetching flows: %v", err), logic.Internal))
  220. return
  221. }
  222. result = append(result, fr)
  223. }
  224. logic.ReturnSuccessResponseWithJson(w, r, result, "flows retrieved successfully")
  225. }
  226. func parseIntOrDefault(s string, def int) int {
  227. if s == "" {
  228. return def
  229. }
  230. v, err := strconv.Atoi(s)
  231. if err != nil || v <= 0 {
  232. return def
  233. }
  234. return v
  235. }