faust.types.streams

faust.types.streams.Processor

alias of typing.Callable

faust.types.streams.GroupByKeyArg = typing.Union[faust.types.models.FieldDescriptorT, typing.Callable[[~T], typing.Union[bytes, faust.types.core._ModelT, typing.Any, NoneType]]]

Type of the key argument to Stream.group_by()

class faust.types.streams.StreamT(channel: AsyncIterator[T_co] = None, *, app: faust.types.streams._AppT = None, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.streams._JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: Optional[faust.types.streams.StreamT] = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, prefix: str = '', loop: asyncio.events.AbstractEventLoop = None) → None[source]
outbox = None
join_strategy = None
task_owner = None
current_event = None
active_partitions = None
concurrency_index = None
enable_acks = True
prefix = ''
get_active_stream() → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
add_processor(processor: Callable[T]) → None[source]
Return type:None
info() → Mapping[str, Any][source]
Return type:Mapping[str, Any]
clone(**kwargs) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
enumerate(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]
Return type:AsyncIterable[Tuple[int, +T_co]]
through(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
echo(*channels) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
group_by(key: Union[faust.types.models.FieldDescriptorT, Callable[T, Union[bytes, faust.types.core._ModelT, Any, None]]], *, name: str = None, topic: faust.types.topics.TopicT = None) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
derive_topic(name: str, *, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT[source]
Return type:TopicT[]
coroutine ack(self, event: faust.types.events.EventT) → bool[source]
Return type:bool
coroutine events(self) → AsyncIterable[faust.types.events.EventT][source]
coroutine items(self) → AsyncIterator[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], T_co]][source]
coroutine send(self, value: T_contra) → None[source]
Return type:None
coroutine take(self, max_: int, within: Union[datetime.timedelta, float, str]) → AsyncIterable[Sequence[T_co]][source]
coroutine throw(self, exc: BaseException) → None[source]
Return type:None