Add topic index rebuild script for video transcript archive
This commit is contained in:
139
tools/rebuild_topic_indices.sh
Executable file
139
tools/rebuild_topic_indices.sh
Executable file
@@ -0,0 +1,139 @@
|
||||
#!/usr/bin/env bash
# Fail on any error, on use of an unset variable, and on a failure anywhere
# in a pipeline.
set -euo pipefail

# Root directory to operate in: first argument, or the current directory.
ROOT="${1:-.}"
cd "$ROOT"

# Both input trees must exist before anything is rebuilt.
[[ -d sources/video_transcripts ]] || { echo "sources/video_transcripts not found" >&2; exit 1; }
[[ -d topics ]] || { echo "topics directory not found" >&2; exit 1; }
|
||||
|
||||
python3 <<'PY'
|
||||
from pathlib import Path
|
||||
import re
|
||||
|
||||
# Every path below is relative to the current working directory.
root = Path(".")
# One sub-directory per topic; each receives an INDEX.md.
topics_root = root.joinpath("topics")
# Tree that is searched recursively for topics.yaml files.
sources_root = root.joinpath("sources", "video_transcripts")
|
||||
|
||||
def read_simple_yaml_list_block(text: str, key: str):
    """Return the items of the first block-style list found under ``key``.

    This is a deliberately small line-based reader, not a YAML parser. It
    looks for a line that consists solely of ``key:`` (at any indentation)
    and then collects the ``- item`` lines that follow it.

    Args:
        text: The YAML text (or a fragment of it) to scan.
        key: The mapping key whose list should be read, without the colon.

    Returns:
        A list of item strings with surrounding whitespace removed and one
        pair of matching surrounding quotes (``"..."`` or ``'...'``)
        dropped. Empty when the key is absent or has no list items.
    """
    items = []
    in_block = False
    base_indent = None
    key_prefix = f"{key}:"

    for line in text.splitlines():
        stripped = line.strip()

        if not in_block:
            # Still searching for the "key:" header line.
            if stripped == key_prefix:
                in_block = True
            continue

        # Blank lines and full-line comments never end a list.
        if not stripped or stripped.startswith("#"):
            continue

        indent = len(line) - len(line.lstrip(" "))
        if base_indent is None:
            # The first meaningful line after the header fixes the list's
            # indentation level.
            base_indent = indent

        # A dedent or any non-item line marks the end of the list.
        if indent < base_indent or not stripped.startswith("- "):
            break

        item = stripped[2:].strip()
        # Drop one pair of matching quotes so `- "name"` and `- name` yield
        # the same value.
        if len(item) >= 2 and item[0] == item[-1] and item[0] in "\"'":
            item = item[1:-1]
        items.append(item)

    return items
|
||||
|
||||
def read_scalar(text: str, key: str):
    """Return the single-line value of a ``key: value`` line in ``text``.

    Only lines where ``key`` starts in column 0 are considered, and the
    value must sit on the same line as the key. A key with nothing after
    the colon is treated as having no value, so the following line is
    never mistaken for it.

    Args:
        text: The YAML text to search.
        key: The key name, without the colon.

    Returns:
        The value with surrounding whitespace removed and one pair of
        matching surrounding quotes dropped, or ``None`` when no line
        carries a value for ``key``.
    """
    # `[^\S\n]` is "whitespace except newline": it keeps the match on the
    # key's own line, whereas `\s` would run on into the next line.
    pattern = rf"(?m)^{re.escape(key)}:[^\S\n]*(\S.*?)[^\S\n]*$"
    m = re.search(pattern, text)
    if not m:
        return None

    value = m.group(1).strip()
    # Remove quotes only when they wrap the whole value, so a plain value
    # that merely ends with a quote character is left intact.
    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
        value = value[1:-1]
    return value
|
||||
|
||||
def read_source_yaml(path: Path):
    """Read a source.yaml file and return its metadata as a dict.

    Args:
        path: Location of the source.yaml file; read as UTF-8.

    Returns:
        A dict with the keys ``speaker``, ``channel``, ``video_id``,
        ``title``, ``relevance``, ``primary`` and ``secondary``. Speakers
        are joined with ", ". Missing values fall back to "UNKNOWN",
        except the title (falls back to the name of the file's parent
        directory) and the relevance (falls back to "unclassified").
    """
    text = path.read_text(encoding="utf-8")

    def list_for(block, child):
        # Cut out the body of the column-0 `block:` mapping (everything up
        # to the next line that starts in column 0) and read the `child`
        # list from that fragment only.
        found = re.search(rf"(?ms)^{re.escape(block)}:\s*\n(.*?)(?=^\S|\Z)", text)
        return read_simple_yaml_list_block(found.group(1), child) if found else []

    speakers = read_simple_yaml_list_block(text, "speaker")

    return {
        "speaker": ", ".join(speakers) if speakers else "UNKNOWN",
        "channel": read_scalar(text, "channel") or "UNKNOWN",
        "video_id": read_scalar(text, "video_id") or "UNKNOWN",
        "title": read_scalar(text, "title") or path.parent.name,
        "relevance": read_scalar(text, "enoch_relevance") or "unclassified",
        "primary": list_for("topic_bins", "primary"),
        "secondary": list_for("topic_bins", "secondary"),
    }
|
||||
|
||||
# Maps a topic name to the list of table rows (one per video) that cite it.
topic_rows = {}

# Sorted so the walk order, and with it the order of "Skipping" messages and
# collected rows, does not depend on filesystem enumeration order.
for topics_yaml in sorted(sources_root.rglob("topics.yaml")):
    video_dir = topics_yaml.parent
    source_yaml = video_dir / "source.yaml"
    if not source_yaml.exists():
        print(f"Skipping {video_dir}: no source.yaml")
        continue

    source = read_source_yaml(source_yaml)
    topic_text = topics_yaml.read_text(encoding="utf-8")
    primary = read_simple_yaml_list_block(topic_text, "primary")
    secondary = read_simple_yaml_list_block(topic_text, "secondary")
    # A relevance given in topics.yaml takes precedence over source.yaml.
    relevance = read_scalar(topic_text, "enoch_relevance") or source["relevance"]

    row = {
        "speaker": source["speaker"],
        "channel": source["channel"],
        "video_id": source["video_id"],
        "title": source["title"],
        "relevance": relevance,
        "path": video_dir.as_posix(),
    }

    # dict.fromkeys removes repeats while keeping first-seen order, so a
    # topic listed as both primary and secondary yields one row per video.
    for topic in dict.fromkeys(primary + secondary):
        # Each topic gets its own copy of the row.
        topic_rows.setdefault(topic, []).append(dict(row))
|
||||
|
||||
def md_cell(value):
    """Escape pipe characters so ``value`` cannot split a Markdown table cell."""
    return value.replace("|", "\\|")


def sort_key(r):
    """Order rows by speaker, title and video ID, case-insensitively.

    The path is the final tiebreaker so rows that tie on the other fields
    always come out in the same order.
    """
    return (
        r["speaker"].lower(),
        r["title"].lower(),
        r["video_id"].lower(),
        r["path"],
    )


# Rewrite INDEX.md in every topic directory. A topic that no video cites
# still gets a file containing only the table header.
for topic_dir in topics_root.iterdir():
    if not topic_dir.is_dir():
        continue
    topic = topic_dir.name
    index_path = topic_dir / "INDEX.md"
    rows = topic_rows.get(topic, [])

    lines = [
        "# Topic Index",
        "",
        "| Speaker | Source | Video ID | Title | Relevance | Path |",
        "|---|---|---|---|---|---|",
    ]

    for r in sorted(rows, key=sort_key):
        # Every cell is escaped, not only the title: a pipe in any field
        # would otherwise shift the remaining columns of the row.
        lines.append(
            f'| {md_cell(r["speaker"])} | {md_cell(r["channel"])} '
            f'| `{md_cell(r["video_id"])}` | {md_cell(r["title"])} '
            f'| {md_cell(r["relevance"])} | `{md_cell(r["path"])}` |'
        )

    index_path.write_text("\n".join(lines) + "\n", encoding="utf-8")

print("Rebuilt topic indices.")
|
||||
PY
|
||||
Reference in New Issue
Block a user