Exploring Security, Metrics, and Error-handling with gRPC in Python
In my post “Using gRPC in Python,” we wrote a basic gRPC server implementing a users service. We are going to expand on it and explore more gRPC concepts, such as secure client-server communication via self-signed SSL certificates, implementing gRPC middleware (or interceptors), and error handling.
We will be using Python 3.6 for our demos in this article. The git repository contains all the code listings we will be discussing, in the subdirectory demo2.
Securing Client-server Communication
We initialized the server in the first article as follows:
...
server.add_insecure_port('127.0.0.1:50051')
...
The add_insecure_port() method initializes an insecure server, so client-server communication is unencrypted. It is, however, recommended that gRPC client-server communication take place over a secure channel via SSL/TLS.
First, we will create a self-signed certificate using openssl:
$ # Generate a private key
$ openssl genrsa -out server.key 2048
Generating RSA private key, 2048 bit long modulus
.........+++
.......................................................................+++
e is 65537 (0x10001)
$ openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
You are about to be asked to enter information that will be incorporated into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:AU
State or Province Name (full name) [Some-State]:NSW
Locality Name (eg, city) []:Sydney
Organization Name (eg, company) [Internet Widgits Pty Ltd]:gRPC Demo
Organizational Unit Name (eg, section) []:gRPC
Common Name (e.g. server FQDN or YOUR name) []:localhost
Email Address []:a@a.com
At this stage, we will have two files: server.key and server.crt. On the server side, we will need both these files; on the client side, we will only need the server.crt file.
Using the above certificate, we will now modify our server code as follows:
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
users_service.add_UsersServicer_to_server(UsersService(), server)

# read in key and certificate
with open(os.path.join(os.path.split(__file__)[0], 'server.key')) as f:
    private_key = f.read().encode()
with open(os.path.join(os.path.split(__file__)[0], 'server.crt')) as f:
    certificate_chain = f.read().encode()

# create server credentials
server_creds = grpc.ssl_server_credentials(((private_key, certificate_chain,),))
server.add_secure_port('localhost:50051', server_creds)
Comparing it to the earlier version of our server, add_secure_port() has replaced add_insecure_port(). We have kept things simple here and have copied the server.key and server.crt files to the same directory as the server.py file. For all purposes beyond demonstration, these locations should be passed in as configuration values (environment variables, for example).
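For example, here is a minimal sketch of how the server could pick those paths up from environment variables. The variable names USERS_TLS_KEY and USERS_TLS_CERT are made up for illustration and are not part of the demo code:

import os
import grpc

# Hypothetical environment variables; fall back to the demo's file names
key_path = os.environ.get('USERS_TLS_KEY', 'server.key')
cert_path = os.environ.get('USERS_TLS_CERT', 'server.crt')

# Reading in binary mode gives us bytes directly, which is what
# grpc.ssl_server_credentials() expects
with open(key_path, 'rb') as f:
    private_key = f.read()
with open(cert_path, 'rb') as f:
    certificate_chain = f.read()

# 'server' is the grpc.server() instance created earlier
server_creds = grpc.ssl_server_credentials(((private_key, certificate_chain),))
server.add_secure_port('localhost:50051', server_creds)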
On the client side, we will do a similar modification. Earlier, we created a channel as follows:
channel = grpc.insecure_channel('localhost:50051')
Now to communicate with our server over TLS, we will do the following:
with open('server.crt') as f:
    trusted_certs = f.read().encode()

# create credentials
credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
channel = grpc.secure_channel('localhost:50051', credentials)
...
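One thing to note (an aside, not from the original demo): TLS hostname verification checks the address we dial against the certificate's Common Name, which we set to localhost above. If you need to reach the same server via a different address while testing, gRPC's ssl_target_name_override channel option can be used as a workaround; a sketch:

# For testing only: tell gRPC to check the certificate against 'localhost'
# even though we are dialling a different (hypothetical) address
credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
channel = grpc.secure_channel(
    '192.168.0.10:50051',  # hypothetical address that does not match the CN
    credentials,
    options=(('grpc.ssl_target_name_override', 'localhost'),),
)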
Implementing Server-side gRPC Interceptors
Interceptors are custom code that runs on servers and clients during the request/response lifecycle. If you are familiar with the concept of middleware in HTTP, interceptors are middleware for gRPC. Support for interceptors in Python is still under discussion, but here we will play with the current proposed implementation.
The demo2/grpc-services/grpc_interceptors sub-directory contains a copy of the proposed implementation (commit hash).
Using the above implementation, we will be writing our first server interceptor.
Exporting server metrics
The metric interceptor we will implement will push the request latency in seconds (for every request) to statsd, which we will then convert to prometheus metrics via a statsd bridge. I recommend reading my earlier post to understand more about this choice of approach.
To write an interceptor, we first subclass the appropriate classes from the grpc_interceptors package:
...
from grpc_interceptors import UnaryUnaryServerInterceptor, UnaryStreamServerInterceptor


class MetricInterceptor(UnaryUnaryServerInterceptor, UnaryStreamServerInterceptor):

    def __init__(self):
        print("Initializing metric interceptor")

    @send_metrics
    def intercept_unary_unary_handler(self, handler, method, request, servicer_context):
        return handler(request, servicer_context)

    @send_metrics
    def intercept_unary_stream_handler(self, handler, method, request, servicer_context):
        result = handler(request, servicer_context)
        for response in result:
            yield response
...
Our class, MetricInterceptor, derives from UnaryUnaryServerInterceptor and UnaryStreamServerInterceptor and then overrides the intercept_unary_unary_handler and intercept_unary_stream_handler methods respectively. This is because our service implements only two kinds of RPC methods:
- unary request and response (CreateUser)
- unary request and streaming response (UsersGet)
If we implemented other kinds of RPC methods, we would be deriving from the corresponding interceptor base class and implementing the corresponding method as well.
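For reference, a sketch of what that would look like for the two remaining RPC kinds. The class and method names below are assumptions extrapolated from the naming pattern of the two classes we used; check the copy of the proposed implementation in demo2/grpc-services/grpc_interceptors for the exact names:

# Assumed names, following the UnaryUnary/UnaryStream pattern above
from grpc_interceptors import StreamUnaryServerInterceptor, StreamStreamServerInterceptor


class StreamingMetricInterceptor(StreamUnaryServerInterceptor, StreamStreamServerInterceptor):

    def intercept_stream_unary_handler(self, handler, method, request_iterator, servicer_context):
        # streaming request, unary response
        return handler(request_iterator, servicer_context)

    def intercept_stream_stream_handler(self, handler, method, request_iterator, servicer_context):
        # streaming request, streaming response
        for response in handler(request_iterator, servicer_context):
            yield response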
The send_metrics decorator wraps around the interceptor method to implement the metric calculation and push it to the configured statsd server (the entire function is in the module metric_interceptor in the server sub-directory).
Let’s first go through the following snippet from the function:
@wraps(func)
def wrapper(*args, **kw):
    service_method = None
    service_name = None
    if isinstance(args[4], grpc._server._Context):
        servicer_context = args[4]
        # This gives us <service>/<method name>
        service_method = servicer_context._rpc_event.request_call_details.method
        service_name, method_name = str(service_method).rsplit('/')[1::]
    else:
        logging.warning('Cannot derive the service name and method')
    ...
The func function here refers to the original interceptor method that we wrapped above. We extract two important pieces of data here to attach as labels to the metric:
- The gRPC service name
- The method name
Both of these are available via the _rpc_event.request_call_details.method attribute of the context object, which is an instance of the grpc._server._Context class. I have the isinstance check here since I am working with the experimental implementation; it probably will not be necessary in the final version.
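To make the label extraction concrete, the method attribute has the form /<package>.<Service>/<Method>; a small illustration (the service name users.Users is made up for the example):

# Illustrative value; in the interceptor this comes from
# servicer_context._rpc_event.request_call_details.method (and may be bytes,
# hence the str() call)
service_method = '/users.Users/CreateUser'

# Splitting on '/' gives ['', 'users.Users', 'CreateUser']; dropping the
# leading empty string leaves the two labels we want
service_name, method_name = str(service_method).rsplit('/')[1::]
print(service_name)  # users.Users
print(method_name)   # CreateUser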
Next, we store the starting time and call the actual RPC method:
try:
    start_time = time.time()
    result = func(*args, **kw)
    result_status = 'success'
except Exception:
    result_status = 'error'
    raise
finally:
    resp_time = time.time() - start_time
    push_to_statsd_histogram(
        REQUEST_LATENCY_METRIC_NAME,
        resp_time, [
            'service:{0}'.format(service_name),
            'method:{0}'.format(method_name),
            'status:{0}'.format(result_status),
        ])
return result
When we push the time taken to complete the call, we add three statsd tags (see the sketch after this list):
- the service name
- the method name
- the status of the call
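The push_to_statsd_histogram helper itself lives in the repository and is not shown here. The key:value tag format suggests a DogStatsD-style client; as one possible sketch (an assumption, the demo may use a different statsd client or metric name):

from datadog import statsd

# Assumed metric name; the Prometheus metric we query later is
# request_latency_seconds_timer, produced by the statsd bridge
REQUEST_LATENCY_METRIC_NAME = 'request_latency_seconds'


def push_to_statsd_histogram(metric, value, tags=None):
    # Send a histogram sample (request latency in seconds) to the local
    # statsd agent, tagged with service, method and status
    statsd.histogram(metric, value, tags=tags or [])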
To register the interceptor with the server, we create a new object of type MetricInterceptor and then call the intercept_server() function (part of the proposed grpc_interceptors package):
...
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
metric_interceptor = MetricInterceptor()
server = intercept_server(server, metric_interceptor)
...
Error Handling and Logging
If an unhandled exception occurs while serving a request, we will see a traceback on the server corresponding to the runtime error that occurred. On the client, we will see a traceback of the form:
...
  raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNKNOWN, Exception calling application: division by zero)>
The unhandled error here is a 1/0 statement that was injected into the CreateUser function implementation.
With a couple of additional steps — one on the server and one on the client side — we can have more control over the traceback logging for unhandled runtime errors. Let’s modify the sample_client_demo.py client so that we have a try..except block in which we make the call to the RPC method:
...
try:
    response = stub.CreateUser(
        users_messages.CreateUserRequest(username='tom'),
        metadata=metadata,
    )
except grpc.RpcError as e:
    print('CreateUser failed with {0}: {1}'.format(e.code(), e.details()))
else:
    print("User created:", response.user.username)
...
Now, if we run the client again:
CreateUser failed with StatusCode.UNKNOWN: Exception calling application: division by zero
The code() method of the returned exception gives us access to the status code returned by the server. Its value is one of the gRPC status codes. The details() method gives us access to additional details sent by the server; here it gives us the exception message associated with the runtime error on the server.
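Since code() returns a grpc.StatusCode value, the client can also branch on it, for example to treat a connection failure differently from a server-side bug. A sketch, not from the demo code:

import grpc

def create_user(stub, request, metadata):
    # Call the RPC and branch on the returned status code
    try:
        return stub.CreateUser(request, metadata=metadata)
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.UNAVAILABLE:
            print('Server not reachable:', e.details())  # could retry here
        elif e.code() == grpc.StatusCode.UNKNOWN:
            print('Unhandled server-side error:', e.details())
        else:
            print('CreateUser failed with {0}: {1}'.format(e.code(), e.details()))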
Ideally, we would write a client interceptor that always makes the call in a try..except block as above, so we would not have to do so individually for every method call.
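Until client-side interceptor support is available, one stopgap (a sketch, not part of the demo) is a small helper that wraps any stub method in the same try..except:

import grpc

def call_rpc(rpc_method, request, **kwargs):
    # Generic wrapper so individual call sites don't repeat the try..except
    try:
        return rpc_method(request, **kwargs)
    except grpc.RpcError as e:
        print('RPC failed with {0}: {1}'.format(e.code(), e.details()))
        return None

# Usage:
# response = call_rpc(stub.CreateUser,
#                     users_messages.CreateUserRequest(username='tom'),
#                     metadata=metadata)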
On the server side, we will write a logging interceptor (similar to the metric interceptor above) that looks for any unhandled errors and logs them along with any metadata associated with the request. This will be useful to identify and correlate a failure with the request that caused it. In addition, we could also translate the error message into a more user-friendly error if we so desired.
The module logging_interceptor.py in the server/ sub-directory implements a logging interceptor. The most important snippet is as follows:
import datetime
import logging

import grpc
from pythonjsonlogger import jsonlogger
from grpc_interceptors import UnaryUnaryServerInterceptor, UnaryStreamServerInterceptor

logger = logging.getLogger()
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
...
...
metadata = {}
metadata['timestamp'] = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
servicer_context = None
if isinstance(args[4], grpc._server._Context):
    servicer_context = args[4]
    metadata.update(dict(servicer_context.invocation_metadata()))
try:
    result = func(*args, **kw)
except Exception as e:
    logger.error(e, exc_info=True, extra=metadata)
    if servicer_context:
        servicer_context.set_details(str(e))
        servicer_context.set_code(grpc.StatusCode.UNKNOWN)
    # TODO: need to return an appropriate response type here
    # Currently this will raise a serialization error on the server side
    # see https://github.com/grpc/grpc/issues/12824
    return None
else:
    return result
We use the set_details() method on the servicer context object to send back the unhandled exception message; this is where we could customize what the client’s details() method returns. Then we set the gRPC status code to UNKNOWN. This tells the client that this is a non-OK response, and it will manifest as an exception of type grpc.RpcError. Since this is an error, we don’t have anything meaningful to return, so we return None. This, however, results in a serialization error on the server side; see the issue referenced in the code comment above (grpc/grpc#12824) to learn more.
We then modify our server code to register the logging interceptor:
...
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
metric_interceptor = MetricInterceptor()
logging_interceptor = LoggingInterceptor()
server = intercept_server(server, metric_interceptor, logging_interceptor)
...
The order of registration of the interceptors is important here. As we have it above, the call flow is as follows:
- Logging interceptor invoked
- Metric interceptor invoked
- Service method called
- Metric interceptor returns
- Logging interceptor returns
Running the server
In addition to Python 3.6, we will be using docker and docker-compose to run the gRPC server as well as the other software we will be using (Prometheus, statsd exporter, and Grafana). If you don’t have these installed, please follow the official install guides for your operating system.
Starting with a clone of the repo:
$ cd demo2/grpc-services
We will first build the docker image for the gRPC server:
$ docker build -t amitsaha/grpc-users -f Dockerfile.users .
Start the server along with statsd-exporter, Prometheus, and Grafana:
$ docker-compose -f docker-compose.yml -f docker-compose-infra.yml up
Run the client:
$ docker exec -ti users bash
# cd /client
# # run the sample_client_demo.py file 10 times
# ./run-client.sh 10
The server.py file (in users/server/server.py) has a random runtime error injected into the CreateUser method, which happens 50 percent of the time.
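The exact injection lives in the repository; conceptually it is along these lines (a sketch, the actual code in server.py may differ):

import random

# Base class name inferred from the registration call shown earlier
class UsersService(users_service.UsersServicer):

    def CreateUser(self, request, context):
        # Roughly half of the calls raise an unhandled ZeroDivisionError
        if random.random() < 0.5:
            1 / 0
        ...  # build and return the response as before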
On the server console, you will see that the exception is being logged as a JSON object with a timestamp and additional metadata added.
Then open http://127.0.0.1:9090/graph?g0.range_input=5m&g0.expr=request_latency_seconds_timer&g0.tab=0 in your browser. You will see the Prometheus expression browser showing the latency of the requests, with a different metric series for each unique combination of the labels we added.
Conclusion
We extended our gRPC server in this article to be secure, export metrics, and log errors. We also learned about the in-progress work on bringing support for interceptors to gRPC Python. Next, here are a few resources to learn more about these and other gRPC features:
- gRPC Interceptors proposal
- gRPC response status as None
- Error Handling
- Using ssl with grpc in Python
- gRPC health checks
- gRPC load balancing
- Golang: Secure gRPC with SSL/TLS
- Golang: Authentication in gRPC
Published on Web Code Geeks with permission by Amit Saha, partner at our WCG program. See the original article here: Exploring Security, Metrics, and Error-handling with gRPC in Python. Opinions expressed by Web Code Geeks contributors are their own.