Getting Started

Table of contents
  1. Getting Started
    1. Installation
      1. Dependencies
    2. Minimal Example
      1. The Data Transformer
      2. The Configuration File
      3. The Data Pipeline

Installation

Using uv:

uv add pydataio

Published releases are available on GitHub Packages, in the PyData I/O repository.

Dependencies

The pydataio project relies on several dependencies. The main ones and their purposes are:

  • Python 3.11: The programming language used for the project.
  • PyYAML 6.0.2: A YAML parser and emitter for Python, used to load and parse YAML configuration files.
  • fsspec 2024.10.0: A filesystem specification library, used to handle various filesystems and file operations.
  • adlfs 2024.7.0: An implementation of fsspec for Azure Data Lake Storage, used to interact with Azure Blob Storage.
  • pyspark 3.5.2: The Python API for Apache Spark, used for distributed data processing.

These dependencies are specified in the pyproject.toml file and managed using uv.
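
For reference, here is a sketch of what the relevant parts of a consuming project's pyproject.toml might look like after running uv add. The index name and URL are placeholders for whatever registry hosts the published releases; adjust them to your setup, and omit the index configuration entirely if you install from the public index.

[project]
name = "getting-started"
requires-python = ">=3.11"
dependencies = [
    "pydataio",
]

# Only needed when the package is served from a private index.
[[tool.uv.index]]
name = "pydataio-registry"            # placeholder name
url = "https://example.com/simple"    # placeholder URL

[tool.uv.sources]
pydataio = { index = "pydataio-registry" }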

Minimal Example

This example shows how to write a minimal batch data pipeline: removing duplicates from a CSV dataset and saving the result as Parquet.

To make it work, you only need to write three components:

  • A data transformer, which contains the transformations to apply to the data,
  • A configuration file, which describes the transformer to use, its inputs, its outputs, and so on,
  • A Pipeline object, which loads the configuration file and runs the transformer with the configuration you defined.
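
A possible layout for this example is sketched below. The module path getting_started.duplicates_dropper used later in the configuration file assumes the transformer lives in a getting_started package; the file names are illustrative, only the module path declared in the configuration has to match.

getting_started/
    __init__.py
    duplicates_dropper.py   # the DuplicatesDropper transformer
config.yaml                 # the configuration file
main.py                     # creates the Pipeline and runs it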

The Data Transformer

Every transformation made using PyData I/O must be written in a data transformer, a class that you create by extending the Transformer class.

Data transformations happen in the featurize method, which the Pipeline calls to start the data processing.

from pydataio.transformer import Transformer
from pyspark.sql import SparkSession

# Note: adjust this import to wherever JobConfig lives in your installation.
from pydataio.job_config import JobConfig


class DuplicatesDropper(Transformer):
    def featurize(self, jobConfig: JobConfig, spark: SparkSession, additionalArgs: dict = None):
        # Load the input declared as "my-input" in the configuration file
        data = jobConfig.load(inputName="my-input", spark=spark)

        # Drop duplicate rows
        fixed_data = data.dropDuplicates()

        # Write the result to the output declared in the configuration file
        jobConfig.writer.save(data=fixed_data)
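
The deduplication itself is plain PySpark. If you want to see what dropDuplicates() does outside of the pipeline, here is a small standalone sketch, independent of PyData I/O:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dedup-demo").getOrCreate()

# A tiny DataFrame with one duplicated row
rows = [("alice", 1), ("alice", 1), ("bob", 2)]
df = spark.createDataFrame(rows, ["name", "value"])

deduplicated = df.dropDuplicates()
deduplicated.show()  # ("alice", 1) now appears only once

spark.stop()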

The Configuration File

The configuration file defines the data transformer your application will run, as well as its inputs and outputs. In our case, we only need one input and one output.

Processing:
  type: "getting_started.duplicates_dropper.DuplicatesDropper"

Input:
  - name: "my-input"
    type: "batch.FileSystemInput"
    format: "csv"
    path: "/path/my-input"
    options:
      header: true

Output:
  name: "my-output"
  type: "batch.FileSystemOutput"
  format: "parquet"
  path: "/path/my-output"

The Data Pipeline

Now that the transformer and the configuration file are ready, it’s time to create our Pipeline and run the transformer. To do so, we set up a main function that points to the configuration file, creates a Pipeline instance, and runs the pipeline with that configuration.

import logging
from pydataio.pipeline import Pipeline

def main():
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    
    # Path to the configuration file
    conf_path = 'path/to/your/config.yaml'
    
    # Create a Pipeline instance
    pipeline = Pipeline()
    
    # Run the pipeline; credential=None is enough for local filesystem inputs and outputs
    pipeline.run(confPath=conf_path, credential=None, additionalArgs=None)

if __name__ == "__main__":
    main()
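
You can then run the script directly, for example with uv run python main.py. The credential argument only matters when your inputs or outputs live on Azure storage, which is what the adlfs dependency is for. Here is a hedged sketch of passing one, assuming a service principal whose details are available in environment variables; the exact credential type your setup needs may differ:

import os

from azure.identity import ClientSecretCredential
from pydataio.pipeline import Pipeline

# Build an Azure credential from environment variables (assumed to be set)
credential = ClientSecretCredential(
    tenant_id=os.environ["AZURE_TENANT_ID"],
    client_id=os.environ["AZURE_CLIENT_ID"],
    client_secret=os.environ["AZURE_CLIENT_SECRET"],
)

pipeline = Pipeline()
pipeline.run(confPath="path/to/your/config.yaml", credential=credential, additionalArgs=None)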