Getting Started

Table of contents
  1. Getting Started
    1. Installation
    2. Minimal Example
      1. The Data Processor
      2. The Configuration File
      3. The Data Pipeline

Installation

Data I/O was built and tested with Spark 3.2.1/3.3.2/3.4.1 and Scala 2.12. Support for prior versions is not guaranteed.

Published releases are available on GitHub Packages, in the AmadeusITGroup repository.

Using Maven:

<dependency>
    <groupId>com.amadeus.dataio</groupId>
    <artifactId>dataio-core</artifactId>
    <version>x.x.x</version>
</dependency>
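
If your project uses sbt instead, the equivalent dependency declaration would look roughly like the sketch below. It simply mirrors the Maven coordinates above; you will also need a resolver for the GitHub Packages repository, and the exact artifact name may carry a Scala version suffix depending on how the release is published.

// Sketch of an sbt dependency, reusing the coordinates from the Maven example above
libraryDependencies += "com.amadeus.dataio" % "dataio-core" % "x.x.x"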

Minimal Example

This example shows how to write a rudimentary data pipeline: removing duplicates from a CSV dataset and saving the result as Parquet.

To make it work, you only need to write three components:

  • A data processor, which contains the transformations to operate on the data,
  • A configuration file, which contains information about the processor to use, the inputs, outputs, etc.,
  • A data pipeline, which loads the configuration file and runs the data processor with the configuration that you defined.

The Data Processor

Every transformation made using Data I/O must be written in a data processor, a class that you create by extending the Processor trait or one of its sub-classes.

Data transformations happen in the run method, which the data pipeline calls to start processing the data.

package gettingstarted
 
import com.amadeus.dataio.{HandlerAccessor, Processor}
import org.apache.spark.sql.SparkSession
 
case class DuplicatesDropper() extends Processor {
  override def run(handlers: HandlerAccessor)(implicit spark: SparkSession): Unit = {
    // Read the dataset declared as "my-input" in the configuration file
    val myInputData = handlers.input.read("my-input")

    // Remove duplicate rows
    val fixedData = myInputData.dropDuplicates()

    // Write the result to the output declared as "my-output" in the configuration file
    handlers.output.write("my-output", fixedData)
  }
}
}

The Configuration File

The configuration file defines the data processor your application will run, as well as its inputs, outputs and distributors. In our case, we only need one input and one output.

Processing {
    Type = "gettingstarted.DuplicatesDropper"
}
 
Input {
    Name = "my-input"
    Type = "com.amadeus.dataio.pipes.storage.batch.StorageInput"
    Path = "/path/my-input"
    Format = "csv"
    Options {
        header = "true"
    }
}
 
Output {
    Name = "my-output"
    Type = "com.amadeus.dataio.pipes.storage.batch.StorageOutput"
    Path = "/path/my-output"
    Format = "parquet"
}

The Data Pipeline

Now that everything is ready, it’s time to create our pipeline. To do so, we’ll use the configuration file.

package gettingstarted

import com.amadeus.dataio.Pipeline
import org.apache.spark.sql.SparkSession

object MySparkApplication extends App {
  // val spark: SparkSession = (...)

  // Load the configuration file and run the data processor it declares
  Pipeline("src/main/resources/application.conf").run(spark)
}
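
The spark value above is left to you. If you do not already have a SparkSession at hand, a minimal one can be created with the standard Spark builder API; the application name and master below are illustrative only.

// Minimal SparkSession for local experimentation (name and master are illustrative)
val spark: SparkSession = SparkSession.builder()
  .appName("getting-started")
  .master("local[*]")
  .getOrCreate()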