|
@@ -4,6 +4,7 @@ import (
|
|
|
"errors"
|
|
"errors"
|
|
|
"fmt"
|
|
"fmt"
|
|
|
"io"
|
|
"io"
|
|
|
|
|
+ "reflect"
|
|
|
"strconv"
|
|
"strconv"
|
|
|
"sync"
|
|
"sync"
|
|
|
"time"
|
|
"time"
|
|
@@ -637,28 +638,25 @@ func (gw *BackendGateway) Start() error {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, workersSize int, processors interface{}) {
|
|
func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, workersSize int, processors interface{}) {
|
|
|
- // set the wait group
|
|
|
|
|
- gw.wg.Add(workersSize)
|
|
|
|
|
- for i := 0; i < workersSize; i++ {
|
|
|
|
|
- stop := make(chan bool)
|
|
|
|
|
-
|
|
|
|
|
- var p interface{}
|
|
|
|
|
- switch v := processors.(type) {
|
|
|
|
|
- case []Processor:
|
|
|
|
|
- p = v[i]
|
|
|
|
|
- case []ValidatingProcessor:
|
|
|
|
|
- p = v[i]
|
|
|
|
|
- case []streamer:
|
|
|
|
|
- p = v[i]
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ s := reflect.ValueOf(processors)
|
|
|
|
|
+ if reflect.TypeOf(processors).Kind() != reflect.Slice {
|
|
|
|
|
+ panic("processors must be a slice")
|
|
|
|
|
+ }
|
|
|
|
|
+ // set the wait group (when stopping, it will block for all goroutines to exit)
|
|
|
|
|
+ gw.wg.Add(s.Len())
|
|
|
|
|
+ for i := 0; i < s.Len(); i++ {
|
|
|
|
|
+ // set the buffer
|
|
|
gw.buffers[gw.workerID] = gw.makeBuffer()
|
|
gw.buffers[gw.workerID] = gw.makeBuffer()
|
|
|
- go func(workerId int, stop chan bool) {
|
|
|
|
|
|
|
+ // stop is a channel used for stopping the worker
|
|
|
|
|
+ stop := make(chan bool)
|
|
|
|
|
+ // start the worker and keep it running
|
|
|
|
|
+ go func(workerId int, stop chan bool, i int) {
|
|
|
// blocks here until the worker exits
|
|
// blocks here until the worker exits
|
|
|
// for-loop used so that if workDispatcher panics, re-enter gw.workDispatcher
|
|
// for-loop used so that if workDispatcher panics, re-enter gw.workDispatcher
|
|
|
for {
|
|
for {
|
|
|
state := gw.workDispatcher(
|
|
state := gw.workDispatcher(
|
|
|
conveyor,
|
|
conveyor,
|
|
|
- p,
|
|
|
|
|
|
|
+ s.Index(i),
|
|
|
workerId,
|
|
workerId,
|
|
|
stop)
|
|
stop)
|
|
|
// keep running after panic
|
|
// keep running after panic
|
|
@@ -667,7 +665,7 @@ func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, workersSize int
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
gw.wg.Done()
|
|
gw.wg.Done()
|
|
|
- }(gw.workerID, stop)
|
|
|
|
|
|
|
+ }(gw.workerID, stop, i)
|
|
|
gw.workStoppers = append(gw.workStoppers, stop)
|
|
gw.workStoppers = append(gw.workStoppers, stop)
|
|
|
gw.workerID++
|
|
gw.workerID++
|
|
|
}
|
|
}
|