faust.tables

class faust.tables.Collection(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]

Base class for changelog-backed data structures stored in Kafka.

data
on_recover(fun: Callable[Awaitable[None]]) → Callable[Awaitable[None]][source]

Add a function as a callback to be called on table recovery.

Return type:Callable[[], Awaitable[None]]
info() → Mapping[str, Any][source]
Return type:Mapping[str, Any]
persisted_offset(tp: faust.types.tuples.TP) → Optional[int][source]
Return type:Optional[int]
reset_state() → None[source]
Return type:None
join(*fields) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
left_join(*fields) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
inner_join(*fields) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
outer_join(*fields) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
clone(**kwargs) → Any[source]
Return type:Any
combine(*nodes, **kwargs) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
contribute_to_stream(active: faust.types.streams.StreamT) → None[source]
Return type:None
label

Label used for graphs.

Return type:str

shortlabel

Label used for logging.

Return type:str

coroutine call_recover_callbacks(self) → None[source]
Return type:None
logger = <Logger faust.tables.base (WARNING)>
coroutine need_active_standby_for(self, tp: faust.types.tuples.TP) → bool[source]
Return type:bool
coroutine on_changelog_event(self, event: faust.types.events.EventT) → None[source]
Return type:None
coroutine on_rebalance(self, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_recovery_completed(self, active_tps: Set[faust.types.tuples.TP], standby_tps: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_start(self) → None[source]

Service is starting.

Return type:None
coroutine remove_from_stream(self, stream: faust.types.streams.StreamT) → None[source]
Return type:None
changelog_topic
Return type:TopicT[]
apply_changelog_batch(batch: Iterable[faust.types.events.EventT]) → None[source]
Return type:None
class faust.tables.CollectionT(app: faust.types.tables._AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: faust.types.tables._ModelArg = None, value_type: faust.types.tables._ModelArg = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]
changelog_topic
Return type:TopicT[]
apply_changelog_batch(batch: Iterable[faust.types.events.EventT]) → None[source]
Return type:None
persisted_offset(tp: faust.types.tuples.TP) → Optional[int][source]
Return type:Optional[int]
reset_state() → None[source]
Return type:None
on_recover(fun: Callable[Awaitable[None]]) → Callable[Awaitable[None]][source]
Return type:Callable[[], Awaitable[None]]
coroutine call_recover_callbacks(self) → None[source]
Return type:None
coroutine need_active_standby_for(self, tp: faust.types.tuples.TP) → bool[source]
Return type:bool
coroutine on_changelog_event(self, event: faust.types.events.EventT) → None[source]
Return type:None
coroutine on_rebalance(self, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_recovery_completed(self, active_tps: Set[faust.types.tuples.TP], standby_tps: Set[faust.types.tuples.TP]) → None[source]
Return type:None
class faust.tables.TableManager(app: faust.types.app.AppT, **kwargs) → None[source]

Manage tables used by Faust worker.

persist_offset_on_commit(store: faust.types.stores.StoreT, tp: faust.types.tuples.TP, offset: int) → None[source]

Mark the persisted offset for a TP to be saved on commit.

This is used for the “exactly_once” processing guarantee. Instead of writing the persisted offset to RocksDB when the message is sent, we write it to disk when the offset is committed.

Return type:None
on_commit(offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]
Return type:None
on_commit_tp(tp: faust.types.tuples.TP) → None[source]
Return type:None
on_rebalance_start() → None[source]
Return type:None
on_actives_ready() → None[source]
Return type:None
on_standbys_ready() → None[source]
Return type:None
changelog_topics
Return type:Set[str]
changelog_queue
Return type:ThrowableQueue
logger = <Logger faust.tables.manager (WARNING)>
coroutine on_rebalance(self, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_start(self) → None[source]

Service is starting.

Return type:None
coroutine on_stop(self) → None[source]

Service is being stopped/restarted.

Return type:None
recovery
Return type:Recovery[]
add(table: faust.types.tables.CollectionT) → faust.types.tables.CollectionT[source]
Return type:CollectionT[]
on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]
Return type:None
class faust.tables.TableManagerT(app: faust.types.tables._AppT, **kwargs) → None[source]
add(table: faust.types.tables.CollectionT) → faust.types.tables.CollectionT[source]
Return type:CollectionT[]
persist_offset_on_commit(store: faust.types.stores.StoreT, tp: faust.types.tuples.TP, offset: int) → None[source]
Return type:None
on_commit(offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]
Return type:None
changelog_topics
Return type:Set[str]
coroutine on_rebalance(self, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
class faust.tables.Table(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]

Table (non-windowed).

class WindowWrapper(table: faust.types.tables.TableT, *, relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None] = None, key_index: bool = False, key_index_table: faust.types.tables.TableT = None) → None

Windowed table wrapper.

A windowed table does not return concrete values when keys are accessed; instead, a WindowSet is returned so that the values can be further reduced to the wanted time period.

ValueType

alias of WindowSet

as_ansitable(title: str = '{table.name}', **kwargs) → str
Return type:str
clone(relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT
Return type:WindowWrapperT[]
get_relative_timestamp
Return type:Optional[Callable[[Optional[EventT[]]], Union[float, datetime]]]
get_timestamp(event: faust.types.events.EventT = None) → float
Return type:float
items(event: faust.types.events.EventT = None) → ItemsView
Return type:ItemsView[~KT, +VT_co]
key_index = False
key_index_table = None
keys() → KeysView
Return type:KeysView[~KT]
name
Return type:str
on_del_key(key: Any) → None
Return type:None
on_recover(fun: Callable[Awaitable[None]]) → Callable[Awaitable[None]]
Return type:Callable[[], Awaitable[None]]
on_set_key(key: Any, value: Any) → None
Return type:None
relative_to(ts: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT
Return type:WindowWrapperT[]
relative_to_field(field: faust.types.models.FieldDescriptorT) → faust.types.tables.WindowWrapperT
Return type:WindowWrapperT[]
relative_to_now() → faust.types.tables.WindowWrapperT
Return type:WindowWrapperT[]
relative_to_stream() → faust.types.tables.WindowWrapperT
Return type:WindowWrapperT[]
values(event: faust.types.events.EventT = None) → ValuesView
Return type:ValuesView[+VT_co]
using_window(window: faust.types.windows.WindowT, *, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
hopping(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
tumbling(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
on_key_get(key: KT) → None[source]

Handle that a key is being retrieved.

Return type:None
on_key_set(key: KT, value: VT) → None[source]

Handle that the value for a key is being set.

Return type:None
on_key_del(key: KT) → None[source]

Handle that a key is deleted.

Return type:None
as_ansitable(title: str = '{table.name}', **kwargs) → str[source]
Return type:str
logger = <Logger faust.tables.table (WARNING)>
class faust.tables.TableT(app: faust.types.tables._AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: faust.types.tables._ModelArg = None, value_type: faust.types.tables._ModelArg = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]
using_window(window: faust.types.windows.WindowT, *, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
hopping(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
tumbling(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
as_ansitable(**kwargs) → str[source]
Return type:str