// Source path: clair/vendor/github.com/tylerb/graceful/graceful.go

package graceful
import (
"crypto/tls"
"log"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"golang.org/x/net/netutil"
)
// Server wraps an http.Server with graceful connection handling.
// It may be used directly in the same way as http.Server, or may
// be constructed with the global functions in this package.
//
// Example:
//
//	srv := &graceful.Server{
//		Timeout: 5 * time.Second,
//		Server:  &http.Server{Addr: ":1234", Handler: handler},
//	}
//	srv.ListenAndServe()
type Server struct {
	*http.Server

	// Timeout is the duration to allow outstanding requests to survive
	// before forcefully terminating them. If it is 0 (or negative),
	// shutdown waits for all open connections to finish with no deadline.
	Timeout time.Duration

	// ListenLimit caps the number of simultaneously open connections
	// accepted from the listener (it is passed to netutil.LimitListener).
	// A value of 0 means no limit is applied.
	ListenLimit int

	// TCPKeepAlive sets the TCP keep-alive period on accepted
	// connections. It prunes dead TCP connections (e.g. closing
	// laptop mid-download). A value of 0 leaves keep-alive untouched.
	TCPKeepAlive time.Duration

	// ConnState specifies an optional callback function that is
	// called when a client connection changes state. This is a proxy
	// to the underlying http.Server's ConnState, and the original
	// must not be set directly because Serve overwrites it.
	ConnState func(net.Conn, http.ConnState)

	// BeforeShutdown is an optional callback function that is called
	// before the listener is closed.
	BeforeShutdown func()

	// ShutdownInitiated is an optional callback function that is called
	// when shutdown is initiated, after the listener has been closed. It
	// can be used to notify the client side of long lived connections
	// (e.g. websockets) to reconnect.
	ShutdownInitiated func()

	// NoSignalHandling prevents graceful from automatically shutting down
	// on SIGINT and SIGTERM. If set to true, you must shut down the server
	// manually with Stop.
	NoSignalHandling bool

	// Logger used to notify of errors on startup and on stop. A nil
	// Logger disables logging.
	Logger *log.Logger

	// Interrupted is true once shutdown has been initiated, whether by a
	// SIGINT or SIGTERM signal or by a call to Stop.
	Interrupted bool

	// interrupt signals the listener to stop serving connections,
	// and the server to shut down.
	interrupt chan os.Signal

	// stopLock is used to protect against concurrent calls to Stop.
	stopLock sync.Mutex

	// stopChan is the channel on which callers may block while waiting for
	// the server to stop.
	stopChan chan struct{}

	// chanLock protects the lazy creation of stopChan and interrupt, and
	// the closing of stopChan.
	chanLock sync.RWMutex

	// connections holds all connections managed by graceful.
	connections map[net.Conn]struct{}

	// idleConnections holds all idle connections managed by graceful.
	idleConnections map[net.Conn]struct{}
}
// Run serves the http.Handler with graceful shutdown enabled.
//
// timeout is the duration to wait until killing active requests and stopping the server.
// If timeout is 0, the server never times out. It waits for all active requests to finish.
func Run(addr string, timeout time.Duration, n http.Handler) {
srv := &Server{
2016-06-07 08:08:50 +00:00
Timeout: timeout,
TCPKeepAlive: 3 * time.Minute,
Server: &http.Server{Addr: addr, Handler: n},
Logger: DefaultLogger(),
2015-11-13 19:11:28 +00:00
}
if err := srv.ListenAndServe(); err != nil {
if opErr, ok := err.(*net.OpError); !ok || (ok && opErr.Op != "accept") {
2016-06-07 08:08:50 +00:00
srv.Logger.Fatal(err)
2015-11-13 19:11:28 +00:00
}
}
}
2016-06-07 08:08:50 +00:00
// RunWithErr is an alternative version of Run function which can return error.
//
// Unlike Run this version will not exit the program if an error is encountered but will
// return it instead.
func RunWithErr(addr string, timeout time.Duration, n http.Handler) error {
srv := &Server{
Timeout: timeout,
TCPKeepAlive: 3 * time.Minute,
Server: &http.Server{Addr: addr, Handler: n},
Logger: DefaultLogger(),
}
return srv.ListenAndServe()
}
2015-11-13 19:11:28 +00:00
// ListenAndServe is equivalent to http.Server.ListenAndServe with graceful shutdown enabled.
//
// timeout is the duration to wait until killing active requests and stopping the server.
// If timeout is 0, the server never times out. It waits for all active requests to finish.
func ListenAndServe(server *http.Server, timeout time.Duration) error {
2016-06-07 08:08:50 +00:00
srv := &Server{Timeout: timeout, Server: server, Logger: DefaultLogger()}
2015-11-13 19:11:28 +00:00
return srv.ListenAndServe()
}
// ListenAndServe is equivalent to http.Server.ListenAndServe with graceful shutdown enabled.
//
// It listens on srv.Addr over TCP, falling back to ":http" when the address
// is empty, and then hands the listener to Serve.
func (srv *Server) ListenAndServe() error {
	// The listener is created here so that graceful controls its lifetime.
	listenAddr := ":http"
	if srv.Addr != "" {
		listenAddr = srv.Addr
	}

	tcpListener, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return err
	}
	return srv.Serve(tcpListener)
}
// ListenAndServeTLS is equivalent to http.Server.ListenAndServeTLS with graceful shutdown enabled.
//
// timeout is the duration to wait until killing active requests and stopping the server.
// If timeout is 0, the server never times out. It waits for all active requests to finish.
func ListenAndServeTLS(server *http.Server, certFile, keyFile string, timeout time.Duration) error {
2016-06-07 08:08:50 +00:00
srv := &Server{Timeout: timeout, Server: server, Logger: DefaultLogger()}
2015-11-13 19:11:28 +00:00
return srv.ListenAndServeTLS(certFile, keyFile)
}
// ListenTLS is a convenience method that creates an https listener using the
// provided cert and key files. Use this method if you need access to the
// listener object directly. When ready, pass it to the Serve method.
func (srv *Server) ListenTLS(certFile, keyFile string) (net.Listener, error) {
// Create the listener ourselves so we can control its lifetime
addr := srv.Addr
if addr == "" {
addr = ":https"
}
config := &tls.Config{}
if srv.TLSConfig != nil {
*config = *srv.TLSConfig
}
var err error
config.Certificates = make([]tls.Certificate, 1)
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
conn, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
2016-06-07 08:08:50 +00:00
srv.TLSConfig = config
2015-11-13 19:11:28 +00:00
tlsListener := tls.NewListener(conn, config)
return tlsListener, nil
}
// ListenAndServeTLS is equivalent to http.Server.ListenAndServeTLS with graceful shutdown enabled.
//
// It builds the TLS listener with ListenTLS and serves on it; a failure to
// create the listener is returned as is.
func (srv *Server) ListenAndServeTLS(certFile, keyFile string) error {
	tlsListener, listenErr := srv.ListenTLS(certFile, keyFile)
	if listenErr != nil {
		return listenErr
	}
	return srv.Serve(tlsListener)
}
// ListenAndServeTLSConfig can be used with an existing TLS config and is equivalent to
// http.Server.ListenAndServeTLS with graceful shutdown enabled,
func (srv *Server) ListenAndServeTLSConfig(config *tls.Config) error {
addr := srv.Addr
if addr == "" {
addr = ":https"
}
conn, err := net.Listen("tcp", addr)
if err != nil {
return err
}
2016-06-07 08:08:50 +00:00
srv.TLSConfig = config
2015-11-13 19:11:28 +00:00
tlsListener := tls.NewListener(conn, config)
return srv.Serve(tlsListener)
}
// Serve is equivalent to http.Server.Serve with graceful shutdown enabled.
//
// timeout is the duration to wait until killing active requests and stopping the server.
// If timeout is 0, the server never times out. It waits for all active requests to finish.
func Serve(server *http.Server, l net.Listener, timeout time.Duration) error {
2016-06-07 08:08:50 +00:00
srv := &Server{Timeout: timeout, Server: server, Logger: DefaultLogger()}
2015-11-13 19:11:28 +00:00
return srv.Serve(l)
}
// Serve is equivalent to http.Server.Serve with graceful shutdown enabled.
func (srv *Server) Serve(listener net.Listener) error {
if srv.ListenLimit != 0 {
listener = netutil.LimitListener(listener, srv.ListenLimit)
}
2016-06-07 08:08:50 +00:00
if srv.TCPKeepAlive != 0 {
listener = tcpKeepAliveListener{listener.(*net.TCPListener), srv.TCPKeepAlive}
}
// Make our stopchan
srv.StopChan()
2015-11-13 19:11:28 +00:00
// Track connection state
add := make(chan net.Conn)
2016-06-07 08:08:50 +00:00
idle := make(chan net.Conn)
2015-11-13 19:11:28 +00:00
remove := make(chan net.Conn)
srv.Server.ConnState = func(conn net.Conn, state http.ConnState) {
switch state {
case http.StateNew:
add <- conn
2016-06-07 08:08:50 +00:00
case http.StateIdle:
idle <- conn
2015-11-13 19:11:28 +00:00
case http.StateClosed, http.StateHijacked:
remove <- conn
}
if srv.ConnState != nil {
srv.ConnState(conn, state)
}
}
// Manage open connections
shutdown := make(chan chan struct{})
kill := make(chan struct{})
2016-06-07 08:08:50 +00:00
go srv.manageConnections(add, idle, remove, shutdown, kill)
2015-11-13 19:11:28 +00:00
interrupt := srv.interruptChan()
// Set up the interrupt handler
if !srv.NoSignalHandling {
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
}
2016-06-07 08:08:50 +00:00
quitting := make(chan struct{})
go srv.handleInterrupt(interrupt, quitting, listener)
2015-11-13 19:11:28 +00:00
// Serve with graceful listener.
// Execution blocks here until listener.Close() is called, above.
err := srv.Server.Serve(listener)
2016-06-07 08:08:50 +00:00
if err != nil {
// If the underlying listening is closed, Serve returns an error
// complaining about listening on a closed socket. This is expected, so
// let's ignore the error if we are the ones who explicitly closed the
// socket.
select {
case <-quitting:
err = nil
default:
}
}
2015-11-13 19:11:28 +00:00
srv.shutdown(shutdown, kill)
return err
}
// Stop instructs the type to halt operations and close
// the stop channel when it is finished.
//
// timeout is grace period for which to wait before shutting
// down the server. The timeout value passed here will override the
// timeout given when constructing the server, as this is an explicit
// command to stop the server.
func (srv *Server) Stop(timeout time.Duration) {
srv.stopLock.Lock()
2016-06-07 08:08:50 +00:00
defer srv.stopLock.Unlock()
2015-11-13 19:11:28 +00:00
srv.Timeout = timeout
interrupt := srv.interruptChan()
interrupt <- syscall.SIGINT
}
// StopChan gets the stop channel which will block until
// stopping has completed, at which point it is closed.
// Callers should never close the stop channel.
func (srv *Server) StopChan() <-chan struct{} {
srv.chanLock.Lock()
2016-06-07 08:08:50 +00:00
defer srv.chanLock.Unlock()
2015-11-13 19:11:28 +00:00
if srv.stopChan == nil {
srv.stopChan = make(chan struct{})
}
return srv.stopChan
}
2016-06-07 08:08:50 +00:00
// DefaultLogger returns the logger used by Run, RunWithErr, ListenAndServe, ListenAndServeTLS and Serve.
// The logger outputs to STDERR by default.
func DefaultLogger() *log.Logger {
return log.New(os.Stderr, "[graceful] ", 0)
}
func (srv *Server) manageConnections(add, idle, remove chan net.Conn, shutdown chan chan struct{}, kill chan struct{}) {
2015-11-13 19:11:28 +00:00
var done chan struct{}
srv.connections = map[net.Conn]struct{}{}
2016-06-07 08:08:50 +00:00
srv.idleConnections = map[net.Conn]struct{}{}
2015-11-13 19:11:28 +00:00
for {
select {
case conn := <-add:
srv.connections[conn] = struct{}{}
2016-06-07 08:08:50 +00:00
case conn := <-idle:
srv.idleConnections[conn] = struct{}{}
2015-11-13 19:11:28 +00:00
case conn := <-remove:
delete(srv.connections, conn)
2016-06-07 08:08:50 +00:00
delete(srv.idleConnections, conn)
2015-11-13 19:11:28 +00:00
if done != nil && len(srv.connections) == 0 {
done <- struct{}{}
return
}
case done = <-shutdown:
2016-06-07 08:08:50 +00:00
if len(srv.connections) == 0 && len(srv.idleConnections) == 0 {
2015-11-13 19:11:28 +00:00
done <- struct{}{}
return
}
2016-06-07 08:08:50 +00:00
// a shutdown request has been received. if we have open idle
// connections, we must close all of them now. this prevents idle
// connections from holding the server open while waiting for them to
// hit their idle timeout.
for k := range srv.idleConnections {
if err := k.Close(); err != nil {
srv.log("[ERROR] %s", err)
}
}
2015-11-13 19:11:28 +00:00
case <-kill:
2016-06-07 08:08:50 +00:00
srv.Server.ConnState = nil
2015-11-13 19:11:28 +00:00
for k := range srv.connections {
2016-06-07 08:08:50 +00:00
if err := k.Close(); err != nil {
srv.log("[ERROR] %s", err)
}
2015-11-13 19:11:28 +00:00
}
return
}
}
}
func (srv *Server) interruptChan() chan os.Signal {
srv.chanLock.Lock()
2016-06-07 08:08:50 +00:00
defer srv.chanLock.Unlock()
2015-11-13 19:11:28 +00:00
if srv.interrupt == nil {
srv.interrupt = make(chan os.Signal, 1)
}
return srv.interrupt
}
2016-06-07 08:08:50 +00:00
func (srv *Server) handleInterrupt(interrupt chan os.Signal, quitting chan struct{}, listener net.Listener) {
for _ = range interrupt {
if srv.Interrupted {
srv.log("already shutting down")
continue
}
srv.log("shutdown initiated")
srv.Interrupted = true
if srv.BeforeShutdown != nil {
srv.BeforeShutdown()
}
2015-11-13 19:11:28 +00:00
2016-06-07 08:08:50 +00:00
close(quitting)
srv.SetKeepAlivesEnabled(false)
if err := listener.Close(); err != nil {
srv.log("[ERROR] %s", err)
}
2015-11-13 19:11:28 +00:00
2016-06-07 08:08:50 +00:00
if srv.ShutdownInitiated != nil {
srv.ShutdownInitiated()
}
2015-11-13 19:11:28 +00:00
}
2016-06-07 08:08:50 +00:00
}
2015-11-13 19:11:28 +00:00
2016-06-07 08:08:50 +00:00
func (srv *Server) log(fmt string, v ...interface{}) {
if srv.Logger != nil {
srv.Logger.Printf(fmt, v...)
}
2015-11-13 19:11:28 +00:00
}
func (srv *Server) shutdown(shutdown chan chan struct{}, kill chan struct{}) {
// Request done notification
done := make(chan struct{})
shutdown <- done
if srv.Timeout > 0 {
select {
case <-done:
case <-time.After(srv.Timeout):
close(kill)
}
} else {
<-done
}
// Close the stopChan to wake up any blocked goroutines.
srv.chanLock.Lock()
if srv.stopChan != nil {
close(srv.stopChan)
}
srv.chanLock.Unlock()
}
2016-06-07 08:08:50 +00:00
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
// connections. Serve wraps the listener with it when TCPKeepAlive is
// non-zero, so dead TCP connections (e.g. closing laptop mid-download)
// eventually go away.
type tcpKeepAliveListener struct {
	*net.TCPListener
	keepAlivePeriod time.Duration
}

// Accept waits for the next TCP connection, enables keep-alive on it with
// the configured period and returns it. Errors from the underlying
// AcceptTCP call are returned with a nil connection.
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
	tcpConn, acceptErr := ln.AcceptTCP()
	if acceptErr != nil {
		// Return an untyped nil so callers never see a non-nil interface
		// wrapping a nil *net.TCPConn.
		return nil, acceptErr
	}

	// Configuring keep-alive is best effort: a failure here does not
	// prevent the connection from being served.
	_ = tcpConn.SetKeepAlive(true)
	_ = tcpConn.SetKeepAlivePeriod(ln.keepAlivePeriod)
	return tcpConn, nil
}