From 1ec2759550d6a6bcae7c7252c8718b783426c653 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Mon, 10 Sep 2018 22:49:24 -0400 Subject: [PATCH] pkg/grpcutil: init This change refactors gRPC code used within the v3 API package into generic code that can be used for managing gRPC and gRPC Gateway. --- api/api.go | 40 +------- api/v3/server.go | 178 +++++++---------------------------- pkg/grpcutil/gateway.go | 81 ++++++++++++++++ pkg/grpcutil/muxed_server.go | 146 ++++++++++++++++++++++++++++ pkg/grpcutil/server.go | 46 +++++++++ pkg/httputil/httputil.go | 3 + 6 files changed, 311 insertions(+), 183 deletions(-) create mode 100644 pkg/grpcutil/gateway.go create mode 100644 pkg/grpcutil/muxed_server.go create mode 100644 pkg/grpcutil/server.go diff --git a/api/api.go b/api/api.go index ade080a2..80f3eeff 100644 --- a/api/api.go +++ b/api/api.go @@ -1,4 +1,4 @@ -// Copyright 2017 clair authors +// Copyright 2018 clair authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,9 +16,6 @@ package api import ( "context" - "crypto/tls" - "crypto/x509" - "io/ioutil" "net/http" "time" @@ -40,14 +37,10 @@ type Config struct { } func Run(cfg *Config, store database.Datastore) { - tlsConfig, err := tlsClientConfig(cfg.CAFile) + err := v3.ListenAndServe(cfg.Addr, cfg.CertFile, cfg.KeyFile, cfg.CAFile, store) if err != nil { - log.WithError(err).Fatal("could not initialize client cert authentication") + log.WithError(err).Fatal("could not initialize gRPC server") } - if tlsConfig != nil { - log.Info("main API configured with client certificate authentication") - } - v3.Run(cfg.Addr, tlsConfig, cfg.CertFile, cfg.KeyFile, store) } func RunHealth(cfg *Config, store database.Datastore, st *stopper.Stopper) { @@ -77,30 +70,3 @@ func RunHealth(cfg *Config, store database.Datastore, st *stopper.Stopper) { log.Info("health API stopped") } - -// tlsClientConfig initializes a *tls.Config using the given CA. The resulting -// *tls.Config is meant to be used to configure an HTTP server to do client -// certificate authentication. -// -// If no CA is given, a nil *tls.Config is returned; no client certificate will -// be required and verified. In other words, authentication will be disabled. -func tlsClientConfig(caPath string) (*tls.Config, error) { - if caPath == "" { - return nil, nil - } - - caCert, err := ioutil.ReadFile(caPath) - if err != nil { - return nil, err - } - - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM(caCert) - - tlsConfig := &tls.Config{ - ClientCAs: caCertPool, - ClientAuth: tls.RequireAndVerifyClientCert, - } - - return tlsConfig, nil -} diff --git a/api/v3/server.go b/api/v3/server.go index 29c478b9..4cac16fe 100644 --- a/api/v3/server.go +++ b/api/v3/server.go @@ -1,4 +1,4 @@ -// Copyright 2017 clair authors +// Copyright 2018 clair authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,24 +15,17 @@ package v3 import ( - "context" - "crypto/tls" - "net" "net/http" "strconv" - "strings" "time" - "github.com/cockroachdb/cmux" - "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "google.golang.org/grpc" - "google.golang.org/grpc/credentials" pb "github.com/coreos/clair/api/v3/clairpb" "github.com/coreos/clair/database" + "github.com/coreos/clair/pkg/grpcutil" ) var ( @@ -47,21 +40,11 @@ func init() { prometheus.MustRegister(promResponseDurationMilliseconds) } -func newGrpcServer(store database.Datastore, tlsConfig *tls.Config) *grpc.Server { - grpcOpts := []grpc.ServerOption{ - grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), - grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), - } - - if tlsConfig != nil { - grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(tlsConfig))) - } - - grpcServer := grpc.NewServer(grpcOpts...) - pb.RegisterAncestryServiceServer(grpcServer, &AncestryServer{Store: store}) - pb.RegisterNotificationServiceServer(grpcServer, &NotificationServer{Store: store}) - pb.RegisterStatusServiceServer(grpcServer, &StatusServer{Store: store}) - return grpcServer +func prometheusHandler(h http.Handler) http.Handler { + mux := http.NewServeMux() + mux.Handle("/", h) + mux.Handle("/metrics", prometheus.Handler()) + return mux } type httpStatusWriter struct { @@ -75,145 +58,48 @@ func (w *httpStatusWriter) WriteHeader(code int) { w.ResponseWriter.WriteHeader(code) } -// logHandler adds request logging to an http handler. -func logHandler(handler http.Handler) http.Handler { +func loggingHandler(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() lrw := &httpStatusWriter{ResponseWriter: w, StatusCode: http.StatusOK} - handler.ServeHTTP(lrw, r) - - statusStr := strconv.Itoa(lrw.StatusCode) - if lrw.StatusCode == 0 { - statusStr = "???" - } + h.ServeHTTP(lrw, r) log.WithFields(log.Fields{ "remote addr": r.RemoteAddr, "method": r.Method, "request uri": r.RequestURI, - "status": statusStr, + "status": strconv.Itoa(lrw.StatusCode), "elapsed time (ms)": float64(time.Since(start).Nanoseconds()) * 1e-6, - }).Info("Handled HTTP request") + }).Info("handled HTTP request") }) } -func newGrpcGatewayServer(ctx context.Context, listenerAddr string, tlsConfig *tls.Config) http.Handler { - var ( - gwTLSConfig *tls.Config - gwOpts []grpc.DialOption - ) - - if tlsConfig != nil { - gwTLSConfig = tlsConfig.Clone() - gwTLSConfig.InsecureSkipVerify = true - gwOpts = append(gwOpts, grpc.WithTransportCredentials(credentials.NewTLS(gwTLSConfig))) - } else { - gwOpts = append(gwOpts, grpc.WithInsecure()) - } - - // changes json serializer to include empty fields with default values - jsonOpt := runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{EmitDefaults: true}) - gwmux := runtime.NewServeMux(jsonOpt) - - conn, err := grpc.DialContext(ctx, listenerAddr, gwOpts...) - if err != nil { - log.WithError(err).Fatal("could not initialize grpc gateway connection") - } - - err = pb.RegisterAncestryServiceHandler(ctx, gwmux, conn) - if err != nil { - log.WithError(err).Fatal("could not initialize ancestry grpc gateway") - } - - err = pb.RegisterNotificationServiceHandler(ctx, gwmux, conn) - if err != nil { - log.WithError(err).Fatal("could not initialize notification grpc gateway") +// ListenAndServe serves the Clair v3 API over gRPC and the gRPC Gateway. +func ListenAndServe(addr, keyFile, certFile, caPath string, store database.Datastore) error { + srv := grpcutil.MuxedGRPCServer{ + Addr: addr, + ServicesFunc: func(gsrv *grpc.Server) { + pb.RegisterAncestryServiceServer(gsrv, &AncestryServer{Store: store}) + pb.RegisterNotificationServiceServer(gsrv, &NotificationServer{Store: store}) + pb.RegisterStatusServiceServer(gsrv, &StatusServer{Store: store}) + }, + ServiceHandlerFuncs: []grpcutil.RegisterServiceHandlerFunc{ + pb.RegisterAncestryServiceHandler, + pb.RegisterNotificationServiceHandler, + pb.RegisterStatusServiceHandler, + }, } - err = pb.RegisterStatusServiceHandler(ctx, gwmux, conn) - if err != nil { - log.WithError(err).Fatal("could not initialize status grpc gateway") + middleware := func(h http.Handler) http.Handler { + return prometheusHandler(loggingHandler(h)) } - return logHandler(gwmux) -} - -func servePrometheus(mux *http.ServeMux) { - mux.Handle("/metrics", prometheus.Handler()) -} - -// Run initializes grpc and grpc gateway api services on the same address -func Run(Addr string, tlsConfig *tls.Config, CertFile, KeyFile string, store database.Datastore) { - l, err := net.Listen("tcp", Addr) - if err != nil { - log.WithError(err).Fatalf("could not listen to address" + Addr) - } - log.WithField("addr", l.Addr().String()).Info("starting grpc server") - - var ( - apiHandler http.Handler - apiListener net.Listener - srv *http.Server - ctx = context.Background() - httpMux = http.NewServeMux() - tcpMux = cmux.New(l) - ) - - if tlsConfig != nil { - cert, err := tls.LoadX509KeyPair(CertFile, KeyFile) - if err != nil { - log.WithError(err).Fatal("Failed to load certificate files") - } - tlsConfig.Certificates = []tls.Certificate{cert} - tlsConfig.NextProtos = []string{"h2"} - - apiListener = tls.NewListener(tcpMux.Match(cmux.Any()), tlsConfig) - go func() { tcpMux.Serve() }() - - grpcServer := newGrpcServer(store, tlsConfig) - gwmux := newGrpcGatewayServer(ctx, apiListener.Addr().String(), tlsConfig) - - httpMux.Handle("/", gwmux) - servePrometheus(httpMux) - apiHandler = grpcHandlerFunc(grpcServer, httpMux) - - log.Info("grpc server is configured with client certificate authentication") + var err error + if caPath == "" { + err = srv.ListenAndServe(middleware) } else { - grpcL := tcpMux.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) - apiListener = tcpMux.Match(cmux.Any()) - go func() { tcpMux.Serve() }() - - grpcServer := newGrpcServer(store, nil) - go func() { grpcServer.Serve(grpcL) }() - - gwmux := newGrpcGatewayServer(ctx, apiListener.Addr().String(), nil) - - httpMux.Handle("/", gwmux) - servePrometheus(httpMux) - apiHandler = httpMux - - log.Warn("grpc server is configured without client certificate authentication") - } - - srv = &http.Server{ - Handler: apiHandler, - TLSConfig: tlsConfig, + err = srv.ListenAndServeTLS(certFile, keyFile, caPath, middleware) } - - // blocking call - srv.Serve(apiListener) - log.Info("Grpc API stopped") -} - -// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC -// connections or otherHandler otherwise. Copied from cockroachdb. -func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { - grpcServer.ServeHTTP(w, r) - } else { - otherHandler.ServeHTTP(w, r) - } - }) + return err } diff --git a/pkg/grpcutil/gateway.go b/pkg/grpcutil/gateway.go new file mode 100644 index 00000000..9f52c82e --- /dev/null +++ b/pkg/grpcutil/gateway.go @@ -0,0 +1,81 @@ +// Copyright 2018 clair authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcutil + +import ( + "crypto/tls" + "net/http" + "strings" + + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +// RegisterServiceHandlerFunc is a function that registers ServiceHandlers with +// a ServeMux. +type RegisterServiceHandlerFunc func(context.Context, *runtime.ServeMux, *grpc.ClientConn) error + +// NewGateway creates a new http.Handler and grpc.ClientConn with the provided +// gRPC Services registered. +func NewGateway(addr string, tlsConfig *tls.Config, funcs []RegisterServiceHandlerFunc) (http.Handler, *grpc.ClientConn, error) { + // Configure the right DialOptions the for TLS configuration. + var dialOpts []grpc.DialOption + if tlsConfig != nil { + var gwTLSConfig *tls.Config + gwTLSConfig = tlsConfig.Clone() + gwTLSConfig.InsecureSkipVerify = true // Trust the local server. + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(gwTLSConfig))) + } else { + dialOpts = append(dialOpts, grpc.WithInsecure()) + } + + conn, err := grpc.DialContext(context.TODO(), addr, dialOpts...) + if err != nil { + return nil, nil, err + } + + // Register services. + srvmux := runtime.NewServeMux() + for _, fn := range funcs { + err = fn(context.TODO(), srvmux, conn) + if err != nil { + return nil, nil, err + } + } + + return srvmux, conn, nil +} + +// IsGRPCRequest returns true if the provided request came from a gRPC client. +// +// Its logic is a partial recreation of gRPC's internal checks, see: +// https://github.com/grpc/grpc-go/blob/01de3de/transport/handler_server.go#L61:L69 +func IsGRPCRequest(r *http.Request) bool { + return r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") +} + +// HandlerFunc returns an http.Handler that delegates to grpc.Server on +// incoming gRPC connections otherwise serves with the provided handler. +func HandlerFunc(grpcServer *grpc.Server, otherwise http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if IsGRPCRequest(r) { + grpcServer.ServeHTTP(w, r) + } else { + otherwise.ServeHTTP(w, r) + } + }) +} diff --git a/pkg/grpcutil/muxed_server.go b/pkg/grpcutil/muxed_server.go new file mode 100644 index 00000000..e444b90d --- /dev/null +++ b/pkg/grpcutil/muxed_server.go @@ -0,0 +1,146 @@ +// Copyright 2018 clair authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package grpcutil implements various utilities around managing gRPC services. +package grpcutil + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + "net" + "net/http" + + "github.com/cockroachdb/cmux" + + "github.com/coreos/clair/pkg/httputil" +) + +// MuxedGRPCServer defines the parameters for running a gRPC Server alongside +// a Gateway server on the same port. +type MuxedGRPCServer struct { + Addr string + TLSConfig *tls.Config + ServicesFunc RegisterServicesFunc + ServiceHandlerFuncs []RegisterServiceHandlerFunc +} + +// ListenAndServe listens on the TCP network address srv.Addr and handles both +// gRPC and JSON requests over HTTP. An optional HTTP middleware can be +// provided to wrap the output of each request. +// +// Internally, it muxes the Listener based on whether the request is gRPC or +// HTTP and runs multiple servers. +func (srv *MuxedGRPCServer) ListenAndServe(mw httputil.Middleware) error { + l, err := net.Listen("tcp", srv.Addr) + if err != nil { + return err + } + + tcpMux := cmux.New(l) + + grpcListener := tcpMux.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) + defer grpcListener.Close() + + httpListener := tcpMux.Match(cmux.Any()) + defer httpListener.Close() + + httpHandler, conn, err := NewGateway(httpListener.Addr().String(), nil, srv.ServiceHandlerFuncs) + if err != nil { + return err + } + defer conn.Close() + + gsrv := NewServer(nil, srv.ServicesFunc) + defer gsrv.Stop() + + go func() { tcpMux.Serve() }() + go func() { gsrv.Serve(grpcListener) }() + + if mw != nil { + httpHandler = mw(httpHandler) + } + + httpsrv := &http.Server{ + Handler: httpHandler, + } + httpsrv.Serve(httpListener) + return nil +} + +func configureCA(tlsConfig *tls.Config, caPath string) error { + caCert, err := ioutil.ReadFile(caPath) + if err != nil { + return err + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + tlsConfig.ClientCAs = caCertPool + tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert + + return nil +} + +func configureCertificate(tlsConfig *tls.Config, certFile, keyFile string) error { + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return err + } + + tlsConfig.Certificates = []tls.Certificate{cert} + tlsConfig.NextProtos = []string{"h2"} + + return nil +} + +// ListenAndServeTLS listens on the TCP network address srv.Addr and handles both +// gRPC and JSON requests over HTTP over TLS. An optional HTTP middleware can +// be provided to wrap the output of each request. +// +// Internally, the same net.Listener is used because the http.Handler will +// pivot based on whether the request is gRPC or HTTP. +func (srv *MuxedGRPCServer) ListenAndServeTLS(certFile, keyFile, caPath string, mw httputil.Middleware) error { + if srv.TLSConfig == nil { + srv.TLSConfig = &tls.Config{} + } + configureCA(srv.TLSConfig, caPath) + configureCertificate(srv.TLSConfig, certFile, keyFile) + + listener, err := tls.Listen("tcp", srv.Addr, srv.TLSConfig) + if err != nil { + return err + } + + gwHandler, conn, err := NewGateway(listener.Addr().String(), srv.TLSConfig, srv.ServiceHandlerFuncs) + if err != nil { + return err + } + defer conn.Close() + + gsrv := NewServer(srv.TLSConfig, srv.ServicesFunc) + defer gsrv.Stop() + + httpHandler := HandlerFunc(gsrv, gwHandler) + if mw != nil { + httpHandler = mw(httpHandler) + } + + httpsrv := &http.Server{ + Handler: httpHandler, + } + httpsrv.Serve(listener) + return nil +} diff --git a/pkg/grpcutil/server.go b/pkg/grpcutil/server.go new file mode 100644 index 00000000..62ecb392 --- /dev/null +++ b/pkg/grpcutil/server.go @@ -0,0 +1,46 @@ +// Copyright 2018 clair authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcutil + +import ( + "crypto/tls" + + "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +// RegisterServicesFunc is a function that registers gRPC services with a given +// server. +type RegisterServicesFunc func(*grpc.Server) + +// NewServer allocates a new grpc.Server and handles some some boilerplate +// configuration. +func NewServer(tlsConfig *tls.Config, fn RegisterServicesFunc) *grpc.Server { + // Default ServerOptions + grpcOpts := []grpc.ServerOption{ + grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), + grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), + } + + if tlsConfig != nil { + grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(tlsConfig))) + } + + // Register services with a new grpc.Server. + gsrv := grpc.NewServer(grpcOpts...) + fn(gsrv) + return gsrv +} diff --git a/pkg/httputil/httputil.go b/pkg/httputil/httputil.go index 0eae6df0..b0ae2004 100644 --- a/pkg/httputil/httputil.go +++ b/pkg/httputil/httputil.go @@ -23,6 +23,9 @@ import ( "github.com/coreos/clair/pkg/version" ) +// Middleware is a function used to wrap the logic of another http.Handler. +type Middleware func(http.Handler) http.Handler + // GetWithUserAgent performs an HTTP GET with the proper Clair User-Agent. func GetWithUserAgent(url string) (*http.Response, error) { client := &http.Client{}