server.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777
  1. package guerrilla
  2. import (
  3. "bytes"
  4. "crypto/rand"
  5. "crypto/tls"
  6. "crypto/x509"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net"
  11. "path/filepath"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "github.com/flashmob/go-guerrilla/backends"
  17. "github.com/flashmob/go-guerrilla/log"
  18. "github.com/flashmob/go-guerrilla/mail"
  19. "github.com/flashmob/go-guerrilla/mail/smtp"
  20. "github.com/flashmob/go-guerrilla/response"
  21. )
  22. const (
  23. CommandVerbMaxLength = 16
  24. CommandLineMaxLength = 1024
  25. // Number of allowed unrecognized commands before we terminate the connection
  26. MaxUnrecognizedCommands = 5
  27. )
  28. const (
  29. // server has just been created
  30. ServerStateNew = iota
  31. // Server has just been stopped
  32. ServerStateStopped
  33. // Server has been started and is running
  34. ServerStateRunning
  35. // Server could not start due to an error
  36. ServerStateStartError
  37. )
  38. // Server listens for SMTP clients on the port specified in its config
  39. type server struct {
  40. configStore atomic.Value // stores guerrilla.ServerConfig
  41. tlsConfigStore atomic.Value
  42. timeout atomic.Value // stores time.Duration
  43. listenInterface string
  44. serverID int
  45. clientPool *Pool
  46. wg sync.WaitGroup // for waiting to shutdown
  47. listener net.Listener
  48. closedListener chan bool
  49. hosts allowedHosts // stores map[string]bool for faster lookup
  50. state int
  51. // If log changed after a config reload, newLogStore stores the value here until it's safe to change it
  52. logStore atomic.Value
  53. mainlogStore atomic.Value
  54. backendStore atomic.Value
  55. envelopePool *mail.Pool
  56. }
  57. type allowedHosts struct {
  58. table map[string]bool // host lookup table
  59. wildcards []string // host wildcard list (* is used as a wildcard)
  60. sync.Mutex // guard access to the map
  61. }
  62. type command []byte
  63. var (
  64. cmdHELO command = []byte("HELO")
  65. cmdEHLO command = []byte("EHLO")
  66. cmdHELP command = []byte("HELP")
  67. cmdXCLIENT command = []byte("XCLIENT")
  68. cmdMAIL command = []byte("MAIL FROM:")
  69. cmdRCPT command = []byte("RCPT TO:")
  70. cmdRSET command = []byte("RSET")
  71. cmdVRFY command = []byte("VRFY")
  72. cmdNOOP command = []byte("NOOP")
  73. cmdQUIT command = []byte("QUIT")
  74. cmdDATA command = []byte("DATA")
  75. cmdSTARTTLS command = []byte("STARTTLS")
  76. )
  77. func (c command) match(in []byte) bool {
  78. return bytes.Index(in, c) == 0
  79. }
  80. // Creates and returns a new ready-to-run Server from a ServerConfig configuration
  81. func newServer(sc *ServerConfig, b backends.Backend, mainlog log.Logger, serverID int) (*server, error) {
  82. server := &server{
  83. clientPool: NewPool(sc.MaxClients),
  84. closedListener: make(chan bool, 1),
  85. listenInterface: sc.ListenInterface,
  86. serverID: serverID,
  87. state: ServerStateNew,
  88. envelopePool: mail.NewPool(sc.MaxClients * 2),
  89. }
  90. server.mainlogStore.Store(mainlog)
  91. server.backendStore.Store(b)
  92. if sc.LogFile == "" {
  93. // none set, use the mainlog for the server log
  94. server.logStore.Store(mainlog)
  95. server.log().Fields("iface", sc.ListenInterface).Info("server did not configure a separate log file, so using the main log")
  96. } else {
  97. // set level to same level as mainlog level
  98. if l, logOpenError := log.GetLogger(sc.LogFile, server.mainlog().GetLevel()); logOpenError != nil {
  99. server.log().Fields("error", logOpenError, "iface", sc.ListenInterface).Error("Failed creating a logger for server")
  100. return server, logOpenError
  101. } else {
  102. server.logStore.Store(l)
  103. }
  104. }
  105. server.setConfig(sc)
  106. server.setTimeout(sc.Timeout)
  107. if err := server.configureTLS(); err != nil {
  108. return server, err
  109. }
  110. return server, nil
  111. }
  112. func (s *server) configureTLS() error {
  113. sConfig := s.configStore.Load().(ServerConfig)
  114. if sConfig.TLS.AlwaysOn || sConfig.TLS.StartTLSOn {
  115. cert, err := tls.LoadX509KeyPair(sConfig.TLS.PublicKeyFile, sConfig.TLS.PrivateKeyFile)
  116. if err != nil {
  117. return fmt.Errorf("error while loading the certificate: %s", err)
  118. }
  119. tlsConfig := &tls.Config{
  120. Certificates: []tls.Certificate{cert},
  121. ClientAuth: tls.VerifyClientCertIfGiven,
  122. ServerName: sConfig.Hostname,
  123. }
  124. if len(sConfig.TLS.Protocols) > 0 {
  125. if min, ok := TLSProtocols[sConfig.TLS.Protocols[0]]; ok {
  126. tlsConfig.MinVersion = min
  127. }
  128. }
  129. if len(sConfig.TLS.Protocols) > 1 {
  130. if max, ok := TLSProtocols[sConfig.TLS.Protocols[1]]; ok {
  131. tlsConfig.MaxVersion = max
  132. }
  133. }
  134. if len(sConfig.TLS.Ciphers) > 0 {
  135. for _, val := range sConfig.TLS.Ciphers {
  136. if c, ok := TLSCiphers[val]; ok {
  137. tlsConfig.CipherSuites = append(tlsConfig.CipherSuites, c)
  138. }
  139. }
  140. }
  141. if len(sConfig.TLS.Curves) > 0 {
  142. for _, val := range sConfig.TLS.Curves {
  143. if c, ok := TLSCurves[val]; ok {
  144. tlsConfig.CurvePreferences = append(tlsConfig.CurvePreferences, c)
  145. }
  146. }
  147. }
  148. if len(sConfig.TLS.RootCAs) > 0 {
  149. caCert, err := ioutil.ReadFile(sConfig.TLS.RootCAs)
  150. if err != nil {
  151. s.log().Fields("error", err, "file", sConfig.TLS.RootCAs).Error("failed opening TLSRootCAs file")
  152. return err
  153. } else {
  154. caCertPool := x509.NewCertPool()
  155. caCertPool.AppendCertsFromPEM(caCert)
  156. tlsConfig.RootCAs = caCertPool
  157. }
  158. }
  159. if len(sConfig.TLS.ClientAuthType) > 0 {
  160. if ca, ok := TLSClientAuthTypes[sConfig.TLS.ClientAuthType]; ok {
  161. tlsConfig.ClientAuth = ca
  162. }
  163. }
  164. tlsConfig.PreferServerCipherSuites = sConfig.TLS.PreferServerCipherSuites
  165. tlsConfig.Rand = rand.Reader
  166. s.tlsConfigStore.Store(tlsConfig)
  167. }
  168. return nil
  169. }
  170. // setBackend sets the backend to use for processing email envelopes
  171. func (s *server) setBackend(b backends.Backend) {
  172. s.backendStore.Store(b)
  173. }
  174. // backend gets the backend used to process email envelopes
  175. func (s *server) backend() backends.Backend {
  176. if b, ok := s.backendStore.Load().(backends.Backend); ok {
  177. return b
  178. }
  179. return nil
  180. }
  181. // Set the timeout for the server and all clients
  182. func (s *server) setTimeout(seconds int) {
  183. duration := time.Duration(int64(seconds))
  184. s.clientPool.SetTimeout(duration)
  185. s.timeout.Store(duration)
  186. }
  187. // goroutine safe config store
  188. func (s *server) setConfig(sc *ServerConfig) {
  189. s.configStore.Store(*sc)
  190. }
  191. // goroutine safe
  192. func (s *server) isEnabled() bool {
  193. sc := s.configStore.Load().(ServerConfig)
  194. return sc.IsEnabled
  195. }
  196. // Set the allowed hosts for the server
  197. func (s *server) setAllowedHosts(allowedHosts []string) {
  198. s.hosts.Lock()
  199. defer s.hosts.Unlock()
  200. s.hosts.table = make(map[string]bool, len(allowedHosts))
  201. s.hosts.wildcards = nil
  202. for _, h := range allowedHosts {
  203. if strings.Contains(h, "*") {
  204. s.hosts.wildcards = append(s.hosts.wildcards, strings.ToLower(h))
  205. } else if len(h) > 5 && h[0] == '[' && h[len(h)-1] == ']' {
  206. if ip := net.ParseIP(h[1 : len(h)-1]); ip != nil {
  207. // this will save the normalized ip, as ip.String always returns ipv6 in short form
  208. s.hosts.table["["+ip.String()+"]"] = true
  209. }
  210. } else {
  211. s.hosts.table[strings.ToLower(h)] = true
  212. }
  213. }
  214. }
  215. // Begin accepting SMTP clients. Will block unless there is an error or server.Shutdown() is called
  216. func (s *server) Start(startWG *sync.WaitGroup) error {
  217. var clientID uint64
  218. clientID = 0
  219. listener, err := net.Listen("tcp", s.listenInterface)
  220. s.listener = listener
  221. if err != nil {
  222. startWG.Done() // don't wait for me
  223. s.state = ServerStateStartError
  224. return fmt.Errorf("[%s] cannot listen on port: %s ", s.listenInterface, err.Error())
  225. }
  226. s.log().Fields("iface", s.listenInterface, "serverID", s.serverID).Info("listening on TCP")
  227. s.state = ServerStateRunning
  228. startWG.Done() // start successful, don't wait for me
  229. for {
  230. s.log().Fields("serverID", s.serverID, "nextSeq", clientID+1, "iface", s.listenInterface).
  231. Debug("waiting for a new client")
  232. conn, err := listener.Accept()
  233. clientID++
  234. if err != nil {
  235. if e, ok := err.(net.Error); ok && !e.Temporary() {
  236. s.log().Fields("iface", s.listenInterface, "serverID", s.serverID).Info("server has stopped accepting new clients")
  237. // the listener has been closed, wait for clients to exit
  238. s.log().Fields("iface", s.listenInterface, "serverID", s.serverID).Info("shutting down pool")
  239. s.clientPool.ShutdownState()
  240. s.clientPool.ShutdownWait()
  241. s.state = ServerStateStopped
  242. s.closedListener <- true
  243. return nil
  244. }
  245. s.mainlog().Fields("error", err, "serverID", s.serverID).Error("temporary error accepting client")
  246. continue
  247. }
  248. go func(p Poolable, borrowErr error) {
  249. c := p.(*client)
  250. if borrowErr == nil {
  251. s.handleClient(c)
  252. s.envelopePool.Return(c.Envelope)
  253. s.clientPool.Return(c)
  254. } else {
  255. s.log().Fields("error", borrowErr, "serverID", s.serverID).Error("couldn't borrow a new client")
  256. // we could not get a client, so close the connection.
  257. _ = conn.Close()
  258. }
  259. // intentionally placed Borrow in args so that it's called in the
  260. // same main goroutine.
  261. }(s.clientPool.Borrow(conn, clientID, s.log(), s.envelopePool, s.serverID))
  262. }
  263. }
  264. func (s *server) Shutdown() {
  265. if s.listener != nil {
  266. // This will cause Start function to return, by causing an error on listener.Accept
  267. _ = s.listener.Close()
  268. // wait for the listener to listener.Accept
  269. <-s.closedListener
  270. // At this point Start will exit and close down the pool
  271. } else {
  272. s.clientPool.ShutdownState()
  273. // listener already closed, wait for clients to exit
  274. s.clientPool.ShutdownWait()
  275. s.state = ServerStateStopped
  276. }
  277. }
  278. func (s *server) GetActiveClientsCount() int {
  279. return s.clientPool.GetActiveClientsCount()
  280. }
  281. // Verifies that the host is a valid recipient.
  282. // host checking turned off if there is a single entry and it's a dot.
  283. func (s *server) allowsHost(host string) bool {
  284. s.hosts.Lock()
  285. defer s.hosts.Unlock()
  286. // if hosts contains a single dot, further processing is skipped
  287. if len(s.hosts.table) == 1 {
  288. if _, ok := s.hosts.table["."]; ok {
  289. return true
  290. }
  291. }
  292. if _, ok := s.hosts.table[strings.ToLower(host)]; ok {
  293. return true
  294. }
  295. // check the wildcards
  296. for _, w := range s.hosts.wildcards {
  297. if matched, err := filepath.Match(w, strings.ToLower(host)); matched && err == nil {
  298. return true
  299. }
  300. }
  301. return false
  302. }
  303. func (s *server) allowsIp(ip net.IP) bool {
  304. ipStr := ip.String()
  305. return s.allowsHost("[" + ipStr + "]")
  306. }
  307. const commandSuffix = "\r\n"
  308. // Reads from the client until a \n terminator is encountered,
  309. // or until a timeout occurs.
  310. func (s *server) readCommand(client *client) ([]byte, error) {
  311. //var input string
  312. var err error
  313. var bs []byte
  314. // In command state, stop reading at line breaks
  315. bs, err = client.bufin.ReadSlice('\n')
  316. if err != nil {
  317. return bs, err
  318. } else if bytes.HasSuffix(bs, []byte(commandSuffix)) {
  319. return bs[:len(bs)-2], err
  320. }
  321. return bs[:len(bs)-1], err
  322. }
  323. // flushResponse a response to the client. Flushes the client.bufout buffer to the connection
  324. func (s *server) flushResponse(client *client) error {
  325. if err := client.setTimeout(s.timeout.Load().(time.Duration)); err != nil {
  326. return err
  327. }
  328. return client.bufout.Flush()
  329. }
  330. func (s *server) isShuttingDown() bool {
  331. return s.clientPool.IsShuttingDown()
  332. }
  333. const advertisePipelining = "250-PIPELINING\r\n"
  334. const advertiseStartTLS = "250-STARTTLS\r\n"
  335. const advertiseEnhancedStatusCodes = "250-ENHANCEDSTATUSCODES\r\n"
  336. const advertise8BitMime = "250-8BITMIME\r\n"
  337. // The last line doesn't need \r\n since string will be printed as a new line.
  338. // Also, Last line has no dash -
  339. const advertiseHelp = "250 HELP"
  340. // handleClient handles an entire client SMTP exchange
  341. func (s *server) handleClient(client *client) {
  342. defer func() {
  343. s.log().Fields(
  344. "peer", client.RemoteIP,
  345. "event", "disconnect",
  346. "id", client.ID,
  347. "queuedID", client.QueuedId,
  348. ).Info("Disconnect client")
  349. client.closeConn()
  350. }()
  351. sc := s.configStore.Load().(ServerConfig)
  352. s.log().Fields(
  353. "peer", client.RemoteIP,
  354. "id", client.ID,
  355. "event", "connect",
  356. "queuedID", client.QueuedId,
  357. ).Info("handle client")
  358. // Initial greeting
  359. greeting := fmt.Sprintf("220 %s SMTP Guerrilla(%s) #%d (%d) %s",
  360. sc.Hostname, Version, client.ID,
  361. s.clientPool.GetActiveClientsCount(), time.Now().Format(time.RFC3339))
  362. helo := fmt.Sprintf("250 %s Hello", sc.Hostname)
  363. // ehlo is a multi-line reply and need additional \r\n at the end
  364. ehlo := fmt.Sprintf("250-%s Hello\r\n", sc.Hostname)
  365. // Extended feature advertisements
  366. messageSize := fmt.Sprintf("250-SIZE %d\r\n", sc.MaxSize)
  367. advertiseTLS := advertiseStartTLS
  368. // The last line doesn't need \r\n since string will be printed as a new line.
  369. // Also, Last line has no dash -
  370. if sc.TLS.AlwaysOn {
  371. tlsConfig, ok := s.tlsConfigStore.Load().(*tls.Config)
  372. if !ok {
  373. s.mainlog().Error("Failed to load *tls.Config")
  374. } else if err := client.upgradeToTLS(tlsConfig); err == nil {
  375. advertiseTLS = ""
  376. } else {
  377. s.log().Fields("error", err, "peer", client.RemoteIP).Warn("failed TLS handshake")
  378. // server requires TLS, but can't handshake
  379. client.kill()
  380. }
  381. }
  382. if !sc.TLS.StartTLSOn {
  383. // STARTTLS turned off, don't advertise it
  384. advertiseTLS = ""
  385. }
  386. r := response.Canned
  387. for client.isAlive() {
  388. switch client.state {
  389. case ClientGreeting:
  390. client.sendResponse(greeting)
  391. client.state = ClientCmd
  392. case ClientCmd:
  393. client.bufin.setLimit(CommandLineMaxLength)
  394. input, err := s.readCommand(client)
  395. s.log().Fields("input", string(input)).Debug("client said")
  396. if err == io.EOF {
  397. s.log().Fields("error", err, "peer", client.RemoteIP).Warn("client closed the connection")
  398. return
  399. } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  400. s.log().Fields("error", err, "peer", client.RemoteIP).Warn("timeout")
  401. return
  402. } else if err == LineLimitExceeded {
  403. client.sendResponse(r.FailLineTooLong)
  404. client.kill()
  405. break
  406. } else if err != nil {
  407. s.log().Fields("error", err, "peer", client.RemoteIP).Warn("read error")
  408. client.kill()
  409. break
  410. }
  411. if s.isShuttingDown() {
  412. client.state = ClientShutdown
  413. continue
  414. }
  415. cmdLen := len(input)
  416. if cmdLen > CommandVerbMaxLength {
  417. cmdLen = CommandVerbMaxLength
  418. }
  419. cmd := bytes.ToUpper(input[:cmdLen])
  420. switch {
  421. case cmdHELO.match(cmd):
  422. if h, err := client.parser.Helo(input[4:]); err == nil {
  423. client.Helo = h
  424. } else {
  425. s.log().Fields("helo", h, "seq", client.ID).Warn("invalid helo")
  426. client.sendResponse(r.FailSyntaxError)
  427. break
  428. }
  429. client.resetTransaction()
  430. client.sendResponse(helo)
  431. case cmdEHLO.match(cmd):
  432. if h, _, err := client.parser.Ehlo(input[4:]); err == nil {
  433. client.Helo = h
  434. } else {
  435. client.sendResponse(r.FailSyntaxError)
  436. s.log().Fields("ehlo", h, "seq", client.ID).Warn("invalid ehlo")
  437. client.sendResponse(r.FailSyntaxError)
  438. break
  439. }
  440. client.ESMTP = true
  441. client.resetTransaction()
  442. client.sendResponse(ehlo,
  443. messageSize,
  444. advertisePipelining,
  445. advertiseTLS,
  446. advertiseEnhancedStatusCodes,
  447. advertise8BitMime,
  448. advertiseHelp)
  449. case cmdHELP.match(cmd):
  450. quote := response.GetQuote()
  451. client.sendResponse("214-OK\r\n", quote)
  452. case sc.XClientOn && cmdXCLIENT.match(cmd):
  453. if toks := bytes.Split(input[8:], []byte{' '}); len(toks) > 0 {
  454. for i := range toks {
  455. if vals := bytes.Split(toks[i], []byte{'='}); len(vals) == 2 {
  456. if bytes.Equal(vals[1], []byte("[UNAVAILABLE]")) {
  457. // skip
  458. continue
  459. }
  460. if bytes.Equal(vals[0], []byte("ADDR")) {
  461. client.RemoteIP = string(vals[1])
  462. }
  463. if bytes.Equal(vals[0], []byte("HELO")) {
  464. client.Helo = string(vals[1])
  465. }
  466. }
  467. }
  468. }
  469. client.sendResponse(r.SuccessMailCmd)
  470. case cmdMAIL.match(cmd):
  471. if client.isInTransaction() {
  472. client.sendResponse(r.FailNestedMailCmd)
  473. break
  474. }
  475. client.MailFrom, err = client.parsePath(input[10:], client.parser.MailFrom)
  476. if err != nil {
  477. s.log().Fields("error", err, "raw", string(input[10:])).Error("MAIL parse error")
  478. client.sendResponse(err)
  479. break
  480. } else if client.parser.NullPath {
  481. // bounce has empty from address
  482. client.MailFrom = mail.Address{}
  483. } else {
  484. s.log().Fields(
  485. "event", "mailfrom",
  486. "helo", client.Helo,
  487. "domain", client.MailFrom.Host,
  488. "address", client.RemoteIP,
  489. "id", client.ID,
  490. "queuedID", client.QueuedId,
  491. ).Info("mail from")
  492. }
  493. client.TransportType = smtp.TransportTypeUnspecified
  494. for i := range client.MailFrom.PathParams {
  495. if tt := client.MailFrom.PathParams[i].Transport(); tt != smtp.TransportTypeUnspecified {
  496. client.TransportType = tt
  497. if tt == smtp.TransportTypeInvalid {
  498. continue
  499. }
  500. break
  501. }
  502. }
  503. client.sendResponse(r.SuccessMailCmd)
  504. case cmdRCPT.match(cmd):
  505. if len(client.RcptTo) > smtp.LimitRecipients {
  506. client.sendResponse(r.ErrorTooManyRecipients)
  507. break
  508. }
  509. to, err := client.parsePath(input[8:], client.parser.RcptTo)
  510. if err != nil {
  511. s.log().Fields("error", err, "raw", string(input[8:])).Error("RCPT parse error")
  512. client.sendResponse(err.Error())
  513. break
  514. }
  515. s.defaultHost(&to)
  516. if (to.IP != nil && !s.allowsIp(to.IP)) || (to.IP == nil && !s.allowsHost(to.Host)) {
  517. client.sendResponse(r.ErrorRelayDenied, " ", to.Host)
  518. } else {
  519. client.PushRcpt(to)
  520. rcptError := s.backend().ValidateRcpt(client.Envelope)
  521. if rcptError != nil {
  522. client.PopRcpt()
  523. client.sendResponse(r.FailRcptCmd, " ", rcptError.Error())
  524. } else {
  525. client.sendResponse(r.SuccessRcptCmd)
  526. }
  527. }
  528. case cmdRSET.match(cmd):
  529. client.resetTransaction()
  530. client.sendResponse(r.SuccessResetCmd)
  531. case cmdVRFY.match(cmd):
  532. client.sendResponse(r.SuccessVerifyCmd)
  533. case cmdNOOP.match(cmd):
  534. client.sendResponse(r.SuccessNoopCmd)
  535. case cmdQUIT.match(cmd):
  536. client.sendResponse(r.SuccessQuitCmd)
  537. client.kill()
  538. case cmdDATA.match(cmd):
  539. if len(client.RcptTo) == 0 {
  540. client.sendResponse(r.FailNoRecipientsDataCmd)
  541. break
  542. }
  543. client.sendResponse(r.SuccessDataCmd)
  544. client.state = ClientData
  545. case sc.TLS.StartTLSOn && cmdSTARTTLS.match(cmd):
  546. client.sendResponse(r.SuccessStartTLSCmd)
  547. client.state = ClientStartTLS
  548. default:
  549. client.errors++
  550. if client.errors >= MaxUnrecognizedCommands {
  551. client.sendResponse(r.FailMaxUnrecognizedCmd)
  552. client.kill()
  553. } else {
  554. client.sendResponse(r.FailUnrecognizedCmd)
  555. }
  556. }
  557. case ClientData:
  558. // intentionally placed the limit 1MB above so that reading does not return with an error
  559. // if the client goes a little over. Anything above will err
  560. client.bufin.setLimit(sc.MaxSize + 1024000) // This a hard limit.
  561. be := s.backend()
  562. var (
  563. n int64
  564. err error
  565. res backends.Result
  566. )
  567. fields := []interface{}{
  568. "event", "data",
  569. "id", client.ID,
  570. "queuedID", client.QueuedId,
  571. "messageID", client.MessageID,
  572. "peer", client.RemoteIP,
  573. "serverID", s.serverID,
  574. }
  575. s.log().Fields(fields...).Info("receive DATA")
  576. if be.StreamOn() {
  577. // process the message as a stream
  578. res, n, err = be.ProcessStream(client.smtpReader.DotReader(), client.Envelope)
  579. if err == nil && res.Code() < 300 {
  580. e := s.envelopePool.Borrow(
  581. client.Envelope.RemoteIP,
  582. client.ID,
  583. client.Envelope.ServerID,
  584. )
  585. s.copyEnvelope(client.Envelope, e)
  586. // process in the background then return the envelope
  587. go func() {
  588. be.ProcessBackground(e)
  589. s.envelopePool.Return(e)
  590. }()
  591. }
  592. } else {
  593. // or buffer the entire message (parse headers & mime structure as we go along)
  594. n, err = client.Data.ReadFrom(client.smtpReader)
  595. if n > sc.MaxSize {
  596. err = fmt.Errorf("maximum DATA size exceeded (%d)", sc.MaxSize)
  597. } else {
  598. if p := client.smtpReader.Parts(); p != nil && len(p) > 0 {
  599. client.Envelope.Header = p[0].Headers
  600. }
  601. }
  602. // All done. we can close the smtpReader, the protocol will reset the transaction, expecting a new message
  603. if closeErr := client.smtpReader.Close(); closeErr != nil {
  604. s.log().WithError(closeErr).Error("could not close DATA reader")
  605. }
  606. }
  607. if err != nil {
  608. if err == LineLimitExceeded {
  609. client.sendResponse(r.FailReadLimitExceededDataCmd, " ", LineLimitExceeded.Error())
  610. client.kill()
  611. } else if err == MessageSizeExceeded {
  612. client.sendResponse(r.FailMessageSizeExceeded, " ", MessageSizeExceeded.Error())
  613. client.kill()
  614. } else {
  615. client.sendResponse(r.FailReadErrorDataCmd, " ", err.Error())
  616. client.kill()
  617. }
  618. s.log().Fields(append(fields, "error", err)...).Error("error reading DATA")
  619. client.resetTransaction()
  620. break
  621. }
  622. if !be.StreamOn() {
  623. res = be.Process(client.Envelope)
  624. }
  625. if res.Code() < 300 {
  626. client.messagesSent++
  627. s.log().Fields(append(fields, "length", n)...).Info("received message DATA")
  628. }
  629. client.sendResponse(res)
  630. client.state = ClientCmd
  631. if s.isShuttingDown() {
  632. client.state = ClientShutdown
  633. }
  634. client.resetTransaction()
  635. case ClientStartTLS:
  636. if !client.TLS && sc.TLS.StartTLSOn {
  637. tlsConfig, ok := s.tlsConfigStore.Load().(*tls.Config)
  638. if !ok {
  639. s.mainlog().Fields("iface", s.listenInterface).Error("failed to load *tls.Config")
  640. } else if err := client.upgradeToTLS(tlsConfig); err == nil {
  641. advertiseTLS = ""
  642. client.resetTransaction()
  643. } else {
  644. s.log().Fields("error", err, "iface", s.listenInterface, "ip", client.RemoteIP).
  645. Warn("failed TLS handshake")
  646. // Don't disconnect, let the client decide if it wants to continue
  647. }
  648. }
  649. // change to command state
  650. client.state = ClientCmd
  651. case ClientShutdown:
  652. // shutdown state
  653. client.sendResponse(r.ErrorShutdown)
  654. client.kill()
  655. }
  656. if client.bufErr != nil {
  657. s.log().WithError(client.bufErr).Debug("client could not buffer a response")
  658. return
  659. }
  660. // flush the response buffer
  661. if client.bufout.Buffered() > 0 {
  662. if s.log().IsDebug() {
  663. s.log().Fields("out", client.response.String()).Debug("writing response to client")
  664. }
  665. err := s.flushResponse(client)
  666. if err != nil {
  667. s.log().WithError(err).Debug("error writing response")
  668. return
  669. }
  670. }
  671. }
  672. }
  673. func (s *server) log() log.Logger {
  674. return s.loadLog(&s.logStore)
  675. }
  676. func (s *server) mainlog() log.Logger {
  677. return s.loadLog(&s.mainlogStore)
  678. }
  679. func (s *server) loadLog(value *atomic.Value) log.Logger {
  680. if l, ok := value.Load().(log.Logger); ok {
  681. return l
  682. }
  683. out := log.OutputStderr.String()
  684. level := log.InfoLevel.String()
  685. if value == &s.logStore {
  686. if sc, ok := s.configStore.Load().(ServerConfig); ok && sc.LogFile != "" {
  687. out = sc.LogFile
  688. }
  689. level = s.mainlog().GetLevel()
  690. }
  691. l, err := log.GetLogger(out, level)
  692. if err == nil {
  693. value.Store(l)
  694. }
  695. return l
  696. }
  697. // defaultHost ensures that the host attribute is set, if addressed to Postmaster
  698. func (s *server) defaultHost(a *mail.Address) {
  699. if a.Host == "" && a.IsPostmaster() {
  700. sc := s.configStore.Load().(ServerConfig)
  701. a.Host = sc.Hostname
  702. if !s.allowsHost(a.Host) {
  703. s.log().Fields("hostname", sc.Hostname).
  704. Warn("the hostname is not present in the AllowedHosts config setting")
  705. }
  706. }
  707. }
  708. func (s *server) copyEnvelope(src *mail.Envelope, dest *mail.Envelope) {
  709. dest.TLS = src.TLS
  710. dest.Helo = src.Helo
  711. dest.ESMTP = src.ESMTP
  712. dest.RcptTo = src.RcptTo
  713. dest.MailFrom = src.MailFrom
  714. dest.MessageID = src.MessageID
  715. dest.TransportType = src.TransportType
  716. }