feat: convert anomaly demo to spark-connect #209

:pyspark: https://spark.apache.org/docs/latest/api/python/getting_started/index.html
:forest-algo: https://cs.nju.edu.cn/zhouzh/zhouzh.files/publication/icdm08b.pdf
:nyc-taxi: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
:jupyterlab: https://jupyterlab.readthedocs.io/en/stable/
:parquet: https://parquet.apache.org/
:hadoop: https://hadoop.apache.org/
:jupyter: https://jupyter.org
:spark-connect: https://spark.apache.org/docs/latest/spark-connect-overview.html
:spark-connect-client: https://github.com/stackabletech/docker-images/blob/main/spark-connect-client/Dockerfile

This demo showcases the integration between {jupyterlab}[JupyterLab], {spark-connect}[Spark Connect] and {hadoop}[Apache Hadoop] deployed on the Stackable Data Platform (SDP) Kubernetes cluster.
The SDP makes this integration easy by publishing a discovery ConfigMap for the HDFS cluster and a Spark Connect service.
This ConfigMap is then mounted in all Pods running {pyspark}[PySpark] so that they have access to HDFS data.
The Jupyter notebook is a lightweight client that delegates the model training to the Spark Connect service.
For this demo, the HDFS cluster is provisioned with a small sample of the {nyc-taxi}[NYC taxi trip dataset], which is analyzed with a notebook that is provisioned automatically in the JupyterLab interface.

Install this demo on an existing Kubernetes cluster:
To run this demo, your system needs at least:

== Aim / Context

This demo uses Stackable operators to deploy a Spark Connect server and an HDFS cluster.
The intention is to demonstrate how clients, in this case a JupyterLab notebook, can interact with SDP components.
The notebook creates a SparkSession that delegates the data analysis and model training to a Spark Connect service, offloading the resource-intensive work to the Kubernetes cluster.
The notebook can thus be used as a sandbox for writing, testing and benchmarking Spark jobs before they are moved into production.
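
The following is a minimal sketch of what this looks like from the notebook's side; the service name and port are illustrative assumptions rather than values copied from the demo's notebook. Instead of starting a local Spark driver, the session is opened against the Spark Connect endpoint and all heavy work runs on its executors:

[source,python]
----
# Minimal sketch: open a remote session against the Spark Connect service.
# The endpoint (service name and default Spark Connect port 15002) is an
# assumption for illustration.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .remote("sc://spark-connect-server:15002")
    .getOrCreate()
)

# DataFrame operations execute on the Spark Connect executors in the cluster;
# only the results travel back to the notebook.
spark.range(10).selectExpr("sum(id) AS total").show()
----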

== Overview
This demo will:

* Install the required Stackable Data Platform operators.
* Spin up the following data products:
** *JupyterLab*: A web-based interactive development environment for notebooks.
** *Apache HDFS*: A distributed file system used to store the taxi dataset
* Download a sample of the NY taxi dataset into HDFS.
* Install Jupyter notebook.
Found 1 items

There should be one parquet file containing taxi trip data from September 2020.

== JupyterLab

Have a look at the available Pods before logging in:

[source,console]
----
$ kubectl get pods
NAME                                           READY   STATUS      RESTARTS   AGE
hdfs-datanode-default-0                        1/1     Running     0          38m
hdfs-journalnode-default-0                     1/1     Running     0          38m
hdfs-namenode-default-0                        2/2     Running     0          38m
hdfs-namenode-default-1                        2/2     Running     0          36m
jupyterlab-76d67b9bfb-thmtq                    1/1     Running     0          22m
load-test-data-hcj92                           0/1     Completed   0          26m
spark-connect-server-66db874cbb-9nbpf          1/1     Running     0          34m
spark-connect-server-9c6bfd9690213314-exec-1   1/1     Running     0          34m
spark-connect-server-9c6bfd9690213314-exec-2   1/1     Running     0          34m
spark-connect-server-9c6bfd9690213314-exec-3   1/1     Running     0          34m
spark-connect-server-9c6bfd9690213314-exec-4   1/1     Running     0          34m
zookeeper-server-default-0                     1/1     Running     0          38m
----

In order to reach the JupyterLab web interface, create a port-forward:

[source,console]
----
$ kubectl port-forward service/jupyterlab 8080:http
----

The `jupyterlab` service is created alongside the JupyterLab deployment.

Now access the JupyterLab web interface via http://localhost:8080

You should see the JupyterLab login page.

image::jupyterhub-pyspark-hdfs-anomaly-detection-taxi-data/jupyterlab_login.png[]

Log in with token `adminadmin`.
You should arrive at your workspace:

image::jupyterhub-pyspark-hdfs-anomaly-detection-taxi-data/jupyterlab_workspace.png[]

Now you can double-click on the `notebook` folder on the left, open and run the contained file.
Click on the double arrow (⏩️) to execute the Python scripts (click on the image below to go to the notebook file).
image::jupyterhub-pyspark-hdfs-anomaly-detection-taxi-data/jupyter_hub_run_noteb

You can also inspect the `hdfs` folder where the `core-site.xml` and `hdfs-site.xml` from the discovery ConfigMap of the HDFS cluster are located.

The Python notebook uses libraries such as `pandas` and `scikit-learn` to analyze the data.
In addition, since the model training is delegated to a Spark Connect server, some of these dependencies, most notably `scikit-learn`, must also be made available on the Spark Connect pods.
For convenience, a custom image is used in this demo that bundles all the required libraries for both the notebook and the Spark Connect server.
The source of the image is available {spark-connect-client}[here].

NOTE: Using a custom image requires access to a repository where the image can be made available.
In practice, clients of Spark Connect do not need a full-blown Spark installation available locally, but only the libraries that are used in the notebook.
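
As a sketch of what such a lightweight client can look like (the package names and the endpoint below are assumptions, not the demo's pinned requirements), a plain Python environment with the Spark Connect client libraries and the analysis packages is sufficient:

[source,python]
----
# Hypothetical client-side environment, e.g. a local virtualenv:
#   pip install "pyspark[connect]" pandas scikit-learn
# No JVM is started locally; all queries are sent to the Connect server over gRPC.
from pyspark.sql import SparkSession

# Assumed endpoint, e.g. reachable through a port-forward to the Spark Connect service.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
print(spark.version)
----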

== Model details

The job uses an implementation of the Isolation Forest {forest-algo}[algorithm] provided by the scikit-learn {scikit-lib}[library]:
the model is trained and then invoked by a user-defined function (see {forest-article}[this article] for how to call the sklearn library with a pyspark UDF), all of which is run using the Spark Connect executors.
This type of model attempts to isolate each data point by continually partitioning the data.
Data closely packed together will require more partitions to separate data points.
In contrast, any outliers will require fewer: the number of partitions needed for a particular data point is thus inversely proportional to the anomaly "score".
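
A condensed sketch of this pattern is shown below. It is not the demo notebook verbatim; the endpoint, HDFS path, feature columns and model parameters are illustrative assumptions. It also shows why `scikit-learn` must be installed on the Spark Connect pods: the fitted model is serialized into the UDF and unpickled on the executors.

[source,python]
----
import pandas as pd
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import pandas_udf
from sklearn.ensemble import IsolationForest

# Assumed endpoint, HDFS path and feature columns; adjust to the actual demo values.
spark = SparkSession.builder.remote("sc://spark-connect-server:15002").getOrCreate()
trips = spark.read.parquet("/ny-taxi-data/yellow_tripdata_2020-09.parquet")
features = ["trip_distance", "total_amount"]

# Fit the Isolation Forest on a small sample pulled back to the notebook.
train = trips.select(*features).dropna().sample(fraction=0.1, seed=42).toPandas()
model = IsolationForest(contamination=0.01, random_state=42).fit(train.to_numpy())

@pandas_udf("double")
def anomaly_score(trip_distance: pd.Series, total_amount: pd.Series) -> pd.Series:
    # The fitted model is shipped to the Spark Connect executors inside this UDF.
    # score_samples is high for "normal" points, so it is negated: higher = more anomalous.
    batch = pd.concat([trip_distance, total_amount], axis=1)
    return pd.Series(-model.score_samples(batch.to_numpy()))

scored = trips.withColumn("score", anomaly_score(*[F.col(c) for c in features]))
scored.orderBy(F.desc("score")).show(5)
----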
70 changes: 0 additions & 70 deletions stacks/_templates/jupyterhub.yaml

This file was deleted.

67 changes: 67 additions & 0 deletions stacks/jupyterhub-pyspark-hdfs/jupyterlab.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: jupyterlab
  labels:
    app: jupyterlab
    stackable.tech/vendor: Stackable
spec:
  replicas: 1
  selector:
    matchLabels:
      app: jupyterlab
      stackable.tech/vendor: Stackable
  template:
    metadata:
      labels:
        app: jupyterlab
        stackable.tech/vendor: Stackable
    spec:
      serviceAccountName: default
      containers:
        - name: jupyterlab
          image: oci.stackable.tech/stackable/spark-connect-client:3.5.5-stackable0.0.0-dev
          imagePullPolicy: IfNotPresent
          command:
            - bash
          args:
            - -c
            - /stackable/.local/bin/jupyter lab --ServerApp.token='{{ jupyterLabToken }}' --ServerApp.port=8080 --no-browser --notebook-dir /notebook
          env:
            - name: JUPYTER_PORT
              value: "8080"
          ports:
            - name: http
              containerPort: 8080
          volumeMounts:
            - mountPath: /notebook
              name: notebook
      initContainers:
        - name: download-notebook
          image: oci.stackable.tech/sdp/spark-connect-client:3.5.5-stackable0.0.0-dev
          command: ['sh', '-c', 'curl https://raw.githubusercontent.com/stackabletech/demos/main/stacks/jupyterhub-pyspark-hdfs/notebook.ipynb -o /notebook/notebook.ipynb']
          volumeMounts:
            - mountPath: /notebook
              name: notebook
      volumes:
        - name: notebook
          emptyDir:
            sizeLimit: 500Mi
---
apiVersion: v1
kind: Service
metadata:
  name: jupyterlab
  labels:
    app: jupyterlab
    stackable.tech/vendor: Stackable
spec:
  type: NodePort
  selector:
    app: jupyterlab
    stackable.tech/vendor: Stackable
  ports:
    - name: http
      port: 8080
      targetPort: 8080