Kafka

Allows connecting to Kafka brokers to consume data from, or publish data to, topics, in both batch and streaming modes.

Please note that Kafka pipes return or accept DataFrames following Spark's Kafka schema (key, value, topic, partition, offset, timestamp, timestampType).

Code repository: https://github.com/AmadeusITGroup/dataio-framework/tree/main/src/main/scala/com/amadeus/dataio/pipes/kafka

Common

The following fields are available for all Kafka components:

| Name | Mandatory | Description | Example | Default |
| --- | --- | --- | --- | --- |
| Brokers | Yes | The Kafka brokers. | Brokers = "kafka1.mycompany.com:9000, kafka2.mycompany.com:8000" | |
| Topic | Yes/No | The topic to consume or publish to. For inputs, corresponds to the subscribe Spark option. Only one of Topic, Pattern and Assign can be specified. | Topic = "test.topic" | |
| Options | No | Spark options, as key = value pairs. Note that some of the usual Kafka options for Spark are automatically added from Data I/O fields (e.g. Brokers adds the kafka.bootstrap.servers option). | Options { headers = "true" } | |

Only one of Topic, Pattern and Assign can be specified.
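For illustration, here is a minimal sketch of a Kafka entry combining these common fields, written in the same HOCON syntax as the examples above. The entry name MyKafkaInput is a placeholder, the Type field refers to one of the component types listed in the sections below, and the exact nesting inside your pipeline configuration may differ:

```hocon
MyKafkaInput {
  # Fully qualified component type; see the Batch and Streaming sections below.
  Type = "com.amadeus.dataio.pipes.kafka.batch.KafkaInput"

  # Comma-separated list of brokers; fills the kafka.bootstrap.servers option.
  Brokers = "kafka1.mycompany.com:9000, kafka2.mycompany.com:8000"

  # Exactly one of Topic, Pattern or Assign.
  Topic = "test.topic"

  # Additional Spark options, passed through as key = value pairs.
  Options {
    headers = "true"
  }
}
```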


Batch

Input

Type: com.amadeus.dataio.pipes.kafka.batch.KafkaInput

| Name | Mandatory | Description | Example | Default |
| --- | --- | --- | --- | --- |
| Pattern | Yes/No | The pattern used to subscribe to topic(s). Corresponds to the subscribePattern Spark option. Only one of Topic, Pattern and Assign can be specified. | Pattern = "myapp-.*" | |
| Assign | Yes/No | Specific TopicPartitions to consume. Corresponds to the assign Spark option. Only one of Topic, Pattern and Assign can be specified. | Assign = {"topicA":[0,1],"topicB":[2,4]} | |
| Repartition | No | Matches the Spark Dataset repartition function, either by number of partitions, by columns, or both. One argument, either Columns or Number, is mandatory. | Repartition { Number = 10, Columns = "upd_date" } | |
| Coalesce | No | Matches the Spark Dataset coalesce function. | Coalesce = 10 | |
| Schema | No | The schema of the input dataset. See schema definitions for more information. | Schema = "myproject.models.Query" | |
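Putting these fields together, a batch input definition could look like the following sketch. All names and values are illustrative:

```hocon
MyBatchInput {
  Type = "com.amadeus.dataio.pipes.kafka.batch.KafkaInput"
  Brokers = "kafka1.mycompany.com:9000"

  # Subscribe to all topics matching this pattern
  # (mutually exclusive with Topic and Assign).
  Pattern = "myapp-.*"

  # Repartition the resulting Dataset by number and column.
  Repartition {
    Number = 10
    Columns = "upd_date"
  }

  # Optional schema applied to the input dataset.
  Schema = "myproject.models.Query"
}
```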

No batch output is currently available for Kafka in Data I/O.


Streaming

Input

Type: com.amadeus.dataio.pipes.kafka.streaming.KafkaInput

| Name | Mandatory | Description | Example | Default |
| --- | --- | --- | --- | --- |
| Pattern | Yes/No | The pattern used to subscribe to topic(s). Corresponds to the subscribePattern Spark option. Only one of Topic, Pattern and Assign can be specified. | Pattern = "myapp-.*" | |
| Assign | Yes/No | Specific TopicPartitions to consume. Corresponds to the assign Spark option. Only one of Topic, Pattern and Assign can be specified. | Assign = {"topicA":[0,1],"topicB":[2,4]} | |
| Repartition | No | Matches the Spark Dataset repartition function, either by number of partitions, by columns, or both. One argument, either Columns or Number, is mandatory. | Repartition { Number = 10, Columns = "upd_date" } | |
| Coalesce | No | Matches the Spark Dataset coalesce function. | Coalesce = 10 | |
| Schema | No | The schema of the input dataset. See schema definitions for more information. | Schema = "myproject.models.Query" | |
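As with the batch input, a streaming input entry could be sketched as follows. This example uses Assign (copied verbatim from the table above) instead of Topic or Pattern; all other names and values are illustrative:

```hocon
MyStreamingInput {
  Type = "com.amadeus.dataio.pipes.kafka.streaming.KafkaInput"
  Brokers = "kafka1.mycompany.com:9000"

  # Consume specific TopicPartitions
  # (mutually exclusive with Topic and Pattern).
  Assign = {"topicA":[0,1],"topicB":[2,4]}

  # Reduce the number of partitions of the resulting Dataset.
  Coalesce = 10
}
```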

Output

Type: com.amadeus.dataio.pipes.kafka.streaming.KafkaOutput

| Name | Mandatory | Description | Example | Default |
| --- | --- | --- | --- | --- |
| Duration | Yes | Sets the trigger interval of the streaming query. Corresponds to the trigger() Spark function. | Duration = "60 seconds" | |
| Timeout | Yes | Controls the amount of time, in hours, before returning from the streaming query. It can be a String or an Int. | Timeout = 24 | |
| Mode | Yes | The Spark Structured Streaming output mode. | Mode = "complete" | append |
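
For reference, a streaming output entry could be sketched as follows; the entry name and values are again illustrative:

```hocon
MyStreamingOutput {
  Type = "com.amadeus.dataio.pipes.kafka.streaming.KafkaOutput"
  Brokers = "kafka1.mycompany.com:9000"
  Topic = "test.topic"

  # Trigger interval of the streaming query.
  Duration = "60 seconds"

  # Return from the streaming query after 24 hours.
  Timeout = 24

  # Structured Streaming output mode.
  Mode = "append"
}
```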