Writing Your Transformers

Table of contents
  1. Writing Your Transformers
    1. Overview
      1. Transformer abstract class
      2. Streaming and for each batch sink
    2. Custom Configuration

Data transformers play a crucial role in the PyData I/O framework: they let you implement custom data transformation logic within your pipelines. This page guides you through creating your own data transformer.

Overview

Data transformers encapsulate the data processing steps required by your ETL pipelines. Each PyData I/O application requires a transformer that manipulates the data according to your business requirements.

Transformer abstract class

The Transformer abstract class is the base class for custom transformers. It provides the structure required to define your data transformation logic: extend Transformer and implement the featurize method with your specific transformation steps.

Here’s an example of a custom transformer that extends the Transformer class:

from pyspark.sql import DataFrame, SparkSession

from pydataio.job_config import JobConfig
from pydataio.transformer import Transformer

class MyDataTransformer(Transformer):
    def featurize(self, jobConfig: JobConfig, spark: SparkSession, additionalArgs: dict = None):
        # Access input data
        data = jobConfig.load(inputName="my-input", spark=spark)
        
        # Perform data transformation
        transformed_data = self.transformData(data)
        
        # Write transformed data to output
        jobConfig.writer.save(data=transformed_data)
        
    def transformData(self, inputData: DataFrame):
        # Your custom data transformation logic here
        # Example: Perform data cleansing, filtering, or aggregations
        # Return the transformed DataFrame
        ...
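The contract described above (extend the base class, implement featurize) can be illustrated without Spark. The sketch below is not PyData I/O's source: TransformerSketch is a hypothetical stand-in built on Python's abc module, and whether the real Transformer enforces the method in the same way is an assumption. It only shows what the abstract-class pattern generally buys you: a subclass that forgets featurize cannot be instantiated.

```python
from abc import ABC, abstractmethod


class TransformerSketch(ABC):
    """Hypothetical stand-in for pydataio.transformer.Transformer."""

    @abstractmethod
    def featurize(self, jobConfig, spark, additionalArgs: dict = None):
        """Subclasses put their transformation steps here."""


class UpperCaseTransformer(TransformerSketch):
    """Toy subclass: upper-cases a list of strings instead of a DataFrame."""

    def featurize(self, jobConfig, spark, additionalArgs: dict = None):
        # jobConfig is a plain dict here, standing in for a real JobConfig
        return [value.upper() for value in jobConfig["rows"]]


class IncompleteTransformer(TransformerSketch):
    """Forgets to implement featurize, so it stays abstract."""


def try_instantiate(cls):
    """Return an instance, or the TypeError message if cls is still abstract."""
    try:
        return cls()
    except TypeError as error:
        return str(error)
```

Calling try_instantiate(IncompleteTransformer) returns an error message rather than an object, while UpperCaseTransformer instantiates normally.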

Streaming and for each batch sink

The SinkProcessor abstract class can be used to specify a dedicated sink function when using a for each batch sink.

Here’s an example of a custom sink that extends the SinkProcessor class:

from pydataio.io.abstract_io import SinkProcessor
from pyspark.sql import DataFrame

class MySinkProcessor(SinkProcessor):

    def process(self, data: DataFrame):
        # apply transformation in batch context
        ...
        
        # return the transformed dataset so that it can be written to the output
        return data

You can also define your own abstract processor class by extending the Transformer class. For more information, see the advanced section.

Custom Configuration

You can access custom configuration options directly within your transformer code. The custom configuration options defined under the Processing configuration node can be accessed through the parameters member variable of the JobConfig class.

Here’s an example of how to access custom configuration values within your transformer:

from pyspark.sql import DataFrame, SparkSession

from pydataio.job_config import JobConfig
from pydataio.transformer import Transformer

class MyDataTransformer(Transformer):
    def featurize(self, jobConfig: JobConfig, spark: SparkSession, additionalArgs: dict = None):
        # Access input data
        data = jobConfig.load(inputName="my-input", spark=spark)
        
        # Access custom configuration values
        custom_value1 = jobConfig.parameters["custom_value_1"]
        custom_value2 = jobConfig.parameters["custom_value_2"]
        
        # Perform data transformation
        transformed_data = self.transformData(data)
        
        # Write transformed data to output
        jobConfig.writer.save(data=transformed_data)

In the above example, custom_value_1 and custom_value_2 are custom configuration values defined under the Processing configuration node in your configuration file:

Processing:
  type: com.mycompany.MyDataProcessor
  parameters:
    custom_value_1: "june"
    custom_value_2: "2023"

By directly accessing the parameters member variable of jobConfig in your transformer, you can use custom configuration options to parameterize and customize its behavior.
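Because the values are read by key, a missing or mistyped entry only surfaces when the job runs. One way to fail early is to validate the parameters in one place. The sketch below is an illustration, not part of PyData I/O: read_parameters is a hypothetical helper, and it assumes jobConfig.parameters behaves like a plain dict (as the subscript access in the example suggests) and that quoted values arrive as strings.

```python
def read_parameters(parameters: dict) -> dict:
    """Validate the custom parameters from the Processing node.

    `parameters` stands in for jobConfig.parameters, assumed to be dict-like.
    """
    required = ("custom_value_1", "custom_value_2")
    missing = [name for name in required if name not in parameters]
    if missing:
        raise KeyError(f"Missing Processing parameters: {', '.join(missing)}")

    return {
        "month": str(parameters["custom_value_1"]).lower(),
        # The value is quoted in the configuration, so it presumably
        # arrives as a string; convert it explicitly
        "year": int(parameters["custom_value_2"]),
    }
```

Inside featurize, this would be called as read_parameters(jobConfig.parameters) before any data is loaded.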

You can also access additional command-line parameters through the additionalArgs dictionary passed to the featurize method.
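Since additionalArgs defaults to None in the featurize signature, it is worth guarding against that before reading from it. The sketch below shows one possible way to combine it with the configured parameters. resolve_options is a hypothetical helper, the key names are illustrative, and letting command-line values override configuration values is a design choice made here, not documented PyData I/O behavior.

```python
def resolve_options(parameters: dict, additionalArgs: dict = None) -> dict:
    """Merge configured parameters with command-line arguments."""
    # additionalArgs defaults to None in featurize, so guard before reading it
    args = additionalArgs or {}
    # Later keys win: command-line values override configuration values
    return {**parameters, **args}
```

With this precedence, a value such as custom_value_2 can be set in the configuration file and overridden for a single run from the command line.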