Run a DQ Job from a PySpark notebook
This section shows you how to run DQ Jobs using a PySpark notebook environment, such as Databricks Notebook, Google Colab, and Jupyter. You will learn how to:
- Create a notebook
- Install PySpark
- Import the libraries
- Add the JAR files
- Add secrets and environment variables
- Start a SparkSession
- Load a DataFrame into your notebook
- Run a DQ job from your notebook
- Check that the job ran in the Collibra DQ web application
Before you begin
- You are signed into Collibra DQ.
- You have permission to run DQ Jobs.
- You have a PySpark notebook, such as Databricks Notebook, Google Colab, or Jupyter.
Steps
This example is based on a notebook created in Google Colab.
Step 1: Create a notebook
Select your PySpark notebook service of choice and create a new notebook.
Step 2: Install PySpark
To get started running a DQ Job from a PySpark notebook, open your new notebook and install PySpark.
- Open your PySpark notebook.
- Insert a new code cell into your notebook.
- Add the following code:
- Run the cell.
# Install
!pip install -q pyspark==3.4.1 # installs PySpark library version 3.4.1
!pip install -q findspark # installs the findspark library
Your Spark version should align with the Spark version supplied to you by Collibra DQ. For example, if we send you the default 3.4.1 version (as shown in the example above), the first line of code should be !pip install -q pyspark==3.4.1.
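Optionally, you can confirm that the notebook picked up the expected version before continuing; the check below assumes the default 3.4.1 installation from the cell above.
# Optional check: confirm the installed PySpark version
import pyspark
print(pyspark.__version__) # should print 3.4.1 in this example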
Step 3: Import the libraries
- Insert a new code cell and add the following code:
- Run the cell.
# import libraries
import py4j # import Py4J library
import findspark # import findspark library
import os, os.path # import os and os.path to interact with operating system and file system
SPARK_HOME="/usr/local/lib/python3.10/dist-packages/pyspark" # sets SPARK_HOME to the /usr/local/lib/python3.10/dist-packages/pyspark path
!export SPARK_HOME=SPARK_HOME # exports SPARK_HOME environment variable
os.environ["SPARK_HOME"] = SPARK_HOME # sets SPARK_HOME to the value of SPARK_HOME
findspark.init() # makes Spark accessible
findspark.find() # returns the Spark installation directory
!cd /content # changes the current directory to /content
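Optionally, verify that SPARK_HOME points at a real directory before moving on; this check uses the os.path module imported above.
# Optional check: verify that SPARK_HOME points to an existing PySpark installation
print(os.path.isdir(os.environ["SPARK_HOME"])) # should print True
print(findspark.find()) # should print the same path assigned to SPARK_HOME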
Step 4: Add the JAR files
In this step, add the Collibra DQ core JAR file that matches your Collibra DQ and Spark versions to your notebook environment.
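How you stage the JAR depends on your notebook service. The following is a minimal sketch, assuming you uploaded the Collibra DQ core JAR (and any JDBC driver JARs you need) to /content in Google Colab; the directory and file names are placeholders for your own.
# List the JAR files uploaded to the notebook environment (paths and names are examples only)
import glob
jar_dir = "/content" # directory where you uploaded the JAR files
jars = glob.glob(jar_dir + "/*.jar") # for example, the DQ core JAR and a JDBC driver JAR
print(jars) # confirm the expected JAR files are present
jar_paths = ",".join(jars) # comma-separated list you can pass to Spark, for example with .config("spark.jars", jar_paths) when building the SparkSession in Step 6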
Step 5: Add secrets and environment details
- Insert a new code cell and add the following code, replacing the sections between "" and '' with your own information.
- Run the cell.
# Secrets
import py4j
import json
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext, DataFrame # SparkSession is needed to build the session in Step 6
from pyspark.sql import functions as F
import datetime as dt
# environment details
tenant = "public" # defines the tenant
pgHost = "host-ip" # PostgreSQL host
pgPort = "5432/dev?currentSchema="+tenant # PostgreSQL port
pgUser = "dq-username" # PostgreSQL user
pgPass = "dq-password" # PostgreSQL password
agent_id = 'agent-id' # CDQ agent ID (optional)
agent_uuid = 'agent-uuid' # CDQ agent UUID (optional)
license="cdq-license" # CDQ license
Step 6: Start the SparkSession
- Insert a new code cell and add the following code, replacing the sections between "" and '' with your own SparkSession preferences.
- Run the cell.
spark = (
    SparkSession.builder # starts building a SparkSession
    .master("local[*]") # allows Spark to run locally using all available cores
    .appName('collibra-dq') # gives the Spark application a name
    .config("spark.sql.sources.disabledJdbcConnProviderList", "basic,oracle,mssql") # lists the disabled JDBC connection providers
    .getOrCreate() # creates a new SparkSession or retrieves an existing one
)
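A quick way to confirm the session started correctly is to print its version and master; both values come from the cells above.
# Optional check: confirm the SparkSession is running with the expected version and master
print(spark.version) # should print 3.4.1 in this example
print(spark.sparkContext.master) # should print local[*]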
Step 7: Load the DataFrame
- Insert a new code cell and add the following code, replacing the sections between "" and '' with your own information.
- Run the cell.
# load the DataFrame
df = spark.read.csv('/content/sample_data/california_housing_test.csv', header=True, inferSchema=True) # loads the file california_housing_test.csv into the DataFrame using the first row as headers and inferring the schema
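Before running the DQ Job, it can help to confirm that the DataFrame loaded as expected; the checks below are optional.
# Optional check: inspect the loaded DataFrame before running the DQ Job against it
df.printSchema() # prints the column names and inferred data types
df.show(5) # displays the first five rows
print(df.count()) # prints the total number of rows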
Step 8: Run the job
- Insert a new code cell and add the following code:
- Run the cell.
# Dataset
run_id = "2024-02-21" # the runId of the job
dataset="dataset_name" # the name of the dataset
# JVM
jvm = spark._jvm # retrieve the Spark JVM
gateway = spark._sc._gateway # accesses the JVM and allows Python and Java to communicate
# declare
opt = jvm.com.owl.common.options.OwlOptions()
opt.setDataset(dataset)
opt.setRunId(run_id)
opt.setHost(pgHost)
opt.setPort(pgPort)
opt.setPgUser(pgUser)
opt.setPgPassword(pgPass)
# profile (optional)
profile = opt.getProfile()
profile.setHistogram(False) # turns the histogram chart off (False) or on (True)
profile.setCorrelation(False) # turns the correlation chart off (False) or on (True)
profile.setShape(False) # turns shapes detection off (False) or on (True)
opt.setProfile(profile)
# parse the runId string into a Java Date (optional)
date_format = jvm.java.text.SimpleDateFormat("yyyy-MM-dd")
run_date = date_format.parse(run_id) # returns a java.util.Date for the runId
opt.setRunDate(run_date)
# run the job
cdq = jvm.com.owl.core.util.OwlUtils.OwlContext(df._jdf, spark._jsparkSession, opt) # creates the DQ context from the DataFrame, the SparkSession, and the options
cdq.register(opt) # registers the dataset and options with Collibra DQ
cdq.owlCheck() # runs the DQ Job
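If you want to run the same check for several run dates, for example to backfill a few days of results, you can rebuild the options for each date. The following is a minimal sketch that reuses only the calls shown above; the dates are examples.
# Optional sketch: run the same DQ check for several run dates (example dates only)
for day in ["2024-02-19", "2024-02-20", "2024-02-21"]:
    loop_opt = jvm.com.owl.common.options.OwlOptions()
    loop_opt.setDataset(dataset)
    loop_opt.setRunId(day)
    loop_opt.setHost(pgHost)
    loop_opt.setPort(pgPort)
    loop_opt.setPgUser(pgUser)
    loop_opt.setPgPassword(pgPass)
    loop_opt.setRunDate(date_format.parse(day)) # reuses the SimpleDateFormat defined above
    loop_cdq = jvm.com.owl.core.util.OwlUtils.OwlContext(df._jdf, spark._jsparkSession, loop_opt)
    loop_cdq.register(loop_opt)
    loop_cdq.owlCheck()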
Step 9: Check the Jobs page in Collibra DQ
In another tab or window with Collibra DQ open, click the Jobs icon and check that your job was submitted to the Jobs queue for processing.