Browse Source

- memory based storage for chunk saver

flashmob 6 years ago
parent
commit
92d5062756
3 changed files with 243 additions and 71 deletions
  1. 237 71
      backends/s_chunksaver.go
  2. 3 0
      backends/s_chunksaver_test.go
  3. 3 0
      mail/envelope.go

+ 237 - 71
backends/s_chunksaver.go

@@ -34,12 +34,14 @@ import (
 	"hash"
 	"hash"
 	"net"
 	"net"
 	"strings"
 	"strings"
+	"time"
 )
 )
 
 
 type chunkSaverConfig struct {
 type chunkSaverConfig struct {
 	// ChunkMaxBytes controls the maximum buffer size for saving
 	// ChunkMaxBytes controls the maximum buffer size for saving
-	// 16KB default. The smallest possible size is 64 bytes to to bytes.Buffer limitation
-	ChunkMaxBytes int `json:"chunksaver_chunk_size"`
+	// 16KB default.
+	ChunkMaxBytes int    `json:"chunksaver_chunk_size"`
+	StorageEngine string `json:"chunksaver_storage_engine"`
 }
 }
 
 
 func init() {
 func init() {
@@ -125,41 +127,50 @@ func (c *chunkedBytesBuffer) capTo(n int) {
 
 
 type chunkedBytesBufferMime struct {
 type chunkedBytesBufferMime struct {
 	chunkedBytesBuffer
 	chunkedBytesBuffer
-	current *mime.Part
-	info    partsInfo
-	md5     hash.Hash
+	current  *mime.Part
+	info     partsInfo
+	md5      hash.Hash
+	database ChunkSaverStorage
 }
 }
 
 
 func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
 func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
 	b := new(chunkedBytesBufferMime)
 	b := new(chunkedBytesBufferMime)
-	var chash [16]byte
+
 	b.chunkedBytesBuffer.flushTrigger = func() {
 	b.chunkedBytesBuffer.flushTrigger = func() {
-		b.md5.Write(b.buf)
-		copy(chash[:], b.md5.Sum([]byte{}))
-		if b.current != nil {
-			if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
-				// existing part, just append the hash
-				lastPart := &b.info.Parts[size-1]
-				lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
-				b.fillInfo(lastPart, size-1)
-			} else {
-				// add it as a new part
-				part := chunkedPart{
-					PartId:    b.current.Node,
-					ChunkHash: [][16]byte{chash},
-				}
-				b.fillInfo(&part, 0)
-				b.info.Parts = append(b.info.Parts, part)
-				b.info.Count++
-			}
-			// TODO : send chunk to db
-			// db.savechunk(
-		}
+		b.onFlush()
 	}
 	}
 	b.md5 = md5.New()
 	b.md5 = md5.New()
 	return b
 	return b
 }
 }
 
 
+func (b *chunkedBytesBufferMime) setDatabase(database ChunkSaverStorage) {
+	b.database = database
+}
+
+func (b *chunkedBytesBufferMime) onFlush() {
+	b.md5.Write(b.buf)
+	var chash [16]byte
+	copy(chash[:], b.md5.Sum([]byte{}))
+	if b.current != nil {
+		if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
+			// existing part, just append the hash
+			lastPart := &b.info.Parts[size-1]
+			lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
+			b.fillInfo(lastPart, size-1)
+		} else {
+			// add it as a new part
+			part := chunkedPart{
+				PartId:    b.current.Node,
+				ChunkHash: [][16]byte{chash},
+			}
+			b.fillInfo(&part, 0)
+			b.info.Parts = append(b.info.Parts, part)
+			b.info.Count++
+		}
+		b.database.AddChunk(b.buf, chash[:])
+	}
+}
+
 func (b *chunkedBytesBufferMime) fillInfo(cp *chunkedPart, index int) {
 func (b *chunkedBytesBufferMime) fillInfo(cp *chunkedPart, index int) {
 	if cp.ContentType == "" && b.current.ContentType != nil {
 	if cp.ContentType == "" && b.current.ContentType != nil {
 		cp.ContentType = b.current.ContentType.String()
 		cp.ContentType = b.current.ContentType.String()
@@ -198,27 +209,123 @@ func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
 
 
 }
 }
 
 
+// ChunkSaverStorage defines an interface to the storage layer (the database)
 type ChunkSaverStorage interface {
 type ChunkSaverStorage interface {
-	OpenMessage(
-		from []byte,
-		helo []byte,
-		recipient []byte,
-		ipAddress net.IPAddr,
-		returnPath []byte,
-		isTLS bool) (mailID uint64, err error)
-	CloseMessage(
-		mailID uint64,
-		size uint,
-		partsInfo *partsInfo,
-		subject []byte,
-		charset []byte,
-		deliveryID []byte,
-		to []byte) error
+	OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error)
+	CloseMessage(mailID uint64, size uint, partsInfo *partsInfo, subject string, deliveryID string, to string, from string) error
 	AddChunk(data []byte, hash []byte) error
 	AddChunk(data []byte, hash []byte) error
 	Initialize(cfg BackendConfig) error
 	Initialize(cfg BackendConfig) error
 	Shutdown() (err error)
 	Shutdown() (err error)
 }
 }
 
 
+type chunkSaverMemoryEmail struct {
+	mailID     uint64
+	createdAt  time.Time
+	size       uint
+	from       string
+	to         string
+	partsInfo  []byte
+	helo       string
+	subject    string
+	deliveryID string
+	recipient  string
+	ipv4       net.IPAddr
+	ipv6       net.IPAddr
+	returnPath string
+	isTLS      bool
+}
+
+type chunkSaverMemoryChunk struct {
+	modifiedAt     time.Time
+	referenceCount uint
+	data           []byte
+}
+
+type chunkSaverMemory struct {
+	chunks   map[[16]byte]*chunkSaverMemoryChunk
+	emails   []*chunkSaverMemoryEmail
+	nextID   uint64
+	IDOffset uint64
+}
+
+func (m *chunkSaverMemory) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
+	var ip4, ip6 net.IPAddr
+	if ip := ipAddress.IP.To4(); ip != nil {
+		ip4 = ipAddress
+	} else {
+		ip6 = ipAddress
+	}
+	email := chunkSaverMemoryEmail{
+		mailID:     m.nextID,
+		createdAt:  time.Now(),
+		from:       from,
+		helo:       helo,
+		recipient:  recipient,
+		ipv4:       ip4,
+		ipv6:       ip6,
+		returnPath: returnPath,
+		isTLS:      isTLS,
+	}
+	m.emails = append(m.emails, &email)
+	m.nextID++
+	return email.mailID, nil
+}
+
+func (m *chunkSaverMemory) CloseMessage(mailID uint64, size uint, partsInfo *partsInfo, subject string, deliveryID string, to string, from string) error {
+	if email := m.emails[mailID-m.IDOffset]; email == nil {
+		return errors.New("email not found")
+	} else {
+		email.size = size
+		if info, err := json.Marshal(partsInfo); err != nil {
+			return err
+		} else {
+			email.partsInfo = info
+		}
+		email.subject = subject
+		email.deliveryID = deliveryID
+		email.to = to
+		email.from = from
+		email.size = size
+	}
+	return nil
+}
+
+func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
+	var key [16]byte
+	if len(hash) != 16 {
+		return errors.New("invalid hash")
+	}
+	copy(key[:], hash[0:15])
+	if chunk, ok := m.chunks[key]; ok {
+		// only update the counters and update time
+		chunk.referenceCount++
+		chunk.modifiedAt = time.Now()
+	} else {
+		// add a new chunk
+		newChunk := chunkSaverMemoryChunk{
+			modifiedAt:     time.Now(),
+			referenceCount: 1,
+			data:           data,
+		}
+		m.chunks[key] = &newChunk
+	}
+	return nil
+}
+
+func (m *chunkSaverMemory) Initialize(cfg BackendConfig) error {
+	m.IDOffset = 1
+	m.nextID = m.IDOffset
+	m.emails = make([]*chunkSaverMemoryEmail, 0, 100)
+	m.chunks = make(map[[16]byte]*chunkSaverMemoryChunk, 1000)
+	return nil
+}
+
+func (m *chunkSaverMemory) Shutdown() (err error) {
+	m.emails = nil
+	m.chunks = nil
+	return nil
+}
+
 type chunkSaverSQLConfig struct {
 type chunkSaverSQLConfig struct {
 	EmailTable  string `json:"email_table"`
 	EmailTable  string `json:"email_table"`
 	ChunkTable  string `json:"chunk_table"`
 	ChunkTable  string `json:"chunk_table"`
@@ -227,6 +334,7 @@ type chunkSaverSQLConfig struct {
 	PrimaryHost string `json:"primary_mail_host"`
 	PrimaryHost string `json:"primary_mail_host"`
 }
 }
 
 
+// chunkSaverSQL implements the ChunkSaverStorage interface
 type chunkSaverSQL struct {
 type chunkSaverSQL struct {
 	config     *chunkSaverSQLConfig
 	config     *chunkSaverSQLConfig
 	statements map[string]*sql.Stmt
 	statements map[string]*sql.Stmt
@@ -274,7 +382,7 @@ func (c *chunkSaverSQL) prepareSql() error {
 	// finalize the email (the connection closed)
 	// finalize the email (the connection closed)
 	if stmt, err := c.db.Prepare(`
 	if stmt, err := c.db.Prepare(`
 		UPDATE ` + c.config.EmailTable + ` 
 		UPDATE ` + c.config.EmailTable + ` 
-			SET size=?, parts_info = ?, subject, charset = ?, delivery_id = ?, to = ? 
+			SET size=?, parts_info = ?, subject, delivery_id = ?, to = ? 
 		WHERE mail_id = ? `); err != nil {
 		WHERE mail_id = ? `); err != nil {
 		return err
 		return err
 	} else {
 	} else {
@@ -330,13 +438,7 @@ func (c *chunkSaverSQL) prepareSql() error {
 	return nil
 	return nil
 }
 }
 
 
-func (c *chunkSaverSQL) OpenMessage(
-	from []byte,
-	helo []byte,
-	recipient []byte,
-	ipAddress net.IPAddr,
-	returnPath []byte,
-	isTLS bool) (mailID uint64, err error) {
+func (c *chunkSaverSQL) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
 
 
 	// if it's ipv4 then we want ipv6 to be 0, and vice-versa
 	// if it's ipv4 then we want ipv6 to be 0, and vice-versa
 	var ip4 uint32
 	var ip4 uint32
@@ -377,19 +479,12 @@ func (c *chunkSaverSQL) AddChunk(data []byte, hash []byte) error {
 	return nil
 	return nil
 }
 }
 
 
-func (c *chunkSaverSQL) CloseMessage(
-	mailID uint64,
-	size uint,
-	partsInfo *partsInfo,
-	subject []byte,
-	charset []byte,
-	deliveryID []byte,
-	to []byte) error {
+func (c *chunkSaverSQL) CloseMessage(mailID uint64, size uint, partsInfo *partsInfo, subject string, deliveryID string, to string, from string) error {
 	partsInfoJson, err := json.Marshal(partsInfo)
 	partsInfoJson, err := json.Marshal(partsInfo)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	_, err = c.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, charset, deliveryID, to, mailID)
+	_, err = c.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, deliveryID, to, mailID)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -452,6 +547,12 @@ func Chunksaver() *StreamDecorator {
 				chunkBuffer *chunkedBytesBufferMime
 				chunkBuffer *chunkedBytesBufferMime
 				msgPos      uint
 				msgPos      uint
 				database    ChunkSaverStorage
 				database    ChunkSaverStorage
+				written     uint
+
+				// just some headers from the first mime-part
+				subject string
+				to      string
+				from    string
 			)
 			)
 
 
 			var config *chunkSaverConfig
 			var config *chunkSaverConfig
@@ -469,7 +570,16 @@ func Chunksaver() *StreamDecorator {
 				} else {
 				} else {
 					chunkBuffer.capTo(chunkMaxBytes)
 					chunkBuffer.capTo(chunkMaxBytes)
 				}
 				}
-				database = new(chunkSaverSQL)
+				if config.StorageEngine == "memory" {
+					db := new(chunkSaverMemory)
+					chunkBuffer.setDatabase(db)
+					database = db
+				} else {
+					db := new(chunkSaverSQL)
+					chunkBuffer.setDatabase(db)
+					database = db
+				}
+
 				err = database.Initialize(backendConfig)
 				err = database.Initialize(backendConfig)
 				if err != nil {
 				if err != nil {
 					return err
 					return err
@@ -484,22 +594,80 @@ func Chunksaver() *StreamDecorator {
 
 
 			sd.Open = func(e *mail.Envelope) error {
 			sd.Open = func(e *mail.Envelope) error {
 				// create a new entry & grab the id
 				// create a new entry & grab the id
+				written = 0
+				var ip net.IPAddr
+				if ret := net.ParseIP(e.RemoteIP); ret != nil {
+					ip = net.IPAddr{IP: ret}
+				}
+				mid, err := database.OpenMessage(
+					e.MailFrom.String(),
+					e.Helo,
+					e.RcptTo[0].String(),
+					ip,
+					e.MailFrom.String(),
+					e.TLS)
+				if err != nil {
+					return err
+				}
+				e.Values["messageID"] = mid
 				envelope = e
 				envelope = e
 				return nil
 				return nil
 			}
 			}
 
 
 			sd.Close = func() error {
 			sd.Close = func() error {
 				chunkBuffer.flush()
 				chunkBuffer.flush()
-				chunkBuffer.Reset()
+				defer chunkBuffer.Reset()
+				if mid, ok := envelope.Values["messageID"].(uint64); ok {
+					err := database.CloseMessage(
+						mid,
+						written,
+						&chunkBuffer.info,
+						subject,
+						envelope.QueuedId,
+						to,
+						from,
+					)
+
+					if err != nil {
+						return err
+					}
+				}
+
 				return nil
 				return nil
 			}
 			}
 
 
-			return StreamProcessWith(func(p []byte) (int, error) {
+			fillVars := func(parts *[]*mime.Part, subject, to, from string) (string, string, string) {
+				if len(*parts) > 0 {
+					if subject == "" {
+						if val, ok := (*parts)[0].Headers["Subject"]; ok {
+							subject = val[0]
+						}
+					}
+					if to == "" {
+						if val, ok := (*parts)[0].Headers["To"]; ok {
+							addr, err := mail.NewAddress(val[0])
+							if err == nil {
+								to = addr.String()
+							}
+						}
+					}
+					if from == "" {
+						if val, ok := (*parts)[0].Headers["From"]; ok {
+							addr, err := mail.NewAddress(val[0])
+							if err == nil {
+								from = addr.String()
+							}
+						}
+					}
+
+				}
+				return subject, to, from
+			}
 
 
+			return StreamProcessWith(func(p []byte) (int, error) {
 				if envelope.Values == nil {
 				if envelope.Values == nil {
 					return 0, errors.New("no message headers found")
 					return 0, errors.New("no message headers found")
 				}
 				}
-
 				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok {
 				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok {
 					var (
 					var (
 						pos      int
 						pos      int
@@ -508,39 +676,37 @@ func Chunksaver() *StreamDecorator {
 					if len(*parts) > 2 {
 					if len(*parts) > 2 {
 						progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already out
 						progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already out
 					}
 					}
-
+					subject, to, from = fillVars(parts, subject, to, from)
 					offset := msgPos
 					offset := msgPos
 					for i := progress; i < len(*parts); i++ {
 					for i := progress; i < len(*parts); i++ {
 						part := (*parts)[i]
 						part := (*parts)[i]
+						count := 0
 						chunkBuffer.currentPart(part)
 						chunkBuffer.currentPart(part)
-
 						// break chunk on new part
 						// break chunk on new part
 						if part.StartingPos > 0 && part.StartingPos > msgPos {
 						if part.StartingPos > 0 && part.StartingPos > msgPos {
-							count, _ := chunkBuffer.Write(p[pos : part.StartingPos-offset])
-
+							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
+							written += uint(count)
 							chunkBuffer.flush()
 							chunkBuffer.flush()
 							fmt.Println("->N")
 							fmt.Println("->N")
 							pos += count
 							pos += count
 							msgPos = part.StartingPos
 							msgPos = part.StartingPos
 						}
 						}
-
 						// break chunk on header
 						// break chunk on header
 						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
 						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
 							count, _ := chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
 							count, _ := chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
-
+							written += uint(count)
 							chunkBuffer.flush()
 							chunkBuffer.flush()
 							fmt.Println("->H")
 							fmt.Println("->H")
 							pos += count
 							pos += count
 							msgPos = part.StartingPosBody
 							msgPos = part.StartingPosBody
 						}
 						}
-
 						// if on the latest (last) part, and yet there is still data to be written out
 						// if on the latest (last) part, and yet there is still data to be written out
 						if len(*parts)-1 == i && len(p)-1 > pos {
 						if len(*parts)-1 == i && len(p)-1 > pos {
 							count, _ := chunkBuffer.Write(p[pos:])
 							count, _ := chunkBuffer.Write(p[pos:])
+							written += uint(count)
 							pos += count
 							pos += count
 							msgPos += uint(count)
 							msgPos += uint(count)
 						}
 						}
-
 						// if there's no more data
 						// if there's no more data
 						if pos >= len(p) {
 						if pos >= len(p) {
 							break
 							break

+ 3 - 0
backends/s_chunksaver_test.go

@@ -146,6 +146,8 @@ func TestChunkSaverWrite(t *testing.T) {
 
 
 	// place the parse result in an envelope
 	// place the parse result in an envelope
 	e := mail.NewEnvelope("127.0.0.1", 1)
 	e := mail.NewEnvelope("127.0.0.1", 1)
+	to, _ := mail.NewAddress("[email protected]")
+	e.RcptTo = append(e.RcptTo, to)
 	e.Values["MimeParts"] = &parser.Parts
 	e.Values["MimeParts"] = &parser.Parts
 
 
 	// instantiate the chunk saver
 	// instantiate the chunk saver
@@ -157,6 +159,7 @@ func TestChunkSaverWrite(t *testing.T) {
 	// configure the buffer cap
 	// configure the buffer cap
 	bc := BackendConfig{}
 	bc := BackendConfig{}
 	bc["chunksaver_chunk_size"] = 64
 	bc["chunksaver_chunk_size"] = 64
+	bc["chunksaver_storage_engine"] = "memory"
 	_ = Svc.initialize(bc)
 	_ = Svc.initialize(bc)
 
 
 	// give it the envelope with the parse results
 	// give it the envelope with the parse results

+ 3 - 0
mail/envelope.go

@@ -49,10 +49,13 @@ func (ep *Address) IsEmpty() bool {
 }
 }
 
 
 var ap = mail.AddressParser{}
 var ap = mail.AddressParser{}
+var apLock sync.Mutex // guards mail.AddressParser
 
 
 // NewAddress takes a string of an RFC 5322 address of the
 // NewAddress takes a string of an RFC 5322 address of the
 // form "Gogh Fir <[email protected]>" or "[email protected]".
 // form "Gogh Fir <[email protected]>" or "[email protected]".
 func NewAddress(str string) (Address, error) {
 func NewAddress(str string) (Address, error) {
+	apLock.Lock()
+	defer apLock.Unlock()
 	a, err := ap.Parse(str)
 	a, err := ap.Parse(str)
 	if err != nil {
 	if err != nil {
 		return Address{}, err
 		return Address{}, err