| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784 | package backendsimport (	"errors"	"fmt"	"io"	"reflect"	"strconv"	"sync"	"time"	"github.com/flashmob/go-guerrilla/log"	"github.com/flashmob/go-guerrilla/mail"	"github.com/flashmob/go-guerrilla/response"	"runtime/debug")// A backend gateway is a proxy that implements the Backend interface.// It is used to start multiple goroutine workers for saving mail, and then distribute email saving to the workers// via a channel. Shutting down via Shutdown() will stop all workers.// The rest of this program always talks to the backend via this gateway.type BackendGateway struct {	// name is the name of the gateway given in the config	name string	// channel for distributing envelopes to workers	conveyor           chan *workerMsg	conveyorValidation chan *workerMsg	conveyorStream     chan *workerMsg	conveyorStreamBg   chan *workerMsg	// waits for backend workers to start/stop	wg           sync.WaitGroup	workStoppers []chan bool	processors   []Processor	validators   []ValidatingProcessor	streamers    []streamer	background   []streamer	producer *StreamDecorator	decoratorLookup map[ConfigSection]map[string]*StreamDecorator	workerID int	// controls access to state	sync.Mutex	State    backendState	config   BackendConfig	gwConfig *GatewayConfig	//buffers []byte // stream output buffer	buffers map[int][]byte}// workerMsg is what get placed on the BackendGateway.saveMailChan channeltype workerMsg struct {	// The email data	e *mail.Envelope	// notifyMe is used to notify the gateway of workers finishing their processing	notifyMe chan *notifyMsg	// select the task type	task SelectTask	// io.Reader for streamed processor	r io.Reader}type streamer struct {	// StreamProcessor is a chain of StreamProcessor	sp StreamProcessor	// so that we can call Open and Close	d []*StreamDecorator}func (s streamer) Write(p []byte) (n int, err error) {	return s.sp.Write(p)}func (s *streamer) open(e *mail.Envelope) error {	var err Errors	for i := range s.d {		if s.d[i].Open != nil {			if e := s.d[i].Open(e); e != nil {				err = append(err, e)			}		}	}	if len(err) == 0 {		return nil	}	return err}func (s *streamer) close() error {	var err Errors	// close in reverse order	for i := len(s.d) - 1; i >= 0; i-- {		if s.d[i].Close != nil {			if e := s.d[i].Close(); e != nil {				err = append(err, e)			}		}	}	if len(err) == 0 {		return nil	}	return err}func (s *streamer) shutdown() error {	var err Errors	// shutdown in reverse order	for i := len(s.d) - 1; i >= 0; i-- {		if s.d[i].Shutdown != nil {			if e := s.d[i].Shutdown(); e != nil {				err = append(err, e)			}		}	}	if len(err) == 0 {		return nil	}	return err}type backendState int// possible values for stateconst (	BackendStateNew backendState = iota	BackendStateRunning	BackendStateShuttered	BackendStateError	BackendStateInitialized)func (s backendState) String() string {	switch s {	case BackendStateNew:		return "NewState"	case BackendStateRunning:		return "RunningState"	case BackendStateShuttered:		return "ShutteredState"	case BackendStateError:		return "ErrorSate"	case BackendStateInitialized:		return "InitializedState"	}	return strconv.Itoa(int(s))}// New makes a new default BackendGateway backend, and initializes it using// backendConfig and stores the loggerfunc New(name string, backendConfig BackendConfig, l log.Logger) (Backend, error) {	Svc.SetMainlog(l)	gateway := &BackendGateway{name: name}	backendConfig.toLower()	// keep the a copy of the config	gateway.config = backendConfig	err := gateway.Initialize(backendConfig)	if err != nil {		return nil, fmt.Errorf("error while initializing the backend: %s", err)	}	return gateway, nil}var workerMsgPool = sync.Pool{	// if not available, then create a new one	New: func() interface{} {		return &workerMsg{}	},}// reset resets a workerMsg that has been borrowed from the poolfunc (w *workerMsg) reset(e *mail.Envelope, task SelectTask) {	if w.notifyMe == nil {		w.notifyMe = make(chan *notifyMsg)	}	w.e = e	w.task = task}func (gw *BackendGateway) Name() string {	return gw.name}// Process distributes an envelope to one of the backend workers with a TaskSaveMail taskfunc (gw *BackendGateway) Process(e *mail.Envelope) Result {	if gw.State != BackendStateRunning {		return NewResult(response.Canned.FailBackendNotRunning, response.SP, gw.State)	}	// borrow a workerMsg from the pool	workerMsg := workerMsgPool.Get().(*workerMsg)	defer workerMsgPool.Put(workerMsg)	workerMsg.reset(e, TaskSaveMail)	// place on the channel so that one of the save mail workers can pick it up	gw.conveyor <- workerMsg	// wait for the save to complete	// or timeout	select {	case status := <-workerMsg.notifyMe:		// email saving transaction completed		if status.result == BackendResultOK && status.queuedID != "" {			return NewResult(response.Canned.SuccessMessageQueued, response.SP, status.queuedID)		}		// A custom result, there was probably an error, if so, log it		if status.result != nil {			if status.err != nil {				Log().Error(status.err)			}			return status.result		}		// if there was no result, but there's an error, then make a new result from the error		if status.err != nil {			if _, err := strconv.Atoi(status.err.Error()[:3]); err != nil {				return NewResult(response.Canned.FailBackendTransaction, response.SP, status.err)			}			return NewResult(status.err)		}		// both result & error are nil (should not happen)		err := errors.New("no response from backend - processor did not return a result or an error")		Log().Error(err)		return NewResult(response.Canned.FailBackendTransaction, response.SP, err)	case <-time.After(gw.saveTimeout()):		Log().Fields("queuedId", e.QueuedId).Error("backend has timed out while saving email")		e.Add(1) // lock the envelope - it's still processing here, we don't want the server to recycle it		go func() {			// keep waiting for the backend to finish processing			<-workerMsg.notifyMe			Log().Fields("queuedId", e.QueuedId).Error("finished processing mail after timeout")			e.Done()		}()		return NewResult(response.Canned.FailBackendTimeout)	}}// ValidateRcpt asks one of the workers to validate the recipient// Only the last recipient appended to e.RcptTo will be validated.func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {	if gw.State != BackendStateRunning {		return StorageNotAvailable	}	if _, ok := gw.validators[0].(NoopProcessor); ok {		// no validator processors configured		return nil	}	// place on the channel so that one of the save mail workers can pick it up	workerMsg := workerMsgPool.Get().(*workerMsg)	defer workerMsgPool.Put(workerMsg)	workerMsg.reset(e, TaskValidateRcpt)	gw.conveyorValidation <- workerMsg	// wait for the validation to complete	// or timeout	select {	case status := <-workerMsg.notifyMe:		if status.err != nil {			return status.err		}		return nil	case <-time.After(gw.validateRcptTimeout()):		Log().Fields("queuedId", e.QueuedId).Error("backend has timed out while validating rcpt")		e.Add(1) // lock the envelope - it's still processing here, we don't want the server to recycle it		go func() {			<-workerMsg.notifyMe			Log().Fields("queuedId", e.QueuedId).Error("finished validating rcpt after timeout")			e.Done()		}()		return StorageTimeout	}}func (gw *BackendGateway) StreamOn() bool {	return len(gw.gwConfig.SaveStream) != 0}// newStreamDecorator creates a new StreamDecorator and calls Configure with its corresponding configuration// cs - the item of 'list' property, result from newStackStreamProcessorConfig()// section - which section of the configfunc (gw *BackendGateway) newStreamDecorator(cs stackConfigExpression, section ConfigSection) *StreamDecorator {	if makeFunc, ok := Streamers[cs.name]; !ok {		return nil	} else {		d := makeFunc()		config := gw.config.lookupGroup(section, cs.String())		if config == nil {			config = ConfigGroup{}		}		if d.Configure != nil {			if err := d.Configure(config); err != nil {				return nil			}		}		return d	}}func (gw *BackendGateway) ProcessBackground(e *mail.Envelope) {	if d := gw.producer; d == nil {		Log().Error("gateway has failed creating a post_process_producer, check config")		return	} else {		r, err := d.GetEmail(e.MessageID)		if err != nil {			Log().Fields("queuedID", e.QueuedId, "messageID", e.MessageID).				Error("gateway background process aborted: email with messageID not found")			return		}		// borrow a workerMsg from the pool		workerMsg := workerMsgPool.Get().(*workerMsg)		defer workerMsgPool.Put(workerMsg)		workerMsg.reset(e, TaskSaveMailStream)		workerMsg.r = r		// place on the channel so that one of the save mail workers can pick it up		// buffered channel will block if full		select {		case gw.conveyorStreamBg <- workerMsg:			break		case <-time.After(gw.saveTimeout()):			Log().Fields("queuedID", e.QueuedId).Error("post-processing timeout - queue full, aborting")			return		}		// process in the background		for {			select {			case status := <-workerMsg.notifyMe:				// email saving transaction completed				var fields []interface{}				var code int				if status.result != nil {					code = status.result.Code()					fields = append(fields, "queuedID", e.QueuedId, "code", code)				}				if code > 200 && code < 300 {					fields = append(fields, "messageID", e.MessageID)					Log().Fields(fields...).Info("background process done")					return				}				if status.err != nil {					fields = append(fields, "error", status.err)				}				if len(fields) > 0 {					Log().Fields(fields...).Error("post-process completed with an error")					return				}				// both result & error are nil (should not happen)				Log().Fields("queuedID", e.QueuedId).Error("no response from backend - post-process did not return a result or an error")				return			case <-time.After(gw.saveTimeout()):				Log().Fields("queuedID", e.QueuedId).Error("background post-processing timed-out, will keep waiting")				// don't return here, keep waiting for workerMsg.notifyMe			}		}	}}func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result, error) {	res := response.Canned	if gw.State != BackendStateRunning {		return NewResult(res.FailBackendNotRunning, response.SP, gw.State), errors.New(res.FailBackendNotRunning.String())	}	// borrow a workerMsg from the pool	workerMsg := workerMsgPool.Get().(*workerMsg)	workerMsgPool.Put(workerMsg)	workerMsg.reset(e, TaskSaveMailStream)	workerMsg.r = r	// place on the channel so that one of the save mail workers can pick it up	gw.conveyorStream <- workerMsg	// wait for the save to complete	// or timeout	select {	case status := <-workerMsg.notifyMe:		// email saving transaction completed		if status.result == BackendResultOK && status.queuedID != "" {			return NewResult(res.SuccessMessageQueued, response.SP, status.queuedID), status.err		}		// A custom result, there was probably an error, if so, log it		if status.result != nil {			if status.err != nil {				Log().Error(status.err)			}			return status.result, status.err		}		// if there was no result, but there's an error, then make a new result from the error		if status.err != nil {			if _, err := strconv.Atoi(status.err.Error()[:3]); err != nil {				return NewResult(res.FailBackendTransaction, response.SP, status.err), status.err			}			return NewResult(status.err), status.err		}		// both result & error are nil (should not happen)		err := errors.New("no response from backend - processor did not return a result or an error")		Log().Error(err)		return NewResult(res.FailBackendTransaction, response.SP, err), err	case <-time.After(gw.saveTimeout()):		Log().Fields("queuedID", e.QueuedId).Error("backend has timed out while saving email stream")		e.Add(1) // lock the envelope - it's still processing here, we don't want the server to recycle it		go func() {			// keep waiting for the backend to finish processing			<-workerMsg.notifyMe			e.Done()			Log().Fields("queuedID", e.QueuedId).Info("backend has finished saving email stream after timeout")		}()		return NewResult(res.FailBackendTimeout), errors.New("gateway timeout")	}}// Shutdown shuts down the backend and leaves it in BackendStateShuttered statefunc (gw *BackendGateway) Shutdown() error {	gw.Lock()	defer gw.Unlock()	if gw.State != BackendStateShuttered {		// send a signal to all workers		gw.stopWorkers()		// wait for workers to stop		gw.wg.Wait()		for stream := range gw.streamers {			err := gw.streamers[stream].shutdown()			if err != nil {				Log().Fields("error", err, "gateway", gw.name).Error("failed shutting down stream")			}		}		for stream := range gw.background {			err := gw.background[stream].shutdown()			if err != nil {				Log().Fields("error", err, "gateway", gw.name).Error("failed shutting down background stream")			}		}		// call shutdown on all processor shutdowners		if err := Svc.shutdown(); err != nil {			return err		}		gw.State = BackendStateShuttered	}	return nil}// Reinitialize initializes the gateway with the existing config after it was shutdownfunc (gw *BackendGateway) Reinitialize() error {	if gw.State != BackendStateShuttered {		return errors.New("backend must be in BackendStateshuttered state to Reinitialize")	}	// clear the Initializers and Shutdowners	Svc.reset()	err := gw.Initialize(gw.config)	if err != nil {		return fmt.Errorf("error while initializing the backend: %s", err)	}	return err}// newStack creates a new Processor by chaining multiple Processors in a call stack// Decorators are functions of Decorator type, source files prefixed with p_*// Each decorator does a specific task during the processing stage.// This function uses the config value save_process or validate_process to figure out which Decorator to usefunc (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {	var decorators []Decorator	c := newStackProcessorConfig(stackConfig, newAliasMap(gw.config[ConfigProcessors]))	if len(c.list) == 0 {		return NoopProcessor{}, nil	}	for i := range c.list {		if makeFunc, ok := processors[c.list[i].name]; ok {			decorators = append(decorators, makeFunc())		} else {			return nil, c.notFound(c.list[i].name)		}	}	// build the call-stack of decorators	p := Decorate(DefaultProcessor{}, decorators...)	return p, nil}func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {	var decorators []*StreamDecorator	noop := streamer{NoopStreamProcessor{}, decorators}	groupName := ConfigStreamProcessors	c := newStackStreamProcessorConfig(stackConfig, newAliasMap(gw.config[groupName]))	if len(c.list) == 0 {		return noop, nil	}	for i := range c.list {		if d := gw.newStreamDecorator(c.list[i], groupName); d != nil {			if gw.decoratorLookup[groupName] == nil {				gw.decoratorLookup[groupName] = make(map[string]*StreamDecorator)			}			gw.decoratorLookup[groupName][c.list[i].String()] = d			decorators = append(decorators, d)		} else {			return streamer{nil, decorators}, c.notFound(c.list[i].name)		}	}	// build the call-stack of decorators	sp, decorators := DecorateStream(&DefaultStreamProcessor{}, decorators)	return streamer{sp, decorators}, nil}// loadConfig loads the config for the GatewayConfigfunc (gw *BackendGateway) loadConfig(cfg BackendConfig) error {	configType := BaseConfig(&GatewayConfig{})	// Note: treat config values as immutable	// if you need to change a config value, change in the file then	// send a SIGHUP	if gw.name == "" {		gw.name = DefaultGateway	}	if _, ok := cfg[ConfigGateways][gw.name]; !ok {		return errors.New("no such gateway configured: " + gw.name)	}	bcfg, err := Svc.ExtractConfig(ConfigGateways, gw.name, cfg, configType)	if err != nil {		return err	}	gw.gwConfig = bcfg.(*GatewayConfig)	return nil}// Initialize builds the workers and initializes each onefunc (gw *BackendGateway) Initialize(cfg BackendConfig) error {	gw.Lock()	defer gw.Unlock()	if gw.State != BackendStateNew && gw.State != BackendStateShuttered {		return errors.New("can only Initialize in BackendStateNew or BackendStateShuttered state")	}	err := gw.loadConfig(cfg)	if err != nil {		gw.State = BackendStateError		return err	}	gw.buffers = make(map[int][]byte) // individual buffers are made later	gw.decoratorLookup = make(map[ConfigSection]map[string]*StreamDecorator)	gw.processors = make([]Processor, 0)	gw.validators = make([]ValidatingProcessor, 0)	gw.streamers = make([]streamer, 0)	gw.background = make([]streamer, 0)	for i := 0; i < gw.gwConfig.saveWorkersCount(); i++ {		p, err := gw.newStack(gw.gwConfig.SaveProcess)		if err != nil {			gw.State = BackendStateError			return err		}		gw.processors = append(gw.processors, p)	}	for i := 0; i < gw.gwConfig.validateWorkersCount(); i++ {		v, err := gw.newStack(gw.gwConfig.ValidateProcess)		if err != nil {			gw.State = BackendStateError			return err		}		gw.validators = append(gw.validators, v)	}	for i := 0; i < gw.gwConfig.streamWorkersCount(); i++ {		s, err := gw.newStreamStack(gw.gwConfig.SaveStream)		if err != nil {			gw.State = BackendStateError			return err		}		gw.streamers = append(gw.streamers, s)	}	for i := 0; i < gw.gwConfig.backgroundWorkersCount(); i++ {		c, err := gw.newStreamStack(gw.gwConfig.PostProcessConsumer)		if err != nil {			gw.State = BackendStateError			return err		}		gw.background = append(gw.background, c)	}	if err = gw.initProducer(); err != nil {		return err	}	// Initialize processors & stream processors	if err := Svc.Initialize(cfg); err != nil {		gw.State = BackendStateError		return err	}	gw.conveyor = make(chan *workerMsg, gw.gwConfig.saveProcessSize())	gw.conveyorValidation = make(chan *workerMsg, gw.gwConfig.validateProcessSize())	gw.conveyorStream = make(chan *workerMsg, gw.gwConfig.saveStreamSize())	gw.conveyorStreamBg = make(chan *workerMsg, gw.gwConfig.postProcessSize())	// ready to start	gw.State = BackendStateInitialized	return nil}// Start starts the worker goroutines, assuming it has been initialized or shuttered beforefunc (gw *BackendGateway) Start() error {	gw.Lock()	defer gw.Unlock()	if gw.State == BackendStateInitialized || gw.State == BackendStateShuttered {		// make our slice of channels for stopping		gw.workStoppers = make([]chan bool, 0)		gw.startWorkers(gw.conveyor, gw.processors)		gw.startWorkers(gw.conveyorValidation, gw.validators)		gw.startWorkers(gw.conveyorStream, gw.streamers)		gw.startWorkers(gw.conveyorStreamBg, gw.background)		gw.State = BackendStateRunning		return nil	} else {		return fmt.Errorf("cannot start backend because it's in %s state", gw.State)	}}func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, processors interface{}) {	p := reflect.ValueOf(processors)	if reflect.TypeOf(processors).Kind() != reflect.Slice {		panic("processors must be a slice")	}	// set the wait group (when stopping, it will block for all goroutines to exit)	gw.wg.Add(p.Len())	for i := 0; i < p.Len(); i++ {		// set the buffer		gw.buffers[gw.workerID] = gw.makeBuffer()		// stop is a channel used for stopping the worker		stop := make(chan bool)		// start the worker and keep it running		go func(workerId int, stop chan bool, i int) {			// blocks here until the worker exits			// for-loop used so that if workDispatcher panics, re-enter gw.workDispatcher			for {				state := gw.workDispatcher(					conveyor,					p.Index(i).Interface(),					workerId,					stop)				// keep running after panic				if state != dispatcherStatePanic {					break				}			}			gw.wg.Done()		}(gw.workerID, stop, i)		gw.workStoppers = append(gw.workStoppers, stop)		gw.workerID++	}}type dispatcherState intconst (	dispatcherStateStopped dispatcherState = iota	dispatcherStateIdle	dispatcherStateWorking	dispatcherStateNotify	dispatcherStatePanic)func (gw *BackendGateway) workDispatcher(	workIn chan *workerMsg,	processor interface{},	workerId int,	stop chan bool) (state dispatcherState) {	var msg *workerMsg	defer func() {		// panic recovery mechanism: it may panic when processing		// since processors may call arbitrary code, some may be 3rd party / unstable		// we need to detect the panic, and notify the backend that it failed & unlock the envelope		if r := recover(); r != nil {			Log().Error("worker recovered from panic:", r, string(debug.Stack()))			if state == dispatcherStateWorking {				msg.notifyMe <- ¬ifyMsg{err: errors.New("storage failed")}			}			state = dispatcherStatePanic			return		}		// state is dispatcherStateStopped if it reached here	}()	state = dispatcherStateIdle	Log().Fields("id", workerId+1, "gateway", gw.name).		Info("processing worker started")	for {		select {		case <-stop:			state = dispatcherStateStopped			Log().Fields("id", workerId+1, "gateway", gw.name).				Info("stop signal for worker")			return		case msg = <-workIn:			state = dispatcherStateWorking // recovers from panic if in this state			switch v := processor.(type) {			case Processor:				result, err := v.Process(msg.e, msg.task)				state = dispatcherStateNotify				msg.notifyMe <- ¬ifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}			case ValidatingProcessor:				result, err := v.Process(msg.e, msg.task)				state = dispatcherStateNotify				msg.notifyMe <- ¬ifyMsg{err: err, result: result}			case streamer:				err := v.open(msg.e)				if err == nil {					if msg.e.Size, err = io.CopyBuffer(v, msg.r, gw.buffers[workerId]); err != nil {						Log().Fields("error", err, "workerID", workerId+1).Error("stream writing failed")					}					if err = v.close(); err != nil {						Log().Fields("error", err, "workerID", workerId+1).Error("stream closing failed")					}				}				state = dispatcherStateNotify				var result Result				if err != nil {					result = NewResult(response.Canned.FailBackendTransaction, err)				} else {					result = NewResult(response.Canned.SuccessMessageQueued, response.SP, msg.e.QueuedId)				}				msg.notifyMe <- ¬ifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}			}		}		state = dispatcherStateIdle	}}func (gw *BackendGateway) makeBuffer() []byte {	if gw.buffers == nil {		gw.buffers = make(map[int][]byte)	}	size := configStreamBufferSize	if gw.gwConfig.StreamBufferSize > 0 {		size = gw.gwConfig.StreamBufferSize	}	return make([]byte, size)}// stopWorkers sends a signal to all workers to stopfunc (gw *BackendGateway) stopWorkers() {	for i := range gw.workStoppers {		gw.workStoppers[i] <- true	}	gw.workerID = 0}func (gw *BackendGateway) initProducer() error {	notValid := errors.New("gateway has no valid [post_process_producer] configured")	if gw.gwConfig.PostProcessConsumer == "" {		// consumer not configured, so not active		return nil	}	if gw.gwConfig.PostProcessProducer == "" {		return notValid	}	section := ConfigStreamProcessors // which section of the config (stream_processors)	m := newAliasMap(gw.config[section])	c := newStackStreamProcessorConfig(gw.gwConfig.PostProcessProducer, m)	if len(c.list) == 0 {		return notValid	}	// check it there's already an instance of it	if gw.decoratorLookup[section] != nil {		if v, ok := gw.decoratorLookup[section][c.list[0].String()]; ok {			gw.producer = v			return nil		}	}	if d := gw.newStreamDecorator(c.list[0], section); d != nil {		// use a new instance		gw.producer = d		return nil	} else {		return errors.New("please check gateway config [post_process_producer]")	}}
 |