Source code for yapconf.sources

# -*- coding: utf-8 -*-
import abc
import hashlib
import json
import os
import threading
import time
import warnings
from argparse import ArgumentParser

import six
from watchdog.observers import Observer

import yapconf
from yapconf.exceptions import YapconfLoadError, YapconfSourceError
from yapconf.handlers import FileHandler

if yapconf.kubernetes_support:
    from kubernetes.watch import watch

if yapconf.etcd_support:
    from etcd import EtcdWatchTimedOut


def get_source(label, source_type, **kwargs):
    """Get a config source based on type and keyword args.

    This is meant to be used internally by the spec via ``add_source``.

    Args:
        label (str): The label for this source.
        source_type: The type of source. See ``yapconf.SUPPORTED_SOURCES``

    Keyword Args:
        The keyword arguments are based on the source_type. Please see the
        documentation of the individual sources for a detailed list of all
        possible arguments.

    Returns (yapconf.sources.ConfigSource):
        A valid config source which can be used for generating an override.

    Raises:
        YapconfSourceError: If ``source_type`` is unknown, if its optional
            client library is not installed, or if the selected source
            rejects the supplied arguments.
        NotImplementedError: If ``source_type`` is listed as supported but
            has no branch in this factory.

    """
    if source_type not in yapconf.ALL_SUPPORTED_SOURCES:
        raise YapconfSourceError(
            "Invalid source type %s. Supported types are %s."
            % (source_type, yapconf.ALL_SUPPORTED_SOURCES)
        )

    if source_type not in yapconf.SUPPORTED_SOURCES:
        raise YapconfSourceError(
            'Unsupported source type "%s". If you want to use this type, you '
            "will need to install the correct client for it (try `pip install "
            "yapconf[%s]`). Currently supported types are %s. All supported "
            "types are %s"
            % (
                source_type,
                source_type,
                yapconf.SUPPORTED_SOURCES,
                yapconf.ALL_SUPPORTED_SOURCES,
            )
        )

    # Required arguments are pulled out of kwargs with a ``None`` default
    # instead of being demanded here: the individual config sources produce
    # better error messages (from their ``validate``) when one is missing.
    if source_type == "dict":
        return DictConfigSource(label, data=kwargs.get("data"))

    if source_type == "json":
        return JsonConfigSource(label, **kwargs)

    if source_type == "yaml":
        filename = kwargs.pop("filename", None)
        return YamlConfigSource(label, filename, **kwargs)

    if source_type == "environment":
        return EnvironmentConfigSource(label)

    if source_type == "etcd":
        return EtcdConfigSource(label, kwargs.get("client"), kwargs.get("key", "/"))

    if source_type == "kubernetes":
        name = kwargs.pop("name", None)
        client = kwargs.pop("client", None)
        return KubernetesConfigSource(label, client, name, **kwargs)

    if source_type == "cli":
        return CliConfigSource(label, **kwargs)

    raise NotImplementedError("No implementation for source type %s" % source_type)
@six.add_metaclass(abc.ABCMeta)
class ConfigSource(object):
    """Base class for a configuration source.

    Config sources will be used to generate overrides during configuration
    loading. In later iteration, it will also be used to migrate configs
    based on the configuration type.

    The act of loading configurations/migrating those configurations and
    especially watching those configuration is complicated enough to warrant
    its own data structure.

    Subclasses must implement ``validate`` and ``get_data``. Note that
    ``validate`` is invoked from ``__init__``, so a subclass has to assign
    every attribute its ``validate`` reads *before* calling the base
    constructor.

    Attributes:
        label: The label for this config source.

    """

    def __init__(self, label):
        self.label = label
        self.validate()
        # Watch loops keep running for as long as this flag stays truthy.
        self._continue = True

    @abc.abstractmethod
    def validate(self):
        """Validate that this ConfigSource can be used."""
        pass

    def generate_override(self, separator="."):
        """Generate an override.

        Uses ``get_data`` which is expected to be implemented by each child
        class.

        Args:
            separator (str): Passed through to ``yapconf.flatten``.

        Returns:
            A tuple of label, dict

        Raises:
            YapconfLoadError: If a known error occurs.

        """
        data = self.get_data()
        if not isinstance(data, dict):
            raise YapconfLoadError(
                "Invalid source (%s). The data was loaded successfully, but "
                "the result was not a dictionary. Got %s" % (self.label, type(data))
            )
        return self.label, yapconf.flatten(data, separator=separator)

    def _spawn_and_watch(self, handler):
        """Keep a watcher thread alive until ``_continue`` is cleared.

        ``_spawn_watcher`` only *builds* a thread, so every thread created
        here has to be started explicitly; otherwise it is never alive and
        this loop would spin creating threads that never run.
        """
        thread = self._spawn_watcher(handler)
        thread.start()
        while self._continue:
            if thread.is_alive():
                time.sleep(5)
            else:
                # The watcher died (its ``_watch`` returned or raised), so
                # replace it with a fresh one.
                thread = self._spawn_watcher(handler)
                thread.start()

    def _spawn_watcher(self, handler):
        """Build (but do not start) a daemon thread running ``_watch``.

        The current result of ``get_data`` is captured here and handed to
        ``_watch`` as its starting data.
        """
        thread = threading.Thread(
            name=self.label + "-watcher",
            target=self._watch,
            args=(handler, self.get_data()),
        )
        thread.daemon = True
        return thread

    def watch(self, handler, eternal=False):
        """Watch a source for changes. When changes occur, call the handler.

        By default (see ``_watch``), the result of ``get_data`` is polled
        and compared against the previously seen data.

        Args:
            handler: Must respond to handle_config_change
            eternal: Spawn eternal watch, or just a single watch.

        Returns:
            The daemon thread that was spawned.

        """
        if eternal:
            thread = threading.Thread(
                name=self.label + "-watcher-eternal",
                target=self._spawn_and_watch,
                args=(handler,),
            )
            thread.daemon = True
        else:
            thread = self._spawn_watcher(handler)

        thread.start()
        return thread

    def _hash_data(self, data):
        """Return the hex MD5 digest of ``repr(data)``.

        Used only to detect changes between two polls.
        """
        return hashlib.md5(repr(data).encode("utf-8")).hexdigest()

    def _watch(self, handler, data):
        """Poll ``get_data`` once a second and report changes to ``handler``.

        Args:
            handler: Must respond to handle_config_change
            data: The data the source held when the watch was spawned.

        """
        current_hash = self._hash_data(data)
        while self._continue:
            new_data = self.get_data()
            new_hash = self._hash_data(new_data)
            if current_hash != new_hash:
                # Hand over the data that was just loaded. The base class
                # defines no ``data`` attribute, and on subclasses that do
                # it may be a stale snapshot.
                handler.handle_config_change(new_data)
                current_hash = new_hash
            time.sleep(1)

    @abc.abstractmethod
    def get_data(self):
        """Load and return this source's current data."""
        pass
class DictConfigSource(ConfigSource):
    """Config source backed by a dictionary held in memory.

    Keyword Args:
        data (dict): The dictionary to serve as this source's data.

    """

    def __init__(self, label, data):
        # Both attributes must exist before the base constructor runs,
        # because it calls ``validate``.
        self.type = "dict"
        self.data = data
        super(DictConfigSource, self).__init__(label)

    def validate(self):
        """Ensure the supplied data is a dictionary.

        Raises:
            YapconfSourceError: If ``data`` is not a ``dict``.

        """
        if isinstance(self.data, dict):
            return

        raise YapconfSourceError(
            "Invalid source provided. %s provided data but it was not a "
            "dictionary. Got: %s" % (self.label, self.data)
        )

    def get_data(self):
        """Return the dictionary this source was created with."""
        return self.data
class JsonConfigSource(ConfigSource):
    """JSON Config source.

    Needs either a filename or data keyword arg to work.

    Keyword Args:
        data (str): If provided, will be loaded via ``json.loads``
        filename (str): If provided, will be loaded via ``yapconf.load_file``
        kwargs: All other keyword arguments will be provided as keyword args
            to the ``load`` calls above.

    """

    def __init__(self, label, data=None, filename=None, **kwargs):
        self.type = "json"
        self.data = data
        self.filename = filename
        self._load_kwargs = kwargs

        if yapconf.json_encode_support:
            # Default the encoding when the caller did not choose one.
            self._load_kwargs.setdefault("encoding", "utf-8")
        elif "encoding" in self._load_kwargs:
            warnings.warn("encoding passed to json source config but 3.9 "
                          "dropped support for encoding. This will be ignored.")
            del self._load_kwargs["encoding"]

        super(JsonConfigSource, self).__init__(label)

    def validate(self):
        """Ensure at least one of ``data`` or ``filename`` was supplied.

        Raises:
            YapconfSourceError: If both are ``None``.

        """
        if self.data is None and self.filename is None:
            raise YapconfSourceError(
                "Invalid source (%s). No data or filename was provided for a "
                "JSON config source." % self.label
            )

    def _watch(self, handler, data):
        """Watch the directory containing ``filename`` and block on it.

        Args:
            handler: Passed to the ``FileHandler`` that receives the events.
            data: Unused; accepted for compatibility with the base class.

        Raises:
            YapconfSourceError: If this source has no filename to watch.

        """
        if not self.filename:
            raise YapconfSourceError(
                "Cannot watch a string json source. Strings are immutable."
            )

        file_handler = FileHandler(
            filename=self.filename, handler=handler, file_type="json"
        )
        # ``os.path.dirname`` returns "" for a bare filename such as
        # "config.json"; fall back to the current directory so the observer
        # is given a real path to watch.
        directory = os.path.dirname(self.filename) or "."
        observer = Observer()
        observer.schedule(file_handler, directory, recursive=False)
        observer.start()
        observer.join()

    def get_data(self):
        """Load the JSON document.

        ``data`` takes precedence over ``filename`` when both are set.

        Returns:
            The result of ``json.loads`` on ``data``, or of
            ``yapconf.load_file`` on ``filename``.

        """
        if self.data is not None:
            return json.loads(self.data, **self._load_kwargs)

        return yapconf.load_file(
            self.filename,
            file_type="json",
            klazz=YapconfSourceError,
            load_kwargs=self._load_kwargs,
        )
class YamlConfigSource(ConfigSource):
    """YAML Config source.

    Needs a filename to work.

    Keyword Args:
        filename (str): Will be loaded via ``yapconf.load_file``
        encoding (str): The encoding of the filename. Defaults to "utf-8".

    """

    def __init__(self, label, filename, **kwargs):
        self.type = "yaml"
        self.filename = filename
        self._encoding = kwargs.get("encoding", "utf-8")
        super(YamlConfigSource, self).__init__(label)

    def validate(self):
        """Ensure a filename was supplied.

        Raises:
            YapconfSourceError: If ``filename`` is ``None``.

        """
        if self.filename is None:
            raise YapconfSourceError(
                "Invalid source (%s). No filename was provided for a YAML "
                "config source." % self.label
            )

    def _watch(self, handler, data):
        """Watch the directory containing ``filename`` and block on it.

        Args:
            handler: Passed to the ``FileHandler`` that receives the events.
            data: Unused; accepted for compatibility with the base class.

        """
        file_handler = FileHandler(
            filename=self.filename, handler=handler, file_type="yaml"
        )
        # ``os.path.dirname`` returns "" for a bare filename such as
        # "config.yaml"; fall back to the current directory so the observer
        # is given a real path to watch.
        directory = os.path.dirname(self.filename) or "."
        observer = Observer()
        observer.schedule(file_handler, directory, recursive=False)
        observer.start()
        observer.join()

    def get_data(self):
        """Load the YAML file via ``yapconf.load_file``.

        The file is opened with the encoding given at construction time.
        """
        return yapconf.load_file(
            self.filename,
            file_type="yaml",
            klazz=YapconfSourceError,
            open_kwargs={"encoding": self._encoding},
        )
class EnvironmentConfigSource(DictConfigSource):
    """Dict config source whose data is the process environment."""

    def __init__(self, label):
        snapshot = os.environ.copy()
        super(EnvironmentConfigSource, self).__init__(label, snapshot)
        # The parent constructor sets ``type`` to "dict"; override it
        # afterwards so the final value is "environment".
        self.type = "environment"

    def get_data(self):
        """Return a fresh copy of ``os.environ`` on every call."""
        return os.environ.copy()
class CliConfigSource(ConfigSource):
    """Config source which gets its values from command-line arguments.

    Keyword Args:
        spec: Object whose ``add_arguments(parser)`` method registers the
            command-line arguments on an ``ArgumentParser``.

    """

    def __init__(self, label, spec=None):
        # ``spec`` must be assigned before the base constructor runs,
        # because it calls ``validate``.
        self.spec = spec
        self.type = "cli"
        super(CliConfigSource, self).__init__(label)

    def validate(self):
        """Ensure a spec was supplied.

        Raises:
            YapconfSourceError: If ``spec`` is ``None``.

        """
        if self.spec is not None:
            return

        raise YapconfSourceError(
            "Invalid source (%s). No spec was provided for a CLI "
            "config source." % self.label
        )

    def get_data(self):
        """Parse the command line and return the known arguments as a dict.

        Arguments the parser does not recognize are ignored.
        """
        parser = ArgumentParser()
        self.spec.add_arguments(parser)
        # ``parse_known_args`` returns (namespace, leftover_args); only the
        # namespace is of interest here.
        namespace = parser.parse_known_args()[0]
        return vars(namespace)
class EtcdConfigSource(ConfigSource):
    """Etcd config source (requires python-etcd package).

    If your keys have '/'s in them, you're going to have a bad time.

    Keyword Args:
        client: An etcd client from the python-etcd package.
        key (str): The key to fetch in etcd. Defaults to "/"

    """

    def __init__(self, label, client, key="/"):
        self.type = "etcd"
        self.client = client
        self.key = key
        super(EtcdConfigSource, self).__init__(label)

    def validate(self):
        """Ensure the client is a ``yapconf.etcd_client.Client``.

        Raises:
            YapconfSourceError: If the client is missing or of another type.

        """
        if not isinstance(self.client, yapconf.etcd_client.Client):
            raise YapconfSourceError(
                "Invalid source (%s). Client must be supplied and must be of "
                "type %s. Got type: %s"
                # Report the expected class itself; ``type(<class>)`` would
                # always print the metaclass instead.
                % (self.label, yapconf.etcd_client.Client, type(self.client))
            )

    def get_data(self):
        """Read ``key`` recursively and rebuild its contents as nested dicts.

        Returns:
            dict: One entry per child of the result, nested according to the
            ``/``-separated path of each child relative to ``key``.

        Raises:
            YapconfLoadError: If ``key`` is not a directory in etcd.

        """
        result = self.client.read(key=self.key, recursive=True)
        if not result.dir:
            raise YapconfLoadError(
                "Loaded key %s, but it was not a directory according to etcd."
                % self.key
            )

        data = {}
        for child in result.children:
            keys = self._extract_keys(child.key)
            self._add_value(data, keys, child.value)

        return data

    def _watch(self, handler, data):
        """Block on etcd until ``key`` changes, then notify ``handler``.

        A timed-out wait is simply retried for as long as ``_continue``
        stays truthy.
        """
        while self._continue:
            try:
                self.client.read(key=self.key, wait=True, recursive=True)
                handler.handle_config_change(self.get_data())
            except EtcdWatchTimedOut:
                pass

    def _add_value(self, data, keys, value):
        """Store ``value`` in ``data`` under the nested path ``keys``.

        Intermediate dictionaries are created on demand and reused when they
        already exist, so siblings under the same parent are kept together.
        An empty ``keys`` list leaves ``data`` untouched.
        """
        if not keys:
            return

        node = data
        for key in keys[:-1]:
            child = node.get(key)
            if not isinstance(child, dict):
                child = {}
                node[key] = child
            node = child

        node[keys[-1]] = value

    def _extract_keys(self, fq_key):
        """Split a fully-qualified etcd key into its parts relative to ``key``.

        Returns:
            list: The non-empty path segments below ``self.key``; empty when
            ``fq_key`` is ``self.key`` itself.

        """
        base = self.key.strip("/")
        if fq_key.strip("/") == base:
            return []

        # Remove the watched key as a *prefix*. ``str.lstrip`` is not
        # suitable here because it strips a set of characters, which eats
        # into the child key whenever it starts with any of them.
        prefix = "/" + base + "/" if base else "/"
        if fq_key.startswith(prefix):
            relative = fq_key[len(prefix):]
        else:
            relative = fq_key.lstrip("/")

        return [part for part in relative.split("/") if part]
class KubernetesConfigSource(ConfigSource):
    """A kubernetes config data source.

    This is meant to load things directly from the kubernetes API.
    Specifically, it can load things from config maps.

    Keyword Args:
        client: A kubernetes client from the kubernetes package.
        name (str): The name of the ConfigMap to load.
        namespace (str): The namespace for the ConfigMap. Defaults to
            "default".
        key (str): A key for the given ConfigMap data object.
        config_type (str): Used in conjunction with 'key', if 'key' points to
            a data blob, this will specify whether to use json or yaml to
            load the file. Defaults to "json".

    """

    def __init__(self, label, client, name, **kwargs):
        self.type = "kubernetes"
        self.client = client
        self.name = name
        self.namespace = kwargs.get("namespace") or "default"
        self.key = kwargs.get("key")
        self.config_type = kwargs.get("config_type") or "json"
        super(KubernetesConfigSource, self).__init__(label)

    def validate(self):
        """Check the client type and the requested config type.

        Raises:
            YapconfSourceError: If the client is not a ``CoreV1Api``, if
                yaml was requested without yaml support, or if the config
                type is not one of ``yapconf.FILE_TYPES``.

        """
        if yapconf.kubernetes_client and not isinstance(
            self.client, yapconf.kubernetes_client.CoreV1Api
        ):
            raise YapconfSourceError(
                "Invalid source (%s). Client must be supplied and must be of "
                "type %s. Got type: %s"
                % (
                    self.label,
                    # Report the expected class itself; ``type(<class>)``
                    # would always print the metaclass instead.
                    yapconf.kubernetes_client.CoreV1Api,
                    type(self.client),
                )
            )

        if self.config_type == "yaml" and not yapconf.yaml_support:
            raise YapconfSourceError(
                "Kubernetes config source specified that the configmap "
                "contained a yaml config. In order to support yaml loading, "
                "you will need to `pip install yapconf[yaml]`."
            )

        if self.config_type not in yapconf.FILE_TYPES:
            raise YapconfSourceError(
                "Invalid config type specified for a kubernetes config. %s is "
                "not supported. Supported types are %s"
                % (self.config_type, yapconf.FILE_TYPES)
            )

    def get_data(self):
        """Read the ConfigMap and return its data.

        Returns:
            The ConfigMap's ``data`` when no ``key`` was configured;
            otherwise the value stored under ``key``, parsed according to
            ``config_type``.

        Raises:
            YapconfLoadError: If ``key`` is not present in the ConfigMap.
            NotImplementedError: If ``config_type`` is neither "json" nor
                "yaml".

        """
        result = self.client.read_namespaced_config_map(self.name, self.namespace)

        if self.key is None:
            return result.data

        # ``not result.data`` also covers a ConfigMap whose data is None,
        # which would otherwise break the membership test.
        if not result.data or self.key not in result.data:
            raise YapconfLoadError(
                "Loaded configmap %s, but could not find key %s in the data "
                "portion of the configmap." % (self.name, self.key)
            )

        nested_config = result.data[self.key]
        if self.config_type == "json":
            return json.loads(nested_config)
        elif self.config_type == "yaml":
            # NOTE(review): ``load`` is called without an explicit loader.
            # Depending on which yaml library ``yapconf.yaml`` is, that may
            # be unsafe for untrusted ConfigMaps or rejected by newer
            # releases - confirm and consider a safe loader.
            return yapconf.yaml.load(nested_config)
        else:
            raise NotImplementedError(
                "Cannot load config with type %s" % self.config_type
            )

    def _watch(self, handler, data):
        """Stream ConfigMap events for the namespace and notify ``handler``.

        Events for other ConfigMaps are skipped. A MODIFIED event reloads
        the data and passes it to ``handler.handle_config_change``.

        Raises:
            YapconfSourceError: If the watched ConfigMap is deleted.

        """
        w = watch.Watch()
        for event in w.stream(
            self.client.list_namespaced_config_map, namespace=self.namespace
        ):
            config_map = event["object"]
            if config_map.metadata.name != self.name:
                continue

            if event["type"] == "DELETED":
                raise YapconfSourceError(
                    "Kubernetes config watcher died. The configmap %s was "
                    "deleted." % self.name
                )

            if event["type"] == "MODIFIED":
                handler.handle_config_change(self.get_data())