
Remove built-in InfluxDB support from the log processing tool

... all prometheus metrics all the time now.
Ask Bjørn Hansen · 7 years ago
commit 489fdbcc51
6 changed files with 28 additions and 216 deletions
  1. Makefile (+1 -1)
  2. build (+2 -2)
  3. geodns-influxdb/influx.go (+0 -130)
  4. geodns-logs/process-stats.go (+25 -83)
  5. geodns-logs/stats.go (+0 -0)
  6. geodns-logs/stats_test.go (+0 -0)

+ 1 - 1
Makefile

@@ -27,7 +27,7 @@ TARS=$(wildcard dist/geodns-*-*.tar)
 push: $(TARS) tmp-install.sh
 	#rsync -avz tmp-install.sh $(TARS)  x3.dev:webtmp/2018/04/
 	rsync --exclude publish tmp-install.sh $(TARS) $(DIST)/$(DISTSUB)/
-	$(DIST)/../push
+	$(DIST)/push
 
 builds: linux-build linux-build-i386 freebsd-build push
 

+ 2 - 2
build

@@ -16,8 +16,8 @@ set -ex
 go build -o dist/geodns-$OS-$ARCH \
   -ldflags "-X main.gitVersion=$REVISION -X main.buildTime=$BUILDTIME" \
   -v && \
-  (cd geodns-influxdb && go build -v -o ../dist/geodns-influxdb-$OS-$ARCH && cd ..) && \
+  (cd geodns-logs && go build -v -o ../dist/geodns-logs-$OS-$ARCH && cd ..) && \
 cd dist && \
 tar -cvHf geodns-$OS-$ARCH.tar \
  --exclude \*~ geodns-$OS-$ARCH \
- geodns-influxdb-$OS-$ARCH service service-influxdb
+ geodns-logs-$OS-$ARCH service service-logs
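
For context, the go build invocation above injects version information at link time via -X flags. A minimal sketch of the package-level string variables such flags populate is below; the variable names come from the -X flags in the script, while the defaults and the logVersion helper are illustrative and not taken from this repository:

package main

import "log"

// Populated at link time by the build script:
//   go build -ldflags "-X main.gitVersion=$REVISION -X main.buildTime=$BUILDTIME"
// The defaults below only apply when the flags are omitted.
var (
	gitVersion = "dev"
	buildTime  = "unknown"
)

// logVersion is an illustrative helper showing how the stamped values
// would typically be reported at startup.
func logVersion() {
	log.Printf("starting %s (built %s)", gitVersion, buildTime)
}

func main() {
	logVersion()
}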

+ 0 - 130
geodns-influxdb/influx.go

@@ -1,130 +0,0 @@
-package main
-
-import (
-	"fmt"
-	"log"
-	"sync"
-	"time"
-
-	"github.com/influxdata/influxdb/client/v2"
-	"github.com/kr/pretty"
-)
-
-const UserAgent = "geodns-logs/1.2"
-
-type influxClient struct {
-	ServerID     string
-	ServerGroups []string
-
-	URL      string
-	Username string
-	Password string
-	Database string
-
-	Verbose bool
-	Channel chan *Stats
-
-	wg      sync.WaitGroup
-	hclient client.Client
-}
-
-func NewInfluxClient() *influxClient {
-	influx := &influxClient{}
-	influx.Channel = make(chan *Stats, 10)
-	return influx
-}
-
-func (influx *influxClient) Start() error {
-	if len(influx.URL) == 0 {
-		return fmt.Errorf("InfluxDB URL required")
-	}
-	if len(influx.Username) == 0 {
-		return fmt.Errorf("InfluxDB Username required")
-	}
-	if len(influx.Password) == 0 {
-		return fmt.Errorf("InfluxDB Password required")
-	}
-	if len(influx.Database) == 0 {
-		return fmt.Errorf("InfluxDB Databse required")
-	}
-
-	conf := client.HTTPConfig{
-		Addr:      influx.URL,
-		Username:  influx.Username,
-		Password:  influx.Password,
-		UserAgent: UserAgent,
-	}
-
-	hclient, err := client.NewHTTPClient(conf)
-	if err != nil {
-		return fmt.Errorf("Could not setup http client: %s", err)
-	}
-	_, _, err = hclient.Ping(time.Second * 2)
-	if err != nil {
-		return fmt.Errorf("Could not ping %s: %s", conf.Addr, err)
-
-	}
-
-	influx.hclient = hclient
-
-	influx.wg.Add(1)
-	go influx.post()
-
-	return nil
-}
-
-func (influx *influxClient) Close() {
-	close(influx.Channel)
-	influx.wg.Wait()
-}
-
-func (influx *influxClient) post() {
-	hclient := influx.hclient
-
-	for stats := range influx.Channel {
-		if influx.Verbose {
-			pretty.Println("Sending", stats)
-		}
-		log.Printf("Sending %d stats points", len(stats.Map))
-
-		batch, err := client.NewBatchPoints(client.BatchPointsConfig{
-			Database:        "geodns_logs",
-			RetentionPolicy: "incoming",
-		})
-		if err != nil {
-			log.Printf("Could not setup batch points: %s", err)
-			continue
-		}
-
-		for _, s := range stats.Map {
-			pnt, err := client.NewPoint(
-				"log_stats",
-				map[string]string{
-					"Label":  s.Label,
-					"Name":   s.Name,
-					"Origin": s.Origin,
-					"PoolCC": s.PoolCC,
-					"Vendor": s.Vendor,
-					"Qtype":  s.Qtype,
-					"Server": influx.ServerID,
-				},
-				map[string]interface{}{
-					"Count": s.Count,
-				},
-				time.Unix(s.Time, 0),
-			)
-			if err != nil {
-				log.Printf("Could not create a point from '%+v': %s", s, err)
-				continue
-			}
-			batch.AddPoint(pnt)
-		}
-
-		err = hclient.Write(batch)
-		if err != nil {
-			log.Printf("Error writing batch points: %s", err)
-		}
-	}
-
-	influx.wg.Done()
-}
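
The deleted influx.go batched the aggregated counts once a minute and tagged each point with Label, Name, Origin, PoolCC, Vendor, Qtype and the server ID. Per the commit message, those counts are exposed as Prometheus metrics instead. As an illustration only, and not the repository's actual metrics code, a counter vector with labels mirroring the removed tags could be registered and served with prometheus/client_golang roughly like this:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// queryCount mirrors the removed InfluxDB tag set as Prometheus labels.
// Metric name, label names and the listen address are all illustrative.
var queryCount = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "geodns_logs_queries_total",
		Help: "Queries seen in the GeoDNS query log.",
	},
	[]string{"label", "name", "origin", "poolcc", "vendor", "qtype", "server"},
)

func main() {
	prometheus.MustRegister(queryCount)

	// In the real tool this would be incremented once per parsed log entry.
	queryCount.WithLabelValues("bar", "bar.example.com", "example.com", "us", "acme", "A", "pop1").Inc()

	// A scrape endpoint replaces pushing one-minute batches to InfluxDB.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":8054", nil))
}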

+ 25 - 83
geodns-influxdb/process-stats.go → geodns-logs/process-stats.go

@@ -9,7 +9,6 @@ import (
 	"os"
 	"strings"
 	"sync"
-	"time"
 
 	"github.com/hpcloud/tail"
 	"github.com/miekg/dns"
@@ -24,23 +23,24 @@ import (
 // Add vendor yes/no
 // add server region tag (identifier)?
 
+const UserAgent = "geodns-logs/2.0"
+
 func main() {
 
 	log.Printf("Starting %q", UserAgent)
 
-	tailFlag := flag.Bool("tail", false, "tail the log file instead of processing all arguments")
 	identifierFlag := flag.String("identifier", "", "identifier (hostname, pop name or similar)")
-	verboseFlag := flag.Bool("verbose", false, "verbose output")
+	// verboseFlag := flag.Bool("verbose", false, "verbose output")
 	flag.Parse()
 
 	var serverID string
-	var serverGroups []string
+	// var serverGroups []string
 
 	if len(*identifierFlag) > 0 {
 		ids := strings.Split(*identifierFlag, ",")
 		serverID = ids[0]
 		if len(ids) > 1 {
-			serverGroups = ids[1:]
+			// serverGroups = ids[1:]
 		}
 	}
 
@@ -80,64 +80,34 @@ func main() {
 		}
 	}()
 
-	influx := NewInfluxClient()
-	influx.URL = os.Getenv("INFLUXDB_URL")
-	influx.Username = os.Getenv("INFLUXDB_USERNAME")
-	influx.Password = os.Getenv("INFLUXDB_PASSWORD")
-	influx.Database = os.Getenv("INFLUXDB_DATABASE")
-
-	influx.ServerID = serverID
-	influx.ServerGroups = serverGroups
-	influx.Verbose = *verboseFlag
-
-	err := influx.Start()
-	if err != nil {
-		log.Printf("Could not start influxdb poster: %s", err)
-		os.Exit(2)
-	}
-
 	if len(flag.Args()) < 1 {
 		log.Printf("filename to process required")
 		os.Exit(2)
 	}
 
-	if *tailFlag {
-
-		filename := flag.Arg(0)
-
-		logf, err := tail.TailFile(filename, tail.Config{
-			// Location:  &tail.SeekInfo{-1, 0},
-			Poll:      true, // inotify is flaky on EL6, so try this ...
-			ReOpen:    true,
-			MustExist: false,
-			Follow:    true,
-		})
-		if err != nil {
-			log.Printf("Could not tail '%s': %s", filename, err)
-		}
+	filename := flag.Arg(0)
 
-		in := make(chan string)
+	logf, err := tail.TailFile(filename, tail.Config{
+		// Location:  &tail.SeekInfo{-1, 0},
+		Poll:      true, // inotify is flaky on EL6, so try this ...
+		ReOpen:    true,
+		MustExist: false,
+		Follow:    true,
+	})
+	if err != nil {
+		log.Printf("Could not tail '%s': %s", filename, err)
+	}
 
-		go processChan(in, influx.Channel, nil)
+	in := make(chan string)
+	go processChan(in, nil)
 
-		for line := range logf.Lines {
-			if line.Err != nil {
-				log.Printf("Error tailing file: %s", line.Err)
-			}
-			in <- line.Text
-		}
-	} else {
-		for _, file := range flag.Args() {
-			log.Printf("Log: %s", file)
-			err := processFile(file, influx.Channel)
-			if err != nil {
-				log.Printf("Error processing '%s': %s", file, err)
-			}
-			log.Printf("Done with %s", file)
+	for line := range logf.Lines {
+		if line.Err != nil {
+			log.Printf("Error tailing file: %s", line.Err)
 		}
+		in <- line.Text
 	}
 
-	influx.Close()
 }
 
 var extraValidLabels = map[string]struct{}{
@@ -190,15 +160,11 @@ func getPoolCC(label string) (string, bool) {
 	return "", false
 }
 
-func processChan(in chan string, out chan<- *Stats, wg *sync.WaitGroup) error {
+func processChan(in chan string, wg *sync.WaitGroup) error {
 	e := querylog.Entry{}
 
-	// the grafana queries depend on this being one minute
-	submitInterval := time.Minute * 1
-
 	stats := NewStats()
-	i := 0
-	lastMinute := int64(0)
+
 	for line := range in {
 		err := json.Unmarshal([]byte(line), &e)
 		if err != nil {
@@ -207,38 +173,14 @@ func processChan(in chan string, out chan<- *Stats, wg *sync.WaitGroup) error {
 		}
 		e.Name = strings.ToLower(e.Name)
 
-		eMinute := ((e.Time - e.Time%int64(submitInterval)) / int64(time.Second))
-		e.Time = eMinute
-
-		if len(stats.Map) == 0 {
-			lastMinute = eMinute
-			// log.Printf("Last Minute: %d", lastMinute)
-		} else {
-			if eMinute > lastMinute {
-				// fmt.Printf("eMinute %d\nlastMin %d - should summarize\n", eMinute, lastMinute)
-				stats.Summarize()
-				out <- stats
-				stats = NewStats()
-				lastMinute = eMinute
-			}
-		}
-
 		// fmt.Printf("%s %s\n", e.Origin, e.Name)
 
 		err = stats.Add(&e)
 		if err != nil {
 			return err
 		}
-
-		if i%10000 == 0 {
-			// pretty.Println(stats)
-		}
-		// minute
 	}
 
-	if len(stats.Map) > 0 {
-		out <- stats
-	}
 	if wg != nil {
 		wg.Done()
 	}
@@ -255,7 +197,7 @@ func processFile(file string, out chan<- *Stats) error {
 
 	wg := sync.WaitGroup{}
 	wg.Add(1)
-	go processChan(in, out, &wg)
+	go processChan(in, &wg)
 
 	scanner := bufio.NewScanner(fh)
 

+ 0 - 0
geodns-influxdb/stats.go → geodns-logs/stats.go


+ 0 - 0
geodns-influxdb/stats_test.go → geodns-logs/stats_test.go