faust.app.base

Faust Application.

An app is an instance of the Faust library. Everything starts here.

class faust.app.base.BootStrategy(app: faust.types.app.AppT, *, enable_web: bool = None, enable_kafka: bool = None, enable_kafka_producer: bool = None, enable_kafka_consumer: bool = None, enable_sensors: bool = None) → None[source]

App startup strategy.

The startup strategy defines the graph of services to start when the Faust worker for an app starts.

enable_kafka = True
enable_kafka_producer = None
enable_kafka_consumer = None
enable_web = None
enable_sensors = True
server() → Iterable[mode.types.services.ServiceT][source]

Return services to start when app is in default mode.

Return type

Iterable[ServiceT[]]

client_only() → Iterable[mode.types.services.ServiceT][source]

Return services to start when app is in client_only mode.

Return type

Iterable[ServiceT[]]

producer_only() → Iterable[mode.types.services.ServiceT][source]

Return services to start when app is in producer_only mode.

Return type

Iterable[ServiceT[]]

sensors() → Iterable[mode.types.services.ServiceT][source]

Return list of services required to start sensors.

Return type

Iterable[ServiceT[]]

kafka_producer() → Iterable[mode.types.services.ServiceT][source]

Return list of services required to start Kafka producer.

Return type

Iterable[ServiceT[]]

kafka_consumer() → Iterable[mode.types.services.ServiceT][source]

Return list of services required to start Kafka consumer.

Return type

Iterable[ServiceT[]]

kafka_client_consumer() → Iterable[mode.types.services.ServiceT][source]

Return list of services required to start Kafka client consumer.

Return type

Iterable[ServiceT[]]

agents() → Iterable[mode.types.services.ServiceT][source]

Return list of services required to start agents.

Return type

Iterable[ServiceT[]]

kafka_conductor() → Iterable[mode.types.services.ServiceT][source]

Return list of services required to start Kafka conductor.

Return type

Iterable[ServiceT[]]

web_server() → Iterable[mode.types.services.ServiceT][source]

Return list of web-server services.

Return type

Iterable[ServiceT[]]

web_components() → Iterable[mode.types.services.ServiceT][source]

Return list of web-related services (excluding web server).

Return type

Iterable[ServiceT[]]

tables() → Iterable[mode.types.services.ServiceT][source]

Return list of table-related services.

Return type

Iterable[ServiceT[]]

class faust.app.base.App(id: str, *, monitor: faust.sensors.monitor.Monitor = None, config_source: Any = None, loop: asyncio.events.AbstractEventLoop = None, beacon: mode.utils.types.trees.NodeT = None, **options: Any) → None[source]

Faust Application.

Parameters

id (str) – Application ID.

Keyword Arguments

loop (asyncio.AbstractEventLoop) – optional event loop to use.

See also

Application Parameters – for supported keyword arguments.

SCAN_CATEGORIES = ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task']
class BootStrategy(app: faust.types.app.AppT, *, enable_web: bool = None, enable_kafka: bool = None, enable_kafka_producer: bool = None, enable_kafka_consumer: bool = None, enable_sensors: bool = None) → None

App startup strategy.

The startup strategy defines the graph of services to start when the Faust worker for an app starts.

agents() → Iterable[mode.types.services.ServiceT]

Return list of services required to start agents.

Return type

Iterable[ServiceT[]]

client_only() → Iterable[mode.types.services.ServiceT]

Return services to start when app is in client_only mode.

Return type

Iterable[ServiceT[]]

enable_kafka = True
enable_kafka_consumer = None
enable_kafka_producer = None
enable_sensors = True
enable_web = None
kafka_client_consumer() → Iterable[mode.types.services.ServiceT]

Return list of services required to start Kafka client consumer.

Return type

Iterable[ServiceT[]]

kafka_conductor() → Iterable[mode.types.services.ServiceT]

Return list of services required to start Kafka conductor.

Return type

Iterable[ServiceT[]]

kafka_consumer() → Iterable[mode.types.services.ServiceT]

Return list of services required to start Kafka consumer.

Return type

Iterable[ServiceT[]]

kafka_producer() → Iterable[mode.types.services.ServiceT]

Return list of services required to start Kafka producer.

Return type

Iterable[ServiceT[]]

producer_only() → Iterable[mode.types.services.ServiceT]

Return services to start when app is in producer_only mode.

Return type

Iterable[ServiceT[]]

sensors() → Iterable[mode.types.services.ServiceT]

Return list of services required to start sensors.

Return type

Iterable[ServiceT[]]

server() → Iterable[mode.types.services.ServiceT]

Return services to start when app is in default mode.

Return type

Iterable[ServiceT[]]

tables() → Iterable[mode.types.services.ServiceT]

Return list of table-related services.

Return type

Iterable[ServiceT[]]

web_components() → Iterable[mode.types.services.ServiceT]

Return list of web-related services (excluding web server).

Return type

Iterable[ServiceT[]]

web_server() → Iterable[mode.types.services.ServiceT]

Return list of web-server services.

Return type

Iterable[ServiceT[]]

class Settings(id: str, *, debug: bool = None, version: int = None, broker: Union[str, yarl.URL, List[yarl.URL]] = None, broker_client_id: str = None, broker_request_timeout: Union[datetime.timedelta, float, str] = None, broker_credentials: Union[faust.types.auth.CredentialsT, ssl.SSLContext] = None, broker_commit_every: int = None, broker_commit_interval: Union[datetime.timedelta, float, str] = None, broker_commit_livelock_soft_timeout: Union[datetime.timedelta, float, str] = None, broker_session_timeout: Union[datetime.timedelta, float, str] = None, broker_heartbeat_interval: Union[datetime.timedelta, float, str] = None, broker_check_crcs: bool = None, broker_max_poll_records: int = None, broker_max_poll_interval: int = None, broker_consumer: Union[str, yarl.URL, List[yarl.URL]] = None, broker_producer: Union[str, yarl.URL, List[yarl.URL]] = None, agent_supervisor: Union[_T, str] = None, store: Union[str, yarl.URL] = None, cache: Union[str, yarl.URL] = None, web: Union[str, yarl.URL] = None, web_enabled: bool = True, processing_guarantee: Union[str, faust.types.enums.ProcessingGuarantee] = None, timezone: datetime.tzinfo = None, autodiscover: Union[bool, Iterable[str], Callable[Iterable[str]]] = None, origin: str = None, canonical_url: Union[str, yarl.URL] = None, datadir: Union[pathlib.Path, str] = None, tabledir: Union[pathlib.Path, str] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, logging_config: Dict = None, loghandlers: List[logging.Handler] = None, table_cleanup_interval: Union[datetime.timedelta, float, str] = None, table_standby_replicas: int = None, table_key_index_size: int = None, topic_replication_factor: int = None, topic_partitions: int = None, topic_allow_declare: bool = None, topic_disable_leader: bool = None, id_format: str = None, reply_to: str = None, reply_to_prefix: str = None, reply_create_topic: bool = None, reply_expires: 
Union[datetime.timedelta, float, str] = None, ssl_context: ssl.SSLContext = None, stream_buffer_maxsize: int = None, stream_wait_empty: bool = None, stream_ack_cancelled_tasks: bool = None, stream_ack_exceptions: bool = None, stream_publish_on_commit: bool = None, stream_recovery_delay: Union[datetime.timedelta, float, str] = None, producer_linger_ms: int = None, producer_max_batch_size: int = None, producer_acks: int = None, producer_max_request_size: int = None, producer_compression_type: str = None, producer_partitioner: Union[_T, str] = None, producer_request_timeout: Union[datetime.timedelta, float, str] = None, producer_api_version: str = None, consumer_max_fetch_size: int = None, consumer_auto_offset_reset: str = None, web_bind: str = None, web_port: int = None, web_host: str = None, web_transport: Union[str, yarl.URL] = None, web_in_thread: bool = None, web_cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, worker_redirect_stdouts: bool = None, worker_redirect_stdouts_level: Union[int, str] = None, Agent: Union[_T, str] = None, ConsumerScheduler: Union[_T, str] = None, Event: Union[_T, str] = None, Schema: Union[_T, str] = None, Stream: Union[_T, str] = None, Table: Union[_T, str] = None, SetTable: Union[_T, str] = None, GlobalTable: Union[_T, str] = None, SetGlobalTable: Union[_T, str] = None, TableManager: Union[_T, str] = None, Serializers: Union[_T, str] = None, Worker: Union[_T, str] = None, PartitionAssignor: Union[_T, str] = None, LeaderAssignor: Union[_T, str] = None, Router: Union[_T, str] = None, Topic: Union[_T, str] = None, HttpClient: Union[_T, str] = None, Monitor: Union[_T, str] = None, url: Union[str, yarl.URL] = None, **kwargs: Any) → None
property Agent
Return type

Type[AgentT[]]

property ConsumerScheduler
Return type

Type[SchedulingStrategyT]

property Event
Return type

Type[EventT[]]

property GlobalTable
Return type

Type[GlobalTableT[]]

property HttpClient
Return type

Type[ClientSession]

property LeaderAssignor
Return type

Type[LeaderAssignorT[]]

property Monitor
Return type

Type[SensorT[]]

property PartitionAssignor
Return type

Type[PartitionAssignorT]

property Router
Return type

Type[RouterT]

property Schema
Return type

Type[SchemaT[~KT, ~VT]]

property Serializers
Return type

Type[RegistryT]

property SetGlobalTable
Return type

Type[GlobalTableT[]]

property SetTable
Return type

Type[TableT[~KT, ~VT]]

property Stream
Return type

Type[StreamT[+T_co]]

property Table
Return type

Type[TableT[~KT, ~VT]]

property TableManager
Return type

Type[TableManagerT[]]

property Topic
Return type

Type[TopicT[]]

property Worker
Return type

Type[_WorkerT]

property agent_supervisor
Return type

Type[SupervisorStrategyT]

property appdir
Return type

Path

autodiscover = False
property broker
Return type

List[URL]

broker_check_crcs = True
broker_client_id = 'faust-1.9.0'
broker_commit_every = 10000
property broker_commit_interval
Return type

float

property broker_commit_livelock_soft_timeout
Return type

float

property broker_consumer
Return type

List[URL]

property broker_credentials
Return type

Optional[CredentialsT]

property broker_heartbeat_interval
Return type

float

broker_max_poll_interval = 1000.0
property broker_max_poll_records
Return type

Optional[int]

property broker_producer
Return type

List[URL]

property broker_request_timeout
Return type

float

property broker_session_timeout
Return type

float

property cache
Return type

URL

property canonical_url
Return type

URL

consumer_auto_offset_reset = 'earliest'
consumer_max_fetch_size = 4194304
property datadir
Return type

Path

debug = False
find_old_versiondirs() → Iterable[pathlib.Path]
Return type

Iterable[Path]

property id
Return type

str

id_format = '{id}-v{self.version}'
key_serializer = 'raw'
logging_config = None
property name
Return type

str

property origin
Return type

Optional[str]

property processing_guarantee
Return type

ProcessingGuarantee

producer_acks = -1
producer_api_version = 'auto'
producer_compression_type = None
producer_linger_ms = 0
producer_max_batch_size = 16384
producer_max_request_size = 1000000
property producer_partitioner
Return type

Optional[Callable[[Optional[bytes], Sequence[int], Sequence[int]], int]]

property producer_request_timeout
Return type

float

reply_create_topic = False
property reply_expires
Return type

float

reply_to_prefix = 'f-reply-'
classmethod setting_names() → Set[str]
Return type

Set[str]

ssl_context = None
property store
Return type

URL

stream_ack_cancelled_tasks = True
stream_ack_exceptions = True
stream_buffer_maxsize = 4096
stream_publish_on_commit = False
property stream_recovery_delay
Return type

float

stream_wait_empty = True
property table_cleanup_interval
Return type

float

table_key_index_size = 1000
table_standby_replicas = 1
property tabledir
Return type

Path

timezone = datetime.timezone.utc
topic_allow_declare = True
topic_disable_leader = False
topic_partitions = 8
topic_replication_factor = 1
value_serializer = 'json'
property version
Return type

int

property web
Return type

URL

web_bind = '0.0.0.0'
web_cors_options = None
web_host = 'build-9933977-project-230058-faust'
web_in_thread = False
web_port = 6066
property web_transport
Return type

URL

worker_redirect_stdouts = True
worker_redirect_stdouts_level = 'WARN'
client_only = False

Set this to True if app should only start the services required to operate as an RPC client (producer and simple reply consumer).

producer_only = False

Set this to True if app should run without consumer/tables.

tracer = None

Optional tracing support.

on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of additional service dependencies.

The services returned will be started with the app when the app starts.

Return type

Iterable[ServiceT[]]

async on_first_start() → None[source]

Call first time app starts in this process.

Return type

None

async on_start() → None[source]

Call every time the app starts/restarts.

Return type

None

async on_started() → None[source]

Call when app is fully started.

Return type

None

async on_started_init_extra_tasks() → None[source]

Call when started to start additional tasks.

Return type

None

async on_started_init_extra_services() → None[source]

Call when initializing extra services at startup.

Return type

None

async on_init_extra_service(service: Union[mode.types.services.ServiceT, Type[mode.types.services.ServiceT]]) → mode.types.services.ServiceT[source]

Call when adding user services to this app.

Return type

ServiceT[]

config_from_object(obj: Any, *, silent: bool = False, force: bool = False) → None[source]

Read configuration from object.

Object is either an actual object or the name of a module to import.

Examples

>>> app.config_from_object('myproj.faustconfig')
>>> from myproj import faustconfig
>>> app.config_from_object(faustconfig)
Parameters
  • silent (bool) – If true then import errors will be ignored.

  • force (bool) – Force reading configuration immediately. By default the configuration will be read only when required.

Return type

None

finalize() → None[source]

Finalize app configuration.

Return type

None

worker_init() → None[source]

Init worker/CLI commands.

Return type

None

worker_init_post_autodiscover() → None[source]

Init worker after autodiscover.

Return type

None

discover(*extra_modules: str, categories: Iterable[str] = None, ignore: Iterable[Any] = [<built-in method search of _sre.SRE_Pattern object>, '.__main__']) → None[source]

Discover decorators in packages.

Return type

None

main() → NoReturn[source]

Execute the faust umbrella command using this app.

Return type

_NoReturn

topic(*topics: str, pattern: Union[str, Pattern[~AnyStr]] = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, maxsize: int = None, allow_empty: bool = False, has_prefix: bool = False, loop: asyncio.events.AbstractEventLoop = None) → faust.types.topics.TopicT[source]

Create topic description.

Topics are named channels (for example a Kafka topic), that exist on a server. To make an ephemeral local communication channel use: channel().

Return type

TopicT[]

channel(*, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, maxsize: int = None, loop: asyncio.events.AbstractEventLoop = None) → faust.types.channels.ChannelT[source]

Create new channel.

By default this will create an in-memory channel used for intra-process communication, but in practice channels can be backed by any transport (network or even means of inter-process communication).

Return type

ChannelT[]

agent(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs: Any) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT][source]

Create Agent from async def function.

It can be a regular async function:

@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')

Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:

@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type

Callable[[Callable[[StreamT[+T_co]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable[+T_co]]]], AgentT[]]

actor(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs: Any) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT]

Create Agent from async def function.

It can be a regular async function:

@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')

Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:

@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type

Callable[[Callable[[StreamT[+T_co]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable[+T_co]]]], AgentT[]]

task(fun: Union[Callable[AppT, Awaitable], Callable[Awaitable]] = None, *, on_leader: bool = False, traced: bool = True) → Union[Callable[Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]], Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]]], Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]][source]

Define an async def function to be started with the app.

This is like timer() but a one-shot task only executed at worker startup (after recovery and the worker is fully ready for operation).

The function may take zero, or one argument. If the target function takes an argument, the app argument is passed:

>>> @app.task
... async def on_startup(app):
...     print('STARTING UP: %r' % (app,))

Nullary functions are also supported:

>>> @app.task
... async def on_startup():
...     print('STARTING UP')
Return type

Union[Callable[[Union[Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]], Union[Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]], Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]

timer(interval: Union[datetime.timedelta, float, str], on_leader: bool = False, traced: bool = True, name: str = None, max_drift_correction: float = 0.1) → Callable[source]

Define an async def function to be run at periodic intervals.

Like task(), but executes periodically until the worker is shut down.

This decorator takes an async function and adds it to a list of timers started with the app.

Parameters
  • interval (Seconds) – How often the timer executes in seconds.

  • on_leader (bool) – Should the timer only run on the leader?

Example

>>> @app.timer(interval=10.0)
... async def every_10_seconds():
...     print('TEN SECONDS JUST PASSED')
>>> @app.timer(interval=5.0, on_leader=True)
... async def every_5_seconds():
...     print('FIVE SECONDS JUST PASSED. ALSO, I AM THE LEADER!')
Return type

Callable

crontab(cron_format: str, *, timezone: datetime.tzinfo = None, on_leader: bool = False, traced: bool = True) → Callable[source]

Define periodic task using Crontab description.

This is an async def function to be run at the fixed times, defined by the Cron format.

Like timer(), but executes at fixed times instead of executing at certain intervals.

This decorator takes an async function and adds it to a list of Cronjobs started with the app.

Parameters

cron_format (str) – The Cron spec defining fixed times to run the decorated function.

Keyword Arguments
  • timezone – The timezone to be taken into account for the Cron jobs. If not set, the value of the timezone setting will be used.

  • on_leader – Should the Cron job only run on the leader?

Example

>>> @app.crontab(cron_format='30 18 * * *',
...              timezone=pytz.timezone('US/Pacific'))
... async def every_6_30_pm_pacific():
...     print('IT IS 6:30pm')
>>> @app.crontab(cron_format='30 18 * * *', on_leader=True)
... async def every_6_30_pm():
...     print('6:30pm UTC; ALSO, I AM THE LEADER!')
Return type

Callable

service(cls: Type[mode.types.services.ServiceT]) → Type[mode.types.services.ServiceT][source]

Decorate mode.Service to be started with the app.

Examples

from mode import Service

@app.service
class Foo(Service):
    ...
Return type

Type[ServiceT[]]

is_leader() → bool[source]

Return True if we are in leader worker process.

Return type

bool

stream(channel: Union[AsyncIterable, Iterable], beacon: mode.utils.types.trees.NodeT = None, **kwargs: Any) → faust.types.streams.StreamT[source]

Create new stream from channel/topic/iterable/async iterable.

Parameters

channel (Union[AsyncIterable, Iterable]) – Channel, topic, iterable or async iterable to stream over.

Returns

A faust.Stream to iterate over events in the stream.

Return type

StreamT[+T_co]

Table(name: str, *, default: Callable[Any] = None, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs: Any) → faust.types.tables.TableT[source]

Define new table.

Parameters
  • name (str) – Name used for table, note that two tables living in the same application cannot have the same name.

  • default (Optional[Callable[[], Any]]) – A callable, or type that will return a default value for keys missing in this table.

  • window (Optional[WindowT]) – A windowing strategy to wrap this table in.

Examples

>>> table = app.Table('user_to_amount', default=int)
>>> table['George']
0
>>> table['Elaine'] += 1
>>> table['Elaine'] += 1
>>> table['Elaine']
2
Return type

TableT[~KT, ~VT]

GlobalTable(name: str, *, default: Callable[Any] = None, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs: Any) → faust.types.tables.GlobalTableT[source]

Define new global table.

Parameters
  • name (str) – Name used for global table, note that two global tables living in the same application cannot have the same name.

  • default (Optional[Callable[[], Any]]) – A callable, or type that will return a default value for keys missing in this global table.

  • window (Optional[WindowT]) – A windowing strategy to wrap this global table in.

Examples

>>> gtable = app.GlobalTable('user_to_amount', default=int)
>>> gtable['George']
0
>>> gtable['Elaine'] += 1
>>> gtable['Elaine'] += 1
>>> gtable['Elaine']
2
Return type

GlobalTableT[]

SetTable(name: str, *, window: faust.types.windows.WindowT = None, partitions: int = None, start_manager: bool = False, help: str = None, **kwargs: Any) → faust.types.tables.TableT[source]

Table of sets.

Return type

TableT[~KT, ~VT]

SetGlobalTable(name: str, *, window: faust.types.windows.WindowT = None, partitions: int = None, start_manager: bool = False, help: str = None, **kwargs: Any) → faust.types.tables.TableT[source]

Table of sets (global).

Return type

TableT[~KT, ~VT]

page(path: str, *, base: Type[faust.web.views.View] = <class 'faust.web.views.View'>, cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, name: str = None) → Callable[Union[Type[faust.types.web.View], Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Type[faust.web.views.View]][source]

Decorate view to be included in the web server.

Return type

Callable[[Union[Type[View], Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Type[View]]

table_route(table: faust.types.tables.CollectionT, shard_param: str = None, *, query_param: str = None, match_info: str = None, exact_key: str = None) → Callable[Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]]][source]

Decorate view method to route request to table key destination.

Return type

Callable[[Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]]

command(*options: Any, base: Optional[Type[faust.app.base._AppCommand]] = None, **kwargs: Any) → Callable[Callable, Type[faust.app.base._AppCommand]][source]

Decorate async def function to be used as CLI command.

Return type

Callable[[Callable], Type[_AppCommand]]

create_event(key: Union[bytes, faust.types.core._ModelT, Any, None], value: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], message: faust.types.tuples.Message) → faust.types.events.EventT[source]

Create new faust.Event object.

Return type

EventT[]

async start_client() → None[source]

Start the app in Client-Only mode necessary for RPC requests.

Notes

Once started as a client the app cannot be restarted as Server.

Return type

None

async maybe_start_client() → None[source]

Start the app in Client-Only mode if not started as Server.

Return type

None

trace(name: str, trace_enabled: bool = True, **extra_context: Any) → ContextManager[source]

Return new trace context to trace operation using OpenTracing.

Return type

ContextManager[+T_co]

traced(fun: Callable, name: str = None, sample_rate: float = 1.0, **context: Any) → Callable[source]

Decorate function to be traced using the OpenTracing API.

Return type

Callable

async send(channel: Union[faust.types.channels.ChannelT, str], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send event to channel/topic.

Parameters
  • channel (Union[ChannelT[], str]) – Channel/topic or the name of a topic to send event to.

  • key (Union[bytes, _ModelT, Any, None]) – Message key.

  • value (Union[bytes, _ModelT, Any, None]) – Message value.

  • partition (Optional[int]) – Specific partition to send to. If not set the partition will be chosen by the partitioner.

  • timestamp (Optional[float]) – Epoch seconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.

  • headers (Union[List[Tuple[str, bytes]], Mapping[str, bytes], None]) – Mapping of key/value pairs, or iterable of key value pairs to use as headers for the message.

  • schema (Optional[SchemaT[~KT, ~VT]]) – Schema to use for serialization.

  • key_serializer (Union[CodecT, str, None]) – Serializer to use (if key is not model). Overrides schema if one is specified.

  • value_serializer (Union[CodecT, str, None]) – Serializer to use (if value is not model). Overrides schema if one is specified.

  • callback (Optional[Callable[[FutureMessage[]], Union[None, Awaitable[None]]]]) –

    Called after the message is fully delivered to the channel, but not to the consumer. Signature must be unary as the FutureMessage future is passed to it.

    The resulting faust.types.tuples.RecordMetadata object is then available as fut.result().

Return type

Awaitable[RecordMetadata]

in_transaction[source]

Return True if stream is using transactions.

LiveCheck(**kwargs: Any) → faust.app.base._LiveCheck[source]

Return new LiveCheck instance testing features for this app.

Return type

_LiveCheck

maybe_start_producer[source]

Ensure producer is started.

Return type

ProducerT[]

async commit(topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]

Commit offsets for acked messages in the specified topics.

Warning

This will commit acked messages in all topics if the topics argument is passed in as None.

Return type

bool

async on_stop() → None[source]

Call when application stops.

Tip

Remember to call super if you override this method.

Return type

None

on_rebalance_start() → None[source]

Call when rebalancing starts.

Return type

None

on_rebalance_return() → None[source]
Return type

None

on_rebalance_end() → None[source]

Call when rebalancing is done.

Return type

None

FlowControlQueue(maxsize: int = None, *, clear_on_resume: bool = False, loop: asyncio.events.AbstractEventLoop = None) → mode.utils.queues.ThrowableQueue[source]

Like asyncio.Queue, but can be suspended/resumed.

Return type

ThrowableQueue

Worker(**kwargs: Any) → faust.app.base._Worker[source]

Return application worker instance.

Return type

_Worker

on_webserver_init(web: faust.types.web.Web) → None[source]

Call when the Web server is initializing.

Return type

None

property conf

Application configuration.

Return type

Settings

property producer

Message producer.

Return type

ProducerT[]

property consumer

Message consumer.

Return type

ConsumerT[]

property transport

Consumer message transport.

Return type

TransportT

logger = <Logger faust.app.base (WARNING)>
property producer_transport

Producer message transport.

Return type

TransportT

property cache

Cache backend.

Return type

CacheBackendT[]

tables[source]

Map of available tables, and the table manager service.

topics[source]

Topic Conductor.

This is the mediator that moves messages fetched by the Consumer into the streams.

It’s also a set of registered topics by string topic name, so you can check if a topic is being consumed from by doing topic in app.topics.

property monitor

Monitor keeps stats about what’s going on inside the worker.

Return type

Monitor[]

flow_control[source]

Flow control of streams.

This object controls flow into stream queues, and can also clear all buffers.

property http_client

HTTP client Session.

Return type

ClientSession

assignor[source]

Partition Assignor.

Responsible for partition assignment.

router[source]

Find the node partitioned data belongs to.

The router helps us route web requests to the wanted Faust node. If a topic is sharded by account_id, the router can send us to the Faust worker responsible for any account. Used by the @app.table_route decorator.

web[source]

Web driver.

serializers[source]

Return serializer registry.

property label

Return human readable description of application.

Return type

str

property shortlabel

Return short description of application.

Return type

str