Browse Source

Merge branch 'master' into dev

Ask Bjørn Hansen 8 years ago
parent
commit
b2b9aeaac2
6 changed files with 569 additions and 1 deletions
  1. 11 0
      CHANGES.md
  2. 0 1
      Makefile
  3. 130 0
      geodns-influxdb/influx.go
  4. 243 0
      geodns-influxdb/process-stats.go
  5. 120 0
      geodns-influxdb/stats.go
  6. 65 0
      geodns-influxdb/stats_test.go

+ 11 - 0
CHANGES.md

@@ -1,5 +1,16 @@
 # GeoDNS Changelog
 
+## 2.7.0 February 13, 2017
+
+* Add support for PTR records (Florent AIDE)
+* Test improvements (Alex Bligh)
+* Update github.com/miekg/dns
+* Update github.com/rcrowley/go-metrics
+* Use vendor/ instead of godep
+* Make query logging (globally) configurable
+* Support base configuration file outside the zone config directory
+* service: Read extra args from env/ARGS
+
 ## 2.6.0 October 4, 2015
 
 Leif Johansson:

+ 0 - 1
Makefile

@@ -4,7 +4,6 @@ all: templates.go
 templates.go: templates/*.html monitor.go
 	go generate
 
-.PHONY: test
 test: .PHONY
 	go test $(shell go list ./... | grep -v /vendor/)
 

+ 130 - 0
geodns-influxdb/influx.go

@@ -0,0 +1,130 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"sync"
+	"time"
+
+	"github.com/influxdata/influxdb/client/v2"
+	"github.com/kr/pretty"
+)
+
+const UserAgent = "geodns-logs/1.1"
+
+type influxClient struct {
+	ServerID     string
+	ServerGroups []string
+
+	URL      string
+	Username string
+	Password string
+	Database string
+
+	Verbose bool
+	Channel chan *Stats
+
+	wg      sync.WaitGroup
+	hclient client.Client
+}
+
+func NewInfluxClient() *influxClient {
+	influx := &influxClient{}
+	influx.Channel = make(chan *Stats, 10)
+	return influx
+}
+
+func (influx *influxClient) Start() error {
+	if len(influx.URL) == 0 {
+		return fmt.Errorf("InfluxDB URL required")
+	}
+	if len(influx.Username) == 0 {
+		return fmt.Errorf("InfluxDB Username required")
+	}
+	if len(influx.Password) == 0 {
+		return fmt.Errorf("InfluxDB Password required")
+	}
+	if len(influx.Database) == 0 {
+		return fmt.Errorf("InfluxDB Databse required")
+	}
+
+	conf := client.HTTPConfig{
+		Addr:      influx.URL,
+		Username:  influx.Username,
+		Password:  influx.Password,
+		UserAgent: UserAgent,
+	}
+
+	hclient, err := client.NewHTTPClient(conf)
+	if err != nil {
+		return fmt.Errorf("Could not setup http client: %s", err)
+	}
+	_, _, err = hclient.Ping(time.Second * 2)
+	if err != nil {
+		return fmt.Errorf("Could not ping %s: %s", conf.Addr, err)
+
+	}
+
+	influx.hclient = hclient
+
+	influx.wg.Add(1)
+	go influx.post()
+
+	return nil
+}
+
+func (influx *influxClient) Close() {
+	close(influx.Channel)
+	influx.wg.Wait()
+}
+
+func (influx *influxClient) post() {
+	hclient := influx.hclient
+
+	for stats := range influx.Channel {
+		if influx.Verbose {
+			pretty.Println("Sending", stats)
+		}
+		log.Printf("Sending %d stats points", len(stats.Map))
+
+		batch, err := client.NewBatchPoints(client.BatchPointsConfig{
+			Database:        "geodns_logs",
+			RetentionPolicy: "incoming",
+		})
+		if err != nil {
+			log.Printf("Could not setup batch points: %s", err)
+			continue
+		}
+
+		for _, s := range stats.Map {
+			pnt, err := client.NewPoint(
+				"log_stats",
+				map[string]string{
+					"Label":  s.Label,
+					"Name":   s.Name,
+					"Origin": s.Origin,
+					"PoolCC": s.PoolCC,
+					"Vendor": s.Vendor,
+					"Qtype":  s.Qtype,
+					"Server": influx.ServerID,
+				},
+				map[string]interface{}{
+					"Count": s.Count,
+				},
+				time.Unix(s.Time, 0),
+			)
+			if err != nil {
+				log.Printf("Could not create a point from '%+v': %s", s, err)
+				continue
+			}
+			batch.AddPoint(pnt)
+		}
+
+		err = hclient.Write(batch)
+		if err != nil {
+			log.Printf("Error writing batch points: %s", err)
+		}
+	}
+
+	influx.wg.Done()
+}

+ 243 - 0
geodns-influxdb/process-stats.go

@@ -0,0 +1,243 @@
+package main
+
+import (
+	"bufio"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/hpcloud/tail"
+	"github.com/miekg/dns"
+
+	"github.com/abh/geodns/countries"
+	"github.com/abh/geodns/querylog"
+)
+
+// TODO:
+// Add vendor yes/no
+// add server region tag (identifier)?
+
+func main() {
+
+	tailFlag := flag.Bool("tail", false, "tail the log file instead of processing all arguments")
+	identifierFlag := flag.String("identifier", "", "identifier (hostname, pop name or similar)")
+	verboseFlag := flag.Bool("verbose", false, "verbose output")
+	flag.Parse()
+
+	var serverID string
+	var serverGroups []string
+
+	if len(*identifierFlag) > 0 {
+		ids := strings.Split(*identifierFlag, ",")
+		serverID = ids[0]
+		if len(ids) > 1 {
+			serverGroups = ids[1:]
+		}
+	}
+
+	if len(serverID) == 0 {
+		var err error
+		serverID, err = os.Hostname()
+		if err != nil {
+			log.Printf("Could not get hostname: %s", err)
+			os.Exit(2)
+		}
+	}
+
+	influx := NewInfluxClient()
+	influx.URL = os.Getenv("INFLUXDB_URL")
+	influx.Username = os.Getenv("INFLUXDB_USERNAME")
+	influx.Password = os.Getenv("INFLUXDB_PASSWORD")
+	influx.Database = os.Getenv("INFLUXDB_DATABASE")
+
+	influx.ServerID = serverID
+	influx.ServerGroups = serverGroups
+	influx.Verbose = *verboseFlag
+
+	err := influx.Start()
+	if err != nil {
+		log.Printf("Could not start influxdb poster: %s", err)
+		os.Exit(2)
+	}
+
+	if len(flag.Args()) < 1 {
+		log.Printf("filename to process required")
+		os.Exit(2)
+	}
+
+	if *tailFlag {
+
+		filename := flag.Arg(0)
+
+		logf, err := tail.TailFile(filename, tail.Config{
+			// Location:  &tail.SeekInfo{-1, 0},
+			Poll:      true, // inotify is flaky on EL6, so try this ...
+			ReOpen:    true,
+			MustExist: false,
+			Follow:    true,
+		})
+		if err != nil {
+			log.Printf("Could not tail '%s': %s", filename, err)
+		}
+
+		in := make(chan string)
+
+		go processChan(in, influx.Channel, nil)
+
+		for line := range logf.Lines {
+			if line.Err != nil {
+				log.Printf("Error tailing file: %s", line.Err)
+			}
+			in <- line.Text
+		}
+	} else {
+		for _, file := range flag.Args() {
+			log.Printf("Log: %s", file)
+			err := processFile(file, influx.Channel)
+			if err != nil {
+				log.Printf("Error processing '%s': %s", file, err)
+			}
+			log.Printf("Done with %s", file)
+		}
+	}
+
+	influx.Close()
+}
+
+var extraValidLabels = map[string]struct{}{
+	"uk":       struct{}{},
+	"_status":  struct{}{},
+	"www":      struct{}{},
+	"nag-test": struct{}{},
+}
+
+func validCC(label string) bool {
+	if _, ok := countries.CountryContinent[label]; ok {
+		return true
+	}
+	if _, ok := countries.ContinentCountries[label]; ok {
+		return true
+	}
+	if _, ok := countries.RegionGroupRegions[label]; ok {
+		return true
+	}
+	if _, ok := countries.RegionGroups[label]; ok {
+		return true
+	}
+	if _, ok := extraValidLabels[label]; ok {
+		return true
+	}
+	return false
+}
+
+func getPoolCC(label string) (string, bool) {
+	l := dns.SplitDomainName(label)
+	// log.Printf("LABEL: %+v", l)
+	if len(l) == 0 {
+		return "", true
+	}
+
+	for _, cc := range l {
+		if validCC(cc) {
+			return cc, true
+		}
+	}
+
+	if len(l[0]) == 1 && strings.ContainsAny(l[0], "01234") {
+		if len(l) == 1 {
+			return "", true
+		}
+	}
+
+	// log.Printf("LABEL '%s' unhandled cc...", label)
+	return "", false
+}
+
+func processChan(in chan string, out chan<- *Stats, wg *sync.WaitGroup) error {
+	e := querylog.Entry{}
+
+	// the grafana queries depend on this being one minute
+	submitInterval := time.Minute * 1
+
+	stats := NewStats()
+	i := 0
+	lastMinute := int64(0)
+	for line := range in {
+		err := json.Unmarshal([]byte(line), &e)
+		if err != nil {
+			log.Printf("Can't unmarshal '%s': %s", line, err)
+			return err
+		}
+
+		eMinute := ((e.Time - e.Time%int64(submitInterval)) / int64(time.Second))
+		e.Time = eMinute
+
+		if len(stats.Map) == 0 {
+			lastMinute = eMinute
+			log.Printf("Last MInute: %d", lastMinute)
+		} else {
+			if eMinute > lastMinute {
+				fmt.Printf("eMinute %d\nlastMin %d - should summarize\n", eMinute, lastMinute)
+
+				stats.Summarize()
+				out <- stats
+				stats = NewStats()
+				lastMinute = eMinute
+			}
+		}
+
+		e.Name = strings.ToLower(e.Name)
+		// fmt.Printf("%s %s\n", e.Origin, e.Name)
+
+		err = stats.Add(&e)
+		if err != nil {
+			return err
+		}
+
+		if i%10000 == 0 {
+			// pretty.Println(stats)
+		}
+		// minute
+	}
+
+	if len(stats.Map) > 0 {
+		out <- stats
+	}
+	if wg != nil {
+		wg.Done()
+	}
+	return nil
+}
+
+func processFile(file string, out chan<- *Stats) error {
+	fh, err := os.Open(file)
+	if err != nil {
+		return err
+	}
+
+	in := make(chan string)
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go processChan(in, out, &wg)
+
+	scanner := bufio.NewScanner(fh)
+
+	for scanner.Scan() {
+		in <- scanner.Text()
+	}
+	if err := scanner.Err(); err != nil {
+		log.Println("reading standard input:", err)
+	}
+
+	close(in)
+
+	wg.Wait()
+
+	return nil
+}

+ 120 - 0
geodns-influxdb/stats.go

@@ -0,0 +1,120 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"strings"
+	"time"
+
+	"github.com/abh/geodns/querylog"
+	"github.com/miekg/dns"
+)
+
+type statsEntry struct {
+	Time   int64
+	Origin string
+	Name   string
+	Vendor string
+	Label  string
+	Qtype  string
+	PoolCC string
+	Count  int
+}
+
+type Stats struct {
+	Count int
+	Map   map[string]*statsEntry
+}
+
+func NewStats() *Stats {
+	return &Stats{
+		Map: map[string]*statsEntry{},
+	}
+}
+
+func (s *Stats) Key(e *querylog.Entry) string {
+	return fmt.Sprintf("%s %s %s %d", e.Origin, e.Name, e.LabelName, e.Qtype)
+}
+
+func vendorName(n string) string {
+	idx := strings.Index(n, ".pool.ntp.org.")
+	// log.Printf("IDX for %s: %d", n, idx)
+	if idx <= 0 {
+		return ""
+	}
+	n = n[0:idx]
+
+	l := dns.SplitDomainName(n)
+
+	v := l[len(l)-1]
+
+	if len(v) == 1 && strings.ContainsAny(v, "01234") {
+		return ""
+	}
+
+	if len(v) == 2 {
+		// country code
+		return "_country"
+	}
+
+	if v == "asia" || v == "north-america" || v == "europe" || v == "south-america" || v == "oceania" || v == "africa" {
+		return "_continent"
+	}
+
+	return v
+}
+
+func (stats *Stats) Add(e *querylog.Entry) error {
+	if e.Rcode > 0 {
+		// NXDOMAIN, count separately?
+		return nil
+	}
+	if e.Answers == 0 {
+		// No answers, count separately?
+		return nil
+	}
+
+	var vendor, poolCC string
+
+	if e.Origin == "pool.ntp.org" || strings.HasSuffix(e.Origin, "ntppool.org") {
+		vendor = vendorName(e.Name)
+
+		var ok bool
+		poolCC, ok = getPoolCC(e.LabelName)
+		if !ok {
+			log.Printf("Could not get valid poolCC label for %+v", e)
+		}
+	}
+
+	stats.Count++
+
+	key := stats.Key(e)
+
+	if s, ok := stats.Map[key]; ok {
+		s.Count++
+	} else {
+		stats.Map[key] = &statsEntry{
+			// Time:   time.Unix(e.Time/int64(time.Second), 0),
+			Time:   e.Time,
+			Origin: e.Origin,
+			Name:   e.Name,
+			Vendor: vendor,
+			Label:  e.LabelName,
+			PoolCC: poolCC,
+			Qtype:  dns.TypeToString[e.Qtype],
+			Count:  1,
+		}
+	}
+
+	return nil
+}
+
+func (stats *Stats) Summarize() {
+	// pretty.Println(stats)
+	var timeStamp int64
+	for k := range stats.Map {
+		timeStamp = stats.Map[k].Time
+		break
+	}
+	fmt.Printf("Stats %s count total: %d, summarized: %d\n", time.Unix(timeStamp, 0).String(), stats.Count, len(stats.Map))
+}

+ 65 - 0
geodns-influxdb/stats_test.go

@@ -0,0 +1,65 @@
+package main
+
+import (
+	"flag"
+	"os"
+	"testing"
+)
+
+func TestMain(m *testing.M) {
+	flag.Parse()
+	os.Exit(m.Run())
+}
+
+func TestPoolCC(t *testing.T) {
+	tests := []struct {
+		Input    string
+		Expected string
+		Ok       bool
+	}{
+		{"pool.ntp.org", "", false},
+		{"2.pool.ntp.org", "", false},
+		{"us.pool.ntp.org", "us", true},
+		{"0.us.pool.ntp.org", "us", true},
+		{"asia.pool.ntp.org", "asia", true},
+		{"3.asia.pool.ntp.org", "asia", true},
+		{"3.example.pool.ntp.org", "", false},
+	}
+
+	for _, x := range tests {
+		got, ok := getPoolCC(x.Input)
+		if got != x.Expected {
+			t.Logf("Got '%s' but expected '%s' for '%s'", got, x.Expected, x.Input)
+			t.Fail()
+		}
+		if ok != x.Ok {
+			t.Logf("Got '%t' but expected '%t' for '%s'", ok, x.Ok, x.Input)
+			t.Fail()
+		}
+	}
+}
+
+func TestVendorName(t *testing.T) {
+	tests := []struct {
+		Input    string
+		Expected string
+	}{
+
+		{"pool.ntp.org.", ""},
+		{"2.pool.ntp.org.", ""},
+		{"us.pool.ntp.org.", "_country"},
+		{"2.us.pool.ntp.org.", "_country"},
+		{"europe.pool.ntp.org.", "_continent"},
+		{"2.europe.pool.ntp.org.", "_continent"},
+		{"0.example.pool.ntp.org.", "example"},
+		{"3.example.pool.ntp.org.", "example"},
+	}
+
+	for _, x := range tests {
+		got := vendorName(x.Input)
+		if got != x.Expected {
+			t.Logf("Got '%s' but expected '%s' for '%s'", got, x.Expected, x.Input)
+			t.Fail()
+		}
+	}
+}