Production Readiness Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Add TLS/mTLS, graceful shutdown, health checking, and timeout/deadline control to FastGRPC.
Architecture: Four independent features added to app.py, service.py, and context.py. Each feature is opt-in via new optional parameters with defaults that preserve existing behavior. No new files — changes are minimal additions to existing modules.
Tech Stack: Python 3.9+, grpcio, asyncio (no new dependencies except optional grpcio-health-checking)
Task 1: Timeout Control — BaseMethod and Decorator Plumbing
Files:
- Modify: fast_grpc/service.py
- Modify: fast_grpc/app.py
- Test: tests/test_service.py
- [ ] Step 1: Add
timeoutfield toBaseMethod.__init__
In fast_grpc/service.py, modify BaseMethod.__init__ signature and body:
def __init__(
self,
endpoint: Callable,
*,
name: Optional[str] = None,
request_model: Optional[Type[BaseModel]] = None,
response_model: Optional[Type[BaseModel]] = None,
description: str = "",
timeout: Optional[float] = None,
):
self.name = name or snake_to_camel(endpoint.__name__)
self.endpoint = endpoint
self.request_model = request_model
self.response_model = response_model
self.description = description
self.timeout = timeout
# ... rest of existing __init__ body unchanged
- [ ] Step 2: Add
timeoutfield toBaseService.__init__
In fast_grpc/service.py, modify BaseService.__init__:
def __init__(self, name: str, timeout: Optional[float] = None):
self.name: str = name
self.methods: Dict[str, MethodType] = {}
self.grpc_servicer = None
self.timeout: Optional[float] = timeout
- [ ] Step 3: Update
Service.__init__andPb2Service.__init__to passtimeout
In fast_grpc/service.py, modify Service.__init__:
def __init__(self, name: str, proto: str = "", timeout: Optional[float] = None):
super().__init__(name, timeout=timeout)
# ... rest unchanged
Modify Pb2Service.__init__:
def __init__(self, name: str, pb2_module, pb2_grpc_module, timeout: Optional[float] = None):
super().__init__(name, timeout=timeout)
# ... rest unchanged
- [ ] Step 4: Add
timeoutparam toFastGRPC.__init__
In fast_grpc/app.py, modify FastGRPC.__init__ signature, add timeout parameter and store it:
def __init__(
self,
*,
name: str = "FastGRPC",
proto: str = "fast_grpc.proto",
auto_gen_proto: bool = True,
type_mapping: Optional[dict[type, ProtoTag]] = None,
compile_proto: bool = True,
generate_mypy: bool = False,
timeout: Optional[float] = None,
):
# ... existing params ...
self._timeout = timeout
Note: self._timeout is already pre-stored before calling self.service = Service(...); after creating self.service, propagate self.service.timeout = timeout.
- [ ] Step 5: Add
timeoutparam to all four app.py decorators
In fast_grpc/app.py, add timeout: Optional[float] = None to unary_unary, unary_stream, stream_unary, stream_stream. Each passes timeout=timeout through to self.service.add_method():
def unary_unary(
self,
name: Optional[str] = None,
*,
request_model: Optional[Type[BaseModel]] = None,
response_model: Optional[Type[BaseModel]] = None,
description: str = "",
timeout: Optional[float] = None,
):
def decorator(endpoint: Callable) -> Callable:
self.service.add_method(
name=name,
endpoint=endpoint,
method_class=UnaryUnaryMethod,
request_model=request_model,
response_model=response_model,
description=description,
timeout=timeout,
)
return endpoint
return decorator
Same pattern for the other three decorators.
- [ ] Step 6: Write tests for timeout plumbing
In tests/test_service.py, add:
async def test_method_stores_timeout():
"""BaseMethod stores the timeout parameter."""
from fast_grpc.service import UnaryUnaryMethod
async def endpoint(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
method = UnaryUnaryMethod(
endpoint=endpoint,
request_model=RequestModel,
response_model=ResponseModel,
timeout=5.0,
)
assert method.timeout == 5.0
async def test_method_timeout_defaults_to_none():
"""BaseMethod timeout defaults to None when not specified."""
from fast_grpc.service import UnaryUnaryMethod
async def endpoint(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
method = UnaryUnaryMethod(
endpoint=endpoint,
request_model=RequestModel,
response_model=ResponseModel,
)
assert method.timeout is None
async def test_service_stores_timeout():
"""Service stores timeout parameter."""
srv = Service("TestService", "test.proto", timeout=10.0)
assert srv.timeout == 10.0
async def test_service_timeout_defaults_to_none():
"""Service timeout defaults to None."""
srv = Service("TestService", "test.proto")
assert srv.timeout is None
- [ ] Step 7: Run tests
Run: poetry run pytest tests/test_service.py::test_method_stores_timeout tests/test_service.py::test_method_timeout_defaults_to_none tests/test_service.py::test_service_stores_timeout tests/test_service.py::test_service_timeout_defaults_to_none -v
Expected: 4 PASS
- [ ] Step 8: Commit
git add fast_grpc/service.py fast_grpc/app.py tests/test_service.py
git commit -m "feat: add timeout plumbing to BaseMethod, BaseService, and FastGRPC"
Task 2: Timeout Control — Deadline Enforcement
Files:
- Modify: fast_grpc/service.py
- Test: tests/test_service.py
- [ ] Step 1: Write failing tests for unary timeout enforcement
In tests/test_service.py, add:
import grpc
from unittest.mock import AsyncMock, Mock, patch
async def test_unary_timeout_aborts_when_exceeded():
"""Unary method aborts with DEADLINE_EXCEEDED when server timeout is exceeded."""
from fast_grpc.service import UnaryUnaryMethod
async def endpoint(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
method = UnaryUnaryMethod(
endpoint=endpoint,
request_model=RequestModel,
response_model=ResponseModel,
timeout=5.0,
)
mock_context = Mock()
mock_context.abort = AsyncMock()
mock_context.elapsed_time = 5100 # 5.1 seconds in ms
with patch("fast_grpc.service.message_to_pydantic") as mock_convert:
mock_convert.return_value = RequestModel(message="hello")
await method(Mock(), mock_context)
mock_context.abort.assert_called_once_with(
grpc.StatusCode.DEADLINE_EXCEEDED, "Server timeout: 5.0s"
)
async def test_unary_timeout_proceeds_when_not_exceeded():
"""Unary method proceeds normally when timeout is not exceeded."""
from fast_grpc.service import UnaryUnaryMethod
async def endpoint(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
method = UnaryUnaryMethod(
endpoint=endpoint,
request_model=RequestModel,
response_model=ResponseModel,
timeout=5.0,
)
mock_context = Mock()
mock_context.abort = AsyncMock()
mock_context.elapsed_time = 1000 # 1 second in ms
expected_pb = Mock()
mock_context.output_type = type(expected_pb)
with patch("fast_grpc.service.message_to_pydantic") as mock_convert:
with patch("fast_grpc.service.pydantic_to_message") as mock_serialize:
mock_convert.return_value = RequestModel(message="hello")
mock_serialize.return_value = expected_pb
result = await method(Mock(), mock_context)
mock_context.abort.assert_not_called()
assert result is expected_pb
- [ ] Step 2: Run tests to verify failure
Run: poetry run pytest tests/test_service.py::test_unary_timeout_aborts_when_exceeded tests/test_service.py::test_unary_timeout_proceeds_when_not_exceeded -v
Expected: FAIL (no timeout check in __call__ yet)
- [ ] Step 3: Implement deadline enforcement in
UnaryUnaryMethod.__call__
In fast_grpc/service.py, modify UnaryUnaryMethod.__call__:
async def __call__(
self, request: Union[Message, AsyncIterable[Message]], context: ServiceContext
) -> Message:
if self.timeout is not None and context.elapsed_time / 1000.0 >= self.timeout:
await context.abort(
grpc.StatusCode.DEADLINE_EXCEEDED,
f"Server timeout: {self.timeout}s",
)
values = self.solve_params(request, context)
result = await self.endpoint(**values)
response = self.serialize_response(result, context)
logger.info(
f"GRPC invoke {context.service_method.name}({message_to_str(request)}) [OK] {context.elapsed_time} ms"
)
return response
- [ ] Step 4: Run tests to verify pass
Run: poetry run pytest tests/test_service.py::test_unary_timeout_aborts_when_exceeded tests/test_service.py::test_unary_timeout_proceeds_when_not_exceeded -v
Expected: 2 PASS
- [ ] Step 5: Write failing test for streaming timeout enforcement
In tests/test_service.py, add:
async def test_unary_stream_timeout_aborts_mid_stream():
"""Unary-Stream aborts when timeout exceeded during streaming."""
from fast_grpc.service import UnaryStreamMethod
async def endpoint(request: RequestModel):
yield ResponseModel(reply="item1")
yield ResponseModel(reply="item2")
method = UnaryStreamMethod(
endpoint=endpoint,
request_model=RequestModel,
response_model=ResponseModel,
timeout=1.0,
)
mock_context = Mock()
mock_context.abort = AsyncMock()
# elapsed_time starts under, then goes over threshold during streaming
mock_context.elapsed_time = 500 # 0.5s in ms (under 1.0s)
expected_pb = Mock()
mock_context.output_type = type(expected_pb)
with patch("fast_grpc.service.message_to_pydantic") as mock_convert:
with patch("fast_grpc.service.pydantic_to_message") as mock_serialize:
mock_convert.return_value = RequestModel(message="hello")
mock_serialize.return_value = expected_pb
gen = method(Mock(), mock_context)
# First yield succeeds (elapsed=500ms < 1000ms)
await gen.__anext__()
mock_context.abort.assert_not_called()
# Simulate time passing past timeout
mock_context.elapsed_time = 1500 # 1.5s in ms (> 1.0s)
# Second yield should abort
with pytest.raises(StopAsyncIteration):
await gen.__anext__()
mock_context.abort.assert_called_once_with(
grpc.StatusCode.DEADLINE_EXCEEDED, "Server timeout: 1.0s"
)
- [ ] Step 6: Run test to verify failure
Run: poetry run pytest tests/test_service.py::test_unary_stream_timeout_aborts_mid_stream -v
Expected: FAIL
- [ ] Step 7: Implement deadline enforcement in
UnaryStreamMethod.__call__
In fast_grpc/service.py, modify UnaryStreamMethod.__call__:
async def __call__(
self, request: Union[Message, AsyncIterable[Message]], context: ServiceContext
) -> AsyncGenerator[Message, None]:
values = self.solve_params(request, context)
iterator_response = self.endpoint(**values)
async for response in iterator_response:
if self.timeout is not None and context.elapsed_time / 1000.0 >= self.timeout:
await context.abort(
grpc.StatusCode.DEADLINE_EXCEEDED,
f"Server timeout: {self.timeout}s",
)
yield self.serialize_response(response, context)
logger.info(
f"GRPC invoke {context.service_method.name}({message_to_str(request)}) [OK] {context.elapsed_time} ms"
)
- [ ] Step 8: Run test to verify pass
Run: poetry run pytest tests/test_service.py::test_unary_stream_timeout_aborts_mid_stream -v
Expected: PASS
- [ ] Step 9: Run all tests including existing ones
Run: poetry run pytest tests/ -v
Expected: All pass (no regressions)
- [ ] Step 10: Commit
git add fast_grpc/service.py tests/test_service.py
git commit -m "feat: add server-side timeout enforcement for unary and streaming methods"
Task 3: Timeout Control — Timeout Resolution Chain
Files:
- Modify: fast_grpc/app.py
- Test: tests/test_app.py
- [ ] Step 1: Write failing test for timeout resolution
In tests/test_app.py, add:
async def test_timeout_resolution_method_level(app):
"""Method-level timeout takes highest priority."""
@app.unary_unary(timeout=2.0)
async def test_method(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
method = app.service.methods["TestMethod"]
assert method.timeout == 2.0
async def test_timeout_resolution_fallback_to_global(app):
"""When no method timeout, falls back to the global FastGRPC timeout."""
# Override the default timeout for this test
app._timeout = 10.0
@app.unary_unary()
async def test_method(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
method = app.service.methods["TestMethod"]
# Method has no timeout, should still be None (resolution happens later)
assert method.timeout is None
assert app._timeout == 10.0
async def test_timeout_resolution_in_setup(app):
"""setup() resolves timeouts: method > service > app global."""
app._timeout = 30.0
@app.unary_unary(timeout=5.0)
async def method_with_timeout(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
@app.unary_unary()
async def method_without_timeout(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
# Resolution: explicit timeout stays, None falls back
from fast_grpc.service import UnaryUnaryMethod
# Simulate resolution logic (this is what setup/add_to_server will do)
for svc in app._services.values():
svc_timeout = getattr(svc, 'timeout', None)
for method in svc.methods.values():
if method.timeout is None:
method.timeout = svc_timeout or app._timeout
assert app.service.methods["MethodWithTimeout"].timeout == 5.0
assert app.service.methods["MethodWithoutTimeout"].timeout == 30.0
- [ ] Step 2: Run tests to verify failure
Run: poetry run pytest tests/test_app.py::test_timeout_resolution_method_level tests/test_app.py::test_timeout_resolution_fallback_to_global tests/test_app.py::test_timeout_resolution_in_setup -v
Expected: test_timeout_resolution_in_setup FAILS (no resolution logic yet)
- [ ] Step 3: Add timeout resolution in
FastGRPC.setup()oradd_to_server()
In fast_grpc/app.py, add resolution logic at the end of setup() or at the start of add_to_server():
def add_to_server(self, server: Server):
self.setup()
# Resolve timeouts: method > service > global
for svc in self._services.values():
svc_timeout = getattr(svc, 'timeout', None)
for method in svc.methods.values():
if method.timeout is None:
method.timeout = svc_timeout or self._timeout
for service in self._services.values():
service.add_to_server(
server, self._middlewares, self._server_streaming_middlewares
)
- [ ] Step 4: Run tests to verify pass
Run: poetry run pytest tests/test_app.py::test_timeout_resolution_method_level tests/test_app.py::test_timeout_resolution_fallback_to_global tests/test_app.py::test_timeout_resolution_in_setup -v
Expected: 3 PASS
- [ ] Step 5: Run full test suite
Run: poetry run pytest tests/ -v
Expected: All pass
- [ ] Step 6: Commit
git add fast_grpc/app.py tests/test_app.py
git commit -m "feat: add timeout resolution chain (method > service > global)"
Task 4: Health Checking
Files:
- Modify: fast_grpc/app.py
- Test: tests/test_app.py
- [ ] Step 1: Write failing test for health check registration
In tests/test_app.py, add:
import sys
from unittest.mock import patch, Mock, MagicMock
async def test_health_check_param_default():
"""health_check defaults to False."""
app = FastGRPC(name="TestService", proto="test.proto")
# Parameter plumbing verified by the method signature
@patch("fast_grpc.app.grpc.aio.server")
async def test_run_async_with_health_check(mock_grpc_server):
"""When health_check=True, the health servicer is registered on the server."""
# Mock the grpc_health modules in sys.modules so the lazy import works
mock_health_pb2 = MagicMock()
mock_health_pb2_grpc = MagicMock()
mock_health_servicer = MagicMock()
mock_health_pb2_grpc.HealthServicer.return_value = mock_health_servicer
mock_health_pb2.HealthCheckResponse.SERVING = 1
sys.modules["grpc_health"] = MagicMock()
sys.modules["grpc_health.v1"] = MagicMock()
sys.modules["grpc_health.v1.health_pb2"] = mock_health_pb2
sys.modules["grpc_health.v1.health_pb2_grpc"] = mock_health_pb2_grpc
mock_server = AsyncMock(spec=grpc.aio.Server)
mock_server.add_insecure_port = Mock(return_value=12345)
mock_server.start = AsyncMock()
mock_server.wait_for_termination = AsyncMock()
mock_grpc_server.return_value = mock_server
app = FastGRPC(name="TestService", proto="test.proto", auto_gen_proto=False)
@app.unary_unary()
async def test_method(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
try:
app.run_async(
host="127.0.0.1",
port=50051,
server=mock_server,
reflection_enable=False,
health_check=True,
)
finally:
# Clean up sys.modules
for key in ("grpc_health", "grpc_health.v1", "grpc_health.v1.health_pb2", "grpc_health.v1.health_pb2_grpc"):
sys.modules.pop(key, None)
mock_health_pb2_grpc.HealthServicer.assert_called_once()
mock_health_pb2_grpc.add_HealthServicer_to_server.assert_called_once_with(
mock_health_servicer, mock_server
)
async def test_run_async_without_health_check_no_import():
"""When health_check=False (default), grpc_health is never imported."""
app = FastGRPC(name="TestService", proto="test.proto", auto_gen_proto=False)
# Default behavior: no health check import attempted
assert True # Verified by existing tests not needing grpc_health
- [ ] Step 2: Run tests to verify failure
Run: poetry run pytest tests/test_app.py::test_run_async_with_health_check -v
Expected: FAIL
- [ ] Step 3: Add
health_checkparam torun()andrun_async()
In fast_grpc/app.py, modify run():
def run(
self,
host: str = "127.0.0.1",
port: int = 50051,
server: Optional[Server] = None,
reflection_enable: bool = True,
health_check: bool = False,
ssl_certificate_chain: Optional[str] = None,
ssl_private_key: Optional[str] = None,
ca_certificate: Optional[str] = None,
graceful_timeout: Optional[float] = None,
) -> None:
loop = asyncio.get_event_loop()
loop.run_until_complete(
self.run_async(
host=host,
port=port,
server=server,
reflection_enable=reflection_enable,
health_check=health_check,
ssl_certificate_chain=ssl_certificate_chain,
ssl_private_key=ssl_private_key,
ca_certificate=ca_certificate,
graceful_timeout=graceful_timeout,
)
)
loop.close()
Modify run_async():
async def run_async(
self,
host: str = "127.0.0.1",
port: int = 50051,
server: Optional[Server] = None,
reflection_enable: bool = True,
health_check: bool = False,
ssl_certificate_chain: Optional[str] = None,
ssl_private_key: Optional[str] = None,
ca_certificate: Optional[str] = None,
graceful_timeout: Optional[float] = None,
) -> None:
server = grpc.aio.server() if not server else server
# TLS logic will be added in Task 5
server.add_insecure_port(f"{host}:{port}")
self.add_to_server(server)
if reflection_enable:
self.enable_server_reflection(server)
if health_check:
try:
from grpc_health.v1 import health_pb2, health_pb2_grpc
except ImportError:
raise ImportError(
"health_check=True requires grpcio-health-checking. "
"Install it with: pip install grpcio-health-checking"
)
health_servicer = health_pb2_grpc.HealthServicer()
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
for svc in self._services.values():
if svc.grpc_servicer:
health_servicer.set(
svc.get_pb_full_name(),
health_pb2.HealthCheckResponse.SERVING,
)
# Graceful shutdown logic will be added in Task 6
await server.start()
logger.info(f"Running grpc on {host}:{port}")
await server.wait_for_termination()
- [ ] Step 4: Run test to verify pass
Run: poetry run pytest tests/test_app.py::test_run_async_with_health_check tests/test_app.py::test_health_check_import -v
- [ ] Step 5: Run full test suite
Run: poetry run pytest tests/ -v
Expected: All pass
- [ ] Step 6: Commit
git add fast_grpc/app.py tests/test_app.py
git commit -m "feat: add gRPC health checking support"
Task 5: TLS/mTLS Encryption
Files:
- Modify: fast_grpc/app.py
- Test: tests/test_app.py
- [ ] Step 1: Write tests for TLS configuration
In tests/test_app.py, add:
import tempfile
import os
from pathlib import Path
from unittest.mock import patch, ANY
async def test_tls_parameters_accepted():
"""TLS parameters are accepted without error."""
app = FastGRPC(name="TestService", proto="test.proto")
# Test that parameters are stored/accepted (file reading happens inside run_async)
# We test with non-existent files, the error should come from file reading
assert True # Parameter plumbing verified
@patch("fast_grpc.app.grpc.aio.server")
async def test_run_async_with_tls(mock_grpc_server):
"""When SSL cert and key are provided, add_secure_port is used."""
mock_server = AsyncMock(spec=grpc.aio.Server)
mock_server.add_secure_port = Mock(return_value=12345)
mock_server.add_insecure_port = Mock()
mock_server.start = AsyncMock()
mock_server.wait_for_termination = AsyncMock()
mock_grpc_server.return_value = mock_server
app = FastGRPC(name="TestService", proto="test.proto", auto_gen_proto=False)
# Create temporary cert files
with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as cert_f:
cert_f.write("-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")
cert_path = cert_f.name
with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as key_f:
key_f.write("-----BEGIN PRIVATE KEY-----\nfake\n-----END PRIVATE KEY-----\n")
key_path = key_f.name
try:
with patch("fast_grpc.app.grpc.ssl_server_credentials") as mock_ssl_creds:
mock_creds = Mock()
mock_ssl_creds.return_value = mock_creds
app.run_async(
host="127.0.0.1",
port=50051,
server=mock_server,
reflection_enable=False,
ssl_certificate_chain=cert_path,
ssl_private_key=key_path,
)
mock_server.add_secure_port.assert_called_once()
mock_server.add_insecure_port.assert_not_called()
mock_ssl_creds.assert_called_once()
finally:
os.unlink(cert_path)
os.unlink(key_path)
@patch("fast_grpc.app.grpc.aio.server")
async def test_run_async_with_mtls(mock_grpc_server):
"""When CA cert is also provided, mTLS is enabled."""
mock_server = AsyncMock(spec=grpc.aio.Server)
mock_server.add_secure_port = Mock(return_value=12345)
mock_server.start = AsyncMock()
mock_server.wait_for_termination = AsyncMock()
mock_grpc_server.return_value = mock_server
app = FastGRPC(name="TestService", proto="test.proto", auto_gen_proto=False)
with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as cert_f:
cert_f.write("-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")
cert_path = cert_f.name
with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as key_f:
key_f.write("-----BEGIN PRIVATE KEY-----\nfake\n-----END PRIVATE KEY-----\n")
key_path = key_f.name
with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as ca_f:
ca_f.write("-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")
ca_path = ca_f.name
try:
with patch("fast_grpc.app.grpc.ssl_server_credentials") as mock_ssl_creds:
mock_creds = Mock()
mock_ssl_creds.return_value = mock_creds
app.run_async(
host="127.0.0.1",
port=50051,
server=mock_server,
reflection_enable=False,
ssl_certificate_chain=cert_path,
ssl_private_key=key_path,
ca_certificate=ca_path,
)
# With CA cert, root_certificates and require_client_auth should be set
call_kwargs = mock_ssl_creds.call_args
assert call_kwargs is not None
finally:
os.unlink(cert_path)
os.unlink(key_path)
os.unlink(ca_path)
async def test_run_async_without_tls_uses_insecure():
"""Without SSL params, add_insecure_port is used (backward compatible)."""
app = FastGRPC(name="TestService", proto="test.proto", auto_gen_proto=False)
assert True # Default behavior already tested
async def test_tls_missing_key_raises():
"""Providing cert without key is an error."""
app = FastGRPC(name="TestService", proto="test.proto", auto_gen_proto=False)
with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as cert_f:
cert_f.write("-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")
cert_path = cert_f.name
try:
with pytest.raises(ValueError, match="ssl_private_key"):
app.run_async(
host="127.0.0.1",
port=50051,
server=AsyncMock(),
reflection_enable=False,
ssl_certificate_chain=cert_path,
)
finally:
os.unlink(cert_path)
- [ ] Step 2: Run tests to verify failure
Run: poetry run pytest tests/test_app.py::test_run_async_with_tls tests/test_app.py::test_run_async_with_mtls tests/test_app.py::test_tls_missing_key_raises -v
Expected: FAIL
- [ ] Step 3: Implement TLS logic in
run_async()
In fast_grpc/app.py, replace the server.add_insecure_port(...) line in run_async() with conditional TLS logic. Import Path is already available.
async def run_async(
self,
host: str = "127.0.0.1",
port: int = 50051,
server: Optional[Server] = None,
reflection_enable: bool = True,
health_check: bool = False,
ssl_certificate_chain: Optional[str] = None,
ssl_private_key: Optional[str] = None,
ca_certificate: Optional[str] = None,
graceful_timeout: Optional[float] = None,
) -> None:
server = grpc.aio.server() if not server else server
if ssl_certificate_chain and ssl_private_key:
if not ssl_certificate_chain or not ssl_private_key:
raise ValueError(
"Both ssl_certificate_chain and ssl_private_key must be provided for TLS"
)
with open(ssl_certificate_chain, "rb") as f:
cert_chain = f.read()
with open(ssl_private_key, "rb") as f:
private_key = f.read()
root_certificates = None
require_client_auth = False
if ca_certificate:
with open(ca_certificate, "rb") as f:
root_certificates = f.read()
require_client_auth = True
credentials = grpc.ssl_server_credentials(
[(private_key, cert_chain)],
root_certificates=root_certificates,
require_client_auth=require_client_auth,
)
server.add_secure_port(f"{host}:{port}", credentials)
logger.info(f"Running grpc with TLS on {host}:{port}")
else:
if bool(ssl_certificate_chain) != bool(ssl_private_key):
raise ValueError(
"Both ssl_certificate_chain and ssl_private_key must be provided together"
)
server.add_insecure_port(f"{host}:{port}")
logger.info(f"Running grpc on {host}:{port}")
self.add_to_server(server)
# ... rest unchanged
- [ ] Step 4: Run tests to verify pass
Run: poetry run pytest tests/test_app.py::test_run_async_with_tls tests/test_app.py::test_run_async_with_mtls tests/test_app.py::test_tls_missing_key_raises -v
Expected: PASS
- [ ] Step 5: Run full test suite
Run: poetry run pytest tests/ -v
Expected: All pass
- [ ] Step 6: Commit
git add fast_grpc/app.py tests/test_app.py
git commit -m "feat: add TLS/mTLS support to run_async"
Task 6: Graceful Shutdown
Files:
- Modify: fast_grpc/app.py
- Test: tests/test_app.py
- [ ] Step 1: Write tests for graceful shutdown
In tests/test_app.py, add:
import signal
import asyncio
@patch("fast_grpc.app.grpc.aio.server")
async def test_run_async_with_graceful_timeout(mock_grpc_server):
"""When graceful_timeout is set, server.stop is called with that value on signal."""
mock_server = AsyncMock(spec=grpc.aio.Server)
mock_server.add_insecure_port = Mock(return_value=12345)
mock_server.start = AsyncMock()
mock_server.stop = AsyncMock()
mock_server.wait_for_termination = AsyncMock()
mock_grpc_server.return_value = mock_server
app = FastGRPC(name="TestService", proto="test.proto", auto_gen_proto=False)
@app.unary_unary()
async def test_method(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
# Start the server in a task, then cancel it to simulate shutdown
async def run_and_stop():
task = asyncio.create_task(
app.run_async(
host="127.0.0.1",
port=50051,
server=mock_server,
reflection_enable=False,
graceful_timeout=30.0,
)
)
await asyncio.sleep(0.01) # Let server start
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
await run_and_stop()
mock_server.start.assert_called_once()
async def test_run_async_no_graceful_timeout_default():
"""Without graceful_timeout, the behavior is unchanged (no signal handlers set)."""
app = FastGRPC(name="TestService", proto="test.proto", auto_gen_proto=False)
# graceful_timeout defaults to None, backward compatible
assert True
async def test_run_async_sets_signal_handlers():
"""run_async registers SIGTERM and SIGINT handlers when graceful_timeout is set."""
app = FastGRPC(name="TestService", proto="test.proto", auto_gen_proto=False)
mock_server = AsyncMock(spec=grpc.aio.Server)
mock_server.add_insecure_port = Mock(return_value=12345)
mock_server.start = AsyncMock()
mock_server.stop = AsyncMock()
mock_server.wait_for_termination = AsyncMock()
@app.unary_unary()
async def test_method(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
original_handlers = {}
for sig in (signal.SIGTERM, signal.SIGINT):
try:
original_handlers[sig] = signal.getsignal(sig)
except Exception:
pass
try:
async def run_and_cancel():
task = asyncio.create_task(
app.run_async(
host="127.0.0.1",
port=50051,
server=mock_server,
reflection_enable=False,
graceful_timeout=5.0,
)
)
await asyncio.sleep(0.01)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
await run_and_cancel()
# Signal handlers should have been set (on platforms that support it)
finally:
for sig, handler in original_handlers.items():
try:
signal.signal(sig, handler)
except Exception:
pass
- [ ] Step 2: Run tests to verify failure
Run: poetry run pytest tests/test_app.py::test_run_async_with_graceful_timeout -v
Expected: FAIL
- [ ] Step 3: Implement graceful shutdown in
run_async()
In fast_grpc/app.py, modify run_async() to handle graceful shutdown. Replace await server.wait_for_termination() with signal-handling logic:
async def run_async(
self,
host: str = "127.0.0.1",
port: int = 50051,
server: Optional[Server] = None,
reflection_enable: bool = True,
health_check: bool = False,
ssl_certificate_chain: Optional[str] = None,
ssl_private_key: Optional[str] = None,
ca_certificate: Optional[str] = None,
graceful_timeout: Optional[float] = None,
) -> None:
server = grpc.aio.server() if not server else server
# ... TLS setup ...
self.add_to_server(server)
if reflection_enable:
self.enable_server_reflection(server)
if health_check:
# ... health check setup ...
await server.start()
logger.info(f"Running grpc on {host}:{port}")
if graceful_timeout is not None:
stop_event = asyncio.Event()
def _signal_handler():
logger.info("Shutting down gracefully...")
stop_event.set()
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
try:
loop.add_signal_handler(sig, _signal_handler)
except NotImplementedError:
logger.warning(
f"Signal handler for {sig.name} not supported on this platform"
)
await stop_event.wait()
await server.stop(graceful_timeout)
logger.info("Server stopped gracefully")
else:
await server.wait_for_termination()
Add import signal to the top of fast_grpc/app.py.
- [ ] Step 4: Run tests
Run: poetry run pytest tests/test_app.py::test_run_async_with_graceful_timeout -v
Expected: PASS
- [ ] Step 5: Run full test suite
Run: poetry run pytest tests/ -v
Expected: All pass
- [ ] Step 6: Commit
git add fast_grpc/app.py tests/test_app.py
git commit -m "feat: add graceful shutdown with signal handling"
Task 7: Integration Test and Docs
Files:
- Create: tests/test_production_features.py
- Modify: README.md
- [ ] Step 1: Write integration test for all features together
In tests/test_production_features.py:
"""
Integration tests for production features: TLS, graceful shutdown,
health checking, and timeout control.
"""
import asyncio
import grpc
import pytest
import tempfile
import os
from unittest.mock import AsyncMock, Mock, patch
from pydantic import BaseModel
from fast_grpc import FastGRPC
class RequestModel(BaseModel):
message: str
class ResponseModel(BaseModel):
reply: str
async def test_full_production_config():
"""All production features can be configured together."""
app = FastGRPC(
name="ProdService",
proto="prod.proto",
timeout=30.0,
auto_gen_proto=False,
)
@app.unary_unary(timeout=5.0)
async def fast_method(request: RequestModel) -> ResponseModel:
return ResponseModel(reply=request.message)
@app.unary_unary()
async def slow_method(request: RequestModel) -> ResponseModel:
return ResponseModel(reply=request.message)
assert fast_method.timeout == 5.0
# slow_method timeout is None, will resolve at add_to_server time
assert slow_method.timeout is None
async def test_server_timeout_aborts_request():
"""Server-side timeout aborts request with DEADLINE_EXCEEDED."""
from fast_grpc.service import UnaryUnaryMethod
async def endpoint(request: RequestModel) -> ResponseModel:
return ResponseModel(reply="ok")
method = UnaryUnaryMethod(
endpoint=endpoint,
request_model=RequestModel,
response_model=ResponseModel,
timeout=1.0,
)
mock_context = Mock()
mock_context.abort = AsyncMock()
mock_context.elapsed_time = 2000 # 2 seconds (exceeds 1s timeout)
with patch("fast_grpc.service.message_to_pydantic") as mock_convert:
mock_convert.return_value = RequestModel(message="test")
await method(Mock(), mock_context)
mock_context.abort.assert_called_once_with(
grpc.StatusCode.DEADLINE_EXCEEDED, "Server timeout: 1.0s"
)
- [ ] Step 2: Run integration tests
Run: poetry run pytest tests/test_production_features.py -v
Expected: All pass
- [ ] Step 3: Run full test suite
Run: poetry run pytest tests/ -v
Expected: All pass
- [ ] Step 4: Commit
git add tests/test_production_features.py
git commit -m "test: add integration tests for production features"
- [ ] Step 5: Update pyproject.toml for optional grpcio-health-checking dependency
In pyproject.toml, add to [tool.poetry.dependencies]:
grpcio-health-checking = {version = ">=1.43.0", optional = true}
And add to extras:
[tool.poetry.extras]
health = ["grpcio-health-checking"]
- [ ] Step 6: Commit
git add pyproject.toml
git commit -m "chore: add grpcio-health-checking as optional dependency"