Source code for needlestack.data_sources.gcs

import tempfile
from contextlib import contextmanager
from typing import Optional
from functools import lru_cache

from google.cloud import storage

from needlestack.data_sources import DataSource


[docs]class GcsDataSource(DataSource): """Data source that lives locally on disk as a file. Attributes: bucket_name: Google Cloud Storage bucket name blob_name: Blob name in bucket project_name: Google Cloud Platform project name credentials_file: JSON credentials file for GCP. If not provided, the google.cloud package will try to get the credentials implicitly """ bucket_name: str blob_name: str project_name: str credentials_file: str @property def blob(self) -> storage.Blob: client = get_client(self.credentials_file) bucket = client.get_bucket(self.bucket_name) return bucket.get_blob(self.blob_name) @property def last_modified(self): timestamp = self.blob.updated if timestamp is not None: return timestamp.timestamp() else: return None
[docs] def populate_from_proto(self, proto): self.bucket_name = proto.bucket_name self.blob_name = proto.blob_name self.project_name = proto.project_name self.credentials_file = proto.credentials_file
[docs] @contextmanager def local_filename(self): with tempfile.NamedTemporaryFile() as f: self.blob.download_to_file(f) yield f.name
[docs] @contextmanager def get_content(self, mode: str = "rb"): yield self.blob.download_as_string()
[docs]@lru_cache(maxsize=None) def get_client(credentials_file: Optional[str] = None) -> storage.Client: """Gets a cached GCS client, or creates and caches a GCS client. If not provided a credentials file, checks the environment variables for client authentication. Args: credentials_file: Path to GCP credentials json """ if credentials_file: return storage.Client.from_service_account_json(credentials_file) else: return storage.Client()