Merge pull request #617 from jzelinskie/grpc-refactor

Introduce pkg/grpcutil
This commit is contained in:
Jimmy Zelinskie 2018-09-11 14:57:13 -04:00 committed by GitHub
commit 089a4e0f0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 314 additions and 195 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2017 clair authors // Copyright 2018 clair authors
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -16,9 +16,6 @@ package api
import ( import (
"context" "context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/http" "net/http"
"time" "time"
@ -40,14 +37,10 @@ type Config struct {
} }
func Run(cfg *Config, store database.Datastore) { func Run(cfg *Config, store database.Datastore) {
tlsConfig, err := tlsClientConfig(cfg.CAFile) err := v3.ListenAndServe(cfg.Addr, cfg.CertFile, cfg.KeyFile, cfg.CAFile, store)
if err != nil { if err != nil {
log.WithError(err).Fatal("could not initialize client cert authentication") log.WithError(err).Fatal("could not initialize gRPC server")
} }
if tlsConfig != nil {
log.Info("main API configured with client certificate authentication")
}
v3.Run(cfg.Addr, tlsConfig, cfg.CertFile, cfg.KeyFile, store)
} }
func RunHealth(cfg *Config, store database.Datastore, st *stopper.Stopper) { func RunHealth(cfg *Config, store database.Datastore, st *stopper.Stopper) {
@ -77,30 +70,3 @@ func RunHealth(cfg *Config, store database.Datastore, st *stopper.Stopper) {
log.Info("health API stopped") log.Info("health API stopped")
} }
// tlsClientConfig initializes a *tls.Config using the given CA. The resulting
// *tls.Config is meant to be used to configure an HTTP server to do client
// certificate authentication.
//
// If no CA is given, a nil *tls.Config is returned; no client certificate will
// be required and verified. In other words, authentication will be disabled.
func tlsClientConfig(caPath string) (*tls.Config, error) {
if caPath == "" {
return nil, nil
}
caCert, err := ioutil.ReadFile(caPath)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
}
return tlsConfig, nil
}

View File

@ -1,4 +1,4 @@
// Copyright 2017 clair authors // Copyright 2018 clair authors
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -15,35 +15,19 @@
package v3 package v3
import ( import (
"context"
"crypto/tls"
"net"
"net/http" "net/http"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/cockroachdb/cmux"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
pb "github.com/coreos/clair/api/v3/clairpb" pb "github.com/coreos/clair/api/v3/clairpb"
"github.com/coreos/clair/database" "github.com/coreos/clair/database"
"github.com/coreos/clair/pkg/grpcutil"
) )
// handleShutdown handles the server shut down error.
func handleShutdown(err error) {
if err != nil {
if opErr, ok := err.(*net.OpError); !ok || (ok && opErr.Op != "accept") {
log.Fatal(err)
}
}
}
var ( var (
promResponseDurationMilliseconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ promResponseDurationMilliseconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "clair_v3_api_response_duration_milliseconds", Name: "clair_v3_api_response_duration_milliseconds",
@ -56,21 +40,11 @@ func init() {
prometheus.MustRegister(promResponseDurationMilliseconds) prometheus.MustRegister(promResponseDurationMilliseconds)
} }
func newGrpcServer(store database.Datastore, tlsConfig *tls.Config) *grpc.Server { func prometheusHandler(h http.Handler) http.Handler {
grpcOpts := []grpc.ServerOption{ mux := http.NewServeMux()
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), mux.Handle("/", h)
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), mux.Handle("/metrics", prometheus.Handler())
} return mux
if tlsConfig != nil {
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
grpcServer := grpc.NewServer(grpcOpts...)
pb.RegisterAncestryServiceServer(grpcServer, &AncestryServer{Store: store})
pb.RegisterNotificationServiceServer(grpcServer, &NotificationServer{Store: store})
pb.RegisterStatusServiceServer(grpcServer, &StatusServer{Store: store})
return grpcServer
} }
type httpStatusWriter struct { type httpStatusWriter struct {
@ -84,145 +58,48 @@ func (w *httpStatusWriter) WriteHeader(code int) {
w.ResponseWriter.WriteHeader(code) w.ResponseWriter.WriteHeader(code)
} }
// logHandler adds request logging to an http handler. func loggingHandler(h http.Handler) http.Handler {
func logHandler(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now() start := time.Now()
lrw := &httpStatusWriter{ResponseWriter: w, StatusCode: http.StatusOK} lrw := &httpStatusWriter{ResponseWriter: w, StatusCode: http.StatusOK}
handler.ServeHTTP(lrw, r) h.ServeHTTP(lrw, r)
statusStr := strconv.Itoa(lrw.StatusCode)
if lrw.StatusCode == 0 {
statusStr = "???"
}
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"remote addr": r.RemoteAddr, "remote addr": r.RemoteAddr,
"method": r.Method, "method": r.Method,
"request uri": r.RequestURI, "request uri": r.RequestURI,
"status": statusStr, "status": strconv.Itoa(lrw.StatusCode),
"elapsed time (ms)": float64(time.Since(start).Nanoseconds()) * 1e-6, "elapsed time (ms)": float64(time.Since(start).Nanoseconds()) * 1e-6,
}).Info("Handled HTTP request") }).Info("handled HTTP request")
}) })
} }
func newGrpcGatewayServer(ctx context.Context, listenerAddr string, tlsConfig *tls.Config) http.Handler { // ListenAndServe serves the Clair v3 API over gRPC and the gRPC Gateway.
var ( func ListenAndServe(addr, keyFile, certFile, caPath string, store database.Datastore) error {
gwTLSConfig *tls.Config srv := grpcutil.MuxedGRPCServer{
gwOpts []grpc.DialOption Addr: addr,
) ServicesFunc: func(gsrv *grpc.Server) {
pb.RegisterAncestryServiceServer(gsrv, &AncestryServer{Store: store})
pb.RegisterNotificationServiceServer(gsrv, &NotificationServer{Store: store})
pb.RegisterStatusServiceServer(gsrv, &StatusServer{Store: store})
},
ServiceHandlerFuncs: []grpcutil.RegisterServiceHandlerFunc{
pb.RegisterAncestryServiceHandler,
pb.RegisterNotificationServiceHandler,
pb.RegisterStatusServiceHandler,
},
}
if tlsConfig != nil { middleware := func(h http.Handler) http.Handler {
gwTLSConfig = tlsConfig.Clone() return prometheusHandler(loggingHandler(h))
gwTLSConfig.InsecureSkipVerify = true }
gwOpts = append(gwOpts, grpc.WithTransportCredentials(credentials.NewTLS(gwTLSConfig)))
var err error
if caPath == "" {
err = srv.ListenAndServe(middleware)
} else { } else {
gwOpts = append(gwOpts, grpc.WithInsecure()) err = srv.ListenAndServeTLS(certFile, keyFile, caPath, middleware)
} }
return err
// changes json serializer to include empty fields with default values
jsonOpt := runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{EmitDefaults: true})
gwmux := runtime.NewServeMux(jsonOpt)
conn, err := grpc.DialContext(ctx, listenerAddr, gwOpts...)
if err != nil {
log.WithError(err).Fatal("could not initialize grpc gateway connection")
}
err = pb.RegisterAncestryServiceHandler(ctx, gwmux, conn)
if err != nil {
log.WithError(err).Fatal("could not initialize ancestry grpc gateway")
}
err = pb.RegisterNotificationServiceHandler(ctx, gwmux, conn)
if err != nil {
log.WithError(err).Fatal("could not initialize notification grpc gateway")
}
err = pb.RegisterStatusServiceHandler(ctx, gwmux, conn)
if err != nil {
log.WithError(err).Fatal("could not initialize status grpc gateway")
}
return logHandler(gwmux)
}
func servePrometheus(mux *http.ServeMux) {
mux.Handle("/metrics", prometheus.Handler())
}
// Run initializes grpc and grpc gateway api services on the same address
func Run(Addr string, tlsConfig *tls.Config, CertFile, KeyFile string, store database.Datastore) {
l, err := net.Listen("tcp", Addr)
if err != nil {
log.WithError(err).Fatalf("could not listen to address" + Addr)
}
log.WithField("addr", l.Addr().String()).Info("starting grpc server")
var (
apiHandler http.Handler
apiListener net.Listener
srv *http.Server
ctx = context.Background()
httpMux = http.NewServeMux()
tcpMux = cmux.New(l)
)
if tlsConfig != nil {
cert, err := tls.LoadX509KeyPair(CertFile, KeyFile)
if err != nil {
log.WithError(err).Fatal("Failed to load certificate files")
}
tlsConfig.Certificates = []tls.Certificate{cert}
tlsConfig.NextProtos = []string{"h2"}
apiListener = tls.NewListener(tcpMux.Match(cmux.Any()), tlsConfig)
go func() { handleShutdown(tcpMux.Serve()) }()
grpcServer := newGrpcServer(store, tlsConfig)
gwmux := newGrpcGatewayServer(ctx, apiListener.Addr().String(), tlsConfig)
httpMux.Handle("/", gwmux)
servePrometheus(httpMux)
apiHandler = grpcHandlerFunc(grpcServer, httpMux)
log.Info("grpc server is configured with client certificate authentication")
} else {
grpcL := tcpMux.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
apiListener = tcpMux.Match(cmux.Any())
go func() { handleShutdown(tcpMux.Serve()) }()
grpcServer := newGrpcServer(store, nil)
go func() { handleShutdown(grpcServer.Serve(grpcL)) }()
gwmux := newGrpcGatewayServer(ctx, apiListener.Addr().String(), nil)
httpMux.Handle("/", gwmux)
servePrometheus(httpMux)
apiHandler = httpMux
log.Warn("grpc server is configured without client certificate authentication")
}
srv = &http.Server{
Handler: apiHandler,
TLSConfig: tlsConfig,
}
// blocking call
handleShutdown(srv.Serve(apiListener))
log.Info("Grpc API stopped")
}
// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
// connections or otherHandler otherwise. Copied from cockroachdb.
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
grpcServer.ServeHTTP(w, r)
} else {
otherHandler.ServeHTTP(w, r)
}
})
} }

81
pkg/grpcutil/gateway.go Normal file
View File

@ -0,0 +1,81 @@
// Copyright 2018 clair authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcutil
import (
"crypto/tls"
"net/http"
"strings"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// RegisterServiceHandlerFunc is a function that registers ServiceHandlers with
// a ServeMux.
type RegisterServiceHandlerFunc func(context.Context, *runtime.ServeMux, *grpc.ClientConn) error
// NewGateway creates a new http.Handler and grpc.ClientConn with the provided
// gRPC Services registered.
func NewGateway(addr string, tlsConfig *tls.Config, funcs []RegisterServiceHandlerFunc) (http.Handler, *grpc.ClientConn, error) {
// Configure the right DialOptions the for TLS configuration.
var dialOpts []grpc.DialOption
if tlsConfig != nil {
var gwTLSConfig *tls.Config
gwTLSConfig = tlsConfig.Clone()
gwTLSConfig.InsecureSkipVerify = true // Trust the local server.
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(gwTLSConfig)))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
conn, err := grpc.DialContext(context.TODO(), addr, dialOpts...)
if err != nil {
return nil, nil, err
}
// Register services.
srvmux := runtime.NewServeMux()
for _, fn := range funcs {
err = fn(context.TODO(), srvmux, conn)
if err != nil {
return nil, nil, err
}
}
return srvmux, conn, nil
}
// IsGRPCRequest returns true if the provided request came from a gRPC client.
//
// Its logic is a partial recreation of gRPC's internal checks, see:
// https://github.com/grpc/grpc-go/blob/01de3de/transport/handler_server.go#L61:L69
func IsGRPCRequest(r *http.Request) bool {
return r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc")
}
// HandlerFunc returns an http.Handler that delegates to grpc.Server on
// incoming gRPC connections otherwise serves with the provided handler.
func HandlerFunc(grpcServer *grpc.Server, otherwise http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if IsGRPCRequest(r) {
grpcServer.ServeHTTP(w, r)
} else {
otherwise.ServeHTTP(w, r)
}
})
}

View File

@ -0,0 +1,146 @@
// Copyright 2018 clair authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package grpcutil implements various utilities around managing gRPC services.
package grpcutil
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net"
"net/http"
"github.com/cockroachdb/cmux"
"github.com/coreos/clair/pkg/httputil"
)
// MuxedGRPCServer defines the parameters for running a gRPC Server alongside
// a Gateway server on the same port.
type MuxedGRPCServer struct {
Addr string
TLSConfig *tls.Config
ServicesFunc RegisterServicesFunc
ServiceHandlerFuncs []RegisterServiceHandlerFunc
}
// ListenAndServe listens on the TCP network address srv.Addr and handles both
// gRPC and JSON requests over HTTP. An optional HTTP middleware can be
// provided to wrap the output of each request.
//
// Internally, it muxes the Listener based on whether the request is gRPC or
// HTTP and runs multiple servers.
func (srv *MuxedGRPCServer) ListenAndServe(mw httputil.Middleware) error {
l, err := net.Listen("tcp", srv.Addr)
if err != nil {
return err
}
tcpMux := cmux.New(l)
grpcListener := tcpMux.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
defer grpcListener.Close()
httpListener := tcpMux.Match(cmux.Any())
defer httpListener.Close()
httpHandler, conn, err := NewGateway(httpListener.Addr().String(), nil, srv.ServiceHandlerFuncs)
if err != nil {
return err
}
defer conn.Close()
gsrv := NewServer(nil, srv.ServicesFunc)
defer gsrv.Stop()
go func() { tcpMux.Serve() }()
go func() { gsrv.Serve(grpcListener) }()
if mw != nil {
httpHandler = mw(httpHandler)
}
httpsrv := &http.Server{
Handler: httpHandler,
}
httpsrv.Serve(httpListener)
return nil
}
func configureCA(tlsConfig *tls.Config, caPath string) error {
caCert, err := ioutil.ReadFile(caPath)
if err != nil {
return err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.ClientCAs = caCertPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
return nil
}
func configureCertificate(tlsConfig *tls.Config, certFile, keyFile string) error {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return err
}
tlsConfig.Certificates = []tls.Certificate{cert}
tlsConfig.NextProtos = []string{"h2"}
return nil
}
// ListenAndServeTLS listens on the TCP network address srv.Addr and handles both
// gRPC and JSON requests over HTTP over TLS. An optional HTTP middleware can
// be provided to wrap the output of each request.
//
// Internally, the same net.Listener is used because the http.Handler will
// pivot based on whether the request is gRPC or HTTP.
func (srv *MuxedGRPCServer) ListenAndServeTLS(certFile, keyFile, caPath string, mw httputil.Middleware) error {
if srv.TLSConfig == nil {
srv.TLSConfig = &tls.Config{}
}
configureCA(srv.TLSConfig, caPath)
configureCertificate(srv.TLSConfig, certFile, keyFile)
listener, err := tls.Listen("tcp", srv.Addr, srv.TLSConfig)
if err != nil {
return err
}
gwHandler, conn, err := NewGateway(listener.Addr().String(), srv.TLSConfig, srv.ServiceHandlerFuncs)
if err != nil {
return err
}
defer conn.Close()
gsrv := NewServer(srv.TLSConfig, srv.ServicesFunc)
defer gsrv.Stop()
httpHandler := HandlerFunc(gsrv, gwHandler)
if mw != nil {
httpHandler = mw(httpHandler)
}
httpsrv := &http.Server{
Handler: httpHandler,
}
httpsrv.Serve(listener)
return nil
}

46
pkg/grpcutil/server.go Normal file
View File

@ -0,0 +1,46 @@
// Copyright 2018 clair authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcutil
import (
"crypto/tls"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// RegisterServicesFunc is a function that registers gRPC services with a given
// server.
type RegisterServicesFunc func(*grpc.Server)
// NewServer allocates a new grpc.Server and handles some some boilerplate
// configuration.
func NewServer(tlsConfig *tls.Config, fn RegisterServicesFunc) *grpc.Server {
// Default ServerOptions
grpcOpts := []grpc.ServerOption{
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
}
if tlsConfig != nil {
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
// Register services with a new grpc.Server.
gsrv := grpc.NewServer(grpcOpts...)
fn(gsrv)
return gsrv
}

View File

@ -23,6 +23,9 @@ import (
"github.com/coreos/clair/pkg/version" "github.com/coreos/clair/pkg/version"
) )
// Middleware is a function used to wrap the logic of another http.Handler.
type Middleware func(http.Handler) http.Handler
// GetWithUserAgent performs an HTTP GET with the proper Clair User-Agent. // GetWithUserAgent performs an HTTP GET with the proper Clair User-Agent.
func GetWithUserAgent(url string) (*http.Response, error) { func GetWithUserAgent(url string) (*http.Response, error) {
client := &http.Client{} client := &http.Client{}