package backends

// ----------------------------------------------------------------------------------
// Processor Name: ChunkSaver
// ----------------------------------------------------------------------------------
// Description   : Takes the stream and saves it in chunks. Chunks are split according
//               : to the chunksaver_chunk_size config setting, at the end of MIME parts,
//               : and after headers. This allows for basic de-duplication: we can take a
//               : hash of each chunk, then check the database to see if we already have
//               : it. If so, we don't write it again; we keep a reference to the
//               : previously saved chunk and only increment its reference count.
//               : Headers and bodies go into separate chunks because headers tend to
//               : contain unique data, while bodies are often duplicated, especially
//               : for messages that are CC'd or forwarded.
// ----------------------------------------------------------------------------------
// Requires      : "mimeanalyzer" stream processor to be enabled before it
// ----------------------------------------------------------------------------------
// Config Options: chunksaver_chunk_size - maximum chunk size, in bytes
// --------------:-------------------------------------------------------------------
// Input         : e.Values["MimeParts"], of type *[]*mime.Part, as populated by "mimeanalyzer"
// ----------------------------------------------------------------------------------
// Output        :
// ----------------------------------------------------------------------------------
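// To illustrate the de-duplication idea described above, here is a minimal,
// self-contained sketch (not part of this package; the map stands in for the
// chunk table and its reference_count column):
//
//	package main
//
//	import (
//	    "crypto/md5"
//	    "fmt"
//	)
//
//	type chunk struct {
//	    data     []byte
//	    refCount uint
//	}
//
//	func main() {
//	    store := make(map[[16]byte]*chunk)
//	    save := func(data []byte) {
//	        key := md5.Sum(data)
//	        if c, ok := store[key]; ok {
//	            c.refCount++ // duplicate: only bump the reference count
//	            return
//	        }
//	        store[key] = &chunk{data: data, refCount: 1}
//	    }
//	    body := []byte("the same forwarded body")
//	    save(body)
//	    save(body)              // the second copy is not stored again
//	    fmt.Println(len(store)) // 1
//	}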
import (
    "bytes"
    "compress/zlib"
    "crypto/md5"
    "database/sql"
    "encoding/binary"
    "encoding/json"
    "errors"
    "hash"
    "io"
    "net"
    "strings"
    "time"

    "github.com/flashmob/go-guerrilla/mail"
    "github.com/flashmob/go-guerrilla/mail/mime"
)
type chunkSaverConfig struct {
    // ChunkMaxBytes controls the maximum buffer size for saving.
    // 16KB default.
    ChunkMaxBytes int    `json:"chunksaver_chunk_size"`
    StorageEngine string `json:"chunksaver_storage_engine"`
    CompressLevel int    `json:"chunksaver_compress_level,omitempty"`
}
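// An illustrative backend config fragment for these options (key names come
// from the json tags above; the values here are only examples):
//
//	{
//	    "chunksaver_chunk_size": 16384,
//	    "chunksaver_storage_engine": "memory",
//	    "chunksaver_compress_level": 9
//	}
//
// A "memory" storage engine selects the in-memory store below; any other
// value selects the SQL store (see the initializer in Chunksaver).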
func init() {
    streamers["chunksaver"] = func() *StreamDecorator {
        return Chunksaver()
    }
}
type PartsInfo struct {
    Count      uint32        `json:"c"`  // number of parts
    TextPart   int           `json:"tp"` // index of the main text part to display
    HTMLPart   int           `json:"hp"` // index of the main html part to display (if any)
    HasAttach  bool          `json:"a"`
    Parts      []chunkedPart `json:"p"`
    Dictionary []byte        `json:"d"` // zlib dictionary
}

type chunkedPart struct {
    PartId             string     `json:"i"`
    ChunkHash          [][16]byte `json:"h"` // sequence of hashes the data is stored at
    ContentType        string     `json:"t"`
    Charset            string     `json:"c"`
    TransferEncoding   string     `json:"e"`
    ContentDisposition string     `json:"d"`
}

type flushEvent func() error

type chunkedBytesBuffer struct {
    buf          []byte
    flushTrigger flushEvent
}
// flush signals that it's time to write the buffer out to storage
func (c *chunkedBytesBuffer) flush() error {
    if len(c.buf) == 0 {
        return nil
    }
    if c.flushTrigger != nil {
        if err := c.flushTrigger(); err != nil {
            return err
        }
    }
    c.Reset()
    return nil
}

// Reset sets the length back to 0, making the buffer re-usable
func (c *chunkedBytesBuffer) Reset() {
    c.buf = c.buf[:0] // set the length back to 0
}
// Write takes a p slice of bytes and writes it to the buffer.
// It will never grow the buffer, flushing it as soon as it's full.
func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
    remaining := len(p)
    bufCap := cap(c.buf)
    for {
        free := bufCap - len(c.buf)
        if free > remaining {
            // enough room in the buffer
            c.buf = append(c.buf, p[i:i+remaining]...)
            i += remaining
            return
        }
        // fill the buffer to the brim with a slice from p
        c.buf = append(c.buf, p[i:i+free]...)
        remaining -= free
        i += free
        if err = c.flush(); err != nil {
            return i, err
        }
        if remaining == 0 {
            return
        }
    }
}
// capTo caps the internal buffer to the specified number of bytes and sets the length back to 0
func (c *chunkedBytesBuffer) capTo(n int) {
    if cap(c.buf) == n {
        return
    }
    c.buf = make([]byte, 0, n)
}
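// A minimal sketch of how the buffer behaves (illustrative only): with a
// 4-byte capacity, writing 10 bytes triggers two flushes, and the remaining
// 2 bytes stay buffered until flush is called explicitly:
//
//	buf := &chunkedBytesBuffer{}
//	buf.capTo(4)
//	buf.flushTrigger = func() error {
//	    // called with buf.buf full; a real trigger would hash and store it
//	    return nil
//	}
//	n, _ := buf.Write([]byte("0123456789")) // flushes "0123", then "4567"
//	_ = n                                   // n == 10; "89" is still buffered
//	_ = buf.flush()                         // writes out the tail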
type chunkedBytesBufferMime struct {
    chunkedBytesBuffer
    current  *mime.Part
    info     PartsInfo
    md5      hash.Hash
    database ChunkSaverStorage
}

func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
    b := new(chunkedBytesBufferMime)
    b.chunkedBytesBuffer.flushTrigger = func() error {
        return b.onFlush()
    }
    b.md5 = md5.New()
    b.buf = make([]byte, 0, chunkMaxBytes)
    return b
}

func (b *chunkedBytesBufferMime) setDatabase(database ChunkSaverStorage) {
    b.database = database
}
func (b *chunkedBytesBufferMime) onFlush() error {
    // reset first so the hash covers only this chunk (flush calls the
    // embedded chunkedBytesBuffer.Reset, not our Reset override)
    b.md5.Reset()
    b.md5.Write(b.buf)
    var chash [16]byte
    copy(chash[:], b.md5.Sum(nil))
    if b.current != nil {
        if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
            // existing part, just append the hash
            lastPart := &b.info.Parts[size-1]
            lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
            b.fillInfo(lastPart, size-1)
        } else {
            // add it as a new part
            part := chunkedPart{
                PartId:    b.current.Node,
                ChunkHash: [][16]byte{chash},
            }
            b.fillInfo(&part, len(b.info.Parts)) // the index the part will occupy after append
            b.info.Parts = append(b.info.Parts, part)
            b.info.Count++
        }
        if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
            return err
        }
    }
    return nil
}
func (b *chunkedBytesBufferMime) fillInfo(cp *chunkedPart, index int) {
    if cp.ContentType == "" && b.current.ContentType != nil {
        cp.ContentType = b.current.ContentType.String()
    }
    if cp.Charset == "" && b.current.Charset != "" {
        cp.Charset = b.current.Charset
    }
    if cp.TransferEncoding == "" && b.current.TransferEncoding != "" {
        cp.TransferEncoding = b.current.TransferEncoding
    }
    if cp.ContentDisposition == "" && b.current.ContentDisposition != "" {
        cp.ContentDisposition = b.current.ContentDisposition
        if strings.Contains(cp.ContentDisposition, "attach") {
            b.info.HasAttach = true
        }
    }
    if cp.ContentType != "" {
        if b.info.TextPart == -1 && strings.Contains(cp.ContentType, "text/plain") {
            b.info.TextPart = index
        } else if b.info.HTMLPart == -1 && strings.Contains(cp.ContentType, "text/html") {
            b.info.HTMLPart = index
        }
    }
}
// Reset resets the buffer and the md5 hash so a new chunk can be accumulated
func (b *chunkedBytesBufferMime) Reset() {
    b.md5.Reset()
    b.chunkedBytesBuffer.Reset()
}

// currentPart sets the mime part the incoming data belongs to, initializing
// the PartsInfo on the first call
func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
    if b.current == nil {
        b.info = PartsInfo{Parts: make([]chunkedPart, 0, 3), TextPart: -1, HTMLPart: -1}
    }
    b.current = cp
}
// ChunkSaverStorage defines an interface to the storage layer (the database)
type ChunkSaverStorage interface {
    // OpenMessage creates a new email record and returns its assigned id
    OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error)
    // CloseMessage finalizes the email record once all chunks have been saved
    CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error
    // AddChunk saves a chunk of data, de-duplicating by its hash
    AddChunk(data []byte, hash []byte) error
    // GetEmail retrieves a previously saved email
    GetEmail(mailID uint64) (*ChunkSaverEmail, error)
    // GetChunks retrieves the chunks stored at the given hashes
    GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error)
    Initialize(cfg BackendConfig) error
    Shutdown() (err error)
}
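// The expected calling sequence for a ChunkSaverStorage implementation is a
// sketch like the following (error handling elided; the in-memory engine
// below can be used to try it out):
//
//	var store ChunkSaverStorage = new(chunkSaverMemory)
//	_ = store.Initialize(BackendConfig{})
//	mailID, _ := store.OpenMessage("from@example.com", "helo", "to@example.com",
//	    net.IPAddr{IP: net.ParseIP("127.0.0.1")}, "from@example.com", false)
//	data := []byte("Subject: hi\r\n\r\nhello")
//	sum := md5.Sum(data)
//	_ = store.AddChunk(data, sum[:])
//	info := &PartsInfo{Count: 1, Parts: []chunkedPart{{PartId: "1", ChunkHash: [][16]byte{sum}}}}
//	_ = store.CloseMessage(mailID, int64(len(data)), info, "hi", "delivery-id",
//	    "to@example.com", "from@example.com")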
type ChunkSaverEmail struct {
    mailID     uint64
    createdAt  time.Time
    size       int64
    from       string
    to         string
    partsInfo  PartsInfo
    helo       string
    subject    string
    deliveryID string
    recipient  string
    ipv4       net.IPAddr
    ipv6       net.IPAddr
    returnPath string
    isTLS      bool
}

type ChunkSaverChunk struct {
    modifiedAt     time.Time
    referenceCount uint
    data           io.Reader
}

type chunkSaverMemoryEmail struct {
    mailID     uint64
    createdAt  time.Time
    size       int64
    from       string
    to         string
    partsInfo  []byte
    helo       string
    subject    string
    deliveryID string
    recipient  string
    ipv4       net.IPAddr
    ipv6       net.IPAddr
    returnPath string
    isTLS      bool
}

type chunkSaverMemoryChunk struct {
    modifiedAt     time.Time
    referenceCount uint
    data           []byte
}

type chunkSaverMemory struct {
    chunks        map[[16]byte]*chunkSaverMemoryChunk
    emails        []*chunkSaverMemoryEmail
    nextID        uint64
    IDOffset      uint64
    compressLevel int
}
func (m *chunkSaverMemory) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
    var ip4, ip6 net.IPAddr
    if ip := ipAddress.IP.To4(); ip != nil {
        ip4 = ipAddress
    } else {
        ip6 = ipAddress
    }
    email := chunkSaverMemoryEmail{
        mailID:     m.nextID,
        createdAt:  time.Now(),
        from:       from,
        helo:       helo,
        recipient:  recipient,
        ipv4:       ip4,
        ipv6:       ip6,
        returnPath: returnPath,
        isTLS:      isTLS,
    }
    m.emails = append(m.emails, &email)
    m.nextID++
    return email.mailID, nil
}
func (m *chunkSaverMemory) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
    i := mailID - m.IDOffset
    if i >= uint64(len(m.emails)) || m.emails[i] == nil {
        return errors.New("email not found")
    }
    email := m.emails[i]
    info, err := json.Marshal(partsInfo)
    if err != nil {
        return err
    }
    email.partsInfo = info
    email.size = size
    email.subject = subject
    email.deliveryID = deliveryID
    email.to = to
    email.from = from
    return nil
}
func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
    var key [16]byte
    if len(hash) != 16 {
        return errors.New("invalid hash")
    }
    copy(key[:], hash[0:16])
    if chunk, ok := m.chunks[key]; ok {
        // already stored: only update the counter and the modification time
        chunk.referenceCount++
        chunk.modifiedAt = time.Now()
        return nil
    }
    // add a new chunk, compressed
    var compressed bytes.Buffer
    zlibw, err := zlib.NewWriterLevel(&compressed, m.compressLevel)
    if err != nil {
        return err
    }
    if _, err := zlibw.Write(data); err != nil {
        return err
    }
    if err := zlibw.Close(); err != nil {
        return err
    }
    m.chunks[key] = &chunkSaverMemoryChunk{
        modifiedAt:     time.Now(),
        referenceCount: 1,
        data:           compressed.Bytes(),
    }
    return nil
}
func (m *chunkSaverMemory) Initialize(cfg BackendConfig) error {
    m.IDOffset = 1
    m.nextID = m.IDOffset
    m.emails = make([]*chunkSaverMemoryEmail, 0, 100)
    m.chunks = make(map[[16]byte]*chunkSaverMemoryChunk, 1000)
    // compressLevel is left as set by the caller; its zero value is zlib.NoCompression
    return nil
}

func (m *chunkSaverMemory) Shutdown() (err error) {
    m.emails = nil
    m.chunks = nil
    return nil
}
func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
    i := mailID - m.IDOffset
    if i >= uint64(len(m.emails)) {
        return nil, errors.New("mail not found")
    }
    email := m.emails[i]
    pi := &PartsInfo{}
    if err := json.Unmarshal(email.partsInfo, pi); err != nil {
        return nil, err
    }
    return &ChunkSaverEmail{
        mailID:     email.mailID,
        createdAt:  email.createdAt,
        size:       email.size,
        from:       email.from,
        to:         email.to,
        partsInfo:  *pi,
        helo:       email.helo,
        subject:    email.subject,
        deliveryID: email.deliveryID,
        recipient:  email.recipient,
        ipv4:       email.ipv4,
        ipv6:       email.ipv6,
        returnPath: email.returnPath,
        isTLS:      email.isTLS,
    }, nil
}
func (m *chunkSaverMemory) GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error) {
    result := make([]*ChunkSaverChunk, 0, len(hash))
    var key [16]byte
    for i := range hash {
        copy(key[:], hash[i][:16])
        if c, ok := m.chunks[key]; ok {
            zr, err := zlib.NewReader(bytes.NewReader(c.data))
            if err != nil {
                return nil, err
            }
            result = append(result, &ChunkSaverChunk{
                modifiedAt:     c.modifiedAt,
                referenceCount: c.referenceCount,
                data:           zr, // decompresses as it's read
            })
        }
    }
    return result, nil
}
type chunkSaverSQLConfig struct {
    EmailTable  string `json:"email_table"`
    ChunkTable  string `json:"chunk_table"`
    Driver      string `json:"sql_driver"`
    DSN         string `json:"sql_dsn"`
    PrimaryHost string `json:"primary_mail_host"`
}

// chunkSaverSQL implements the ChunkSaverStorage interface
type chunkSaverSQL struct {
    config     *chunkSaverSQLConfig
    statements map[string]*sql.Stmt
    db         *sql.DB
}
func (c *chunkSaverSQL) connect() (*sql.DB, error) {
    var err error
    if c.db, err = sql.Open(c.config.Driver, c.config.DSN); err != nil {
        Log().Error("cannot open database: ", err)
        return nil, err
    }
    // do we have permission to access the table?
    rows, err := c.db.Query("SELECT mail_id FROM " + c.config.EmailTable + " LIMIT 1")
    if err != nil {
        return nil, err
    }
    _ = rows.Close()
    return c.db, err
}
func (c *chunkSaverSQL) prepareSql() error {
    if c.statements == nil {
        c.statements = make(map[string]*sql.Stmt)
    }
    // begin inserting an email (before saving chunks)
    // note: `from` and `to` are reserved words, hence the backquotes (MySQL-style quoting)
    if stmt, err := c.db.Prepare("INSERT INTO " +
        c.config.EmailTable +
        " (`from`, helo, recipient, ipv4_addr, ipv6_addr, return_path, is_tls) " +
        "VALUES(?, ?, ?, ?, ?, ?, ?)"); err != nil {
        return err
    } else {
        c.statements["insertEmail"] = stmt
    }
    // insert a new chunk of data
    if stmt, err := c.db.Prepare(`INSERT INTO ` +
        c.config.ChunkTable +
        ` (data, hash)
 VALUES(?, ?)`); err != nil {
        return err
    } else {
        c.statements["insertChunk"] = stmt
    }
    // finalize the email (called after the connection is closed)
    if stmt, err := c.db.Prepare("UPDATE " + c.config.EmailTable +
        " SET size = ?, parts_info = ?, subject = ?, delivery_id = ?, `to` = ?" +
        " WHERE mail_id = ?"); err != nil {
        return err
    } else {
        c.statements["finalizeEmail"] = stmt
    }
    // Check the existence of a chunk (the reference_count col is incremented if it exists)
    // This means we can avoid re-inserting an existing chunk, only update its reference_count
    if stmt, err := c.db.Prepare(`
 UPDATE ` + c.config.ChunkTable + `
 SET reference_count = reference_count + 1
 WHERE hash = ? `); err != nil {
        return err
    } else {
        c.statements["chunkReferenceIncr"] = stmt
    }
    // If the reference_count reaches 0, the chunk is considered deleted.
    // Chunks are soft-deleted for now; a separate sweeper query hard-deletes them once they become stale.
    if stmt, err := c.db.Prepare(`
 UPDATE ` + c.config.ChunkTable + `
 SET reference_count = reference_count - 1
 WHERE hash = ? AND reference_count > 0`); err != nil {
        return err
    } else {
        c.statements["chunkReferenceDecr"] = stmt
    }
    // fetch an email
    if stmt, err := c.db.Prepare(`
 SELECT *
 FROM ` + c.config.EmailTable + `
 WHERE mail_id = ?`); err != nil {
        return err
    } else {
        c.statements["selectMail"] = stmt
    }
    // fetch a chunk
    if stmt, err := c.db.Prepare(`
 SELECT *
 FROM ` + c.config.ChunkTable + `
 WHERE hash = ?`); err != nil {
        return err
    } else {
        c.statements["selectChunk"] = stmt
    }
    // TODO: sweep old chunks
    // TODO: sweep incomplete emails
    return nil
}
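// The statements above assume a schema roughly like the following sketch
// (illustrative MySQL DDL; the real table names come from the email_table and
// chunk_table config options, and the exact column types are not defined in
// this file, so they may differ):
//
//	CREATE TABLE email (
//	    mail_id     BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
//	    `from`      VARCHAR(255),
//	    helo        VARCHAR(255),
//	    recipient   VARCHAR(255),
//	    ipv4_addr   INT UNSIGNED,
//	    ipv6_addr   VARBINARY(16),
//	    return_path VARCHAR(255),
//	    is_tls      TINYINT(1),
//	    size        BIGINT,
//	    parts_info  TEXT,
//	    subject     VARCHAR(255),
//	    delivery_id VARCHAR(255),
//	    `to`        VARCHAR(255)
//	);
//
//	CREATE TABLE chunk (
//	    hash            BINARY(16) NOT NULL PRIMARY KEY,
//	    data            MEDIUMBLOB,
//	    reference_count INT UNSIGNED NOT NULL DEFAULT 1,
//	    modified_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
//	);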
func (c *chunkSaverSQL) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
    // if it's ipv4 then we want ipv6 to be 0, and vice-versa
    var ip4 uint32
    ip6 := make([]byte, 16)
    if ip := ipAddress.IP.To4(); ip != nil {
        ip4 = binary.BigEndian.Uint32(ip)
    } else {
        copy(ip6, ipAddress.IP)
    }
    r, err := c.statements["insertEmail"].Exec(from, helo, recipient, ip4, ip6, returnPath, isTLS)
    if err != nil {
        return 0, err
    }
    id, err := r.LastInsertId()
    if err != nil {
        return 0, err
    }
    return uint64(id), err
}
func (c *chunkSaverSQL) AddChunk(data []byte, hash []byte) error {
    // attempt to increment the reference_count (it means the chunk is already in there)
    r, err := c.statements["chunkReferenceIncr"].Exec(hash)
    if err != nil {
        return err
    }
    affected, err := r.RowsAffected()
    if err != nil {
        return err
    }
    if affected == 0 {
        // chunk isn't in there, let's insert it
        _, err := c.statements["insertChunk"].Exec(data, hash)
        if err != nil {
            return err
        }
    }
    return nil
}
func (c *chunkSaverSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
    partsInfoJson, err := json.Marshal(partsInfo)
    if err != nil {
        return err
    }
    _, err = c.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, deliveryID, to, mailID)
    if err != nil {
        return err
    }
    return nil
}
// Initialize loads the specific database config, connects to the db, prepares statements
func (c *chunkSaverSQL) Initialize(cfg BackendConfig) error {
    configType := BaseConfig(&chunkSaverSQLConfig{})
    bcfg, err := Svc.ExtractConfig(cfg, configType)
    if err != nil {
        return err
    }
    c.config = bcfg.(*chunkSaverSQLConfig)
    c.db, err = c.connect()
    if err != nil {
        return err
    }
    return c.prepareSql()
}
// Shutdown closes the prepared statements, then the database connection
func (c *chunkSaverSQL) Shutdown() (err error) {
    defer func() {
        if closeErr := c.db.Close(); closeErr != nil {
            Log().WithError(closeErr).Error("failed to close sql database")
            err = closeErr
        }
    }()
    for i := range c.statements {
        if err = c.statements[i].Close(); err != nil {
            Log().WithError(err).Error("failed to close sql statement")
        }
    }
    return err
}
// GetEmail implements the ChunkSaverStorage interface
// TODO: not implemented yet; should query via the "selectMail" prepared statement
func (c *chunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
    return &ChunkSaverEmail{}, nil
}

// GetChunks implements the ChunkSaverStorage interface
// TODO: not implemented yet; should query via the "selectChunk" prepared statement
func (c *chunkSaverSQL) GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error) {
    result := make([]*ChunkSaverChunk, 0, len(hash))
    return result, nil
}
type chunkMailReader struct {
    db     ChunkSaverStorage
    email  *ChunkSaverEmail
    chunks []*ChunkSaverChunk // chunks belonging to the part currently being read
    part   int
    i, j   int // i indexes the current part, j the current chunk within it
}

// NewChunkMailReader loads the email and selects which mime-part Read will return, using the part argument.
// If part is -1, Read will read in the entire message.
func NewChunkMailReader(db ChunkSaverStorage, email *ChunkSaverEmail, part int) (*chunkMailReader, error) {
    r := new(chunkMailReader)
    r.db = db
    r.part = part
    if email == nil {
        return nil, errors.New("nil email")
    }
    r.email = email
    if err := r.SeekPart(part); err != nil {
        return nil, err
    }
    return r, nil
}
// SeekPart positions the reader at the given part; -1 positions it at the start of the message
func (r *chunkMailReader) SeekPart(part int) error {
    if parts := len(r.email.partsInfo.Parts); parts == 0 {
        return errors.New("email has no mime parts")
    } else if part >= parts {
        return errors.New("no such part available")
    }
    if part < 0 {
        part = 0 // -1 means the entire message: start reading from the first part
    }
    r.i = part
    r.j = 0
    r.chunks = nil
    return nil
}
// Read implements io.Reader. It reads the selected mime-part (or, if part was -1,
// the entire message) chunk by chunk, fetching each part's chunks from storage once.
func (r *chunkMailReader) Read(p []byte) (n int, err error) {
    for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
        if r.chunks == nil {
            // fetch the chunks of the current part (only once per part)
            r.chunks, err = r.db.GetChunks(r.email.partsInfo.Parts[r.i].ChunkHash...)
            if err != nil {
                return
            }
        }
        var nRead int
        for r.j < len(r.chunks) {
            nRead, err = r.chunks[r.j].data.Read(p)
            if err == io.EOF {
                r.j++ // advance to the next chunk
                err = nil
            }
            if r.j == len(r.chunks) { // last chunk in the part?
                r.j = 0        // reset the chunk index
                r.chunks = nil // the next part's chunks will need fetching
                r.i++          // advance to the next part
                if r.i == len(r.email.partsInfo.Parts) || r.part >= 0 {
                    // no more parts to return, or only a single part was requested
                    err = io.EOF
                }
            }
            // unless there's an error, the next time this function is
            // called, it will read the next chunk
            return nRead, err
        }
        // no chunks were found for this part; move on to the next one
        r.chunks = nil
        r.j = 0
    }
    return n, io.EOF
}
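// Reading a stored email back out is then a matter of wrapping the reader,
// e.g. (a sketch; assumes store is a ChunkSaverStorage holding mailID, and
// error handling is elided):
//
//	email, _ := store.GetEmail(mailID)
//	r, _ := NewChunkMailReader(store, email, -1) // -1 selects the entire message
//	var msg bytes.Buffer
//	_, _ = io.Copy(&msg, r)
//	fmt.Println(msg.String())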
const chunkMaxBytes = 1024 * 16 // 16KB is the default; change it using the chunksaver_chunk_size config setting

/**
 *
 * A chunk ends either:
 * after xKB, at the end of a part, or at the end of a header
 *
 * - buffer the first chunk
 * - if we didn't receive the first chunk for more than x bytes, save normally
 *
 */
func Chunksaver() *StreamDecorator {
    sd := &StreamDecorator{}
    sd.Decorate =
        func(sp StreamProcessor, a ...interface{}) StreamProcessor {
            var (
                envelope    *mail.Envelope
                chunkBuffer *chunkedBytesBufferMime
                msgPos      uint
                database    ChunkSaverStorage
                written     int64

                // just some headers from the first mime-part
                subject string
                to      string
                from    string
            )
            var config *chunkSaverConfig
            // optional dependency injection of the storage and the buffer
            for i := range a {
                if db, ok := a[i].(ChunkSaverStorage); ok {
                    database = db
                }
                if buff, ok := a[i].(*chunkedBytesBufferMime); ok {
                    chunkBuffer = buff
                }
            }
            Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
                configType := BaseConfig(&chunkSaverConfig{})
                bcfg, err := Svc.ExtractConfig(backendConfig, configType)
                if err != nil {
                    return err
                }
                config = bcfg.(*chunkSaverConfig)
                if chunkBuffer == nil {
                    chunkBuffer = newChunkedBytesBufferMime()
                }
                // configure storage if none was injected
                if database == nil {
                    if config.StorageEngine == "memory" {
                        db := new(chunkSaverMemory)
                        db.compressLevel = config.CompressLevel
                        database = db
                    } else {
                        db := new(chunkSaverSQL)
                        database = db
                    }
                }
                err = database.Initialize(backendConfig)
                if err != nil {
                    return err
                }
                // configure the chunk buffer's capacity
                if config.ChunkMaxBytes > 0 {
                    chunkBuffer.capTo(config.ChunkMaxBytes)
                } else {
                    chunkBuffer.capTo(chunkMaxBytes)
                }
                chunkBuffer.setDatabase(database)
                return nil
            }))
            Svc.AddShutdowner(ShutdownWith(func() error {
                return database.Shutdown()
            }))
            sd.Open = func(e *mail.Envelope) error {
                // create a new entry & grab the id
                written = 0
                var ip net.IPAddr
                if ret := net.ParseIP(e.RemoteIP); ret != nil {
                    ip = net.IPAddr{IP: ret}
                }
                mid, err := database.OpenMessage(
                    e.MailFrom.String(),
                    e.Helo,
                    e.RcptTo[0].String(),
                    ip,
                    e.MailFrom.String(),
                    e.TLS)
                if err != nil {
                    return err
                }
                e.Values["messageID"] = mid
                envelope = e
                return nil
            }

            sd.Close = func() (err error) {
                err = chunkBuffer.flush()
                if err != nil {
                    // TODO we could delete the half-saved message here
                    return err
                }
                defer chunkBuffer.Reset()
                if mid, ok := envelope.Values["messageID"].(uint64); ok {
                    err = database.CloseMessage(
                        mid,
                        written,
                        &chunkBuffer.info,
                        subject,
                        envelope.QueuedId,
                        to,
                        from,
                    )
                    if err != nil {
                        return err
                    }
                }
                return nil
            }
            // fillVars pulls the Subject, To and From headers out of the first
            // mime-part, the first time each of them is seen
            fillVars := func(parts *[]*mime.Part, subject, to, from string) (string, string, string) {
                if len(*parts) > 0 {
                    if subject == "" {
                        if val, ok := (*parts)[0].Headers["Subject"]; ok {
                            subject = val[0]
                        }
                    }
                    if to == "" {
                        if val, ok := (*parts)[0].Headers["To"]; ok {
                            addr, err := mail.NewAddress(val[0])
                            if err == nil {
                                to = addr.String()
                            }
                        }
                    }
                    if from == "" {
                        if val, ok := (*parts)[0].Headers["From"]; ok {
                            addr, err := mail.NewAddress(val[0])
                            if err == nil {
                                from = addr.String()
                            }
                        }
                    }
                }
                return subject, to, from
            }
            return StreamProcessWith(func(p []byte) (count int, err error) {
                if envelope.Values == nil {
                    return count, errors.New("no envelope values found")
                }
                if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok {
                    var (
                        pos      int
                        progress int
                    )
                    if len(*parts) > 2 {
                        // todo: progress is a bit buggy
                        // todo: do not flush empty buffer
                        //progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already out
                    }
                    subject, to, from = fillVars(parts, subject, to, from)
                    offset := msgPos
                    for i := progress; i < len(*parts); i++ {
                        part := (*parts)[i]
                        chunkBuffer.currentPart(part)
                        // break the chunk at the start of a new part
                        if part.StartingPos > 0 && part.StartingPos > msgPos {
                            if count, err = chunkBuffer.Write(p[pos : part.StartingPos-offset]); err != nil {
                                return count, err
                            }
                            written += int64(count)
                            if err = chunkBuffer.flush(); err != nil {
                                return count, err
                            }
                            pos += count
                            msgPos = part.StartingPos
                        }
                        // break the chunk at the end of a header
                        if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
                            if count, err = chunkBuffer.Write(p[pos : part.StartingPosBody-offset]); err != nil {
                                return count, err
                            }
                            written += int64(count)
                            if err = chunkBuffer.flush(); err != nil {
                                return count, err
                            }
                            pos += count
                            msgPos = part.StartingPosBody
                        }
                        // if on the last part, and there is still data to be written out
                        if len(*parts)-1 == i && len(p) > pos {
                            if count, err = chunkBuffer.Write(p[pos:]); err != nil {
                                return count, err
                            }
                            written += int64(count)
                            pos += count
                            msgPos += uint(count)
                        }
                        // if there's no more data
                        if pos >= len(p) {
                            break
                        }
                    }
                }
                return sp.Write(p)
            })
        }
    return sd
}
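// Wiring it up: init() above registers this decorator under the name
// "chunksaver", and the "mimeanalyzer" stream processor must run before it in
// the stream processing chain. A backend config along these lines is assumed
// (the exact key naming the stream processing chain depends on how the
// backend gateway is configured):
//
//	{
//	    "stream_save_process": "mimeanalyzer|chunksaver",
//	    "chunksaver_chunk_size": 16384,
//	    "chunksaver_storage_engine": "memory"
//	}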