FlinkStreamManager error process function handling and error event sideoutput

Ottomata requested to merge T326536_error_event into main

If error_sink is set, every call to stream.process(func) will wrap func in a new ErrorHandlingProcessFunction. The wrapper catches any Exception raised by func, transforms it into an error event, and produces that event to an error event side output DataStream. The error side output sink is configured by setting error_sink on the FlinkStreamManager.Builder. Since the error events are constructed by ErrorHandlingProcessFunction, the error sink's stream_descriptor must be a stream that uses the WMF /error schema at https://schema.wikimedia.org/#!//primary/jsonschema/error
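
As a rough illustration of the catch-and-divert pattern described above, here is a minimal pure-Python sketch. It does not use PyFlink and is not the ErrorHandlingProcessFunction from this merge request: the class name ErrorHandlingWrapper, the helper make_error_event, and every error event field other than emitter_id are assumptions made for the example, and the real /error schema may differ.

```python
import traceback
from typing import Any, Callable, Dict, Iterable, List, Tuple


def make_error_event(exc: Exception, element: Any, emitter_id: str) -> Dict[str, Any]:
    """Build an error event dict from an exception.

    Only emitter_id comes from the description above; the other
    field names are hypothetical stand-ins for the /error schema.
    """
    return {
        "emitter_id": emitter_id,
        "error_type": type(exc).__name__,
        "message": str(exc),
        "stack": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
        "raw_event": repr(element),
    }


class ErrorHandlingWrapper:
    """Hypothetical stand-in for a process function wrapper.

    Results of func go to the main output; exceptions raised by
    func become error events on a separate "side output" list.
    """

    def __init__(self, func: Callable[[Any], Any], emitter_id: str) -> None:
        self.func = func
        self.emitter_id = emitter_id

    def process(self, elements: Iterable[Any]) -> Tuple[List[Any], List[Dict[str, Any]]]:
        main_output: List[Any] = []
        error_output: List[Dict[str, Any]] = []
        for element in elements:
            try:
                main_output.append(self.func(element))
            except Exception as exc:
                # Divert the failure instead of failing the whole job.
                error_output.append(
                    make_error_event(exc, element, self.emitter_id)
                )
        return main_output, error_output


wrapper = ErrorHandlingWrapper(lambda x: 10 // x, emitter_id="example_job")
main_output, error_output = wrapper.process([1, 0, 5])
```

In this sketch the element 0 raises ZeroDivisionError, so it ends up as one error event while the other two elements pass through to the main output.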

stream_manager now has an error_destination parameter that defaults to False. If True, a default error stream name will be used. If a string, it will be used as the error stream descriptor.
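
The False / True / string behaviour of error_destination could be resolved along these lines. This is only a sketch: the function name resolve_error_stream and the default stream name "example.error" are hypothetical and are not taken from this merge request.

```python
from typing import Optional, Union

# Hypothetical default; the real default error stream name is not stated here.
DEFAULT_ERROR_STREAM = "example.error"


def resolve_error_stream(error_destination: Union[bool, str] = False) -> Optional[str]:
    """Map error_destination to an error stream descriptor, or None if disabled."""
    if error_destination is False:
        # Error side output disabled.
        return None
    if error_destination is True:
        # Use the default error stream name.
        return DEFAULT_ERROR_STREAM
    if isinstance(error_destination, str) and error_destination:
        # Caller supplied an explicit stream descriptor.
        return error_destination
    raise ValueError(f"Unsupported error_destination: {error_destination!r}")
```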

Other changes:

  • Add a job_name parameter to FlinkStreamManager. This will be used as the Flink job_name, as well as the error event's emitter_id field.

  • Remove DataStreamLocalSinkFunction. This was not being used, and was buggy. It also did not use the eventutilities-flink (Java) serializers, so it wasn't that useful for testing final output. Instead, when using EventDataStreamFactory with a file_sink, if the output path is on the local filesystem, we will use a custom LocalJsonRowFileSink wrapper. This sink is just the regular Sink, but with a handy get_all_results method that makes reading final output results for testing purposes easy.

  • Fix mypy errors where possible
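
For readers unfamiliar with the idea behind get_all_results on LocalJsonRowFileSink, the following sketch shows one way a test could collect JSON output written to a local directory. It is not the implementation in this merge request: the function name read_all_json_results, the assumption that output files contain one JSON object per line, and the rule of skipping files whose names start with "." or "_" are all assumptions for illustration.

```python
import json
from pathlib import Path
from typing import Any, Dict, List


def read_all_json_results(output_dir: str) -> List[Dict[str, Any]]:
    """Read every JSON line from all files under output_dir.

    Assumes newline-delimited JSON. Files whose names start with
    "." or "_" are skipped, on the assumption that they are
    in-progress or metadata files rather than finished output.
    """
    results: List[Dict[str, Any]] = []
    for path in sorted(Path(output_dir).rglob("*")):
        if not path.is_file() or path.name.startswith((".", "_")):
            continue
        with path.open(encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    results.append(json.loads(line))
    return results
```

A test could then run the job against a temporary directory and assert on the returned list of dicts.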

Depends-On: https://gerrit.wikimedia.org/r/c/schemas/event/primary/+/893518/
Bug: T326536

Edited by Ottomata