Kafka
Connects to Kafka brokers to consume data from, or publish data to, a topic in streaming mode.
Please note that Kafka pipes return (read) or accept (write) DataFrames that follow the Kafka schema used by Spark (key, value, topic, partition, offset, timestamp).
Code repository: https://github.com/AmadeusITGroup/dataio-framework/tree/main/src/main/scala/com/amadeus/dataio/pipes/kafka
Common
The following fields are available for all Kafka components:
Name | Mandatory | Description | Example | Default
---|---|---|---|---
options | No | Spark options, as key = value pairs. Note that you need to provide Kafka options as you would in any Spark Kafka application. | `options { "kafka.bootstrap.servers" = "kafka.bootstrap.mycompany.net:443" }` | |
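A minimal example of the common `options` field in a Kafka component configuration. This is a sketch assuming the HOCON configuration format used by Data I/O; the node name `input`, the broker address, and the topic name are illustrative. `subscribe` and `startingOffsets` are standard Spark Kafka source options passed through as-is:

```hocon
input {
  type = "com.amadeus.dataio.pipes.kafka.streaming.KafkaInput"
  options {
    // Kafka options are forwarded to Spark unchanged:
    "kafka.bootstrap.servers" = "kafka.bootstrap.mycompany.net:443"
    "subscribe" = "my-topic"         // standard Spark Kafka source option (illustrative topic)
    "startingOffsets" = "earliest"   // standard Spark Kafka source option
  }
}
```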
No batch processing is currently available for Kafka in Data I/O.
Streaming
Input
Type: com.amadeus.dataio.pipes.kafka.streaming.KafkaInput
Name | Mandatory | Description | Example | Default
---|---|---|---|---
repartition | No | Matches the Spark Dataset repartition function, by number of partitions, expressions, or both. | `repartition { num = 10, exprs = "upd_date" }` | |
coalesce | No | Matches the Spark Dataset coalesce function. | `coalesce = 10` | |
schema | No | The schema of the input dataset. See schema definitions for more information. | `schema = "myproject.models.Query"` | |
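Putting the fields above together, a KafkaInput configuration might look like the following. This is a sketch assuming Data I/O's HOCON layout; the schema class, column name, broker address, and topic are illustrative:

```hocon
input {
  type = "com.amadeus.dataio.pipes.kafka.streaming.KafkaInput"
  schema = "myproject.models.Query"             // optional: schema of the input dataset
  repartition { num = 10, exprs = "upd_date" }  // optional: maps to Dataset.repartition
  options {
    "kafka.bootstrap.servers" = "kafka.bootstrap.mycompany.net:443"
    "subscribe" = "my-topic"                    // standard Spark Kafka source option
  }
}
```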
Output
Type: com.amadeus.dataio.pipes.kafka.streaming.KafkaOutput
Name | Mandatory | Description | Example | Default
---|---|---|---|---
trigger | No | Sets the trigger for the streaming query: AvailableNow, Continuous, or empty. Controls Spark's trigger() function. | `trigger = "AvailableNow"` | |
duration | No | The trigger interval for the streaming query. Controls Spark's trigger() function. If no trigger is defined, sets a ProcessingTime trigger with this interval. | `duration = "60 seconds"` | |
timeout | Yes | The amount of time before returning from the streaming query, in hours. It can be a String or an Int. | `timeout = 24` | |
mode | No | The Spark Structured Streaming output mode. | `mode = "complete"` | append |
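A KafkaOutput configuration combining the fields above might look like the following. This is a sketch assuming Data I/O's HOCON layout; the broker address, topic, and checkpoint path are illustrative. `topic` and `checkpointLocation` are standard Spark Kafka sink options:

```hocon
output {
  type = "com.amadeus.dataio.pipes.kafka.streaming.KafkaOutput"
  timeout = 24                  // mandatory: return from the query after 24 hours
  mode = "append"
  duration = "60 seconds"       // ProcessingTime trigger interval (no trigger defined)
  options {
    "kafka.bootstrap.servers" = "kafka.bootstrap.mycompany.net:443"
    "topic" = "my-output-topic"                       // standard Spark Kafka sink option
    "checkpointLocation" = "hdfs://path/to/checkpoints" // required by Spark for streaming sinks
  }
}
```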