
Fixes issue #101 - Gateway Timeout deadlock. (#102)

* Fixes issue #101 - Gateway Timeout deadlock.

* update tests
Flashmob 7 years ago
parent
commit
5980d3ea27
8 changed files with 186 additions and 55 deletions
  1. Makefile (+4, -0)
  2. api_test.go (+4, -11)
  3. backends/gateway.go (+21, -11)
  4. backends/p_debugger.go (+14, -0)
  5. backends/p_guerrilla_db_redis.go (+3, -3)
  6. goguerrilla.conf.sample (+3, -1)
  7. mail/envelope.go (+12, -29)
  8. server_test.go (+125, -0)
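
For orientation before the per-file diffs: in the old code, nothing ever received from workerMsg.notifyMe once the gateway had timed out, so a slow worker blocked forever on its unbuffered send while still holding the envelope lock it had taken in workDispatcher, and that worker was effectively lost. The patch drains notifyMe from a helper goroutine instead and moves the envelope locking out of the worker and into the timeout path. A minimal stand-alone illustration of the pre-fix failure mode (names are illustrative, not the library's API):

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	notify := make(chan error) // unbuffered, like workerMsg.notifyMe
    	workerDone := make(chan struct{})

    	// A worker that takes longer than the gateway's timeout.
    	go func() {
    		time.Sleep(500 * time.Millisecond) // slow backend save
    		notify <- nil                      // pre-fix: nobody is listening anymore...
    		close(workerDone)                  // ...so this line is never reached
    	}()

    	// The gateway side: give up after 100ms and (pre-fix) simply walk away.
    	select {
    	case <-notify:
    		fmt.Println("saved in time")
    	case <-time.After(100 * time.Millisecond):
    		fmt.Println("gateway timeout; worker abandoned")
    	}

    	// The worker never signals completion, because its send has no receiver.
    	// The patched gateway instead leaves a goroutine behind to drain notifyMe.
    	select {
    	case <-workerDone:
    		fmt.Println("worker finished")
    	case <-time.After(time.Second):
    		fmt.Println("worker still blocked on notify <- nil: the deadlock")
    	}
    }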

+ 4 - 0
Makefile

@@ -24,6 +24,10 @@ dependencies:
 guerrillad: *.go */*.go */*/*.go
 	$(GO_VARS) $(GO) build -o="guerrillad" -ldflags="$(LD_FLAGS)" $(ROOT)/cmd/guerrillad
 
+guerrilladrace: *.go */*.go */*/*.go
+	$(GO_VARS) $(GO) build -o="guerrillad" -race -ldflags="$(LD_FLAGS)" $(ROOT)/cmd/guerrillad
+
+
 test: *.go */*.go */*/*.go
 	$(GO_VARS) $(GO) test -v .
 	$(GO_VARS) $(GO) test -v ./tests

+ 4 - 11
api_test.go

@@ -16,16 +16,14 @@ import (
 
 // Test Starting smtp without setting up logger / backend
 func TestSMTP(t *testing.T) {
+	done := make(chan bool)
 	go func() {
 		select {
 		case <-time.After(time.Second * 40):
-			//buf := make([]byte, 1<<16)
-			//stackSize := runtime.Stack(buf, true)
-			//fmt.Printf("%s\n", string(buf[0:stackSize]))
-			//panic("timeout")
 			t.Error("timeout")
 			return
-
+		case <-done:
+			return
 		}
 	}()
 
@@ -52,6 +50,7 @@ func TestSMTP(t *testing.T) {
 	}
 	time.Sleep(time.Second * 2)
 	d.Shutdown()
+	done <- true
 
 }
 
@@ -414,25 +413,19 @@ func talkToServer(address string) {
 	}
 	in := bufio.NewReader(conn)
 	str, err := in.ReadString('\n')
-	//	fmt.Println(str)
 	fmt.Fprint(conn, "HELO maildiranasaurustester\r\n")
 	str, err = in.ReadString('\n')
-	//	fmt.Println(str)
 	fmt.Fprint(conn, "MAIL FROM:<[email protected]>r\r\n")
 	str, err = in.ReadString('\n')
-	//	fmt.Println(str)
 	fmt.Fprint(conn, "RCPT TO:[email protected]\r\n")
 	str, err = in.ReadString('\n')
-	//	fmt.Println(str)
 	fmt.Fprint(conn, "DATA\r\n")
 	str, err = in.ReadString('\n')
-	//	fmt.Println(str)
 	fmt.Fprint(conn, "Subject: Test subject\r\n")
 	fmt.Fprint(conn, "\r\n")
 	fmt.Fprint(conn, "A an email body\r\n")
 	fmt.Fprint(conn, ".\r\n")
 	str, err = in.ReadString('\n')
-	//	fmt.Println(str)
 	_ = str
 }
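
The test change above replaces the commented-out stack dump with a done channel, so the 40-second watchdog goroutine exits as soon as the test body finishes. One subtlety: because done is unbuffered, the final done <- true would itself block in the edge case where the watchdog has already fired and returned; closing the channel avoids that. A small stand-alone variant of the pattern (hypothetical helper, not part of the test suite):

    package main

    import (
    	"fmt"
    	"time"
    )

    // runWithWatchdog shows the same shape as the TestSMTP change: a goroutine
    // that reports a hang after a deadline and is released when the work is done.
    // Closing the channel, rather than sending on it, never blocks, even if the
    // watchdog has already fired and returned.
    func runWithWatchdog(work func(), timeout time.Duration) {
    	done := make(chan struct{})
    	go func() {
    		select {
    		case <-time.After(timeout):
    			fmt.Println("timeout: work appears to be stuck") // t.Error in the real test
    		case <-done:
    			// finished in time; the watchdog exits instead of leaking
    		}
    	}()
    	work()
    	close(done)
    }

    func main() {
    	runWithWatchdog(func() { time.Sleep(200 * time.Millisecond) }, 2*time.Second)
    	fmt.Println("done")
    }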
 

+ 21 - 11
backends/gateway.go

@@ -139,14 +139,20 @@ func (gw *BackendGateway) Process(e *mail.Envelope) Result {
 	// or timeout
 	select {
 	case status := <-workerMsg.notifyMe:
-		defer workerMsgPool.Put(workerMsg) // can be recycled since we used the notifyMe channel
+		workerMsgPool.Put(workerMsg) // can be recycled since we used the notifyMe channel
 		if status.err != nil {
 			return NewResult(response.Canned.FailBackendTransaction + status.err.Error())
 		}
 		return NewResult(response.Canned.SuccessMessageQueued + status.queuedID)
-
 	case <-time.After(gw.saveTimeout()):
-		Log().Error("Backend has timed out while saving eamil")
+		Log().Error("Backend has timed out while saving email")
+		e.Lock() // lock the envelope - it's still processing here, we don't want the server to recycle it
+		go func() {
+			// keep waiting for the backend to finish processing
+			<-workerMsg.notifyMe
+			e.Unlock()
+			workerMsgPool.Put(workerMsg)
+		}()
 		return NewResult(response.Canned.FailBackendTimeout)
 	}
 }
@@ -169,13 +175,20 @@ func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
 	// or timeout
 	select {
 	case status := <-workerMsg.notifyMe:
+		workerMsgPool.Put(workerMsg)
 		if status.err != nil {
 			return status.err
 		}
 		return nil
 
 	case <-time.After(gw.validateRcptTimeout()):
-		Log().Error("Backend has timed out while validating rcpt")
+		e.Lock()
+		go func() {
+			<-workerMsg.notifyMe
+			e.Unlock()
+			workerMsgPool.Put(workerMsg)
+			Log().Error("Backend has timed out while validating rcpt")
+		}()
 		return StorageTimeout
 	}
 }
@@ -260,7 +273,7 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 	gw.Lock()
 	defer gw.Unlock()
 	if gw.State != BackendStateNew && gw.State != BackendStateShuttered {
-		return errors.New("Can only Initialize in BackendStateNew or BackendStateShuttered state")
+		return errors.New("can only Initialize in BackendStateNew or BackendStateShuttered state")
 	}
 	err := gw.loadConfig(cfg)
 	if err != nil {
@@ -270,7 +283,7 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 	workersSize := gw.workersSize()
 	if workersSize < 1 {
 		gw.State = BackendStateError
-		return errors.New("Must have at least 1 worker")
+		return errors.New("must have at least 1 worker")
 	}
 	gw.processors = make([]Processor, 0)
 	gw.validators = make([]Processor, 0)
@@ -403,7 +416,6 @@ func (gw *BackendGateway) workDispatcher(
 
 			if state == dispatcherStateWorking {
 				msg.notifyMe <- &notifyMsg{err: errors.New("storage failed")}
-				msg.e.Unlock()
 			}
 			state = dispatcherStatePanic
 			return
@@ -421,14 +433,13 @@ func (gw *BackendGateway) workDispatcher(
 			Log().Infof("stop signal for worker (#%d)", workerId)
 			return
 		case msg = <-workIn:
-			msg.e.Lock()
-			state = dispatcherStateWorking
+			state = dispatcherStateWorking // recovers from panic if in this state
 			if msg.task == TaskSaveMail {
 				// process the email here
 				result, _ := save.Process(msg.e, TaskSaveMail)
 				state = dispatcherStateNotify
 				if result.Code() < 300 {
-					// if all good, let the gateway know that it was queued
+					// if all good, let the gateway know that it was saved
 					msg.notifyMe <- &notifyMsg{nil, msg.e.QueuedId}
 				} else {
 					// notify the gateway about the error
@@ -445,7 +456,6 @@ func (gw *BackendGateway) workDispatcher(
 					msg.notifyMe <- &notifyMsg{err: nil}
 				}
 			}
-			msg.e.Unlock()
 		}
 		state = dispatcherStateIdle
 	}
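
The core of the fix is in the two select blocks above: on timeout the gateway no longer abandons the in-flight workerMsg. It locks the envelope so the server cannot reset or recycle it, returns the timeout response immediately, and leaves a goroutine behind that waits for the worker's notification before unlocking the envelope and recycling the message. A condensed, runnable sketch of that hand-off (stand-in types and response strings, not the gateway's exact API):

    package main

    import (
    	"fmt"
    	"sync"
    	"time"
    )

    // envelope stands in for mail.Envelope; its mutex means "still processing".
    type envelope struct{ sync.Mutex }

    // process mirrors the patched BackendGateway.Process: wait for the worker or
    // time out, and on timeout keep the envelope locked until the worker really
    // finishes, unlocking it from a helper goroutine.
    func process(e *envelope, notify <-chan error, timeout time.Duration) string {
    	select {
    	case err := <-notify:
    		if err != nil {
    			return "554 transaction failed"
    		}
    		return "250 OK: queued"
    	case <-time.After(timeout):
    		e.Lock() // still processing: block envelope reset/reuse
    		go func() {
    			<-notify   // wait for the slow worker, however long it takes
    			e.Unlock() // only now may the envelope be reset and recycled
    		}()
    		return "554 transaction timeout"
    	}
    }

    func main() {
    	e := &envelope{}
    	notify := make(chan error)
    	go func() { // a worker that outlives the gateway timeout
    		time.Sleep(2 * time.Second)
    		notify <- nil
    	}()
    	fmt.Println(process(e, notify, time.Second)) // prints the timeout reply

    	// The next transaction's reset blocks here until the worker is done,
    	// exactly the role Envelope.ResetTransaction takes on after this patch.
    	e.Lock()
    	e.Unlock()
    	fmt.Println("envelope is free for the next transaction")
    }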

+ 14 - 0
backends/p_debugger.go

@@ -3,6 +3,7 @@ package backends
 import (
 	"github.com/flashmob/go-guerrilla/mail"
 	"strings"
+	"time"
 )
 
 // ----------------------------------------------------------------------------------
@@ -24,6 +25,7 @@ func init() {
 
 type debuggerConfig struct {
 	LogReceivedMails bool `json:"log_received_mails"`
+	SleepSec         int  `json:"sleep_seconds,omitempty"`
 }
 
 func Debugger() Decorator {
@@ -45,6 +47,18 @@ func Debugger() Decorator {
 					Log().Infof("Mail from: %s / to: %v", e.MailFrom.String(), e.RcptTo)
 					Log().Info("Headers are:", e.Header)
 				}
+
+				if config.SleepSec > 0 {
+					Log().Infof("sleeping for %d", config.SleepSec)
+					time.Sleep(time.Second * time.Duration(config.SleepSec))
+					Log().Infof("woke up")
+
+					if config.SleepSec == 1 {
+						panic("panic on purpose")
+					}
+
+				}
+
 				// continue to the next Processor in the decorator stack
 				return p.Process(e, task)
 			} else {

+ 3 - 3
backends/p_guerrilla_db_redis.go

@@ -30,7 +30,7 @@ import (
 // ----------------------------------------------------------------------------------
 func init() {
 	processors["guerrillaredisdb"] = func() Decorator {
-		return GuerrillaDbReddis()
+		return GuerrillaDbRedis()
 	}
 }
 
@@ -352,9 +352,9 @@ func (c *redisClient) redisConnection(redisInterface string) (err error) {
 
 type feedChan chan []interface{}
 
-// GuerrillaDbReddis is a specialized processor for Guerrilla mail. It is here as an example.
+// GuerrillaDbRedis is a specialized processor for Guerrilla mail. It is here as an example.
 // It's an example of a 'monolithic' processor.
-func GuerrillaDbReddis() Decorator {
+func GuerrillaDbRedis() Decorator {
 
 	g := GuerrillaDBAndRedisBackend{}
 	redisClient := &redisClient{}

+ 3 - 1
goguerrilla.conf.sample

@@ -13,7 +13,9 @@
         "log_received_mails": true,
         "save_workers_size": 1,
         "save_process" : "HeadersParser|Header|Debugger",
-        "primary_mail_host" : "mail.example.com"
+        "primary_mail_host" : "mail.example.com",
+        "gw_save_timeout" : "30s",
+        "gw_val_rcpt_timeout" : "3s"
     },
     "servers" : [
         {
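
The two new keys set how long the backend gateway waits for a save and for a recipient validation before replying with a timeout. When embedding the daemon, the same settings can be passed through backends.BackendConfig, which is how the new tests drive them; a hedged sketch of that wiring, assuming the root package is imported as guerrilla as in cmd/guerrillad (host names and values are placeholders):

    package main

    import (
    	"github.com/flashmob/go-guerrilla"
    	"github.com/flashmob/go-guerrilla/backends"
    )

    func main() {
    	// Programmatic equivalent of the two new sample-config keys.
    	bcfg := backends.BackendConfig{
    		"save_workers_size":   1,
    		"save_process":        "HeadersParser|Header|Debugger",
    		"primary_mail_host":   "mail.example.com",
    		"gw_save_timeout":     "30s", // give up on a save after 30 seconds
    		"gw_val_rcpt_timeout": "3s",  // give up on RCPT validation after 3 seconds
    	}

    	cfg := &guerrilla.AppConfig{AllowedHosts: []string{"mail.example.com"}}
    	cfg.BackendConfig = bcfg

    	d := guerrilla.Daemon{Config: cfg}
    	if err := d.Start(); err != nil {
    		panic(err)
    	}
    	// ... accept mail; a save slower than 30s now gets "transaction timeout" ...
    	d.Shutdown()
    }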

+ 12 - 29
mail/envelope.go

@@ -104,7 +104,7 @@ func queuedID(clientID uint64) string {
 func (e *Envelope) ParseHeaders() error {
 	var err error
 	if e.Header != nil {
-		return errors.New("Headers already parsed")
+		return errors.New("headers already parsed")
 	}
 	buf := bytes.NewBuffer(e.Data.Bytes())
 	// find where the header ends, assuming that over 30 kb would be max
@@ -153,6 +153,12 @@ func (e *Envelope) String() string {
 
 // ResetTransaction is called when the transaction is reset (keeping the connection open)
 func (e *Envelope) ResetTransaction() {
+
+	// ensure not processing by the backend, will only get lock if finished, otherwise block
+	e.Lock()
+	// got the lock, it means processing finished
+	e.Unlock()
+
 	e.MailFrom = Address{}
 	e.RcptTo = []Address{}
 	// reset the data buffer, keep it allocated
@@ -318,36 +324,13 @@ func (p *Pool) Borrow(remoteAddr string, clientID uint64) *Envelope {
 }
 
 // Return returns an envelope back to the envelope pool
-// Note that an envelope will not be recycled while it still is
-// processing
+// Make sure that envelope finished processing before calling this
 func (p *Pool) Return(e *Envelope) {
-	// we down't want to recycle an envelope that may still be processing
-	isUnlocked := func() <-chan bool {
-		signal := make(chan bool)
-		// make sure envelope finished processing
-		go func() {
-			// lock will block if still processing
-			e.Lock()
-			// got the lock, it means processing finished
-			e.Unlock()
-			// generate a signal
-			signal <- true
-		}()
-		return signal
-	}()
-
 	select {
-	case <-time.After(time.Second * 30):
-		// envelope still processing, we can't recycle it.
-	case <-isUnlocked:
-		// The envelope was _unlocked_, it finished processing
-		// put back in the pool or destroy
-		select {
-		case p.pool <- e:
-			//placed envelope back in pool
-		default:
-			// pool is full, don't return
-		}
+	case p.pool <- e:
+		//placed envelope back in pool
+	default:
+		// pool is full, discard it
 	}
 	// take a value off the semaphore to make room for more envelopes
 	<-p.sem
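
With the gateway keeping the envelope locked through a late save, the waiting moves to ResetTransaction above: its Lock/Unlock pair blocks until processing has really finished, and Return shrinks to a plain non-blocking put into the pool. That non-blocking put is a small idiom worth isolating; a stand-alone sketch with stand-in types (not the library's Pool):

    package main

    import "fmt"

    type envelope struct{ id int }

    // returnToPool mirrors the simplified Pool.Return above: try a non-blocking
    // put into a fixed-size channel, drop the envelope if the pool is full, then
    // release one semaphore slot so another envelope may be borrowed.
    func returnToPool(pool chan *envelope, sem chan bool, e *envelope) {
    	select {
    	case pool <- e:
    		// recycled for a later Borrow
    	default:
    		// pool is full; discard and let the GC reclaim it
    	}
    	<-sem
    }

    func main() {
    	pool := make(chan *envelope, 1)
    	sem := make(chan bool, 2)
    	sem <- true
    	sem <- true

    	returnToPool(pool, sem, &envelope{id: 1}) // goes back into the pool
    	returnToPool(pool, sem, &envelope{id: 2}) // pool full, discarded

    	fmt.Println("pooled:", len(pool), "semaphore slots in use:", len(sem))
    }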

+ 125 - 0
server_test.go

@@ -8,10 +8,12 @@ import (
 	"strings"
 	"sync"
 
+	"fmt"
 	"github.com/flashmob/go-guerrilla/backends"
 	"github.com/flashmob/go-guerrilla/log"
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mocks"
+	"net"
 )
 
 // getMockServerConfig gets a mock ServerConfig struct used for creating a new server
@@ -144,5 +146,128 @@ func TestXClient(t *testing.T) {
 	wg.Wait() // wait for handleClient to exit
 }
 
+// The backend gateway should time out after 1 second because it sleeps for 2 sec.
+// The transaction should wait until finished, and then test to see if we can do
+// a second transaction
+func TestGatewayTimeout(t *testing.T) {
+
+	bcfg := backends.BackendConfig{
+		"save_workers_size":   1,
+		"save_process":        "HeadersParser|Debugger",
+		"log_received_mails":  true,
+		"primary_mail_host":   "example.com",
+		"gw_save_timeout":     "1s",
+		"gw_val_rcpt_timeout": "1s",
+		"sleep_seconds":       2,
+	}
+
+	cfg := &AppConfig{
+		LogFile:      log.OutputOff.String(),
+		AllowedHosts: []string{"grr.la"},
+	}
+	cfg.BackendConfig = bcfg
+
+	d := Daemon{Config: cfg}
+	err := d.Start()
+
+	if err != nil {
+		t.Error("server didn't start")
+	} else {
+
+		conn, err := net.Dial("tcp", "127.0.0.1:2525")
+		if err != nil {
+
+			return
+		}
+		in := bufio.NewReader(conn)
+		str, err := in.ReadString('\n')
+		fmt.Fprint(conn, "HELO host\r\n")
+		str, err = in.ReadString('\n')
+		// perform 2 transactions
+		// both should time out.
+		for i := 0; i < 2; i++ {
+			fmt.Fprint(conn, "MAIL FROM:<[email protected]>r\r\n")
+			str, err = in.ReadString('\n')
+			fmt.Fprint(conn, "RCPT TO:[email protected]\r\n")
+			str, err = in.ReadString('\n')
+			fmt.Fprint(conn, "DATA\r\n")
+			str, err = in.ReadString('\n')
+			fmt.Fprint(conn, "Subject: Test subject\r\n")
+			fmt.Fprint(conn, "\r\n")
+			fmt.Fprint(conn, "A an email body\r\n")
+			fmt.Fprint(conn, ".\r\n")
+			str, err = in.ReadString('\n')
+			expect := "transaction timeout"
+			if strings.Index(str, expect) == -1 {
+				t.Error("Expected the reply to have'", expect, "'but got", str)
+			}
+		}
+		_ = str
+
+		d.Shutdown()
+	}
+}
+
+// The processor will panic and gateway should recover from it
+func TestGatewayPanic(t *testing.T) {
+	bcfg := backends.BackendConfig{
+		"save_workers_size":   1,
+		"save_process":        "HeadersParser|Debugger",
+		"log_received_mails":  true,
+		"primary_mail_host":   "example.com",
+		"gw_save_timeout":     "2s",
+		"gw_val_rcpt_timeout": "2s",
+		"sleep_seconds":       1,
+	}
+
+	cfg := &AppConfig{
+		LogFile:      log.OutputOff.String(),
+		AllowedHosts: []string{"grr.la"},
+	}
+	cfg.BackendConfig = bcfg
+
+	d := Daemon{Config: cfg}
+	err := d.Start()
+
+	if err != nil {
+		t.Error("server didn't start")
+	} else {
+
+		conn, err := net.Dial("tcp", "127.0.0.1:2525")
+		if err != nil {
+
+			return
+		}
+		in := bufio.NewReader(conn)
+		str, err := in.ReadString('\n')
+		fmt.Fprint(conn, "HELO host\r\n")
+		str, err = in.ReadString('\n')
+		// perform 2 transactions
+		// both should panic. The reason why 2 is because we want to make
+		// sure that the client waits until processing finishes, and the
+		// panic is recovered and reported as a storage failure.
+		for i := 0; i < 2; i++ {
+			fmt.Fprint(conn, "MAIL FROM:<[email protected]>r\r\n")
+			str, err = in.ReadString('\n')
+			fmt.Fprint(conn, "RCPT TO:[email protected]\r\n")
+			str, err = in.ReadString('\n')
+			fmt.Fprint(conn, "DATA\r\n")
+			str, err = in.ReadString('\n')
+			fmt.Fprint(conn, "Subject: Test subject\r\n")
+			fmt.Fprint(conn, "\r\n")
+			fmt.Fprint(conn, "A an email body\r\n")
+			fmt.Fprint(conn, ".\r\n")
+			str, err = in.ReadString('\n')
+			expect := "storage failed"
+			if strings.Index(str, expect) == -1 {
+				t.Error("Expected the reply to have'", expect, "'but got", str)
+			}
+		}
+		_ = str
+		d.Shutdown()
+	}
+
+}
+
 // TODO
 // - test github issue #44 and #42