|
@@ -6,8 +6,10 @@ import (
|
|
|
"fmt"
|
|
|
"log"
|
|
|
"os"
|
|
|
+ "path"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
+ "sync/atomic"
|
|
|
"time"
|
|
|
|
|
|
"golang.org/x/exp/slices"
|
|
@@ -92,15 +94,20 @@ func (l *AvroLogger) writer(ctx context.Context) {
|
|
|
|
|
|
openFiles := []*avroFile{}
|
|
|
|
|
|
+ var fileCounter atomic.Int32
|
|
|
+
|
|
|
openFile := func() (*avroFile, error) {
|
|
|
// todo: communicate back to the main process when this goes wrong
|
|
|
|
|
|
now := time.Now().UTC().Format("20060102-150405")
|
|
|
|
|
|
- f, err := os.CreateTemp(l.path, fmt.Sprintf("log.%s.*.avro.tmp", now))
|
|
|
+ fileCounter.Add(1)
|
|
|
+
|
|
|
+ f, err := os.OpenFile(path.Join(l.path, fmt.Sprintf("log.%s.%d.avro.tmp", now, fileCounter.Load())), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0660)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
+
|
|
|
enc, err := ocf.NewEncoder(schemaJson, f, ocf.WithCodec(ocf.Snappy))
|
|
|
|
|
|
if err != nil {
|