# Kafka
Connects to Kafka brokers to consume data from, or publish data to, a topic, in batch or streaming mode.
Please note that Kafka pipes accept and return DataFrames following the Kafka schema used by Spark (key, value, topic, partition, offset, timestamp, etc.).
Code repository: https://github.com/AmadeusITGroup/dataio-framework/tree/main/src/main/scala/com/amadeus/dataio/pipes/kafka
## Common
The following fields are available for all Kafka components:
Name | Mandatory | Description | Example | Default |
---|---|---|---|---|
Brokers | Yes | The Kafka brokers | Brokers = "kafka1.mycompany.com:9000, kafka2.mycompany.com:8000" | |
Topic | Yes/No | The topic to consume. Corresponds to the subscribe Spark option. Only one among Topic, Pattern, Assign can be specified. | Topic = "test.topic" | |
Options | No | Spark options, as key = value pairs. Note that some of the usual Kafka options for Spark are automatically added from Data IO fields (e.g. Brokers adds the kafka.bootstrap.servers option). | Options { headers = "true" } | |
Only one among Topic, Pattern, Assign can be specified.
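As an illustration, the common fields combine as in the following sketch. The surrounding anonymous block is an assumption for readability; the exact component declaration depends on your pipeline configuration, and the Type values are documented in the sections below.

```hocon
{
  // Assumed wrapper: the exact declaration depends on your pipeline configuration.
  Type = "com.amadeus.dataio.pipes.kafka.batch.KafkaInput"
  Brokers = "kafka1.mycompany.com:9000, kafka2.mycompany.com:8000"
  Topic = "test.topic" // exactly one of Topic, Pattern, Assign
  Options {
    // Passed through to Spark; kafka.bootstrap.servers is already set from Brokers.
    headers = "true"
  }
}
```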
## Batch
### Input
Type: `com.amadeus.dataio.pipes.kafka.batch.KafkaInput`
Name | Mandatory | Description | Example | Default |
---|---|---|---|---|
Pattern | Yes/No | The pattern used to subscribe to topic(s). Corresponds to the subscribePattern Spark option. Only one among Topic, Pattern, Assign can be specified. | Pattern = "myapp-.*" | |
Assign | Yes/No | Specific TopicPartitions to consume. Corresponds to the assign Spark option. Only one among Topic, Pattern, Assign can be specified. | Assign = {"topicA":[0,1],"topicB":[2,4]} | |
Repartition | No | Matches the Spark Dataset repartition function, either by number, columns or both. One argument, either Columns or Number, is mandatory. | Repartition { Number = 10, Columns = "upd_date" } | |
Coalesce | No | Matches the Spark Dataset coalesce function. | Coalesce = 10 | |
Schema | No | The schema of the input dataset. See schema definitions for more information. | Schema = "myproject.models.Query" | |
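Putting the common and batch-specific fields together, a batch Kafka input could look like the sketch below. The `Input` wrapper name is an assumption for illustration, not something this page defines.

```hocon
Input {
  Type = "com.amadeus.dataio.pipes.kafka.batch.KafkaInput"
  Brokers = "kafka1.mycompany.com:9000"
  Pattern = "myapp-.*" // instead of Topic or Assign
  Repartition {
    Number  = 10
    Columns = "upd_date"
  }
  Schema = "myproject.models.Query"
}
```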
### Output
No batch output is currently available for Kafka in Data I/O.
## Streaming
### Input
Type: `com.amadeus.dataio.pipes.kafka.streaming.KafkaInput`
Name | Mandatory | Description | Example | Default |
---|---|---|---|---|
Pattern | Yes/No | The pattern used to subscribe to topic(s). Corresponds to the subscribePattern Spark option. Only one among Topic, Pattern, Assign can be specified. | Pattern = "myapp-.*" | |
Assign | Yes/No | Specific TopicPartitions to consume. Corresponds to the assign Spark option. Only one among Topic, Pattern, Assign can be specified. | Assign = {"topicA":[0,1],"topicB":[2,4]} | |
Repartition | No | Matches the Spark Dataset repartition function, either by number, columns or both. One argument, either Columns or Number, is mandatory. | Repartition { Number = 10, Columns = "upd_date" } | |
Coalesce | No | Matches the Spark Dataset coalesce function. | Coalesce = 10 | |
Schema | No | The schema of the input dataset. See schema definitions for more information. | Schema = "myproject.models.Query" | |
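A streaming Kafka input follows the same pattern as the batch one, with the streaming Type. As above, the `Input` wrapper name is an illustrative assumption.

```hocon
Input {
  Type = "com.amadeus.dataio.pipes.kafka.streaming.KafkaInput"
  Brokers = "kafka1.mycompany.com:9000"
  Topic = "test.topic" // exactly one of Topic, Pattern, Assign
  Coalesce = 10
  Schema = "myproject.models.Query"
}
```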
### Output
Type: `com.amadeus.dataio.pipes.kafka.streaming.KafkaOutput`
Name | Mandatory | Description | Example | Default |
---|---|---|---|---|
Duration | Yes | Sets the trigger interval of the streaming query. Corresponds to the Spark trigger() function. | Duration = "60 seconds" | |
Timeout | Yes | The amount of time to wait before returning from the streaming query, in hours. May be given as a String or an Int. | Timeout = 24 | |
Mode | No | The Spark Structured Streaming output mode. | Mode = "complete" | append |
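A streaming Kafka output combining the common and output-specific fields might look like the following sketch; the `Output` wrapper name is an illustrative assumption.

```hocon
Output {
  Type = "com.amadeus.dataio.pipes.kafka.streaming.KafkaOutput"
  Brokers = "kafka1.mycompany.com:9000"
  Topic = "test.topic"
  Duration = "60 seconds" // trigger interval of the streaming query
  Timeout = 24            // hours before returning from the query
  Mode = "append"         // Spark Structured Streaming output mode
}
```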