Process events in a Kafka topic

# Topic of order events, serialized as JSON.
# NOTE(review): `app` is presumably the faust.App instance — defined elsewhere.
orders_topic = app.topic('orders', value_serializer='json')

async def process_order(orders):
    """Agent coroutine: consume each order event from the orders topic.

    NOTE(review): the original excerpt was truncated here — the loop had
    no body, which is a SyntaxError. A placeholder body is supplied so
    the example is valid Python.
    """
    async for order in orders:
        # process each order using regular Python here
        ...

Describe stream data using models

from datetime import datetime, timedelta

import faust

class Order(faust.Record, serializer='json', isodates=True):
    """Model describing one order event in the stream.

    ``isodates=True`` makes faust parse the date fields from ISO-8601
    strings on deserialization.
    """

    id: str
    user_id: str
    product_id: str
    amount: float
    price: float
    # Fixed NameError: the original annotated this field as `datatime`.
    # Dates are optional and default to None when absent from the payload.
    date_created: datetime = None
    date_updated: datetime = None

# Same 'orders' topic, but events are now deserialized into Order records.
orders_topic = app.topic('orders', value_type=Order)

async def process_order(orders):
    """Agent coroutine: consume events deserialized into Order records.

    NOTE(review): truncated in the original excerpt (the loop body was
    missing — a SyntaxError); a placeholder body is supplied.
    """
    async for order in orders:
        # `order` is an Order instance (see value_type=Order on the topic)
        ...

Use async I/O to perform other actions while processing the stream

# [...]
async def process_order(orders):
    """For each order, fetch product info over HTTP and push it to a cache.

    NOTE(review): the original excerpt was garbled — the f-strings had
    empty ``{}`` placeholders (a SyntaxError), ``request`` was undefined
    (should be the ``resp`` bound by ``async with``), and the final
    ``session.post(...)`` call was cut off. Reconstructed from the
    upstream Faust example; the URLs are placeholders — confirm the real
    service endpoints.
    """
    session = aiohttp.ClientSession()
    async for order in orders:
        async with session.get(f'http://e.com/api/{order.id}/') as resp:
            product_info = await resp.text()
            await session.post(
                f'http://cache/{order.id}/', data=product_info)

Buffer up many events at a time

Here we get up to 100 events within a 30-second window:

# [...]
async def process_orders(orders):
    """Process orders in batches of up to 100 events per 30-second window.

    NOTE(review): the original excerpt showed a bare top-level
    ``async for`` (invalid Python); wrapped in an agent coroutine as in
    the Faust documentation.
    """
    async for orders_batch in orders.take(100, within=30.0):
        # `orders_batch` is a list of up to 100 buffered events
        ...

Aggregate information into a table

# Table mapping country -> running order count; missing keys default to
# int() == 0, so `+= 1` works on first access.
orders_by_country = app.Table('orders_by_country', default=int)

async def process_order(orders):
    """Count incoming orders per country of origin in orders_by_country.

    Repartitions the stream by country so each count is owned by one worker.
    """
    # group_by takes the model's field descriptor (Order.country_origin).
    # The original used `order.country_origin`, but `order` is not bound
    # until the loop starts — a NameError.
    async for order in orders.group_by(Order.country_origin):
        country = order.country_origin
        orders_by_country[country] += 1
        print(f'Orders for country {country}: {orders_by_country[country]}')

Aggregate information using a window

Count the number of orders by country within the last two days:

# Windowed table: per-country order counts over the last two days.
# NOTE(review): the original excerpt was truncated mid-expression and the
# stream loop was cut off; reconstructed from the upstream Faust example —
# confirm against the full document.
orders_by_country = app.Table(
    'orders_by_country',
    default=int,
).hopping(timedelta(days=2))


async def count_orders(orders):
    """Count orders per country within the two-day hopping window."""
    async for order in orders:
        orders_by_country[order.country_origin] += 1
        # values in this table are not concrete! access .current()
        # for the value related to the time of the current event