|
@@ -623,13 +623,11 @@ func (gw *BackendGateway) Start() error {
|
|
|
gw.Lock()
|
|
|
defer gw.Unlock()
|
|
|
if gw.State == BackendStateInitialized || gw.State == BackendStateShuttered {
|
|
|
- // we start our workers
|
|
|
- workersSize := gw.workersSize()
|
|
|
// make our slice of channels for stopping
|
|
|
gw.workStoppers = make([]chan bool, 0)
|
|
|
- gw.startWorkers(gw.conveyor, workersSize, gw.processors)
|
|
|
- gw.startWorkers(gw.conveyorValidation, workersSize, gw.validators)
|
|
|
- gw.startWorkers(gw.conveyorStream, workersSize, gw.streamers)
|
|
|
+ gw.startWorkers(gw.conveyor, gw.processors)
|
|
|
+ gw.startWorkers(gw.conveyorValidation, gw.validators)
|
|
|
+ gw.startWorkers(gw.conveyorStream, gw.streamers)
|
|
|
gw.State = BackendStateRunning
|
|
|
return nil
|
|
|
} else {
|
|
@@ -637,14 +635,14 @@ func (gw *BackendGateway) Start() error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, workersSize int, processors interface{}) {
|
|
|
- s := reflect.ValueOf(processors)
|
|
|
+func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, processors interface{}) {
|
|
|
+ p := reflect.ValueOf(processors)
|
|
|
if reflect.TypeOf(processors).Kind() != reflect.Slice {
|
|
|
panic("processors must be a slice")
|
|
|
}
|
|
|
// set the wait group (when stopping, it will block for all goroutines to exit)
|
|
|
- gw.wg.Add(s.Len())
|
|
|
- for i := 0; i < s.Len(); i++ {
|
|
|
+ gw.wg.Add(p.Len())
|
|
|
+ for i := 0; i < p.Len(); i++ {
|
|
|
// set the buffer
|
|
|
gw.buffers[gw.workerID] = gw.makeBuffer()
|
|
|
// stop is a channel used for stopping the worker
|
|
@@ -656,7 +654,7 @@ func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, workersSize int
|
|
|
for {
|
|
|
state := gw.workDispatcher(
|
|
|
conveyor,
|
|
|
- s.Index(i),
|
|
|
+ p.Index(i).Interface(),
|
|
|
workerId,
|
|
|
stop)
|
|
|
// keep running after panic
|
|
@@ -723,7 +721,6 @@ func (gw *BackendGateway) workDispatcher(
|
|
|
var msg *workerMsg
|
|
|
|
|
|
defer func() {
|
|
|
-
|
|
|
// panic recovery mechanism: it may panic when processing
|
|
|
// since processors may call arbitrary code, some may be 3rd party / unstable
|
|
|
// we need to detect the panic, and notify the backend that it failed & unlock the envelope
|