faust

Python Stream processing.

class faust.Agent(fun: Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], *, app: faust.types.app.AppT, name: str = None, channel: Union[str, faust.types.channels.ChannelT] = None, concurrency: int = 1, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, help: str = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, isolated_partitions: bool = False, use_reply_headers: bool = None, **kwargs) → None[source]

Agent.

This is the type of object returned by the @app.agent decorator.

supervisor = None
on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of service dependencies for this service.

Return type

Iterable[ServiceT[]]

cancel() → None[source]
Return type

None

info() → Mapping[source]
Return type

Mapping[~KT, +VT_co]

clone(*, cls: Type[faust.types.agents.AgentT] = None, **kwargs) → faust.types.agents.AgentT[source]
Return type

AgentT[]

test_context(channel: faust.types.channels.ChannelT = None, supervisor_strategy: mode.types.supervisors.SupervisorStrategyT = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, **kwargs) → faust.types.agents.AgentTestWrapperT[source]
Return type

AgentTestWrapperT[]

actor_from_stream(stream: Optional[faust.types.streams.StreamT], *, index: int = None, active_partitions: Set[faust.types.tuples.TP] = None, channel: faust.types.channels.ChannelT = None) → faust.types.agents.ActorT[Union[AsyncIterable, Awaitable]][source]
Return type

ActorT[]

add_sink(sink: Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]) → None[source]
Return type

None

stream(channel: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

coroutine ask(self, value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Any[source]
Return type

Any

coroutine ask_nowait(self, value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → faust.agents.replies.ReplyPromise[source]
Return type

ReplyPromise

coroutine cast(self, value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None) → None[source]
Return type

None

coroutine join(self, values: Union[AsyncIterable[Union[bytes, faust.types.core._ModelT, Any]], Iterable[Union[bytes, faust.types.core._ModelT, Any]]], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]
Return type

List[Any]

coroutine kvjoin(self, items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]
Return type

List[Any]

kvmap(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[str][source]
Return type

AsyncIterator[str]

logger = <Logger faust.agents.agent (WARNING)>
map(values: Union[AsyncIterable, Iterable], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[source]
Return type

AsyncIterator[+T_co]

coroutine on_isolated_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type

None

coroutine on_isolated_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]
Return type

None

coroutine on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type

None

coroutine on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]
Return type

None

coroutine on_shared_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type

None

coroutine on_shared_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]
Return type

None

coroutine on_start(self) → None[source]

Service is starting.

Return type

None

coroutine on_stop(self) → None[source]

Service is being stopped/restarted.

Return type

None

coroutine send(self, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send message to topic used by agent.

Return type

Awaitable[RecordMetadata]

get_topic_names() → Iterable[str][source]
Return type

Iterable[str]

channel
Return type

ChannelT[]

channel_iterator
Return type

AsyncIterator[+T_co]

label

Label used for graphs.

Return type

str

shortlabel

Label used for logging.

Return type

str

class faust.App(id: str, *, monitor: faust.sensors.monitor.Monitor = None, config_source: Any = None, loop: asyncio.events.AbstractEventLoop = None, beacon: mode.utils.types.trees.NodeT = None, **options) → None[source]

Faust Application.

Parameters

id (str) – Application ID.

Keyword Arguments

loop (asyncio.AbstractEventLoop) – optional event loop to use.

See also

Application Parameters – for supported keyword arguments.

class BootStrategy(app: faust.types.app.AppT, *, enable_web: bool = None, enable_kafka: bool = None, enable_kafka_producer: bool = None, enable_kafka_consumer: bool = None, enable_sensors: bool = None) → None

App startup strategy.

The startup strategy defines the graph of services to start when the Faust worker for an app starts.

agents() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

client_only() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

enable_kafka = True
enable_kafka_consumer = None
enable_kafka_producer = None
enable_sensors = True
enable_web = None
kafka_client_consumer() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

kafka_conductor() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

kafka_consumer() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

kafka_producer() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

producer_only() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

sensors() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

server() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

tables() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

web_components() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

web_server() → Iterable[mode.types.services.ServiceT]
Return type

Iterable[ServiceT[]]

class Settings(id: str, *, version: int = None, broker: Union[str, yarl.URL, List[yarl.URL]] = None, broker_client_id: str = None, broker_request_timeout: Union[datetime.timedelta, float, str] = None, broker_credentials: Union[faust.types.auth.CredentialsT, ssl.SSLContext] = None, broker_commit_every: int = None, broker_commit_interval: Union[datetime.timedelta, float, str] = None, broker_commit_livelock_soft_timeout: Union[datetime.timedelta, float, str] = None, broker_session_timeout: Union[datetime.timedelta, float, str] = None, broker_heartbeat_interval: Union[datetime.timedelta, float, str] = None, broker_check_crcs: bool = None, broker_max_poll_records: int = None, agent_supervisor: Union[_T, str] = None, store: Union[str, yarl.URL] = None, cache: Union[str, yarl.URL] = None, web: Union[str, yarl.URL] = None, web_enabled: bool = True, processing_guarantee: Union[str, faust.types.enums.ProcessingGuarantee] = None, timezone: datetime.tzinfo = None, autodiscover: Union[bool, Iterable[str], Callable[Iterable[str]]] = None, origin: str = None, canonical_url: Union[str, yarl.URL] = None, datadir: Union[pathlib.Path, str] = None, tabledir: Union[pathlib.Path, str] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, logging_config: Dict = None, loghandlers: List[logging.Handler] = None, table_cleanup_interval: Union[datetime.timedelta, float, str] = None, table_standby_replicas: int = None, topic_replication_factor: int = None, topic_partitions: int = None, topic_allow_declare: bool = None, id_format: str = None, reply_to: str = None, reply_to_prefix: str = None, reply_create_topic: bool = None, reply_expires: Union[datetime.timedelta, float, str] = None, ssl_context: ssl.SSLContext = None, stream_buffer_maxsize: int = None, stream_wait_empty: bool = None, stream_ack_cancelled_tasks: bool = None, stream_ack_exceptions: bool = None, stream_publish_on_commit: bool = None, 
stream_recovery_delay: Union[datetime.timedelta, float, str] = None, producer_linger_ms: int = None, producer_max_batch_size: int = None, producer_acks: int = None, producer_max_request_size: int = None, producer_compression_type: str = None, producer_partitioner: Union[_T, str] = None, producer_request_timeout: Union[datetime.timedelta, float, str] = None, producer_api_version: str = None, consumer_max_fetch_size: int = None, consumer_auto_offset_reset: str = None, web_bind: str = None, web_port: int = None, web_host: str = None, web_transport: Union[str, yarl.URL] = None, web_in_thread: bool = None, web_cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, worker_redirect_stdouts: bool = None, worker_redirect_stdouts_level: Union[int, str] = None, Agent: Union[_T, str] = None, ConsumerScheduler: Union[_T, str] = None, Stream: Union[_T, str] = None, Table: Union[_T, str] = None, SetTable: Union[_T, str] = None, TableManager: Union[_T, str] = None, Serializers: Union[_T, str] = None, Worker: Union[_T, str] = None, PartitionAssignor: Union[_T, str] = None, LeaderAssignor: Union[_T, str] = None, Router: Union[_T, str] = None, Topic: Union[_T, str] = None, HttpClient: Union[_T, str] = None, Monitor: Union[_T, str] = None, url: Union[str, yarl.URL] = None, **kwargs) → None
Agent
Return type

Type[AgentT[]]

ConsumerScheduler
Return type

Type[SchedulingStrategyT]

HttpClient
Return type

Type[ClientSession]

LeaderAssignor
Return type

Type[LeaderAssignorT[]]

Monitor
Return type

Type[SensorT[]]

PartitionAssignor
Return type

Type[PartitionAssignorT]

Router
Return type

Type[RouterT]

Serializers
Return type

Type[RegistryT]

SetTable
Return type

Type[TableT[~KT, ~VT]]

Stream
Return type

Type[StreamT[+T_co]]

Table
Return type

Type[TableT[~KT, ~VT]]

TableManager
Return type

Type[TableManagerT[]]

Topic
Return type

Type[TopicT[]]

Worker
Return type

Type[_WorkerT]

agent_supervisor
Return type

Type[SupervisorStrategyT]

appdir
Return type

Path

autodiscover = False
broker
Return type

List[URL]

broker_check_crcs = True
broker_client_id = 'faust-1.6.0'
broker_commit_every = 10000
broker_commit_interval
Return type

float

broker_commit_livelock_soft_timeout
Return type

float

broker_credentials
Return type

Optional[CredentialsT]

broker_heartbeat_interval
Return type

float

broker_max_poll_records
Return type

Optional[int]

broker_request_timeout
Return type

float

broker_session_timeout
Return type

float

cache
Return type

URL

canonical_url
Return type

URL

consumer_auto_offset_reset = 'earliest'
consumer_max_fetch_size = 4194304
datadir
Return type

Path

find_old_versiondirs() → Iterable[pathlib.Path]
Return type

Iterable[Path]

id
Return type

str

id_format = '{id}-v{self.version}'
key_serializer = 'raw'
logging_config = None
name
Return type

str

origin
Return type

Optional[str]

processing_guarantee
Return type

ProcessingGuarantee

producer_acks = -1
producer_api_version = 'auto'
producer_compression_type = None
producer_linger_ms = 0
producer_max_batch_size = 16384
producer_max_request_size = 1000000
producer_partitioner
Return type

Optional[Callable[[Optional[bytes], Sequence[int], Sequence[int]], int]]

producer_request_timeout
Return type

float

reply_create_topic = False
reply_expires
Return type

float

reply_to_prefix = 'f-reply-'
classmethod setting_names() → Set[str]
Return type

Set[str]

ssl_context = None
store
Return type

URL

stream_ack_cancelled_tasks = True
stream_ack_exceptions = True
stream_buffer_maxsize = 4096
stream_publish_on_commit = False
stream_recovery_delay
Return type

float

stream_wait_empty = True
table_cleanup_interval
Return type

float

table_standby_replicas = 1
tabledir
Return type

Path

timezone = datetime.timezone.utc
topic_allow_declare = True
topic_partitions = 8
topic_replication_factor = 1
value_serializer = 'json'
version
Return type

int

web
Return type

URL

web_bind = '0.0.0.0'
web_cors_options = None
web_host = 'build-8986882-project-230058-faust'
web_in_thread = False
web_port = 6066
web_transport
Return type

URL

worker_redirect_stdouts = True
worker_redirect_stdouts_level = 'WARN'
client_only = False

Set this to True if app should only start the services required to operate as an RPC client (producer and simple reply consumer).

producer_only = False

Set this to True if app should run without consumer/tables.

tracer = None

Optional tracing support.

on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of service dependencies for this service.

Return type

Iterable[ServiceT[]]

config_from_object(obj: Any, *, silent: bool = False, force: bool = False) → None[source]

Read configuration from object.

Object is either an actual object or the name of a module to import.

Examples

>>> app.config_from_object('myproj.faustconfig')
>>> from myproj import faustconfig
>>> app.config_from_object(faustconfig)
Parameters
  • silent (bool) – If true then import errors will be ignored.

  • force (bool) – Force reading configuration immediately. By default the configuration will be read only when required.

Return type

None

finalize() → None[source]
Return type

None

worker_init() → None[source]
Return type

None

discover(*extra_modules, categories: Iterable[str] = ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task'], ignore: Iterable[Any] = [<built-in method search of _sre.SRE_Pattern object>, '.__main__']) → None[source]
Return type

None

main() → NoReturn[source]

Execute the faust umbrella command using this app.

Return type

_NoReturn

topic(*topics, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, maxsize: int = None, allow_empty: bool = False, loop: asyncio.events.AbstractEventLoop = None) → faust.types.topics.TopicT[source]

Create topic description.

Topics are named channels (for example a Kafka topic), that exist on a server. To make an ephemeral local communication channel use: channel().

Return type

TopicT[]

channel(*, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, maxsize: int = None, loop: asyncio.events.AbstractEventLoop = None) → faust.types.channels.ChannelT[source]

Create new channel.

By default this will create an in-memory channel used for intra-process communication, but in practice channels can be backed by any transport (network or even means of inter-process communication).

Return type

ChannelT[]

agent(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT][source]

Create Agent from async def function.

It can be a regular async function:

@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')

Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:

@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type

Callable[[Callable[[StreamT[+T_co]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable[+T_co]]]], AgentT[]]

actor(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT]

Create Agent from async def function.

It can be a regular async function:

@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')

Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:

@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type

Callable[[Callable[[StreamT[+T_co]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable[+T_co]]]], AgentT[]]

task(fun: Union[Callable[AppT, Awaitable], Callable[Awaitable]] = None, *, on_leader: bool = False, traced: bool = True) → Union[Callable[Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]], Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]]], Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]][source]

Define an async def function to be started with the app.

This is like timer() but a one-shot task only executed at worker startup (after recovery and the worker is fully ready for operation).

The function may take zero, or one argument. If the target function takes an argument, the app argument is passed:

>>> @app.task
... async def on_startup(app):
...     print('STARTING UP: %r' % (app,))

Nullary functions are also supported:

>>> @app.task
... async def on_startup():
...     print('STARTING UP')
Return type

Union[Callable[[Union[Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]], Union[Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]], Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]

timer(interval: Union[datetime.timedelta, float, str], on_leader: bool = False, traced: bool = True, name: str = None, max_drift_correction: float = 0.1) → Callable[source]

Define an async def function to be run at periodic intervals.

Like task(), but executes periodically until the worker is shut down.

This decorator takes an async function and adds it to a list of timers started with the app.

Parameters
  • interval (Seconds) – How often the timer executes in seconds.

  • on_leader (bool) – Should the timer only run on the leader?

Example

>>> @app.timer(interval=10.0)
... async def every_10_seconds():
...     print('TEN SECONDS JUST PASSED')
>>> @app.timer(interval=5.0, on_leader=True)
... async def every_5_seconds():
...     print('FIVE SECONDS JUST PASSED. ALSO, I AM THE LEADER!')
Return type

Callable

crontab(cron_format: str, *, timezone: datetime.tzinfo = None, on_leader: bool = False, traced: bool = True) → Callable[source]

Define periodic task using Crontab description.

This is an async def function to be run at the fixed times, defined by the Cron format.

Like timer(), but executes at fixed times instead of executing at certain intervals.

This decorator takes an async function and adds it to a list of Cronjobs started with the app.

Parameters

cron_format (str) – The Cron spec defining fixed times to run the decorated function.

Keyword Arguments
  • timezone – The timezone to be taken into account for the Cron jobs. If not set, the value of the app's timezone setting is used.

  • on_leader – Should the Cron job only run on the leader?

Example

>>> @app.crontab(cron_format='30 18 * * *',
...              timezone=pytz.timezone('US/Pacific'))
... async def every_6_30_pm_pacific():
...     print('IT IS 6:30pm')
>>> @app.crontab(cron_format='30 18 * * *', on_leader=True)
... async def every_6_30_pm():
...     print('6:30pm UTC; ALSO, I AM THE LEADER!')
Return type

Callable

service(cls: Type[mode.types.services.ServiceT]) → Type[mode.types.services.ServiceT][source]

Decorate mode.Service to be started with the app.

Examples

from mode import Service

@app.service
class Foo(Service):
    ...
Return type

Type[ServiceT[]]

is_leader() → bool[source]
Return type

bool

stream(channel: Union[AsyncIterable, Iterable], beacon: mode.utils.types.trees.NodeT = None, **kwargs) → faust.types.streams.StreamT[source]

Create new stream from channel/topic/iterable/async iterable.

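Parameters
  • channel (Union[AsyncIterable, Iterable]) – The channel, topic, iterable or async iterable to create the stream from.
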
Return type

StreamT[+T_co]

Returns

A new stream; iterate over it to consume the events in the stream.

Return type

faust.Stream

Table(name: str, *, default: Callable[Any] = None, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs) → faust.types.tables.TableT[source]

Define new table.

Parameters
  • name (str) – Name used for table, note that two tables living in the same application cannot have the same name.

  • default (Optional[Callable[[], Any]]) – A callable, or type that will return a default value for keys missing in this table.

  • window (Optional[WindowT]) – A windowing strategy to wrap this table in.

Examples

>>> table = app.Table('user_to_amount', default=int)
>>> table['George']
0
>>> table['Elaine'] += 1
>>> table['Elaine'] += 1
>>> table['Elaine']
2
Return type

TableT[~KT, ~VT]

SetTable(name: str, *, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs) → faust.types.tables.TableT[source]
Return type

TableT[~KT, ~VT]

page(path: str, *, base: Type[faust.web.views.View] = <class 'faust.web.views.View'>, cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, name: str = None) → Callable[Union[Type[faust.types.web.View], Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Type[faust.web.views.View]][source]
Return type

Callable[[Union[Type[View], Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Type[View]]

table_route(table: faust.types.tables.CollectionT, shard_param: str = None, *, query_param: str = None, match_info: str = None) → Callable[Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]]][source]
Return type

Callable[[Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]]

command(*options, base: Optional[Type[faust.app.base._AppCommand]] = None, **kwargs) → Callable[Callable, Type[faust.app.base._AppCommand]][source]
Return type

Callable[[Callable], Type[_AppCommand]]

trace(name: str, trace_enabled: bool = True, **extra_context) → ContextManager[source]
Return type

ContextManager[+T_co]

traced(fun: Callable, name: str = None, sample_rate: float = 1.0, **context) → Callable[source]
Return type

Callable

in_transaction[source]
on_rebalance_start() → None[source]
Return type

None

on_rebalance_return() → None[source]
Return type

None

on_rebalance_end() → None[source]
Return type

None

FlowControlQueue(maxsize: int = None, *, clear_on_resume: bool = False, loop: asyncio.events.AbstractEventLoop = None) → mode.utils.queues.ThrowableQueue[source]

Like asyncio.Queue, but can be suspended/resumed.

Return type

ThrowableQueue

Worker(**kwargs) → faust.app.base._Worker[source]
Return type

_Worker

on_webserver_init(web: faust.types.web.Web) → None[source]
Return type

None

coroutine commit(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]

Commit offsets for acked messages in the specified topics.

Warning

This will commit acked messages in all topics if the topics argument is passed in as None.

Return type

bool

conf
Return type

Settings

logger = <Logger faust.app.base (WARNING)>
coroutine maybe_start_client(self) → None[source]

Start the app in Client-Only mode if not started as Server.

Return type

None

maybe_start_producer[source]

Ensure producer is started.

coroutine on_first_start(self) → None[source]

Service started for the first time in this process.

Return type

None

coroutine on_init_extra_service(self, service: Union[mode.types.services.ServiceT, Type[mode.types.services.ServiceT]]) → mode.types.services.ServiceT[source]
Return type

ServiceT[]

coroutine on_start(self) → None[source]

Service is starting.

Return type

None

coroutine on_started(self) → None[source]

Service has started.

Return type

None

coroutine on_started_init_extra_services(self) → None[source]
Return type

None

coroutine on_started_init_extra_tasks(self) → None[source]
Return type

None

coroutine on_stop(self) → None[source]

Service is being stopped/restarted.

Return type

None

coroutine send(self, channel: Union[faust.types.channels.ChannelT, str], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send event to channel/topic.

Parameters
  • channel (Union[ChannelT[], str]) – Channel/topic or the name of a topic to send event to.

  • key (Union[bytes, _ModelT, Any, None]) – Message key.

  • value (Union[bytes, _ModelT, Any, None]) – Message value.

  • partition (Optional[int]) – Specific partition to send to. If not set the partition will be chosen by the partitioner.

  • timestamp (Optional[float]) – Epoch seconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.

  • headers (Union[List[Tuple[str, bytes]], Mapping[str, bytes], None]) – Mapping of key/value pairs, or iterable of key value pairs to use as headers for the message.

  • key_serializer (Union[CodecT, str, None]) – Serializer to use for the key (if the key is not a model).

  • value_serializer (Union[CodecT, str, None]) – Serializer to use for the value (if the value is not a model).

  • callback (Optional[Callable[[FutureMessage[]], Union[None, Awaitable[None]]]]) –

    Called after the message is fully delivered to the channel, but not to the consumer. Signature must be unary as the FutureMessage future is passed to it.

    The resulting faust.types.tuples.RecordMetadata object is then available as fut.result().

Return type

Awaitable[RecordMetadata]

coroutine start_client(self) → None[source]

Start the app in Client-Only mode necessary for RPC requests.

Notes

Once started as a client the app cannot be restarted as Server.

Return type

None

producer
Return type

ProducerT[]

consumer
Return type

ConsumerT[]

transport

Message transport.

Return type

TransportT

cache
Return type

CacheBackendT[]

tables[source]

Map of available tables, and the table manager service.

topics[source]

Topic Conductor.

This is the mediator that moves messages fetched by the Consumer into the streams.

It’s also a set of registered topics by string topic name, so you can check if a topic is being consumed from by doing topic in app.topics.

monitor

Monitor keeps stats about what’s going on inside the worker.

Return type

Monitor[]

flow_control[source]

Flow control of streams.

This object controls flow into stream queues, and can also clear all buffers.

http_client

HTTP Client Session.

Return type

ClientSession

assignor[source]

Partition Assignor.

Responsible for partition assignment.

router[source]

Find the node partitioned data belongs to.

The router helps us route web requests to the wanted Faust node. If a topic is sharded by account_id, the router can send us to the Faust worker responsible for any account. Used by the @app.table_route decorator.

web[source]
serializers[source]
label

Label used for graphs.

Return type

str

shortlabel

Label used for logging.

Return type

str

class faust.Channel(app: faust.types.app.AppT, *, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Create new channel.

Parameters
  • app (AppT[]) – The app that created this channel (app.channel())

  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for keys in this channel.

  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for values in this channel.

  • maxsize (Optional[int]) – The maximum number of messages this channel can hold. If exceeded any new put call will block until a message is removed from the channel.

  • loop (Optional[AbstractEventLoop]) – The asyncio event loop to use.

coroutine deliver(self, message: faust.types.tuples.Message) → None[source]
Return type

None

queue
Return type

ThrowableQueue

clone(*, is_iterator: bool = None, **kwargs) → faust.types.channels.ChannelT[source]
Return type

ChannelT[]

clone_using_queue(queue: asyncio.queues.Queue) → faust.types.channels.ChannelT[source]
Return type

ChannelT[]

stream(**kwargs) → faust.types.streams.StreamT[source]

Create stream reading from this channel.

Return type

StreamT[+T_co]

get_topic_name() → str[source]
Return type

str

as_future_message(key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → faust.types.tuples.FutureMessage[source]
Return type

FutureMessage[]

prepare_headers(headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None]) → Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]][source]
Return type

Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]

prepare_key(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]
Return type

Any

prepare_value(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]
Return type

Any

empty() → bool[source]
Return type

bool

on_stop_iteration() → None[source]
Return type

None

derive(**kwargs) → faust.types.channels.ChannelT[source]
Return type

ChannelT[]

coroutine declare(self) → None[source]
Return type

None

coroutine decode(self, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT[source]
Return type

EventT[]

coroutine get(self, *, timeout: Union[datetime.timedelta, float, str] = None) → Any[source]
Return type

Any

maybe_declare[source]
coroutine on_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None[source]
Return type

None

coroutine on_key_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None[source]
Return type

None

coroutine on_value_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None[source]
Return type

None

coroutine publish_message(self, fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

coroutine put(self, value: Any) → None[source]
Return type

None

coroutine send(self, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send message to channel.

Return type

Awaitable[RecordMetadata]

coroutine throw(self, exc: BaseException) → None[source]
Return type

None

subscriber_count
Return type

int

label
Return type

str

class faust.ChannelT(app: faust.types.channels._AppT, *, key_type: faust.types.channels._ModelArg = None, value_type: faust.types.channels._ModelArg = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: Optional[faust.types.channels.ChannelT] = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]
clone(*, is_iterator: bool = None, **kwargs) → faust.types.channels.ChannelT[source]
Return type

ChannelT[]

clone_using_queue(queue: asyncio.queues.Queue) → faust.types.channels.ChannelT[source]
Return type

ChannelT[]

stream(**kwargs) → faust.types.channels._StreamT[source]
Return type

_StreamT

get_topic_name() → str[source]
Return type

str

as_future_message(key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → faust.types.tuples.FutureMessage[source]
Return type

FutureMessage[]

prepare_key(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]
Return type

Any

prepare_value(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]
Return type

Any

empty() → bool[source]
Return type

bool

on_stop_iteration() → None[source]
Return type

None

derive(**kwargs) → faust.types.channels.ChannelT[source]
Return type

ChannelT[]

subscriber_count
Return type

int

queue
Return type

ThrowableQueue

coroutine declare(self) → None[source]
Return type

None

coroutine decode(self, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.channels._EventT[source]
Return type

_EventT

coroutine deliver(self, message: faust.types.tuples.Message) → None[source]
Return type

None

coroutine get(self, *, timeout: Union[datetime.timedelta, float, str] = None) → Any[source]
Return type

Any

maybe_declare[source]
coroutine on_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None[source]
Return type

None

coroutine on_key_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None[source]
Return type

None

coroutine on_value_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None[source]
Return type

None

coroutine publish_message(self, fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

coroutine put(self, value: Any) → None[source]
Return type

None

coroutine send(self, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

coroutine throw(self, exc: BaseException) → None[source]
Return type

None

class faust.Event(app: faust.types.app.AppT, key: Union[bytes, faust.types.core._ModelT, Any, None], value: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], message: faust.types.tuples.Message) → None[source]

An event received on a channel.

Notes

  • Events have a key and a value:

    event.key, event.value
    
  • They also have a reference to the original message (if available), such as a Kafka record:

    event.message.offset

  • Iterating over channels/topics yields Event:

    async for event in channel:

  • Iterating over a stream (that in turn iterate over channel) yields Event.value:

    async for value in channel.stream():  # value is event.value
        ...
    
  • If you only have a Stream object, you can also access underlying events by using Stream.events.

    For example:

    async for event in channel.stream().events():
        ...
    

    Also commonly used for finding the “current event” related to a value in the stream:

    stream = channel.stream()
    async for value in stream:
        event = stream.current_event
        message = event.message
        topic = event.message.topic
    

    You can retrieve the current event in a stream to:

    • Get access to the serialized key+value.

    • Get access to message properties like, what topic+partition the value was received on, or its offset.

    If you want access to both key and value, you should use stream.items() instead.

    async for key, value in stream.items():
        ...
    

    stream.current_event can also be accessed, but you must take extreme care that you are using the correct stream object. Methods such as .group_by(key) and .through(topic) return cloned stream objects, so the stream object you hold may not be the one that is actively being consumed from.

    The best way to access the current_event in an agent is to use the ContextVar:

    from faust import current_event
    
    @app.agent(topic)
    async def process(stream):
        async for value in stream:
            event = current_event()
    
coroutine forward(self, channel: Union[str, faust.types.channels.ChannelT], key: Union[bytes, faust.types.core._ModelT, Any, None] = <object object>, value: Union[bytes, faust.types.core._ModelT, Any] = <object object>, partition: int = None, timestamp: float = None, headers: Any = <object object>, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Forward original message (will not be reserialized).

Return type

Awaitable[RecordMetadata]

coroutine send(self, channel: Union[str, faust.types.channels.ChannelT], key: Union[bytes, faust.types.core._ModelT, Any, None] = <object object>, value: Union[bytes, faust.types.core._ModelT, Any] = <object object>, partition: int = None, timestamp: float = None, headers: Any = <object object>, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send object to channel.

Return type

Awaitable[RecordMetadata]

ack() → bool[source]
Return type

bool

class faust.EventT(app: faust.types.events._AppT, key: Union[bytes, faust.types.core._ModelT, Any, None], value: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], message: faust.types.tuples.Message) → None[source]
app
key
value
headers
message
acked
ack() → bool[source]
Return type

bool

coroutine forward(self, channel: Union[str, faust.types.events._ChannelT], key: Any = None, value: Any = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

coroutine send(self, channel: Union[str, faust.types.events._ChannelT], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

class faust.ModelOptions(*args, **kwargs)[source]
serializer = None
include_metadata = True
allow_blessed_key = False
isodates = False
decimals = False
coercions = None
fields = None

Flattened view of __annotations__ in MRO order.

Type

Index

fieldset = None

Set of required field names, for fast argument checking.

Type

Index

fieldpos = None

Positional argument index to field name. Used by Record.__init__ to map positional arguments to fields.

Type

Index

optionalset = None

Set of optional field names, for fast argument checking.

Type

Index

models = None

Mapping of fields that are ModelT

Type

Index

modelattrs = None
field_coerce = None

Mapping of fields that need to be coerced. Key is the name of the field, value is the coercion handler function.

Type

Index

defaults = None

Mapping of field names to default value.

initfield = None

Mapping of init field conversion callbacks.

clone_defaults() → faust.types.models.ModelOptions[source]
Return type

ModelOptions

class faust.Record → None[source]

Describes a model type that is a record (Mapping).

Examples

>>> class LogEvent(Record, serializer='json'):
...     severity: str
...     message: str
...     timestamp: float
...     optional_field: str = 'default value'
>>> event = LogEvent(
...     severity='error',
...     message='Broken pact',
...     timestamp=666.0,
... )
>>> event.severity
'error'
>>> serialized = event.dumps()
'{"severity": "error", "message": "Broken pact", "timestamp": 666.0}'
>>> restored = LogEvent.loads(serialized)
<LogEvent: severity='error', message='Broken pact', timestamp=666.0>
>>> # You can also subclass a Record to create a new record
>>> # with additional fields
>>> class RemoteLogEvent(LogEvent):
...     url: str
>>> # You can also refer to record fields and pass them around:
>>> LogEvent.severity
<FieldDescriptor: LogEvent.severity (str)>
classmethod from_data(data: Mapping, *, preferred_type: Type[faust.types.models.ModelT] = None) → faust.models.record.Record[source]
Return type

Record

to_representation() → Mapping[str, Any][source]

Convert object to JSON serializable object.

Return type

Mapping[str, Any]

asdict() → Dict[str, Any][source]
Return type

Dict[str, Any]

class faust.Monitor(*, max_avg_history: int = None, max_commit_latency_history: int = None, max_send_latency_history: int = None, max_assignment_latency_history: int = None, messages_sent: int = 0, tables: MutableMapping[str, faust.sensors.monitor.TableState] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: Counter[str] = None, events_total: int = 0, events_by_stream: Counter[faust.types.streams.StreamT] = None, events_by_task: Counter[_asyncio.Task] = None, events_runtime: Deque[float] = None, commit_latency: Deque[float] = None, send_latency: Deque[float] = None, assignment_latency: Deque[float] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: Counter[faust.types.topics.TopicT] = None, rebalances: int = None, rebalance_return_latency: Deque[float] = None, rebalance_end_latency: Deque[float] = None, rebalance_return_avg: float = 0.0, rebalance_end_avg: float = 0.0, time: Callable[float] = <built-in function monotonic>, **kwargs) → None[source]

Default Faust Sensor.

This is the default sensor, recording statistics about events, etc.

send_errors = 0

Number of produce operations that ended in error.

assignments_completed = 0

Number of partition assignments completed.

assignments_failed = 0

Number of partition assignments that failed.

max_avg_history = 100

Max number of total run time values to keep to build average.

max_commit_latency_history = 30

Max number of commit latency numbers to keep.

max_send_latency_history = 30

Max number of send latency numbers to keep.

max_assignment_latency_history = 30

Max number of assignment latency numbers to keep.

rebalances = 0

Number of rebalances seen by this worker.

tables = None

Mapping of tables

commit_latency = None

Deque of commit latency values

send_latency = None

Deque of send latency values

assignment_latency = None

Deque of assignment latency values.

rebalance_return_latency = None

Deque of previous n rebalance return latencies.

rebalance_end_latency = None

Deque of previous n rebalance end latencies.

rebalance_return_avg = 0.0

Average rebalance return latency.

rebalance_end_avg = 0.0

Average rebalance end latency.

messages_active = 0

Number of messages currently being processed.

messages_received_total = 0

Number of messages processed in total.

messages_received_by_topic = None

Count of messages received by topic

messages_sent = 0

Number of messages sent in total.

messages_sent_by_topic = None

Number of messages sent by topic.

messages_s = 0

Number of messages being processed this second.

events_active = 0

Number of events currently being processed.

events_total = 0

Number of events processed in total.

events_by_task = None

Count of events processed by task

events_by_stream = None

Count of events processed by stream

events_s = 0

Number of events being processed this second.

events_runtime_avg = 0.0

Average event runtime over the last second.

events_runtime = None

Deque of run times used for averages

topic_buffer_full = None

Counter of times a topic's buffer was full

metric_counts = None

Arbitrary counts added by apps

tp_committed_offsets = None

Last committed offsets by TopicPartition

tp_read_offsets = None

Last read offsets by TopicPartition

tp_end_offsets = None

Log end offsets by TopicPartition

secs_since(start_time: float) → float[source]

Given timestamp start, return number of seconds since that time.

Return type

float

ms_since(start_time: float) → float[source]

Given timestamp start, return number of ms since that time.

Return type

float

secs_to_ms(timestamp: float) → float[source]

Convert seconds to milliseconds.

Return type

float

logger = <Logger faust.sensors.monitor (WARNING)>
asdict() → Mapping[source]
Return type

Mapping[~KT, +VT_co]

on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Message received by a consumer.

Return type

None

on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]

Call when stream starts processing an event.

Return type

Optional[Dict[~KT, ~VT]]

on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]

Call when stream is done processing an event.

Return type

None

on_topic_buffer_full(topic: faust.types.topics.TopicT) → None[source]

Topic buffer full so conductor had to wait.

Return type

None

on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

All streams finished processing message.

Return type

None

on_table_get(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key retrieved from table.

Return type

None

on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]

Value set for key in table.

Return type

None

on_table_del(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key deleted from table.

Return type

None

on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any[source]

Consumer is about to commit topic offset.

Return type

Any

on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]

Consumer finished committing topic offset.

Return type

None

on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]

About to send a message.

Return type

Any

on_send_completed(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]

Message successfully sent.

Return type

None

on_send_error(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]

Error while sending message.

Return type

None

count(metric_name: str, count: int = 1) → None[source]
Return type

None

on_tp_commit(tp_offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]
Return type

None

track_tp_end_offset(tp: faust.types.tuples.TP, offset: int) → None[source]
Return type

None

on_assignment_start(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]

Partition assignor is starting to assign partitions.

Return type

Dict[~KT, ~VT]

on_assignment_error(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]
Return type

None

on_assignment_completed(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]

Partition assignor completed assignment.

Return type

None

on_rebalance_start(app: faust.types.app.AppT) → Dict[source]

Cluster rebalance in progress.

Return type

Dict[~KT, ~VT]

on_rebalance_return(app: faust.types.app.AppT, state: Dict) → None[source]

Consumer replied assignment is done to broker.

Return type

None

on_rebalance_end(app: faust.types.app.AppT, state: Dict) → None[source]

Cluster rebalance fully completed (including recovery).

Return type

None

class faust.Sensor(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Base class for sensors.

This sensor does not do anything at all, but can be subclassed to create new monitors.

on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Message received by a consumer.

Return type

None

on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]

Message sent to a stream as an event.

Return type

Optional[Dict[~KT, ~VT]]

on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]

Event was acknowledged by stream.

Notes

Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.

Return type

None

on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

All streams finished processing message.

Return type

None

on_topic_buffer_full(topic: faust.types.topics.TopicT) → None[source]

Topic buffer full so conductor had to wait.

Return type

None

on_table_get(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key retrieved from table.

Return type

None

on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]

Value set for key in table.

Return type

None

on_table_del(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key deleted from table.

Return type

None

on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any[source]

Consumer is about to commit topic offset.

Return type

Any

on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]

Consumer finished committing topic offset.

Return type

None

on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]

About to send a message.

Return type

Any

on_send_completed(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]

Message successfully sent.

Return type

None

on_send_error(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]

Error while sending message.

Return type

None

on_assignment_start(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]

Partition assignor is starting to assign partitions.

Return type

Dict[~KT, ~VT]

on_assignment_error(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]
Return type

None

on_assignment_completed(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]

Partition assignor completed assignment.

Return type

None

on_rebalance_start(app: faust.types.app.AppT) → Dict[source]

Cluster rebalance in progress.

Return type

Dict[~KT, ~VT]

on_rebalance_return(app: faust.types.app.AppT, state: Dict) → None[source]

Consumer replied assignment is done to broker.

Return type

None

on_rebalance_end(app: faust.types.app.AppT, state: Dict) → None[source]

Cluster rebalance fully completed (including recovery).

Return type

None

asdict() → Mapping[source]
Return type

Mapping[~KT, +VT_co]

logger = <Logger faust.sensors.base (WARNING)>
class faust.Codec(children: Tuple[faust.types.codecs.CodecT, ...] = None, **kwargs) → None[source]

Base class for codecs.

children = None

next steps in the recursive codec chain. x = pickle | binary returns codec with children set to (pickle, binary).

nodes = None

cached version of children including this codec as the first node. could use chain below, but seems premature so just copying the list.

kwargs = None

subclasses can support keyword arguments, the base implementation of clone() uses this to preserve keyword arguments in copies.

dumps(obj: Any) → bytes[source]

Encode object obj.

Return type

bytes

loads(s: bytes) → Any[source]

Decode object from string.

Return type

Any

clone(*children) → faust.types.codecs.CodecT[source]

Create a clone of this codec, with optional children added.

Return type

CodecT

class faust.Stream(channel: AsyncIterator[T_co], *, app: faust.types.app.AppT, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.joins.JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: faust.types.streams.StreamT = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, prefix: str = '', loop: asyncio.events.AbstractEventLoop = None) → None[source]

A stream: async iterator processing events in channels/topics.

logger = <Logger faust.streams (WARNING)>
mundane_level = 'debug'
get_active_stream() → faust.types.streams.StreamT[source]

Return the currently active stream.

A stream can be derived using Stream.group_by etc, so if this stream was used to create another derived stream, this function will return the stream being actively consumed from. E.g. in the example:

>>> @app.agent()
... async def agent(a):
...     a = a
...     b = a.group_by(Withdrawal.account_id)
...     c = b.through('backup_topic')
...     async for value in c:
...         ...

The return value of a.get_active_stream() would be c.

Notes

The chain of streams that leads to the active stream is decided by the _next attribute. To get to the active stream we just traverse this linked-list:

>>> def get_active_stream(self):
...     node = self
...     while node._next:
...         node = node._next
...     return node
Return type

StreamT[+T_co]

get_root_stream() → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

add_processor(processor: Callable[T]) → None[source]

Add processor callback executed whenever a new event is received.

Processor functions can be async or non-async, must accept a single argument, and should return the value, mutated or not.

For example a processor handling a stream of numbers may modify the value:

def double(value: int) -> int:
    return value * 2

stream.add_processor(double)
Return type

None

info() → Mapping[str, Any][source]

Return stream settings as a dictionary.

Return type

Mapping[str, Any]

clone(**kwargs) → faust.types.streams.StreamT[source]

Create a clone of this stream.

Notes

If the cloned stream is supposed to supersede this stream, like in group_by/through/etc., you should use _chain() instead so stream._next = cloned_stream is set and get_active_stream() returns the cloned stream.

Return type

StreamT[+T_co]

noack() → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

events() → AsyncIterable[faust.types.events.EventT][source]

Iterate over the stream as events exclusively.

This means the stream must be iterating over a channel, or at least an iterable of event objects.

Return type

AsyncIterable[EventT[]]

enumerate(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]

Enumerate values received on this stream.

Unlike Python’s built-in enumerate, this works with async generators.

Return type

AsyncIterable[Tuple[int, +T_co]]

through(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]

Forward values in this stream to channel.

Send messages received on this stream to another channel, and return a new stream that consumes from that channel.

Notes

The messages are forwarded after any processors have been applied.

Example

topic = app.topic('foo')

@app.agent(topic)
async def mytask(stream):
    async for value in stream.through(app.topic('bar')):
        # value was first received in topic 'foo',
        # then forwarded and consumed from topic 'bar'
        print(value)
Return type

StreamT[+T_co]

echo(*channels) → faust.types.streams.StreamT[source]

Forward values to one or more channels.

Unlike through(), we don’t consume from these channels.

Return type

StreamT[+T_co]

group_by(key: Union[faust.types.models.FieldDescriptorT, Callable[T, Union[bytes, faust.types.core._ModelT, Any, None]]], *, name: str = None, topic: faust.types.topics.TopicT = None, partitions: int = None) → faust.types.streams.StreamT[source]

Create new stream that repartitions the stream using a new key.

Parameters
  • key (Union[FieldDescriptorT, Callable[[~T], Union[bytes, _ModelT, Any, None]]]) –

    The key argument decides how the new key is generated, it can be a field descriptor, a callable, or an async callable.

    Note: The name argument must be provided if the key argument is a callable.

  • name (Optional[str]) – Suffix to use for repartitioned topics. This argument is required if key is a callable.

Examples

Using a field descriptor to use a field in the event as the new key:

s = withdrawals_topic.stream()
# values in this stream are of type Withdrawal
async for event in s.group_by(Withdrawal.account_id):
    ...

Using an async callable to extract a new key:

s = withdrawals_topic.stream()

async def get_key(withdrawal):
    return await aiohttp.get(
        f'http://e.com/resolve_account/{withdrawal.account_id}')

async for event in s.group_by(get_key):
    ...

Using a regular callable to extract a new key:

s = withdrawals_topic.stream()

def get_key(withdrawal):
    return withdrawal.account_id.upper()

async for event in s.group_by(get_key):
    ...
Return type

StreamT[+T_co]

derive_topic(name: str, *, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT[source]

Create Topic description derived from the K/V type of this stream.

Parameters
  • name (str) – Topic name.

  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific key type to use for this topic. If not set, the key type of this stream will be used.

  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific value type to use for this topic. If not set, the value type of this stream will be used.

Raises

ValueError – if the stream channel is not a topic.

Return type

TopicT[]

combine(*nodes, **kwargs) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

contribute_to_stream(active: faust.types.streams.StreamT) → None[source]
Return type

None

join(*fields) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

left_join(*fields) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

inner_join(*fields) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

outer_join(*fields) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

coroutine on_merge(self, value: T = None) → Optional[T][source]
Return type

Optional[~T]

coroutine ack(self, event: faust.types.events.EventT) → bool[source]

Ack event.

This will decrease the reference count of the event message by one, and when the reference count reaches zero, the worker will commit the offset so that the message will not be seen by a worker again.

Parameters

event (EventT[]) – Event to ack.

Return type

bool

items() → AsyncIterator[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], T_co]][source]

Iterate over the stream as key, value pairs.

Examples

@app.agent(topic)
async def mytask(stream):
    async for key, value in stream.items():
        print(key, value)
Return type

AsyncIterator[Tuple[Union[bytes, _ModelT, Any, None], +T_co]]

coroutine on_start(self) → None[source]

Service is starting.

Return type

None

coroutine on_stop(self) → None[source]

Service is being stopped/restarted.

Return type

None

coroutine remove_from_stream(self, stream: faust.types.streams.StreamT) → None[source]
Return type

None

coroutine send(self, value: T_contra) → None[source]

Send value into stream locally (bypasses topic).

Return type

None

coroutine stop(self) → None[source]

Stop the service.

Return type

None

take(max_: int, within: Union[datetime.timedelta, float, str]) → AsyncIterable[Sequence[T_co]][source]

Buffer up to max_ values at a time and yield a list of buffered values.

Parameters

within (Union[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. within=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).

Return type

AsyncIterable[Sequence[+T_co]]

coroutine throw(self, exc: BaseException) → None[source]
Return type

None

label

Label used for graphs.

Return type

str

shortlabel[source]
class faust.StreamT(channel: AsyncIterator[T_co] = None, *, app: faust.types.streams._AppT = None, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.streams._JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: Optional[faust.types.streams.StreamT] = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, prefix: str = '', loop: asyncio.events.AbstractEventLoop = None) → None[source]
outbox = None
join_strategy = None
task_owner = None
current_event = None
active_partitions = None
concurrency_index = None
enable_acks = True
prefix = ''
get_active_stream() → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

add_processor(processor: Callable[T]) → None[source]
Return type

None

info() → Mapping[str, Any][source]
Return type

Mapping[str, Any]

clone(**kwargs) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

enumerate(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]
Return type

AsyncIterable[Tuple[int, +T_co]]

through(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

echo(*channels) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

group_by(key: Union[faust.types.models.FieldDescriptorT, Callable[T, Union[bytes, faust.types.core._ModelT, Any, None]]], *, name: str = None, topic: faust.types.topics.TopicT = None) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

derive_topic(name: str, *, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT[source]
Return type

TopicT[]

coroutine ack(self, event: faust.types.events.EventT) → bool[source]
Return type

bool

coroutine events(self) → AsyncIterable[faust.types.events.EventT][source]
coroutine items(self) → AsyncIterator[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], T_co]][source]
coroutine send(self, value: T_contra) → None[source]
Return type

None

coroutine take(self, max_: int, within: Union[datetime.timedelta, float, str]) → AsyncIterable[Sequence[T_co]][source]
coroutine throw(self, exc: BaseException) → None[source]
Return type

None

faust.current_event() → Optional[faust.types.events.EventT][source]

Return the event currently being processed, or None.

Return type

Optional[EventT[]]

class faust.SetTable(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]

Table that maintains a dictionary of sets.

WindowWrapper

alias of SetWindowWrapper

logger = <Logger faust.tables.sets (WARNING)>
class faust.Table(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]

Table (non-windowed).

class WindowWrapper(table: faust.types.tables.TableT, *, relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None] = None, key_index: bool = False, key_index_table: faust.types.tables.TableT = None) → None

Windowed table wrapper.

A windowed table does not return concrete values when keys are accessed, instead WindowSet is returned so that the values can be further reduced to the wanted time period.

ValueType

alias of WindowSet

as_ansitable(title: str = '{table.name}', **kwargs) → str
Return type

str

clone(relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT
Return type

WindowWrapperT[]

get_relative_timestamp
Return type

Optional[Callable[[Optional[EventT[]]], Union[float, datetime]]]

get_timestamp(event: faust.types.events.EventT = None) → float
Return type

float

items(event: faust.types.events.EventT = None) → ItemsView
Return type

ItemsView[~KT, +VT_co]

key_index = False
key_index_table = None
keys() → KeysView
Return type

KeysView[~KT]

name
Return type

str

on_del_key(key: Any) → None
Return type

None

on_recover(fun: Callable[Awaitable[None]]) → Callable[Awaitable[None]]
Return type

Callable[[], Awaitable[None]]

on_set_key(key: Any, value: Any) → None
Return type

None

relative_to(ts: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT
Return type

WindowWrapperT[]

relative_to_field(field: faust.types.models.FieldDescriptorT) → faust.types.tables.WindowWrapperT
Return type

WindowWrapperT[]

relative_to_now() → faust.types.tables.WindowWrapperT
Return type

WindowWrapperT[]

relative_to_stream() → faust.types.tables.WindowWrapperT
Return type

WindowWrapperT[]

values(event: faust.types.events.EventT = None) → ValuesView
Return type

ValuesView[+VT_co]

using_window(window: faust.types.windows.WindowT, *, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

hopping(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

tumbling(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

on_key_get(key: KT) → None[source]

Handle that key is being retrieved.

Return type

None

on_key_set(key: KT, value: VT) → None[source]

Handle that value for a key is being set.

Return type

None

on_key_del(key: KT) → None[source]

Handle that a key is deleted.

Return type

None

as_ansitable(title: str = '{table.name}', **kwargs) → str[source]
Return type

str

logger = <Logger faust.tables.table (WARNING)>
class faust.Topic(app: faust.types.app.AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, allow_empty: bool = False, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Define new topic description.

Parameters
  • app (AppT[]) – App instance used to create this topic description.

  • topics (Optional[Sequence[str]]) – List of topic names.

  • partitions (Optional[int]) – Number of partitions for these topics. On declaration, topics are created using this. Note: If a message is produced before the topic is declared, and autoCreateTopics is enabled on the Kafka Server, the number of partitions used will be specified by the server configuration.

  • retention (Union[timedelta, float, str, None]) – Number of seconds (as float/timedelta) to keep messages in the topic before they can be expired by the server.

  • pattern (Union[str, Pattern[AnyStr], None]) – Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern.

  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize keys for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect”

  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize values for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect”

  • active_partitions (Optional[Set[TP]]) – Set of faust.types.tuples.TP that this topic should be restricted to.

Raises

TypeError – if both topics and pattern are provided.

pattern
Return type

Optional[Pattern[AnyStr]]

derive(**kwargs) → faust.types.channels.ChannelT[source]

Create new Topic derived from this topic.

Configuration will be copied from this topic, but any parameter can be overridden by passing it as a keyword argument.

See also

derive_topic(): for a list of supported keyword arguments.

Return type

ChannelT[]

derive_topic(*, topics: Sequence[str] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = None, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs) → faust.types.topics.TopicT[source]
Return type

TopicT[]

get_topic_name() → str[source]
Return type

str

coroutine declare(self) → None[source]
Return type

None

coroutine decode(self, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT[source]
Return type

EventT[]

maybe_declare[source]
coroutine publish_message(self, fut: faust.types.tuples.FutureMessage, wait: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

coroutine put(self, event: faust.types.events.EventT) → None[source]
Return type

None

coroutine send(self, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send message to topic.

Return type

Awaitable[RecordMetadata]

prepare_key(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]
Return type

Any

prepare_value(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]
Return type

Any

on_stop_iteration() → None[source]
Return type

None

partitions
Return type

Optional[int]

class faust.TopicT(app: faust.types.topics._AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: faust.types.topics._ModelArg = None, value_type: faust.types.topics._ModelArg = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, allow_empty: bool = False, loop: asyncio.events.AbstractEventLoop = None) → None[source]
topics = None

Iterable/Sequence of topic names to subscribe to.

retention = None

Topic retention setting: expiry time in seconds for messages in the topic.

compacting = None

Flag that when enabled means the topic can be “compacted”: if the topic is a log of key/value pairs, the broker can delete old values for the same key.

replicas = None

Number of replicas for topic.

config = None

Additional configuration as a mapping.

acks = None

Enable acks for this topic.

internal = None

Mark topic as internal: it’s owned by us and we are allowed to create or delete the topic as necessary.

pattern

Or, instead of topics, a regular expression used to match the topics we want to subscribe to.

Return type

Optional[Pattern[AnyStr]]

partitions
Return type

Optional[int]

derive(**kwargs) → faust.types.channels.ChannelT[source]
Return type

ChannelT[]

derive_topic(*, topics: Sequence[str] = None, key_type: faust.types.topics._ModelArg = None, value_type: faust.types.topics._ModelArg = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = False, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs) → faust.types.topics.TopicT[source]
Return type

TopicT[]

class faust.GSSAPICredentials(*, kerberos_service_name: str = 'kafka', kerberos_domain_name: str = None, ssl_context: ssl.SSLContext = None, mechanism: Union[str, faust.types.auth.SASLMechanism] = None) → None[source]

Describe GSSAPI credentials over SASL.

protocol = 'SASL_PLAINTEXT'
mechanism = 'GSSAPI'
class faust.SASLCredentials(*, username: str = None, password: str = None, ssl_context: ssl.SSLContext = None, mechanism: Union[str, faust.types.auth.SASLMechanism] = None) → None[source]

Describe SASL credentials.

protocol = 'SASL_PLAINTEXT'
mechanism = 'PLAIN'
class faust.SSLCredentials(context: ssl.SSLContext = None, *, purpose: Any = None, cafile: Optional[str] = None, capath: Optional[str] = None, cadata: Optional[str] = None) → None[source]

Describe SSL credentials/settings.

protocol = 'SSL'
class faust.Settings(id: str, *, version: int = None, broker: Union[str, yarl.URL, List[yarl.URL]] = None, broker_client_id: str = None, broker_request_timeout: Union[datetime.timedelta, float, str] = None, broker_credentials: Union[faust.types.auth.CredentialsT, ssl.SSLContext] = None, broker_commit_every: int = None, broker_commit_interval: Union[datetime.timedelta, float, str] = None, broker_commit_livelock_soft_timeout: Union[datetime.timedelta, float, str] = None, broker_session_timeout: Union[datetime.timedelta, float, str] = None, broker_heartbeat_interval: Union[datetime.timedelta, float, str] = None, broker_check_crcs: bool = None, broker_max_poll_records: int = None, agent_supervisor: Union[_T, str] = None, store: Union[str, yarl.URL] = None, cache: Union[str, yarl.URL] = None, web: Union[str, yarl.URL] = None, web_enabled: bool = True, processing_guarantee: Union[str, faust.types.enums.ProcessingGuarantee] = None, timezone: datetime.tzinfo = None, autodiscover: Union[bool, Iterable[str], Callable[Iterable[str]]] = None, origin: str = None, canonical_url: Union[str, yarl.URL] = None, datadir: Union[pathlib.Path, str] = None, tabledir: Union[pathlib.Path, str] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, logging_config: Dict = None, loghandlers: List[logging.Handler] = None, table_cleanup_interval: Union[datetime.timedelta, float, str] = None, table_standby_replicas: int = None, topic_replication_factor: int = None, topic_partitions: int = None, topic_allow_declare: bool = None, id_format: str = None, reply_to: str = None, reply_to_prefix: str = None, reply_create_topic: bool = None, reply_expires: Union[datetime.timedelta, float, str] = None, ssl_context: ssl.SSLContext = None, stream_buffer_maxsize: int = None, stream_wait_empty: bool = None, stream_ack_cancelled_tasks: bool = None, stream_ack_exceptions: bool = None, stream_publish_on_commit: bool = 
None, stream_recovery_delay: Union[datetime.timedelta, float, str] = None, producer_linger_ms: int = None, producer_max_batch_size: int = None, producer_acks: int = None, producer_max_request_size: int = None, producer_compression_type: str = None, producer_partitioner: Union[_T, str] = None, producer_request_timeout: Union[datetime.timedelta, float, str] = None, producer_api_version: str = None, consumer_max_fetch_size: int = None, consumer_auto_offset_reset: str = None, web_bind: str = None, web_port: int = None, web_host: str = None, web_transport: Union[str, yarl.URL] = None, web_in_thread: bool = None, web_cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, worker_redirect_stdouts: bool = None, worker_redirect_stdouts_level: Union[int, str] = None, Agent: Union[_T, str] = None, ConsumerScheduler: Union[_T, str] = None, Stream: Union[_T, str] = None, Table: Union[_T, str] = None, SetTable: Union[_T, str] = None, TableManager: Union[_T, str] = None, Serializers: Union[_T, str] = None, Worker: Union[_T, str] = None, PartitionAssignor: Union[_T, str] = None, LeaderAssignor: Union[_T, str] = None, Router: Union[_T, str] = None, Topic: Union[_T, str] = None, HttpClient: Union[_T, str] = None, Monitor: Union[_T, str] = None, url: Union[str, yarl.URL] = None, **kwargs) → None[source]
classmethod setting_names() → Set[str][source]
Return type

Set[str]

id_format = '{id}-v{self.version}'
ssl_context = None
autodiscover = False
broker_client_id = 'faust-1.6.0'
timezone = datetime.timezone.utc
broker_commit_every = 10000
broker_check_crcs = True
key_serializer = 'raw'
value_serializer = 'json'
table_standby_replicas = 1
topic_replication_factor = 1
topic_partitions = 8
topic_allow_declare = True
reply_create_topic = False
logging_config = None
stream_buffer_maxsize = 4096
stream_wait_empty = True
stream_ack_cancelled_tasks = True
stream_ack_exceptions = True
stream_publish_on_commit = False
producer_linger_ms = 0
producer_max_batch_size = 16384
producer_acks = -1
producer_max_request_size = 1000000
producer_compression_type = None
producer_api_version = 'auto'
consumer_max_fetch_size = 4194304
consumer_auto_offset_reset = 'earliest'
web_bind = '0.0.0.0'
web_port = 6066
web_host = 'build-8986882-project-230058-faust'
web_in_thread = False
web_cors_options = None
worker_redirect_stdouts = True
worker_redirect_stdouts_level = 'WARN'
reply_to_prefix = 'f-reply-'
name
Return type

str

id
Return type

str

origin
Return type

Optional[str]

version
Return type

int

broker
Return type

List[URL]

store
Return type

URL

web
Return type

URL

cache
Return type

URL

canonical_url
Return type

URL

datadir
Return type

Path

appdir
Return type

Path

find_old_versiondirs() → Iterable[pathlib.Path][source]
Return type

Iterable[Path]

tabledir
Return type

Path

processing_guarantee
Return type

ProcessingGuarantee

broker_credentials
Return type

Optional[CredentialsT]

broker_request_timeout
Return type

float

broker_session_timeout
Return type

float

broker_heartbeat_interval
Return type

float

broker_commit_interval
Return type

float

broker_commit_livelock_soft_timeout
Return type

float

broker_max_poll_records
Return type

Optional[int]

producer_partitioner
Return type

Optional[Callable[[Optional[bytes], Sequence[int], Sequence[int]], int]]

producer_request_timeout
Return type

float

table_cleanup_interval
Return type

float

reply_expires
Return type

float

stream_recovery_delay
Return type

float

agent_supervisor
Return type

Type[SupervisorStrategyT]

web_transport
Return type

URL

Agent
Return type

Type[AgentT[]]

ConsumerScheduler
Return type

Type[SchedulingStrategyT]

Stream
Return type

Type[StreamT[+T_co]]

Table
Return type

Type[TableT[~KT, ~VT]]

SetTable
Return type

Type[TableT[~KT, ~VT]]

TableManager
Return type

Type[TableManagerT[]]

Serializers
Return type

Type[RegistryT]

Worker
Return type

Type[_WorkerT]

PartitionAssignor
Return type

Type[PartitionAssignorT]

LeaderAssignor
Return type

Type[LeaderAssignorT[]]

Router
Return type

Type[RouterT]

Topic
Return type

Type[TopicT[]]

HttpClient
Return type

Type[ClientSession]

Monitor
Return type

Type[SensorT[]]

faust.HoppingWindow

alias of faust.windows._PyHoppingWindow

class faust.TumblingWindow(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None) → None[source]

Tumbling window type.

Fixed-size, non-overlapping, gap-less windows.

faust.SlidingWindow

alias of faust.windows._PySlidingWindow

class faust.Window(*args, **kwargs)[source]

Base class for window types.

class faust.Worker(app: faust.types.app.AppT, *services, sensors: Iterable[faust.types.sensors.SensorT] = None, debug: bool = False, quiet: bool = False, loglevel: Union[str, int] = None, logfile: Union[str, IO] = None, stdout: IO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, stderr: IO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, blocking_timeout: float = 10.0, workdir: Union[pathlib.Path, str] = None, console_port: int = 50101, loop: asyncio.events.AbstractEventLoop = None, redirect_stdouts: bool = None, redirect_stdouts_level: int = None, logging_config: Dict = None, **kwargs) → None[source]

Worker.

Usage:

You can start a worker using:

  1. the faust worker program.

  2. instantiating Worker programmatically and calling execute_from_commandline():

    >>> worker = Worker(app)
    >>> worker.execute_from_commandline()
    
  3. or if you already have an event loop, calling await start, but in that case you are responsible for gracefully shutting down the event loop:

    async def start_worker(worker: Worker) -> None:
        await worker.start()
    
    def manage_loop():
        loop = asyncio.get_event_loop()
        worker = Worker(app, loop=loop)
        try:
            loop.run_until_complete(start_worker(worker))
        finally:
            worker.stop_and_shutdown_loop()
    
Parameters
  • app (AppT[]) – The Faust app to start.

  • *services – Services to start with worker. This includes application instances to start.

  • sensors (Iterable[SensorT]) – List of sensors to include.

  • debug (bool) – Enables debugging mode [disabled by default].

  • quiet (bool) – Do not output anything to console [disabled by default].

  • loglevel (Union[str, int]) – Level to use for logging, can be string (one of: CRIT|ERROR|WARN|INFO|DEBUG), or integer.

  • logfile (Union[str, IO]) – Name of file or a stream to log to.

  • stdout (IO) – Standard out stream.

  • stderr (IO) – Standard err stream.

  • blocking_timeout (float) – When debug is enabled this sets the timeout for detecting that the event loop is blocked.

  • workdir (Union[str, Path]) – Custom working directory for the process that the worker will change into when started. This working directory change is permanent for the process, or until something else changes the working directory again.

  • loop (asyncio.AbstractEventLoop) – Custom event loop object.

logger = <Logger faust.worker (WARNING)>
app = None

The Faust app started by this worker.

sensors = None

Additional sensors to add to the Faust app.

workdir = None

Current working directory. Note that if passed as an argument to Worker, the worker will change to this directory when started.

spinner = None

Class that displays a terminal progress spinner (see progress).

on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of service dependencies for this service.

Return type

Iterable[ServiceT[]]

change_workdir(path: pathlib.Path) → None[source]
Return type

None

autodiscover() → None[source]
Return type

None

coroutine on_execute(self) → None[source]
Return type

None

coroutine on_first_start(self) → None[source]

Service started for the first time in this process.

Return type

None

coroutine on_start(self) → None[source]

Service is starting.

Return type

None

coroutine on_startup_finished(self) → None[source]
Return type

None

on_worker_shutdown() → None[source]
Return type

None

on_setup_root_logger(logger: logging.Logger, level: int) → None[source]
Return type

None

faust.uuid() → str[source]

Generate random UUID string.

Shortcut to str(uuid4()).

Return type

str

class faust.Service(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

An asyncio service that can be started/stopped/restarted.

Keyword Arguments
abstract = False
class Diag(service: mode.types.services.ServiceT) → None

Service diagnostics.

This can be used to track what your service is doing. For example if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:

DIAG_COMMITTING = 'committing'

class Consumer(Service):

    @Service.task
    async def _background_commit(self) -> None:
        while not self.should_stop:
            await self.sleep(30.0)
            self.diag.set_flag(DIAG_COMMITTING)
            try:
                await self._consumer.commit()
            finally:
                self.diag.unset_flag(DIAG_COMMITTING)

The above code is setting the flag manually, but you can also use a decorator to accomplish the same thing:

@Service.timer(30.0)
async def _background_commit(self) -> None:
    await self.commit()

@Service.transitions_to(DIAG_COMMITTING)
async def commit(self) -> None:
    await self._consumer.commit()
set_flag(flag: str) → None
Return type

None

unset_flag(flag: str) → None
Return type

None

wait_for_shutdown = False

Set to True if .stop must wait for the shutdown flag to be set.

shutdown_timeout = 60.0

Time to wait for shutdown flag set before we give up.

restart_count = 0

Current number of times this service instance has been restarted.

mundane_level = 'info'

The log level for mundane info such as starting, stopping, etc. Set this to "debug" for less information.

classmethod from_awaitable(coro: Awaitable, *, name: str = None, **kwargs) → mode.types.services.ServiceT[source]
Return type

ServiceT[]

classmethod task(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask[source]

Decorate function to be used as background task.

Example

>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
Return type

ServiceTask

classmethod timer(interval: Union[datetime.timedelta, float, str]) → Callable[Callable[mode.types.services.ServiceT, Awaitable[None]], mode.services.ServiceTask][source]

Background timer executing every n seconds.

Example

>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')
Return type

Callable[[Callable[[ServiceT[]], Awaitable[None]]], ServiceTask]

classmethod transitions_to(flag: str) → Callable[source]

Decorate function to set and reset diagnostic flag.

Return type

Callable

add_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]

Add dependency to other service.

The service will be started/stopped with this service.

Return type

ServiceT[]

add_context(context: ContextManager) → Any[source]
Return type

Any

add_future(coro: Awaitable) → _asyncio.Future[source]

Add relationship to asyncio.Future.

The future will be joined when this service is stopped.

Return type

Future

on_init() → None[source]
Return type

None

on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of service dependencies for this service.

Return type

Iterable[ServiceT[]]

coroutine add_async_context(self, context: AsyncContextManager) → Any[source]
Return type

Any

coroutine add_runtime_dependency(self, service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
Return type

ServiceT[]

coroutine crash(self, reason: BaseException) → None[source]

Crash the service and all child services.

Return type

None

itertimer(interval: Union[datetime.timedelta, float, str], *, max_drift_correction: float = 0.1, loop: asyncio.events.AbstractEventLoop = None, sleep: Callable[..., Awaitable] = None, clock: Callable[float] = <built-in function perf_counter>, name: str = '') → AsyncIterator[float][source]

Sleep interval seconds for every iteration.

This is an async iterator that takes advantage of timer_intervals() to act as a timer that stops drift from occurring, and adds a tiny amount of drift to timers so that they don’t start at the same time.

Uses Service.sleep which will bail-out-quick if the service is stopped.

Note

Will sleep the full interval seconds before returning from first iteration.

Examples

>>> async for sleep_time in self.itertimer(1.0):
...   print('another second passed, just woke up...')
...   await perform_some_http_request()
Return type

AsyncIterator[float]

coroutine join_services(self, services: Sequence[mode.types.services.ServiceT]) → None[source]
Return type

None

logger = <Logger mode.services (WARNING)>
coroutine maybe_start(self) → None[source]

Start the service, if it has not already been started.

Return type

None

coroutine restart(self) → None[source]

Restart this service.

Return type

None

service_reset() → None[source]
Return type

None

coroutine sleep(self, n: Union[datetime.timedelta, float, str], *, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Sleep for n seconds, or until service stopped.

Return type

None

coroutine start(self) → None[source]
Return type

None

coroutine stop(self) → None[source]

Stop the service.

Return type

None

coroutine transition_with(self, flag: str, fut: Awaitable, *args, **kwargs) → Any[source]
Return type

Any

coroutine wait(self, *coros, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]

Wait for coroutines to complete, or until the service stops.

Return type

WaitResult

coroutine wait_first(self, *coros, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResults[source]
Return type

WaitResults

coroutine wait_for_stopped(self, *coros, timeout: Union[datetime.timedelta, float, str] = None) → bool[source]
Return type

bool

coroutine wait_many(self, coros: Iterable[Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event]], *, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]
Return type

WaitResult

coroutine wait_until_stopped(self) → None[source]

Wait until the service is signalled to stop.

Return type

None

set_shutdown() → None[source]

Set the shutdown signal.

Notes

If wait_for_shutdown is set, stopping the service will wait for this flag to be set.

Return type

None

started

Return True if the service was started.

Return type

bool

crashed
Return type

bool

should_stop

Return True if the service must stop.

Return type

bool

state

Service state - as a human readable string.

Return type

str

label

Label used for graphs.

Return type

str

shortlabel

Label used for logging.

Return type

str

beacon

Beacon used to track services in a dependency graph.

Return type

NodeT[~_T]

class faust.ServiceT(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Abstract type for an asynchronous service that can be started/stopped.

See also

mode.Service.

wait_for_shutdown = False
restart_count = 0
supervisor = None
add_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
Return type

ServiceT[]

add_context(context: ContextManager) → Any[source]
Return type

Any

service_reset() → None[source]
Return type

None

set_shutdown() → None[source]
Return type

None

started
Return type

bool

crashed
Return type

bool

should_stop
Return type

bool

state
Return type

str

label
Return type

str

shortlabel
Return type

str

beacon
Return type

NodeT[~_T]

coroutine add_async_context(self, context: AsyncContextManager) → Any[source]
Return type

Any

coroutine add_runtime_dependency(self, service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
Return type

ServiceT[]

coroutine crash(self, reason: BaseException) → None[source]
Return type

None

coroutine maybe_start(self) → None[source]
Return type

None

coroutine restart(self) → None[source]
Return type

None

coroutine start(self) → None[source]
Return type

None

coroutine stop(self) → None[source]
Return type

None

coroutine wait_until_stopped(self) → None[source]
Return type

None

loop
Return type

AbstractEventLoop