|
@@ -2,11 +2,11 @@ package backends
|
|
|
|
|
|
import (
|
|
import (
|
|
"bytes"
|
|
"bytes"
|
|
- "github.com/flashmob/go-guerrilla/chunk/transfer"
|
|
|
|
"io"
|
|
"io"
|
|
"regexp"
|
|
"regexp"
|
|
"sync"
|
|
"sync"
|
|
|
|
|
|
|
|
+ "github.com/flashmob/go-guerrilla/chunk/transfer"
|
|
"github.com/flashmob/go-guerrilla/mail"
|
|
"github.com/flashmob/go-guerrilla/mail"
|
|
"github.com/flashmob/go-guerrilla/mail/mime"
|
|
"github.com/flashmob/go-guerrilla/mail/mime"
|
|
)
|
|
)
|
|
@@ -75,15 +75,17 @@ func (t *Transform) unswap() {
|
|
|
|
|
|
var regexpCharset = regexp.MustCompile("(?i)charset=\"?(.+)\"?") // (?i) is a flag for case-insensitive
|
|
var regexpCharset = regexp.MustCompile("(?i)charset=\"?(.+)\"?") // (?i) is a flag for case-insensitive
|
|
|
|
|
|
-// todo: we may optimize this by looking at t.partsCachedOriginal, implement a Reader for it, re-write the header as we read from it
|
|
|
|
-
|
|
|
|
-func (t *Transform) ReWrite(b []byte, last bool) (count int, err error) {
|
|
|
|
|
|
+func (t *Transform) ReWrite(b []byte, last bool, offset uint) (count int, err error) {
|
|
defer func() {
|
|
defer func() {
|
|
count = len(b)
|
|
count = len(b)
|
|
}()
|
|
}()
|
|
if !t.isBody {
|
|
if !t.isBody {
|
|
|
|
+ // Header re-write, how it works
|
|
// we place the partial header's bytes on a buffer from which we can read one line at a time
|
|
// we place the partial header's bytes on a buffer from which we can read one line at a time
|
|
- // then we match and replace the lines we want
|
|
|
|
|
|
+ // then we match and replace the lines we want, output replaced live.
|
|
|
|
+ // The following re-writes are mde:
|
|
|
|
+ // - base64 => 8bit
|
|
|
|
+ // - supported non-utf8 charset => utf8
|
|
if i, err := io.Copy(&t.buf, bytes.NewReader(b)); err != nil {
|
|
if i, err := io.Copy(&t.buf, bytes.NewReader(b)); err != nil {
|
|
return int(i), err
|
|
return int(i), err
|
|
}
|
|
}
|
|
@@ -130,10 +132,18 @@ func (t *Transform) ReWrite(b []byte, last bool) (count int, err error) {
|
|
} else {
|
|
} else {
|
|
|
|
|
|
if ct := t.current.ContentType.Supertype(); ct == "multipart" || ct == "message" {
|
|
if ct := t.current.ContentType.Supertype(); ct == "multipart" || ct == "message" {
|
|
|
|
+ _, err = io.Copy(t.parser, bytes.NewReader(b))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
- // do body decode here
|
|
|
|
|
|
+ // Body Decode, how it works:
|
|
|
|
+ // First, the decoder is setup, depending on the source encoding type.
|
|
|
|
+ // Next, since the decoder is an io.Reader, we need to use a pipe to connect it.
|
|
|
|
+ // Subsequent calls write to the pipe in a gouritine and the parent-thread copies the result to the output stream
|
|
|
|
+ // The routine stops feeding the decoder data before EndingPosBody, and not decoding anything after, but still
|
|
|
|
+ // outputting the un-decoded remainder.
|
|
|
|
+ // The decoder is destroyed at the end of the body (when last == true)
|
|
|
|
+
|
|
t.pr, t.pw = io.Pipe()
|
|
t.pr, t.pw = io.Pipe()
|
|
if t.decoder == nil {
|
|
if t.decoder == nil {
|
|
t.buf.Reset()
|
|
t.buf.Reset()
|
|
@@ -156,10 +166,22 @@ func (t *Transform) ReWrite(b []byte, last bool) (count int, err error) {
|
|
wg := sync.WaitGroup{}
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
wg.Add(1)
|
|
|
|
|
|
|
|
+ // out is the slice that will be decoded
|
|
|
|
+ var out []byte
|
|
|
|
+ // remainder will not be decoded. Typically, this contains the boundary maker, and we want to preserve it
|
|
|
|
+ var remainder []byte
|
|
|
|
+ if t.current.EndingPosBody > 0 {
|
|
|
|
+ size := t.current.EndingPosBody - t.current.StartingPosBody - 1 // -1 since we do not want \n
|
|
|
|
+ out = b[:size]
|
|
|
|
+ remainder = b[size:]
|
|
|
|
+ } else {
|
|
|
|
+ // use the entire slice
|
|
|
|
+ out = b
|
|
|
|
+ }
|
|
go func() {
|
|
go func() {
|
|
// stream our slice to the pipe
|
|
// stream our slice to the pipe
|
|
defer wg.Done()
|
|
defer wg.Done()
|
|
- _, pRrr := io.Copy(t.pw, bytes.NewReader(b))
|
|
|
|
|
|
+ _, pRrr := io.Copy(t.pw, bytes.NewReader(out))
|
|
if pRrr != nil {
|
|
if pRrr != nil {
|
|
_ = t.pw.CloseWithError(err)
|
|
_ = t.pw.CloseWithError(err)
|
|
return
|
|
return
|
|
@@ -170,13 +192,24 @@ func (t *Transform) ReWrite(b []byte, last bool) (count int, err error) {
|
|
var i int64
|
|
var i int64
|
|
i, err = io.Copy(t.parser, t.decoder)
|
|
i, err = io.Copy(t.parser, t.decoder)
|
|
// wait for the pipe to finish
|
|
// wait for the pipe to finish
|
|
- _ = i
|
|
|
|
wg.Wait()
|
|
wg.Wait()
|
|
_ = t.pr.Close()
|
|
_ = t.pr.Close()
|
|
|
|
|
|
if last {
|
|
if last {
|
|
t.decoder = nil
|
|
t.decoder = nil
|
|
}
|
|
}
|
|
|
|
+ count += int(i)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ // flush any remainder
|
|
|
|
+ if len(remainder) > 0 {
|
|
|
|
+ i, err = io.Copy(t.parser, bytes.NewReader(remainder))
|
|
|
|
+ count += int(i)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
return count, err
|
|
return count, err
|
|
}
|
|
}
|
|
@@ -200,22 +233,32 @@ func Transformer() *StreamDecorator {
|
|
return nil
|
|
return nil
|
|
}))
|
|
}))
|
|
|
|
|
|
- var msgPos uint
|
|
|
|
- var progress int
|
|
|
|
|
|
+ var (
|
|
|
|
+ msgPos uint
|
|
|
|
+ progress int
|
|
|
|
+ )
|
|
reWriter := Transform{}
|
|
reWriter := Transform{}
|
|
|
|
|
|
sd := &StreamDecorator{}
|
|
sd := &StreamDecorator{}
|
|
sd.Decorate =
|
|
sd.Decorate =
|
|
|
|
|
|
func(sp StreamProcessor, a ...interface{}) StreamProcessor {
|
|
func(sp StreamProcessor, a ...interface{}) StreamProcessor {
|
|
- var envelope *mail.Envelope
|
|
|
|
|
|
+ var (
|
|
|
|
+ envelope *mail.Envelope
|
|
|
|
+ // total is the total number of bytes written
|
|
|
|
+ total int64
|
|
|
|
+ // pos tracks the current position of the output slice
|
|
|
|
+ pos int
|
|
|
|
+ // written is the number of bytes written out in this call
|
|
|
|
+ written int
|
|
|
|
+ )
|
|
|
|
+
|
|
if reWriter.sp == nil {
|
|
if reWriter.sp == nil {
|
|
reWriter.sp = sp
|
|
reWriter.sp = sp
|
|
}
|
|
}
|
|
|
|
|
|
sd.Open = func(e *mail.Envelope) error {
|
|
sd.Open = func(e *mail.Envelope) error {
|
|
envelope = e
|
|
envelope = e
|
|
- _ = envelope
|
|
|
|
if reWriter.parser == nil {
|
|
if reWriter.parser == nil {
|
|
reWriter.parser = mime.NewMimeParserWriter(sp)
|
|
reWriter.parser = mime.NewMimeParserWriter(sp)
|
|
reWriter.parser.Open()
|
|
reWriter.parser.Open()
|
|
@@ -224,51 +267,68 @@ func Transformer() *StreamDecorator {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ sd.Close = func() error {
|
|
|
|
+ total = 0
|
|
|
|
+ return reWriter.parser.Close()
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ end := func(part *mime.Part, offset uint, p []byte, start uint) (int, error) {
|
|
|
|
+ var err error
|
|
|
|
+ var count int
|
|
|
|
+
|
|
|
|
+ count, err = reWriter.ReWrite(p[pos:start-offset], true, offset)
|
|
|
|
+
|
|
|
|
+ written += count
|
|
|
|
+ if err != nil {
|
|
|
|
+ return count, err
|
|
|
|
+ }
|
|
|
|
+ reWriter.current = part
|
|
|
|
+ pos += count
|
|
|
|
+ return count, nil
|
|
|
|
+ }
|
|
|
|
+
|
|
return StreamProcessWith(func(p []byte) (count int, err error) {
|
|
return StreamProcessWith(func(p []byte) (count int, err error) {
|
|
- var total int
|
|
|
|
|
|
+ pos = 0
|
|
|
|
+ written = 0
|
|
if parts, ok := envelope.Values["MimeParts"].(*mime.Parts); ok && len(*parts) > 0 {
|
|
if parts, ok := envelope.Values["MimeParts"].(*mime.Parts); ok && len(*parts) > 0 {
|
|
|
|
|
|
// we are going to change envelope.Values["MimeParts"] to our own copy with our own counts
|
|
// we are going to change envelope.Values["MimeParts"] to our own copy with our own counts
|
|
envelope.Values["MimeParts"] = reWriter.swap()
|
|
envelope.Values["MimeParts"] = reWriter.swap()
|
|
- defer reWriter.unswap()
|
|
|
|
- var pos int
|
|
|
|
|
|
+ defer func() {
|
|
|
|
+ reWriter.unswap()
|
|
|
|
+ total += int64(written)
|
|
|
|
+ }()
|
|
|
|
|
|
offset := msgPos
|
|
offset := msgPos
|
|
reWriter.current = (*parts)[0]
|
|
reWriter.current = (*parts)[0]
|
|
for i := progress; i < len(*parts); i++ {
|
|
for i := progress; i < len(*parts); i++ {
|
|
part := (*parts)[i]
|
|
part := (*parts)[i]
|
|
-
|
|
|
|
// break chunk on new part
|
|
// break chunk on new part
|
|
- if part.StartingPos > 0 && part.StartingPos > msgPos {
|
|
|
|
- cbLen := len(part.ContentBoundary) + 3
|
|
|
|
- count, err = reWriter.ReWrite(p[pos:part.StartingPos-offset-uint(cbLen)], true)
|
|
|
|
-
|
|
|
|
- total += count
|
|
|
|
|
|
+ if part.StartingPos > 0 && part.StartingPos >= msgPos {
|
|
|
|
+ count, err = end(part, offset, p, part.StartingPos)
|
|
if err != nil {
|
|
if err != nil {
|
|
break
|
|
break
|
|
}
|
|
}
|
|
- reWriter.current = part
|
|
|
|
- pos += count
|
|
|
|
msgPos = part.StartingPos
|
|
msgPos = part.StartingPos
|
|
reWriter.isBody = false
|
|
reWriter.isBody = false
|
|
|
|
+
|
|
}
|
|
}
|
|
// break chunk on header (found the body)
|
|
// break chunk on header (found the body)
|
|
if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
|
|
if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
|
|
- count, err = reWriter.ReWrite(p[pos:part.StartingPosBody-offset], true)
|
|
|
|
- total += count
|
|
|
|
|
|
+ count, err = end(part, offset, p, part.StartingPosBody)
|
|
if err != nil {
|
|
if err != nil {
|
|
break
|
|
break
|
|
}
|
|
}
|
|
- _, _ = reWriter.parser.Write([]byte{'\n'}) // send an end of header to the parser
|
|
|
|
reWriter.isBody = true
|
|
reWriter.isBody = true
|
|
- reWriter.current = part
|
|
|
|
- pos += count
|
|
|
|
- msgPos = part.StartingPosBody
|
|
|
|
|
|
+ msgPos += uint(count)
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
+
|
|
// if on the latest (last) part, and yet there is still data to be written out
|
|
// if on the latest (last) part, and yet there is still data to be written out
|
|
if len(*parts)-1 == i && len(p)-1 > pos {
|
|
if len(*parts)-1 == i && len(p)-1 > pos {
|
|
- count, err = reWriter.ReWrite(p[pos:], false)
|
|
|
|
- total += count
|
|
|
|
|
|
+ count, err = reWriter.ReWrite(p[pos:], false, offset)
|
|
|
|
+
|
|
|
|
+ written += count
|
|
if err != nil {
|
|
if err != nil {
|
|
break
|
|
break
|
|
}
|
|
}
|
|
@@ -286,7 +346,7 @@ func Transformer() *StreamDecorator {
|
|
}
|
|
}
|
|
// note that in this case, ReWrite method will output the stream to further processors down the line
|
|
// note that in this case, ReWrite method will output the stream to further processors down the line
|
|
// here we just return back with the result
|
|
// here we just return back with the result
|
|
- return total, err
|
|
|
|
|
|
+ return written, err
|
|
})
|
|
})
|
|
}
|
|
}
|
|
return sd
|
|
return sd
|