reader.go

package chunk

import (
	"errors"
	"fmt"
	"io"
)

type chunkedReader struct {
	db    Storage
	email *Email

	// part requests a part. If 0, all the parts are read sequentially
	part int

	// i is which part it's currently reading, j is which chunk of a part
	i, j int

	cache cachedChunks
}

// NewChunkedReader loads the email and selects which mime-part Read will read, starting from 1.
// If part is 0, Read will read in the entire message. 1 selects the first part, 2 the second, and so on.
func NewChunkedReader(db Storage, email *Email, part int) (*chunkedReader, error) {
	if email == nil {
		return nil, errors.New("nil email")
	}
	r := new(chunkedReader)
	r.db = db
	r.email = email
	if err := r.SeekPart(part); err != nil {
		return nil, err
	}
	r.cache = cachedChunks{
		db: db,
	}
	return r, nil
}

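// Example usage (a sketch, not taken from this package's tests): read back an
// entire stored message and stream it to stdout. The Storage implementation
// `store` and the loaded *Email value `e` are assumed to exist and are not
// defined in this file.
//
//	r, err := NewChunkedReader(store, e, 0) // 0 = the whole message
//	if err != nil {
//		log.Fatal(err)
//	}
//	if _, err := io.Copy(os.Stdout, r); err != nil {
//		log.Fatal(err)
//	}
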
// SeekPart resets the reader. The part argument chooses which part Read will read in.
// If part is 1, Read will return the first part.
// If part is 0, Read will return the entire message.
func (r *chunkedReader) SeekPart(part int) error {
	if parts := len(r.email.partsInfo.Parts); parts == 0 {
		return errors.New("email has no mime parts")
	} else if part > parts || part < 0 {
		return errors.New("no such part available")
	}
	r.part = part
	if part > 0 {
		r.i = part - 1
	} else {
		r.i = 0
	}
	r.j = 0
	return nil
}

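// For example (sketch, continuing the usage above): the same reader can be
// re-pointed at a single mime-part.
//
//	if err := r.SeekPart(2); err != nil { // read only the 2nd part
//		log.Fatal(err)
//	}
//	body, err := io.ReadAll(r)
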
type cachedChunks struct {
	// chunks stores the cached chunks. It stores the latest chunk being read
	// and the next few chunks that are yet to be read
	// (see the chunkCachePreload constant)
	chunks []*Chunk

	// hashIndex is a look-up table that returns the hash for a given index
	hashIndex map[int]HashKey

	db Storage
}

// chunkCachePreload controls how many chunks to pre-load in the cache
const chunkCachePreload = 2

// warm allocates the chunk cache, fetches the first few chunks and stores them in the cache
func (c *cachedChunks) warm(hashes ...HashKey) (int, error) {
	if c.hashIndex == nil {
		c.hashIndex = make(map[int]HashKey, len(hashes))
	}
	if c.chunks == nil {
		c.chunks = make([]*Chunk, 0, 100)
	}
	if len(c.chunks) > 0 {
		// already been filled
		return len(c.chunks), nil
	}
	// pre-load the first few chunks
	preload := chunkCachePreload
	if len(hashes) < preload {
		preload = len(hashes)
	}
	chunks, err := c.db.GetChunks(hashes[0:preload]...)
	if err != nil {
		return 0, err
	}
	for i := range hashes {
		c.hashIndex[i] = hashes[i]
		if i < preload {
			c.chunks = append(c.chunks, chunks[i])
		} else {
			// not pre-loaded; nil is a placeholder until the chunk is fetched by get()
			c.chunks = append(c.chunks, nil)
		}
	}
	return len(c.chunks), nil
}

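// To illustrate (an assumed example, not taken from the code above): warming
// with 5 hashes and chunkCachePreload == 2 leaves the cache as
//
//	chunks    = [c0, c1, nil, nil, nil]
//	hashIndex = {0: h0, 1: h1, 2: h2, 3: h3, 4: h4}
//
// so chunks 2..4 are fetched lazily by get().
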
// get returns a previously saved chunk and pre-loads the next few.
// It also evicts previous chunks that have now become stale.
func (c *cachedChunks) get(i int) (*Chunk, error) {
	if i >= len(c.chunks) {
		return nil, errors.New("not enough chunks")
	}
	if c.chunks[i] != nil {
		// cache hit!
		return c.chunks[i], nil
	}
	key, ok := c.hashIndex[i]
	if !ok {
		return nil, fmt.Errorf("hash for index [%d] not found", i)
	}
	// make a list of chunks to load (extra ones to be pre-loaded)
	toGet := []HashKey{key}
	for to := i + 1; to < len(c.chunks) && to < chunkCachePreload+i; to++ {
		if key, ok := c.hashIndex[to]; ok {
			toGet = append(toGet, key)
		}
	}
	chunks, err := c.db.GetChunks(toGet...)
	if err != nil {
		return nil, err
	}
	// cache the pre-loaded chunks
	for j := i; j-i < len(chunks); j++ {
		c.chunks[j] = chunks[j-i]
		c.hashIndex[j] = toGet[j-i]
	}
	// evict older chunks that have become stale (walk back until the first gap)
	for j := i - 1; j > -1; j-- {
		if c.chunks[j] == nil {
			break
		}
		c.chunks[j] = nil
	}
	// return the chunk asked for
	return chunks[0], nil
}

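// Continuing the illustration above (assumed values): get(2) fetches c2 and
// pre-loads c3, then evicts c0 and c1, leaving
//
//	chunks = [nil, nil, c2, c3, nil]
//
// so the cache holds only the chunk being read plus the pre-loaded ones.
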
// empty clears the chunk cache and the hash index, keeping the allocated capacity
func (c *cachedChunks) empty() {
	for i := range c.chunks {
		c.chunks[i] = nil
	}
	c.chunks = c.chunks[:0] // set len to 0
	for key := range c.hashIndex {
		delete(c.hashIndex, key)
	}
}

// Read implements the io.Reader interface
func (r *chunkedReader) Read(p []byte) (n int, err error) {
	var length int
	for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
		length, err = r.cache.warm(r.email.partsInfo.Parts[r.i].ChunkHash...)
		if err != nil {
			return
		}
		var nRead int
		for r.j < length {
			chunk, err := r.cache.get(r.j)
			if err != nil {
				return nRead, err
			}
			nRead, err = chunk.data.Read(p)
			if err == io.EOF { // we've read the entire chunk
				if closer, ok := chunk.data.(io.ReadCloser); ok {
					err = closer.Close()
					if err != nil {
						return nRead, err
					}
				}
				r.j++ // advance to the next chunk in the part
				err = nil
				if r.j == length { // was that the last chunk in the part?
					r.j = 0 // reset the chunk index
					r.i++   // advance to the next part
					r.cache.empty()
					if r.i == len(r.email.partsInfo.Parts) || r.part > 0 {
						// there are no more parts to return
						err = io.EOF
					}
				}
			}
			// unless there's an error, the next time this function is
			// called, it will read the next chunk
			return nRead, err
		}
	}
	err = io.EOF
	return n, err
}
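
// A rough end-to-end sketch (hypothetical, using only the exported API above):
// iterate over the mime-parts one at a time by seeking each index until
// SeekPart reports that the part doesn't exist.
//
//	for part := 1; ; part++ {
//		if err := r.SeekPart(part); err != nil {
//			break // no more parts
//		}
//		var buf bytes.Buffer
//		if _, err := io.Copy(&buf, r); err != nil {
//			log.Fatal(err)
//		}
//		// ... process buf, the body of this part reassembled from its chunks
//	}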