avro.go 4.8 KB


  1. package querylog
  2. import (
  3. "context"
  4. _ "embed"
  5. "fmt"
  6. "log"
  7. "os"
  8. "path"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "golang.org/x/exp/slices"
  14. "github.com/hamba/avro/v2"
  15. "github.com/hamba/avro/v2/ocf"
  16. )
// schemaJson holds the contents of querylog.avsc, embedded at build time.
// It is parsed by AvroSchema and handed to the OCF encoder for every file.
//
//go:embed querylog.avsc
var schemaJson string
// AvroLogger buffers entries in memory and hands them to a background
// writer goroutine that encodes them into rotating Avro OCF files.
type AvroLogger struct {
	// path is the directory in which the log files are created.
	path string
	// maxsize is the file size (as reported by seeking to the end of the
	// file) above which the writer rotates to a new file.
	maxsize int
	// maxtime is the interval after which the writer rotates to a new file.
	maxtime time.Duration
	// schema is the parsed Avro schema obtained from AvroSchema.
	schema avro.Schema
	// ctx is cancelled by Close (through cancel) to stop the writer.
	ctx    context.Context
	cancel context.CancelCauseFunc
	// wg is incremented for every file the writer opens and decremented
	// when that file has been closed; Close waits on it.
	wg sync.WaitGroup
	// ch carries entries from Write to the writer goroutine.
	ch chan *Entry
}
  29. func NewAvroLogger(path string, maxsize int, maxtime time.Duration) (*AvroLogger, error) {
  30. schema, err := AvroSchema()
  31. if err != nil {
  32. return nil, err
  33. }
  34. ctx, cancel := context.WithCancelCause(context.Background())
  35. l := &AvroLogger{
  36. ctx: ctx,
  37. cancel: cancel,
  38. path: path,
  39. maxsize: maxsize,
  40. maxtime: maxtime,
  41. schema: schema,
  42. ch: make(chan *Entry, 2000),
  43. wg: sync.WaitGroup{},
  44. }
  45. go l.writer(ctx)
  46. return l, nil
  47. }
  48. func AvroSchema() (avro.Schema, error) {
  49. schema, err := avro.Parse(schemaJson)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return schema, nil
  54. }
  55. func (l *AvroLogger) Write(e *Entry) error {
  56. select {
  57. case l.ch <- e:
  58. return nil
  59. default:
  60. return fmt.Errorf("buffer full")
  61. }
  62. }
// avroFile is one output file managed by the writer goroutine.
type avroFile struct {
	// fh is the underlying file; it is created with a ".tmp" suffix that
	// is stripped by a rename once the file has been closed.
	fh *os.File
	// enc is the OCF encoder writing to fh.
	enc *ocf.Encoder
	// open is set when the file is created and cleared as soon as closing
	// starts, so a second close attempt is detected and skipped.
	open bool
	// count is the number of entries encoded by the writer's main loop.
	count int
}
  70. func (l *AvroLogger) writer(ctx context.Context) {
  71. mu := sync.Mutex{}
  72. timer := time.After(l.maxtime)
  73. openFiles := []*avroFile{}
  74. var fileCounter atomic.Int32
  75. openFile := func() (*avroFile, error) {
  76. // todo: communicate back to the main process when this goes wrong
  77. now := time.Now().UTC().Format("20060102-150405")
  78. fileCounter.Add(1)
  79. f, err := os.OpenFile(path.Join(l.path, fmt.Sprintf("log.%s.%d.avro.tmp", now, fileCounter.Load())), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0660)
  80. if err != nil {
  81. return nil, err
  82. }
  83. enc, err := ocf.NewEncoder(schemaJson, f, ocf.WithCodec(ocf.Snappy))
  84. if err != nil {
  85. return nil, err
  86. }
  87. l.wg.Add(1)
  88. a := &avroFile{fh: f, enc: enc, open: true}
  89. // log.Printf("opened %s", a.fh.Name())
  90. mu.Lock()
  91. defer mu.Unlock()
  92. openFiles = append([]*avroFile{a}, openFiles...)
  93. timer = time.After(l.maxtime)
  94. return a, nil
  95. }
  96. currentFile, err := openFile()
  97. if err != nil {
  98. log.Fatalf("openfile error: %s", err)
  99. }
  100. closeFile := func(af *avroFile) error {
  101. mu.Lock()
  102. idx := slices.Index(openFiles, af)
  103. if idx >= 0 {
  104. openFiles = slices.Delete(openFiles, idx, idx+1)
  105. } else {
  106. log.Printf("could not find avroFile for closing in openFiles list")
  107. }
  108. if !af.open {
  109. mu.Unlock()
  110. log.Printf("called closeFile on file already being closed %s", af.fh.Name())
  111. return nil
  112. }
  113. af.open = false
  114. mu.Unlock()
  115. defer l.wg.Done()
  116. // log.Printf("closing %s", af.fh.Name())
  117. if err := af.enc.Flush(); err != nil {
  118. return err
  119. }
  120. if err := af.fh.Sync(); err != nil {
  121. return err
  122. }
  123. if err := af.fh.Close(); err != nil {
  124. return err
  125. }
  126. tmpName := af.fh.Name()
  127. newName := strings.TrimSuffix(tmpName, ".tmp")
  128. if tmpName == newName {
  129. return fmt.Errorf("unexpected tmp file name %s", tmpName)
  130. }
  131. // log.Printf("renaming to %s", newName)
  132. if err := os.Rename(tmpName, newName); err != nil {
  133. return err
  134. }
  135. return nil
  136. }
  137. for {
  138. select {
  139. case e := <-l.ch:
  140. currentFile.count++
  141. err := currentFile.enc.Encode(e)
  142. if err != nil {
  143. log.Fatal(err)
  144. }
  145. if currentFile.count%1000 == 0 {
  146. size, err := currentFile.fh.Seek(0, 2)
  147. if err != nil {
  148. log.Printf("could not seek avro file: %s", err)
  149. continue
  150. }
  151. if size > int64(l.maxsize) {
  152. // log.Printf("rotating avro file for size")
  153. currentFile, err = openFile()
  154. if err != nil {
  155. log.Printf("could not open new avro file: %s", err)
  156. }
  157. }
  158. }
  159. case <-ctx.Done():
  160. log.Printf("closing avro files")
  161. // drain the buffer within reason
  162. count := 0
  163. drain:
  164. for {
  165. select {
  166. case e := <-l.ch:
  167. count++
  168. err := currentFile.enc.Encode(e)
  169. if err != nil {
  170. log.Fatal(err)
  171. }
  172. if count > 40000 {
  173. break drain
  174. }
  175. default:
  176. break drain
  177. }
  178. }
  179. for i := len(openFiles) - 1; i >= 0; i-- {
  180. err := closeFile(openFiles[i])
  181. if err != nil {
  182. log.Printf("error closing file: %s", err)
  183. }
  184. }
  185. return
  186. case <-timer:
  187. if currentFile.count == 0 {
  188. timer = time.After(l.maxtime)
  189. continue
  190. }
  191. // log.Printf("rotating avro file for time")
  192. var err error
  193. currentFile, err = openFile()
  194. if err != nil {
  195. log.Printf("could not open new avrofile: %s", err)
  196. } else {
  197. for i, af := range openFiles {
  198. if i == 0 || af == currentFile {
  199. continue
  200. }
  201. err := closeFile(af)
  202. if err != nil {
  203. log.Printf("error closing old avro files: %s", err)
  204. }
  205. }
  206. }
  207. }
  208. }
  209. }
  210. func (l *AvroLogger) Close() error {
  211. l.cancel(fmt.Errorf("closing"))
  212. <-l.ctx.Done()
  213. l.wg.Wait() // wait for all files to be closed
  214. return nil
  215. }