Source code for faust.cli.models

"""Program ``faust models`` used to list models available."""
from operator import attrgetter
from typing import Any, Callable, Sequence, Type, cast
from faust.models import registry
from faust.types import ModelT
from .base import AppCommand, option

__all__ = ['models']

[docs]class models(AppCommand): """List all available models as a tabulated list.""" title = 'Models' headers = ['name', 'help'] sortkey = attrgetter('_options.namespace') options = [ option('--builtins/--no-builtins', default=False), ]
[docs] async def run(self, *, builtins: bool) -> None: """Dump list of available models in this application.""" self.say( self.tabulate( [self.model_to_row(model) for model in self.models(builtins)], headers=self.headers, title=self.title, ))
[docs] def models(self, builtins: bool) -> Sequence[Type[ModelT]]: """Convert list of models to terminal table rows.""" sortkey = cast(Callable[[Type[ModelT]], Any], self.sortkey) return [ model for model in sorted(registry.values(), key=sortkey) if not model._options.namespace.startswith('@') or builtins ]
[docs] def model_to_row(self, model: Type[ModelT]) -> Sequence[str]: """Convert model fields to terminal table columns.""" return [ self.bold_tail(self._name(model)), self.dark(self._help(model)), ]
def _name(self, model: Type[ModelT]) -> str: return self.abbreviate_fqdn(model._options.namespace) def _help(self, model: Type[ModelT]) -> str: return model.__doc__ or '<N/A>'