Custom transformer classes

Apart from the provided Transformer abstract class, you can define your own custom classes that extend Transformer to match your specific use cases. This lets you encapsulate common data transformation patterns or reusable logic in custom classes, making your code more modular and maintainable.

For example, if your organization regularly needs to join data from two different datasets, you could create a JoinTransformer class, such as:

from abc import abstractmethod

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame

from pydataio.transformer import Transformer
from pydataio.job_config import JobConfig

class JoinTransformer(Transformer):
    
    job_config: JobConfig
    spark: SparkSession
    
    def featurize(self, jobConfig: JobConfig, spark: SparkSession, additionalArgs: dict = None):
        # A join needs at least two configured inputs.
        if len(jobConfig.reader) < 2:
            raise ValueError("Cannot run a JoinTransformer without two input configurations.")

        self.job_config = jobConfig
        self.spark = spark

        # Load both inputs as defined in the job configuration.
        events1 = jobConfig.load(inputName="events1", spark=spark)
        events2 = jobConfig.load(inputName="events2", spark=spark)

        # Delegate the actual join logic to the subclass, and return its result.
        return self.transform(events1, events2)
        
    @abstractmethod
    def transform(self, input_data1: DataFrame, input_data2: DataFrame) -> DataFrame:
        """Join the two input DataFrames; implemented by concrete subclasses."""

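A concrete subclass then only needs to implement transform. The sketch below is a minimal illustration; the class name UserEventsJoinTransformer and the user_id join key are hypothetical and not part of PyData I/O:

from pyspark.sql.dataframe import DataFrame


class UserEventsJoinTransformer(JoinTransformer):

    def transform(self, input_data1: DataFrame, input_data2: DataFrame) -> DataFrame:
        # "user_id" is a hypothetical join key; replace it with whatever
        # key your two datasets actually share.
        return input_data1.join(input_data2, on="user_id", how="inner")
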
Because JoinTransformer extends Transformer, its concrete subclasses remain compatible with PyData I/O and can be referenced in the configuration file.
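
A hypothetical end-to-end call could then look like the following, assuming job_config is a JobConfig whose reader configuration defines the inputs events1 and events2, and spark is an active SparkSession:

# Illustrative usage only: job_config and spark must already be set up,
# with reader entries named "events1" and "events2".
transformer = UserEventsJoinTransformer()
joined = transformer.featurize(job_config, spark)  # returns the joined DataFrame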