Bladeren bron

flush() error checking

flashmob 6 jaren geleden
bovenliggende
commit
da6289a382
1 gewijzigde bestanden met toevoegingen van 38 en 21 verwijderingen
  1. 38 21
      backends/s_chunksaver.go

+ 38 - 21
backends/s_chunksaver.go

@@ -67,7 +67,7 @@ type chunkedPart struct {
 	ContentDisposition string     `json:"d"`
 }
 
-type flushEvent func()
+type flushEvent func() error
 
 type chunkedBytesBuffer struct {
 	buf          []byte
@@ -75,15 +75,18 @@ type chunkedBytesBuffer struct {
 }
 
 // flush signals that it's time to write the buffer out to disk
-func (c *chunkedBytesBuffer) flush() {
+func (c *chunkedBytesBuffer) flush() error {
 	if len(c.buf) == 0 {
-		return
+		return nil
 	}
 	fmt.Print(string(c.buf))
 	if c.flushTrigger != nil {
-		c.flushTrigger()
+		if err := c.flushTrigger(); err != nil {
+			return err
+		}
 	}
 	c.Reset()
+	return nil
 }
 
 // Reset sets the length back to 0, making it re-usable
@@ -108,7 +111,10 @@ func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
 			c.buf = append(c.buf, p[i:i+free]...)
 			remaining -= free
 			i += free
-			c.flush()
+			err = c.flush()
+			if err != nil {
+				return i, err
+			}
 			if remaining == 0 {
 				return
 			}
@@ -136,8 +142,8 @@ type chunkedBytesBufferMime struct {
 func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
 	b := new(chunkedBytesBufferMime)
 
-	b.chunkedBytesBuffer.flushTrigger = func() {
-		b.onFlush()
+	b.chunkedBytesBuffer.flushTrigger = func() error {
+		return b.onFlush()
 	}
 	b.md5 = md5.New()
 	return b
@@ -147,7 +153,7 @@ func (b *chunkedBytesBufferMime) setDatabase(database ChunkSaverStorage) {
 	b.database = database
 }
 
-func (b *chunkedBytesBufferMime) onFlush() {
+func (b *chunkedBytesBufferMime) onFlush() error {
 	b.md5.Write(b.buf)
 	var chash [16]byte
 	copy(chash[:], b.md5.Sum([]byte{}))
@@ -167,8 +173,11 @@ func (b *chunkedBytesBufferMime) onFlush() {
 			b.info.Parts = append(b.info.Parts, part)
 			b.info.Count++
 		}
-		b.database.AddChunk(b.buf, chash[:])
+		if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
+			return err
+		}
 	}
+	return nil
 }
 
 func (b *chunkedBytesBufferMime) fillInfo(cp *chunkedPart, index int) {
@@ -665,11 +674,15 @@ func Chunksaver() *StreamDecorator {
 				return nil
 			}
 
-			sd.Close = func() error {
-				chunkBuffer.flush()
+			sd.Close = func() (err error) {
+				err = chunkBuffer.flush()
+				if err != nil {
+					// TODO we could delete the half saved message here
+					return err
+				}
 				defer chunkBuffer.Reset()
 				if mid, ok := envelope.Values["messageID"].(uint64); ok {
-					err := database.CloseMessage(
+					err = database.CloseMessage(
 						mid,
 						written,
 						&chunkBuffer.info,
@@ -678,12 +691,10 @@ func Chunksaver() *StreamDecorator {
 						to,
 						from,
 					)
-
 					if err != nil {
 						return err
 					}
 				}
-
 				return nil
 			}
 
@@ -715,9 +726,9 @@ func Chunksaver() *StreamDecorator {
 				return subject, to, from
 			}
 
-			return StreamProcessWith(func(p []byte) (int, error) {
+			return StreamProcessWith(func(p []byte) (count int, err error) {
 				if envelope.Values == nil {
-					return 0, errors.New("no message headers found")
+					return count, errors.New("no message headers found")
 				}
 				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok {
 					var (
@@ -731,29 +742,35 @@ func Chunksaver() *StreamDecorator {
 					offset := msgPos
 					for i := progress; i < len(*parts); i++ {
 						part := (*parts)[i]
-						count := 0
+
 						chunkBuffer.currentPart(part)
 						// break chunk on new part
 						if part.StartingPos > 0 && part.StartingPos > msgPos {
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
 							written += uint(count)
-							chunkBuffer.flush()
+							err = chunkBuffer.flush()
+							if err != nil {
+								return count, err
+							}
 							fmt.Println("->N")
 							pos += count
 							msgPos = part.StartingPos
 						}
 						// break chunk on header
 						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
-							count, _ := chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
+							count, _ = chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
 							written += uint(count)
-							chunkBuffer.flush()
+							err = chunkBuffer.flush()
+							if err != nil {
+								return count, err
+							}
 							fmt.Println("->H")
 							pos += count
 							msgPos = part.StartingPosBody
 						}
 						// if on the latest (last) part, and yet there is still data to be written out
 						if len(*parts)-1 == i && len(p)-1 > pos {
-							count, _ := chunkBuffer.Write(p[pos:])
+							count, _ = chunkBuffer.Write(p[pos:])
 							written += uint(count)
 							pos += count
 							msgPos += uint(count)