flows.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package logic
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "time"
  7. ch "github.com/gravitl/netmaker/clickhouse"
  8. "github.com/gravitl/netmaker/logic"
  9. "github.com/gravitl/netmaker/models"
  10. )
  // flowsCleanupHookID is the identifier used both when registering the flow
  // cleanup hook and when stopping it.
  const flowsCleanupHookID = "flows-cleanup-hook"

  // flowsCleanupHookInterval is the interval supplied with the flow cleanup
  // hook registration (once every 24 hours).
  const flowsCleanupHookInterval = 24 * time.Hour
  15. func StartFlowCleanupLoop() {
  16. logic.HookManagerCh <- models.HookDetails{
  17. ID: flowsCleanupHookID,
  18. Hook: logic.WrapHook(CleanupFlows),
  19. Interval: flowsCleanupHookInterval,
  20. }
  21. }
  22. func StopFlowCleanupLoop() {
  23. logic.StopHook(flowsCleanupHookID)
  24. }
  25. func CleanupFlows() error {
  26. ctx := ch.WithContext(context.TODO())
  27. conn, err := ch.FromContext(ctx)
  28. if err != nil {
  29. return fmt.Errorf("clickhouse connection not available: %w", err)
  30. }
  31. rows, err := conn.Query(ctx, `
  32. SELECT DISTINCT parts.partition
  33. FROM system.parts
  34. WHERE parts.database = 'netmaker' AND parts.table = 'flows'
  35. ORDER BY parts.partition ASC
  36. `)
  37. if err != nil {
  38. return err
  39. }
  40. defer rows.Close()
  41. cutoff := time.Now().AddDate(0, 0, -1*logic.GetServerSettings().AuditLogsRetentionPeriodInDays)
  42. var cleanErr error
  43. for rows.Next() {
  44. var partitionID string
  45. err = rows.Scan(&partitionID)
  46. if err != nil {
  47. cleanErr = errors.Join(cleanErr, err)
  48. continue
  49. }
  50. partition, err := time.Parse("20060102", partitionID)
  51. if err != nil {
  52. cleanErr = errors.Join(cleanErr, err)
  53. continue
  54. }
  55. if partition.Before(cutoff) {
  56. err = conn.Exec(ctx, fmt.Sprintf("ALTER TABLE netmaker.flows DROP partition %s", partitionID))
  57. if err != nil {
  58. cleanErr = errors.Join(cleanErr, err)
  59. continue
  60. }
  61. }
  62. }
  63. return cleanErr
  64. }