Browse Source

gateway: each stream processor has its own buffer
gatweay: workerID is unique

flashmob 5 years ago
parent
commit
945bb6d674
2 changed files with 40 additions and 20 deletions
  1. 40 19
      backends/gateway.go
  2. 0 1
      mail/envelope.go

+ 40 - 19
backends/gateway.go

@@ -34,13 +34,16 @@ type BackendGateway struct {
 	validators   []ValidatingProcessor
 	streamers    []streamer
 
+	workerID int
+
 	// controls access to state
 	sync.Mutex
 	State    backendState
 	config   BackendConfig
 	gwConfig *GatewayConfig
 
-	buf []byte // stream output buffer
+	//buffers []byte // stream output buffer
+	buffers map[int][]byte
 }
 
 type GatewayConfig struct {
@@ -589,7 +592,6 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 			gw.State = BackendStateError
 			return err
 		}
-
 		gw.streamers = append(gw.streamers, s)
 	}
 	// Initialize processors & stream processors
@@ -610,11 +612,6 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 		gw.conveyorStreamBg = make(chan *workerMsg, workersSize)
 	}
 
-	size := streamBufferSize
-	if gw.gwConfig.StreamBufferSize > 0 {
-		size = gw.gwConfig.StreamBufferSize
-	}
-	gw.buf = make([]byte, size)
 	// ready to start
 	gw.State = BackendStateInitialized
 	return nil
@@ -644,13 +641,24 @@ func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, workersSize int
 	gw.wg.Add(workersSize)
 	for i := 0; i < workersSize; i++ {
 		stop := make(chan bool)
+
+		var p interface{}
+		switch v := processors.(type) {
+		case []Processor:
+			p = v[i]
+		case []ValidatingProcessor:
+			p = v[i]
+		case []streamer:
+			p = v[i]
+		}
+		gw.buffers[gw.workerID] = gw.makeBuffer()
 		go func(workerId int, stop chan bool) {
 			// blocks here until the worker exits
 			// for-loop used so that if workDispatcher panics, re-enter gw.workDispatcher
 			for {
 				state := gw.workDispatcher(
 					conveyor,
-					processors,
+					p,
 					workerId,
 					stop)
 				// keep running after panic
@@ -659,8 +667,9 @@ func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, workersSize int
 				}
 			}
 			gw.wg.Done()
-		}(i, stop)
+		}(gw.workerID, stop)
 		gw.workStoppers = append(gw.workStoppers, stop)
+		gw.workerID++
 	}
 }
 
@@ -709,7 +718,7 @@ const (
 
 func (gw *BackendGateway) workDispatcher(
 	workIn chan *workerMsg,
-	processors interface{},
+	processor interface{},
 	workerId int,
 	stop chan bool) (state dispatcherState) {
 
@@ -743,22 +752,22 @@ func (gw *BackendGateway) workDispatcher(
 			return
 		case msg = <-workIn:
 			state = dispatcherStateWorking // recovers from panic if in this state
-			switch v := processors.(type) {
-			case []Processor:
-				result, err := v[workerId].Process(msg.e, msg.task)
+			switch v := processor.(type) {
+			case Processor:
+				result, err := v.Process(msg.e, msg.task)
 				state = dispatcherStateNotify
 				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
-			case []ValidatingProcessor:
-				result, err := v[workerId].Process(msg.e, msg.task)
+			case ValidatingProcessor:
+				result, err := v.Process(msg.e, msg.task)
 				state = dispatcherStateNotify
 				msg.notifyMe <- &notifyMsg{err: err, result: result}
-			case []streamer:
-				err := v[workerId].open(msg.e)
+			case streamer:
+				err := v.open(msg.e)
 				if err == nil {
-					if msg.e.Size, err = io.CopyBuffer(v[workerId], msg.r, gw.buf); err != nil {
+					if msg.e.Size, err = io.CopyBuffer(v, msg.r, gw.buffers[workerId]); err != nil {
 						Log().Fields("error", err, "workerID", workerId+1).Error("stream writing failed")
 					}
-					if err = v[workerId].close(); err != nil {
+					if err = v.close(); err != nil {
 						Log().Fields("error", err, "workerID", workerId+1).Error("stream closing failed")
 					}
 				}
@@ -777,9 +786,21 @@ func (gw *BackendGateway) workDispatcher(
 	}
 }
 
+func (gw *BackendGateway) makeBuffer() []byte {
+	if gw.buffers == nil {
+		gw.buffers = make(map[int][]byte, gw.gwConfig.WorkersSize)
+	}
+	size := streamBufferSize
+	if gw.gwConfig.StreamBufferSize > 0 {
+		size = gw.gwConfig.StreamBufferSize
+	}
+	return make([]byte, size)
+}
+
 // stopWorkers sends a signal to all workers to stop
 func (gw *BackendGateway) stopWorkers() {
 	for i := range gw.workStoppers {
 		gw.workStoppers[i] <- true
 	}
+	gw.workerID = 0
 }

+ 0 - 1
mail/envelope.go

@@ -260,7 +260,6 @@ func (e *Envelope) PopRcpt() Address {
 const (
 	statePlainText = iota
 	stateStartEncodedWord
-	stateEncodedWord
 	stateEncoding
 	stateCharset
 	statePayload