sql store: reading and writing emails to an sql db
envelope: changed queuedID to packed bytes and added a fmt.Stringer

flashmob 5 years ago
Parent
commit
0333b7596a
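
In short: the envelope's queued ID is no longer a hex string but a packed 16-byte mail.Hash128 with a fmt.Stringer. A minimal sketch of the new API, going by the mail/envelope.go diff below (the hex value here is just an example):

```go
package main

import (
	"fmt"

	"github.com/flashmob/go-guerrilla/mail"
)

func main() {
	var h mail.Hash128
	// FromHex expects exactly 32 hex characters (16 bytes) and panics otherwise
	h.FromHex("5a4a2f08784334de5148161943111ad3")

	fmt.Println(h.String())     // "5a4a2f08784334de5148161943111ad3"
	fmt.Println(len(h.Bytes())) // 16 raw bytes, e.g. for a BINARY(16) column

	e := mail.NewEnvelope("127.0.0.1", 22, 0)
	e.QueuedId = h          // QueuedId is now a Hash128, no longer a string
	fmt.Println(e.QueuedId) // prints as hex via the fmt.Stringer
}
```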

+ 2 - 2
backends/gateway.go

@@ -696,7 +696,7 @@ func (gw *BackendGateway) workDispatcher(
 			case Processor:
 				result, err := v.Process(msg.e, msg.task)
 				state = dispatcherStateNotify
-				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
+				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId.String()}
 			case ValidatingProcessor:
 				result, err := v.Process(msg.e, msg.task)
 				state = dispatcherStateNotify
@@ -718,7 +718,7 @@ func (gw *BackendGateway) workDispatcher(
 				} else {
 					result = NewResult(response.Canned.SuccessMessageQueued, response.SP, msg.e.QueuedId)
 				}
-				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
+				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId.String()}
 
 			}
 		}

+ 1 - 2
backends/p_guerrilla_db_redis.go

@@ -437,8 +437,7 @@ func GuerrillaDbRedis() Decorator {
 					e.MailFrom.String(),
 					e.Subject,
 					ts)
-				e.QueuedId = hash
-
+				e.QueuedId.FromHex(hash)
 				// Add extra headers
 				var addHead string
 				addHead += "Delivered-To: " + to + "\r\n"

+ 1 - 1
backends/p_redis.go

@@ -88,7 +88,7 @@ func Redis() Decorator {
 			if task == TaskSaveMail {
 				hash := ""
 				if len(e.Hashes) > 0 {
-					e.QueuedId = e.Hashes[0]
+					e.QueuedId.FromHex(e.Hashes[0])
 					hash = e.Hashes[0]
 					var stringer fmt.Stringer
 					// a compressor was set

+ 1 - 2
backends/p_sql.go

@@ -229,9 +229,8 @@ func SQL() Decorator {
 				hash := ""
 				if len(e.Hashes) > 0 {
 					hash = e.Hashes[0]
-					e.QueuedId = e.Hashes[0]
+					e.QueuedId.FromHex(e.Hashes[0])
 				}
-
 				var co *DataCompressor
 				// a compressor was set by the Compress processor
 				if c, ok := e.Values["zlib-compressor"]; ok {

+ 4 - 4
backends/s_mimeanalyzer.go

@@ -61,11 +61,11 @@ func StreamMimeAnalyzer() *StreamDecorator {
 
 			sd.Close = func() error {
 				if parseErr == nil {
-					_ = parser.Close()
-					return nil
-				} else {
-					return parseErr
+					if parseErr = parser.Close(); parseErr != nil {
+						Log().WithError(parseErr).Error("mime parse error in mimeanalyzer on close")
+					}
 				}
+				return parseErr
 			}
 
 			return StreamProcessWith(func(p []byte) (int, error) {

+ 64 - 23
chunk/chunk_test.go

@@ -3,6 +3,7 @@ package chunk
 import (
 	"bytes"
 	"fmt"
+	"github.com/flashmob/go-guerrilla/mail/smtp"
 	"io"
 	"os"
 	"strings"
@@ -409,7 +410,11 @@ func TestHashBytes(t *testing.T) {
 }
 
 func TestTransformer(t *testing.T) {
-	store, chunksaver, mimeanalyzer, stream := initTestStream(true)
+	store, chunksaver, mimeanalyzer, stream, _, err := initTestStream(true, nil)
+	if err != nil {
+		t.Error(err)
+		return
+	}
 	buf := make([]byte, 64)
 	var result bytes.Buffer
 	if _, err := io.CopyBuffer(stream, bytes.NewBuffer([]byte(email3)), buf); err != nil {
@@ -441,7 +446,11 @@ func TestTransformer(t *testing.T) {
 }
 
 func TestChunkSaverReader(t *testing.T) {
-	store, chunksaver, mimeanalyzer, stream := initTestStream(false)
+	store, chunksaver, mimeanalyzer, stream, _, err := initTestStream(false, nil)
+	if err != nil {
+		t.Error(err)
+		return
+	}
 	buf := make([]byte, 64)
 	var result bytes.Buffer
 	if _, err := io.CopyBuffer(stream, bytes.NewBuffer([]byte(email3)), buf); err != nil {
@@ -530,7 +539,12 @@ func TestChunkSaverReader(t *testing.T) {
 
 func TestChunkSaverWrite(t *testing.T) {
 
-	store, chunksaver, mimeanalyzer, stream := initTestStream(true)
+	store, chunksaver, mimeanalyzer, stream, _, err := initTestStream(true, nil)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	storeMemory := store.(*StoreMemory)
 	var out bytes.Buffer
 	buf := make([]byte, 128)
 	if written, err := io.CopyBuffer(stream, bytes.NewBuffer([]byte(email)), buf); err != nil {
@@ -540,7 +554,7 @@ func TestChunkSaverWrite(t *testing.T) {
 		_ = chunksaver.Close()
 		fmt.Println("written:", written)
 		total := 0
-		for _, chunk := range store.chunks {
+		for _, chunk := range storeMemory.chunks {
 			total += len(chunk.data)
 		}
 		fmt.Println("compressed", total, "saved:", written-int64(total))
@@ -599,21 +613,41 @@ func TestChunkSaverWrite(t *testing.T) {
 			t.Error(err)
 			t.FailNow()
 		}
-		//var decoded bytes.Buffer
-		//io.Copy(&decoded, dr)
+
 		io.Copy(os.Stdout, dr)
 
 	}
 }
 
-func initTestStream(transform bool) (*StoreMemory, *backends.StreamDecorator, *backends.StreamDecorator, backends.StreamProcessor) {
+func initTestStream(transform bool, chunkSaverConfig *backends.ConfigGroup) (
+	Storage, *backends.StreamDecorator, *backends.StreamDecorator, backends.StreamProcessor, *mail.Envelope, error) {
 	// place the parse result in an envelope
 	e := mail.NewEnvelope("127.0.0.1", 1, 234)
 	to, _ := mail.NewAddress("[email protected]")
 	e.RcptTo = append(e.RcptTo, *to)
 	from, _ := mail.NewAddress("[email protected]")
+	e.Helo = "some.distant-server.org"
+	e.ESMTP = true
+	e.TLS = true
+	e.TransportType = smtp.TransportType8bit
 	e.MailFrom = *from
-	store := new(StoreMemory)
+	//e.RemoteIP = "127.0.0.1"
+	e.RemoteIP = "2001:0db8:85a3:0000:0000:8a2e:0370:7334"
+
+	if chunkSaverConfig == nil {
+		chunkSaverConfig = &backends.ConfigGroup{
+			"chunk_size":     8000,
+			"storage_engine": "memory",
+			"compress_level": 9,
+		}
+	}
+	var store Storage
+	if (*chunkSaverConfig)["storage_engine"] == "sql" {
+		store = new(StoreSQL)
+	} else {
+		store = new(StoreMemory)
+	}
+
 	chunkBuffer := NewChunkedBytesBufferMime()
 	//chunkBuffer.setDatabase(store)
 	// instantiate the chunk saver
@@ -630,36 +664,43 @@ func initTestStream(transform bool) (*StoreMemory, *backends.StreamDecorator, *b
 	if transform {
 		stream = mimeanalyzer.Decorate(
 			transformer.Decorate(
-				//debug.Decorate(
+				// here we inject the store and chunkBuffer
 				chunksaver.Decorate(
 					backends.DefaultStreamProcessor{}, store, chunkBuffer)))
 	} else {
 		stream = mimeanalyzer.Decorate(
-			//debug.Decorate(
+			// inject the store and chunkBuffer
 			chunksaver.Decorate(
-				backends.DefaultStreamProcessor{}, store, chunkBuffer)) //)
+				backends.DefaultStreamProcessor{}, store, chunkBuffer))
 	}
 
 	// configure the buffer cap
 	bc := backends.BackendConfig{
 		backends.ConfigStreamProcessors: {
-			"chunksaver": {
-				"chunk_size":     8000,
-				"storage_engine": "memory",
-				"compress_level": 9,
-			},
+			"chunksaver": *chunkSaverConfig,
 		},
 	}
 
-	//_ = backends.Svc.Initialize(bc)
-	_ = chunksaver.Configure(bc[backends.ConfigStreamProcessors]["chunksaver"])
-	_ = mimeanalyzer.Configure(backends.ConfigGroup{})
+	if err := chunksaver.Configure(bc[backends.ConfigStreamProcessors]["chunksaver"]); err != nil {
+		return nil, nil, nil, nil, nil, err
+	}
+	if err := mimeanalyzer.Configure(backends.ConfigGroup{}); err != nil {
+		return nil, nil, nil, nil, nil, err
+	}
 	// give it the envelope with the parse results
-	_ = chunksaver.Open(e)
-	_ = mimeanalyzer.Open(e)
+	if err := chunksaver.Open(e); err != nil {
+		return nil, nil, nil, nil, nil, err
+	}
+	if err := mimeanalyzer.Open(e); err != nil {
+		return nil, nil, nil, nil, nil, err
+	}
+
 	if transform {
-		_ = transformer.Open(e)
+		if err := transformer.Open(e); err != nil {
+			return nil, nil, nil, nil, nil, err
+		}
 	}
 
-	return store, chunksaver, mimeanalyzer, stream
+	return store, chunksaver, mimeanalyzer, stream, e, nil
 }

+ 3 - 3
chunk/processor.go

@@ -147,11 +147,12 @@ func Chunksaver() *backends.StreamDecorator {
 				// create a new entry & grab the id
 				written = 0
 				progress = 0
-				var ip net.IPAddr
+				var ip IPAddr
 				if ret := net.ParseIP(e.RemoteIP); ret != nil {
-					ip = net.IPAddr{IP: ret}
+					ip = IPAddr{net.IPAddr{IP: ret}}
 				}
 				mid, err := database.OpenMessage(
+					e.QueuedId,
 					e.MailFrom.String(),
 					e.Helo,
 					e.RcptTo[0].String(),
@@ -181,7 +182,6 @@ func Chunksaver() *backends.StreamDecorator {
 						written,
 						&chunkBuffer.Info,
 						subject,
-						envelope.QueuedId,
 						to,
 						from,
 					)

+ 8 - 4
chunk/store.go

@@ -17,10 +17,11 @@ func init() {
 type Storage interface {
 	// OpenMessage is used to begin saving an email. An email id is returned and used to call CloseMessage later
 	OpenMessage(
+		queuedID mail.Hash128,
 		from string,
 		helo string,
 		recipient string,
-		ipAddress net.IPAddr,
+		ipAddress IPAddr,
 		returnPath string,
 		protocol mail.Protocol,
 		transport smtp.TransportType) (mailID uint64, err error)
@@ -30,7 +31,6 @@ type Storage interface {
 		size int64,
 		partsInfo *PartsInfo,
 		subject string,
-		queuedID string,
 		to string,
 		from string) error
 	// AddChunk saves a chunk of bytes to a given hash key
@@ -64,8 +64,8 @@ type Email struct {
 	subject    string // subject stores the value from the first "Subject" header field
 	queuedID   string
 	recipient  string             // recipient is the email address that the server received from the RCPT TO command
-	ipv4       net.IPAddr         // set to a value if client connected via ipv4
-	ipv6       net.IPAddr         // set to a value if client connected via ipv6
+	ipv4       IPAddr             // set to a value if client connected via ipv4
+	ipv6       IPAddr             // set to a value if client connected via ipv6
 	returnPath string             // returnPath is the email address that the server received from the MAIL FROM command
 	protocol   mail.Protocol      // protocol such as SMTP, ESTMP, ESMTPS
 	transport  smtp.TransportType // transport what type of transport the message uses, eg 8bitmime
@@ -76,3 +76,7 @@ type Chunk struct {
 	referenceCount uint // referenceCount counts how many emails reference this chunk
 	data           io.Reader
 }
+
+type IPAddr struct {
+	net.IPAddr
+}
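
The IPAddr wrapper added above exists so an address column can be scanned directly with database/sql (the Scanner implementation appears in chunk/store_sql.go further down). A rough sketch, assuming a *sql.DB handle and the in_emails schema shown later; the helper name and query are hypothetical:

```go
package example

import (
	"database/sql"

	"github.com/flashmob/go-guerrilla/chunk"
)

// lookupIP is a hypothetical helper: it scans an address column straight into
// chunk.IPAddr, relying on the sql.Scanner defined on it in chunk/store_sql.go.
func lookupIP(db *sql.DB, mailID uint64) (chunk.IPAddr, error) {
	var addr chunk.IPAddr
	err := db.QueryRow(
		"SELECT ipv6_addr FROM in_emails WHERE mail_id = ?", mailID,
	).Scan(&addr)
	return addr, err
}
```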

+ 9 - 9
chunk/store_memory.go

@@ -21,7 +21,7 @@ type storeMemoryConfig struct {
 	CompressLevel int `json:"compress_level,omitempty"`
 }
 
-// A StoreMemory stores emails and chunked data in mememory
+// A StoreMemory stores emails and chunked data in memory
 type StoreMemory struct {
 	chunks        map[HashKey]*memoryChunk
 	emails        []*memoryEmail
@@ -42,8 +42,8 @@ type memoryEmail struct {
 	subject    string
 	queuedID   string
 	recipient  string
-	ipv4       net.IPAddr
-	ipv6       net.IPAddr
+	ipv4       IPAddr
+	ipv6       IPAddr
 	returnPath string
 	transport  smtp.TransportType
 	protocol   mail.Protocol
@@ -57,21 +57,23 @@ type memoryChunk struct {
 
 // OpenMessage implements the Storage interface
 func (m *StoreMemory) OpenMessage(
+	queuedID mail.Hash128,
 	from string,
 	helo string,
 	recipient string,
-	ipAddress net.IPAddr,
+	ipAddress IPAddr,
 	returnPath string,
 	protocol mail.Protocol,
 	transport smtp.TransportType,
 ) (mailID uint64, err error) {
-	var ip4, ip6 net.IPAddr
+	var ip4, ip6 IPAddr
 	if ip := ipAddress.IP.To4(); ip != nil {
-		ip4 = ipAddress
+		ip4 = IPAddr{net.IPAddr{IP: ip}}
 	} else {
-		ip6 = ipAddress
+		ip6 = ipAddress // To4 returned nil, keep the full 16-byte address
 	}
 	email := memoryEmail{
+		queuedID:   queuedID.String(),
 		mailID:     m.nextID,
 		createdAt:  time.Now(),
 		from:       from,
@@ -94,7 +96,6 @@ func (m *StoreMemory) CloseMessage(
 	size int64,
 	partsInfo *PartsInfo,
 	subject string,
-	queuedID string,
 	to string,
 	from string) error {
 	if email := m.emails[mailID-m.offset]; email == nil {
@@ -107,7 +108,6 @@ func (m *StoreMemory) CloseMessage(
 			email.partsInfo = info
 		}
 		email.subject = subject
-		email.queuedID = queuedID
 		email.to = to
 		email.from = from
 		email.size = size

+ 259 - 20
chunk/store_sql.go

@@ -4,12 +4,59 @@ import (
 	"database/sql"
 	"encoding/binary"
 	"encoding/json"
+	"errors"
+	"fmt"
 	"github.com/flashmob/go-guerrilla/backends"
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mail/smtp"
+	"github.com/go-sql-driver/mysql"
 	"net"
+	"time"
 )
 
+/*
+
+SQL schema
+
+```
+
+create schema gmail collate utf8mb4_unicode_ci;
+
+CREATE TABLE `in_emails` (
+     `mail_id` bigint unsigned NOT NULL AUTO_INCREMENT,
+     `created_at` datetime NOT NULL,
+     `size` int unsigned NOT NULL,
+     `from` varbinary(255) NOT NULL,
+     `to` varbinary(255) NOT NULL,
+     `parts_info` text COLLATE utf8mb4_unicode_ci,
+     `helo` varchar(255) COLLATE latin1_swedish_ci NOT NULL,
+     `subject` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
+     `queued_id` binary(16) NOT NULL,
+     `recipient` varbinary(255) NOT NULL,
+     `ipv4_addr` int unsigned DEFAULT NULL,
+     `ipv6_addr` varbinary(16) DEFAULT NULL,
+     `return_path` varbinary(255) NOT NULL,
+     `protocol` set('SMTP','SMTPS','ESMTP','ESMTPS','LMTP','LMTPS') COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'SMTP',
+     `transport` set('7bit','8bit','unknown','invalid') COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'unknown',
+     PRIMARY KEY (`mail_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
+
+CREATE TABLE `in_emails_chunks` (
+    `modified_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    `reference_count` int unsigned DEFAULT '1',
+    `data` mediumblob NOT NULL,
+    `hash` varbinary(16) NOT NULL,
+    UNIQUE KEY `in_emails_chunks_hash_uindex` (`hash`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
+
+
+```
+
+ipv4_addr and ipv6_addr are stored in big endian (network byte order)
+
+TODO compression, configurable SQL strings, logger
+
+*/
 func init() {
 	StorageEngines["sql"] = func() Storage {
 		return new(StoreSQL)
@@ -17,12 +64,28 @@ func init() {
 }
 
 type sqlConfig struct {
-	EmailTable    string `json:"email_table,omitempty"`
-	ChunkTable    string `json:"chunk_table,omitempty"`
-	Driver        string `json:"sql_driver,omitempty"`
-	DSN           string `json:"sql_dsn,omitempty"`
-	PrimaryHost   string `json:"primary_mail_host,omitempty"`
-	CompressLevel int    `json:"compress_level,omitempty"`
+
+	// EmailTable is the name of the main database table for the headers
+	EmailTable string `json:"email_table,omitempty"`
+	// EmailChunkTable stores the data of the emails in de-duplicated chunks
+	EmailChunkTable string `json:"email_table_chunks,omitempty"`
+
+	// Connection settings
+	// Driver to use, eg "mysql"
+	Driver string `json:"sql_driver,omitempty"`
+	// DSN (required) is the connection string, eg.
+	// "user:pass@tcp(127.0.0.1:3306)/db_name?readTimeout=10s&writeTimeout=10s&charset=utf8mb4&collation=utf8mb4_unicode_ci"
+	DSN string `json:"sql_dsn,omitempty"`
+	// MaxConnLifetime (optional) is a duration, eg. "30s"
+	MaxConnLifetime string `json:"sql_max_conn_lifetime,omitempty"`
+	// MaxOpenConns (optional) specifies the maximum number of open connections
+	MaxOpenConns int `json:"sql_max_open_conns,omitempty"`
+	// MaxIdleConns (optional) specifies the maximum number of idle connections
+	MaxIdleConns int `json:"sql_max_idle_conns,omitempty"`
+
+	// CompressLevel controls the gzip compression level of email chunks.
+	// 0 = no compression, 1 = best speed, 9 = best compression, -1 = default, -2 = Huffman only
+	CompressLevel int `json:"compress_level,omitempty"`
 }
 
 // StoreSQL implements the Storage interface
@@ -32,12 +95,51 @@ type StoreSQL struct {
 	db         *sql.DB
 }
 
+func (s *StoreSQL) StartWorker() (stop chan bool) {
+
+	timeo := time.Second * 1
+	stop = make(chan bool)
+	go func() {
+		for {
+			select {
+
+			case <-stop:
+				return
+
+			case <-time.After(timeo):
+				t1 := time.Now().UnixNano()
+				// do stuff here
+
+				if time.Now().UnixNano()-t1 > int64(time.Second*3) {
+					// the work took longer than expected
+				}
+
+			}
+		}
+	}()
+	return stop
+
+}
+
 func (s *StoreSQL) connect() (*sql.DB, error) {
 	var err error
 	if s.db, err = sql.Open(s.config.Driver, s.config.DSN); err != nil {
 		backends.Log().Error("cannot open database: ", err)
 		return nil, err
 	}
+	if s.config.MaxOpenConns != 0 {
+		s.db.SetMaxOpenConns(s.config.MaxOpenConns)
+	}
+	if s.config.MaxIdleConns != 0 {
+		s.db.SetMaxIdleConns(s.config.MaxIdleConns)
+	}
+	if s.config.MaxConnLifetime != "" {
+		t, err := time.ParseDuration(s.config.MaxConnLifetime)
+		if err != nil {
+			return nil, err
+		}
+		s.db.SetConnMaxLifetime(t)
+	}
+	stats := s.db.Stats()
+	fmt.Println(stats)
 	// do we have permission to access the table?
 	_, err = s.db.Query("SELECT mail_id FROM " + s.config.EmailTable + " LIMIT 1")
 	if err != nil {
@@ -54,8 +156,8 @@ func (s *StoreSQL) prepareSql() error {
 	// begin inserting an email (before saving chunks)
 	if stmt, err := s.db.Prepare(`INSERT INTO ` +
 		s.config.EmailTable +
-		` (from, helo, recipient, ipv4_addr, ipv6_addr, return_path, transport, protocol) 
- VALUES(?, ?, ?, ?, ?, ?, ?, ?)`); err != nil {
+		` (queued_id, created_at, ` + "`from`" + `, helo, recipient, ipv4_addr, ipv6_addr, return_path, transport, protocol)
+ VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`); err != nil {
 		return err
 	} else {
 		s.statements["insertEmail"] = stmt
@@ -63,7 +165,7 @@ func (s *StoreSQL) prepareSql() error {
 
 	// insert a chunk of email's data
 	if stmt, err := s.db.Prepare(`INSERT INTO ` +
-		s.config.ChunkTable +
+		s.config.EmailChunkTable +
 		` (data, hash) 
  VALUES(?, ?)`); err != nil {
 		return err
@@ -74,7 +176,7 @@ func (s *StoreSQL) prepareSql() error {
 	// finalize the email (the connection closed)
 	if stmt, err := s.db.Prepare(`
 		UPDATE ` + s.config.EmailTable + ` 
-			SET size=?, parts_info = ?, subject, queued_id = ?, to = ? 
+			SET size=?, parts_info=?, subject=?, ` + "`to`" + `=?, ` + "`from`" + `=?
 		WHERE mail_id = ? `); err != nil {
 		return err
 	} else {
@@ -85,7 +187,7 @@ func (s *StoreSQL) prepareSql() error {
 	// This means we can avoid re-inserting an existing chunk, only update its reference_count
 	// check the "affected rows" count after executing query
 	if stmt, err := s.db.Prepare(`
-		UPDATE ` + s.config.ChunkTable + ` 
+		UPDATE ` + s.config.EmailChunkTable + ` 
 			SET reference_count=reference_count+1 
 		WHERE hash = ? `); err != nil {
 		return err
@@ -96,7 +198,7 @@ func (s *StoreSQL) prepareSql() error {
 	// If the reference_count is 0 then it means the chunk has been deleted
 	// Chunks are soft-deleted for now, hard-deleted by another sweeper query as they become stale.
 	if stmt, err := s.db.Prepare(`
-		UPDATE ` + s.config.ChunkTable + ` 
+		UPDATE ` + s.config.EmailChunkTable + ` 
 			SET reference_count=reference_count-1 
 		WHERE hash = ? AND reference_count > 0`); err != nil {
 		return err
@@ -117,7 +219,7 @@ func (s *StoreSQL) prepareSql() error {
 	// fetch a chunk
 	if stmt, err := s.db.Prepare(`
 		SELECT * 
-		from ` + s.config.ChunkTable + ` 
+		from ` + s.config.EmailChunkTable + ` 
 		where hash=?`); err != nil {
 		return err
 	} else {
@@ -131,12 +233,15 @@ func (s *StoreSQL) prepareSql() error {
 	return nil
 }
 
+const mysqlYYYY_m_d_s_H_i_s = "2006-01-02 15:04:05"
+
 // OpenMessage implements the Storage interface
 func (s *StoreSQL) OpenMessage(
+	queuedID mail.Hash128,
 	from string,
 	helo string,
 	recipient string,
-	ipAddress net.IPAddr,
+	ipAddress IPAddr,
 	returnPath string,
 	protocol mail.Protocol,
 	transport smtp.TransportType,
@@ -148,9 +253,19 @@ func (s *StoreSQL) OpenMessage(
 	if ip := ipAddress.IP.To4(); ip != nil {
 		ip4 = binary.BigEndian.Uint32(ip)
 	} else {
-		_ = copy(ip6, ipAddress.IP)
+		copy(ip6, ipAddress.IP)
 	}
-	r, err := s.statements["insertEmail"].Exec(from, helo, recipient, ip4, ip6, returnPath, transport, protocol)
+	r, err := s.statements["insertEmail"].Exec(
+		queuedID.Bytes(),
+		time.Now().Format(mysqlYYYY_m_d_s_H_i_s),
+		from,
+		helo,
+		recipient,
+		ip4,
+		ip6,
+		returnPath,
+		transport.String(),
+		protocol.String())
 	if err != nil {
 		return 0, err
 	}
@@ -188,13 +303,12 @@ func (s *StoreSQL) CloseMessage(
 	size int64,
 	partsInfo *PartsInfo,
 	subject string,
-	queuedID string,
 	to string, from string) error {
 	partsInfoJson, err := json.Marshal(partsInfo)
 	if err != nil {
 		return err
 	}
-	_, err = s.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, queuedID, to, mailID)
+	_, err = s.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, to, from, mailID)
 	if err != nil {
 		return err
 	}
@@ -208,6 +322,16 @@ func (s *StoreSQL) Initialize(cfg backends.ConfigGroup) error {
 	if err != nil {
 		return err
 	}
+	if s.config.EmailTable == "" {
+		s.config.EmailTable = "in_emails"
+	}
+	if s.config.EmailChunkTable == "" {
+		s.config.EmailChunkTable = "in_emails_chunks"
+	}
+	if s.config.Driver == "" {
+		s.config.Driver = "mysql"
+	}
+
 	s.db, err = s.connect()
 	if err != nil {
 		return err
@@ -238,11 +362,126 @@ func (s *StoreSQL) Shutdown() (err error) {
 
 // GetEmail implements the Storage interface
 func (s *StoreSQL) GetEmail(mailID uint64) (*Email, error) {
-	return &Email{}, nil
+
+	email := &Email{}
+	var createdAt mysql.NullTime
+	var transport transportType
+	var protocol protocol
+	err := s.statements["selectMail"].QueryRow(mailID).Scan(
+		&email.mailID,
+		&createdAt,
+		&email.size,
+		&email.from,
+		&email.to,
+		&email.partsInfo,
+		&email.helo,
+		&email.subject,
+		&email.queuedID,
+		&email.recipient,
+		&email.ipv4,
+		&email.ipv6,
+		&email.returnPath,
+		&protocol,
+		&transport,
+	)
+	email.createdAt = createdAt.Time
+	email.protocol = protocol.Protocol
+	email.transport = transport.TransportType
+	if err != nil {
+		return email, err
+	}
+	return email, nil
 }
 
-// GetChunk implements the Storage interface
+// GetChunks implements the Storage interface
 func (s *StoreSQL) GetChunks(hash ...HashKey) ([]*Chunk, error) {
 	result := make([]*Chunk, 0, len(hash))
 	return result, nil
 }
+
+// zap is used in testing, purges everything
+func (s *StoreSQL) zap() error {
+	if r, err := s.db.Exec("DELETE from " + s.config.EmailTable + " "); err != nil {
+		return err
+	} else {
+		affected, _ := r.RowsAffected()
+		fmt.Printf("deleted %v emails\n", affected)
+	}
+
+	if r, err := s.db.Exec("DELETE from " + s.config.EmailChunkTable + " "); err != nil {
+		return err
+	} else {
+		affected, _ := r.RowsAffected()
+		fmt.Printf("deleted %v chunks\n", affected)
+	}
+
+	return nil
+
+}
+
+// Scan implements database/sql scanner interface, for parsing PartsInfo
+func (info *PartsInfo) Scan(value interface{}) error {
+	if value == nil {
+		return errors.New("parts_info is null")
+	}
+	if data, ok := value.([]byte); !ok {
+		return errors.New("parts_info is not str")
+	} else {
+		if err := json.Unmarshal(data, info); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Scan implements database/sql scanner interface, for parsing net.IPAddr
+func (ip *IPAddr) Scan(value interface{}) error {
+	if value == nil {
+		// NULL column: leave ip as its zero value
+		return nil
+	}
+	if data, ok := value.([]uint8); ok {
+		if len(data) == 16 { // 128 bits
+			// ipv6
+			ipv6 := make(net.IP, 16)
+			copy(ipv6, data)
+			ip.IPAddr.IP = ipv6
+		}
+	}
+	if data, ok := value.(int64); ok {
+		// ipv4
+		ipv4 := make(net.IP, 4)
+		binary.BigEndian.PutUint32(ipv4, uint32(data))
+		ip.IPAddr.IP = ipv4
+	}
+
+	return nil
+}
+
+type transportType struct {
+	smtp.TransportType
+}
+
+type protocol struct {
+	mail.Protocol
+}
+
+// todo scanners for protocol & transport
+
+// Scan implements database/sql scanner interface, for parsing smtp.TransportType
+func (t *transportType) Scan(value interface{}) error {
+	if data, ok := value.([]uint8); ok {
+		v := smtp.ParseTransportType(string(data))
+		t.TransportType = v
+	}
+	return nil
+}
+
+// Scan implements database/sql scanner interface, for parsing mail.Protocol
+func (p *protocol) Scan(value interface{}) error {
+	if data, ok := value.([]uint8); ok {
+		v := mail.ParseProtocolType(string(data))
+		p.Protocol = v
+	}
+	return nil
+}
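
The SQL engine is selected and configured through the chunksaver's config group; only sql_dsn has no fallback, while the driver and table names default to the values set in Initialize above. A sketch of a full group in the same shape the tests use (keys mirror the json tags of sqlConfig; the name, DSN and limits are placeholder values):

```go
package example

import "github.com/flashmob/go-guerrilla/backends"

// sqlChunksaverConfig is a hypothetical configuration for the "chunksaver"
// stream processor using the SQL storage engine.
var sqlChunksaverConfig = backends.BackendConfig{
	backends.ConfigStreamProcessors: {
		"chunksaver": {
			"chunk_size":            8000,
			"storage_engine":        "sql",
			"compress_level":        9,
			"sql_driver":            "mysql",
			"sql_dsn":               "user:pass@tcp(127.0.0.1:3306)/gmail?readTimeout=10s&writeTimeout=10s",
			"email_table":           "in_emails",
			"email_table_chunks":    "in_emails_chunks",
			"sql_max_conn_lifetime": "30s",
			"sql_max_open_conns":    10,
			"sql_max_idle_conns":    5,
		},
	},
}
```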

+ 140 - 0
chunk/store_sql_test.go

@@ -0,0 +1,140 @@
+package chunk
+
+import (
+	"bytes"
+	"flag"
+	"fmt"
+	"github.com/flashmob/go-guerrilla/mail"
+	"github.com/flashmob/go-guerrilla/mail/smtp"
+	"io"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/flashmob/go-guerrilla/backends"
+	"github.com/flashmob/go-guerrilla/chunk/transfer"
+	_ "github.com/go-sql-driver/mysql" // activate the mysql driver
+)
+
+// This test requires that you pass the -sql-dsn flag,
+// eg: go test -run ^TestSQLStore$ -sql-dsn 'user:pass@tcp(127.0.0.1:3306)/dbname?readTimeout=10s&writeTimeout=10s'
+
+var (
+	mailTableFlag  = flag.String("mail-table", "in_emails", "Table to use for testing the SQL backend")
+	chunkTableFlag = flag.String("mail-chunk-table", "in_emails_chunks", "Table to use for testing the chunking SQL backend")
+	sqlDSNFlag     = flag.String("sql-dsn", "", "DSN to use for testing the SQL backend")
+	sqlDriverFlag  = flag.String("sql-driver", "mysql", "Driver to use for testing the SQL backend")
+)
+
+func TestSQLStore(t *testing.T) {
+
+	if *sqlDSNFlag == "" {
+		t.Skip("requires -sql-dsn to run")
+	}
+
+	cfg := &backends.ConfigGroup{
+		"chunk_size":         150,
+		"storage_engine":     "sql",
+		"compress_level":     9,
+		"sql_driver":         *sqlDriverFlag,
+		"sql_dsn":            *sqlDSNFlag,
+		"email_table":        *mailTableFlag,
+		"email_table_chunks": *chunkTableFlag,
+	}
+
+	store, chunksaver, mimeanalyzer, stream, e, err := initTestStream(false, cfg)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	storeSql := store.(*StoreSQL)
+	defer func() {
+		storeSql.zap() // purge everything from db before exiting the test
+	}()
+	var out bytes.Buffer
+	buf := make([]byte, 128)
+	if written, err := io.CopyBuffer(stream, bytes.NewBuffer([]byte(email)), buf); err != nil {
+		t.Error(err)
+	} else {
+		_ = mimeanalyzer.Close()
+		_ = chunksaver.Close()
+
+		fmt.Println("written:", written)
+		/*
+			total := 0
+			for _, chunk := range storeMemory.chunks {
+				total += len(chunk.data)
+			}
+			fmt.Println("compressed", total, "saved:", written-int64(total))
+		*/
+
+		email, err := storeSql.GetEmail(e.MessageID)
+
+		if err != nil {
+			t.Error("email not found")
+			return
+		}
+
+		// check email
+		if email.transport != smtp.TransportType8bit {
+			t.Error("email.transport not ", smtp.TransportType8bit.String())
+		}
+		if email.protocol != mail.ProtocolESMTPS {
+			t.Error("email.protocol not ", mail.ProtocolESMTPS)
+		}
+
+		// this should read all parts
+		r, err := NewChunkedReader(storeSql, email, 0)
+		if w, err := io.Copy(&out, r); err != nil {
+			t.Error(err)
+		} else if w != email.size {
+			t.Error("email.size != number of bytes copied from reader", w, email.size)
+		} else if !strings.Contains(out.String(), "GIF89") {
+			t.Error("The email didn't decode properly, expecting GIF89")
+		}
+		out.Reset()
+
+		// test the seek feature
+		r, err = NewChunkedReader(storeSql, email, 0)
+		if err != nil {
+			t.Error(err)
+			t.FailNow()
+		}
+		// we start from 1 because if we start from 0, all the parts will be read
+		for i := 1; i < len(email.partsInfo.Parts); i++ {
+			fmt.Println("seeking to", i)
+			err = r.SeekPart(i)
+			if err != nil {
+				t.Error(err)
+			}
+			w, err := io.Copy(&out, r)
+			if err != nil {
+				t.Error(err)
+			}
+			if w != int64(email.partsInfo.Parts[i-1].Size) {
+				t.Error(i, "incorrect size, expecting", email.partsInfo.Parts[i-1].Size, "but read:", w)
+			}
+			out.Reset()
+		}
+
+		r, err = NewChunkedReader(storeSql, email, 0)
+		if err != nil {
+			t.Error(err)
+		}
+		part := email.partsInfo.Parts[0]
+		encoding := transfer.QuotedPrintable
+		if strings.Contains(part.TransferEncoding, "base") {
+			encoding = transfer.Base64
+		}
+		dr, err := transfer.NewDecoder(r, encoding, part.Charset)
+		_ = dr
+		if err != nil {
+			t.Error(err)
+			t.FailNow()
+		}
+		//var decoded bytes.Buffer
+		//io.Copy(&decoded, dr)
+		io.Copy(os.Stdout, dr)
+
+	}
+}

+ 42 - 4
mail/envelope.go

@@ -3,6 +3,7 @@ package mail
 import (
 	"bytes"
 	"encoding/binary"
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"hash"
@@ -122,6 +123,23 @@ func NewAddress(str string) (*Address, error) {
 	return a, nil
 }
 
+type Hash128 [16]byte
+
+func (h Hash128) String() string {
+	return fmt.Sprintf("%x", h[:])
+}
+
+// FromHex converts from a hex string; s must be exactly 32 hex characters
+func (h *Hash128) FromHex(s string) {
+	if len(s) != 32 {
+		panic("hex string must be 32 bytes")
+	}
+	_, _ = hex.Decode(h[:], []byte(s))
+}
+
+// Bytes returns the raw bytes
+func (h Hash128) Bytes() []byte { return h[:] }
+
 // Envelope of Email represents a single SMTP message.
 type Envelope struct {
 	// Data stores the header and message body (when using the non-streaming processor)
@@ -153,7 +171,7 @@ type Envelope struct {
 	// TLS is true if the email was received using a TLS connection
 	TLS bool
 	// Email(s) will be queued with this id
-	QueuedId string
+	QueuedId Hash128
 	// TransportType indicates whenever 8BITMIME extension has been signaled
 	TransportType smtp.TransportType
 	// ESMTP: true if EHLO was used
@@ -182,18 +200,20 @@ func NewEnvelope(remoteAddr string, clientID uint64, serverID int) *Envelope {
 	}
 }
 
-func queuedID(clientID uint64, serverID int) string {
+func queuedID(clientID uint64, serverID int) Hash128 {
 	hasher.Lock()
 	defer func() {
 		hasher.h.Reset()
 		hasher.Unlock()
 	}()
-	// pack the seeds and hash them
+	h := Hash128{}
+	// pack the seeds and hash'em
 	binary.BigEndian.PutUint64(hasher.n[0:8], uint64(time.Now().UnixNano()))
 	binary.BigEndian.PutUint64(hasher.n[8:16], clientID)
 	binary.BigEndian.PutUint64(hasher.n[16:24], uint64(serverID))
 	hasher.h.Write(hasher.n[:])
-	return fmt.Sprintf("%x", hasher.h.Sum([]byte{}))
+	copy(h[:], hasher.h.Sum([]byte{}))
+	return h
 }
 
 // ParseHeaders parses the headers into Header field of the Envelope struct.
@@ -300,6 +320,7 @@ const (
 	ProtocolESMTP
 	ProtocolESMTPS
 	ProtocolLTPS
+	ProtocolUnknown
 )
 
 func (p Protocol) String() string {
@@ -318,6 +339,23 @@ func (p Protocol) String() string {
 	return "unknown"
 }
 
+func ParseProtocolType(str string) Protocol {
+	switch {
+	case str == "SMTP":
+		return ProtocolSMTP
+	case str == "SMTPS":
+		return ProtocolSMTPS
+	case str == "ESMTP":
+		return ProtocolESMTP
+	case str == "ESMTPS":
+		return ProtocolESMTPS
+	case str == "LTPS":
+		return ProtocolLTPS
+	}
+
+	return ProtocolUnknown
+}
+
 const (
 	statePlainText = iota
 	stateStartEncodedWord

+ 17 - 5
mail/envelope_test.go

@@ -99,7 +99,7 @@ func TestAddressWithIP(t *testing.T) {
 func TestEnvelope(t *testing.T) {
 	e := NewEnvelope("127.0.0.1", 22, 0)
 
-	e.QueuedId = "abc123"
+	e.QueuedId = queuedID(2, 33)
 	e.Helo = "helo.example.com"
 	e.MailFrom = Address{User: "test", Host: "example.com"}
 	e.TLS = true
@@ -162,12 +162,24 @@ func TestEncodedWordAhead(t *testing.T) {
 }
 
 func TestQueuedID(t *testing.T) {
-	str := queuedID(5550000000, 1)
+	h := queuedID(5550000000, 1)
+
+	if len(h) != 16 { // silly comparison, but kept in case of refactoring
+		t.Error("queuedID needs to be 16 bytes in length")
+	}
+
+	str := h.String()
 	if len(str) != 32 {
-		t.Error("queuedID needs to be 32 bytes in length")
+		t.Error("queuedID string should be 32 bytes in length")
 	}
-	str2 := queuedID(5550000000, 1)
-	if str == str2 {
+
+	h2 := queuedID(5550000000, 1)
+	if bytes.Equal(h[:], h2[:]) {
 		t.Error("hashes should not be equal")
 	}
+
+	h2.FromHex("5a4a2f08784334de5148161943111ad3")
+	if h2.String() != "5a4a2f08784334de5148161943111ad3" {
+		t.Error("hex conversion didn't work")
+	}
 }

+ 14 - 0
mail/smtp/parse.go

@@ -48,6 +48,20 @@ func (t TransportType) String() string {
 	return "invalid"
 }
 
+func ParseTransportType(str string) TransportType {
+	switch {
+	case str == "7bit":
+		return TransportType7bit
+	case str == "8bit":
+		return TransportType8bit
+	case str == "unknown":
+		return TransportTypeUnspecified
+	case str == "invalid":
+		return TransportTypeInvalid
+	}
+	return TransportTypeInvalid
+}
+
 // is8BitMime checks for the BODY parameter as
 func (p PathParam) Transport() TransportType {
 	if len(p) != 2 {