blob: fbb791aa15bbdc4ad32139fe1e5c35f5fca31329 [file] [log] [blame]
# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import logging
import random
from typing import Any, Dict, List, Optional, Set
from framework import xds_flags
from framework.infrastructure import gcp
logger = logging.getLogger(__name__)
# Type aliases
# Compute
_ComputeV1 = gcp.compute.ComputeV1
GcpResource = _ComputeV1.GcpResource
HealthCheckProtocol = _ComputeV1.HealthCheckProtocol
ZonalGcpResource = _ComputeV1.ZonalGcpResource
BackendServiceProtocol = _ComputeV1.BackendServiceProtocol
_BackendGRPC = BackendServiceProtocol.GRPC
_HealthCheckGRPC = HealthCheckProtocol.GRPC
# Network Security
_NetworkSecurityV1Beta1 = gcp.network_security.NetworkSecurityV1Beta1
ServerTlsPolicy = gcp.network_security.ServerTlsPolicy
ClientTlsPolicy = gcp.network_security.ClientTlsPolicy
AuthorizationPolicy = gcp.network_security.AuthorizationPolicy
# Network Services
_NetworkServicesV1Alpha1 = gcp.network_services.NetworkServicesV1Alpha1
_NetworkServicesV1Beta1 = gcp.network_services.NetworkServicesV1Beta1
EndpointPolicy = gcp.network_services.EndpointPolicy
GrpcRoute = gcp.network_services.GrpcRoute
Mesh = gcp.network_services.Mesh
# Testing metadata consts
TEST_AFFINITY_METADATA_KEY = 'xds_md'
class TrafficDirectorManager: # pylint: disable=too-many-public-methods
compute: _ComputeV1
resource_prefix: str
resource_suffix: str
BACKEND_SERVICE_NAME = "backend-service"
ALTERNATIVE_BACKEND_SERVICE_NAME = "backend-service-alt"
AFFINITY_BACKEND_SERVICE_NAME = "backend-service-affinity"
HEALTH_CHECK_NAME = "health-check"
URL_MAP_NAME = "url-map"
ALTERNATIVE_URL_MAP_NAME = "url-map-alt"
URL_MAP_PATH_MATCHER_NAME = "path-matcher"
TARGET_PROXY_NAME = "target-proxy"
ALTERNATIVE_TARGET_PROXY_NAME = "target-proxy-alt"
FORWARDING_RULE_NAME = "forwarding-rule"
ALTERNATIVE_FORWARDING_RULE_NAME = "forwarding-rule-alt"
FIREWALL_RULE_NAME = "allow-health-checks"
def __init__(
self,
gcp_api_manager: gcp.api.GcpApiManager,
project: str,
*,
resource_prefix: str,
resource_suffix: str,
network: str = 'default',
compute_api_version: str = 'v1',
):
# API
self.compute = _ComputeV1(gcp_api_manager,
project,
version=compute_api_version)
# Settings
self.project: str = project
self.network: str = network
self.resource_prefix: str = resource_prefix
self.resource_suffix: str = resource_suffix
# Managed resources
self.health_check: Optional[GcpResource] = None
self.backend_service: Optional[GcpResource] = None
# TODO(sergiitk): remove this flag once backend service resource loaded
self.backend_service_protocol: Optional[BackendServiceProtocol] = None
self.url_map: Optional[GcpResource] = None
self.alternative_url_map: Optional[GcpResource] = None
self.firewall_rule: Optional[GcpResource] = None
self.target_proxy: Optional[GcpResource] = None
# TODO(sergiitk): remove this flag once target proxy resource loaded
self.target_proxy_is_http: bool = False
self.alternative_target_proxy: Optional[GcpResource] = None
self.forwarding_rule: Optional[GcpResource] = None
self.alternative_forwarding_rule: Optional[GcpResource] = None
self.backends: Set[ZonalGcpResource] = set()
self.alternative_backend_service: Optional[GcpResource] = None
# TODO(sergiitk): remove this flag once backend service resource loaded
self.alternative_backend_service_protocol: Optional[
BackendServiceProtocol] = None
self.alternative_backends: Set[ZonalGcpResource] = set()
self.affinity_backend_service: Optional[GcpResource] = None
# TODO(sergiitk): remove this flag once backend service resource loaded
self.affinity_backend_service_protocol: Optional[
BackendServiceProtocol] = None
self.affinity_backends: Set[ZonalGcpResource] = set()
@property
def network_url(self):
return f'global/networks/{self.network}'
def setup_for_grpc(
self,
service_host,
service_port,
*,
backend_protocol: Optional[BackendServiceProtocol] = _BackendGRPC,
health_check_port: Optional[int] = None):
self.setup_backend_for_grpc(protocol=backend_protocol,
health_check_port=health_check_port)
self.setup_routing_rule_map_for_grpc(service_host, service_port)
def setup_backend_for_grpc(
self,
*,
protocol: Optional[BackendServiceProtocol] = _BackendGRPC,
health_check_port: Optional[int] = None):
self.create_health_check(port=health_check_port)
self.create_backend_service(protocol)
def setup_routing_rule_map_for_grpc(self, service_host, service_port):
self.create_url_map(service_host, service_port)
self.create_target_proxy()
self.create_forwarding_rule(service_port)
def cleanup(self, *, force=False):
# Cleanup in the reverse order of creation
self.delete_forwarding_rule(force=force)
self.delete_alternative_forwarding_rule(force=force)
self.delete_target_http_proxy(force=force)
self.delete_target_grpc_proxy(force=force)
self.delete_alternative_target_grpc_proxy(force=force)
self.delete_url_map(force=force)
self.delete_alternative_url_map(force=force)
self.delete_backend_service(force=force)
self.delete_alternative_backend_service(force=force)
self.delete_affinity_backend_service(force=force)
self.delete_health_check(force=force)
@functools.lru_cache(None)
def make_resource_name(self, name: str) -> str:
"""Make dash-separated resource name with resource prefix and suffix."""
parts = [self.resource_prefix, name]
# Avoid trailing dash when the suffix is empty.
if self.resource_suffix:
parts.append(self.resource_suffix)
return '-'.join(parts)
def create_health_check(
self,
*,
protocol: Optional[HealthCheckProtocol] = _HealthCheckGRPC,
port: Optional[int] = None):
if self.health_check:
raise ValueError(f'Health check {self.health_check.name} '
'already created, delete it first')
if protocol is None:
protocol = _HealthCheckGRPC
name = self.make_resource_name(self.HEALTH_CHECK_NAME)
logger.info('Creating %s Health Check "%s"', protocol.name, name)
resource = self.compute.create_health_check(name, protocol, port=port)
self.health_check = resource
def delete_health_check(self, force=False):
if force:
name = self.make_resource_name(self.HEALTH_CHECK_NAME)
elif self.health_check:
name = self.health_check.name
else:
return
logger.info('Deleting Health Check "%s"', name)
self.compute.delete_health_check(name)
self.health_check = None
def create_backend_service(
self,
protocol: Optional[BackendServiceProtocol] = _BackendGRPC,
subset_size: Optional[int] = None,
affinity_header: Optional[str] = None,
locality_lb_policies: Optional[List[dict]] = None,
outlier_detection: Optional[dict] = None):
if protocol is None:
protocol = _BackendGRPC
name = self.make_resource_name(self.BACKEND_SERVICE_NAME)
logger.info('Creating %s Backend Service "%s"', protocol.name, name)
resource = self.compute.create_backend_service_traffic_director(
name,
health_check=self.health_check,
protocol=protocol,
subset_size=subset_size,
affinity_header=affinity_header,
locality_lb_policies=locality_lb_policies,
outlier_detection=outlier_detection)
self.backend_service = resource
self.backend_service_protocol = protocol
def load_backend_service(self):
name = self.make_resource_name(self.BACKEND_SERVICE_NAME)
resource = self.compute.get_backend_service_traffic_director(name)
self.backend_service = resource
def delete_backend_service(self, force=False):
if force:
name = self.make_resource_name(self.BACKEND_SERVICE_NAME)
elif self.backend_service:
name = self.backend_service.name
else:
return
logger.info('Deleting Backend Service "%s"', name)
self.compute.delete_backend_service(name)
self.backend_service = None
def backend_service_add_neg_backends(self,
name,
zones,
max_rate_per_endpoint: Optional[
int] = None):
logger.info('Waiting for Network Endpoint Groups to load endpoints.')
for zone in zones:
backend = self.compute.wait_for_network_endpoint_group(name, zone)
logger.info('Loaded NEG "%s" in zone %s', backend.name,
backend.zone)
self.backends.add(backend)
self.backend_service_patch_backends(max_rate_per_endpoint)
def backend_service_remove_neg_backends(self, name, zones):
logger.info('Waiting for Network Endpoint Groups to load endpoints.')
for zone in zones:
backend = self.compute.wait_for_network_endpoint_group(name, zone)
logger.info('Loaded NEG "%s" in zone %s', backend.name,
backend.zone)
self.backends.remove(backend)
self.backend_service_patch_backends()
def backend_service_patch_backends(
self, max_rate_per_endpoint: Optional[int] = None):
logging.info('Adding backends to Backend Service %s: %r',
self.backend_service.name, self.backends)
self.compute.backend_service_patch_backends(self.backend_service,
self.backends,
max_rate_per_endpoint)
def backend_service_remove_all_backends(self):
logging.info('Removing backends from Backend Service %s',
self.backend_service.name)
self.compute.backend_service_remove_all_backends(self.backend_service)
def wait_for_backends_healthy_status(self):
logger.debug(
"Waiting for Backend Service %s to report all backends healthy %r",
self.backend_service, self.backends)
self.compute.wait_for_backends_healthy_status(self.backend_service,
self.backends)
def create_alternative_backend_service(
self, protocol: Optional[BackendServiceProtocol] = _BackendGRPC):
if protocol is None:
protocol = _BackendGRPC
name = self.make_resource_name(self.ALTERNATIVE_BACKEND_SERVICE_NAME)
logger.info('Creating %s Alternative Backend Service "%s"',
protocol.name, name)
resource = self.compute.create_backend_service_traffic_director(
name, health_check=self.health_check, protocol=protocol)
self.alternative_backend_service = resource
self.alternative_backend_service_protocol = protocol
def load_alternative_backend_service(self):
name = self.make_resource_name(self.ALTERNATIVE_BACKEND_SERVICE_NAME)
resource = self.compute.get_backend_service_traffic_director(name)
self.alternative_backend_service = resource
def delete_alternative_backend_service(self, force=False):
if force:
name = self.make_resource_name(
self.ALTERNATIVE_BACKEND_SERVICE_NAME)
elif self.alternative_backend_service:
name = self.alternative_backend_service.name
else:
return
logger.info('Deleting Alternative Backend Service "%s"', name)
self.compute.delete_backend_service(name)
self.alternative_backend_service = None
def alternative_backend_service_add_neg_backends(self, name, zones):
logger.info('Waiting for Network Endpoint Groups to load endpoints.')
for zone in zones:
backend = self.compute.wait_for_network_endpoint_group(name, zone)
logger.info('Loaded NEG "%s" in zone %s', backend.name,
backend.zone)
self.alternative_backends.add(backend)
self.alternative_backend_service_patch_backends()
def alternative_backend_service_patch_backends(self):
logging.info('Adding backends to Backend Service %s: %r',
self.alternative_backend_service.name,
self.alternative_backends)
self.compute.backend_service_patch_backends(
self.alternative_backend_service, self.alternative_backends)
def alternative_backend_service_remove_all_backends(self):
logging.info('Removing backends from Backend Service %s',
self.alternative_backend_service.name)
self.compute.backend_service_remove_all_backends(
self.alternative_backend_service)
def wait_for_alternative_backends_healthy_status(self):
logger.debug(
"Waiting for Backend Service %s to report all backends healthy %r",
self.alternative_backend_service, self.alternative_backends)
self.compute.wait_for_backends_healthy_status(
self.alternative_backend_service, self.alternative_backends)
def create_affinity_backend_service(
self, protocol: Optional[BackendServiceProtocol] = _BackendGRPC):
if protocol is None:
protocol = _BackendGRPC
name = self.make_resource_name(self.AFFINITY_BACKEND_SERVICE_NAME)
logger.info('Creating %s Affinity Backend Service "%s"', protocol.name,
name)
resource = self.compute.create_backend_service_traffic_director(
name,
health_check=self.health_check,
protocol=protocol,
affinity_header=TEST_AFFINITY_METADATA_KEY)
self.affinity_backend_service = resource
self.affinity_backend_service_protocol = protocol
def load_affinity_backend_service(self):
name = self.make_resource_name(self.AFFINITY_BACKEND_SERVICE_NAME)
resource = self.compute.get_backend_service_traffic_director(name)
self.affinity_backend_service = resource
def delete_affinity_backend_service(self, force=False):
if force:
name = self.make_resource_name(self.AFFINITY_BACKEND_SERVICE_NAME)
elif self.affinity_backend_service:
name = self.affinity_backend_service.name
else:
return
logger.info('Deleting Affinity Backend Service "%s"', name)
self.compute.delete_backend_service(name)
self.affinity_backend_service = None
def affinity_backend_service_add_neg_backends(self, name, zones):
logger.info('Waiting for Network Endpoint Groups to load endpoints.')
for zone in zones:
backend = self.compute.wait_for_network_endpoint_group(name, zone)
logger.info('Loaded NEG "%s" in zone %s', backend.name,
backend.zone)
self.affinity_backends.add(backend)
self.affinity_backend_service_patch_backends()
def affinity_backend_service_patch_backends(self):
logging.info('Adding backends to Backend Service %s: %r',
self.affinity_backend_service.name, self.affinity_backends)
self.compute.backend_service_patch_backends(
self.affinity_backend_service, self.affinity_backends)
def affinity_backend_service_remove_all_backends(self):
logging.info('Removing backends from Backend Service %s',
self.affinity_backend_service.name)
self.compute.backend_service_remove_all_backends(
self.affinity_backend_service)
def wait_for_affinity_backends_healthy_status(self):
logger.debug(
"Waiting for Backend Service %s to report all backends healthy %r",
self.affinity_backend_service, self.affinity_backends)
self.compute.wait_for_backends_healthy_status(
self.affinity_backend_service, self.affinity_backends)
@staticmethod
def _generate_url_map_body(
name: str,
matcher_name: str,
src_hosts,
dst_default_backend_service: GcpResource,
dst_host_rule_match_backend_service: Optional[GcpResource] = None,
) -> Dict[str, Any]:
if dst_host_rule_match_backend_service is None:
dst_host_rule_match_backend_service = dst_default_backend_service
return {
'name':
name,
'defaultService':
dst_default_backend_service.url,
'hostRules': [{
'hosts': src_hosts,
'pathMatcher': matcher_name,
}],
'pathMatchers': [{
'name': matcher_name,
'defaultService': dst_host_rule_match_backend_service.url,
}],
}
def create_url_map(self, src_host: str, src_port: int) -> GcpResource:
src_address = f'{src_host}:{src_port}'
name = self.make_resource_name(self.URL_MAP_NAME)
matcher_name = self.make_resource_name(self.URL_MAP_PATH_MATCHER_NAME)
logger.info('Creating URL map "%s": %s -> %s', name, src_address,
self.backend_service.name)
resource = self.compute.create_url_map_with_content(
self._generate_url_map_body(name, matcher_name, [src_address],
self.backend_service))
self.url_map = resource
return resource
def patch_url_map(self, src_host: str, src_port: int,
backend_service: GcpResource):
src_address = f'{src_host}:{src_port}'
name = self.make_resource_name(self.URL_MAP_NAME)
matcher_name = self.make_resource_name(self.URL_MAP_PATH_MATCHER_NAME)
logger.info('Patching URL map "%s": %s -> %s', name, src_address,
backend_service.name)
self.compute.patch_url_map(
self.url_map,
self._generate_url_map_body(name, matcher_name, [src_address],
backend_service))
def create_url_map_with_content(self, url_map_body: Any) -> GcpResource:
logger.info('Creating URL map: %s', url_map_body)
resource = self.compute.create_url_map_with_content(url_map_body)
self.url_map = resource
return resource
def delete_url_map(self, force=False):
if force:
name = self.make_resource_name(self.URL_MAP_NAME)
elif self.url_map:
name = self.url_map.name
else:
return
logger.info('Deleting URL Map "%s"', name)
self.compute.delete_url_map(name)
self.url_map = None
def create_alternative_url_map(
self,
src_host: str,
src_port: int,
backend_service: Optional[GcpResource] = None) -> GcpResource:
name = self.make_resource_name(self.ALTERNATIVE_URL_MAP_NAME)
src_address = f'{src_host}:{src_port}'
matcher_name = self.make_resource_name(self.URL_MAP_PATH_MATCHER_NAME)
if backend_service is None:
backend_service = self.alternative_backend_service
logger.info('Creating alternative URL map "%s": %s -> %s', name,
src_address, backend_service.name)
resource = self.compute.create_url_map_with_content(
self._generate_url_map_body(name, matcher_name, [src_address],
backend_service))
self.alternative_url_map = resource
return resource
def delete_alternative_url_map(self, force=False):
if force:
name = self.make_resource_name(self.ALTERNATIVE_URL_MAP_NAME)
elif self.alternative_url_map:
name = self.alternative_url_map.name
else:
return
logger.info('Deleting alternative URL Map "%s"', name)
self.compute.delete_url_map(name)
self.url_map = None
def create_target_proxy(self):
name = self.make_resource_name(self.TARGET_PROXY_NAME)
if self.backend_service_protocol is BackendServiceProtocol.GRPC:
target_proxy_type = 'GRPC'
create_proxy_fn = self.compute.create_target_grpc_proxy
self.target_proxy_is_http = False
elif self.backend_service_protocol is BackendServiceProtocol.HTTP2:
target_proxy_type = 'HTTP'
create_proxy_fn = self.compute.create_target_http_proxy
self.target_proxy_is_http = True
else:
raise TypeError('Unexpected backend service protocol')
logger.info('Creating target %s proxy "%s" to URL map %s', name,
target_proxy_type, self.url_map.name)
self.target_proxy = create_proxy_fn(name, self.url_map)
def delete_target_grpc_proxy(self, force=False):
if force:
name = self.make_resource_name(self.TARGET_PROXY_NAME)
elif self.target_proxy:
name = self.target_proxy.name
else:
return
logger.info('Deleting Target GRPC proxy "%s"', name)
self.compute.delete_target_grpc_proxy(name)
self.target_proxy = None
self.target_proxy_is_http = False
def delete_target_http_proxy(self, force=False):
if force:
name = self.make_resource_name(self.TARGET_PROXY_NAME)
elif self.target_proxy and self.target_proxy_is_http:
name = self.target_proxy.name
else:
return
logger.info('Deleting HTTP Target proxy "%s"', name)
self.compute.delete_target_http_proxy(name)
self.target_proxy = None
self.target_proxy_is_http = False
def create_alternative_target_proxy(self):
name = self.make_resource_name(self.ALTERNATIVE_TARGET_PROXY_NAME)
if self.backend_service_protocol is BackendServiceProtocol.GRPC:
logger.info(
'Creating alternative target GRPC proxy "%s" to URL map %s',
name, self.alternative_url_map.name)
self.alternative_target_proxy = self.compute.create_target_grpc_proxy(
name, self.alternative_url_map, False)
else:
raise TypeError('Unexpected backend service protocol')
def delete_alternative_target_grpc_proxy(self, force=False):
if force:
name = self.make_resource_name(self.ALTERNATIVE_TARGET_PROXY_NAME)
elif self.alternative_target_proxy:
name = self.alternative_target_proxy.name
else:
return
logger.info('Deleting alternative Target GRPC proxy "%s"', name)
self.compute.delete_target_grpc_proxy(name)
self.alternative_target_proxy = None
def find_unused_forwarding_rule_port(
self,
*,
lo: int = 1024, # To avoid confusion, skip well-known ports.
hi: int = 65535,
attempts: int = 25) -> int:
for _ in range(attempts):
src_port = random.randint(lo, hi)
if not self.compute.exists_forwarding_rule(src_port):
return src_port
# TODO(sergiitk): custom exception
raise RuntimeError("Couldn't find unused forwarding rule port")
def create_forwarding_rule(self, src_port: int):
name = self.make_resource_name(self.FORWARDING_RULE_NAME)
src_port = int(src_port)
logging.info(
'Creating forwarding rule "%s" in network "%s": 0.0.0.0:%s -> %s',
name, self.network, src_port, self.target_proxy.url)
resource = self.compute.create_forwarding_rule(name, src_port,
self.target_proxy,
self.network_url)
self.forwarding_rule = resource
return resource
def delete_forwarding_rule(self, force=False):
if force:
name = self.make_resource_name(self.FORWARDING_RULE_NAME)
elif self.forwarding_rule:
name = self.forwarding_rule.name
else:
return
logger.info('Deleting Forwarding rule "%s"', name)
self.compute.delete_forwarding_rule(name)
self.forwarding_rule = None
def create_alternative_forwarding_rule(self,
src_port: int,
ip_address='0.0.0.0'):
name = self.make_resource_name(self.ALTERNATIVE_FORWARDING_RULE_NAME)
src_port = int(src_port)
logging.info(
'Creating alternative forwarding rule "%s" in network "%s": %s:%s -> %s',
name, self.network, ip_address, src_port,
self.alternative_target_proxy.url)
resource = self.compute.create_forwarding_rule(
name,
src_port,
self.alternative_target_proxy,
self.network_url,
ip_address=ip_address)
self.alternative_forwarding_rule = resource
return resource
def delete_alternative_forwarding_rule(self, force=False):
if force:
name = self.make_resource_name(
self.ALTERNATIVE_FORWARDING_RULE_NAME)
elif self.alternative_forwarding_rule:
name = self.alternative_forwarding_rule.name
else:
return
logger.info('Deleting alternative Forwarding rule "%s"', name)
self.compute.delete_forwarding_rule(name)
self.alternative_forwarding_rule = None
def create_firewall_rule(self, allowed_ports: List[str]):
name = self.make_resource_name(self.FIREWALL_RULE_NAME)
logging.info(
'Creating firewall rule "%s" in network "%s" with allowed ports %s',
name, self.network, allowed_ports)
resource = self.compute.create_firewall_rule(
name, self.network_url, xds_flags.FIREWALL_SOURCE_RANGE.value,
allowed_ports)
self.firewall_rule = resource
def delete_firewall_rule(self, force=False):
"""The firewall rule won't be automatically removed."""
if force:
name = self.make_resource_name(self.FIREWALL_RULE_NAME)
elif self.firewall_rule:
name = self.firewall_rule.name
else:
return
logger.info('Deleting Firewall Rule "%s"', name)
self.compute.delete_firewall_rule(name)
self.firewall_rule = None
class TrafficDirectorAppNetManager(TrafficDirectorManager):
GRPC_ROUTE_NAME = "grpc-route"
MESH_NAME = "mesh"
netsvc: _NetworkServicesV1Alpha1
def __init__(self,
gcp_api_manager: gcp.api.GcpApiManager,
project: str,
*,
resource_prefix: str,
resource_suffix: Optional[str] = None,
network: str = 'default',
compute_api_version: str = 'v1'):
super().__init__(gcp_api_manager,
project,
resource_prefix=resource_prefix,
resource_suffix=resource_suffix,
network=network,
compute_api_version=compute_api_version)
# API
self.netsvc = _NetworkServicesV1Alpha1(gcp_api_manager, project)
# Managed resources
# TODO(gnossen) PTAL at the pylint error
self.grpc_route: Optional[GrpcRoute] = None
self.mesh: Optional[Mesh] = None
def create_mesh(self) -> GcpResource:
name = self.make_resource_name(self.MESH_NAME)
logger.info("Creating Mesh %s", name)
body = {}
resource = self.netsvc.create_mesh(name, body)
self.mesh = self.netsvc.get_mesh(name)
logger.debug("Loaded Mesh: %s", self.mesh)
return resource
def delete_mesh(self, force=False):
if force:
name = self.make_resource_name(self.MESH_NAME)
elif self.mesh:
name = self.mesh.name
else:
return
logger.info('Deleting Mesh %s', name)
self.netsvc.delete_mesh(name)
self.mesh = None
def create_grpc_route(self, src_host: str, src_port: int) -> GcpResource:
host = f'{src_host}:{src_port}'
service_name = self.netsvc.resource_full_name(self.backend_service.name,
"backendServices")
body = {
"meshes": [self.mesh.url],
"hostnames":
host,
"rules": [{
"action": {
"destinations": [{
"serviceName": service_name
}]
}
}],
}
name = self.make_resource_name(self.GRPC_ROUTE_NAME)
logger.info("Creating GrpcRoute %s", name)
resource = self.netsvc.create_grpc_route(name, body)
self.grpc_route = self.netsvc.get_grpc_route(name)
logger.debug("Loaded GrpcRoute: %s", self.grpc_route)
return resource
def create_grpc_route_with_content(self, body: Any) -> GcpResource:
name = self.make_resource_name(self.GRPC_ROUTE_NAME)
logger.info("Creating GrpcRoute %s", name)
resource = self.netsvc.create_grpc_route(name, body)
self.grpc_route = self.netsvc.get_grpc_route(name)
logger.debug("Loaded GrpcRoute: %s", self.grpc_route)
return resource
def delete_grpc_route(self, force=False):
if force:
name = self.make_resource_name(self.GRPC_ROUTE_NAME)
elif self.grpc_route:
name = self.grpc_route.name
else:
return
logger.info('Deleting GrpcRoute %s', name)
self.netsvc.delete_grpc_route(name)
self.grpc_route = None
def cleanup(self, *, force=False):
self.delete_grpc_route(force=force)
self.delete_mesh(force=force)
super().cleanup(force=force)
class TrafficDirectorSecureManager(TrafficDirectorManager):
SERVER_TLS_POLICY_NAME = "server-tls-policy"
CLIENT_TLS_POLICY_NAME = "client-tls-policy"
AUTHZ_POLICY_NAME = "authz-policy"
ENDPOINT_POLICY = "endpoint-policy"
CERTIFICATE_PROVIDER_INSTANCE = "google_cloud_private_spiffe"
netsec: _NetworkSecurityV1Beta1
netsvc: _NetworkServicesV1Beta1
def __init__(
self,
gcp_api_manager: gcp.api.GcpApiManager,
project: str,
*,
resource_prefix: str,
resource_suffix: Optional[str] = None,
network: str = 'default',
compute_api_version: str = 'v1',
):
super().__init__(gcp_api_manager,
project,
resource_prefix=resource_prefix,
resource_suffix=resource_suffix,
network=network,
compute_api_version=compute_api_version)
# API
self.netsec = _NetworkSecurityV1Beta1(gcp_api_manager, project)
self.netsvc = _NetworkServicesV1Beta1(gcp_api_manager, project)
# Managed resources
self.server_tls_policy: Optional[ServerTlsPolicy] = None
self.client_tls_policy: Optional[ClientTlsPolicy] = None
self.authz_policy: Optional[AuthorizationPolicy] = None
self.endpoint_policy: Optional[EndpointPolicy] = None
def setup_server_security(self,
*,
server_namespace,
server_name,
server_port,
tls=True,
mtls=True):
self.create_server_tls_policy(tls=tls, mtls=mtls)
self.create_endpoint_policy(server_namespace=server_namespace,
server_name=server_name,
server_port=server_port)
def setup_client_security(self,
*,
server_namespace,
server_name,
tls=True,
mtls=True):
self.create_client_tls_policy(tls=tls, mtls=mtls)
self.backend_service_apply_client_mtls_policy(server_namespace,
server_name)
def cleanup(self, *, force=False):
# Cleanup in the reverse order of creation
super().cleanup(force=force)
self.delete_endpoint_policy(force=force)
self.delete_server_tls_policy(force=force)
self.delete_client_tls_policy(force=force)
self.delete_authz_policy(force=force)
def create_server_tls_policy(self, *, tls, mtls):
name = self.make_resource_name(self.SERVER_TLS_POLICY_NAME)
logger.info('Creating Server TLS Policy %s', name)
if not tls and not mtls:
logger.warning(
'Server TLS Policy %s neither TLS, nor mTLS '
'policy. Skipping creation', name)
return
certificate_provider = self._get_certificate_provider()
policy = {}
if tls:
policy["serverCertificate"] = certificate_provider
if mtls:
policy["mtlsPolicy"] = {
"clientValidationCa": [certificate_provider],
}
self.netsec.create_server_tls_policy(name, policy)
self.server_tls_policy = self.netsec.get_server_tls_policy(name)
logger.debug('Server TLS Policy loaded: %r', self.server_tls_policy)
def delete_server_tls_policy(self, force=False):
if force:
name = self.make_resource_name(self.SERVER_TLS_POLICY_NAME)
elif self.server_tls_policy:
name = self.server_tls_policy.name
else:
return
logger.info('Deleting Server TLS Policy %s', name)
self.netsec.delete_server_tls_policy(name)
self.server_tls_policy = None
def create_authz_policy(self, *, action: str, rules: list):
name = self.make_resource_name(self.AUTHZ_POLICY_NAME)
logger.info('Creating Authz Policy %s', name)
policy = {
"action": action,
"rules": rules,
}
self.netsec.create_authz_policy(name, policy)
self.authz_policy = self.netsec.get_authz_policy(name)
logger.debug('Authz Policy loaded: %r', self.authz_policy)
def delete_authz_policy(self, force=False):
if force:
name = self.make_resource_name(self.AUTHZ_POLICY_NAME)
elif self.authz_policy:
name = self.authz_policy.name
else:
return
logger.info('Deleting Authz Policy %s', name)
self.netsec.delete_authz_policy(name)
self.authz_policy = None
def create_endpoint_policy(self, *, server_namespace: str, server_name: str,
server_port: int) -> None:
name = self.make_resource_name(self.ENDPOINT_POLICY)
logger.info('Creating Endpoint Policy %s', name)
endpoint_matcher_labels = [{
"labelName": "app",
"labelValue": f"{server_namespace}-{server_name}"
}]
port_selector = {"ports": [str(server_port)]}
label_matcher_all = {
"metadataLabelMatchCriteria": "MATCH_ALL",
"metadataLabels": endpoint_matcher_labels,
}
config = {
"type": "GRPC_SERVER",
"trafficPortSelector": port_selector,
"endpointMatcher": {
"metadataLabelMatcher": label_matcher_all,
},
}
if self.server_tls_policy:
config["serverTlsPolicy"] = self.server_tls_policy.name
else:
logger.warning(
'Creating Endpoint Policy %s with '
'no Server TLS policy attached', name)
if self.authz_policy:
config["authorizationPolicy"] = self.authz_policy.name
self.netsvc.create_endpoint_policy(name, config)
self.endpoint_policy = self.netsvc.get_endpoint_policy(name)
logger.debug('Loaded Endpoint Policy: %r', self.endpoint_policy)
def delete_endpoint_policy(self, force: bool = False) -> None:
if force:
name = self.make_resource_name(self.ENDPOINT_POLICY)
elif self.endpoint_policy:
name = self.endpoint_policy.name
else:
return
logger.info('Deleting Endpoint Policy %s', name)
self.netsvc.delete_endpoint_policy(name)
self.endpoint_policy = None
def create_client_tls_policy(self, *, tls, mtls):
name = self.make_resource_name(self.CLIENT_TLS_POLICY_NAME)
logger.info('Creating Client TLS Policy %s', name)
if not tls and not mtls:
logger.warning(
'Client TLS Policy %s neither TLS, nor mTLS '
'policy. Skipping creation', name)
return
certificate_provider = self._get_certificate_provider()
policy = {}
if tls:
policy["serverValidationCa"] = [certificate_provider]
if mtls:
policy["clientCertificate"] = certificate_provider
self.netsec.create_client_tls_policy(name, policy)
self.client_tls_policy = self.netsec.get_client_tls_policy(name)
logger.debug('Client TLS Policy loaded: %r', self.client_tls_policy)
def delete_client_tls_policy(self, force=False):
if force:
name = self.make_resource_name(self.CLIENT_TLS_POLICY_NAME)
elif self.client_tls_policy:
name = self.client_tls_policy.name
else:
return
logger.info('Deleting Client TLS Policy %s', name)
self.netsec.delete_client_tls_policy(name)
self.client_tls_policy = None
def backend_service_apply_client_mtls_policy(
self,
server_namespace,
server_name,
):
if not self.client_tls_policy:
logger.warning(
'Client TLS policy not created, '
'skipping attaching to Backend Service %s',
self.backend_service.name)
return
server_spiffe = (f'spiffe://{self.project}.svc.id.goog/'
f'ns/{server_namespace}/sa/{server_name}')
logging.info(
'Adding Client TLS Policy to Backend Service %s: %s, '
'server %s', self.backend_service.name, self.client_tls_policy.url,
server_spiffe)
self.compute.patch_backend_service(
self.backend_service, {
'securitySettings': {
'clientTlsPolicy': self.client_tls_policy.url,
'subjectAltNames': [server_spiffe]
}
})
@classmethod
def _get_certificate_provider(cls):
return {
"certificateProviderInstance": {
"pluginInstance": cls.CERTIFICATE_PROVIDER_INSTANCE,
},
}