faust.app

class faust.app.App(id: str, *, monitor: faust.sensors.monitor.Monitor = None, config_source: Any = None, loop: asyncio.events.AbstractEventLoop = None, beacon: mode.utils.types.trees.NodeT = None, **options) → None[source]

Faust Application.

Parameters:id (str) – Application ID.
Keyword Arguments:
 loop (asyncio.AbstractEventLoop) – optional event loop to use.

See also

Application Parameters – for supported keyword arguments.

class BootStrategy(app: faust.types.app.AppT, *, enable_web: bool = None, enable_kafka: bool = None, enable_kafka_producer: bool = None, enable_kafka_consumer: bool = None, enable_sensors: bool = None) → None

App startup strategy.

The startup strategy defines the graph of services to start when the Faust worker for an app starts.

agents() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
client_only() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
enable_kafka = True
enable_kafka_consumer = None
enable_kafka_producer = None
enable_sensors = True
enable_web = None
kafka_client_consumer() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
kafka_conductor() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
kafka_consumer() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
kafka_producer() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
producer_only() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
sensors() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
server() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
tables() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
web_components() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
web_server() → Iterable[mode.types.services.ServiceT]
Return type:Iterable[ServiceT[]]
class Settings(id: str, *, version: int = None, broker: Union[str, yarl.URL, List[yarl.URL]] = None, broker_client_id: str = None, broker_request_timeout: Union[datetime.timedelta, float, str] = None, broker_credentials: Union[faust.types.auth.CredentialsT, ssl.SSLContext] = None, broker_commit_every: int = None, broker_commit_interval: Union[datetime.timedelta, float, str] = None, broker_commit_livelock_soft_timeout: Union[datetime.timedelta, float, str] = None, broker_session_timeout: Union[datetime.timedelta, float, str] = None, broker_heartbeat_interval: Union[datetime.timedelta, float, str] = None, broker_check_crcs: bool = None, broker_max_poll_records: int = None, agent_supervisor: Union[_T, str] = None, store: Union[str, yarl.URL] = None, cache: Union[str, yarl.URL] = None, web: Union[str, yarl.URL] = None, web_enabled: bool = True, processing_guarantee: Union[str, faust.types.enums.ProcessingGuarantee] = None, timezone: datetime.tzinfo = None, autodiscover: Union[bool, Iterable[str], Callable[Iterable[str]]] = None, origin: str = None, canonical_url: Union[str, yarl.URL] = None, datadir: Union[pathlib.Path, str] = None, tabledir: Union[pathlib.Path, str] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, logging_config: Dict = None, loghandlers: List[logging.Handler] = None, table_cleanup_interval: Union[datetime.timedelta, float, str] = None, table_standby_replicas: int = None, topic_replication_factor: int = None, topic_partitions: int = None, topic_allow_declare: bool = None, id_format: str = None, reply_to: str = None, reply_to_prefix: str = None, reply_create_topic: bool = None, reply_expires: Union[datetime.timedelta, float, str] = None, ssl_context: ssl.SSLContext = None, stream_buffer_maxsize: int = None, stream_wait_empty: bool = None, stream_ack_cancelled_tasks: bool = None, stream_ack_exceptions: bool = None, stream_publish_on_commit: bool = None, stream_recovery_delay: Union[datetime.timedelta, float, str] = None, producer_linger_ms: int = None, producer_max_batch_size: int = None, producer_acks: int = None, producer_max_request_size: int = None, producer_compression_type: str = None, producer_partitioner: Union[_T, str] = None, producer_request_timeout: Union[datetime.timedelta, float, str] = None, consumer_max_fetch_size: int = None, consumer_auto_offset_reset: str = None, web_bind: str = None, web_port: int = None, web_host: str = None, web_transport: Union[str, yarl.URL] = None, web_in_thread: bool = None, web_cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, worker_redirect_stdouts: bool = None, worker_redirect_stdouts_level: Union[int, str] = None, Agent: Union[_T, str] = None, ConsumerScheduler: Union[_T, str] = None, Stream: Union[_T, str] = None, Table: Union[_T, str] = None, SetTable: Union[_T, str] = None, TableManager: Union[_T, str] = None, Serializers: Union[_T, str] = None, Worker: Union[_T, str] = None, PartitionAssignor: Union[_T, str] = None, LeaderAssignor: Union[_T, str] = None, Router: Union[_T, str] = None, Topic: Union[_T, str] = None, HttpClient: Union[_T, str] = None, Monitor: Union[_T, str] = None, url: Union[str, yarl.URL] = None, **kwargs) → None
Agent
Return type:Type[AgentT[]]
ConsumerScheduler
Return type:Type[SchedulingStrategyT]
HttpClient
Return type:Type[ClientSession]
LeaderAssignor
Return type:Type[LeaderAssignorT[]]
Monitor
Return type:Type[SensorT[]]
PartitionAssignor
Return type:Type[PartitionAssignorT]
Router
Return type:Type[RouterT]
Serializers
Return type:Type[RegistryT]
SetTable
Return type:Type[TableT[~KT, ~VT]]
Stream
Return type:Type[StreamT[+T_co]]
Table
Return type:Type[TableT[~KT, ~VT]]
TableManager
Return type:Type[TableManagerT[]]
Topic
Return type:Type[TopicT[]]
Worker
Return type:Type[_WorkerT]
agent_supervisor
Return type:Type[SupervisorStrategyT]
appdir
Return type:Path
autodiscover = False
broker
Return type:List[URL]
broker_check_crcs = True
broker_client_id = 'faust-1.5.0'
broker_commit_every = 10000
broker_commit_interval
Return type:float
broker_commit_livelock_soft_timeout
Return type:float
broker_credentials
Return type:Optional[CredentialsT]
broker_heartbeat_interval
Return type:float
broker_max_poll_records
Return type:Optional[int]
broker_request_timeout
Return type:float
broker_session_timeout
Return type:float
cache
Return type:URL
canonical_url
Return type:URL
consumer_auto_offset_reset = 'earliest'
consumer_max_fetch_size = 4194304
datadir
Return type:Path
find_old_versiondirs() → Iterable[pathlib.Path]
Return type:Iterable[Path]
id
Return type:str
id_format = '{id}-v{self.version}'
key_serializer = 'raw'
logging_config = None
name
Return type:str
origin
Return type:Optional[str]
processing_guarantee
Return type:ProcessingGuarantee
producer_acks = -1
producer_compression_type = None
producer_linger_ms = 0
producer_max_batch_size = 16384
producer_max_request_size = 1000000
producer_partitioner
Return type:Optional[Callable[[Optional[bytes], Sequence[int], Sequence[int]], int]]
producer_request_timeout
Return type:float
reply_create_topic = False
reply_expires
Return type:float
reply_to_prefix = 'f-reply-'
classmethod setting_names() → Set[str]
Return type:Set[str]
ssl_context = None
store
Return type:URL
stream_ack_cancelled_tasks = False
stream_ack_exceptions = True
stream_buffer_maxsize = 4096
stream_publish_on_commit = False
stream_recovery_delay
Return type:float
stream_wait_empty = True
table_cleanup_interval
Return type:float
table_standby_replicas = 1
tabledir
Return type:Path
timezone = datetime.timezone.utc
topic_allow_declare = True
topic_partitions = 8
topic_replication_factor = 1
value_serializer = 'json'
version
Return type:int
web
Return type:URL
web_bind = '0.0.0.0'
web_cors_options = None
web_host = 'build-8846501-project-230058-faust'
web_in_thread = False
web_port = 6066
web_transport
Return type:URL
worker_redirect_stdouts = True
worker_redirect_stdouts_level = 'WARN'
client_only = False

Set this to True if app should only start the services required to operate as an RPC client (producer and simple reply consumer).

producer_only = False

Set this to True if app should run without consumer/tables.

tracer = None

Optional tracing support.

on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of service dependencies for this service.

Return type:Iterable[ServiceT[]]
config_from_object(obj: Any, *, silent: bool = False, force: bool = False) → None[source]

Read configuration from object.

Object is either an actual object or the name of a module to import.

Examples

>>> app.config_from_object('myproj.faustconfig')
>>> from myproj import faustconfig
>>> app.config_from_object(faustconfig)
Parameters:
  • silent (bool) – If true then import errors will be ignored.
  • force (bool) – Force reading configuration immediately. By default the configuration will be read only when required.
Return type:

None

finalize() → None[source]
Return type:None
worker_init() → None[source]
Return type:None
discover(*extra_modules, categories: Iterable[str] = ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task'], ignore: Iterable[str] = ['test_.*', '.*__main__.*']) → None[source]
Return type:None
main() → NoReturn[source]

Execute the faust umbrella command using this app.

Return type:_NoReturn
topic(*topics, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, maxsize: int = None, allow_empty: bool = False, loop: asyncio.events.AbstractEventLoop = None) → faust.types.topics.TopicT[source]

Create topic description.

Topics are named channels (for example a Kafka topic), that exist on a server. To make an ephemeral local communication channel use: channel().

Return type:TopicT[]
channel(*, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, maxsize: int = None, loop: asyncio.events.AbstractEventLoop = None) → faust.types.channels.ChannelT[source]

Create new channel.

By default this will create an in-memory channel used for intra-process communication, but in practice channels can be backed by any transport (network or even means of inter-process communication).

Return type:ChannelT[]
agent(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT][source]

Create Agent from async def function.

It can be a regular async function:

@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')

Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:

@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type:Callable[[Callable[[StreamT[+T_co]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable[+T_co]]]], AgentT[]]
actor(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT]

Create Agent from async def function.

It can be a regular async function:

@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')

Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:

@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type:Callable[[Callable[[StreamT[+T_co]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable[+T_co]]]], AgentT[]]
task(fun: Union[Callable[AppT, Awaitable], Callable[Awaitable]] = None, *, on_leader: bool = False, traced: bool = True) → Union[Callable[Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]], Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]]], Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]][source]

Define an async def function to be started with the app.

This is like timer() but a one-shot task only executed at worker startup (after recovery and the worker is fully ready for operation).

The function may take zero, or one argument. If the target function takes an argument, the app argument is passed:

>>> @app.task
>>> async def on_startup(app):
...    print('STARTING UP: %r' % (app,))

Nullary functions are also supported:

>>> @app.task
>>> async def on_startup():
...     print('STARTING UP')
Return type:Union[Callable[[Union[Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]], Union[Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]], Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]
timer(interval: Union[datetime.timedelta, float, str], on_leader: bool = False, traced: bool = True) → Callable[source]

Define an async def function to be run at periodic intervals.

Like task(), but executes periodically until the worker is shut down.

This decorator takes an async function and adds it to a list of timers started with the app.

Parameters:
  • interval (Seconds) – How often the timer executes in seconds.
  • on_leader (bool) – Should the timer only run on the leader?

Example

>>> @app.timer(interval=10.0)
>>> async def every_10_seconds():
...     print('TEN SECONDS JUST PASSED')
>>> app.timer(interval=5.0, on_leader=True)
>>> async def every_5_seconds():
...     print('FIVE SECONDS JUST PASSED. ALSO, I AM THE LEADER!')
Return type:Callable
crontab(cron_format: str, *, timezone: datetime.tzinfo = None, on_leader: bool = False, traced: bool = True) → Callable[source]

Define periodic task using crontab description.

This is an async def function to be run at the fixed times, defined by the cron format.

Like timer(), but executes at fixed times instead of executing at certain intervals.

This decorator takes an async function and adds it to a list of cronjobs started with the app.

Parameters:

cron_format (str) – The cron spec defining fixed times to run the decorated function.

Keyword Arguments:
 
  • timezone – The timezone to be taken into account for the cron jobs. If not set value from timezone will be taken.
  • on_leader – Should the cron job only run on the leader?

Example

>>> @app.crontab(cron_format='30 18 * * *',
                 timezone=pytz.timezone('US/Pacific'))
>>> async def every_6_30_pm_pacific():
...     print('IT IS 6:30pm')
>>> app.crontab(cron_format='30 18 * * *', on_leader=True)
>>> async def every_6_30_pm():
...     print('6:30pm UTC; ALSO, I AM THE LEADER!')
Return type:Callable
service(cls: Type[mode.types.services.ServiceT]) → Type[mode.types.services.ServiceT][source]

Decorate mode.Service to be started with the app.

Examples

from mode import Service

@app.service
class Foo(Service):
    ...
Return type:Type[ServiceT[]]
is_leader() → bool[source]
Return type:bool
stream(channel: Union[AsyncIterable, Iterable], beacon: mode.utils.types.trees.NodeT = None, **kwargs) → faust.types.streams.StreamT[source]

Create new stream from channel/topic/iterable/async iterable.

Parameters:
Return type:

StreamT[+T_co]

Returns:

to iterate over events in the stream.

Return type:

faust.Stream

Table(name: str, *, default: Callable[Any] = None, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs) → faust.types.tables.TableT[source]

Define new table.

Parameters:
  • name (str) – Name used for table, note that two tables living in the same application cannot have the same name.
  • default (Optional[Callable[[], Any]]) – A callable, or type that will return a default value for keys missing in this table.
  • window (Optional[WindowT]) – A windowing strategy to wrap this window in.

Examples

>>> table = app.Table('user_to_amount', default=int)
>>> table['George']
0
>>> table['Elaine'] += 1
>>> table['Elaine'] += 1
>>> table['Elaine']
2
Return type:TableT[~KT, ~VT]
SetTable(name: str, *, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs) → faust.types.tables.TableT[source]
Return type:TableT[~KT, ~VT]
page(path: str, *, base: Type[faust.web.views.View] = <class 'faust.web.views.View'>, cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, name: str = None) → Callable[Union[Type[faust.types.web.View], Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Type[faust.web.views.View]][source]
Return type:Callable[[Union[Type[View], Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Type[View]]
table_route(table: faust.types.tables.CollectionT, shard_param: str = None, *, query_param: str = None, match_info: str = None) → Callable[Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]]][source]
Return type:Callable[[Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]]
command(*options, base: Optional[Type[faust.app.base._AppCommand]] = None, **kwargs) → Callable[Callable, Type[faust.app.base._AppCommand]][source]
Return type:Callable[[Callable], Type[_AppCommand]]
trace(name: str, trace_enabled: bool = True, **extra_context) → ContextManager[source]
Return type:ContextManager[+T_co]
traced(fun: Callable, name: str = None, sample_rate: float = 1.0, **context) → Callable[source]
Return type:Callable
in_transaction[source]
on_rebalance_start() → None[source]
Return type:None
on_rebalance_end() → None[source]
Return type:None
FlowControlQueue(maxsize: int = None, *, clear_on_resume: bool = False, loop: asyncio.events.AbstractEventLoop = None) → mode.utils.queues.ThrowableQueue[source]

Like asyncio.Queue, but can be suspended/resumed.

Return type:ThrowableQueue
Worker(**kwargs) → faust.app.base._Worker[source]
Return type:_Worker
on_webserver_init(web: faust.types.web.Web) → None[source]
Return type:None
coroutine commit(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]

Commit offset for acked messages in specified topics’.

Warning

This will commit acked messages in all topics if the topics argument is passed in as None.

Return type:bool
conf
Return type:Settings
logger = <Logger faust.app.base (WARNING)>
coroutine maybe_start_client(self) → None[source]

Start the app in Client-Only mode if not started as Server.

Return type:None
maybe_start_producer[source]

Ensure producer is started.

coroutine on_first_start(self) → None[source]

Service started for the first time in this process.

Return type:None
coroutine on_init_extra_service(self, service: Union[mode.types.services.ServiceT, Type[mode.types.services.ServiceT]]) → mode.types.services.ServiceT[source]
Return type:ServiceT[]
coroutine on_start(self) → None[source]

Service is starting.

Return type:None
coroutine on_started(self) → None[source]

Service has started.

Return type:None
coroutine on_started_init_extra_services(self) → None[source]
Return type:None
coroutine on_started_init_extra_tasks(self) → None[source]
Return type:None
coroutine on_stop(self) → None[source]

Service is being stopped/restarted.

Return type:None
coroutine send(self, channel: Union[faust.types.channels.ChannelT, str], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send event to channel/topic.

Parameters:
  • channel (Union[ChannelT[], str]) – Channel/topic or the name of a topic to send event to.
  • key (Union[bytes, _ModelT, Any, None]) – Message key.
  • value (Union[bytes, _ModelT, Any, None]) – Message value.
  • partition (Optional[int]) – Specific partition to send to. If not set the partition will be chosen by the partitioner.
  • timestamp (Optional[float]) – Epoch seconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.
  • headers (Union[List[Tuple[str, bytes]], Mapping[str, bytes], None]) – Mapping of key/value pairs, or iterable of key value pairs to use as headers for the message.
  • key_serializer (Union[CodecT, str, None]) – Serializer to use (if value is not model).
  • value_serializer (Union[CodecT, str, None]) – Serializer to use (if value is not model).
  • callback (Optional[Callable[[FutureMessage[]], Union[None, Awaitable[None]]]]) –

    Called after the message is fully delivered to the channel, but not to the consumer. Signature must be unary as the FutureMessage future is passed to it.

    The resulting faust.types.tuples.RecordMetadata object is then available as fut.result().

Return type:

Awaitable[RecordMetadata]

coroutine start_client(self) → None[source]

Start the app in Client-Only mode necessary for RPC requests.

Notes

Once started as a client the app cannot be restarted as Server.

Return type:None
producer
Return type:ProducerT[]
consumer
Return type:ConsumerT[]
transport

Message transport. :rtype: TransportT

cache
Return type:CacheBackendT[]
tables[source]

Map of available tables, and the table manager service.

topics[source]

Topic Conductor.

This is the mediator that moves messages fetched by the Consumer into the streams.

It’s also a set of registered topics by string topic name, so you can check if a topic is being consumed from by doing topic in app.topics.

monitor

Monitor keeps stats about what’s going on inside the worker. :rtype: Monitor[]

flow_control[source]

Flow control of streams.

This object controls flow into stream queues, and can also clear all buffers.

http_client

HTTP Client Session. :rtype: ClientSession

assignor[source]

Partition Assignor.

Responsible for partition assignment.

router[source]

Find the node partitioned data belongs to.

The router helps us route web requests to the wanted Faust node. If a topic is sharded by account_id, the router can send us to the Faust worker responsible for any account. Used by the @app.table_route decorator.

web[source]
serializers[source]
label

Label used for graphs. :rtype: str

shortlabel

Label used for logging. :rtype: str

class faust.app.BootStrategy(app: faust.types.app.AppT, *, enable_web: bool = None, enable_kafka: bool = None, enable_kafka_producer: bool = None, enable_kafka_consumer: bool = None, enable_sensors: bool = None) → None[source]

App startup strategy.

The startup strategy defines the graph of services to start when the Faust worker for an app starts.

enable_kafka = True
enable_kafka_producer = None
enable_kafka_consumer = None
enable_web = None
enable_sensors = True
server() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]
client_only() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]
producer_only() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]
sensors() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]
kafka_producer() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]
kafka_consumer() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]
kafka_client_consumer() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]
agents() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]
kafka_conductor() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]
web_server() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]
web_components() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]
tables() → Iterable[mode.types.services.ServiceT][source]
Return type:Iterable[ServiceT[]]