Usage
This page describes how to run a Needlestack cluster, configure its collections, and send queries.
Cluster
The first step is to start a Needlestack cluster, which means running the Merger and Searcher gRPC servicers.
Create BaseConfig
Subclass BaseConfig and override the settings your deployment needs. See the needlestack.servicers.settings module for the available settings fields.
import socket
from needlestack.servicers.settings import BaseConfig
class MyNeedlestackConfig(BaseConfig):
    LOG_LEVEL = "WARNING"
    LOG_FILE = "/var/log/needlestack.log"
    LOG_FILE_MAX_BYTES = 10 * 1024 ** 2  # 10 MiB
    LOG_FILE_BACKUPS = 10
    MAX_WORKERS = 10
    HOSTNAME = socket.gethostname()
    SERVICER_PORT = 50051
    CLUSTER_NAME = "my_cluster"
    ZOOKEEPER_HOSTS = [
        "zoo1:2181",
        "zoo2:2181",
        "zoo3:2181",
    ]
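The LOG_FILE, LOG_FILE_MAX_BYTES, and LOG_FILE_BACKUPS fields look like the arguments of Python's standard logging.handlers.RotatingFileHandler. Assuming that is how Needlestack applies them (this page does not say so; check needlestack.servicers.settings), the sketch below builds an equivalent handler with the standard library to show what the values above mean. The helper name build_log_handler is hypothetical, and the log file goes to a temporary directory so the sketch runs without write access to /var/log.

```python
import logging
import logging.handlers
import os
import tempfile


def build_log_handler(log_file, max_bytes, backups, level):
    """Hypothetical helper mapping the config fields onto a stdlib rotating handler.

    This mirrors an assumed use of LOG_FILE / LOG_FILE_MAX_BYTES /
    LOG_FILE_BACKUPS / LOG_LEVEL; it is not taken from Needlestack itself.
    """
    handler = logging.handlers.RotatingFileHandler(
        log_file, maxBytes=max_bytes, backupCount=backups
    )
    handler.setLevel(level)  # setLevel accepts level names such as "WARNING"
    return handler


# Use a temporary directory instead of /var/log so no special permissions are needed.
log_dir = tempfile.mkdtemp()
handler = build_log_handler(
    os.path.join(log_dir, "needlestack.log"),
    max_bytes=10 * 1024 ** 2,  # 10 MiB per file before rotating
    backups=10,                # keep needlestack.log.1 through needlestack.log.10
    level="WARNING",
)
print(handler.maxBytes, handler.backupCount, logging.getLevelName(handler.level))
# prints: 10485760 10 WARNING
handler.close()
```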
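Service Script
Create Python scripts to run the gRPC services, using either the Separate Nodes or the Same Nodes layout.
Separate Nodes
Run the Merger and Searcher on separate nodes. The first script below runs on Merger nodes and the second runs on Searcher nodes.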
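# Script for Merger nodes
from grpc_health.v1 import health_pb2, health_pb2_grpc
from grpc_health.v1.health import HealthServicer
from needlestack.apis import servicers_pb2_grpc
from needlestack.servicers import factory
from needlestack.servicers.merger import MergerServicer
# MyNeedlestackConfig is the BaseConfig subclass created above;
# import it from the module where you defined it.
def main():
    config = MyNeedlestackConfig()
    server = factory.create_server(config)
    # Cluster manager backed by the ZooKeeper hosts in the config
    manager = factory.create_zookeeper_cluster_manager(config)
    manager.startup()
    # Register the Merger servicer on the gRPC server
    servicers_pb2_grpc.add_MergerServicer_to_server(MergerServicer(config, manager), server)
    # Expose the standard gRPC health service and report the Merger as serving
    health = HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health, server)
    health.set("Merger", health_pb2.HealthCheckResponse.SERVING)
    factory.serve(server)
if __name__ == "__main__":
    main()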
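# Script for Searcher nodes
from grpc_health.v1 import health_pb2, health_pb2_grpc
from grpc_health.v1.health import HealthServicer
from needlestack.apis import servicers_pb2_grpc
from needlestack.servicers import factory
from needlestack.servicers.searcher import SearcherServicer
# MyNeedlestackConfig is the BaseConfig subclass created above;
# import it from the module where you defined it.
def main():
    config = MyNeedlestackConfig()
    server = factory.create_server(config)
    # Cluster manager backed by the ZooKeeper hosts in the config
    manager = factory.create_zookeeper_cluster_manager(config)
    manager.startup()
    # Register the Searcher servicer on the gRPC server
    servicers_pb2_grpc.add_SearcherServicer_to_server(SearcherServicer(config, manager), server)
    # Expose the standard gRPC health service and report the Searcher as serving
    health = HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health, server)
    health.set("Searcher", health_pb2.HealthCheckResponse.SERVING)
    factory.serve(server)
if __name__ == "__main__":
    main()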
Same Nodes
Run the Merger and Searcher on the same node, registered on a single gRPC server.
# Script for nodes that run both the Merger and the Searcher
from grpc_health.v1 import health_pb2, health_pb2_grpc
from grpc_health.v1.health import HealthServicer
from needlestack.apis import servicers_pb2_grpc
from needlestack.servicers import factory
from needlestack.servicers.merger import MergerServicer
from needlestack.servicers.searcher import SearcherServicer
# MyNeedlestackConfig is the BaseConfig subclass created above;
# import it from the module where you defined it.
def main():
    config = MyNeedlestackConfig()
    server = factory.create_server(config)
    # One ZooKeeper-backed cluster manager shared by both servicers
    manager = factory.create_zookeeper_cluster_manager(config)
    manager.startup()
    # Register both servicers on the same gRPC server
    servicers_pb2_grpc.add_MergerServicer_to_server(MergerServicer(config, manager), server)
    servicers_pb2_grpc.add_SearcherServicer_to_server(SearcherServicer(config, manager), server)
    # Report both servicers as serving through the standard health service
    health = HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health, server)
    health.set("Merger", health_pb2.HealthCheckResponse.SERVING)
    health.set("Searcher", health_pb2.HealthCheckResponse.SERVING)
    factory.serve(server)
if __name__ == "__main__":
    main()
Health Checks
Check that a node is up by querying the standard gRPC health service for each servicer the node runs.
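import grpc
from grpc_health.v1 import health_pb2, health_pb2_grpc
channel = grpc.insecure_channel("localhost:50051")
stub = health_pb2_grpc.HealthStub(channel)
# Each call returns a HealthCheckResponse; its status field is SERVING when that
# servicer is up. Asking for a service the node does not run fails with NOT_FOUND.
stub.Check(health_pb2.HealthCheckRequest(service="Merger"))
stub.Check(health_pb2.HealthCheckRequest(service="Searcher"))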
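A freshly started node may not answer health checks right away, so deployment scripts often poll until the check passes. The sketch below is a hypothetical, stdlib-only retry helper: check is any zero-argument callable that returns True once the node is healthy, such as a lambda that compares the status of the stub.Check(...) response above to health_pb2.HealthCheckResponse.SERVING. Exceptions listed in retry_on are treated as "not ready yet".

```python
import time


def wait_until_serving(check, timeout=30.0, interval=0.5, retry_on=(Exception,)):
    """Poll check() until it returns True or `timeout` seconds elapse (hypothetical helper).

    Exceptions in `retry_on` (for a real node, grpc.RpcError while the server is
    still starting) are swallowed and retried. Returns True on success, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            if check():
                return True
        except retry_on:
            pass  # node not reachable yet; try again after a short pause
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Usage against a real node (requires grpcio and grpcio-health-checking):
#   wait_until_serving(
#       lambda: stub.Check(health_pb2.HealthCheckRequest(service="Merger")).status
#       == health_pb2.HealthCheckResponse.SERVING,
#       retry_on=(grpc.RpcError,),
#   )

# Self-contained demo: a fake check that fails twice, then succeeds.
attempts = []


def fake_check():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("not up yet")
    return True


print(wait_until_serving(fake_check, timeout=5, interval=0.01))  # True
```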
Configuration
Once the Needlestack cluster is up, configure it by sending a gRPC request to any Merger. The Merger determines how to split the shards across the available Searchers, then sends gRPC requests to each Searcher to load specific shards into memory.
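This page does not describe how the Merger decides which Searcher loads which shard. Purely as an illustration, the sketch below spreads shards over Searchers round-robin, which balances them by count; it is a stand-in, not Needlestack's actual placement logic, and the function, shard, and host names are hypothetical.

```python
def assign_shards(shards, searchers):
    """Round-robin shard placement (illustrative stand-in, not Needlestack's algorithm).

    shards: list of shard names; searchers: list of Searcher hostnames.
    Returns a dict mapping every Searcher to the shards it should load into memory.
    """
    if not searchers:
        raise ValueError("no Searchers available")
    # Start every Searcher with an empty list so idle Searchers still appear.
    assignment = {searcher: [] for searcher in searchers}
    for i, shard in enumerate(shards):
        assignment[searchers[i % len(searchers)]].append(shard)
    return assignment


# Hypothetical names: 5 shards spread over 2 Searchers.
plan = assign_shards(["s0", "s1", "s2", "s3", "s4"], ["searcher1", "searcher2"])
print(plan)
# {'searcher1': ['s0', 's2', 's4'], 'searcher2': ['s1', 's3']}
```

A real placement scheme would likely also weigh shard sizes and Searcher memory; round-robin ignores both.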
Adding Collections
import grpc
from needlestack.apis import collections_pb2, servicers_pb2_grpc
# Create a list of collections_pb2.Collection objects
# that specifies collections, shards, and their data sources
# collections = [...]
# Any Merger address works
channel = grpc.insecure_channel("localhost:50051")
stub = servicers_pb2_grpc.MergerStub(channel)
request = collections_pb2.CollectionsAddRequest(collections=collections)
response = stub.CollectionsAdd(request)
Deleting Collections
# Uses the MergerStub (stub) created under Adding Collections
request = collections_pb2.CollectionsDeleteRequest(
    names=["my_collection_name", "another_collection_name"]
)
response = stub.CollectionsDelete(request)
Reloading Collections
If any data sources have changed, this request loads the changes on all Searchers.
# Uses the same MergerStub (stub) as above
response = stub.CollectionsLoad(collections_pb2.CollectionsLoadRequest())
Query
Send search queries to any Merger node. Each request should contain the query vector, the number of neighbors to return (count), the collection name, and optionally a list of specific shards in that collection to search. If no list of shards is provided, the search covers all shards in the collection.
import grpc
from needlestack.apis import serializers
from needlestack.apis import servicers_pb2, servicers_pb2_grpc
channel = grpc.insecure_channel("localhost:50051")
stub = servicers_pb2_grpc.MergerStub(channel)
# X = some vector as a numpy array
# k = number of nearest neighbors to return
vector = serializers.ndarray_to_proto(X)
request = servicers_pb2.SearchRequest(vector=vector, count=k, collection_name="my_collection")
response = stub.Search(request)
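As its name suggests, a Merger presumably has to combine results coming from several shards into one answer. Assuming each shard returns its own k nearest neighbors as (distance, item_id) pairs sorted by ascending distance (smaller means closer), the global top k is simply the k smallest distances across those lists, since any globally closest item must also be among its own shard's top k. The sketch below does this with heapq.merge and treats a missing shard list as "all shards", like the request does. The function name and data layout are hypothetical; this is not Needlestack's merge code.

```python
import heapq
from itertools import islice


def merge_top_k(shard_results, k, shard_names=None):
    """Combine per-shard nearest-neighbor lists into a global top k (hypothetical helper).

    shard_results: dict of shard name -> list of (distance, item_id) pairs,
    each already sorted by ascending distance.
    shard_names: optional subset of shards to search; None means all shards.
    """
    names = shard_results.keys() if shard_names is None else shard_names
    # heapq.merge lazily interleaves already-sorted inputs in sorted order,
    # so the first k items are the k smallest distances overall.
    merged = heapq.merge(*(shard_results[name] for name in names))
    return list(islice(merged, k))


results = {
    "shard_a": [(0.1, "a1"), (0.4, "a2"), (0.9, "a3")],
    "shard_b": [(0.2, "b1"), (0.3, "b2"), (0.8, "b3")],
}
print(merge_top_k(results, k=3))
# [(0.1, 'a1'), (0.2, 'b1'), (0.3, 'b2')]
print(merge_top_k(results, k=2, shard_names=["shard_a"]))
# [(0.1, 'a1'), (0.4, 'a2')]
```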
In a production environment, load-balance queries across Mergers to distribute network traffic.
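One simple way to spread queries is client-side round-robin over a list of Merger addresses. The sketch below is stdlib-only; the RoundRobin class and the hostnames are hypothetical. In practice you would keep one long-lived gRPC channel per address, or rely on gRPC's built-in round_robin load-balancing policy or an external load balancer, rather than reconnecting for every query.

```python
import itertools
import threading


class RoundRobin:
    """Thread-safe round-robin picker over Merger addresses (hypothetical helper)."""

    def __init__(self, addresses):
        if not addresses:
            raise ValueError("need at least one Merger address")
        self._cycle = itertools.cycle(addresses)
        self._lock = threading.Lock()

    def pick(self):
        # Advancing one shared iterator from several threads is not guaranteed
        # to be safe, so a lock serializes access.
        with self._lock:
            return next(self._cycle)


mergers = RoundRobin(["merger1:50051", "merger2:50051", "merger3:50051"])
print([mergers.pick() for _ in range(4)])
# ['merger1:50051', 'merger2:50051', 'merger3:50051', 'merger1:50051']

# With grpc installed, pair each address with a reusable channel, e.g.:
#   channels = {addr: grpc.insecure_channel(addr) for addr in addresses}
#   stub = servicers_pb2_grpc.MergerStub(channels[mergers.pick()])
```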