Browse Source

add old backend compatibility

flashmob 8 years ago
parent
commit
1d59402815
2 changed files with 199 additions and 21 deletions
  1. 2 21
      backends/guerrilla_db_redis.go
  2. 197 0
      backends/proxy.go

+ 2 - 21
backends/guerrilla_db_redis.go

@@ -50,7 +50,8 @@ const GuerrillaDBAndRedisBatchMax = 2
 const GuerrillaDBAndRedisBatchTimeout = time.Second * 3
 
 func init() {
-	backends["guerrilla-db-redis"] = &GuerrillaDBAndRedisBackend{}
+	backends["guerrilla-db-redis"] = &ProxyBackend{
+		extend: &GuerrillaDBAndRedisBackend{}}
 }
 
 type GuerrillaDBAndRedisBackend struct {
@@ -453,23 +454,3 @@ func (g *GuerrillaDBAndRedisBackend) testSettings() (err error) {
 
 	return
 }
-
-func (g *GuerrillaDBAndRedisBackend) Initialize(config BackendConfig) error {
-	err := g.loadConfig(config)
-	if err != nil {
-		return err
-	}
-	return nil
-
-}
-
-// does nothing
-func (g *GuerrillaDBAndRedisBackend) Process(mail *envelope.Envelope) Result {
-	return NewResult("250 OK")
-}
-
-// does nothing
-func (g *GuerrillaDBAndRedisBackend) Shutdown() error {
-
-	return nil
-}

+ 197 - 0
backends/proxy.go

@@ -0,0 +1,197 @@
+package backends
+
+import (
+	"errors"
+	"fmt"
+	"github.com/flashmob/go-guerrilla/envelope"
+	"sync"
+)
+
+// The ProxyBackend makes it possible to use the old backend system
+// which is not using processors
+type ProxyBackend struct {
+	config       proxyConfig
+	extend       proxy
+	saveMailChan chan *savePayload
+	State        backendState
+	// waits for backend workers to start/stop
+	wg sync.WaitGroup
+}
+
+type savePayload struct {
+	mail        *envelope.Envelope
+	from        *envelope.EmailAddress
+	recipient   *envelope.EmailAddress
+	savedNotify chan *saveStatus
+}
+
+type saveStatus struct {
+	err  error
+	hash string
+}
+
+type proxy interface {
+
+	//	Backend
+	/*
+		saveMailWorker
+		numberOfWorkersGetter
+		settingsTester
+		configLoader
+	*/
+
+}
+
+type saveMailWorker interface {
+	// start save mail worker(s)
+	saveMailWorker(chan *savePayload)
+}
+
+type numberOfWorkersGetter interface {
+	// get the number of workers that will be stared
+	getNumberOfWorkers() int
+}
+
+type settingsTester interface {
+	// test database settings, permissions, correct paths, etc, before starting workers
+	testSettings() error
+}
+
+type configLoader interface {
+	// parse the configuration files
+	loadConfig(BackendConfig) error
+}
+
+type proxyConfig struct {
+	LogReceivedMails bool `json:"log_received_mails"`
+}
+
+// Your backend should implement this method and set b.config field with a custom config struct
+// Therefore, your implementation would have your own custom config type instead of dummyConfig
+func (pb *ProxyBackend) loadConfig(backendConfig BackendConfig) (err error) {
+	// Load the backend config for the backend. It has already been unmarshalled
+	// from the main config file 'backend' config "backend_config"
+	// Now we need to convert each type and copy into the dummyConfig struct
+	configType := BaseConfig(&proxyConfig{})
+	bcfg, err := Svc.ExtractConfig(backendConfig, configType)
+	if err != nil {
+		return err
+	}
+	m := bcfg.(*proxyConfig)
+	pb.config = *m
+	return nil
+}
+
+func (pb *ProxyBackend) initialize(config BackendConfig) error {
+
+	if cl, ok := pb.extend.(configLoader); ok {
+		return cl.loadConfig(config)
+	}
+	err := pb.loadConfig(config)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (pb *ProxyBackend) Initialize(cfg BackendConfig) error {
+	err := pb.initialize(cfg)
+	if err == nil {
+		workersSize := pb.getNumberOfWorkers()
+		if workersSize < 1 {
+			pb.State = BackendStateError
+			return errors.New("Must have at least 1 worker")
+		}
+		if err := pb.testSettings(); err != nil {
+			pb.State = BackendStateError
+			return err
+		}
+		pb.saveMailChan = make(chan *savePayload, workersSize)
+		// start our savemail workers
+		pb.wg.Add(workersSize)
+		for i := 0; i < workersSize; i++ {
+			go func() {
+				pb.saveMailWorker(pb.saveMailChan)
+				pb.wg.Done()
+			}()
+		}
+	} else {
+		pb.State = BackendStateError
+	}
+	return err
+}
+
+func (pb *ProxyBackend) Shutdown() error {
+	if b, ok := pb.extend.(Backend); ok {
+		return b.Shutdown()
+	}
+	return nil
+}
+
+func (pb *ProxyBackend) ValidateRcpt(mail *envelope.Envelope) RcptError {
+	if b, ok := pb.extend.(Backend); ok {
+		return b.ValidateRcpt(mail)
+	}
+	return nil
+}
+
+func (pb *ProxyBackend) Process(mail *envelope.Envelope) Result {
+	if b, ok := pb.extend.(Backend); ok {
+		return b.Process(mail)
+	}
+	mail.ParseHeaders()
+
+	if pb.config.LogReceivedMails {
+		Log().Infof("Mail from: %s / to: %v", mail.MailFrom.String(), mail.RcptTo)
+		Log().Info("Headers are: %s", mail.Header)
+
+	}
+	return NewResult("250 OK")
+}
+
+func (pb *ProxyBackend) saveMailWorker(saveMailChan chan *savePayload) {
+	if s, ok := pb.extend.(saveMailWorker); ok {
+		s.saveMailWorker(saveMailChan)
+		return
+	}
+
+	defer func() {
+		if r := recover(); r != nil {
+			// recover form closed channel
+			fmt.Println("Recovered in f", r)
+		}
+		// close any connections / files
+		// ...
+
+	}()
+	for {
+		payload := <-saveMailChan
+		if payload == nil {
+			Log().Debug("No more saveMailChan payload")
+			return
+		}
+		// process the email here
+		result := pb.Process(payload.mail)
+		// if all good
+		if result.Code() < 300 {
+			payload.savedNotify <- &saveStatus{nil, "s0m3l337Ha5hva1u3LOL"}
+		} else {
+			payload.savedNotify <- &saveStatus{errors.New(result.String()), "s0m3l337Ha5hva1u3LOL"}
+		}
+
+	}
+}
+
+func (pb *ProxyBackend) getNumberOfWorkers() int {
+	if n, ok := pb.extend.(numberOfWorkersGetter); ok {
+		return n.getNumberOfWorkers()
+	}
+	return 1
+}
+
+func (b *ProxyBackend) testSettings() error {
+	if t, ok := b.extend.(settingsTester); ok {
+		return t.testSettings()
+	}
+	return nil
+}