YouTube Search Engine & Brand-Safety Platform

1. Overview

This page documents the design and implementation of a high-performance search and classification platform built on top of the YouTube Data API. The system was originally built at a brand-safety platform vendor to power AI-driven brand-safety and contextual-targeting products that allow advertisers to run campaigns adjacent to suitable, on-brand video content within a walled-garden ecosystem (YouTube). It indexes a multi-billion-video corpus and exposes ranked, classified results through a REST API consumed by ad-targeting systems, sales tooling, and machine-learning training pipelines.

The platform combines four capabilities: full-corpus crawling and indexing, NLP feature extraction, brand-safety classification, and ranked retrieval with real-time decisioning.

Programming & infrastructure stack:
  Scala: services, Spark jobs
  C++: Xapian extensions, hot-path tokenizers, BM25 modifications
  Python: offline NLP training and evaluation
  Xapian: probabilistic search library
  Cassandra: engagement time-series and feature store
  Kinesis: event ingestion; Kafka: internal fan-out
  AWS: EC2, S3, EMR, MSK, ECS

2. Business Context — Brand Safety on a Walled-Garden Platform

Advertisers spending on YouTube face a structural problem: they cannot freely crawl or instrument the ad-serving environment the way they can on the open web. They depend on the platform's own classification, which is coarse, opaque, and (historically) inconsistent. Major brand-safety incidents — extremist content monetized with mainstream ads, brand logos appearing next to graphic videos — repeatedly forced advertisers to pause YouTube spend.

The product sold by the platform addresses this with an independent, advertiser-aligned classification layer: continuously refreshed inclusion and exclusion lists of videos and channels, scored against each advertiser's suitability profile.

The ranking and classification engine described on this page is what generates and continuously refreshes those lists across YouTube's full public corpus.

3. High-Level Architecture

The system is organized into five layers: crawl, stream ingestion, storage, NLP and feature extraction, and classification-and-serving. Each layer is independently scalable and exposes well-defined APIs to the next.

4. YouTube Data API — Crawl Strategy

4.1 Endpoints Used

4.2 Quota Management

The YouTube Data API enforces a daily quota measured in "units" — a search.list call costs 100 units, while videos.list with multiple parts costs ~7 units per call (and hydrates up to 50 videos per batched call). With a default 10,000-unit daily quota per project, naive crawling is impossible at scale. We addressed this with a pool of API keys checked out per request with an estimated quota cost, token-bucket rate limiting ahead of every call, strict 50-ID batching on videos.list, and a preference for cheap playlistItems pagination over 100-unit search.list discovery.
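The ApiKeyPool and TokenBucket used by the crawler skeleton below are not shown in this document; here is a minimal Python sketch of the token-bucket half, with an injectable clock so refill behaviour is testable (all names and numbers are illustrative, not the production implementation):

```python
import time

class TokenBucket:
    """Token bucket: `capacity` quota units, refilled at `rate` units/sec."""

    def __init__(self, capacity: float, rate: float, now=time.monotonic):
        self.capacity = capacity
        self.rate = rate
        self.tokens = capacity   # start full
        self.now = now           # injectable clock for tests
        self.last = now()

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Spend `cost` units if available; refill lazily on each call."""
        t = self.now()
        self.tokens = min(self.capacity, self.tokens + (t - self.last) * self.rate)
        self.last = t
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

The Scala crawler's rateLimiter.acquire() presumably parks the caller until tokens are available; the non-blocking try_acquire above keeps the sketch synchronous.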

4.3 Crawler — Scala Skeleton

package com.brandsafe.crawl

import akka.actor.typed.ActorSystem
import akka.http.scaladsl.{Http, HttpExt}
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import io.circe.parser._

final case class VideoId(value: String) extends AnyVal
final case class ChannelId(value: String) extends AnyVal

class YouTubeClient(
    apiKeyPool: ApiKeyPool,
    rateLimiter: TokenBucket,
    http: HttpExt
)(implicit ec: ExecutionContext, sys: ActorSystem[_]) {

  private val Base = "https://www.googleapis.com/youtube/v3"

  /** Hydrate up to 50 video IDs in a single videos.list call. */
  def videosBatch(ids: Seq[VideoId]): Future[Seq[VideoResource]] = {
    require(ids.size <= 50, "videos.list max 50 ids per call")
    rateLimiter.acquire().flatMap { _ =>
      val key = apiKeyPool.checkout(quotaCost = 7)
      val parts = "snippet,contentDetails,statistics,topicDetails,status"
      val uri = Uri(s"$Base/videos")
        .withQuery(Uri.Query(
          "part" -> parts,
          "id"   -> ids.map(_.value).mkString(","),
          "key"  -> key
        ))
      http.singleRequest(HttpRequest(uri = uri))
        .flatMap(handleResponse[VideosListResponse])
        .map(_.items)
        .recoverWith(quotaAware(key))
    }
  }

  /** Walk a channel's uploads playlist via playlistItems pagination. */
  def channelUploads(uploadsPlaylistId: String): Source[VideoId, _] =
    Source.unfoldAsync[Option[String], Seq[VideoId]](Some("")) {
      case None => Future.successful(None)
      case Some(pageToken) =>
        playlistPage(uploadsPlaylistId, pageToken).map { page =>
          Some((page.nextPageToken, page.videoIds))
        }
    }.mapConcat(identity)
}

4.4 Kinesis Hand-Off

// Each crawler worker writes raw API responses to Kinesis with channel_id
// as the partition key, guaranteeing per-channel ordering downstream.
val record = PutRecordRequest.builder()
  .streamName("youtube-raw-v1")
  .partitionKey(channelId.value)
  .data(SdkBytes.fromUtf8String(rawJson))
  .build()

// .asScala requires scala.jdk.FutureConverters._
kinesisAsync.putRecord(record).asScala
  .map(_ => ())  // normalize to Future[Unit] so recover typechecks
  .recover {
    case _: ProvisionedThroughputExceededException =>
      // Kinesis backpressure: park the record on an SQS dead-letter queue
      // for replay once the shard catches up.
      dlq.enqueue(channelId, rawJson)
  }

5. Storage Model — Cassandra & S3

5.1 Why Cassandra

Cassandra was chosen for the serving-side feature store and engagement time-series because the access pattern is overwhelmingly point reads keyed by video_id, with high write fan-out from streaming engagement updates. It scales linearly, tolerates AZ failures with QUORUM writes across three replicas, and avoids the read-amplification problems we hit with HBase on the same workload.

5.2 Schema

-- Current-state video record. One row per video.
CREATE TABLE yt.video_current (
  video_id          text PRIMARY KEY,
  channel_id        text,
  title             text,
  description       text,
  published_at      timestamp,
  duration_seconds  int,
  language          text,
  category_id       int,
  view_count        bigint,
  like_count        bigint,
  comment_count     bigint,
  topic_ids         set<text>,
  caption_available boolean,
  last_seen_at      timestamp
) WITH compaction = {'class': 'LeveledCompactionStrategy'};

-- Engagement time-series for trend detection.
CREATE TABLE yt.video_engagement_ts (
  video_id   text,
  bucket_hr  timestamp,
  views      bigint,
  likes      bigint,
  comments   bigint,
  PRIMARY KEY ((video_id), bucket_hr)
) WITH CLUSTERING ORDER BY (bucket_hr DESC)
  AND compaction = {'class': 'TimeWindowCompactionStrategy',
                    'compaction_window_unit': 'HOURS',
                    'compaction_window_size': 24};

-- Feature store consumed by the classifier.
CREATE TABLE yt.video_features (
  video_id        text PRIMARY KEY,
  tokens          frozen<list<text>>,
  pos_tags        frozen<list<text>>,
  entities        frozen<map<text, text>>,  -- entity -> type (PERSON, ORG, LOC, ...)
  embedding       frozen<list<float>>,      -- 384-dim sentence embedding
  toxicity_score  float,
  language_conf   float,
  computed_at     timestamp
);

-- Brand-safety verdicts written back by the classifier.
CREATE TABLE yt.video_safety (
  video_id            text PRIMARY KEY,
  overall_verdict     text,    -- safe | risky | unsafe
  iab_categories      frozen<map<text, float>>,
  policy_flags        frozen<set<text>>,     -- violence, hate, adult, profanity, ...
  copyright_flag      boolean,
  computed_at         timestamp,
  classifier_version  text
);

5.3 S3 Layout

S3 is the durable system of record. Cassandra is rebuildable from S3 within the SLA window.

s3://brandsafe-yt-prod/
  bronze/yt_raw/dt=YYYY-MM-DD/hh=HH/         # raw JSON, snappy parquet
  silver/yt_videos/dt=YYYY-MM-DD/            # parsed + deduped
  silver/yt_features/dt=YYYY-MM-DD/          # NLP outputs
  gold/yt_safety_verdicts/dt=YYYY-MM-DD/     # daily classifier output
  index/xapian/version=YYYYMMDDHHMM/shard=NN/  # rsyncable shard snapshots
  models/safety/version=vN/                  # trained classifier weights
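Jobs on either side of this layout address partitions by constructing Hive-style paths. A small helper, sketched from the listing above (two-digit hour and shard fields are assumed from the hh=HH and shard=NN placeholders; function names are illustrative):

```python
from datetime import datetime

BUCKET = "s3://brandsafe-yt-prod"

def bronze_partition(ts: datetime) -> str:
    """Raw-crawl partition for the hour containing `ts`."""
    return f"{BUCKET}/bronze/yt_raw/dt={ts:%Y-%m-%d}/hh={ts:%H}/"

def index_snapshot_prefix(built_at: datetime, shard: int) -> str:
    """Location of one Xapian shard snapshot within a versioned index build."""
    return f"{BUCKET}/index/xapian/version={built_at:%Y%m%d%H%M}/shard={shard:02d}/"
```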

6. NLP Pipeline — Tokenization, POS, Semantic Features

6.1 Pipeline Stages

  1. Language detection (CLD3) — discard or branch on non-English where applicable; keep multilingual videos for language-specific downstream models.
  2. Text normalization — Unicode NFKC; casing preserved for entity hints; URLs / handles / emoji extracted as features rather than discarded.
  3. Tokenization — Custom WordPiece-style tokenizer in C++ for the hot path; spaCy in the offline Spark training path.
  4. POS tagging — Averaged perceptron model; results stored as a parallel array to tokens in the feature store.
  5. Named-entity recognition — Custom CRF for video-domain entities (game titles, vehicle models, product SKUs) layered on a base spaCy NER model.
  6. Embedding — 384-dim sentence embeddings over title + description + first 1k caption tokens for nearest-neighbour search and semantic clustering.
  7. Toxicity probe — Lightweight feed-forward classifier giving an early signal for the safety classifier.
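Stage 2 (normalization with URLs and handles lifted out as features) can be sketched with the Python standard library; the regexes here are simplified stand-ins and emoji extraction is omitted:

```python
import re
import unicodedata

URL_RE = re.compile(r"https?://\S+")
HANDLE_RE = re.compile(r"@\w+")

def normalize(text: str) -> tuple[str, dict]:
    """NFKC-normalize, extract URLs/handles as features, keep casing for NER hints."""
    text = unicodedata.normalize("NFKC", text)
    features = {"urls": URL_RE.findall(text), "handles": HANDLE_RE.findall(text)}
    text = HANDLE_RE.sub(" ", URL_RE.sub(" ", text))
    return " ".join(text.split()), features
```

Note that NFKC folds fullwidth characters and ligatures to their plain forms, which is exactly why it runs before tokenization.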

6.2 Spark Driver — Wiring the Pipeline

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import com.brandsafe.nlp._

val spark = SparkSession.builder().appName("yt-nlp").getOrCreate()
import spark.implicits._

val raw = spark.read.parquet("s3://brandsafe-yt-prod/bronze/yt_raw/dt=2026-04-21/")

val text = raw.selectExpr(
  "video_id",
  "concat_ws(' ', title, description, coalesce(captions, '')) as raw_text",
  "language as declared_lang"
)

val tokenized = text
  .withColumn("language", detectLanguageUDF($"raw_text"))
  .filter($"language" === "en")
  .withColumn("tokens",   tokenizeUDF($"raw_text"))
  .withColumn("pos_tags", posTagUDF($"tokens"))
  .withColumn("entities", nerUDF($"tokens", $"pos_tags"))
  .withColumn("embedding", sentenceEmbedUDF($"raw_text"))
  .withColumn("toxicity_score", toxicityProbeUDF($"tokens"))

tokenized.write
  .mode("overwrite")
  .partitionBy("dt")
  .parquet("s3://brandsafe-yt-prod/silver/yt_features/dt=2026-04-21/")

6.3 Hot-Path Tokenizer — C++

// tokenizer.cpp — used inside the C++ classifier service for sub-ms tokenization.
#include <string>
#include <string_view>
#include <utility>
#include <vector>
#include <unicode/unistr.h>
#include <unicode/normalizer2.h>

namespace brandsafe::nlp {

class FastTokenizer {
 public:
  explicit FastTokenizer(const VocabTrie& vocab) : vocab_(vocab) {}

  std::vector<TokenId> tokenize(std::string_view text) const {
    icu::UnicodeString u = icu::UnicodeString::fromUTF8(
        icu::StringPiece(text.data(), text.size()));
    UErrorCode err = U_ZERO_ERROR;
    const auto* nfkc = icu::Normalizer2::getNFKCInstance(err);
    icu::UnicodeString normalized;
    nfkc->normalize(u, normalized, err);

    std::vector<TokenId> out;
    out.reserve(text.size() / 5);  // heuristic: ~5 chars/token

    int32_t i = 0;
    while (i < normalized.length()) {
      auto [token_id, consumed] = vocab_.longest_match(normalized, i);
      if (consumed == 0) {
        out.push_back(VocabTrie::kUnk);
        i += U16_LENGTH(normalized.char32At(i));
      } else {
        out.push_back(token_id);
        i += consumed;
      }
    }
    return out;
  }

 private:
  const VocabTrie& vocab_;
};

}  // namespace brandsafe::nlp

7. Xapian Index Design

7.1 Why Xapian

Xapian is a mature C++ probabilistic IR library implementing BM25 (and several other weighting schemes) with very fast posting-list traversal and an extensible Weight subclassing API. The reasons we picked it over Elasticsearch / Lucene at the time: it embeds in-process in our C++ services (no JVM cluster or network hop on the hot path), its Weight subclassing API made the custom safety-aware weighting in section 8 straightforward, and its glass-backend shards are plain files that snapshot and rsync cleanly (hence the rsyncable shard snapshots in the S3 layout).

7.2 Document Model

Each Xapian document represents one video, with prefixed terms per field so a query can target a single field (e.g., T:tesla matches only titles).

Prefix  Field            Boost
T       Title            3.0
D       Description      1.0
C       Captions         1.5
K       Keywords / tags  2.0
E       Named entities   2.5
XCH     Channel ID       (filter)
XLN     Language         (filter)
XCAT    Safety verdict   (filter)
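A toy illustration of the field-to-prefix mapping (tokenization reduced to whitespace splitting here; the real indexer drives Xapian::TermGenerator, as in 7.3):

```python
FIELD_PREFIX = {
    "title": "T", "description": "D", "captions": "C",
    "tags": "K", "entities": "E",
}

def prefixed_terms(field: str, text: str) -> list[str]:
    """One prefixed term per token, so a query like T:tesla matches titles only."""
    return [FIELD_PREFIX[field] + tok.lower() for tok in text.split()]
```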

7.3 Indexer — C++

// indexer.cpp — bulk-loads a Xapian shard from a Parquet partition.
#include <xapian.h>
#include "feature_reader.h"

void index_shard(const std::string& shard_path,
                 FeatureReader& reader) {
  Xapian::WritableDatabase db(shard_path,
      Xapian::DB_CREATE_OR_OPEN | Xapian::DB_BACKEND_GLASS);
  Xapian::TermGenerator tg;
  tg.set_stemmer(Xapian::Stem("en"));
  tg.set_stemming_strategy(Xapian::TermGenerator::STEM_SOME);

  while (auto rec = reader.next()) {
    Xapian::Document doc;
    doc.set_data(rec->video_id);
    doc.add_value(SLOT_PUBLISHED_TS,
        Xapian::sortable_serialise(rec->published_unix));
    doc.add_value(SLOT_VIEWS,
        Xapian::sortable_serialise(rec->view_count));

    tg.set_document(doc);
    tg.index_text(rec->title,        3, "T");
    tg.index_text(rec->description,  1, "D");
    tg.index_text(rec->captions,     2, "C");
    for (const auto& kw : rec->tags)      doc.add_term("K" + kw,   2);
    for (const auto& en : rec->entities)  doc.add_term("E" + en,   3);
    doc.add_boolean_term("XCH"  + rec->channel_id);
    doc.add_boolean_term("XLN"  + rec->language);
    doc.add_boolean_term("XCAT" + rec->safety_verdict);

    db.add_document(doc);
  }
  db.commit();
}

7.4 Sharding

The corpus is sharded by hash(channel_id) % N. Each indexer node owns ~10 shards of ~50M videos each; a query fans out to all shards in parallel through Xapian::Database's multi-shard support. We chose channel-based sharding (rather than video-id) so that channel-scoped queries hit a single shard, and so that a hot channel's writes serialize on one indexer rather than thundering across the fleet.
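The routing function itself is trivial but must be process-stable: Python's built-in hash() is salted per interpreter and would scatter a channel across shards between runs. A crc32-based stand-in (the production hash function is not specified in this document):

```python
import zlib

N_SHARDS = 32

def shard_for(channel_id: str, n_shards: int = N_SHARDS) -> int:
    """Stable channel -> shard routing; a channel's videos co-locate on one shard."""
    return zlib.crc32(channel_id.encode("utf-8")) % n_shards
```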

8. BM25 & Custom Weighting

8.1 BM25 Refresher

BM25 scores a document D against a query Q as:

score(D, Q) = Σ_{q ∈ Q} IDF(q) · (f(q,D) · (k1+1))
                              / (f(q,D) + k1 · (1 - b + b · |D|/avgdl))

where:
  f(q, D) = term frequency of q in D
  |D|     = length of D in tokens
  avgdl   = average document length across the corpus
  k1, b   = tunables (defaults 1.2 and 0.75)
  IDF(q)  = log((N - n(q) + 0.5) / (n(q) + 0.5) + 1)
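A direct transcription of the formula into Python, useful for checking parameter sweeps offline (this mirrors the equation above term by term; it is not the production Xapian scorer):

```python
import math

def idf(N: int, n_q: int) -> float:
    """IDF(q) = log((N - n(q) + 0.5) / (n(q) + 0.5) + 1)."""
    return math.log((N - n_q + 0.5) / (n_q + 0.5) + 1)

def bm25_term(tf: float, doc_len: float, avgdl: float, N: int, n_q: int,
              k1: float = 1.2, b: float = 0.75) -> float:
    """One query term's contribution to score(D, Q)."""
    return idf(N, n_q) * (tf * (k1 + 1)) / (tf + k1 * (1 - b + b * doc_len / avgdl))
```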

For YouTube content, two properties of standard BM25 hurt us: a single corpus-wide length normalization penalizes videos with long captions even when the title matches exactly, and a single k1/b pair cannot express that a title hit is a much stronger relevance signal than the same term buried in a caption.

8.2 Per-Field BM25F

We addressed this with a BM25F-style per-field formulation: each field gets its own k1, b, and weight, with field-level length normalization. Title used b=0.0 (no length penalty) and a 3× weight; description used b=0.75 and 1×; captions used b=0.5 and 1.5×. The exact values were swept against a labeled relevance set of 50k judged <query, video, label> triples.

8.3 Custom Xapian::Weight Subclass

// brand_safety_weight.cpp
// Wraps standard BM25 and folds in a per-document safety prior so that risky
// content is downweighted at retrieval time, not just filtered after scoring.
#include <xapian.h>
#include "safety_score_table.h"

class BrandSafetyWeight : public Xapian::BM25Weight {
 public:
  BrandSafetyWeight(double k1, double k2, double k3, double b,
                    double min_normlen,
                    const SafetyScoreTable& safety,
                    double safety_lambda)
    : Xapian::BM25Weight(k1, k2, k3, b, min_normlen),
      safety_(safety),
      lambda_(safety_lambda) {}

  std::string name() const override { return "BrandSafetyWeight"; }

  BrandSafetyWeight* clone() const override {
    return new BrandSafetyWeight(*this);
  }

  double get_sumpart(Xapian::termcount wdf,
                     Xapian::termcount doclen,
                     Xapian::termcount uniqterms) const override {
    double bm25 = Xapian::BM25Weight::get_sumpart(wdf, doclen, uniqterms);
    // Safety prior is in [0, 1]; lambda controls how aggressively unsafe
    // documents are demoted in the ranking.
    // NOTE: stock Xapian::Weight does not expose the current docid to
    // get_sumpart(); this accessor relies on a small patch in our Xapian fork.
    Xapian::docid did = get_docid_for_current_post();
    double safety = safety_.lookup(did);     // 1.0 = safe, 0.0 = unsafe
    return bm25 * (1.0 - lambda_ * (1.0 - safety));
  }

 private:
  const SafetyScoreTable& safety_;
  double lambda_;
};
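The fold-in on the return line is easy to sanity-check in isolation; the same arithmetic as get_sumpart above, in Python:

```python
def demote(bm25_part: float, safety: float, lam: float) -> float:
    """score * (1 - lambda * (1 - safety)); safety=1.0 means fully safe."""
    return bm25_part * (1.0 - lam * (1.0 - safety))
```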

8.4 Tuning & Evaluation

We swept k1 ∈ {0.9, 1.2, 1.5, 1.8}, b ∈ {0.25, 0.5, 0.75}, and lambda ∈ {0.0, 0.25, 0.5, 0.75} against a held-out judged set, measuring nDCG@10 for relevance and a custom "advertiser-safe-precision@10" metric: the fraction of top-10 results that human reviewers labeled as safe for a default brand-suitability profile. Final production values: title k1=1.2 b=0.0, description k1=1.5 b=0.75, captions k1=1.4 b=0.5, lambda=0.5.
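The sweep harness itself is not shown; for reference, a linear-gain nDCG@k (one common formulation; the production metric may use exponential gain):

```python
import math

def ndcg_at_k(ranked_rels: list[int], k: int = 10) -> float:
    """nDCG@k over graded relevance labels, given in ranked order."""
    def dcg(rels):
        return sum(r / math.log2(i + 2) for i, r in enumerate(rels[:k]))
    ideal = dcg(sorted(ranked_rels, reverse=True))
    return dcg(ranked_rels) / ideal if ideal > 0 else 0.0
```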

9. Brand-Safety Classification

9.1 Taxonomy

Two parallel taxonomies are scored on every video: IAB-style contextual categories, written to iab_categories as per-category confidences, and brand-safety policy flags (violence, hate, adult, profanity, ...), written to policy_flags.

9.2 Hybrid Classifier

The classifier is a hybrid: hand-curated keyword/entity rules (domain-expert-authored) combined with a gradient-boosted model trained over the yt.video_features row (tokens, entities, embedding, toxicity probe), the per-policy Xapian bundle scores computed at inference time, engagement-velocity features, and channel-level priors.

9.3 Inference Service — C++ / Scala Boundary

The hot inference path lives in a C++ service that loads the Xapian shard and the gradient-boosted model (compiled to a flat binary via treelite). A Scala front-end exposes the REST API and handles request validation, auth, and result hydration from Cassandra. The boundary is a thin gRPC interface.

// classifier_service.cpp — gRPC handler.
grpc::Status ClassifierServiceImpl::Classify(
    grpc::ServerContext* ctx,
    const ClassifyRequest* req,
    ClassifyResponse* resp) {
  const auto& vid = req->video_id();
  auto features = feature_cache_.get_or_load(vid);
  if (!features) {
    return grpc::Status(grpc::StatusCode::NOT_FOUND, "unknown video_id");
  }

  // 1. Score against each safety policy bundle via Xapian queries.
  for (const auto& [policy, bundle] : safety_bundles_) {
    Xapian::Query q = bundle.build_query();
    Xapian::Enquire enq(*shard_db_);
    enq.set_query(q);
    enq.set_weighting_scheme(Xapian::BM25Weight());
    auto mset = enq.get_mset(0, 1, nullptr,
        single_doc_filter(features->xapian_docid));
    double score = mset.empty() ? 0.0 : mset[0].get_weight();
    features->safety_features[policy] = normalize(score, bundle);
  }

  // 2. Concatenate features and run the GBDT.
  auto vec = features->to_dense_vector();
  auto preds = gbdt_.predict(vec);

  // 3. Apply policy thresholds to produce verdicts.
  populate_response(*features, preds, resp);
  return grpc::Status::OK;
}

10. Engagement, Geospatial & Copyright Signals

10.1 Engagement

Engagement metrics flow in through video_engagement_ts. We compute rolling 24h / 7d / 30d view-velocity, like/dislike ratios where available, comment volume per minute of video, and a comment-toxicity ratio (fraction of comments with toxicity probe > 0.7). High-velocity videos are re-classified preferentially because they are the ones most likely to be served against ads imminently.
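A sketch of the velocity computation over rows read newest-first from video_engagement_ts (the schema does not state whether the per-bucket counters are cumulative or deltas; cumulative is assumed here):

```python
def view_velocity(rows: list[tuple[int, int]], window_hr: int, now_hr: int) -> float:
    """Views gained per hour over the trailing window.
    rows: (bucket_hr, cumulative_views), newest first, matching the DESC
    clustering order of video_engagement_ts. ASSUMES cumulative counters."""
    inside = [(t, v) for t, v in rows if now_hr - t <= window_hr]
    if len(inside) < 2:
        return 0.0
    (t_new, v_new), (t_old, v_old) = inside[0], inside[-1]
    return (v_new - v_old) / (t_new - t_old) if t_new != t_old else 0.0
```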

10.2 Geospatial Attributes

Channel-declared country, video-level geo-tags (rare), and per-region view-share distributions feed a "regional appropriateness" feature: a video may be safe in one region and restricted in another. This matters operationally because campaigns are geo-targeted and an advertiser running in Germany may have stricter requirements around political content than the same advertiser running in Brazil.
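One way the regional signal can be assembled from per-region view shares; the rule table, flag names, and the 5% viewership floor below are hypothetical illustrations, not the production policy:

```python
def region_view_share(views_by_region: dict[str, int]) -> dict[str, float]:
    """Normalize raw per-region view counts into shares."""
    total = sum(views_by_region.values())
    return {r: v / total for r, v in views_by_region.items()} if total else {}

def restricted_regions(share: dict[str, float],
                       regional_rules: dict[str, set],
                       policy_flags: set,
                       min_share: float = 0.05) -> set:
    """Regions whose rules collide with the video's flags, where the video
    actually has meaningful viewership (the floor is illustrative)."""
    return {r for r, rules in regional_rules.items()
            if share.get(r, 0.0) >= min_share and rules & policy_flags}
```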

10.3 Copyright Detection

A separate fingerprinting subsystem (Chromaprint-style audio fingerprints + perceptual video hashes for keyframes) flags videos containing matched copyrighted material. The flag is surfaced as copyright_flag = true in video_safety and is hard-blocked from inclusion lists for advertisers in regulated verticals (music labels, film studios, broadcasters).

11. REST API

11.1 Endpoints

11.2 Search Request / Response

# POST /v1/search
query: "electric vehicle road trip"
policy_id: "advertiser-acme-strict-v3"
contextual_segments:
  - automotive.electric_vehicles
  - travel.road_trips
filters:
  language: ["en"]
  region: ["US", "CA"]
  min_view_count: 10000
  published_after: "2025-01-01"
limit: 50
ranking:
  scheme: "bm25f_safety"
  safety_lambda: 0.6
# 200 OK
results:
  - video_id: "dQw4w9WgXcQ"
    channel_id: "UCabcdef"
    title: "Cross-country in a Model 3 — 4,200 miles"
    score: 18.42
    safety_verdict: "safe"
    safety_policy_flags: []
    contextual_scores:
      automotive.electric_vehicles: 0.93
      travel.road_trips: 0.88
  - ...
total_matched: 8421
took_ms: 47
shard_stats:
  shards_queried: 32
  shards_with_hits: 28

11.3 Akka HTTP Route — Scala

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class SearchRoute(svc: SearchService)(implicit ec: ExecutionContext) {
  val route =
    pathPrefix("v1") {
      path("search") {
        post {
          entity(as[SearchRequest]) { req =>
            withRequestTimeout(2.seconds) {
              onSuccess(svc.search(req)) { resp =>
                complete(StatusCodes.OK -> resp)
              }
            }
          }
        }
      } ~
      path("classify") {
        post {
          entity(as[ClassifyBatchRequest]) { req =>
            // validate rejects with a 400; a bare require() would surface as a 500
            validate(req.videoIds.size <= 1000, "max 1000 ids per call") {
              onSuccess(svc.classifyBatch(req)) { resp =>
                complete(StatusCodes.OK -> resp)
              }
            }
          }
        }
      } ~
      path("videos" / Segment / "safety") { vid =>
        get {
          onSuccess(svc.safetyVerdict(VideoId(vid))) {
            case Some(v) => complete(v)
            case None    => complete(StatusCodes.NotFound)
          }
        }
      }
    }
}

12. Real-Time Decisioning & Pre-Bid Integration

Demand-side platforms call POST /v1/classify in the pre-bid window (typically a <100ms budget) to decide whether to bid on an impression against a given YouTube video. Our SLO for this path is p99 < 80ms server-side, which the C++ classifier comfortably meets when the feature row is hot in Cassandra. The cold path (a video never seen before) falls back to a fast classifier operating only on the snippet returned by the YouTube ad server and a channel-level prior.

Backpressure: the front-end Akka HTTP service uses a bounded request queue with 503 Retry-After: 1 shedding when downstream Cassandra latency degrades. The DSP integration is configured to treat a 503 as "do not bid" — failing closed is the brand-safe default.
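Independent of Akka, the shedding behaviour reduces to a bounded admission queue. A Python stand-in (the status code and Retry-After header come from the text above; everything else is illustrative):

```python
import queue

class ShedQueue:
    """Bounded admission queue: when full, shed with 503 + Retry-After."""

    def __init__(self, depth: int):
        self._q = queue.Queue(maxsize=depth)

    def admit(self, request) -> tuple[int, dict]:
        try:
            self._q.put_nowait(request)
            return 200, {}
        except queue.Full:
            # DSPs treat 503 as "do not bid" -- the fail-closed default.
            return 503, {"Retry-After": "1"}
```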

13. AWS Deployment Topology

14. Observability & Operations

15. Lessons Learned