Greenhouse & Workday API Integration

1. Overview

A reference design for an enterprise HR data platform and API integration framework supporting People Systems and employee lifecycle analytics. The integration layer ingests candidates, employees, and recruitment records from Greenhouse (ATS) and Workday (HCM), lands them in a Medallion lakehouse, and serves real-time dashboards for hiring, attrition, and diversity analytics.

2. High-Level Architecture

3. Authentication

3.1 Greenhouse Harvest API (Basic Auth)

Greenhouse uses HTTP Basic Auth: the API key is supplied as the username with an empty password, and the resulting "key:" pair is base64-encoded per the Basic scheme. Keys are scoped per-integration with read-only permissions set in the Greenhouse admin console.

import base64
import os
import requests

GH_BASE = "https://harvest.greenhouse.io/v1"  # Harvest API root; all request paths below are relative to this

def greenhouse_headers() -> dict:
    """Build auth headers for the Greenhouse Harvest API.

    Basic auth scheme: the API key is the username, the password is empty,
    and the ``key:`` pair is base64-encoded. ``On-Behalf-Of`` attributes
    every action to a specific Greenhouse user for the audit trail.
    """
    credentials = f'{os.environ["GREENHOUSE_API_KEY"]}:'.encode()
    encoded = base64.b64encode(credentials).decode()
    headers = {
        "Authorization": f"Basic {encoded}",
        "On-Behalf-Of": os.environ["GREENHOUSE_USER_ID"],  # audit trail
        "Accept": "application/json",
    }
    return headers

3.2 Workday REST API (OAuth 2.0 Client Credentials)

Workday issues short-lived access tokens via an Integration System User (ISU) registered as an OAuth client. Tokens are cached in a secrets manager and refreshed proactively before expiry.

import time
import requests

class WorkdayAuth:
    """OAuth 2.0 client-credentials token manager for a Workday ISU client.

    Caches the access token in memory and refreshes it 60 seconds before
    expiry so callers never present a stale token to the API.
    """

    def __init__(self, tenant: str, client_id: str, client_secret: str, token_url: str):
        self.tenant = tenant
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._token = None      # cached bearer token (None until first fetch)
        self._expires_at = 0    # epoch seconds when the cached token expires

    def token(self) -> str:
        """Return a valid bearer token, fetching a fresh one when needed."""
        now = time.time()
        # Reuse the cached token while it is still valid (with a 60s margin).
        if self._token and now < self._expires_at - 60:
            return self._token
        response = requests.post(
            self.token_url,
            data={"grant_type": "client_credentials"},
            auth=(self.client_id, self.client_secret),
            timeout=15,
        )
        response.raise_for_status()
        body = response.json()
        self._token = body["access_token"]
        self._expires_at = time.time() + body["expires_in"]
        return self._token

4. API Endpoints Used

4.1 Greenhouse Harvest

Pagination uses Link headers (rel="next"). Rate limit is 50 req/10s per API key; the server returns 429 with a Retry-After header when exceeded.

4.2 Workday REST & RaaS

5. Retry, Backoff, and Rate Limiting

All API clients wrap requests in a tenacity retry decorator with exponential backoff + jitter, honor Retry-After, and cap concurrency with a token-bucket limiter to stay under published quotas.

import random
import time
from typing import Iterator
import requests
from tenacity import (
    retry, retry_if_exception_type, stop_after_attempt,
    wait_exponential_jitter, before_sleep_log
)
import logging

log = logging.getLogger("hr_api")  # shared logger for API-client retry warnings

class RateLimiter:
    """Token-bucket limiter (adequate for single-driver Spark jobs; no
    locking, so not fully safe under heavy multi-threaded contention)."""

    def __init__(self, rate_per_sec: float, burst: int):
        self.rate = rate_per_sec      # tokens replenished per second
        self.capacity = burst         # maximum bucket size
        self.tokens = burst           # bucket starts full
        self.last = time.monotonic()  # timestamp of the last replenishment

    def acquire(self) -> None:
        """Block until one token is available, then consume it."""
        while True:
            now = time.monotonic()
            elapsed = now - self.last
            # Refill proportionally to elapsed time, capped at capacity.
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last = now
            if self.tokens < 1:
                # Sleep just long enough for the deficit to refill.
                time.sleep((1 - self.tokens) / self.rate)
                continue
            self.tokens -= 1
            return

# Process-wide limiter shared by every Greenhouse call below.
GH_LIMITER = RateLimiter(rate_per_sec=4.5, burst=10)  # under 50/10s

class RetryableHTTPError(Exception):
    """Transient HTTP failure (429 or 5xx) — signals tenacity to retry."""
    pass

@retry(
    reraise=True,
    retry=retry_if_exception_type((RetryableHTTPError, requests.ConnectionError, requests.Timeout)),
    wait=wait_exponential_jitter(initial=1, max=30),
    stop=stop_after_attempt(6),
    before_sleep=before_sleep_log(log, logging.WARNING),
)
def gh_get(path: str, params: dict | None = None) -> requests.Response:
    """GET a Greenhouse Harvest path with rate limiting and retries.

    Raises RetryableHTTPError on 429/5xx (which tenacity retries with
    exponential backoff + jitter) and requests.HTTPError on other 4xx.
    """
    GH_LIMITER.acquire()
    resp = requests.get(f"{GH_BASE}{path}", headers=greenhouse_headers(),
                        params=params, timeout=30)
    if resp.status_code == 429:
        # Retry-After may be delta-seconds OR an HTTP-date (RFC 9110 §10.2.3);
        # int() on a date string would raise an uncaught ValueError, so parse
        # defensively and fall back to a short default delay.
        try:
            delay = float(resp.headers.get("Retry-After", "2"))
        except ValueError:
            delay = 2.0
        time.sleep(delay + random.random())  # jitter avoids thundering herd
        raise RetryableHTTPError("429 rate limited")
    if 500 <= resp.status_code < 600:
        raise RetryableHTTPError(f"{resp.status_code} server error")
    resp.raise_for_status()
    return resp

def gh_paginate(path: str, params: dict | None = None) -> Iterator[dict]:
    """Yield every record from a paginated Harvest endpoint.

    The first request carries explicit query params (per_page=500, the
    Harvest maximum); later requests follow the rel="next" Link URLs,
    which already embed their own query string.
    """
    next_url = f"{GH_BASE}{path}"
    query = {**(params or {}), "per_page": 500}
    while next_url:
        response = gh_get(next_url.replace(GH_BASE, ""), params=query)
        for record in response.json():
            yield record
        next_url = _parse_next_link(response.headers.get("Link", ""))
        query = None  # subsequent pages have params baked into the Link URL

def _parse_next_link(header: str) -> str | None:
    for part in header.split(","):
        if 'rel="next"' in part:
            return part.split(";")[0].strip().strip("<>")
    return None

6. Bronze Ingestion — Landing Raw API Payloads

from datetime import datetime, timezone
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp, input_file_name

spark = SparkSession.builder.getOrCreate()  # reuse the active session if one exists

def land_greenhouse_candidates(run_id: str, updated_after: str) -> int:
    """Pull candidates updated since *updated_after* and append them to the
    Bronze Delta table as raw JSON payloads.

    Returns the number of records landed (0 when nothing changed).
    """
    pulled_at = datetime.now(timezone.utc).isoformat()
    rows = [
        {
            "source_system": "greenhouse",
            "entity": "candidate",
            "source_id": str(record["id"]),
            "pulled_at": pulled_at,
            "run_id": run_id,
            "payload": json.dumps(record),  # keep the full payload verbatim
        }
        for record in gh_paginate("/candidates", params={"updated_after": updated_after})
    ]
    if not rows:
        return 0
    (spark.createDataFrame(rows)
        .write
        .mode("append")
        .format("delta")
        .saveAsTable("hr_prod.bronze.greenhouse_candidates"))
    return len(rows)

7. Silver — Cleansing, Normalization, SCD Type 2

Silver applies regex normalization (phone/email), fuzzy-match dedup on candidate identity, and business-rule engines that map Greenhouse stages and Workday job codes onto a conformed enterprise taxonomy.

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def normalize_email(col):
    """Canonicalize an email column: strip surrounding whitespace, lowercase."""
    trimmed = F.trim(col)
    return F.lower(trimmed)

def normalize_phone(col):
    """Digits-only phone number, keeping the trailing 10 digits (US national
    number — drops any leading country code)."""
    digits_only = F.regexp_replace(col, r"\D", "")
    return digits_only.substr(-10, 10)

# Parse raw Bronze JSON payloads into typed columns. The schema string lists
# only the fields Silver needs; other payload fields are ignored by from_json.
bronze = spark.read.table("hr_prod.bronze.greenhouse_candidates")
parsed = bronze.selectExpr(
    "source_id",
    "pulled_at",
    "from_json(payload, 'id BIGINT, first_name STRING, last_name STRING, "
    "email_addresses ARRAY<STRUCT<value:STRING,type:STRING>>, "
    "phone_numbers ARRAY<STRUCT<value:STRING,type:STRING>>, "
    "updated_at TIMESTAMP') AS c"
).select(
    F.col("source_id"),
    F.col("c.first_name").alias("first_name"),
    F.col("c.last_name").alias("last_name"),
    # NOTE(review): [0] takes the first listed email/phone — presumably the
    # primary one; confirm Greenhouse's ordering guarantee before relying on it.
    normalize_email(F.expr("c.email_addresses[0].value")).alias("email"),
    normalize_phone(F.expr("c.phone_numbers[0].value")).alias("phone_e164"),
    F.col("c.updated_at").alias("source_updated_at"),
    F.col("pulled_at"),
)

# MERGE into Silver with SCD Type 2
from delta.tables import DeltaTable
target = DeltaTable.forName(spark, "hr_prod.silver.candidate_scd2")
# NOTE(review): this merge only EXPIRES the current row when tracked attributes
# change (is_current=false, valid_to set) — because the changed row still
# matches on source_id, whenNotMatchedInsertAll never fires for it, so the new
# version row is never inserted. Standard Delta SCD2 stages a union with a
# NULL merge key (or runs a second merge) to insert the replacement row;
# confirm and fix before relying on this table for history.
# NOTE(review): `<>` is null-unsafe in Spark SQL — an attribute changing to or
# from NULL yields NULL, so such changes will not expire the row.
(target.alias("t")
 .merge(
     parsed.alias("s"),
     "t.source_id = s.source_id AND t.is_current = true"
 )
 .whenMatchedUpdate(
     # Expire the row only when a tracked attribute actually changed.
     condition="t.email <> s.email OR t.phone_e164 <> s.phone_e164 "
               "OR t.first_name <> s.first_name OR t.last_name <> s.last_name",
     set={"is_current": "false", "valid_to": "s.source_updated_at"}
 )
 # Inserts brand-new candidates. NOTE(review): the source lacks is_current /
 # valid_from / valid_to columns, so InsertAll may need schema alignment —
 # verify against the target table's definition.
 .whenNotMatchedInsertAll()
 .execute())

8. Anomaly Detection (PySpark Statistical Methods)

Custom detectors flag data-quality issues: duplicate candidate identities across sources, outlier time-to-hire, missing EEOC fields on diverse-hire cohorts, and interview-feedback scores that deviate >3σ from interviewer baselines.

from pyspark.sql import Window
from pyspark.sql import functions as F

scorecards = spark.table("hr_prod.silver.scorecards")

# Per-interviewer baseline: mean and population std-dev over all of an
# interviewer's scorecards.
interviewer_window = Window.partitionBy("interviewer_id")
stats = (scorecards
    .withColumn("mu", F.avg("overall_score").over(interviewer_window))
    .withColumn("sigma", F.stddev_pop("overall_score").over(interviewer_window)))

# Flag scores more than 3 sigma from the interviewer's own baseline.
# (Under default non-ANSI Spark SQL, sigma == 0 makes z NULL, which the
# filter then drops — constant-score interviewers produce no anomalies.)
z_score = (F.col("overall_score") - F.col("mu")) / F.col("sigma")
anomalies = (stats
    .withColumn("z", z_score)
    .filter(F.abs("z") > 3)
    .select("scorecard_id", "interviewer_id", "candidate_id",
            "overall_score", "mu", "sigma", "z"))

(anomalies.write.mode("overwrite")
 .saveAsTable("hr_prod.gold.anomaly_scorecard_outliers"))

9. Streaming — Real-Time Dashboards

Workday webhooks and Greenhouse event exports are fanned into a Kafka topic; a Structured Streaming job micro-batches events into Bronze, then declarative streaming pipelines cascade updates into Silver and Gold aggregates (hiring velocity, open requisitions, diversity dashboards) with end-to-end latency under 2 minutes.

from pyspark.sql import functions as F

# Read raw HR events from Kafka as an unbounded stream.
events = (spark.readStream
    .format("kafka")
    .option("subscribe", "hr.events.v1")
    # NOTE(review): bootstrap servers is an empty placeholder — it must be
    # populated (e.g. from config/secrets) before this stream can start.
    .option("kafka.bootstrap.servers", "")
    .option("startingOffsets", "latest")  # no backfill: new events only
    .load())

# Decode the Kafka value bytes and flatten the event envelope into columns.
parsed = (events
    .select(F.col("value").cast("string").alias("json"),
            F.col("timestamp").alias("event_ts"))  # Kafka record timestamp
    .select(F.from_json("json",
        "event_id STRING, source STRING, entity STRING, op STRING, "
        "payload STRING, emitted_at TIMESTAMP").alias("e"),
        "event_ts")
    .select("e.*", "event_ts"))

# Append micro-batches (30s trigger) into the Bronze events table; the
# checkpoint location enables restart/recovery of the stream.
(parsed.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/chk/hr_events_bronze")
    .outputMode("append")
    .trigger(processingTime="30 seconds")
    .toTable("hr_prod.bronze.hr_events"))

10. Data Governance & Security

11. Observability & Operations