|
@@ -15,9 +15,9 @@ import (
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
- "github.com/hpcloud/tail/ratelimiter"
|
|
|
- "github.com/hpcloud/tail/util"
|
|
|
- "github.com/hpcloud/tail/watch"
|
|
|
+ "github.com/nxadm/tail/ratelimiter"
|
|
|
+ "github.com/nxadm/tail/util"
|
|
|
+ "github.com/nxadm/tail/watch"
|
|
|
"gopkg.in/tomb.v1"
|
|
|
)
|
|
|
|
|
@@ -26,20 +26,22 @@ var (
|
|
|
)
|
|
|
|
|
|
type Line struct {
|
|
|
- Text string
|
|
|
- Time time.Time
|
|
|
- Err error // Error from tail
|
|
|
+ Text string
|
|
|
+ Num int
|
|
|
+ SeekInfo SeekInfo
|
|
|
+ Time time.Time
|
|
|
+ Err error // Error from tail
|
|
|
}
|
|
|
|
|
|
// NewLine returns a Line with present time.
|
|
|
-func NewLine(text string) *Line {
|
|
|
- return &Line{text, time.Now(), nil}
|
|
|
+func NewLine(text string, lineNum int) *Line {
|
|
|
+ return &Line{text, lineNum, SeekInfo{}, time.Now(), nil}
|
|
|
}
|
|
|
|
|
|
-// SeekInfo represents arguments to `os.Seek`
|
|
|
+// SeekInfo represents arguments to `io.Seek`
|
|
|
type SeekInfo struct {
|
|
|
Offset int64
|
|
|
- Whence int // os.SEEK_*
|
|
|
+ Whence int // io.Seek*
|
|
|
}
|
|
|
|
|
|
type logger interface {
|
|
@@ -78,8 +80,9 @@ type Tail struct {
|
|
|
Lines chan *Line
|
|
|
Config
|
|
|
|
|
|
- file *os.File
|
|
|
- reader *bufio.Reader
|
|
|
+ file *os.File
|
|
|
+ reader *bufio.Reader
|
|
|
+ lineNum int
|
|
|
|
|
|
watcher watch.FileWatcher
|
|
|
changes *watch.FileChanges
|
|
@@ -113,7 +116,7 @@ func TailFile(filename string, config Config) (*Tail, error) {
|
|
|
|
|
|
// when Logger was not specified in config, use default logger
|
|
|
if t.Logger == nil {
|
|
|
- t.Logger = log.New(os.Stderr, "", log.LstdFlags)
|
|
|
+ t.Logger = DefaultLogger
|
|
|
}
|
|
|
|
|
|
if t.Poll {
|
|
@@ -135,15 +138,15 @@ func TailFile(filename string, config Config) (*Tail, error) {
|
|
|
return t, nil
|
|
|
}
|
|
|
|
|
|
-// Return the file's current position, like stdio's ftell().
|
|
|
+// Tell returns the file's current position, like stdio's ftell().
|
|
|
// But this value is not very accurate.
|
|
|
-// it may readed one line in the chan(tail.Lines),
|
|
|
-// so it may lost one line.
|
|
|
+// One line from the chan(tail.Lines) may have been read,
|
|
|
+// so it may have lost one line.
|
|
|
func (tail *Tail) Tell() (offset int64, err error) {
|
|
|
if tail.file == nil {
|
|
|
return
|
|
|
}
|
|
|
- offset, err = tail.file.Seek(0, os.SEEK_CUR)
|
|
|
+ offset, err = tail.file.Seek(0, io.SeekCurrent)
|
|
|
if err != nil {
|
|
|
return
|
|
|
}
|
|
@@ -186,6 +189,7 @@ func (tail *Tail) closeFile() {
|
|
|
|
|
|
func (tail *Tail) reopen() error {
|
|
|
tail.closeFile()
|
|
|
+ tail.lineNum = 0
|
|
|
for {
|
|
|
var err error
|
|
|
tail.file, err = OpenFile(tail.Filename)
|
|
@@ -241,7 +245,6 @@ func (tail *Tail) tailFileSync() {
|
|
|
// Seek to requested location on first open of the file.
|
|
|
if tail.Location != nil {
|
|
|
_, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence)
|
|
|
- tail.Logger.Printf("Seeked %s - %+v\n", tail.Filename, tail.Location)
|
|
|
if err != nil {
|
|
|
tail.Killf("Seek error on %s: %s", tail.Filename, err)
|
|
|
return
|
|
@@ -250,16 +253,12 @@ func (tail *Tail) tailFileSync() {
|
|
|
|
|
|
tail.openReader()
|
|
|
|
|
|
- var offset int64
|
|
|
- var err error
|
|
|
-
|
|
|
// Read line by line.
|
|
|
for {
|
|
|
// do not seek in named pipes
|
|
|
if !tail.Pipe {
|
|
|
// grab the position in case we need to back up in the event of a half-line
|
|
|
- offset, err = tail.Tell()
|
|
|
- if err != nil {
|
|
|
+ if _, err := tail.Tell(); err != nil {
|
|
|
tail.Kill(err)
|
|
|
return
|
|
|
}
|
|
@@ -273,9 +272,9 @@ func (tail *Tail) tailFileSync() {
|
|
|
if cooloff {
|
|
|
// Wait a second before seeking till the end of
|
|
|
// file when rate limit is reached.
|
|
|
- msg := ("Too much log activity; waiting a second " +
|
|
|
- "before resuming tailing")
|
|
|
- tail.Lines <- &Line{msg, time.Now(), errors.New(msg)}
|
|
|
+ msg := ("Too much log activity; waiting a second before resuming tailing")
|
|
|
+ offset, _ := tail.Tell()
|
|
|
+ tail.Lines <- &Line{msg, tail.lineNum, SeekInfo{Offset: offset}, time.Now(), errors.New(msg)}
|
|
|
select {
|
|
|
case <-time.After(time.Second):
|
|
|
case <-tail.Dying():
|
|
@@ -295,10 +294,8 @@ func (tail *Tail) tailFileSync() {
|
|
|
}
|
|
|
|
|
|
if tail.Follow && line != "" {
|
|
|
- // this has the potential to never return the last line if
|
|
|
- // it's not followed by a newline; seems a fair trade here
|
|
|
- err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0})
|
|
|
- if err != nil {
|
|
|
+ tail.sendLine(line)
|
|
|
+ if err := tail.seekEnd(); err != nil {
|
|
|
tail.Kill(err)
|
|
|
return
|
|
|
}
|
|
@@ -336,7 +333,7 @@ func (tail *Tail) tailFileSync() {
|
|
|
// reopened if ReOpen is true. Truncated files are always reopened.
|
|
|
func (tail *Tail) waitForChanges() error {
|
|
|
if tail.changes == nil {
|
|
|
- pos, err := tail.file.Seek(0, os.SEEK_CUR)
|
|
|
+ pos, err := tail.file.Seek(0, io.SeekCurrent)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
@@ -360,10 +357,9 @@ func (tail *Tail) waitForChanges() error {
|
|
|
tail.Logger.Printf("Successfully reopened %s", tail.Filename)
|
|
|
tail.openReader()
|
|
|
return nil
|
|
|
- } else {
|
|
|
- tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
|
|
|
- return ErrStop
|
|
|
}
|
|
|
+ tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
|
|
|
+ return ErrStop
|
|
|
case <-tail.changes.Truncated:
|
|
|
// Always reopen truncated files (Follow is true)
|
|
|
tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
|
|
@@ -376,20 +372,21 @@ func (tail *Tail) waitForChanges() error {
|
|
|
case <-tail.Dying():
|
|
|
return ErrStop
|
|
|
}
|
|
|
- panic("unreachable")
|
|
|
}
|
|
|
|
|
|
func (tail *Tail) openReader() {
|
|
|
+ tail.lk.Lock()
|
|
|
if tail.MaxLineSize > 0 {
|
|
|
// add 2 to account for newline characters
|
|
|
tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
|
|
|
} else {
|
|
|
tail.reader = bufio.NewReader(tail.file)
|
|
|
}
|
|
|
+ tail.lk.Unlock()
|
|
|
}
|
|
|
|
|
|
func (tail *Tail) seekEnd() error {
|
|
|
- return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END})
|
|
|
+ return tail.seekTo(SeekInfo{Offset: 0, Whence: io.SeekEnd})
|
|
|
}
|
|
|
|
|
|
func (tail *Tail) seekTo(pos SeekInfo) error {
|
|
@@ -414,13 +411,19 @@ func (tail *Tail) sendLine(line string) bool {
|
|
|
}
|
|
|
|
|
|
for _, line := range lines {
|
|
|
- tail.Lines <- &Line{line, now, nil}
|
|
|
+ tail.lineNum++
|
|
|
+ offset, _ := tail.Tell()
|
|
|
+ select {
|
|
|
+ case tail.Lines <- &Line{line, tail.lineNum, SeekInfo{Offset: offset}, now, nil}:
|
|
|
+ case <-tail.Dying():
|
|
|
+ return true
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if tail.Config.RateLimiter != nil {
|
|
|
ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
|
|
|
if !ok {
|
|
|
- tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n",
|
|
|
+ tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.",
|
|
|
tail.Filename)
|
|
|
return false
|
|
|
}
|