gateway_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package backends
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "github.com/flashmob/go-guerrilla/log"
  7. "github.com/flashmob/go-guerrilla/mail"
  8. "io"
  9. "strings"
  10. "testing"
  11. "time"
  12. )
  13. func TestStates(t *testing.T) {
  14. gw := BackendGateway{}
  15. str := fmt.Sprintf("%s", gw.State)
  16. if strings.Index(str, "NewState") != 0 {
  17. t.Error("Backend should begin in NewState")
  18. }
  19. }
  20. func TestInitialize(t *testing.T) {
  21. c := BackendConfig{
  22. ConfigProcessors: {
  23. "Debugger": {
  24. "log_received_mails": true,
  25. },
  26. },
  27. ConfigGateways: {
  28. "default": {
  29. "save_process": "HeadersParser|Debugger",
  30. "save_workers_size": "1",
  31. },
  32. },
  33. }
  34. gateway := &BackendGateway{}
  35. err := gateway.Initialize(c)
  36. if err != nil {
  37. t.Error("Gateway did not init because:", err)
  38. t.Fail()
  39. }
  40. if gateway.processors == nil {
  41. t.Error("gateway.chains should not be nil")
  42. } else if len(gateway.processors) != 1 {
  43. t.Error("len(gateway.chains) should be 1, but got", len(gateway.processors))
  44. }
  45. if gateway.conveyor == nil {
  46. t.Error("gateway.conveyor should not be nil")
  47. } else if cap(gateway.conveyor) != gateway.gwConfig.saveProcessSize() {
  48. t.Error("gateway.conveyor channel buffer cap does not match worker size, cap was", cap(gateway.conveyor))
  49. }
  50. if gateway.State != BackendStateInitialized {
  51. t.Error("gateway.State is not in initialized state, got ", gateway.State)
  52. }
  53. }
  54. func TestStartProcessStop(t *testing.T) {
  55. c := BackendConfig{
  56. ConfigProcessors: {
  57. "Debugger": {
  58. "log_received_mails": true,
  59. },
  60. },
  61. ConfigGateways: {
  62. "default": {
  63. "save_process": "HeadersParser|Debugger",
  64. "save_workers_size": "2",
  65. },
  66. },
  67. }
  68. gateway := &BackendGateway{}
  69. err := gateway.Initialize(c)
  70. mainlog, _ := log.GetLogger(log.OutputOff.String(), "debug")
  71. Svc.SetMainlog(mainlog)
  72. if err != nil {
  73. t.Error("Gateway did not init because:", err)
  74. t.Fail()
  75. }
  76. err = gateway.Start()
  77. if err != nil {
  78. t.Error("Gateway did not start because:", err)
  79. t.Fail()
  80. }
  81. if gateway.State != BackendStateRunning {
  82. t.Error("gateway.State is not in running state, got ", gateway.State)
  83. }
  84. // can we place an envelope on the conveyor channel?
  85. e := &mail.Envelope{
  86. RemoteIP: "127.0.0.1",
  87. QueuedId: mail.QueuedID(1, 2),
  88. Helo: "helo.example.com",
  89. MailFrom: mail.Address{User: "test", Host: "example.com"},
  90. TLS: true,
  91. }
  92. e.PushRcpt(mail.Address{User: "test", Host: "example.com"})
  93. //e.Data.WriteString("Subject:Test\n\nThis is a test.")
  94. in := "Subject: Test\n\nThis is a test.\n.\n"
  95. mdr := mail.NewMimeDotReader(bufio.NewReader(bytes.NewBufferString(in)), 1)
  96. i, err := io.Copy(&e.Data, mdr)
  97. if err != nil && err != io.EOF {
  98. t.Error(err, "cannot copy buffer", i, err)
  99. }
  100. if p := mdr.Parts(); p != nil && len(p) > 0 {
  101. e.Header = p[0].Headers
  102. }
  103. notify := make(chan *notifyMsg)
  104. gateway.conveyor <- &workerMsg{e, notify, TaskSaveMail, nil}
  105. // it should not produce any errors
  106. // headers (subject) should be parsed.
  107. select {
  108. case status := <-notify:
  109. if status.err != nil {
  110. t.Error("envelope processing failed with:", status.err)
  111. }
  112. if e.Header["Subject"][0] != "Test" {
  113. t.Error("envelope processing did not parse header")
  114. }
  115. case <-time.After(time.Second):
  116. t.Error("gateway did not respond after 1 second")
  117. t.Fail()
  118. }
  119. err = gateway.Shutdown()
  120. if err != nil {
  121. t.Error("Gateway did not shutdown")
  122. }
  123. }