Browse Source

Add healthtest functionality to automatically exclude down hosts

Add a healthtest function activated by setting the 'test' attribute on a label.
When a test is specified, each RR within the label (A and AAAA only at this
stage) is polled regularly with a configurable test. Current configurable
tests are that a tcp port can be opened, or that an NTP response within a
given stratum range is achieved. RRs that fail the test will be excluded
from any results. Only one health test is performed per A or AAAA record
in the system (for any given healthtest configuration), i.e. multiple
RRs in one zone or multiple labels across zones will not cause testing
of the same A or AAAA record provided the tests configured by each RR
or each zone have the same parameter.

There are two categories of health test. Local health tests are run
independently of each other and simply test the health of one A or AAAA
record. Global health tests have (for any given set of testing parameters)
a single view of the status of all A and AAAA records tested by that
health test, and extract the health status of any given record from that
health test (defaulting as appropriate if it is not provided).

The following types of health test are supported:

    Type     Category   Description
    ====     ========   ===========
    tcp      local      test whether a TCP port is open
    ntp      local      test whether NTP is running at a given stratum
    exec     local      test whether a command returns a zero status
    file     global     test entries in a JSON file
    nodeping global     use Nodeping's API to get health status
    pingdom  global     use Pingdom's API to get health status

The configuration of each health test is controlled by parameters that
are common to each health test, and parameters that are specific to
each health test.

Common parameters
=================

* type: specifies type of test (currently "ntp" or "tcp")

* frequency: specifies time in seconds between polls if the server is up

* retry_time: specifies time in seconds between polls if a poll fails

* retries: number of failed polls required to consider a server as down

* timeout: timeout on each of the polls

* healthy_initially: set to true to default the status of the test to healthy

These may each be ommited and have sensible defaults

Specific parameters for type 'ntp'
==================================

* max_stratum: maximum ntp stratum number for the poll
  to be considered successful

  example configuration:

     "test" :  {
         "type" : "ntp",
         "frequency" : 30,
         "retry_time" : 5,
         "retries" : 3,
         "timeout" : 5,
         "max_stratum" : "3"
      }

Specific parameters for type 'tcp'
==================================

* port: tcp port number to connect to

    example configuration

     "test" :  {
         "type" : "tcp",
         "frequency" : 30,
         "retry_time" : 5,
         "retries" : 3,
         "timeout" : 5,
         "port" : 80
      }

Specific parameters for type 'exec'
===================================

* cmd: command to run. '{}' substituted by IP address

    example configuration

     "test" :  {
         "type" : "exec",
         "frequency" : 30,
         "retry_time" : 5,
         "retries" : 3,
         "timeout" : 5,
         "cmd" : "/bin/ping -c1 -W5 {}"
      }

Specific parameters for type 'file'
===================================

* path: JSON file to load

  the file has the format:
     { "192.200.0.1": true, "192.200.0.2": false }

    example configuration

     "test" :  {
         "type" : "file",
         "frequency" : 30,
         "retry_time" : 5,
         "retries" : 3,
         "timeout" : 5,
         "path" : "/path/to/my/file.json"
      }

Hosts not mentioned in the JSON file default to the status of
healthy_initially

Specific parameters for type 'nodeping'
=======================================

* token: Nodeping API key to use

    example configuration

     "test" :  {
         "type" : "nodeping",
         "frequency" : 30,
         "retry_time" : 5,
         "retries" : 3,
         "timeout" : 5,
         "token" : "abcdef01234567890"
      }

If the token is not specifed, it will default to a value from the
configuration file. Thus if the configuration file contains:

[nodeping]
token=abcdef01234567890

then the following much simpler configuration might be all that is
required:

    "test": {"type": "nodeping"}

Note that the label of each test in Nodeping should correspond
to the IP address being tested.

As Nodeping does not support listing of status of 'up' hosts,
hosts not mentioned in the JSON file default to the status of
'true' (i.e. healthy) all the time.

Specific parameters for type 'pingdom'
=======================================

* username: username to use with the Pingdom service

* password: password to use with the Pingdom service

* account_email: account email to use with Pingdom service (multi user accounts only)

* app_key: application key to use with the Pingdom service (has a sensible default)

* state_map: JSON map of Pingdom status to health values (e.g. true/false)

    example configuration

     "test" :  {
         "type" : "pingdom",
         "frequency" : 30,
         "retry_time" : 5,
         "retries" : 3,
         "timeout" : 5,
         "username" : "[email protected]",
         "password" : "sekr1t",
         "app_key" : "gyxtnd2fzco8ys29m8luk4syag4ybmc0"
      }

If any of the parameters is not specifed, it will default to a value from the
configuration file. Thus if the configuration file contains:

[pingdom]
[email protected]
password=serk1t
app_key=gyxtnd2fzco8ys29m8luk4syag4ybmc0

then the following much simpler configuration might be all that is
required:

    "test": {"type": "pingdom"}

Note that the name of each test in Pingdom should correspond
to the IP address being tested.

Pingdom supports more states than just 'up' and 'down'. By default
'up' is mapped to healthy (true), and 'down' or 'paused' to unhealthy
(false), with the remaining states being mapped to the default value
specified by initially_healthy. This may be customised using the
JSON state_map in either the configuration file or the test definition.

Signed-off-by: Alex Bligh <[email protected]>
Alex Bligh 10 years ago
parent
commit
ea74251cd5
6 changed files with 1010 additions and 6 deletions
  1. 10 0
      config.go
  2. 405 0
      healthtest.go
  3. 561 0
      healthtesters.go
  4. 20 6
      picker.go
  5. 3 0
      zone.go
  6. 11 0
      zones.go

+ 10 - 0
config.go

@@ -25,6 +25,16 @@ type AppConfig struct {
 		User     string
 		Password string
 	}
+	Nodeping struct {
+		Token string
+	}
+	Pingdom struct {
+		Username     string
+		Password     string
+		AccountEmail string
+		AppKey       string
+		StateMap     string
+	}
 }
 
 var Config = new(AppConfig)

+ 405 - 0
healthtest.go

@@ -0,0 +1,405 @@
+package main
+
+import (
+	"fmt"
+	"github.com/abh/geodns/Godeps/_workspace/src/github.com/miekg/dns"
+	"log"
+	"math/rand"
+	"net"
+	"sync"
+	"time"
+)
+
+var (
+	healthQtypes = []uint16{dns.TypeA, dns.TypeAAAA}
+)
+
+type HealthTester interface {
+	Test(ht *HealthTest) bool
+	String() string
+}
+
+type HealthTestParameters struct {
+	frequency        time.Duration
+	retryTime        time.Duration
+	timeout          time.Duration
+	retries          int
+	healthyInitially bool
+	testName         string
+	global           bool
+}
+
+type HealthTest struct {
+	HealthTestParameters
+	ipAddress    net.IP
+	healthy      bool
+	healthyMutex sync.RWMutex
+	closing      chan chan error
+	health       chan bool
+	tester       *HealthTester
+	globalMap    map[string]bool
+}
+
+type HealthTestRunnerEntry struct {
+	HealthTest
+	references map[string]bool
+}
+
+type HealthTestRunner struct {
+	entries    map[string]*HealthTestRunnerEntry
+	entryMutex sync.RWMutex
+}
+
+var healthTestRunner = &HealthTestRunner{
+	entries: make(map[string]*HealthTestRunnerEntry),
+}
+
+func defaultHealthTestParameters() HealthTestParameters {
+	return HealthTestParameters{
+		frequency:        30 * time.Second,
+		retryTime:        5 * time.Second,
+		timeout:          5 * time.Second,
+		retries:          3,
+		healthyInitially: false,
+	}
+}
+
+func newHealthTest(ipAddress net.IP, htp HealthTestParameters, tester *HealthTester) *HealthTest {
+	ht := HealthTest{
+		ipAddress:            ipAddress,
+		HealthTestParameters: htp,
+		healthy:              true,
+		tester:               tester,
+		globalMap:            make(map[string]bool),
+	}
+	ht.healthy = ht.healthyInitially
+	if ht.frequency < time.Second {
+		ht.frequency = time.Second
+	}
+	if ht.retryTime < time.Second {
+		ht.retryTime = time.Second
+	}
+	if ht.timeout < time.Second {
+		ht.timeout = time.Second
+	}
+	return &ht
+}
+
+// Format the health test as a string - used to compare two tests and as an index for the hash
+func (ht *HealthTest) String() string {
+	ip := ht.ipAddress.String()
+	if ht.HealthTestParameters.global {
+		ip = "" // ensure we have a single instance of a global health check with the same paramaters
+	}
+	return fmt.Sprintf("%s/%v/%s", ip, ht.HealthTestParameters, (*ht.tester).String())
+}
+
+// safe copy function that copies the parameters but not (e.g.) the
+// mutex
+func (ht *HealthTest) copy(ipAddress net.IP) *HealthTest {
+	return newHealthTest(ipAddress, ht.HealthTestParameters, ht.tester)
+}
+
+func (ht *HealthTest) setGlobal(g map[string]bool) {
+	ht.healthyMutex.Lock()
+	defer ht.healthyMutex.Unlock()
+	ht.globalMap = g
+}
+
+func (ht *HealthTest) getGlobal(k string) (bool, bool) {
+	ht.healthyMutex.RLock()
+	defer ht.healthyMutex.RUnlock()
+	healthy, ok := ht.globalMap[k]
+	return healthy, ok
+}
+
+func (ht *HealthTest) run() {
+	randomDelay := rand.Int63n(ht.frequency.Nanoseconds())
+	if !ht.isHealthy() {
+		randomDelay = rand.Int63n(ht.retryTime.Nanoseconds())
+	}
+	var nextPoll time.Time = time.Now().Add(time.Duration(randomDelay))
+	var pollStart time.Time
+	failCount := 0
+	for {
+		var pollDelay time.Duration
+		if now := time.Now(); nextPoll.After(now) {
+			pollDelay = nextPoll.Sub(now)
+		}
+		var startPoll <-chan time.Time
+		var closingPoll <-chan chan error
+		if pollStart.IsZero() {
+			closingPoll = ht.closing
+			startPoll = time.After(pollDelay)
+		}
+		select {
+		case errc := <-closingPoll: // don't close while we are polling or we send to a closed channel
+			errc <- nil
+			return
+		case <-startPoll:
+			pollStart = time.Now()
+			go ht.poll()
+		case h := <-ht.health:
+			nextPoll = pollStart.Add(ht.frequency)
+			if h {
+				ht.setHealthy(true)
+				failCount = 0
+			} else {
+				failCount++
+				logPrintf("Failure for %s, retry count=%d, healthy=%v", ht.ipAddress, failCount, ht.isHealthy())
+				if failCount >= ht.retries {
+					ht.setHealthy(false)
+					nextPoll = pollStart.Add(ht.retryTime)
+				}
+			}
+			pollStart = time.Time{}
+			logPrintf("Check result for %s health=%v, next poll at %s", ht.ipAddress, h, nextPoll)
+			//randomDelay := rand.Int63n(time.Second.Nanoseconds())
+			//nextPoll = nextPoll.Add(time.Duration(randomDelay))
+		}
+	}
+}
+
+func (ht *HealthTest) poll() {
+	logPrintf("Checking health of %s", ht.ipAddress)
+	result := (*ht.tester).Test(ht)
+	logPrintf("Checked health of %s, healthy=%v", ht.ipAddress, result)
+	ht.health <- result
+}
+
+func (ht *HealthTest) start() {
+	ht.closing = make(chan chan error)
+	ht.health = make(chan bool)
+	logPrintf("Starting health test on %s, frequency=%s, retry_time=%s, timeout=%s, retries=%d", ht.ipAddress, ht.frequency, ht.retryTime, ht.timeout, ht.retries)
+	go ht.run()
+}
+
+func (ht *HealthTest) stop() (err error) {
+	// Check it's been started by existing of the closing channel
+	if ht.closing == nil {
+		return nil
+	}
+	logPrintf("Stopping health test on %s", ht.ipAddress)
+	errc := make(chan error)
+	ht.closing <- errc
+	err = <-errc
+	close(ht.closing)
+	ht.closing = nil
+	close(ht.health)
+	ht.health = nil
+	return err
+}
+
+func (ht *HealthTest) isHealthy() bool {
+	ht.healthyMutex.RLock()
+	h := ht.healthy
+	ht.healthyMutex.RUnlock()
+	return h
+}
+
+func (ht *HealthTest) setHealthy(h bool) {
+	ht.healthyMutex.Lock()
+	old := ht.healthy
+	ht.healthy = h
+	ht.healthyMutex.Unlock()
+	if old != h {
+		logPrintf("Changing health status of %s from %v to %v", ht.ipAddress, old, h)
+	}
+}
+
+func (htr *HealthTestRunner) addTest(ht *HealthTest, ref string) {
+	key := ht.String()
+	htr.entryMutex.Lock()
+	defer htr.entryMutex.Unlock()
+	if t, ok := htr.entries[key]; ok {
+		// we already have an instance of this test running. Record we are using it
+		t.references[ref] = true
+	} else {
+		// a test that isn't running. Record we are using it and start the test
+		t := &HealthTestRunnerEntry{
+			HealthTest: *ht.copy(ht.ipAddress),
+			references: make(map[string]bool),
+		}
+		if t.global {
+			t.ipAddress = nil
+		}
+		// we know it is not started, so no need for the mutex
+		t.healthy = ht.healthy
+		t.references[ref] = true
+		t.start()
+		htr.entries[key] = t
+	}
+}
+
+func (htr *HealthTestRunner) removeTest(ht *HealthTest, ref string) {
+	key := ht.String()
+	htr.entryMutex.Lock()
+	defer htr.entryMutex.Unlock()
+	if t, ok := htr.entries[key]; ok {
+		delete(t.references, ref)
+		// record the last state of health
+		ht.healthyMutex.Lock()
+		ht.healthy = t.isHealthy()
+		ht.healthyMutex.Unlock()
+		if len(t.references) == 0 {
+			// no more references, delete the test
+			t.stop()
+			delete(htr.entries, key)
+		}
+	}
+}
+
+func (htr *HealthTestRunner) refAllGlobalHealthChecks(ref string, add bool) {
+	htr.entryMutex.Lock()
+	defer htr.entryMutex.Unlock()
+	for key, t := range htr.entries {
+		if t.global {
+			if add {
+				t.references[ref] = true
+			} else {
+				delete(t.references, ref)
+				if len(t.references) == 0 {
+					// no more references, delete the test
+					t.stop()
+					delete(htr.entries, key)
+				}
+			}
+		}
+	}
+}
+
+func (htr *HealthTestRunner) isHealthy(ht *HealthTest) bool {
+	key := ht.String()
+	htr.entryMutex.RLock()
+	defer htr.entryMutex.RUnlock()
+	if t, ok := htr.entries[key]; ok {
+		if t.global {
+			healthy, ok := t.getGlobal(ht.ipAddress.String())
+			if ok {
+				return healthy
+			}
+		} else {
+			return t.isHealthy()
+		}
+	}
+	return ht.isHealthy()
+}
+
+func (z *Zone) newHealthTest(l *Label, data interface{}) {
+	// First safely get rid of any old test. As label tests
+	// should never run this should never be executed
+	if l.Test != nil {
+		l.Test.stop()
+		l.Test = nil
+	}
+
+	if data == nil {
+		return
+	}
+	if i, ok := data.(map[string]interface{}); ok {
+		if t, ok := i["type"]; ok {
+			ts := valueToString(t)
+			htp := defaultHealthTestParameters()
+			if nh, ok := HealthTesterMap[ts]; !ok {
+				log.Printf("Bad health test type '%s'", ts)
+			} else {
+				htp.testName = ts
+				h := nh(i, &htp)
+
+				for k, v := range i {
+					switch k {
+					case "frequency":
+						htp.frequency = time.Duration(valueToInt(v)) * time.Second
+					case "retry_time":
+						htp.retryTime = time.Duration(valueToInt(v)) * time.Second
+					case "timeout":
+						htp.retryTime = time.Duration(valueToInt(v)) * time.Second
+					case "retries":
+						htp.retries = valueToInt(v)
+					case "healthy_initially":
+						htp.healthyInitially = valueToBool(v)
+						logPrintf("HealthyInitially for %s is %v", l.Label, htp.healthyInitially)
+					}
+				}
+
+				l.Test = newHealthTest(nil, htp, &h)
+			}
+		}
+	}
+}
+
+func (z *Zone) StartStopHealthChecks(start bool, oldZone *Zone) {
+	logPrintf("Start/stop health checks on zone %s start=%v", z.Origin, start)
+	for labelName, label := range z.Labels {
+		for _, qtype := range healthQtypes {
+			if label.Records[qtype] != nil && len(label.Records[qtype]) > 0 {
+				for i := range label.Records[qtype] {
+					rr := label.Records[qtype][i].RR
+					var ip net.IP
+					switch rrt := rr.(type) {
+					case *dns.A:
+						ip = rrt.A
+					case *dns.AAAA:
+						ip = rrt.AAAA
+					default:
+						continue
+					}
+					var test *HealthTest
+					ref := fmt.Sprintf("%s/%s/%d/%d", z.Origin, labelName, qtype, i)
+					if start {
+						if test = label.Records[qtype][i].Test; test != nil {
+							// stop any old test
+							healthTestRunner.removeTest(test, ref)
+						} else {
+							if ltest := label.Test; ltest != nil {
+								test = ltest.copy(ip)
+								label.Records[qtype][i].Test = test
+							}
+						}
+						if test != nil {
+							test.ipAddress = ip
+							// if we are given an oldzone, let's see if we can find the old RR and
+							// copy over the initial health state, rather than use the initial health
+							// state provided from the label. This helps to stop health state bouncing
+							// when a zone file is reloaded for a purposes unrelated to the RR
+							if oldZone != nil {
+								oLabel, ok := oldZone.Labels[labelName]
+								if ok {
+									if oLabel.Test != nil {
+										for i := range oLabel.Records[qtype] {
+											oRecord := oLabel.Records[qtype][i]
+											var oip net.IP
+											switch orrt := oRecord.RR.(type) {
+											case *dns.A:
+												oip = orrt.A
+											case *dns.AAAA:
+												oip = orrt.AAAA
+											default:
+												continue
+											}
+											if oip.Equal(ip) {
+												if oRecord.Test != nil {
+													h := oRecord.Test.isHealthy()
+													logPrintf("Carrying over previous health state for %s: %v", oRecord.Test.ipAddress, h)
+													// we know the test is stopped (as we haven't started it) so we can write
+													// without the mutex and avoid a misleading log message
+													test.healthy = h
+												}
+												break
+											}
+										}
+									}
+								}
+							}
+							healthTestRunner.addTest(test, ref)
+						}
+					} else {
+						if test = label.Records[qtype][i].Test; test != nil {
+							healthTestRunner.removeTest(test, ref)
+						}
+					}
+				}
+			}
+		}
+	}
+}

+ 561 - 0
healthtesters.go

@@ -0,0 +1,561 @@
+package main
+
+import (
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net"
+	"net/http"
+	"net/url"
+	"os"
+	"os/exec"
+	"strconv"
+	"strings"
+	"time"
+)
+
+/*
+ * Each HealthTester is a struct that implements the HealthTester interface. To do so it
+ * needs to provide three functions:
+ *
+ * func (m *MyHealthTester) Test(ht *HealthTest) bool
+ *
+ *   performs the test in question and returns a bool if it is up
+ *
+ * func (m *MyHealthTester) String() string
+ *
+ *   returns a string which contains all the paramaters within the struct that are important
+ *   for uniqueness. This is normally a call to fmt.Sprintf.
+ *
+ * func newMyHealthTester(params map[string]interface{}) HealthTester, bool
+ *
+ *   create a new health tester of type myHealthTester with parameters in params. Second
+ *   return value true if it is global (i.e. one test yields results for all IP addresses)
+ *
+ * Then add a single entry to the HealthTesterTypes map pointing to the third function
+ */
+
+var HealthTesterMap = map[string]func(params map[string]interface{}, htp *HealthTestParameters) HealthTester{
+	"tcp":      newTcpHealthTester,
+	"ntp":      newNtpHealthTester,
+	"exec":     newExecHealthTester,
+	"file":     newFileHealthTester,
+	"nodeping": newNodepingHealthTester,
+	"pingdom":  newPingdomHealthTester,
+}
+
+// TcpHealthTester tests that a port is open
+//
+// Parameters:
+//   port (integer): the port to test
+
+type TcpHealthTester struct {
+	port int
+}
+
+func (t *TcpHealthTester) Test(ht *HealthTest) bool {
+	if conn, err := net.DialTimeout("tcp", net.JoinHostPort(ht.ipAddress.String(), strconv.Itoa(t.port)), ht.timeout); err != nil {
+		return false
+	} else {
+		conn.Close()
+	}
+	return true
+}
+
+func (t *TcpHealthTester) String() string {
+	return fmt.Sprintf("%d", t.port)
+}
+
+func newTcpHealthTester(params map[string]interface{}, htp *HealthTestParameters) HealthTester {
+	port := 80
+	if v, ok := params["port"]; ok {
+		port = valueToInt(v)
+	}
+	return &TcpHealthTester{port: port}
+}
+
+// NtpHealthTester tests that NTP is running and is less than or equal to a given NTP Stratum
+//
+// Parameters:
+//   max_stratum (integer): the maximum permissible NTP stratum
+
+type NtpHealthTester struct {
+	maxStratum int
+}
+
+func (t *NtpHealthTester) Test(ht *HealthTest) bool {
+	udpAddress, err := net.ResolveUDPAddr("udp", net.JoinHostPort(ht.ipAddress.String(), "123"))
+	if err != nil {
+		return false
+	}
+
+	data := make([]byte, 48)
+	data[0] = 4<<3 | 3 /* version 4, client mode */
+
+	conn, err := net.DialUDP("udp", nil, udpAddress)
+	if err != nil {
+		return false
+	}
+
+	defer conn.Close()
+
+	_, err = conn.Write(data)
+	if err != nil {
+		return false
+	}
+
+	conn.SetDeadline(time.Now().Add(ht.timeout))
+
+	_, err = conn.Read(data)
+	if err != nil {
+		return false
+	}
+
+	stratum := data[1]
+
+	if stratum == 0 || stratum > byte(t.maxStratum) {
+		return false
+	}
+
+	return true
+}
+
+func (t *NtpHealthTester) String() string {
+	return fmt.Sprintf("%d", t.maxStratum)
+}
+
+func newNtpHealthTester(params map[string]interface{}, htp *HealthTestParameters) HealthTester {
+	maxStratum := 3
+	if v, ok := params["max_stratum"]; ok {
+		maxStratum = valueToInt(v)
+	}
+	return &NtpHealthTester{maxStratum: maxStratum}
+}
+
+// ExecHealthTester tests that an external program runs with a zero exit code
+//
+// Parameters:
+//   cmd (string): path to the external program plus space-separated parameters
+//
+// A {} in the command is substituted with the IP to test
+
+type ExecHealthTester struct {
+	cmd string
+}
+
+func (t *ExecHealthTester) Test(ht *HealthTest) bool {
+	commandSlice := strings.Split(strings.Replace(t.cmd, "{}", ht.ipAddress.String(), -1), " ")
+	cmd := exec.Command(commandSlice[0], commandSlice[1:]...)
+	return cmd.Run() == nil
+}
+
+func (t *ExecHealthTester) String() string {
+	return fmt.Sprintf("%s", t.cmd)
+}
+
+func newExecHealthTester(params map[string]interface{}, htp *HealthTestParameters) HealthTester {
+	cmd := "echo '%s'"
+	if v, ok := params["cmd"]; ok {
+		cmd = valueToString(v)
+	}
+	return &ExecHealthTester{cmd: cmd}
+}
+
+// FileHealthTester reads health of IP addresses from an external JSON map
+//
+// Parameters:
+//   path (string): path to the JSON file
+//
+// The JSON file is of the format:
+//
+//     {
+//       "192.200.0.1": true,
+//       "192.200.0.2": false
+//     }
+
+type FileHealthTester struct {
+	path         string
+	lastHash     string
+	lastReadTime time.Time
+}
+
+func (t *FileHealthTester) Test(ht *HealthTest) bool {
+	if len(t.path) == 0 {
+		logPrintf("No test file path specified")
+		return false
+	}
+
+	if file, err := os.Open(t.path); err != nil {
+		log.Printf("Cannot open test file '%s': %v", t.path, err)
+		return false
+	} else {
+		defer file.Close()
+		if stat, err := file.Stat(); err != nil {
+			log.Printf("Cannot stat test file '%s': %v", t.path, err)
+			return false
+		} else {
+			modTime := stat.ModTime()
+			if modTime == t.lastReadTime {
+				return true
+			}
+			if bytes, err := ioutil.ReadAll(file); err != nil {
+				log.Printf("Cannot read test file '%s': %v", t.path, err)
+				return false
+			} else {
+				t.lastReadTime = modTime
+
+				hasher := sha256.New()
+				hasher.Write(bytes)
+				hash := hex.EncodeToString(hasher.Sum(nil))
+				if hash == t.lastHash {
+					return true
+				}
+				t.lastHash = hash
+
+				var m map[string]bool
+				if err := json.Unmarshal(bytes, &m); err != nil {
+					log.Printf("Cannot parse test file '%s': %v", t.path, err)
+					return false
+				}
+				ht.setGlobal(m)
+				return true
+			}
+		}
+	}
+	return false
+}
+
+func (t *FileHealthTester) String() string {
+	return fmt.Sprintf("%s", t.path)
+}
+
+func newFileHealthTester(params map[string]interface{}, htp *HealthTestParameters) HealthTester {
+	var path string
+	if v, ok := params["path"]; ok {
+		path = valueToString(v)
+	}
+	htp.global = true
+	return &FileHealthTester{path: path}
+}
+
+// NodepingHealthTester reads health of IP addresses from an external Nodeping service
+//
+// The label of each test must correspond to the IP address being tested
+//
+// Parameters:
+//   token (string): API key token to use with nodeping service
+//
+// If the token is not specified it defaults to the token within the [nodeping] section of the config file
+
+type NodepingHealthTester struct {
+	token string
+}
+
+/* Response is of the form below - only down sites are mentioned
+{
+   "201511111111111-AAAAAAAAA" : {
+      "_id" : "201511111111111-AAAAAAAAA-11111111111",
+      "label" : "192.200.0.1",
+      "type" : "down",
+      "message" : "Error: connect ECONNREFUSED",
+      "t" : 14141414141414
+   }
+}
+*/
+
+func (t *NodepingHealthTester) Test(ht *HealthTest) bool {
+	token := t.token
+	if len(token) == 0 {
+		cfgMutex.RLock()
+		token = Config.Nodeping.Token
+		cfgMutex.RUnlock()
+		if len(token) == 0 {
+			logPrintf("No Nodeping API key specified")
+			return false
+		}
+	}
+
+	var vals url.Values = url.Values{}
+	vals.Set("token", token)
+	u := url.URL{
+		Host:   "api.nodeping.com",
+		Scheme: "https",
+		Path:   "/api/1/results/current",
+	}
+	u.RawQuery = vals.Encode()
+	if resp, err := http.Get(u.String()); err != nil {
+		log.Printf("Cannot access Nodeping API : %v", err)
+		return false
+	} else {
+		defer resp.Body.Close()
+		if bytes, err := ioutil.ReadAll(resp.Body); err != nil {
+			log.Printf("Cannot read from Nodeping API: %v", err)
+			return false
+		} else {
+			var m map[string]interface{}
+			if err := json.Unmarshal(bytes, &m); err != nil {
+				log.Printf("Cannot parse response from Nodeping API: %v", err)
+				return false
+			}
+
+			state := make(map[string]bool)
+			for _, item := range m {
+				if result, ok := item.(map[string]interface{}); ok {
+					if ip, ok := result["label"]; ok {
+						host := valueToString(ip)
+						logPrintf("Nodeping host %s health set to false", host)
+						state[host] = false // only down or disabled events reported
+					}
+				}
+			}
+
+			ht.setGlobal(state)
+			return true
+		}
+
+	}
+	return false
+}
+
+func (t *NodepingHealthTester) String() string {
+	return fmt.Sprintf("%s", t.token)
+}
+
+func newNodepingHealthTester(params map[string]interface{}, htp *HealthTestParameters) HealthTester {
+	var token string
+	if v, ok := params["token"]; ok {
+		token = valueToString(v)
+	}
+	// as we can only detect down nodes, not all nodes, we should assume the default is health
+	htp.healthyInitially = true
+	htp.global = true
+	return &NodepingHealthTester{token: token}
+}
+
+// PingdomHealthTester reads health of IP addresses from an external Pingdom service
+//
+// The name of each test must correspond to the IP address being tested
+//
+// Parameters:
+//   username (string): username to use with Pingdom service
+//   password (string): password to use with the Pingdom service
+//   account_email (string, optional): account email to use with Pingdom service (multi user accounts only)
+//   app_key (string, optional): application key to use with the Pingdom service (has a sensible default)
+//   state_map (map, optional): map of Pingdom status to health values (e.g. true/false)
+//
+// If any of the above are not specified, they default to the following fields within the [pingdom] section of the config file:
+// 		username
+//		password
+//		accountemail
+//		appkey
+//		statemap
+//
+// The stateMap parameter is optional and normally not required. It defaults to:
+//    { "up": true, "down": false, "paused": false}
+// which means 'up' corresponds to healthy, 'down' and 'paused' to unhealthy, and the remainder to the default value.
+//
+// To include 'unconfirmed_down' as unhealthy as well, one would use:
+//    { "up": true, "down": false, "paused": false, "unconfirmed_down", false}
+//
+
+type PingdomHealthTester struct {
+	username     string
+	password     string
+	accountEmail string
+	appKey       string
+	stateMap     map[string]bool
+}
+
+/* Response is of the form below
+
+{
+    "checks": [
+        {
+            "hostname": "example.com",
+            "id": 85975,
+            "lasterrortime": 1297446423,
+            "lastresponsetime": 355,
+            "lasttesttime": 1300977363,
+            "name": "My check 1",
+            "resolution": 1,
+            "status": "up",
+            "type": "http",
+            "tags": [
+                {
+                    "name": "apache",
+                    "type": "a",
+                    "count": 2
+                }
+            ]
+        },
+        ...
+    ]
+}
+*/
+
+func (t *PingdomHealthTester) Test(ht *HealthTest) bool {
+	username := t.username
+	if len(username) == 0 {
+		cfgMutex.RLock()
+		username = Config.Pingdom.Username
+		cfgMutex.RUnlock()
+		if len(username) == 0 {
+			logPrintf("No Pingdom username specified")
+			return false
+		}
+	}
+
+	password := t.password
+	if len(password) == 0 {
+		cfgMutex.RLock()
+		password = Config.Pingdom.Password
+		cfgMutex.RUnlock()
+		if len(password) == 0 {
+			logPrintf("No Pingdom password specified")
+			return false
+		}
+	}
+
+	accountEmail := t.accountEmail
+	if len(accountEmail) == 0 {
+		cfgMutex.RLock()
+		accountEmail = Config.Pingdom.AccountEmail
+		cfgMutex.RUnlock()
+	}
+
+	appKey := t.appKey
+	if len(appKey) == 0 {
+		cfgMutex.RLock()
+		appKey = Config.Pingdom.AppKey
+		cfgMutex.RUnlock()
+		if len(appKey) == 0 {
+			appKey = "gyxtnd2fzco8ys29m8luk4syag4ybmc0"
+		}
+	}
+
+	stateMap := t.stateMap
+	if stateMap == nil {
+		cfgMutex.RLock()
+		stateMapString := Config.Pingdom.StateMap
+		cfgMutex.RUnlock()
+		if len(stateMapString) > 0 {
+			stateMap = make(map[string]bool)
+			if err := json.Unmarshal([]byte(stateMapString), &stateMap); err != nil {
+				logPrintf("Cannot decode configfile Pingdom state map JSON")
+				return false
+			}
+		}
+		if stateMap == nil {
+			stateMap = defaultPingdomStateMap
+		}
+	}
+
+	var vals url.Values = url.Values{}
+	u := url.URL{
+		Host:   "api.pingdom.com",
+		Scheme: "https",
+		Path:   "/api/2.0/checks",
+	}
+	u.RawQuery = vals.Encode()
+
+	client := &http.Client{}
+
+	if req, err := http.NewRequest("GET", u.String(), nil); err != nil {
+		log.Printf("Cannot construct Pingdom API request: %v", err)
+	} else {
+		req.SetBasicAuth(username, password)
+		if len(accountEmail) > 0 {
+			req.Header.Add("Account-Email", accountEmail)
+		}
+		req.Header.Add("App-Key", appKey)
+		if resp, err := client.Do(req); err != nil {
+			log.Printf("Cannot access Pingdom API : %v", err)
+			return false
+		} else {
+			defer resp.Body.Close()
+			if bytes, err := ioutil.ReadAll(resp.Body); err != nil {
+				log.Printf("Cannot read from Pingdom API: %v", err)
+				return false
+			} else {
+				var m map[string]interface{}
+				if err := json.Unmarshal(bytes, &m); err != nil {
+					log.Printf("Cannot parse response from Pingdom API: %v", err)
+					return false
+				}
+				if checks, ok := m["checks"]; !ok {
+					log.Printf("Cannot parse response from Pingdom API check response")
+					return false
+				} else {
+					if checkarray, ok := checks.([]interface{}); !ok {
+						log.Printf("Cannot parse response from Pingdom API check array: %T", checks)
+						return false
+					} else {
+						state := make(map[string]bool)
+						for _, checki := range checkarray {
+							if check, ok := checki.(map[string]interface{}); ok {
+								if ip, ok := check["name"]; ok {
+									if status, ok := check["status"]; ok {
+										s := valueToString(status)
+										if updown, ok := stateMap[s]; ok {
+											host := valueToString(ip)
+											state[host] = updown
+											logPrintf("Pingdom host %s state %s health set to %v", host, s, updown)
+										}
+									}
+								}
+							}
+						}
+
+						ht.setGlobal(state)
+						return true
+					}
+				}
+			}
+		}
+	}
+	return false
+}
+
+func (t *PingdomHealthTester) String() string {
+	return fmt.Sprintf("%s/%s/%s/%s/%v", t.username, t.password, t.accountEmail, t.appKey, t.stateMap)
+}
+
+var defaultPingdomStateMap = map[string]bool{
+	"up":     true,
+	"down":   false,
+	"paused": false,
+	// other states, i.e. unconfirmed_down, paused, are determined by initially_healthy
+}
+
+func newPingdomHealthTester(params map[string]interface{}, htp *HealthTestParameters) HealthTester {
+	var username string
+	var password string
+	var accountEmail string
+	var appKey string
+	var stateMap map[string]bool = nil
+	if v, ok := params["username"]; ok {
+		username = valueToString(v)
+	}
+	if v, ok := params["password"]; ok {
+		password = valueToString(v)
+	}
+	if v, ok := params["account_email"]; ok {
+		accountEmail = valueToString(v)
+	}
+	if v, ok := params["app_key"]; ok {
+		appKey = valueToString(v)
+	}
+	if v, ok := params["state_map"]; ok {
+		if vv, ok := v.(map[string]interface{}); ok {
+			stateMap = make(map[string]bool)
+			for k, s := range vv {
+				stateMap[valueToString(k)] = valueToBool(s)
+			}
+		}
+	}
+	htp.global = true
+	return &PingdomHealthTester{username: username, password: password, accountEmail: accountEmail, appKey: appKey, stateMap: stateMap}
+}

+ 20 - 6
picker.go

@@ -26,24 +26,38 @@ func (label *Label) Picker(qtype uint16, max int, location *Location) Records {
 
 	if labelRR := label.Records[qtype]; labelRR != nil {
 
+		sum := label.Weight[qtype]
+
+		servers := make([]Record, len(labelRR))
+		copy(servers, labelRR)
+
+		if label.Test != nil {
+			// Remove any unhealthy servers
+			tmpServers := servers[:0]
+			sum = 0
+			for i, s := range servers {
+				if servers[i].Test == nil || healthTestRunner.isHealthy(servers[i].Test) {
+					tmpServers = append(tmpServers, s)
+					sum += s.Weight
+				}
+			}
+			servers = tmpServers
+		}
+
 		// not "balanced", just return all
 		if label.Weight[qtype] == 0 {
-			return labelRR
+			return servers
 		}
 
 		if qtype == dns.TypeCNAME || qtype == dns.TypeMF {
 			max = 1
 		}
 
-		rrCount := len(labelRR)
+		rrCount := len(servers)
 		if max > rrCount {
 			max = rrCount
 		}
-
-		servers := make([]Record, len(labelRR))
-		copy(servers, labelRR)
 		result := make([]Record, max)
-		sum := label.Weight[qtype]
 
 		// Find the distance to each server, and find the servers that are
 		// closer to the querier than the max'th furthest server, or within

+ 3 - 0
zone.go

@@ -26,6 +26,7 @@ type Record struct {
 	RR     dns.RR
 	Weight int
 	Loc    *Location
+	Test   *HealthTest
 }
 
 type Records []Record
@@ -44,6 +45,7 @@ type Label struct {
 	Records  map[uint16]Records
 	Weight   map[uint16]int
 	Closest  bool
+	Test     *HealthTest
 }
 
 type labels map[string]*Label
@@ -111,6 +113,7 @@ func (z *Zone) SetupMetrics(old *Zone) {
 }
 
 func (z *Zone) Close() {
+	z.StartStopHealthChecks(false, nil)
 	z.Metrics.Registry.UnregisterAll()
 	if z.Metrics.LabelStats != nil {
 		z.Metrics.LabelStats.Close()

+ 11 - 0
zones.go

@@ -39,8 +39,16 @@ func zonesReader(dirName string, zones Zones) {
 
 func addHandler(zones Zones, name string, config *Zone) {
 	oldZone := zones[name]
+	// across the recconfiguration keep a reference to all healthchecks to ensure
+	// the global map doesn't get destroyed
+	healthTestRunner.refAllGlobalHealthChecks(name, true)
+	defer healthTestRunner.refAllGlobalHealthChecks(name, false)
+	if oldZone != nil {
+		oldZone.StartStopHealthChecks(false, nil)
+	}
 	config.SetupMetrics(oldZone)
 	zones[name] = config
+	config.StartStopHealthChecks(true, oldZone)
 	dns.HandleFunc(name, setupServerFunc(config))
 }
 
@@ -307,6 +315,9 @@ func setupZoneData(data map[string]interface{}, Zone *Zone) {
 			case "ttl":
 				label.Ttl = valueToInt(rdata)
 				continue
+			case "test":
+				Zone.newHealthTest(label, rdata)
+				continue
 			}
 
 			dnsType, ok := recordTypes[rType]