# Kafka

Allows connecting to Kafka brokers to consume data from, or publish data to, a topic in streaming mode.

Please note that Kafka pipes return (read) or accept (write) DataFrames following Spark's Kafka schema: on read, columns such as key, value, topic, partition, offset, timestamp and timestampType; on write, at least a value column, optionally key and topic.

Code repository: https://github.com/AmadeusITGroup/dataio-framework/tree/main/src/main/scala/com/amadeus/dataio/pipes/kafka

Useful links:

- [Structured Streaming + Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)

## Common

The following fields are available for all Kafka components:

| Name | Mandatory | Description | Example | Default |
|------|-----------|-------------|---------|---------|
| `options` | No | Spark options, as key = value pairs. Note that you need to provide Kafka options as you would in any Spark Kafka application. | `options { "kafka.bootstrap.servers" = "kafka.bootstrap.mycompany.net:443" }` | |
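
As a concrete illustration, the sketch below shows what an `options` block for a Kafka pipe could look like. As in any Spark Kafka application, options prefixed with `kafka.` are passed to the underlying Kafka consumer/producer, while unprefixed options such as `subscribe` or `startingOffsets` are interpreted by the Spark Kafka source itself. Broker and topic names are placeholders:

```hocon
options {
  # Passed to the Kafka consumer/producer (note the "kafka." prefix).
  "kafka.bootstrap.servers" = "kafka.bootstrap.mycompany.net:443"
  # Interpreted by the Spark Kafka source: topic(s) to subscribe to and starting position.
  subscribe = "my-topic"
  startingOffsets = "latest"
}
```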

No batch processing is currently available for Kafka in Data I/O.


## Streaming

### Input

Type: `com.amadeus.dataio.pipes.kafka.streaming.KafkaInput`

| Name | Mandatory | Description | Example | Default |
|------|-----------|-------------|---------|---------|
| `repartition` | No | Matches the Spark Dataset repartition function, either by number of partitions, by expressions, or both. | `repartition { num = 10, exprs = "upd_date" }` | |
| `coalesce` | No | Matches the Spark Dataset coalesce function. | `coalesce = 10` | |
| `schema` | No | The schema of the input dataset. See schema definitions for more information. | `schema = "myproject.models.Query"` | |
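
For illustration, a complete KafkaInput declaration could look like the sketch below. The surrounding `input` node, as well as the broker and topic names, are assumptions made for the example, not fields of the component itself:

```hocon
input {
  type = "com.amadeus.dataio.pipes.kafka.streaming.KafkaInput"
  # Optional: the schema of the input dataset (hypothetical class, see schema definitions).
  schema = "myproject.models.Query"
  coalesce = 10
  options {
    "kafka.bootstrap.servers" = "kafka.bootstrap.mycompany.net:443"
    subscribe = "my-topic"
  }
}
```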

### Output

Type: `com.amadeus.dataio.pipes.kafka.streaming.KafkaOutput`

| Name | Mandatory | Description | Example | Default |
|------|-----------|-------------|---------|---------|
| `trigger` | No | Sets the trigger type for the streaming query: AvailableNow, Continuous, or empty. Controls Spark's trigger() function. | `trigger = "AvailableNow"` | |
| `duration` | No | Sets the trigger period for the streaming query. Controls Spark's trigger() function. If no trigger type is defined, a ProcessingTime trigger with this duration is used. | `duration = "60 seconds"` | |
| `timeout` | Yes | The amount of time, in hours, before returning from the streaming query. Can be a String or an Int. | `timeout = 24` | |
| `mode` | Yes | The Spark Structured Streaming output mode. | `mode = "complete"` | append |
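
Putting these fields together, a KafkaOutput declaration might look like the sketch below. The surrounding `output` node, broker, topic, and checkpoint location are placeholder assumptions; note that Spark itself requires a `checkpointLocation` option for streaming sinks:

```hocon
output {
  type = "com.amadeus.dataio.pipes.kafka.streaming.KafkaOutput"
  duration = "60 seconds"  # no trigger type set, so a ProcessingTime trigger fires every 60 seconds
  timeout = 24             # return from the streaming query after 24 hours
  mode = "append"
  options {
    "kafka.bootstrap.servers" = "kafka.bootstrap.mycompany.net:443"
    topic = "my-output-topic"                       # Spark Kafka sink option: target topic
    checkpointLocation = "/tmp/my-app/checkpoints"  # required by Spark for streaming sinks
  }
}
```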