| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- package controllers
- import (
- "errors"
- "fmt"
- "net/http"
- "strconv"
- "strings"
- "time"
- "github.com/gorilla/mux"
- ch "github.com/gravitl/netmaker/clickhouse"
- "github.com/gravitl/netmaker/database"
- "github.com/gravitl/netmaker/logic"
- proLogic "github.com/gravitl/netmaker/pro/logic"
- )
- func FlowHandlers(r *mux.Router) {
- r.HandleFunc("/api/v1/flows", logic.SecurityCheck(true, http.HandlerFunc(handleListFlows))).Methods(http.MethodGet)
- }
- const (
- querySelect = `
- SELECT
- flow_id, host_id, host_name, network_id,
- protocol, src_port, dst_port,
- icmp_type, icmp_code, direction,
- src_ip, src_type, src_entity_id, src_entity_name,
- dst_ip, dst_type, dst_entity_id, dst_entity_name,
- start_ts, end_ts,
- bytes_sent, bytes_recv,
- packets_sent, packets_recv,
- status, version
- FROM flows`
- queryOrder = `
- ORDER BY version DESC
- LIMIT ? OFFSET ?`
- )
- // FlowRow represents a single flow log entry
- type FlowRow struct {
- FlowID string `ch:"flow_id" json:"flow_id"`
- HostID string `ch:"host_id" json:"host_id"`
- HostName string `ch:"host_name" json:"host_name"`
- NetworkID string `ch:"network_id" json:"network_id"`
- Protocol uint16 `ch:"protocol" json:"protocol"`
- SrcPort uint16 `ch:"src_port" json:"src_port"`
- DstPort uint16 `ch:"dst_port" json:"dst_port"`
- ICMPType uint8 `ch:"icmp_type" json:"icmp_type"`
- ICMPCode uint8 `ch:"icmp_code" json:"icmp_code"`
- Direction string `ch:"direction" json:"direction"`
- SrcIP string `ch:"src_ip" json:"src_ip"`
- SrcType string `ch:"src_type" json:"src_type"`
- SrcEntityID string `ch:"src_entity_id" json:"src_entity_id"`
- SrcEntityName string `ch:"src_entity_name" json:"src_entity_name"`
- DstIP string `ch:"dst_ip" json:"dst_ip"`
- DstType string `ch:"dst_type" json:"dst_type"`
- DstEntityID string `ch:"dst_entity_id" json:"dst_entity_id"`
- DstEntityName string `ch:"dst_entity_name" json:"dst_entity_name"`
- StartTs time.Time `ch:"start_ts" json:"start_ts"`
- EndTs time.Time `ch:"end_ts" json:"end_ts"`
- BytesSent uint64 `ch:"bytes_sent" json:"bytes_sent"`
- BytesRecv uint64 `ch:"bytes_recv" json:"bytes_recv"`
- PacketsSent uint64 `ch:"packets_sent" json:"packets_sent"`
- PacketsRecv uint64 `ch:"packets_recv" json:"packets_recv"`
- Status uint32 `ch:"status" json:"status"`
- Version time.Time `ch:"version" json:"version"`
- }
- // @Summary List flow logs
- // @Router /api/v1/flows [get]
- // @Tags Traffic Logs
- // @Security oauth
- // @Produce json
- // @Param network_id query string false "Filter by network ID"
- // @Param from query string false "Start time in RFC3339 format"
- // @Param to query string false "End time in RFC3339 format"
- // @Param src_type query string false "Source type filter"
- // @Param src_entity_id query string false "Source entity ID filter"
- // @Param dst_type query string false "Destination type filter"
- // @Param dst_entity_id query string false "Destination entity ID filter"
- // @Param protocol query string false "Protocol filter"
- // @Param node_id query string false "Node ID filter"
- // @Param username query string false "Username filter"
- // @Param page query int false "Page number"
- // @Param per_page query int false "Items per page (max 1000)"
- // @Success 200 {array} FlowRow
- // @Failure 400 {object} models.ErrorResponse
- // @Failure 500 {object} models.ErrorResponse
- func handleListFlows(w http.ResponseWriter, r *http.Request) {
- if !proLogic.GetFeatureFlags().EnableFlowLogs || !logic.GetServerSettings().EnableFlowLogs {
- logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("flow logs not enabled"), logic.Forbidden))
- return
- }
- q := r.URL.Query()
- // TODO: handle query filters better
- var (
- whereParts []string
- args []any
- )
- // 0. Network filter.
- networkID := q.Get("network_id")
- if networkID != "" {
- whereParts = append(whereParts, "network_id = ?")
- args = append(args, networkID)
- }
- // 1. Time filtering (version: UInt64 timestamp in ms)
- fromStr := q.Get("from")
- toStr := q.Get("to")
- if fromStr != "" {
- fromVal, err := time.Parse(time.RFC3339, fromStr)
- if err != nil {
- logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("invalid 'from' timestamp: %v", err), logic.BadReq))
- return
- }
- whereParts = append(whereParts, "version >= ?")
- args = append(args, fromVal)
- }
- if toStr != "" {
- toVal, err := time.Parse(time.RFC3339, toStr)
- if err != nil {
- logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("invalid 'to' timestamp: %v", err), logic.BadReq))
- return
- }
- whereParts = append(whereParts, "version <= ?")
- args = append(args, toVal)
- }
- // 2. Source filters
- srcTypeStr := q.Get("src_type")
- if srcTypeStr != "" {
- whereParts = append(whereParts, "src_type = ?")
- args = append(args, srcTypeStr)
- }
- srcEntity := q.Get("src_entity_id")
- if srcEntity != "" {
- whereParts = append(whereParts, "src_entity_id = ?")
- args = append(args, srcEntity)
- }
- // 3. Destination filters
- dstTypeStr := q.Get("dst_type")
- if dstTypeStr != "" {
- whereParts = append(whereParts, "dst_type = ?")
- args = append(args, dstTypeStr)
- }
- dstEntity := q.Get("dst_entity_id")
- if dstEntity != "" {
- whereParts = append(whereParts, "dst_entity_id = ?")
- args = append(args, dstEntity)
- }
- // 4. Protocol filter
- protoStr := q.Get("protocol")
- if protoStr != "" {
- whereParts = append(whereParts, "protocol = ?")
- args = append(args, protoStr)
- }
- // 5. Node filter
- nodeID := q.Get("node_id")
- if nodeID != "" {
- node, err := logic.GetNodeByID(nodeID)
- if err != nil {
- errType := logic.Internal
- if database.IsEmptyRecord(err) {
- errType = logic.BadReq
- }
- logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("error fetching node with id %s: %v", nodeID, err), errType))
- return
- }
- if networkID == "" {
- whereParts = append(whereParts, "network_id = ?")
- args = append(args, node.Network)
- } else {
- if networkID != node.Network {
- logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("node with id %s does not belong to network %s", nodeID, networkID), logic.BadReq))
- return
- }
- }
- whereParts = append(whereParts, "host_id = ?")
- args = append(args, node.HostID)
- }
- // 6. User filter
- username := q.Get("username")
- if username != "" {
- if srcTypeStr != "" || dstTypeStr != "" ||
- srcEntity != "" || dstEntity != "" {
- logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("cannot provide username filter along with src/dst type and id filters"), logic.BadReq))
- return
- }
- srcTypeStr = "user"
- srcEntity = username
- dstTypeStr = "user"
- dstEntity = username
- whereParts = append(whereParts, "((src_type = ? AND src_entity_id = ?) OR (dst_type = ? AND dst_entity_id = ?))")
- args = append(args, srcTypeStr, srcEntity, dstTypeStr, dstEntity)
- }
- // Pagination
- page := parseIntOrDefault(q.Get("page"), 1)
- perPage := parseIntOrDefault(q.Get("per_page"), 100)
- if perPage > 1000 {
- perPage = 1000
- }
- offset := (page - 1) * perPage
- whereSQL := ""
- if len(whereParts) > 0 {
- whereSQL = "WHERE " + strings.Join(whereParts, " AND ")
- }
- query := querySelect + "\n" + whereSQL + "\n" + queryOrder
- args = append(args, perPage, offset)
- conn, err := ch.FromContext(r.Context())
- if err != nil {
- logic.ReturnErrorResponse(w, r,
- logic.FormatError(fmt.Errorf("clickhouse connection not available: %v", err), logic.Internal))
- return
- }
- rows, err := conn.Query(r.Context(), query, args...)
- if err != nil {
- logic.ReturnErrorResponse(w, r,
- logic.FormatError(fmt.Errorf("error fetching flows: %v", err), logic.Internal))
- return
- }
- defer rows.Close()
- result := make([]FlowRow, 0, 1000)
- for rows.Next() {
- var fr FlowRow
- if err := rows.ScanStruct(&fr); err != nil {
- logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("error fetching flows: %v", err), logic.Internal))
- return
- }
- result = append(result, fr)
- }
- logic.ReturnSuccessResponseWithJson(w, r, result, "flows retrieved successfully")
- }
- func parseIntOrDefault(s string, def int) int {
- if s == "" {
- return def
- }
- v, err := strconv.Atoi(s)
- if err != nil || v <= 0 {
- return def
- }
- return v
- }
|