Ver código fonte

- added stream_buffer_size setting to the config
- stream buffer is recycled

flashmob 6 anos atrás
pai
commit
5e3eb79ba5
1 arquivos alterados com 11 adições e 4 exclusões
  1. 11 4
      backends/gateway.go

+ 11 - 4
backends/gateway.go

@@ -37,6 +37,8 @@ type BackendGateway struct {
 	State    backendState
 	config   BackendConfig
 	gwConfig *GatewayConfig
+
+	buf []byte // stream output buffer
 }
 
 type GatewayConfig struct {
@@ -52,6 +54,8 @@ type GatewayConfig struct {
 	TimeoutValidateRcpt string `json:"gw_val_rcpt_timeout,omitempty"`
 	// StreamSaveProcess is same as a ProcessorStack, but reads from an io.Reader to write email data
 	StreamSaveProcess string `json:"stream_save_process,omitempty"`
+	// StreamBufferLen controls the size of the output buffer, in bytes. Default is 4096
+	StreamBufferSize int `json:"stream_buffer_size,omitempty"`
 }
 
 // workerMsg is what get placed on the BackendGateway.saveMailChan channel
@@ -468,6 +472,12 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 	if gw.conveyor == nil {
 		gw.conveyor = make(chan *workerMsg, workersSize)
 	}
+
+	size := 4096 // 4096
+	if gw.gwConfig.StreamBufferSize > 0 {
+		size = gw.gwConfig.StreamBufferSize
+	}
+	gw.buf = make([]byte, size)
 	// ready to start
 	gw.State = BackendStateInitialized
 	return nil
@@ -601,10 +611,7 @@ func (gw *BackendGateway) workDispatcher(
 			} else if msg.task == TaskSaveMailStream {
 				err := stream.open(msg.e)
 				if err == nil {
-					var buf []byte
-					// TODO make the buffer configurable
-					buf = make([]byte, 1024*4)
-					if msg.e.Values["size"], err = io.CopyBuffer(stream, msg.r, buf); err != nil {
+					if msg.e.Values["size"], err = io.CopyBuffer(stream, msg.r, gw.buf); err != nil {
 						Log().WithError(err).Error("stream writing failed")
 					}
 					if err = stream.close(); err != nil {