Source code for needlestack.servicers.factory

import os
import logging
import time
from concurrent import futures

import grpc
from grpc._server import _Server

from needlestack.servicers.settings import BaseConfig
from needlestack.servicers.logging import configure_logger
from needlestack.cluster_managers.manager import ClusterManager

_ONE_DAY_IN_SECONDS = 60 * 60 * 24

logger = logging.getLogger("needlestack")


[docs]def create_server(config: BaseConfig) -> _Server: """Create a gRPC server app with a health servicer. Args: config: Config with settings on how to setup the server """ configure_logger(config) server = grpc.server(futures.ThreadPoolExecutor(max_workers=config.MAX_WORKERS)) if config.use_server_ssl: server.add_secure_port( f"[::]:{config.SERVICER_PORT}", config.ssl_server_credentials ) else: server.add_insecure_port(f"[::]:{config.SERVICER_PORT}") return server
[docs]def create_zookeeper_cluster_manager(config: BaseConfig) -> ClusterManager: """Create a Zookeeper client for cluster managment. Args: config: Config with settings on how to set up a Zookeeper client """ from needlestack.cluster_managers.zookeeper import ZookeeperClusterManager return ZookeeperClusterManager( config.CLUSTER_NAME, config.hostport, config.ZOOKEEPER_HOSTS, config.ZOOKEEPER_ROOT, )
[docs]def serve(server: _Server): server.start() logger.info(f"Started gRPC server on {os.getpid()}") try: while True: time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: logger.info(f"Stopped gRPC server on {os.getpid()}") server.stop(0)