Usage

The steps below cover starting a Needlestack cluster, configuring its collections, and issuing search queries.

Cluster

The first thing to do is start a Needlestack cluster. This involves running the Merger and Searcher gRPC servicers.

Create BaseConfig

See the needlestack.servicers.settings module for the available settings fields.

import socket

from needlestack.servicers.settings import BaseConfig

class MyNeedlestackConfig(BaseConfig):
    LOG_LEVEL = "WARNING"
    LOG_FILE = "/var/log/needlestack.log"
    LOG_FILE_MAX_BYTES = 10 * 1024 ** 2
    LOG_FILE_BACKUPS = 10

    MAX_WORKERS = 10
    HOSTNAME = socket.gethostname()
    SERVICER_PORT = 50051

    CLUSTER_NAME = "my_cluster"
    ZOOKEEPER_HOSTS = [
        "zoo1:2181",
        "zoo2:2181",
        "zoo3:2181"
    ]
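
Hard-coding the ZooKeeper hosts works for a fixed deployment, but the list often differs between environments. The sketch below shows one way to read it from an environment variable instead. It is illustrative only: the variable name `NEEDLESTACK_ZOOKEEPER_HOSTS` and the helper `hosts_from_env` are hypothetical and not part of Needlestack.

```python
import os


def hosts_from_env(var="NEEDLESTACK_ZOOKEEPER_HOSTS", default=("localhost:2181",)):
    """Parse a comma-separated host list from an environment variable.

    The variable name is a hypothetical convention, not a Needlestack setting.
    Empty entries and surrounding whitespace are ignored.
    """
    raw = os.environ.get(var, "")
    hosts = [h.strip() for h in raw.split(",") if h.strip()]
    # Fall back to the default when the variable is unset or empty.
    return hosts or list(default)
```

Inside the config class this could be used as `ZOOKEEPER_HOSTS = hosts_from_env()`.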

Service Script

Create Python scripts to run the gRPC services. Choose either Separate Nodes or Same Nodes.

Separate Nodes

Run the Merger and Searcher on separate nodes. The first script runs a Merger; the second runs a Searcher.

from grpc_health.v1 import health_pb2, health_pb2_grpc
from grpc_health.v1.health import HealthServicer

from needlestack.apis import servicers_pb2_grpc
from needlestack.servicers import factory
from needlestack.servicers.merger import MergerServicer

def main():
    config = MyNeedlestackConfig()
    server = factory.create_server(config)
    manager = factory.create_zookeeper_cluster_manager(config)
    manager.startup()
    servicers_pb2_grpc.add_MergerServicer_to_server(MergerServicer(config, manager), server)
    health = HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health, server)
    health.set("Merger", health_pb2.HealthCheckResponse.SERVING)
    factory.serve(server)

if __name__ == "__main__":
    main()

# Searcher node script
from grpc_health.v1 import health_pb2, health_pb2_grpc
from grpc_health.v1.health import HealthServicer

from needlestack.apis import servicers_pb2_grpc
from needlestack.servicers import factory
from needlestack.servicers.searcher import SearcherServicer

def main():
    config = MyNeedlestackConfig()
    server = factory.create_server(config)
    manager = factory.create_zookeeper_cluster_manager(config)
    manager.startup()
    servicers_pb2_grpc.add_SearcherServicer_to_server(SearcherServicer(config, manager), server)
    health = HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health, server)
    health.set("Searcher", health_pb2.HealthCheckResponse.SERVING)
    factory.serve(server)

if __name__ == "__main__":
    main()

Same Nodes

Run the Merger and Searcher on the same node.

from grpc_health.v1 import health_pb2, health_pb2_grpc
from grpc_health.v1.health import HealthServicer

from needlestack.apis import servicers_pb2_grpc
from needlestack.servicers import factory
from needlestack.servicers.merger import MergerServicer
from needlestack.servicers.searcher import SearcherServicer

def main():
    config = MyNeedlestackConfig()
    server = factory.create_server(config)
    manager = factory.create_zookeeper_cluster_manager(config)
    manager.startup()
    servicers_pb2_grpc.add_MergerServicer_to_server(MergerServicer(config, manager), server)
    servicers_pb2_grpc.add_SearcherServicer_to_server(SearcherServicer(config, manager), server)
    health = HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health, server)
    health.set("Merger", health_pb2.HealthCheckResponse.SERVING)
    health.set("Searcher", health_pb2.HealthCheckResponse.SERVING)
    factory.serve(server)

if __name__ == "__main__":
    main()

Health Checks

Check that a node is up with the following requests.

import grpc
from grpc_health.v1 import health_pb2, health_pb2_grpc

channel = grpc.insecure_channel("localhost:50051")
stub = health_pb2_grpc.HealthStub(channel)
stub.Check(health_pb2.HealthCheckRequest(service="Merger"))
stub.Check(health_pb2.HealthCheckRequest(service="Searcher"))
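
A node may take a moment to start serving, so a deployment script will often poll the health check rather than call it once. The following is a minimal sketch of such a polling loop. The function `wait_until_serving` and its parameters are hypothetical helpers, not part of Needlestack or grpc_health; the actual check is passed in as a callable so the loop does not depend on any particular stub.

```python
import time


def _safe(check, service):
    """Treat any exception (e.g. connection refused) as 'not serving yet'."""
    try:
        return bool(check(service))
    except Exception:
        return False


def wait_until_serving(check, services, timeout=30.0, interval=1.0, sleep=time.sleep):
    """Poll check(service) until every service reports True or time runs out.

    `check` is any callable that returns True when the named service is
    healthy. Returns True if all services became healthy, False on timeout.
    """
    deadline = time.monotonic() + timeout
    pending = set(services)
    while pending:
        # Keep only the services that are still not healthy.
        pending = {s for s in pending if not _safe(check, s)}
        if not pending:
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval)
    return True
```

With the stub from the example above, `check` could be a small function that calls `stub.Check(health_pb2.HealthCheckRequest(service=name))` and compares the response's `status` with `health_pb2.HealthCheckResponse.SERVING`.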

Configuration

Once the Needlestack cluster is up, configure it by sending a gRPC request to any Merger. The Merger determines how to split the shards across the available Searchers, then sends a gRPC request to each Searcher to load its assigned shards into memory.
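
To make the idea of splitting shards across Searchers concrete, here is a small round-robin sketch. It is not Needlestack's actual placement logic, which this document does not describe; the function `assign_shards`, the `replication_factor` parameter, and the shard and searcher names are all assumptions for illustration.

```python
from collections import defaultdict


def assign_shards(shards, searchers, replication_factor=1):
    """Round-robin each shard onto `replication_factor` distinct searchers.

    Illustrative only; a real cluster manager may also weigh memory,
    shard size, or existing placements.
    """
    if not searchers:
        raise ValueError("at least one searcher is required")
    if replication_factor > len(searchers):
        raise ValueError("replication_factor exceeds number of searchers")
    assignment = defaultdict(list)
    for i, shard in enumerate(shards):
        # Consecutive offsets guarantee the replicas land on different searchers.
        for r in range(replication_factor):
            searcher = searchers[(i + r) % len(searchers)]
            assignment[searcher].append(shard)
    return dict(assignment)
```

For example, three shards over two searchers with no replication put the first and third shard on one searcher and the second shard on the other.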

Adding Collections

import grpc
from needlestack.apis import collections_pb2, servicers_pb2_grpc

# Create a list of collections_pb2.Collection objects
# that specifies collections, shards, and their data sources
# collections = [...]

channel = grpc.insecure_channel("localhost:50051")
stub = servicers_pb2_grpc.MergerStub(channel)
request = collections_pb2.CollectionsAddRequest(collections=collections)
response = stub.CollectionsAdd(request)

Deleting Collections

request = collections_pb2.CollectionsDeleteRequest(
    names=["my_collection_name", "another_collection_name"]
)
response = stub.CollectionsDelete(request)

Reloading Collections

If any data sources have changed, this request loads the changes on all Searchers.

response = stub.CollectionsLoad(collections_pb2.CollectionsLoadRequest())

Query

Search queries can be issued to any Merger node. Each request should contain the vector, the count, the collection name, and optionally a list of specific shards in that collection to search. If no list of shards is provided, the search runs over all shards.

import grpc
from needlestack.apis import serializers
from needlestack.apis import servicers_pb2, servicers_pb2_grpc

channel = grpc.insecure_channel("localhost:50051")
stub = servicers_pb2_grpc.MergerStub(channel)
# X = some vector as a numpy array
# k = number of k neighbors
vector = serializers.ndarray_to_proto(X)
request = servicers_pb2.SearchRequest(vector=vector, count=k, collection_name="my_collection")
response = stub.Search(request)
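
Because a search can span several shards, the per-shard results have to be combined into a single list of the k nearest items. The sketch below shows one common way to do that with a k-way merge. It is an assumption about the general technique, not a description of Needlestack's internals; `merge_top_k` and the `(distance, item_id)` tuple layout are hypothetical.

```python
import heapq
import itertools


def merge_top_k(shard_results, k):
    """Merge per-shard (distance, item_id) lists into the k nearest overall.

    Each inner list is assumed to be sorted by ascending distance.
    """
    # heapq.merge lazily interleaves the already-sorted inputs.
    merged = heapq.merge(*shard_results)
    # Only the first k items are ever pulled from the merge.
    return list(itertools.islice(merged, k))
```

Since the merge is lazy, only about k items are consumed regardless of how many results each shard returned.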

In a production environment, load balance queries across mergers to distribute network traffic.
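
One simple client-side way to spread queries is to rotate through the Merger addresses. The sketch below assumes a fixed list of addresses; the class `RoundRobin` and the host names are hypothetical, and a dedicated load balancer or DNS-based approach may suit a real deployment better.

```python
import itertools


class RoundRobin:
    """Cycle through a fixed list of addresses, one per call."""

    def __init__(self, addresses):
        addresses = list(addresses)
        if not addresses:
            raise ValueError("at least one address is required")
        self._cycle = itertools.cycle(addresses)

    def next_address(self):
        """Return the next address in rotation."""
        return next(self._cycle)


# Hypothetical Merger addresses, for illustration only.
mergers = RoundRobin(["merger1:50051", "merger2:50051"])
```

Each query would then open (or reuse) a channel to `mergers.next_address()` before building the Merger stub.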