Issue async requests from EventProcessFunction.

Gmodena requested to merge T332948-minibatch-async-call into main

This change adds a thread pool to EventProcessFunction so that async operations can run in mini-batches on a keyed data stream.

The change is a temporary fix while we wait for Async I/O datastream APIs to be included in Flink 1.18.

Windowing is performed using an ad-hoc KeyedProcessFunction that combines a count tumbling window with a timeout.

This approach allows for async operations while reducing the memory footprint caused by having too many open windows.
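To illustrate the count-plus-timeout idea, here is a minimal pure-Python sketch. It is not the KeyedProcessFunction from this MR: `MiniBatchBuffer`, `add`, `on_timer` and the `emit` callback are hypothetical names, and timestamps are passed in explicitly instead of coming from Flink timers.

```python
from typing import Any, Callable, Dict, Hashable, List


class MiniBatchBuffer:
    """Per-key buffer that flushes when it is full or has waited too long.

    Hypothetical helper for illustration only; it mimics a count tumbling
    window combined with a timeout, without depending on Flink.
    """

    def __init__(self, max_count: int, timeout_ms: int,
                 emit: Callable[[Hashable, List[Any]], None]) -> None:
        self.max_count = max_count
        self.timeout_ms = timeout_ms
        self.emit = emit
        self._batches: Dict[Hashable, List[Any]] = {}
        self._deadlines: Dict[Hashable, int] = {}

    def add(self, key: Hashable, event: Any, now_ms: int) -> None:
        batch = self._batches.setdefault(key, [])
        if not batch:
            # First event of a new window: arm the timeout for this key.
            self._deadlines[key] = now_ms + self.timeout_ms
        batch.append(event)
        if len(batch) >= self.max_count:
            # Count trigger: the window is full.
            self._flush(key)

    def on_timer(self, now_ms: int) -> None:
        # Timeout trigger: flush every window whose deadline has passed.
        expired = [k for k, d in self._deadlines.items() if d <= now_ms]
        for key in expired:
            self._flush(key)

    def _flush(self, key: Hashable) -> None:
        batch = self._batches.pop(key, [])
        self._deadlines.pop(key, None)
        if batch:
            self.emit(key, batch)
```

Because a window is dropped as soon as it is emitted, only keys with pending events hold state, which is the memory argument made above.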

API changes

A functional overview of API changes was proposed in this design doc RFC: https://docs.google.com/document/d/1PyUel0b_p18rN1_Zj5aeDMCdK1m_hqdQ9kqhLbKFpOE/edit#heading=h.fq4pvj9blwco.

This change is largely transparent to user code. However, a DataStream must now be keyed when the process stream manager method is invoked.

A new partition_by() method is proposed for the public stream manager API that performs keying.

This MR introduces API changes to downstream code. Related MRs:

Module changes

A new functions sub-module is proposed that contains purpose-built EventProcessFunctions, as described in Sharing State Across threads.

AsyncHttpProcessFunction is an example of such a function; it provides a shared connection pool for the Python requests module.
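As a rough sketch of what a shared connection pool can look like with requests (this is not the AsyncHttpProcessFunction implementation; `make_shared_session` and the `pool_size` default are assumptions made for illustration):

```python
import requests
from requests.adapters import HTTPAdapter


def make_shared_session(pool_size: int = 10) -> requests.Session:
    """Build one Session whose connection pool is sized for the worker threads.

    Hypothetical helper: the name and the default pool size are assumptions.
    """
    session = requests.Session()
    # One adapter instance serves both schemes, so both share the same pools.
    adapter = HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session
```

Sizing `pool_maxsize` to at least the number of worker threads avoids discarding connections under load. Whether a single Session is safe to share across threads for a given workload should be verified rather than assumed.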

Metrics reporting changes

We used to measure latency for each callback fired by process. We now report the time elapsed for processing a mini-batch window instead. Because functions are executed asynchronously, measuring by decorating the call would lead to meaningless results.
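The sketch below shows the general shape of timing a whole mini-batch rather than each call. It assumes a `concurrent.futures.ThreadPoolExecutor`; `process_batch` is a hypothetical name and the MR's actual metric wiring is not shown.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Tuple


def process_batch(pool: ThreadPoolExecutor,
                  func: Callable[[Any], Any],
                  batch: List[Any]) -> Tuple[List[Any], float]:
    """Run func over a mini-batch on the pool; return results and elapsed seconds.

    Hypothetical helper for illustration only.
    """
    start = time.monotonic()
    # Executor.map returns results in input order; list() waits for all of them.
    results = list(pool.map(func, batch))
    elapsed = time.monotonic() - start
    return results, elapsed
```

The elapsed value covers the slowest call in the batch plus scheduling overhead, so it describes the window as a whole and says little about any individual request.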

To get more introspection, instrumentation should probably be added inside the callback. For instance, to measure HTTP latency we could access the elapsed field of the Response object (assuming Python's requests is used).

We could implement this in "user" code, or in a helper class provided by us (e.g. AsyncHttpProcessFunction).
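A minimal sketch of reading per-request latency from requests, assuming the callback has access to the Response object. `http_latency_ms` is a hypothetical helper name; how the value would be reported as a metric is left open.

```python
from datetime import timedelta

import requests


def http_latency_ms(response: requests.Response) -> float:
    """Return Response.elapsed in milliseconds.

    requests sets elapsed to the time between sending the request and
    finishing parsing the response headers; body download is not included.
    """
    return response.elapsed.total_seconds() * 1000.0


# Offline demonstration with a hand-built Response; real responses
# returned by requests have elapsed filled in by the library.
fake = requests.Response()
fake.elapsed = timedelta(milliseconds=250)
latency = http_latency_ms(fake)
```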

Integration Testing

This change has been tested on YARN (application_1678266962370_240972) with the pipeline provided at mediawiki-event-enrichment!46 (closed).

Bug: T332948

cc / @otto @tchin @dcausse

Edited by Gmodena
