Run a DQ Job from a PySpark notebook

This section shows you how to run DQ Jobs using a PySpark notebook environment, such as Databricks Notebook, Google Colab, or Jupyter. You will learn how to:

  1. Create a notebook
  2. Install PySpark
  3. Import the libraries
  4. Add the JAR files
  5. Add secrets and environment variables
  6. Start a SparkSession
  7. Load a DataFrame into your notebook
  8. Run a DQ job from your notebook
  9. Check that the job ran in the Collibra DQ web application

Before you begin

  • You are signed into Collibra DQ.
  • You have permission to run DQ Jobs.
  • You have a PySpark notebook, such as Databricks Notebook, Google Colab, or Jupyter.

Steps

This example is based on a notebook created in Google Colab.

Step 1: Create a notebook

Select your PySpark notebook service of choice and create a new notebook.

Step 2: Install PySpark

To get started running a DQ Job from a PySpark notebook, open your new notebook and install PySpark.

  1. Open your PySpark notebook.
  2. Insert a new code cell into your notebook.
  3. Add the following code:
    # Install
    !pip install -q pyspark==3.4.1         # installs the PySpark library, version 3.4.1
    !pip install -q findspark              # installs the findspark library

    The PySpark version you install should match the Spark version supplied to you by Collibra DQ. For example, if Collibra DQ supplies the default version 3.4.1 (as shown above), the first line of code should be !pip install -q pyspark==3.4.1. After the cell runs, you can verify the installed version, as shown below.

  4. Run the cell.
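
To confirm that the version you installed matches the Spark version Collibra DQ supplied, you can print it from a new code cell. A minimal check:

    # verify the installed PySpark version
    import pyspark
    print(pyspark.__version__)              # should print 3.4.1 in this example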

Step 3: Import the libraries

  1. Insert a new code cell and add the following code:

    # import libraries
    import py4j                                                   # imports the Py4J library
    import findspark                                              # imports the findspark library
    import os, os.path                                            # imports os and os.path to interact with the operating system and file system

    SPARK_HOME = "/usr/local/lib/python3.10/dist-packages/pyspark"  # path to the PySpark installation; adjust for your Python version (see the alternative below)
    os.environ["SPARK_HOME"] = SPARK_HOME                           # exports SPARK_HOME as an environment variable for this process
    findspark.init()                                                # makes Spark accessible
    findspark.find()                                                # returns the Spark installation directory
    os.chdir("/content")                                            # changes the current working directory to /content (a shell command such as !cd runs in a subshell and does not persist)
  2. Run the cell.
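
The dist-packages path above depends on your notebook's Python version. If you prefer not to hard-code it, a minimal alternative sketch is to derive it from the installed pyspark package:

    # derive SPARK_HOME from the installed pyspark package instead of hard-coding the path
    import os
    import pyspark
    SPARK_HOME = os.path.dirname(pyspark.__file__)   # directory that contains the pyspark package
    os.environ["SPARK_HOME"] = SPARK_HOME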

Step 4: Add the JAR files

Add the Collibra DQ core JAR file to your notebook environment. The JAR you add must match both your Collibra DQ core version and your Spark version.
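
How you add the JAR depends on your notebook service. In Google Colab, which this example uses, one minimal sketch is to upload the JAR from your local machine; the uploaded file lands in the current working directory (/content):

    # one way to add the Collibra DQ core JAR in Google Colab: upload it from your local machine
    from google.colab import files
    uploaded = files.upload()               # opens a file picker; choose the core JAR supplied by Collibra DQ
    !ls /content/*.jar                      # confirm the JAR is in place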

Step 5: Add secrets and environment details

  1. Insert a new code cell and add the following code, replacing the placeholder values between "" and '' with your own information:

    # Secrets
    import py4j
    import json
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession, SQLContext, DataFrame   # SparkSession is used in Step 6
    from pyspark.sql import functions as F
    import datetime as dt

    # environment details
    tenant = "public"                                 # defines the tenant
    pgHost = "host-ip"                                # PostgreSQL host
    pgPort = "5432/dev?currentSchema="+tenant         # PostgreSQL port, database name, and schema
    pgUser = "dq-username"                            # PostgreSQL user
    pgPass = "dq-password"                            # PostgreSQL password (see the note after these steps)
    agent_id = 'agent-id'                             # CDQ agent ID (optional)
    agent_uuid = 'agent-uuid'                         # CDQ agent UUID (optional)
    license = "cdq-license"                           # CDQ license
  2. Run the cell.
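
Hard-coding credentials in a notebook cell is risky, especially in shared environments. As a minimal alternative sketch, you can prompt for the password at run time with Python's standard getpass module instead of storing it in the cell:

    # prompt for the PostgreSQL password instead of hard-coding it
    from getpass import getpass
    pgPass = getpass("PostgreSQL password: ")         # reads the password without echoing it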

Step 6: Start the SparkSession

  1. Insert a new code cell and add the following code, replacing the values between "" and '' with your own SparkSession preferences. The builder chain is wrapped in parentheses so that the inline comments are valid Python; a comment after a backslash line continuation is a syntax error. If the Collibra DQ JAR from Step 4 is not picked up automatically, see the note after these steps.

    spark = (
        SparkSession.builder                                                               # starts building a SparkSession
        .master("local[*]")                                                                # runs Spark locally using all available cores
        .appName('collibra-dq')                                                            # gives the Spark session a name
        .config("spark.sql.sources.disabledJdbcConnProviderList", "basic,oracle,mssql")    # lists disabled JDBC connection providers
        .getOrCreate()                                                                     # creates a new SparkSession or retrieves an existing one
    )
  2. Run the cell.
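
For the Collibra DQ classes used in Step 8 (such as com.owl.common.options.OwlOptions) to resolve, the JAR from Step 4 must be on the Spark driver's classpath. If your environment does not pick it up automatically, one option is Spark's standard spark.jars setting; the JAR path below is a placeholder for wherever you put the file. Because getOrCreate() returns an existing session if one is already running, set this configuration before creating any other session:

    # attach the Collibra DQ core JAR when building the session (the path is a placeholder)
    spark = (
        SparkSession.builder
        .master("local[*]")
        .appName('collibra-dq')
        .config("spark.jars", "/content/dq-core-example.jar")   # comma-separated list of JARs for the driver and executor classpaths
        .getOrCreate()
    )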

Step 7: Load the DataFrame

  1. Insert a new code cell and add the following code, replacing the file path with your own:

    # load the DataFrame
    df = spark.read.csv('/content/sample_data/california_housing_test.csv', header=True, inferSchema=True)  # loads california_housing_test.csv, using the first row as headers and inferring the schema
  2. Run the cell.
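
Before running the job, it can help to confirm that the DataFrame loaded as expected:

    # quick sanity check on the loaded DataFrame
    df.printSchema()                        # prints the inferred column names and types
    df.show(5)                              # displays the first five rows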

Step 8: Run the job

  1. Insert a new code cell and add the following code:
    # Dataset
    run_id = "2024-02-21"             # the runId of the job
    dataset="dataset_name"            # the name of the dataset
    
    # JVM
    jvm = spark._jvm                  # retrieves the Spark JVM
    gateway = spark._sc._gateway      # accesses the JVM and allows Python and Java to communicate
    
    # declare
    opt = jvm.com.owl.common.options.OwlOptions()
    opt.setDataset(dataset)
    opt.setRunId(run_id)
    opt.setHost(pgHost)
    opt.setPort(pgPort)
    opt.setPgUser(pgUser)
    opt.setPgPassword(pgPass)
    
    # profile (optional)
    profile = opt.getProfile()
    profile.setHistogram(False)       # turns the histogram chart off (False) or on (True)
    profile.setCorrelation(False)     # turns the correlation chart off (False) or on (True)
    profile.setShape(False)           # turns shapes detection off (False) or on (True)
    opt.setProfile(profile)
    
    # parse the run date string into a Java Date (optional)
    date_format = jvm.java.text.SimpleDateFormat("yyyy-MM-dd")
    run_date = date_format.parse(run_id)    # parses the run_id string into a java.util.Date
    opt.setRunDate(run_date)
    
    # run the job
    cdq = jvm.com.owl.core.util.OwlUtils.OwlContext(df._jdf, spark._jsparkSession, opt)
    cdq.register(opt)
    cdq.owlCheck()
  2. Run the cell.
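
Errors raised on the Java side of the owlCheck() call surface in Python as Py4J exceptions, which can be verbose. A minimal sketch for catching one and printing just the underlying Java exception:

    # surface Java-side errors from the DQ job more readably
    from py4j.protocol import Py4JJavaError
    try:
        cdq.owlCheck()
    except Py4JJavaError as e:
        print(e.java_exception)             # prints the underlying Java exception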

Step 9: Check the Jobs page in Collibra DQ

In another tab or window with Collibra DQ open, click the Jobs icon and check that your job was submitted to the Jobs queue for processing.