store_memory.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package chunk
  2. import (
  3. "bytes"
  4. "compress/zlib"
  5. "errors"
  6. "github.com/flashmob/go-guerrilla/backends"
  7. "github.com/flashmob/go-guerrilla/mail"
  8. "github.com/flashmob/go-guerrilla/mail/smtp"
  9. "net"
  10. "time"
  11. )
  12. func init() {
  13. StorageEngines["memory"] = func() Storage {
  14. return new(StoreMemory)
  15. }
  16. }
  // storeMemoryConfig holds the settings of the in-memory storage engine.
  type storeMemoryConfig struct {
  	// CompressLevel is the zlib level passed to the compressor when a
  	// new chunk is stored. It maps to the "compress_level" JSON key.
  	CompressLevel int `json:"compress_level,omitempty"`
  }
  20. // A StoreMemory stores emails and chunked data in memory
  21. type StoreMemory struct {
  22. chunks map[HashKey]*memoryChunk
  23. emails []*memoryEmail
  24. nextID uint64
  25. offset uint64
  26. config storeMemoryConfig
  27. }
  28. type memoryEmail struct {
  29. mailID uint64
  30. createdAt time.Time
  31. size int64
  32. from string
  33. to string
  34. partsInfo []byte
  35. helo string
  36. subject string
  37. queuedID string
  38. recipient string
  39. ipv4 IPAddr
  40. ipv6 IPAddr
  41. returnPath string
  42. transport smtp.TransportType
  43. protocol mail.Protocol
  44. }
  // memoryChunk is one stored chunk together with its bookkeeping.
  type memoryChunk struct {
  	// modifiedAt is the time the chunk was added or last re-added.
  	modifiedAt time.Time
  	// referenceCount starts at 1 and grows by one each time AddChunk
  	// is called again with the same hash.
  	referenceCount uint
  	// data is the zlib-compressed chunk payload.
  	data []byte
  }
  50. // OpenMessage implements the Storage interface
  51. func (m *StoreMemory) OpenMessage(
  52. queuedID mail.Hash128,
  53. from string,
  54. helo string,
  55. recipient string,
  56. ipAddress IPAddr,
  57. returnPath string,
  58. protocol mail.Protocol,
  59. transport smtp.TransportType,
  60. ) (mailID uint64, err error) {
  61. var ip4, ip6 IPAddr
  62. if ip := ipAddress.IP.To4(); ip != nil {
  63. ip4 = IPAddr{net.IPAddr{IP: ip}}
  64. } else {
  65. ip6 = IPAddr{net.IPAddr{IP: ip}}
  66. }
  67. email := memoryEmail{
  68. queuedID: queuedID.String(),
  69. mailID: m.nextID,
  70. createdAt: time.Now(),
  71. from: from,
  72. helo: helo,
  73. recipient: recipient,
  74. ipv4: ip4,
  75. ipv6: ip6,
  76. returnPath: returnPath,
  77. transport: transport,
  78. protocol: protocol,
  79. }
  80. m.emails = append(m.emails, &email)
  81. m.nextID++
  82. return email.mailID, nil
  83. }
  84. // CloseMessage implements the Storage interface
  85. func (m *StoreMemory) CloseMessage(
  86. mailID uint64,
  87. size int64,
  88. partsInfo *PartsInfo,
  89. subject string,
  90. to string,
  91. from string) error {
  92. if email := m.emails[mailID-m.offset]; email == nil {
  93. return errors.New("email not found")
  94. } else {
  95. email.size = size
  96. if info, err := partsInfo.MarshalJSONZlib(); err != nil {
  97. return err
  98. } else {
  99. email.partsInfo = info
  100. }
  101. email.subject = subject
  102. email.to = to
  103. email.from = from
  104. email.size = size
  105. }
  106. return nil
  107. }
  108. // AddChunk implements the Storage interface
  109. func (m *StoreMemory) AddChunk(data []byte, hash []byte) error {
  110. var key HashKey
  111. if len(hash) != hashByteSize {
  112. return errors.New("invalid hash")
  113. }
  114. key.Pack(hash)
  115. var compressed bytes.Buffer
  116. zlibw, err := zlib.NewWriterLevel(&compressed, m.config.CompressLevel)
  117. if err != nil {
  118. return err
  119. }
  120. if chunk, ok := m.chunks[key]; ok {
  121. // only update the counters and update time
  122. chunk.referenceCount++
  123. chunk.modifiedAt = time.Now()
  124. } else {
  125. if _, err := zlibw.Write(data); err != nil {
  126. return err
  127. }
  128. if err := zlibw.Close(); err != nil {
  129. return err
  130. }
  131. // add a new chunk
  132. newChunk := memoryChunk{
  133. modifiedAt: time.Now(),
  134. referenceCount: 1,
  135. data: compressed.Bytes(),
  136. }
  137. m.chunks[key] = &newChunk
  138. }
  139. return nil
  140. }
  141. // Initialize implements the Storage interface
  142. func (m *StoreMemory) Initialize(cfg backends.ConfigGroup) error {
  143. sd := backends.StreamDecorator{}
  144. err := sd.ExtractConfig(cfg, &m.config)
  145. if err != nil {
  146. return err
  147. }
  148. m.offset = 1
  149. m.nextID = m.offset
  150. m.emails = make([]*memoryEmail, 0, 100)
  151. m.chunks = make(map[HashKey]*memoryChunk, 1000)
  152. if m.config.CompressLevel > 9 || m.config.CompressLevel < 0 {
  153. m.config.CompressLevel = zlib.BestCompression
  154. }
  155. return nil
  156. }
  157. // Shutdown implements the Storage interface
  158. func (m *StoreMemory) Shutdown() (err error) {
  159. m.emails = nil
  160. m.chunks = nil
  161. return nil
  162. }
  163. // GetEmail implements the Storage interface
  164. func (m *StoreMemory) GetMessage(mailID uint64) (*Email, error) {
  165. if count := len(m.emails); count == 0 {
  166. return nil, errors.New("storage is empty")
  167. } else if overflow := uint64(count) - m.offset; overflow > mailID-m.offset {
  168. return nil, errors.New("mail not found")
  169. }
  170. email := m.emails[mailID-m.offset]
  171. pi := NewPartsInfo()
  172. if err := pi.UnmarshalJSONZlib(email.partsInfo); err != nil {
  173. return nil, err
  174. }
  175. return &Email{
  176. mailID: email.mailID,
  177. createdAt: email.createdAt,
  178. size: email.size,
  179. from: email.from,
  180. to: email.to,
  181. partsInfo: *pi,
  182. helo: email.helo,
  183. subject: email.subject,
  184. queuedID: email.queuedID,
  185. recipient: email.recipient,
  186. ipv4: email.ipv4,
  187. ipv6: email.ipv6,
  188. returnPath: email.returnPath,
  189. transport: email.transport,
  190. protocol: email.protocol,
  191. }, nil
  192. }
  193. // GetChunk implements the Storage interface
  194. func (m *StoreMemory) GetChunks(hash ...HashKey) ([]*Chunk, error) {
  195. result := make([]*Chunk, 0, len(hash))
  196. var key HashKey
  197. for i := range hash {
  198. key = hash[i]
  199. if c, ok := m.chunks[key]; ok {
  200. zwr, err := zlib.NewReader(bytes.NewReader(c.data))
  201. if err != nil {
  202. return nil, err
  203. }
  204. result = append(result, &Chunk{
  205. modifiedAt: c.modifiedAt,
  206. referenceCount: c.referenceCount,
  207. data: zwr,
  208. })
  209. }
  210. }
  211. return result, nil
  212. }