server.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. package guerrilla
  2. import (
  3. "crypto/rand"
  4. "crypto/tls"
  5. "fmt"
  6. "io"
  7. "net"
  8. "strings"
  9. "time"
  10. "runtime"
  11. log "github.com/Sirupsen/logrus"
  12. "sync"
  13. "sync/atomic"
  14. "github.com/flashmob/go-guerrilla/backends"
  15. "github.com/flashmob/go-guerrilla/envelope"
  16. "github.com/flashmob/go-guerrilla/response"
  17. )
  18. const (
  19. CommandVerbMaxLength = 16
  20. CommandLineMaxLength = 1024
  21. // Number of allowed unrecognized commands before we terminate the connection
  22. MaxUnrecognizedCommands = 5
  23. // The maximum total length of a reverse-path or forward-path is 256
  24. RFC2821LimitPath = 256
  25. // The maximum total length of a user name or other local-part is 64
  26. RFC2832LimitLocalPart = 64
  27. //The maximum total length of a domain name or number is 255
  28. RFC2821LimitDomain = 255
  29. // The minimum total number of recipients that must be buffered is 100
  30. RFC2821LimitRecipients = 100
  31. )
  32. const (
  33. // server has just been created
  34. ServerStateNew = iota
  35. // Server has just been stopped
  36. ServerStateStopped
  37. // Server has been started and is running
  38. ServerStateRunning
  39. // Server could not start due to an error
  40. ServerStateStartError
  41. )
  42. // Server listens for SMTP clients on the port specified in its config
  43. type server struct {
  44. configStore atomic.Value // stores guerrilla.ServerConfig
  45. backend backends.Backend
  46. tlsConfig *tls.Config
  47. tlsConfigStore atomic.Value
  48. timeout atomic.Value // stores time.Duration
  49. listenInterface string
  50. clientPool *Pool
  51. wg sync.WaitGroup // for waiting to shutdown
  52. listener net.Listener
  53. closedListener chan (bool)
  54. hosts allowedHosts // stores map[string]bool for faster lookup
  55. state int
  56. }
  57. type allowedHosts struct {
  58. table map[string]bool // host lookup table
  59. m sync.Mutex // guard access to the map
  60. }
  61. // Creates and returns a new ready-to-run Server from a configuration
  62. func newServer(sc *ServerConfig, b backends.Backend) (*server, error) {
  63. server := &server{
  64. backend: b,
  65. clientPool: NewPool(sc.MaxClients),
  66. closedListener: make(chan (bool), 1),
  67. listenInterface: sc.ListenInterface,
  68. state: ServerStateNew,
  69. }
  70. server.setConfig(sc)
  71. server.setTimeout(sc.Timeout)
  72. if err := server.configureSSL(); err != nil {
  73. return server, err
  74. }
  75. return server, nil
  76. }
  77. func (s *server) configureSSL() error {
  78. sConfig := s.configStore.Load().(ServerConfig)
  79. if sConfig.TLSAlwaysOn || sConfig.StartTLSOn {
  80. cert, err := tls.LoadX509KeyPair(sConfig.PublicKeyFile, sConfig.PrivateKeyFile)
  81. if err != nil {
  82. return fmt.Errorf("error while loading the certificate: %s", err)
  83. }
  84. tlsConfig := &tls.Config{
  85. Certificates: []tls.Certificate{cert},
  86. ClientAuth: tls.VerifyClientCertIfGiven,
  87. ServerName: sConfig.Hostname,
  88. }
  89. tlsConfig.Rand = rand.Reader
  90. s.tlsConfigStore.Store(tlsConfig)
  91. }
  92. return nil
  93. }
  94. // Set the timeout for the server and all clients
  95. func (server *server) setTimeout(seconds int) {
  96. duration := time.Duration(int64(seconds))
  97. server.clientPool.SetTimeout(duration)
  98. server.timeout.Store(duration)
  99. }
  100. // goroutine safe config store
  101. func (server *server) setConfig(sc *ServerConfig) {
  102. server.configStore.Store(*sc)
  103. }
  104. // goroutine safe
  105. func (server *server) isEnabled() bool {
  106. sc := server.configStore.Load().(ServerConfig)
  107. return sc.IsEnabled
  108. }
  109. // Set the allowed hosts for the server
  110. func (server *server) setAllowedHosts(allowedHosts []string) {
  111. defer server.hosts.m.Unlock()
  112. server.hosts.m.Lock()
  113. server.hosts.table = make(map[string]bool, len(allowedHosts))
  114. for _, h := range allowedHosts {
  115. server.hosts.table[strings.ToLower(h)] = true
  116. }
  117. }
  118. // Begin accepting SMTP clients. Will block unless there is an error or server.Shutdown() is called
  119. func (server *server) Start(startWG *sync.WaitGroup) error {
  120. var clientID uint64
  121. clientID = 0
  122. listener, err := net.Listen("tcp", server.listenInterface)
  123. server.listener = listener
  124. if err != nil {
  125. startWG.Done() // don't wait for me
  126. server.state = ServerStateStartError
  127. return fmt.Errorf("[%s] Cannot listen on port: %s ", server.listenInterface, err.Error())
  128. }
  129. log.Infof("Listening on TCP %s", server.listenInterface)
  130. server.state = ServerStateRunning
  131. startWG.Done() // start successful, don't wait for me
  132. for {
  133. log.Debugf("[%s] Waiting for a new client. Next Client ID: %d", server.listenInterface, clientID+1)
  134. conn, err := listener.Accept()
  135. clientID++
  136. if err != nil {
  137. if e, ok := err.(net.Error); ok && !e.Temporary() {
  138. log.Infof("Server [%s] has stopped accepting new clients", server.listenInterface)
  139. // the listener has been closed, wait for clients to exit
  140. log.Infof("shutting down pool [%s]", server.listenInterface)
  141. server.clientPool.ShutdownState()
  142. server.clientPool.ShutdownWait()
  143. server.state = ServerStateStopped
  144. server.closedListener <- true
  145. return nil
  146. }
  147. log.WithError(err).Info("Temporary error accepting client")
  148. continue
  149. }
  150. go func(p Poolable, borrow_err error) {
  151. c := p.(*client)
  152. if borrow_err == nil {
  153. server.handleClient(c)
  154. server.clientPool.Return(c)
  155. } else {
  156. log.WithError(borrow_err).Info("couldn't borrow a new client")
  157. // we could not get a client, so close the connection.
  158. conn.Close()
  159. }
  160. // intentionally placed Borrow in args so that it's called in the
  161. // same main goroutine.
  162. }(server.clientPool.Borrow(conn, clientID))
  163. }
  164. }
  165. func (server *server) Shutdown() {
  166. if server.listener != nil {
  167. // This will cause Start function to return, by causing an error on listener.Accept
  168. server.listener.Close()
  169. // wait for the listener to listener.Accept
  170. <-server.closedListener
  171. // At this point Start will exit and close down the pool
  172. } else {
  173. server.clientPool.ShutdownState()
  174. // listener already closed, wait for clients to exit
  175. server.clientPool.ShutdownWait()
  176. server.state = ServerStateStopped
  177. }
  178. }
  179. func (server *server) GetActiveClientsCount() int {
  180. return server.clientPool.GetActiveClientsCount()
  181. }
  182. // Verifies that the host is a valid recipient.
  183. func (server *server) allowsHost(host string) bool {
  184. defer server.hosts.m.Unlock()
  185. server.hosts.m.Lock()
  186. if _, ok := server.hosts.table[strings.ToLower(host)]; ok {
  187. return true
  188. }
  189. return false
  190. }
  191. // Reads from the client until a terminating sequence is encountered,
  192. // or until a timeout occurs.
  193. func (server *server) readCommand(client *client, maxSize int64) (string, error) {
  194. var input, reply string
  195. var err error
  196. // In command state, stop reading at line breaks
  197. suffix := "\r\n"
  198. for {
  199. client.setTimeout(server.timeout.Load().(time.Duration))
  200. reply, err = client.bufin.ReadString('\n')
  201. input = input + reply
  202. if err != nil {
  203. break
  204. }
  205. if strings.HasSuffix(input, suffix) {
  206. // discard the suffix and stop reading
  207. input = input[0 : len(input)-len(suffix)]
  208. break
  209. }
  210. }
  211. return input, err
  212. }
  213. // Writes a response to the client.
  214. func (server *server) writeResponse(client *client) error {
  215. client.setTimeout(server.timeout.Load().(time.Duration))
  216. size, err := client.bufout.WriteString(client.response)
  217. if err != nil {
  218. return err
  219. }
  220. err = client.bufout.Flush()
  221. if err != nil {
  222. return err
  223. }
  224. client.response = client.response[size:]
  225. return nil
  226. }
  227. func (server *server) isShuttingDown() bool {
  228. return server.clientPool.IsShuttingDown()
  229. }
  230. // Handles an entire client SMTP exchange
  231. func (server *server) handleClient(client *client) {
  232. defer client.closeConn()
  233. sc := server.configStore.Load().(ServerConfig)
  234. log.Infof("Handle client [%s], id: %d", client.RemoteAddress, client.ID)
  235. // Initial greeting
  236. greeting := fmt.Sprintf("220 %s SMTP Guerrilla(%s) #%d (%d) %s gr:%d",
  237. sc.Hostname, Version, client.ID,
  238. server.clientPool.GetActiveClientsCount(), time.Now().Format(time.RFC3339), runtime.NumGoroutine())
  239. helo := fmt.Sprintf("250 %s Hello", sc.Hostname)
  240. // ehlo is a multi-line reply and need additional \r\n at the end
  241. ehlo := fmt.Sprintf("250-%s Hello\r\n", sc.Hostname)
  242. // Extended feature advertisements
  243. messageSize := fmt.Sprintf("250-SIZE %d\r\n", sc.MaxSize)
  244. pipelining := "250-PIPELINING\r\n"
  245. advertiseTLS := "250-STARTTLS\r\n"
  246. advertiseEnhancedStatusCodes := "250-ENHANCEDSTATUSCODES\r\n"
  247. // the last line doesn't need \r\n since string will be printed as a new line
  248. help := "250 HELP"
  249. if sc.TLSAlwaysOn {
  250. tlsConfig, ok := server.tlsConfigStore.Load().(*tls.Config)
  251. if ok && client.upgradeToTLS(tlsConfig) {
  252. advertiseTLS = ""
  253. } else {
  254. // server requires TLS, but can't handshake
  255. client.kill()
  256. }
  257. }
  258. if !sc.StartTLSOn {
  259. // STARTTLS turned off, don't advertise it
  260. advertiseTLS = ""
  261. }
  262. for client.isAlive() {
  263. switch client.state {
  264. case ClientGreeting:
  265. client.responseAdd(greeting)
  266. client.state = ClientCmd
  267. case ClientCmd:
  268. client.bufin.setLimit(CommandLineMaxLength)
  269. input, err := server.readCommand(client, sc.MaxSize)
  270. log.Debugf("Client sent: %s", input)
  271. if err == io.EOF {
  272. log.WithError(err).Warnf("Client closed the connection: %s", client.RemoteAddress)
  273. return
  274. } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  275. log.WithError(err).Warnf("Timeout: %s", client.RemoteAddress)
  276. return
  277. } else if err == LineLimitExceeded {
  278. resp := &response.Response{
  279. EnhancedCode: response.InvalidCommand,
  280. BasicCode: 554,
  281. Class: response.ClassPermanentFailure,
  282. Comment: "Line too long.",
  283. }
  284. client.responseAdd(resp.String())
  285. client.kill()
  286. break
  287. } else if err != nil {
  288. log.WithError(err).Warnf("Read error: %s", client.RemoteAddress)
  289. client.kill()
  290. break
  291. }
  292. if server.isShuttingDown() {
  293. client.state = ClientShutdown
  294. continue
  295. }
  296. input = strings.Trim(input, " \r\n")
  297. cmdLen := len(input)
  298. if cmdLen > CommandVerbMaxLength {
  299. cmdLen = CommandVerbMaxLength
  300. }
  301. cmd := strings.ToUpper(input[:cmdLen])
  302. switch {
  303. case strings.Index(cmd, "HELO") == 0:
  304. client.Helo = strings.Trim(input[4:], " ")
  305. client.resetTransaction()
  306. client.responseAdd(helo)
  307. case strings.Index(cmd, "EHLO") == 0:
  308. client.Helo = strings.Trim(input[4:], " ")
  309. client.resetTransaction()
  310. client.responseAdd(ehlo + messageSize + pipelining + advertiseTLS + advertiseEnhancedStatusCodes + help)
  311. case strings.Index(cmd, "HELP") == 0:
  312. client.responseAdd("214 OK\r\n" + messageSize + pipelining + advertiseTLS + help)
  313. case strings.Index(cmd, "MAIL FROM:") == 0:
  314. if client.isInTransaction() {
  315. resp := &response.Response{
  316. EnhancedCode: response.InvalidCommand,
  317. BasicCode: 503,
  318. Class: response.ClassPermanentFailure,
  319. Comment: "Error: nested MAIL command",
  320. }
  321. client.responseAdd(resp.String())
  322. break
  323. }
  324. // Fix for issue #53 - MAIL FROM may only be <> if it is a bounce
  325. mail := input[10:]
  326. from := &envelope.EmailAddress{}
  327. if !(strings.Index(mail, "<>") == 0) &&
  328. !(strings.Index(mail, " <>") == 0) {
  329. // Not Bounce, extract mail.
  330. from, err = extractEmail(mail)
  331. }
  332. if err != nil {
  333. client.responseAdd(err.Error())
  334. } else {
  335. client.MailFrom = from
  336. resp := &response.Response{
  337. EnhancedCode: response.OtherAddressStatus,
  338. Class: response.ClassSuccess,
  339. }
  340. client.responseAdd(resp.String())
  341. }
  342. case strings.Index(cmd, "RCPT TO:") == 0:
  343. if len(client.RcptTo) > RFC2821LimitRecipients {
  344. resp := &response.Response{
  345. EnhancedCode: response.TooManyRecipients,
  346. BasicCode: 452,
  347. Class: response.ClassTransientFailure,
  348. Comment: "Too many recipients",
  349. }
  350. client.responseAdd(resp.String())
  351. break
  352. }
  353. to, err := extractEmail(input[8:])
  354. if err != nil {
  355. client.responseAdd(err.Error())
  356. } else {
  357. if !server.allowsHost(to.Host) {
  358. resp := &response.Response{
  359. EnhancedCode: response.BadDestinationMailboxAddress,
  360. BasicCode: 454,
  361. Class: response.ClassTransientFailure,
  362. Comment: "Error: Relay access denied: " + to.Host,
  363. }
  364. client.responseAdd(resp.String())
  365. } else {
  366. client.RcptTo = append(client.RcptTo, *to)
  367. resp := &response.Response{
  368. EnhancedCode: response.DestinationMailboxAddressValid,
  369. Class: response.ClassSuccess,
  370. }
  371. client.responseAdd(resp.String())
  372. }
  373. }
  374. case strings.Index(cmd, "RSET") == 0:
  375. client.resetTransaction()
  376. resp := &response.Response{
  377. EnhancedCode: response.OtherAddressStatus,
  378. Class: response.ClassSuccess,
  379. }
  380. client.responseAdd(resp.String())
  381. case strings.Index(cmd, "VRFY") == 0:
  382. resp := &response.Response{
  383. EnhancedCode: response.OtherOrUndefinedProtocolStatus,
  384. BasicCode: 252,
  385. Class: response.ClassSuccess,
  386. Comment: "Cannot verify user",
  387. }
  388. client.responseAdd(resp.String())
  389. case strings.Index(cmd, "NOOP") == 0:
  390. resp := &response.Response{
  391. EnhancedCode: response.OtherStatus,
  392. Class: response.ClassSuccess,
  393. }
  394. client.responseAdd(resp.String())
  395. case strings.Index(cmd, "QUIT") == 0:
  396. resp := &response.Response{
  397. EnhancedCode: response.OtherStatus,
  398. BasicCode: 221,
  399. Class: response.ClassSuccess,
  400. Comment: "Bye",
  401. }
  402. client.responseAdd(resp.String())
  403. client.kill()
  404. case strings.Index(cmd, "DATA") == 0:
  405. if client.MailFrom.IsEmpty() {
  406. resp := &response.Response{
  407. EnhancedCode: response.InvalidCommand,
  408. BasicCode: 503,
  409. Class: response.ClassPermanentFailure,
  410. Comment: "Error: No sender",
  411. }
  412. client.responseAdd(resp.String())
  413. break
  414. }
  415. if len(client.RcptTo) == 0 {
  416. resp := &response.Response{
  417. EnhancedCode: response.InvalidCommand,
  418. BasicCode: 503,
  419. Class: response.ClassPermanentFailure,
  420. Comment: "Error: No recipients",
  421. }
  422. client.responseAdd(resp.String())
  423. break
  424. }
  425. client.responseAdd("354 Enter message, ending with '.' on a line by itself")
  426. client.state = ClientData
  427. case sc.StartTLSOn && strings.Index(cmd, "STARTTLS") == 0:
  428. resp := &response.Response{
  429. EnhancedCode: response.OtherStatus,
  430. BasicCode: 220,
  431. Class: response.ClassSuccess,
  432. Comment: "Ready to start TLS",
  433. }
  434. client.responseAdd(resp.String())
  435. client.state = ClientStartTLS
  436. default:
  437. resp := &response.Response{
  438. EnhancedCode: response.InvalidCommand,
  439. BasicCode: 554,
  440. Class: response.ClassPermanentFailure,
  441. Comment: fmt.Sprintf("Unrecognized command"),
  442. }
  443. client.responseAdd(resp.String())
  444. client.errors++
  445. if client.errors > MaxUnrecognizedCommands {
  446. resp := &response.Response{
  447. EnhancedCode: response.InvalidCommand,
  448. BasicCode: 554,
  449. Class: response.ClassPermanentFailure,
  450. Comment: "Too many unrecognized commands",
  451. }
  452. client.responseAdd(resp.String())
  453. client.kill()
  454. }
  455. }
  456. case ClientData:
  457. // intentionally placed the limit 1MB above so that reading does not return with an error
  458. // if the client goes a little over. Anything above will err
  459. client.bufin.setLimit(int64(sc.MaxSize) + 1024000) // This a hard limit.
  460. n, err := client.Data.ReadFrom(client.smtpReader.DotReader())
  461. if n > sc.MaxSize {
  462. err = fmt.Errorf("Maximum DATA size exceeded (%d)", sc.MaxSize)
  463. }
  464. if err != nil {
  465. if err == LineLimitExceeded {
  466. resp := &response.Response{
  467. EnhancedCode: response.SyntaxError,
  468. BasicCode: 550,
  469. Class: response.ClassPermanentFailure,
  470. Comment: "Error: " + LineLimitExceeded.Error(),
  471. }
  472. client.responseAdd(resp.String())
  473. client.kill()
  474. } else if err == MessageSizeExceeded {
  475. resp := &response.Response{
  476. EnhancedCode: response.SyntaxError,
  477. BasicCode: 550,
  478. Class: response.ClassPermanentFailure,
  479. Comment: "Error: " + MessageSizeExceeded.Error(),
  480. }
  481. client.responseAdd(resp.String())
  482. client.kill()
  483. } else {
  484. resp := &response.Response{
  485. EnhancedCode: response.OtherOrUndefinedMailSystemStatus,
  486. BasicCode: 451,
  487. Class: response.ClassTransientFailure,
  488. Comment: "Error: " + err.Error(),
  489. }
  490. client.responseAdd(resp.String())
  491. client.kill()
  492. }
  493. log.WithError(err).Warn("Error reading data")
  494. break
  495. }
  496. res := server.backend.Process(client.Envelope)
  497. if res.Code() < 300 {
  498. client.messagesSent++
  499. }
  500. client.responseAdd(res.String())
  501. client.state = ClientCmd
  502. if server.isShuttingDown() {
  503. client.state = ClientShutdown
  504. }
  505. client.resetTransaction()
  506. case ClientStartTLS:
  507. if !client.TLS && sc.StartTLSOn {
  508. tlsConfig, ok := server.tlsConfigStore.Load().(*tls.Config)
  509. if ok && client.upgradeToTLS(tlsConfig) {
  510. advertiseTLS = ""
  511. client.resetTransaction()
  512. }
  513. }
  514. // change to command state
  515. client.state = ClientCmd
  516. case ClientShutdown:
  517. // shutdown state
  518. resp := &response.Response{
  519. EnhancedCode: response.OtherOrUndefinedMailSystemStatus,
  520. BasicCode: 421,
  521. Class: response.ClassTransientFailure,
  522. Comment: "Server is shutting down. Please try again later. Sayonara!",
  523. }
  524. client.responseAdd(resp.String())
  525. client.kill()
  526. }
  527. if len(client.response) > 0 {
  528. log.Debugf("Writing response to client: \n%s", client.response)
  529. err := server.writeResponse(client)
  530. if err != nil {
  531. log.WithError(err).Debug("Error writing response")
  532. return
  533. }
  534. }
  535. }
  536. }