123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- package chunk
- import (
- "database/sql"
- "encoding/binary"
- "encoding/json"
- "github.com/flashmob/go-guerrilla/backends"
- "github.com/flashmob/go-guerrilla/mail/smtp"
- "net"
- )
// sqlConfig holds the settings for the SQL chunk store, decoded from the
// backend configuration using the JSON keys in the field tags.
type sqlConfig struct {
	// EmailTable is the name of the table email rows are written to.
	EmailTable string `json:"chunksaver_email_table,omitempty"`
	// ChunkTable is the name of the table chunk rows are written to.
	ChunkTable string `json:"chunksaver_chunk_table,omitempty"`
	// Driver is the driver name handed to sql.Open.
	Driver string `json:"chunksaver_sql_driver,omitempty"`
	// DSN is the data source name handed to sql.Open.
	DSN string `json:"chunksaver_sql_dsn,omitempty"`
	// PrimaryHost is read from the "chunksaver_primary_mail_host" key.
	PrimaryHost string `json:"chunksaver_primary_mail_host,omitempty"`
}
- // StoreSQL implements the Storage interface
- type StoreSQL struct {
- config *sqlConfig
- statements map[string]*sql.Stmt
- db *sql.DB
- }
- func (s *StoreSQL) connect() (*sql.DB, error) {
- var err error
- if s.db, err = sql.Open(s.config.Driver, s.config.DSN); err != nil {
- backends.Log().Error("cannot open database: ", err)
- return nil, err
- }
- // do we have permission to access the table?
- _, err = s.db.Query("SELECT mail_id FROM " + s.config.EmailTable + " LIMIT 1")
- if err != nil {
- return nil, err
- }
- return s.db, err
- }
- func (s *StoreSQL) prepareSql() error {
- if s.statements == nil {
- s.statements = make(map[string]*sql.Stmt)
- }
- // begin inserting an email (before saving chunks)
- if stmt, err := s.db.Prepare(`INSERT INTO ` +
- s.config.EmailTable +
- ` (from, helo, recipient, ipv4_addr, ipv6_addr, return_path, is_tls, is_8bit)
- VALUES(?, ?, ?, ?, ?, ?, ?, ?)`); err != nil {
- return err
- } else {
- s.statements["insertEmail"] = stmt
- }
- // insert a chunk of email's data
- if stmt, err := s.db.Prepare(`INSERT INTO ` +
- s.config.ChunkTable +
- ` (data, hash)
- VALUES(?, ?)`); err != nil {
- return err
- } else {
- s.statements["insertChunk"] = stmt
- }
- // finalize the email (the connection closed)
- if stmt, err := s.db.Prepare(`
- UPDATE ` + s.config.EmailTable + `
- SET size=?, parts_info = ?, subject, delivery_id = ?, to = ?
- WHERE mail_id = ? `); err != nil {
- return err
- } else {
- s.statements["finalizeEmail"] = stmt
- }
- // Check the existence of a chunk (the reference_count col is incremented if it exists)
- // This means we can avoid re-inserting an existing chunk, only update its reference_count
- // check the "affected rows" count after executing query
- if stmt, err := s.db.Prepare(`
- UPDATE ` + s.config.ChunkTable + `
- SET reference_count=reference_count+1
- WHERE hash = ? `); err != nil {
- return err
- } else {
- s.statements["chunkReferenceIncr"] = stmt
- }
- // If the reference_count is 0 then it means the chunk has been deleted
- // Chunks are soft-deleted for now, hard-deleted by another sweeper query as they become stale.
- if stmt, err := s.db.Prepare(`
- UPDATE ` + s.config.ChunkTable + `
- SET reference_count=reference_count-1
- WHERE hash = ? AND reference_count > 0`); err != nil {
- return err
- } else {
- s.statements["chunkReferenceDecr"] = stmt
- }
- // fetch an email
- if stmt, err := s.db.Prepare(`
- SELECT *
- from ` + s.config.EmailTable + `
- where mail_id=?`); err != nil {
- return err
- } else {
- s.statements["selectMail"] = stmt
- }
- // fetch a chunk
- if stmt, err := s.db.Prepare(`
- SELECT *
- from ` + s.config.ChunkTable + `
- where hash=?`); err != nil {
- return err
- } else {
- s.statements["selectChunk"] = stmt
- }
- // TODO sweep old chunks
- // TODO sweep incomplete emails
- return nil
- }
- // OpenMessage implements the Storage interface
- func (s *StoreSQL) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool, transport smtp.TransportType) (mailID uint64, err error) {
- // if it's ipv4 then we want ipv6 to be 0, and vice-versa
- var ip4 uint32
- ip6 := make([]byte, 16)
- if ip := ipAddress.IP.To4(); ip != nil {
- ip4 = binary.BigEndian.Uint32(ip)
- } else {
- _ = copy(ip6, ipAddress.IP)
- }
- r, err := s.statements["insertEmail"].Exec(from, helo, recipient, ip4, ip6, returnPath, isTLS, transport)
- if err != nil {
- return 0, err
- }
- id, err := r.LastInsertId()
- if err != nil {
- return 0, err
- }
- return uint64(id), err
- }
- // AddChunk implements the Storage interface
- func (s *StoreSQL) AddChunk(data []byte, hash []byte) error {
- // attempt to increment the reference_count (it means the chunk is already in there)
- r, err := s.statements["chunkReferenceIncr"].Exec(hash)
- if err != nil {
- return err
- }
- affected, err := r.RowsAffected()
- if err != nil {
- return err
- }
- if affected == 0 {
- // chunk isn't in there, let's insert it
- _, err := s.statements["insertChunk"].Exec(data, hash)
- if err != nil {
- return err
- }
- }
- return nil
- }
- // CloseMessage implements the Storage interface
- func (s *StoreSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
- partsInfoJson, err := json.Marshal(partsInfo)
- if err != nil {
- return err
- }
- _, err = s.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, deliveryID, to, mailID)
- if err != nil {
- return err
- }
- return nil
- }
- // Initialize loads the specific database config, connects to the db, prepares statements
- func (s *StoreSQL) Initialize(cfg backends.BackendConfig) error {
- configType := backends.BaseConfig(&sqlConfig{})
- bcfg, err := backends.Svc.ExtractConfig(backends.ConfigStreamProcessors, "chunksaver", cfg, configType)
- if err != nil {
- return err
- }
- s.config = bcfg.(*sqlConfig)
- s.db, err = s.connect()
- if err != nil {
- return err
- }
- err = s.prepareSql()
- if err != nil {
- return err
- }
- return nil
- }
- // Shutdown implements the Storage interface
- func (s *StoreSQL) Shutdown() (err error) {
- defer func() {
- closeErr := s.db.Close()
- if closeErr != err {
- backends.Log().WithError(err).Error("failed to close sql database")
- err = closeErr
- }
- }()
- for i := range s.statements {
- if err = s.statements[i].Close(); err != nil {
- backends.Log().WithError(err).Error("failed to close sql statement")
- }
- }
- return err
- }
- // GetEmail implements the Storage interface
- func (s *StoreSQL) GetEmail(mailID uint64) (*Email, error) {
- return &Email{}, nil
- }
- // GetChunk implements the Storage interface
- func (s *StoreSQL) GetChunks(hash ...HashKey) ([]*Chunk, error) {
- result := make([]*Chunk, 0, len(hash))
- return result, nil
- }
|