faust.cli.faust

Program faust (umbrella command).

class faust.cli.faust.agents(ctx: click.core.Context, *args: Any, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, **kwargs: Any) → None[source]

List agents.

title = 'Agents'
headers = ['name', 'topic', 'help']
sortkey = operator.attrgetter('name')
options = [option('--local/--no-local', help='Include agents using a local channel')]
async run(local: bool) → None[source]

Dump list of available agents in this application.

Return type

None

agents(*, local: bool = False) → Sequence[faust.types.agents.AgentT][source]

Convert list of agents to terminal table rows.

Return type

Sequence[AgentT]

agent_to_row(agent: faust.types.agents.AgentT) → Sequence[str][source]

Convert agent fields to terminal table row.

Return type

Sequence[str]

faust.cli.faust.call_command(command: str, args: List[str] = None, stdout: IO = None, stderr: IO = None, side_effects: bool = False, **kwargs: Any) → Tuple[int, IO, IO][source]
Return type

Tuple[int, IO[AnyStr], IO[AnyStr]]

class faust.cli.faust.clean_versions(ctx: click.core.Context, *args: Any, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, **kwargs: Any) → None[source]

Delete old version directories.

Warning

This command will result in the destruction of the following files:

  1. Table data for previous versions of the app.

async run() → None[source]

Execute command.

Return type

None

remove_old_versiondirs() → None[source]

Remove data from old application versions from data directory.

Return type

None

class faust.cli.faust.completion(ctx: click.core.Context, *args: Any, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, **kwargs: Any) → None[source]

Output shell completion to be evaluated by the shell.

require_app = False
async run() → None[source]

Dump click completion script for Faust CLI.

Return type

None

shell() → str[source]

Return the current shell used in this environment.

Return type

str

class faust.cli.faust.livecheck(*args: Any, **kwargs: Any) → None[source]

Manage LiveCheck instances.

class faust.cli.faust.model(ctx: click.core.Context, *args: Any, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, **kwargs: Any) → None[source]

Show model detail.

headers = ['field', 'type', 'default']
options = [argument('name')]
async run(name: str) → None[source]

Dump the fields of the named model to terminal.

Return type

None

model_fields(model: Type[faust.types.models.ModelT]) → Sequence[Sequence[str]][source]

Convert model fields to terminal table rows.

Return type

Sequence[Sequence[str]]

field(field: faust.types.models.FieldDescriptorT) → Sequence[str][source]

Convert model field to terminal table columns.

Return type

Sequence[str]

model_to_row(model: Type[faust.types.models.ModelT]) → Sequence[str][source]

Convert model to terminal table row.

Return type

Sequence[str]

class faust.cli.faust.models(ctx: click.core.Context, *args: Any, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, **kwargs: Any) → None[source]

List all available models as a tabulated list.

title = 'Models'
headers = ['name', 'help']
sortkey = operator.attrgetter('_options.namespace')
options = [option('--builtins/--no-builtins', default=False)]
async run(*, builtins: bool) → None[source]

Dump list of available models in this application.

Return type

None

models(builtins: bool) → Sequence[Type[faust.types.models.ModelT]][source]

Convert list of models to terminal table rows.

Return type

Sequence[Type[ModelT]]

model_to_row(model: Type[faust.types.models.ModelT]) → Sequence[str][source]

Convert model fields to terminal table columns.

Return type

Sequence[str]

class faust.cli.faust.reset(ctx: click.core.Context, *args: Any, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, **kwargs: Any) → None[source]

Delete local table state.

Warning

This command will result in the destruction of the following files:

  1. The local database directories/files backing tables (does not apply if an in-memory store like memory:// is used).

Notes

This data is technically recoverable from the Kafka cluster (if intact), but it will take a long time to get the data back, as you need to consume each changelog topic in full.

It’d be faster to copy the data from any standbys that happen to have the topic partitions you require.

async run() → None[source]

Execute command.

Return type

None

async reset_tables() → None[source]

Reset local state for all tables.

Return type

None

class faust.cli.faust.send(ctx: click.core.Context, *args: Any, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, **kwargs: Any) → None[source]

Send message to agent/topic.

options = [option('--key-type', '-K', help='Name of model to serialize key into.'), option('--key-serializer', help='Override default serializer for key.'), option('--value-type', '-V', help='Name of model to serialize value into.'), option('--value-serializer', help='Override default serializer for value.'), option('--key', '-k', help='String value for key (use json if model).'), option('--partition', type=<class 'int'>, help='Specific partition to send to.'), option('--repeat', '-r', type=<class 'int'>, default=1, help='Send message n times.'), option('--min-latency', type=<class 'float'>, default=0.0, help='Minimum delay between sending.'), option('--max-latency', type=<class 'float'>, default=0.0, help='Maximum delay between sending.'), argument('entity'), argument('value', default=None, required=False)]
async run(entity: str, value: str, *args: Any, key: str = None, key_type: str = None, key_serializer: str = None, value_type: str = None, value_serializer: str = None, partition: int = 1, timestamp: float = None, repeat: int = 1, min_latency: float = 0.0, max_latency: float = 0.0, **kwargs: Any) → Any[source]

Send message to topic/agent/channel.

Return type

Any

class faust.cli.faust.tables(ctx: click.core.Context, *args: Any, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, **kwargs: Any) → None[source]

List available tables.

title = 'Tables'
async run() → None[source]

Dump list of application tables to terminal.

Return type

None

class faust.cli.faust.worker(ctx: click.core.Context, *args: Any, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, **kwargs: Any) → None[source]

Start worker instance for given app.

daemon = True
redirect_stdouts = True
worker_options = [option('--with-web/--without-web', default=True, help='Enable/disable web server and related components.'), option('--web-port', '-p', default=None, type=IntRange(1, 65535), help='Port to run web server on (default: 6066)'), option('--web-transport', default=None, type=URL, help='Web server transport (default: tcp:)'), option('--web-bind', '-b', type=<class 'str'>), option('--web-host', '-h', default='build-9933977-project-230058-faust', type=<class 'str'>, help='Canonical host name for the web server (default: 0.0.0.0)')]
options = [option('--with-web/--without-web', default=True, help='Enable/disable web server and related components.'), option('--web-port', '-p', default=None, type=IntRange(1, 65535), help='Port to run web server on (default: 6066)'), option('--web-transport', default=None, type=URL, help='Web server transport (default: tcp:)'), option('--web-bind', '-b', type=<class 'str'>), option('--web-host', '-h', default='build-9933977-project-230058-faust', type=<class 'str'>, help='Canonical host name for the web server (default: 0.0.0.0)'), option('--logfile', '-f', callback=<function compat_option.<locals>._callback>, expose_value=False, default=None, type=<click.types.Path object>, help='Path to logfile (default is <stderr>).'), option('--loglevel', '-l', callback=<function compat_option.<locals>._callback>, expose_value=False, default='WARN', type=Choice(['crit', 'error', 'warn', 'info', 'debug']), help='Logging level to use.'), option('--blocking-timeout', callback=<function compat_option.<locals>._callback>, expose_value=False, default=10.0, type=<class 'float'>, help='when --debug: Blocking detector timeout.'), option('--console-port', callback=<function compat_option.<locals>._callback>, expose_value=False, default=50101, type=IntRange(1, 65535), help='when --debug: Port to run debugger console on.')]
on_worker_created(worker: mode.worker.Worker) → None[source]

Print banner when worker starts.

Return type

None

as_service(loop: asyncio.events.AbstractEventLoop, *args: Any, **kwargs: Any) → mode.types.services.ServiceT[source]

Return the service this command should execute.

For the worker we simply start the application itself.

Note

The application will be started using a faust.Worker.

Return type

ServiceT

banner(worker: mode.worker.Worker) → str[source]

Generate the text banner emitted before the worker starts.

Return type

str

faust_ident() → str[source]

Return Faust version information as ANSI string.

Return type

str

platform() → str[source]

Return platform identifier as ANSI string.

Return type

str