faust.transport.base

Base message transport implementation.

The Transport is responsible for:

  • Holding a reference to the app that created it.
  • Creating new consumers/producers.

For a reference transport implementation, see: faust/transport/drivers/aiokafka.py

class faust.transport.base.Transport(url: List[yarl.URL], app: faust.types.app.AppT, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Message transport implementation.

class Consumer(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[None]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None

Base Consumer.

class TopicBuffer → None
add(tp: faust.types.tuples.TP, buffer: List) → None
Return type:None
classmethod map_from_records(records: Mapping[faust.types.tuples.TP, List]) → MutableMapping[str, faust.transport.utils.TopicBuffer]
Return type:MutableMapping[str, TopicBuffer[]]
ack(message: faust.types.tuples.Message) → bool
Return type:bool
close() → None
Return type:None
coroutine commit(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool

Maybe commit the offset for all or specific topics.

Parameters:topics (Optional[AbstractSet[Union[str, TP]]]) – Set containing topics and/or TopicPartitions to commit.
Return type:bool
coroutine commit_and_end_transactions(self) → None
Return type:None
consumer_stopped_errors = ()
flow_active = True
coroutine force_commit(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool
Return type:bool
getmany(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]]
Return type:AsyncIterator[Tuple[TP, Message]]
logger = <Logger faust.transport.consumer (WARNING)>
coroutine maybe_wait_for_commit_to_finish(self) → bool
Return type:bool
on_init_dependencies() → Iterable[mode.types.services.ServiceT]

Callback to be used to add service dependencies.

Return type:Iterable[ServiceT[]]
coroutine on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None
Return type:None
coroutine on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None
Return type:None
coroutine on_restart(self) → None

Called every time the service is restarted.

Return type:None
coroutine on_stop(self) → None

Called every time before the service is stopped/restarted.

Return type:None
coroutine on_task_error(self, exc: BaseException) → None
Return type:None
pause_partitions(tps: Iterable[faust.types.tuples.TP]) → None
Return type:None
coroutine perform_seek(self) → None
Return type:None
resume_flow() → None
Return type:None
resume_partitions(tps: Iterable[faust.types.tuples.TP]) → None
Return type:None
coroutine seek(self, partition: faust.types.tuples.TP, offset: int) → None
Return type:None
coroutine seek_to_committed(self) → Mapping[faust.types.tuples.TP, int]
Return type:Mapping[TP, int]
stop_flow() → None
Return type:None
track_message(message: faust.types.tuples.Message) → None
Return type:None
unacked
Return type:Set[Message]
coroutine wait_empty(self) → None

Wait for all messages that started processing to be acked.

Return type:None
class Producer(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None

Base Producer.

coroutine abort_transaction(self, transactional_id: str) → None
Return type:None
coroutine begin_transaction(self, transactional_id: str) → None
Return type:None
coroutine commit_transaction(self, transactional_id: str) → None
Return type:None
coroutine commit_transactions(self, tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None
Return type:None
coroutine create_topic(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 1000.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None
Return type:None
coroutine flush(self) → None
Return type:None
key_partition(topic: str, key: bytes) → faust.types.tuples.TP
Return type:TP
logger = <Logger faust.transport.producer (WARNING)>
coroutine maybe_begin_transaction(self, transactional_id: str) → None
Return type:None
coroutine send(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata]
Return type:Awaitable[RecordMetadata]
coroutine send_and_wait(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata
Return type:RecordMetadata
coroutine stop_transaction(self, transactional_id: str) → None
Return type:None
class TransactionManager(transport: faust.types.transports.TransportT, *, consumer: faust.types.transports.ConsumerT, producer: faust.types.transports.ProducerT, **kwargs) → None
coroutine commit(self, offsets: Mapping[faust.types.tuples.TP, int], start_new_transaction: bool = True) → bool
Return type:bool
coroutine create_topic(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 30.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None
Return type:None
coroutine flush(self) → None
Return type:None
key_partition(topic: str, key: bytes) → faust.types.tuples.TP
Return type:TP
logger = <Logger faust.transport.consumer (WARNING)>
coroutine on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None
Return type:None
coroutine on_rebalance(self, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None
Return type:None
coroutine send(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata]
Return type:Awaitable[RecordMetadata]
coroutine send_and_wait(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata
Return type:RecordMetadata
transactional_id_format = '{tpg.group}-{tpg.partition}'
class Conductor(app: faust.types.app.AppT, **kwargs) → None

Manages the channels that subscribe to topics.

  • Consumes messages from topic using a single consumer.
  • Forwards messages to all channels subscribing to a topic.
acks_enabled_for(topic: str) → bool
Return type:bool
add(topic: Any) → None

Add an element.

Return type:None
clear() → None

This is slow (creates N new iterators!) but effective.

Return type:None
coroutine commit(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool
Return type:bool
discard(topic: Any) → None

Remove an element. Do not raise an exception if absent.

Return type:None
label

Label used for graphs.

Return type:str

logger = <Logger faust.transport.conductor (WARNING)>
coroutine on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None
Return type:None
shortlabel

Label used for logging.

Return type:str

coroutine wait_for_subscriptions(self) → None
Return type:None
class Fetcher(app: faust.types.app.AppT, **kwargs) → None
logger = <Logger faust.transport.consumer (WARNING)>
coroutine on_stop(self) → None

Called every time before the service is stopped/restarted.

Return type:None
create_consumer(callback: Callable[faust.types.tuples.Message, Awaitable], **kwargs) → faust.types.transports.ConsumerT[source]
Return type:ConsumerT[]
create_producer(**kwargs) → faust.types.transports.ProducerT[source]
Return type:ProducerT[]
create_transaction_manager(consumer: faust.types.transports.ConsumerT, producer: faust.types.transports.ProducerT, **kwargs) → faust.types.transports.TransactionManagerT[source]
Return type:TransactionManagerT[]
create_conductor(**kwargs) → faust.types.transports.ConductorT[source]
Return type:ConductorT[]
class faust.transport.base.Conductor(app: faust.types.app.AppT, **kwargs) → None[source]

Manages the channels that subscribe to topics.

  • Consumes messages from topic using a single consumer.
  • Forwards messages to all channels subscribing to a topic.
logger = <Logger faust.transport.conductor (WARNING)>
acks_enabled_for(topic: str) → bool[source]
Return type:bool
clear() → None[source]

This is slow (creates N new iterators!) but effective.

Return type:None
coroutine commit(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]
Return type:bool
coroutine on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine wait_for_subscriptions(self) → None[source]
Return type:None
add(topic: Any) → None[source]

Add an element.

Return type:None
discard(topic: Any) → None[source]

Remove an element. Do not raise an exception if absent.

Return type:None
label

Label used for graphs.

Return type:str

shortlabel

Label used for logging.

Return type:str

class faust.transport.base.Consumer(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[None]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None[source]

Base Consumer.

logger = <Logger faust.transport.consumer (WARNING)>
class TopicBuffer → None
add(tp: faust.types.tuples.TP, buffer: List) → None
Return type:None
classmethod map_from_records(records: Mapping[faust.types.tuples.TP, List]) → MutableMapping[str, faust.transport.utils.TopicBuffer]
Return type:MutableMapping[str, TopicBuffer[]]
consumer_stopped_errors = ()

Tuple of exception types that may be raised when the underlying consumer driver is stopped.

flow_active = True
on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Callback to be used to add service dependencies.

Return type:Iterable[ServiceT[]]
stop_flow() → None[source]
Return type:None
resume_flow() → None[source]
Return type:None
pause_partitions(tps: Iterable[faust.types.tuples.TP]) → None[source]
Return type:None
resume_partitions(tps: Iterable[faust.types.tuples.TP]) → None[source]
Return type:None
track_message(message: faust.types.tuples.Message) → None[source]
Return type:None
ack(message: faust.types.tuples.Message) → bool[source]
Return type:bool
getmany(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]][source]
Return type:AsyncIterator[Tuple[TP, Message]]
coroutine commit(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool[source]

Maybe commit the offset for all or specific topics.

Parameters:topics (Optional[AbstractSet[Union[str, TP]]]) – Set containing topics and/or TopicPartitions to commit.
Return type:bool
coroutine commit_and_end_transactions(self) → None[source]
Return type:None
coroutine force_commit(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool[source]
Return type:bool
coroutine maybe_wait_for_commit_to_finish(self) → bool[source]
Return type:bool
coroutine on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_restart(self) → None[source]

Called every time the service is restarted.

Return type:None
coroutine on_stop(self) → None[source]

Called every time before the service is stopped/restarted.

Return type:None
coroutine on_task_error(self, exc: BaseException) → None[source]
Return type:None
coroutine perform_seek(self) → None[source]
Return type:None
coroutine seek(self, partition: faust.types.tuples.TP, offset: int) → None[source]
Return type:None
coroutine seek_to_committed(self) → Mapping[faust.types.tuples.TP, int][source]
Return type:Mapping[TP, int]
coroutine wait_empty(self) → None[source]

Wait for all messages that started processing to be acked.

Return type:None
close() → None[source]
Return type:None
unacked
Return type:Set[Message]
class faust.transport.base.Fetcher(app: faust.types.app.AppT, **kwargs) → None[source]
logger = <Logger faust.transport.consumer (WARNING)>
coroutine on_stop(self) → None[source]

Called every time before the service is stopped/restarted.

Return type:None
class faust.transport.base.Producer(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None[source]

Base Producer.

coroutine abort_transaction(self, transactional_id: str) → None[source]
Return type:None
coroutine begin_transaction(self, transactional_id: str) → None[source]
Return type:None
coroutine commit_transaction(self, transactional_id: str) → None[source]
Return type:None
coroutine commit_transactions(self, tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None[source]
Return type:None
coroutine create_topic(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 1000.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None[source]
Return type:None
coroutine flush(self) → None[source]
Return type:None
key_partition(topic: str, key: bytes) → faust.types.tuples.TP[source]
Return type:TP
logger = <Logger faust.transport.producer (WARNING)>
coroutine maybe_begin_transaction(self, transactional_id: str) → None[source]
Return type:None
coroutine send(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type:Awaitable[RecordMetadata]
coroutine send_and_wait(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata[source]
Return type:RecordMetadata
coroutine stop_transaction(self, transactional_id: str) → None[source]
Return type:None