Changes

This document contains change notes for bugfix releases in the Faust 1.8 series. If you’re looking for previous releases, please visit the History section.

1.8.0

release-date

2019-09-27 4:05 P.M PST

release-by

Ask Solem (@ask)

  • Requirements

  • Tables: New “global table” support (Issue #366).

    A global table is a table where all worker instances maintain a copy of the full table.

    This is useful for smaller tables that need to be shared across all instances.

    To define a new global table, use app.GlobalTable:

    global_table = app.GlobalTable('global_table_name')
    

    Contributed by Artak Papikyan (@apapikyan).
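    The difference from a regular table can be pictured as changelog replay. The sketch below is a toy model, not Faust's implementation: the replay() helper and the (partition, key, value) entry layout are assumptions made for illustration. A worker with a regular table applies only the changelog entries from the partitions assigned to it, while a global table applies entries from every partition.

```python
from typing import Dict, Iterable, Optional, Set, Tuple

# Hypothetical changelog entry layout: (partition, key, value).
Entry = Tuple[int, str, int]


def replay(changelog: Iterable[Entry],
           assigned: Optional[Set[int]] = None) -> Dict[str, int]:
    """Rebuild a worker's local copy of a table from its changelog (toy).

    assigned=None models a global table: entries from every partition
    are applied. A set of partition numbers models a regular table,
    where the worker only keeps the partitions assigned to it.
    """
    state: Dict[str, int] = {}
    for partition, key, value in changelog:
        if assigned is None or partition in assigned:
            state[key] = value  # later entries overwrite earlier ones
    return state


changelog = [(0, 'a', 1), (1, 'b', 2), (0, 'a', 3)]
worker_1 = replay(changelog, assigned={0})  # regular table, partition 0 only
worker_2 = replay(changelog, assigned={1})  # regular table, partition 1 only
global_copy = replay(changelog)             # global table: full copy
print(worker_1, worker_2, global_copy)
```

    Because every worker replays every partition, memory and startup cost grow with the whole table, which is why this fits the smaller tables mentioned above.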

  • Transports: Fixed hanging when Kafka topics have gaps in source topic offset (Issue #401).

    Offset gaps can occur when topics are compacted (or in similar situations); previously, Faust would hang when it encountered such a gap.

    Contributed by Andrei Tuppitcyn (@andr83).
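    Why a gap can stall progress is easiest to see in a simplified model of offset commits. In this sketch, committable_offset() is a hypothetical helper and not a Faust function: a consumer may only commit up to the last offset in an unbroken run of processed offsets. An offset removed by compaction is never delivered, so unless it is recorded as a known gap, the run can never extend past it.

```python
from typing import Iterable, Optional, Set


def committable_offset(start: int, acked: Set[int],
                       gaps: Iterable[int] = ()) -> Optional[int]:
    """Return the highest offset N such that every offset from start to N
    is either acked or a known gap, or None if start is still pending.

    Hypothetical helper illustrating the idea; not Faust's implementation.
    """
    resolved = set(acked) | set(gaps)
    last: Optional[int] = None
    offset = start
    while offset in resolved:
        last = offset
        offset += 1
    return last


# Offset 3 was removed by log compaction, so it is never delivered or acked.
acked = {0, 1, 2, 4, 5}
print(committable_offset(0, acked))            # 2: cannot advance past offset 3
print(committable_offset(0, acked, gaps=[3]))  # 5: the known gap is skipped
```

    Kafka conventionally commits the next offset to read (N + 1); the sketch returns N to keep the example small.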

  • Tables: Fixed a crash when the key index is enabled (Issue #414).

  • Streams: Now properly handles exceptions in group_by.

    Contributed by Vikram Patki (@patkivikram).

  • Streams: Fixed a bug where filter did not ack messages (Issue #391).

    Fix contributed by Martin Maillard (@martinmaillard).

  • Web: Fixed typo in NotFound error.

    Fix contributed by Sanyam Satia (@ssatia).

  • Tables: Added the use_partitioner option, making it possible to modify tables outside of streams (for example, in HTTP views).

    By default tables will use the partition number of a “source event” to write an entry to the changelog topic.

    This means you can safely modify tables in streams:

    @app.agent(topic)
    async def process(stream):
        async for key, value in stream.items():
            table[key] = value
    

    When the table is modified, it knows which partition the source event came from and writes the changelog entry to the same partition number.

    An alternative is to apply the Kafka default partitioner to the key, and you can now use that strategy by enabling the use_partitioner option:

    table = app.Table('name', use_partitioner=True)

    You may also enable this option temporarily in any location by using table.clone(...):

    @app.page('/foo/{key}/')
    async def foo(web, request, key: str):
        # Write through a clone of the table that has use_partitioner enabled.
        table.clone(use_partitioner=True)[key] = 'bar'
        # Page views must return a response.
        return web.json({'key': key})
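    The two strategies can be compared in a small sketch. The changelog_partition() function is hypothetical, not part of Faust, and it uses zlib.crc32 as a stand-in hash; Kafka's default partitioner actually hashes keys with murmur2, so real partition numbers will differ. The point is only where the partition number comes from: the source event, or the key itself.

```python
import zlib
from typing import Optional


def changelog_partition(key: bytes, num_partitions: int, *,
                        source_partition: Optional[int] = None,
                        use_partitioner: bool = False) -> int:
    """Pick the changelog partition for a table write (hypothetical toy)."""
    if use_partitioner:
        # Key-based: the same key always maps to the same partition.
        # crc32 is a stand-in; Kafka's default partitioner uses murmur2.
        return zlib.crc32(key) % num_partitions
    if source_partition is None:
        # The default strategy needs a source event, which an HTTP view lacks.
        raise RuntimeError('no source event; enable use_partitioner')
    # Default strategy: reuse the partition of the event being processed.
    return source_partition


print(changelog_partition(b'user-1', 8, source_partition=3))  # 3
print(changelog_partition(b'user-1', 8, use_partitioner=True))
```

    With the key-based strategy the result no longer depends on which event triggered the write, which is what makes writes from outside a stream possible.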
    
  • Models: Support for “schemas” that group key/value related settings together (Issue #315).

    This implements a single structure (Schema) that configures the key_type/value_type/key_serializer/value_serializer for a topic or agent:

    import faust

    class Point(faust.Record):
        x: int
        y: int

    app = faust.App('myapp')

    schema = faust.Schema(
        key_type=Point,
        value_type=Point,
        key_serializer='json',
        value_serializer='json',
    )

    topic = app.topic('mytopic', schema=schema)

    The benefit of having an abstraction one level above codecs is that schemas can implement support for serialization formats such as Protocol Buffers, Apache Thrift, and Avro.

    The schema also has access to the Kafka message headers, which is necessary in cases where the serialization schema is specified in the headers.

    See the model-schemas section of the documentation for more information.
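    To illustrate why header access matters, here is a toy stand-in for a schema. ToySchema, its registry, and the 'schema' header name are all hypothetical and are not Faust's Schema API: the sketch groups a value type with its serializer, and lets a message header select a different type when the producer declared one.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Mapping, Optional, Type


@dataclass(frozen=True)
class Point:
    x: int
    y: int


@dataclass(frozen=True)
class Point3D:
    x: int
    y: int
    z: int


@dataclass
class ToySchema:
    """Hypothetical stand-in: groups a value type with its serializer."""

    value_type: Type[Any]
    value_serializer: str = 'json'
    # Hypothetical registry: a 'schema' header can name one of these types.
    registry: Dict[str, Type[Any]] = field(default_factory=dict)

    def loads_value(self, payload: bytes,
                    headers: Optional[Mapping[str, bytes]] = None) -> Any:
        if self.value_serializer != 'json':
            raise ValueError(f'unsupported serializer: {self.value_serializer}')
        data = json.loads(payload.decode('utf-8'))
        # Prefer the type named in the header, else the configured type.
        name = (headers or {}).get('schema')
        cls = self.registry.get(name.decode(), self.value_type) if name else self.value_type
        return cls(**data)


schema = ToySchema(value_type=Point, registry={'Point3D': Point3D})
print(schema.loads_value(b'{"x": 1, "y": 2}'))  # Point(x=1, y=2)
print(schema.loads_value(b'{"x": 1, "y": 2, "z": 3}', {'schema': b'Point3D'}))  # Point3D(x=1, y=2, z=3)
```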
    
  • Models: Validation now supports optional fields (Issue #430).

  • Models: Fixed support for Optional and field coercion (Issue #393).

    Fix contributed by Martin Maillard (@martinmaillard).

  • Models: Manually calling model.validate() now also validates that the value is of the correct type (Issue #425).

  • Models: Fields can now specify input_name and output_name to support fields named after Python reserved keywords.

    For example, if the data you want to parse contains a field named “in”, you cannot declare a model field with that name, since in is a reserved keyword.

    Using the new input_name option, you can give the field a different name in Python while still serializing and deserializing it under the existing field name:

    from faust.models import Record
    from faust.models.fields import StringField
    
    class OpenAPIParameter(Record):
        location: str = StringField(default='query', input_name='in')
    

    input_name is the name of the field in serialized data, while output_name is what the field will be named when you serialize this model object:

    >>> import json
    
    >>> data = {'in': 'header'}
    >>> parameter = OpenAPIParameter.loads(json.dumps(data))
    >>> assert parameter.location == 'header'
    >>> parameter.dumps(serializer='json')
    '{"in": "header"}'
    

    Note

    • The default value for input_name is the name of the field.

    • The default value for output_name is the value of input_name.
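    The two defaults in the note above can be expressed as a small sketch. The resolve_names(), load(), and dump() helpers and the FIELDS table are hypothetical, not Faust internals; they show how a Python attribute name maps to the key read from, and written to, serialized data.

```python
import json
from typing import Any, Dict, Mapping, Optional, Tuple


def resolve_names(field: str, input_name: Optional[str] = None,
                  output_name: Optional[str] = None) -> Tuple[str, str]:
    """Apply the defaults: input_name falls back to the field name,
    and output_name falls back to input_name."""
    input_name = input_name or field
    output_name = output_name or input_name
    return input_name, output_name


# Hypothetical table: Python attribute name -> (input_name, output_name).
FIELDS: Dict[str, Tuple[str, str]] = {
    'location': resolve_names('location', input_name='in'),
    'required': resolve_names('required'),
}


def load(payload: str) -> Dict[str, Any]:
    """Read serialized keys (input_name) into Python attribute names."""
    data = json.loads(payload)
    return {attr: data[inp] for attr, (inp, _) in FIELDS.items() if inp in data}


def dump(obj: Mapping[str, Any]) -> str:
    """Write Python attribute names back out under their output_name."""
    return json.dumps({FIELDS[attr][1]: value for attr, value in obj.items()})


obj = load('{"in": "header", "required": true}')
print(obj)        # {'location': 'header', 'required': True}
print(dump(obj))  # {"in": "header", "required": true}
```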

  • Models: Now have a lazy_creation class option to delay class initialization until a later time.

    Field types are described using Python type annotations, and model fields can refer to other models, but those models are not always defined at the time the referring class is created.

    For example:

    from faust import Record

    class Foo(Record):
        bar: 'Bar'

    class Bar(Record):
        foo: Foo

    This example results in an error: when class Foo is created, the name Bar cannot be resolved because that class does not exist yet.

    In this case we can enable the lazy_creation option:

    class Foo(Record, lazy_creation=True):
        bar: 'Bar'
    
    class Bar(Record):
        foo: Foo
    
    Foo.make_final()  # <-- 'Bar' is now defined so safe to create.
    
  • Transports: Fixed a type mismatch in aiokafka timestamp_ms.

    Contributed by @ekerstens.

  • Models: Added YAML serialization support.

    This requires the PyYAML library.

  • Sensors: Added HTTP monitoring of status codes and latency.

  • App: Added new Schema setting.

  • App: Added new Event setting.

  • Channel: A new SerializedChannel subclass can now be used to define new channel types that need to deserialize incoming messages.

  • Cython: Added missing field declaration.

    Contributed by Victor Miroshnikov (@superduper).

  • Documentation fixes by: