clickhouse.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package clickhouse
  2. import (
  3. "context"
  4. _ "embed"
  5. "errors"
  6. "fmt"
  7. "net/http"
  8. "time"
  9. "github.com/ClickHouse/clickhouse-go/v2"
  10. "github.com/gravitl/netmaker/servercfg"
  11. )
  12. type ctxKey string
  13. const clickhouseCtxKey ctxKey = "clickhouse"
  14. var ch clickhouse.Conn
  15. var ErrConnNotFound = errors.New("no connection in context")
  16. //go:embed initdb.d/02_create_flows_table.sql
  17. var createFlowsTableScript string
  18. func Initialize() error {
  19. if ch != nil {
  20. return nil
  21. }
  22. config := servercfg.GetClickHouseConfig()
  23. chConn, err := clickhouse.Open(&clickhouse.Options{
  24. Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)},
  25. Auth: clickhouse.Auth{
  26. Database: config.Database,
  27. Username: config.Username,
  28. Password: config.Password,
  29. },
  30. })
  31. if err != nil {
  32. return err
  33. }
  34. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  35. defer cancel()
  36. err = chConn.Exec(ctx, createFlowsTableScript)
  37. if err != nil {
  38. return err
  39. }
  40. ch = chConn
  41. return nil
  42. }
  43. // WithContext returns a new context with the clickhouse
  44. // connection instance.
  45. //
  46. // Ensure Initialize has been called before using
  47. // this function.
  48. //
  49. // To extract the clickhouse connection use the FromContext
  50. // function.
  51. func WithContext(ctx context.Context) context.Context {
  52. return context.WithValue(ctx, clickhouseCtxKey, ch)
  53. }
  54. // Middleware to auto-inject the clickhouse connection instance
  55. // in a request's context.
  56. //
  57. // Ensure Initialize has been called before using this
  58. // middleware.
  59. func Middleware(next http.Handler) http.Handler {
  60. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  61. next.ServeHTTP(w, r.WithContext(WithContext(r.Context())))
  62. })
  63. }
  64. // FromContext extracts the clickhouse connection instance from
  65. // the given context.
  66. //
  67. // Returns the connection and an error if one does not exist.
  68. func FromContext(ctx context.Context) (clickhouse.Conn, error) {
  69. ch, ok := ctx.Value(clickhouseCtxKey).(clickhouse.Conn)
  70. if !ok {
  71. return nil, ErrConnNotFound
  72. }
  73. return ch, nil
  74. }
  75. // Close closes a connection to the clickhouse database
  76. // (if one exists). It panics if any error
  77. // occurs.
  78. func Close() {
  79. if ch == nil {
  80. return
  81. }
  82. err := ch.Close()
  83. if err != nil {
  84. panic(err)
  85. }
  86. ch = nil
  87. }