Browse Source

- backend logger refactoring

flashmob 8 years ago
parent
commit
0c84f5e694

+ 16 - 29
backends/backend.go

@@ -8,10 +8,10 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 )
 
 var (
-	mainlog log.Logger
 	Service *BackendService
 	// deprecated backends system
 	backends = map[string]Backend{}
@@ -37,25 +37,9 @@ type Backend interface {
 	Shutdown() error
 }
 
-/*
-type Worker interface {
-	// start save mail worker(s)
-	saveMailWorker(chan *savePayload)
-	// get the number of workers that will be stared
-	getNumberOfWorkers() int
-	// test database settings, permissions, correct paths, etc, before starting workers
-	// parse the configuration files
-	loadConfig(BackendConfig) error
-
-	Shutdown() error
-	Process(*envelope.Envelope) BackendResult
-	Initialize(BackendConfig) error
-
-	SetProcessors(p ...Decorator)
-}
-*/
 type BackendConfig map[string]interface{}
 
+// All config structs extend from this
 type baseConfig interface{}
 
 type saveStatus struct {
@@ -63,13 +47,6 @@ type saveStatus struct {
 	hash string
 }
 
-type savePayload struct {
-	mail *envelope.Envelope
-	//from        *envelope.EmailAddress
-	//recipient   *envelope.EmailAddress
-	savedNotify chan *saveStatus
-}
-
 // BackendResult represents a response to an SMTP client after receiving DATA.
 // The String method should return an SMTP message ready to send back to the
 // client, for example `250 OK: Message received`.
@@ -129,13 +106,23 @@ func (s Shutdown) Shutdown() error {
 }
 
 type BackendService struct {
-	ProcessorHandlers
+	Initializers []ProcessorInitializer
+	Shutdowners  []ProcessorShutdowner
 	sync.Mutex
+	mainlog atomic.Value
 }
 
-type ProcessorHandlers struct {
-	Initializers []ProcessorInitializer
-	Shutdowners  []ProcessorShutdowner
+// Log loads the log.Logger in an atomic operation. Returns a stderr logger if not able to load
+func Log() log.Logger {
+	if v, ok := Service.mainlog.Load().(log.Logger); ok {
+		return v
+	}
+	l, _ := log.GetLogger(log.OutputStderr.String())
+	return l
+}
+
+func (b *BackendService) StoreMainlog(l log.Logger) {
+	b.mainlog.Store(l)
 }
 
 // AddInitializer adds a function that implements ProcessorInitializer to be called when initializing

+ 23 - 11
backends/gateway.go

@@ -24,10 +24,10 @@ type BackendGateway struct {
 	w  *Worker
 	b  Backend
 	// controls access to state
-	stateGuard sync.Mutex
-	State      backendState
-	config     BackendConfig
-	gwConfig   *GatewayConfig
+	sync.Mutex
+	State    backendState
+	config   BackendConfig
+	gwConfig *GatewayConfig
 }
 
 type GatewayConfig struct {
@@ -35,6 +35,13 @@ type GatewayConfig struct {
 	ProcessorLine string `json:"process_line,omitempty"`
 }
 
+// savePayload is what gets placed on the BackendGateway.saveMailChan channel
+type savePayload struct {
+	mail *envelope.Envelope
+	// savedNotify is used to notify that the save operation completed
+	savedNotify chan *saveStatus
+}
+
 // possible values for state
 const (
 	BackendStateRunning = iota
@@ -51,7 +58,7 @@ func (s backendState) String() string {
 // New retrieves a backend specified by the backendName, and initializes it using
 // backendConfig
 func New(backendName string, backendConfig BackendConfig, l log.Logger) (Backend, error) {
-	mainlog = l
+	Service.StoreMainlog(l)
 	gateway := &BackendGateway{config: backendConfig}
 	if backend, found := backends[backendName]; found {
 		gateway.b = backend
@@ -83,17 +90,19 @@ func (gw *BackendGateway) Process(e *envelope.Envelope) BackendResult {
 		return NewBackendResult(response.Canned.SuccessMessageQueued + status.hash)
 
 	case <-time.After(time.Second * 30):
-		mainlog.Infof("Backend has timed out")
+		Log().Infof("Backend has timed out")
 		return NewBackendResult(response.Canned.FailBackendTimeout)
 	}
+
 }
 
 // Shutdown shuts down the backend and leaves it in BackendStateShuttered state
 func (gw *BackendGateway) Shutdown() error {
-	gw.stateGuard.Lock()
-	defer gw.stateGuard.Unlock()
+	gw.Lock()
+	defer gw.Unlock()
 	if gw.State != BackendStateShuttered {
 		close(gw.saveMailChan) // workers will stop
+		// wait for workers to stop
 		gw.wg.Wait()
 		Service.Shutdown()
 		gw.State = BackendStateShuttered
@@ -103,6 +112,8 @@ func (gw *BackendGateway) Shutdown() error {
 
 // Reinitialize starts up a backend gateway that was shutdown before
 func (gw *BackendGateway) Reinitialize() error {
+	gw.Lock()
+	defer gw.Unlock()
 	if gw.State != BackendStateShuttered {
 		return errors.New("backend must be in BackendStateshuttered state to Reinitialize")
 	}
@@ -114,7 +125,7 @@ func (gw *BackendGateway) Reinitialize() error {
 	return err
 }
 
-// newProcessorLine creates a new stack of decorators and returns as a single Processor
+// newProcessorLine creates a new call-stack of decorators and returns as a single Processor
 // Decorators are functions of Decorator type, source files prefixed with p_*
 // Each decorator does a specific task during the processing stage.
 // This function uses the config value process_line to figure out which Decorator to use
@@ -146,8 +157,10 @@ func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
 	return nil
 }
 
-// Initialize builds the workers and starts each worker in a thread
+// Initialize builds the workers and starts each worker in a goroutine
 func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
+	gw.Lock()
+	defer gw.Unlock()
 	err := gw.loadConfig(cfg)
 	if err == nil {
 		workersSize := gw.getNumberOfWorkers()
@@ -170,7 +183,6 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 				gw.wg.Done()
 			}(i)
 		}
-
 	} else {
 		gw.State = BackendStateError
 	}

+ 20 - 18
backends/guerrilla_db_redis.go

@@ -1,6 +1,9 @@
 package backends
 
 // This backend is presented here as an example only, please modify it to your needs.
+//
+// Deprecated: as of 14th Feb 2017, backends are composed via config, by chaining Processors (files prefixed with p_*)
+//
 // The backend stores the email data in Redis.
 // Other meta-information is stored in MySQL to be joined later.
 // A lot of email gets discarded without viewing on Guerrilla Mail,
@@ -31,7 +34,6 @@ import (
 	"bytes"
 	"compress/zlib"
 	"database/sql"
-	_ "github.com/go-sql-driver/mysql"
 
 	"github.com/flashmob/go-guerrilla/envelope"
 	"github.com/go-sql-driver/mysql"
@@ -178,7 +180,7 @@ func (g *GuerrillaDBAndRedisBackend) prepareInsertQuery(rows int, db *sql.DB) *s
 	}
 	stmt, sqlErr := db.Prepare(sqlstr)
 	if sqlErr != nil {
-		mainlog.WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
+		Log().WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
 	}
 	// cache it
 	g.cache[rows-1] = stmt
@@ -190,14 +192,14 @@ func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.
 	defer func() {
 		if r := recover(); r != nil {
 			//logln(1, fmt.Sprintf("Recovered in %v", r))
-			mainlog.Error("Recovered form panic:", r, string(debug.Stack()))
+			Log().Error("Recovered form panic:", r, string(debug.Stack()))
 			sum := 0
 			for _, v := range *vals {
 				if str, ok := v.(string); ok {
 					sum = sum + len(str)
 				}
 			}
-			mainlog.Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
+			Log().Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
 			panic("query failed")
 		}
 	}()
@@ -205,7 +207,7 @@ func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.
 	insertStmt = g.prepareInsertQuery(c, db)
 	_, execErr = insertStmt.Exec(*vals...)
 	if execErr != nil {
-		mainlog.WithError(execErr).Error("There was a problem the insert")
+		Log().WithError(execErr).Error("There was a problem the insert")
 	}
 }
 
@@ -239,7 +241,7 @@ func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{
 	}
 	defer func() {
 		if r := recover(); r != nil {
-			mainlog.Error("insertQueryBatcher caught a panic", r)
+			Log().Error("insertQueryBatcher caught a panic", r)
 		}
 	}()
 	// Keep getting values from feeder and add to batch.
@@ -251,14 +253,14 @@ func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{
 		// it may panic when reading on a closed feeder channel. feederOK detects if it was closed
 		case row, feederOk := <-feeder:
 			if row == nil {
-				mainlog.Info("Query batchaer exiting")
+				Log().Info("Query batchaer exiting")
 				// Insert any remaining rows
 				insert(count)
 				return feederOk
 			}
 			vals = append(vals, row...)
 			count++
-			mainlog.Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", workerId)
+			Log().Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", workerId)
 			if count >= GuerrillaDBAndRedisBatchMax {
 				insert(GuerrillaDBAndRedisBatchMax)
 			}
@@ -299,7 +301,7 @@ func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
 		Params:       map[string]string{"collation": "utf8_general_ci"},
 	}
 	if db, err := sql.Open("mysql", conf.FormatDSN()); err != nil {
-		mainlog.Error("cannot open mysql", err)
+		Log().Error("cannot open mysql", err)
 		return nil, err
 	} else {
 		return db, nil
@@ -307,7 +309,7 @@ func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
 
 }
 
-func (g *GuerrillaDBAndRedisBackend) saveMailWorker_old(saveMailChan chan *savePayload) {
+func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePayload) {
 	var to, body string
 
 	var redisErr error
@@ -319,7 +321,7 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker_old(saveMailChan chan *saveP
 	var err error
 	db, err = g.mysqlConnect()
 	if err != nil {
-		mainlog.Fatalf("cannot open mysql: %s", err)
+		Log().Fatalf("cannot open mysql: %s", err)
 	}
 
 	// start the query SQL batching where we will send data via the feeder channel
@@ -327,11 +329,11 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker_old(saveMailChan chan *saveP
 	go func() {
 		for {
 			if feederOK := g.insertQueryBatcher(feeder, db); !feederOK {
-				mainlog.Debug("insertQueryBatcher exited")
+				Log().Debug("insertQueryBatcher exited")
 				return
 			}
 			// if insertQueryBatcher panics, it can recover and go in again
-			mainlog.Debug("resuming insertQueryBatcher")
+			Log().Debug("resuming insertQueryBatcher")
 		}
 
 	}()
@@ -339,11 +341,11 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker_old(saveMailChan chan *saveP
 	defer func() {
 		if r := recover(); r != nil {
 			//recover from closed channel
-			mainlog.Error("panic recovered in saveMailWorker", r)
+			Log().Error("panic recovered in saveMailWorker", r)
 		}
 		db.Close()
 		if redisClient.conn != nil {
-			mainlog.Infof("closed redis")
+			Log().Infof("closed redis")
 			redisClient.conn.Close()
 		}
 		// close the feeder & wait for query batcher to exit.
@@ -358,10 +360,10 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker_old(saveMailChan chan *saveP
 	for {
 		payload := <-saveMailChan
 		if payload == nil {
-			mainlog.Debug("No more saveMailChan payload")
+			Log().Debug("No more saveMailChan payload")
 			return
 		}
-		mainlog.Debug("Got mail from chan", payload.mail.RemoteAddress)
+		Log().Debug("Got mail from chan", payload.mail.RemoteAddress)
 		to = trimToLimit(strings.TrimSpace(payload.mail.RcptTo[0].User)+"@"+g.config.PrimaryHost, 255)
 		payload.mail.Helo = trimToLimit(payload.mail.Helo, 255)
 		host := trimToLimit(payload.mail.RcptTo[0].Host, 255)
@@ -395,7 +397,7 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker_old(saveMailChan chan *saveP
 				data.clear()   // blank
 			}
 		} else {
-			mainlog.WithError(redisErr).Warn("Error while connecting redis")
+			Log().WithError(redisErr).Warn("Error while connecting redis")
 		}
 
 		vals = []interface{}{} // clear the vals

+ 2 - 2
backends/p_debugger.go

@@ -40,8 +40,8 @@ func Debugger() Decorator {
 	return func(c Processor) Processor {
 		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
 			if config.LogReceivedMails {
-				mainlog.Infof("Mail from: %s / to: %v", e.MailFrom.String(), e.RcptTo)
-				mainlog.Info("Headers are:", e.Header)
+				Log().Infof("Mail from: %s / to: %v", e.MailFrom.String(), e.RcptTo)
+				Log().Info("Headers are:", e.Header)
 			}
 			// continue to the next Processor in the decorator chain
 			return c.Process(e)

+ 7 - 8
backends/p_mysql.go

@@ -66,10 +66,10 @@ func (m *MysqlProcessor) connect(config *MysqlProcessorConfig) (*sql.DB, error)
 		Params:       map[string]string{"collation": "utf8_general_ci"},
 	}
 	if db, err = sql.Open("mysql", conf.FormatDSN()); err != nil {
-		mainlog.Error("cannot open mysql", err)
+		Log().Error("cannot open mysql", err)
 		return nil, err
 	}
-	mainlog.Info("connected to mysql on tcp ", config.MysqlHost)
+	Log().Info("connected to mysql on tcp ", config.MysqlHost)
 	return db, err
 
 }
@@ -96,7 +96,7 @@ func (g *MysqlProcessor) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
 	}
 	stmt, sqlErr := db.Prepare(sqlstr)
 	if sqlErr != nil {
-		mainlog.WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
+		Log().WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
 	}
 	// cache it
 	g.cache[rows-1] = stmt
@@ -107,15 +107,14 @@ func (g *MysqlProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *
 	var execErr error
 	defer func() {
 		if r := recover(); r != nil {
-			//logln(1, fmt.Sprintf("Recovered in %v", r))
-			mainlog.Error("Recovered form panic:", r, string(debug.Stack()))
+			Log().Error("Recovered form panic:", r, string(debug.Stack()))
 			sum := 0
 			for _, v := range *vals {
 				if str, ok := v.(string); ok {
 					sum = sum + len(str)
 				}
 			}
-			mainlog.Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
+			Log().Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
 			panic("query failed")
 		}
 	}()
@@ -123,7 +122,7 @@ func (g *MysqlProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *
 	insertStmt = g.prepareInsertQuery(c, db)
 	_, execErr = insertStmt.Exec(*vals...)
 	if execErr != nil {
-		mainlog.WithError(execErr).Error("There was a problem the insert")
+		Log().WithError(execErr).Error("There was a problem the insert")
 	}
 }
 
@@ -144,7 +143,7 @@ func MySql() Decorator {
 		mp.config = config
 		db, err = mp.connect(config)
 		if err != nil {
-			mainlog.Fatalf("cannot open mysql: %s", err)
+			Log().Fatalf("cannot open mysql: %s", err)
 			return err
 		}
 		return nil

+ 2 - 2
backends/p_redis.go

@@ -107,14 +107,14 @@ func Redis() Decorator {
 					}
 				}
 				if redisErr != nil {
-					mainlog.WithError(redisErr).Warn("Error while talking to redis")
+					Log().WithError(redisErr).Warn("Error while talking to redis")
 					result := NewBackendResult(response.Canned.FailBackendTransaction)
 					return result, redisErr
 				} else {
 					e.Info["redis"] = "redis" // the backend system will know to look in redis for the message data
 				}
 			} else {
-				mainlog.Error("Redis needs a Hash() process before it")
+				Log().Error("Redis needs a Hash() process before it")
 			}
 
 			return c.Process(e)

+ 4 - 5
backends/worker.go

@@ -6,8 +6,7 @@ import (
 	"runtime/debug"
 )
 
-type Worker struct {
-}
+type Worker struct{}
 
 func (w *Worker) saveMailWorker(saveMailChan chan *savePayload, p Processor, workerId int) {
 
@@ -15,17 +14,17 @@ func (w *Worker) saveMailWorker(saveMailChan chan *savePayload, p Processor, wor
 		if r := recover(); r != nil {
 		// recover from closed channel
 			fmt.Println("Recovered in f", r, string(debug.Stack()))
-			mainlog.Error("Recovered form panic:", r, string(debug.Stack()))
+			Log().Error("Recovered form panic:", r, string(debug.Stack()))
 		}
 		// close any connections / files
 		Service.Shutdown()
 
 	}()
-	mainlog.Infof("Save mail worker started (#%d)", workerId)
+	Log().Infof("Save mail worker started (#%d)", workerId)
 	for {
 		payload := <-saveMailChan
 		if payload == nil {
-			mainlog.Debug("No more saveMailChan payload")
+			Log().Debug("No more saveMailChan payload")
 			return
 		}
 		// process the email here

+ 1 - 0
cmd/guerrillad/serve.go

@@ -111,6 +111,7 @@ func serve(cmd *cobra.Command, args []string) {
 	if err != nil {
 		mainlog.WithError(err).Fatal("Error while reading config")
 	}
+	mainlog.SetLevel(cmdConfig.LogLevel)
 
 	// Check that max clients is not greater than system open file limit.
 	fileLimit := getFileLimit()

+ 4 - 0
guerrilla.go

@@ -297,8 +297,10 @@ func (g *guerrilla) subscribeEvents() {
 			var l log.Logger
 			if l, err = log.GetLogger(sc.LogFile); err == nil {
 				g.storeMainlog(l)
+				backends.Service.StoreMainlog(l)
 				// it will change to the new logger on the next accepted client
 				server.logStore.Store(l)
+
 				g.mainlog().Infof("Server [%s] changed, new clients will log to: [%s]",
 					sc.ListenInterface,
 					sc.LogFile,
@@ -396,6 +398,8 @@ func (g *guerrilla) Shutdown() {
 	}
 }
 
+// SetLogger sets the logger for the app and propagates it to sub-packages (eg. backends)
 func (g *guerrilla) SetLogger(l log.Logger) {
 	g.storeMainlog(l)
+	backends.Service.StoreMainlog(l)
 }