package querylog

import (
	"context"
	_ "embed"
	"fmt"
	"log"
	"os"
	"path"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/ocf"
	"golang.org/x/exp/slices"
)
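
// schemaJson holds the Avro schema for query log entries, embedded at build
// time from querylog.avsc.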
//go:embed querylog.avsc
var schemaJson string
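
// AvroLogger asynchronously writes query log entries to Avro OCF files in a
// directory, rotating files once they grow past a size limit or an age limit.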
type AvroLogger struct {
	path    string
	maxsize int
	maxtime time.Duration
	schema  avro.Schema
	ctx     context.Context
	cancel  context.CancelCauseFunc
	wg      sync.WaitGroup
	ch      chan *Entry
}
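
// NewAvroLogger returns a logger that writes entries to Avro files in the
// directory at path, rotating the current file once it grows past maxsize
// bytes or after maxtime has elapsed. It starts a background writer
// goroutine; call Close to flush and finalize all files.
//
// Illustrative use (the directory, size, interval, and entry variable below
// are assumptions for the example, not values defined by this package):
//
//	l, err := NewAvroLogger("/var/log/querylog", 64<<20, 5*time.Minute)
//	if err != nil {
//		return err
//	}
//	defer l.Close()
//	_ = l.Write(entry) // entry is a previously constructed *Entry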
func NewAvroLogger(path string, maxsize int, maxtime time.Duration) (*AvroLogger, error) {
	schema, err := AvroSchema()
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithCancelCause(context.Background())
	l := &AvroLogger{
		ctx:     ctx,
		cancel:  cancel,
		path:    path,
		maxsize: maxsize,
		maxtime: maxtime,
		schema:  schema,
		ch:      make(chan *Entry, 2000),
		wg:      sync.WaitGroup{},
	}
	go l.writer(ctx)
	return l, nil
}
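
// AvroSchema parses the embedded querylog.avsc schema and returns it.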
func AvroSchema() (avro.Schema, error) {
	schema, err := avro.Parse(schemaJson)
	if err != nil {
		return nil, err
	}
	return schema, nil
}
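
// Write queues an entry for the background writer. It never blocks: if the
// internal buffer is full, the entry is dropped and an error is returned.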
func (l *AvroLogger) Write(e *Entry) error {
	select {
	case l.ch <- e:
		return nil
	default:
		return fmt.Errorf("buffer full")
	}
}

// avroFile tracks a single open Avro OCF output file: its handle, its
// encoder, whether it is still open, and how many entries have been written.
type avroFile struct {
	fh    *os.File
	enc   *ocf.Encoder
	open  bool
	count int
}
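
// writer runs as a background goroutine. It encodes entries arriving on l.ch
// into the current Avro file, rotates files when they become too large or too
// old, and drains and finalizes everything once ctx is cancelled.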
func (l *AvroLogger) writer(ctx context.Context) {
	mu := sync.Mutex{}
	timer := time.After(l.maxtime)
	openFiles := []*avroFile{}
	var fileCounter atomic.Int32
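
	// openFile creates a new timestamped .tmp file in l.path, wraps it in a
	// Snappy-compressed OCF encoder, and prepends it to openFiles so the
	// newest file is always at index 0. It also rearms the rotation timer.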
	openFile := func() (*avroFile, error) {
		// TODO: communicate back to the main process when this goes wrong
		now := time.Now().UTC().Format("20060102-150405")
		fileCounter.Add(1)
		name := path.Join(l.path, fmt.Sprintf("log.%s.%d.avro.tmp", now, fileCounter.Load()))
		f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o660)
		if err != nil {
			return nil, err
		}
		enc, err := ocf.NewEncoder(schemaJson, f, ocf.WithCodec(ocf.Snappy))
		if err != nil {
			return nil, err
		}
		l.wg.Add(1)
		a := &avroFile{fh: f, enc: enc, open: true}
		// log.Printf("opened %s", a.fh.Name())
		mu.Lock()
		defer mu.Unlock()
		openFiles = append([]*avroFile{a}, openFiles...)
		timer = time.After(l.maxtime)
		return a, nil
	}

	currentFile, err := openFile()
	if err != nil {
		log.Fatalf("openfile error: %s", err)
	}
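
	// closeFile removes af from openFiles, flushes the encoder, syncs and
	// closes the file, and renames it from .tmp to its final name. Calling it
	// again for the same file is a logged no-op.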
	closeFile := func(af *avroFile) error {
		mu.Lock()
		idx := slices.Index(openFiles, af)
		if idx >= 0 {
			openFiles = slices.Delete(openFiles, idx, idx+1)
		} else {
			log.Printf("could not find avroFile for closing in openFiles list")
		}
		if !af.open {
			mu.Unlock()
			log.Printf("called closeFile on file already being closed %s", af.fh.Name())
			return nil
		}
		af.open = false
		mu.Unlock()
		defer l.wg.Done()
		// log.Printf("closing %s", af.fh.Name())
		if err := af.enc.Flush(); err != nil {
			return err
		}
		if err := af.fh.Sync(); err != nil {
			return err
		}
		if err := af.fh.Close(); err != nil {
			return err
		}
		tmpName := af.fh.Name()
		newName := strings.TrimSuffix(tmpName, ".tmp")
		if tmpName == newName {
			return fmt.Errorf("unexpected tmp file name %s", tmpName)
		}
		// log.Printf("renaming to %s", newName)
		if err := os.Rename(tmpName, newName); err != nil {
			return err
		}
		return nil
	}
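
	// Main loop: encode incoming entries, rotate files by size and by age,
	// and drain plus finalize everything once the context is cancelled.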
	for {
		select {
		case e := <-l.ch:
			currentFile.count++
			err := currentFile.enc.Encode(e)
			if err != nil {
				log.Fatal(err)
			}
			// Only check the on-disk size every 1000 entries so the size
			// check does not cost a Seek syscall per entry.
			if currentFile.count%1000 == 0 {
				size, err := currentFile.fh.Seek(0, 2) // whence 2 == io.SeekEnd
				if err != nil {
					log.Printf("could not seek avro file: %s", err)
					continue
				}
				if size > int64(l.maxsize) {
					// log.Printf("rotating avro file for size")
					currentFile, err = openFile()
					if err != nil {
						log.Printf("could not open new avro file: %s", err)
					}
				}
			}
		case <-ctx.Done():
			log.Printf("closing avro files")
			// Drain what is still buffered, within reason, so shutdown does
			// not spin forever if writers keep producing entries.
			count := 0
		drain:
			for {
				select {
				case e := <-l.ch:
					count++
					err := currentFile.enc.Encode(e)
					if err != nil {
						log.Fatal(err)
					}
					if count > 40000 {
						break drain
					}
				default:
					break drain
				}
			}
			// Close oldest-first so the current file is finalized last.
			for i := len(openFiles) - 1; i >= 0; i-- {
				err := closeFile(openFiles[i])
				if err != nil {
					log.Printf("error closing file: %s", err)
				}
			}
			return
		case <-timer:
			if currentFile.count == 0 {
				// Nothing was written since the last rotation; just rearm.
				timer = time.After(l.maxtime)
				continue
			}
			// log.Printf("rotating avro file for time")
			var err error
			currentFile, err = openFile()
			if err != nil {
				log.Printf("could not open new avro file: %s", err)
			} else {
				// Snapshot the list under the lock before closing: closeFile
				// mutates openFiles, so ranging over it directly could skip
				// or repeat entries.
				mu.Lock()
				toClose := append([]*avroFile(nil), openFiles...)
				mu.Unlock()
				for _, af := range toClose {
					if af == currentFile {
						continue
					}
					if err := closeFile(af); err != nil {
						log.Printf("error closing old avro files: %s", err)
					}
				}
			}
		}
	}
}
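
// Close signals the background writer to shut down and waits until every open
// Avro file has been flushed, renamed from .tmp, and closed. Buffered entries
// are drained (up to a bound) before the files are finalized.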
func (l *AvroLogger) Close() error {
	l.cancel(fmt.Errorf("closing"))
	<-l.ctx.Done()
	l.wg.Wait() // wait for all files to be closed
	return nil
}