Getting Started

Table of contents
  1. Getting Started
    1. Installation
    2. Minimal Example
      1. The Data Processor
      2. The Configuration File
      3. The Data Pipeline

Installation

Using Maven:

<dependency>
    <groupId>com.amadeus.dataio</groupId>
    <artifactId>dataio-core</artifactId>
    <version>x.x.x</version>
</dependency>

Published releases are available on GitHub Packages, in the Data I/O repository.

Minimal Example

This example presents how to write a rudimentary data pipeline: removing the duplicates from a CSV dataset, and saving the result as Parquet.

To make it work, you only need to write three components:

  • A data processor, which contains the transformations to operate on the data,
  • A configuration file, which contains information about the processor to use, the inputs, outputs, etc.,
  • A Pipeline object, which loads the configuration file and runs the data processor with the configuration that you defined.

The Data Processor

Every transformation made using Data I/O must be written in a data processor, a class that you create by extending the Processor trait or one of its sub-classes.

Data transformations happen in the run method, which is used by the Pipeline to start the data processing.

package gettingstarted
 
import com.amadeus.dataio.{HandlerAccessor, Processor}
import org.apache.spark.sql.SparkSession
 
case class DuplicatesDropper() extends Processor {
  override def run(handlers: HandlerAccessor)(implicit spark: SparkSession): Unit = {
  val myInputData = handlers.input.read("my-input")

  val fixedData = myInputData.dropDuplicates

  handlers.output.write("my-output", fixedData)
  }
}

The Configuration File

The configuration file contains the definition of the data processor your application will run, as well as inputs and outputs. In our case, we only need one input, and one output.

processing {
  type = "gettingstarted.DuplicatesDropper"
}

input {
  name = "my-input"
  type = "com.amadeus.dataio.pipes.spark.batch.SparkInput"
  path = "/path/my-input"
  format = "csv"
  options {
        header = "true"
    }
}

output {
  name = "my-output"
  type = "com.amadeus.dataio.pipes.spark.batch.SparkOutput"
  path = "/path/my-output"
  format = "parquet"
}

The Data Pipeline

Now that we’re ready, it’s time create our Pipeline and run the Processor. To do so, we’ll use the configuration file and a Spark session.

package gettingstarted

import com.amadeus.dataio.Pipeline
import org.apache.spark.sql.SparkSession

object MySparkApplication extends App {
  // val spark: SparkSession = (...)
 
  Pipeline("src/main/resources/application.conf").run(spark)
}