#!/usr/bin/env python3
"""
Xbox Captures Downloader (Unofficial)

A Typer-based CLI to download **your** Xbox screenshots and game clips
using Xbox Live's *undocumented* MediaHub search endpoints.

⚠️ Disclaimer
- These endpoints are not officially documented by Microsoft and may change or stop working.
- Use only with your own account and respect the Microsoft/Xbox terms of service.
- You need a valid Xbox Live authorization token (XBL3.0). See --help for details.

Install deps:
    pip install "typer[all]" requests tenacity tqdm python-dotenv

Examples:
    # Using an XUID you already know
    python xbox_captures_downloader.py pull --xuid 2533274791234567 --token "XBL3.0 x=uhs;eyJhbGciOi..."

    # Resolve XUID from a gamertag first
    python xbox_captures_downloader.py pull --gamertag "YourGamertag" --token "XBL3.0 x=uhs;eyJhbGciOi..."

Token notes:
- The CLI expects the *full* header value including the prefix, e.g.
      "XBL3.0 x=uhs;token"
- You can obtain this from the requests of an authenticated Xbox web session,
  or via a proper OAuth + XSTS flow.
"""
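
# Tip: `pull` below also accepts `--token -` to read the token from STDIN (see the
# `token == "-"` branch in pull()), e.g.:
#   cat my_token.txt | python xbox_captures_downloader.py pull --xuid 2533274791234567 --token -
# (my_token.txt is only an illustrative file name holding the full "XBL3.0 x=uhs;..." value)
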
from __future__ import annotations

import json
import os
import re
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

import requests
import typer
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from tqdm import tqdm

from dotenv import load_dotenv

load_dotenv()
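
# Credentials can also come from a .env file picked up by load_dotenv() above.
# An illustrative .env (placeholder values; the variable names match the env
# fallbacks used by pull()):
#   XBOX_TOKEN="XBL3.0 x=uhs;eyJhbGciOi..."
#   XBOX_GAMERTAG="YourGamertag"
#   XBOX_XUID="2533274791234567"
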
app = typer.Typer(add_completion=False, no_args_is_help=True)

PROFILE_BASE = "https://profile.xboxlive.com"
MEDIAHUB_BASE = "https://mediahub.xboxlive.com"


# --- Helpers -----------------------------------------------------------------

def _auth_header(token: str) -> Dict[str, str]:
    token = token.strip()
    if not token:
        typer.echo("Empty token.", err=True)
        raise typer.Exit(code=2)
    # Basic validation: must contain XBL3.0 or at least 'x=' and a semicolon
    if not (token.startswith("XBL3.0 ") or re.search(r"x=.*;", token)):
        typer.echo("Input token does not look like a valid Xbox token.", err=True)
        raise typer.Exit(code=2)
    return {"Authorization": token}


def _ensure_dir(p: Path) -> None:
    p.mkdir(parents=True, exist_ok=True)


def _slugify(text: str, maxlen: int = 80) -> str:
    """Make a string safe to use as part of a filename."""
    text = re.sub(r"[\\/:*?\"<>|]", " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    if len(text) > maxlen:
        text = text[: maxlen - 1].rstrip() + "…"
    return text or "untitled"


@retry(wait=wait_exponential(multiplier=0.8, min=1, max=10),
       stop=stop_after_attempt(5),
       retry=retry_if_exception_type(requests.RequestException))
def _req(method: str, url: str, *, headers: Dict[str, str], json_body: Optional[dict] = None, params: Optional[dict] = None) -> requests.Response:
    # Note: raise_for_status() raises HTTPError, a RequestException subclass,
    # so HTTP error responses are retried with exponential backoff as well.
    resp = requests.request(method, url, headers=headers, json=json_body, params=params, timeout=30)
    resp.raise_for_status()
    return resp


def _pick_urls_from_obj(obj: Any) -> List[str]:
    """Recursively find plausible media URLs (https) in the object by common keys.

    This keeps the code resilient to minor response-shape changes.
    """
    urls: List[str] = []
    if isinstance(obj, dict):
        for k, v in obj.items():
            lk = k.lower()
            if lk in {"uri", "url", "downloaduri", "downloadurl"} and isinstance(v, str) and v.startswith("http"):
                urls.append(v)
            else:
                urls.extend(_pick_urls_from_obj(v))
    elif isinstance(obj, list):
        for it in obj:
            urls.extend(_pick_urls_from_obj(it))
    return urls


def _guess_ext_from_url(u: str, fallback: str = "bin") -> str:
    path = u.split("?")[0]
    ext = Path(path).suffix.lower().lstrip(".")
    if ext:
        return ext
    # Last resort: sniff common extensions anywhere in the URL (e.g. in the query string)
    if any(x in u.lower() for x in (".jpg", ".jpeg")):
        return "jpg"
    if ".png" in u.lower():
        return "png"
    if ".mp4" in u.lower():
        return "mp4"
    return fallback


def _save_jsonl(path: Path, rows: Iterable[dict]) -> None:
    with path.open("a", encoding="utf-8") as f:
        for r in rows:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")


# --- API calls ----------------------------------------------------------------

def resolve_xuid_from_gamertag(gamertag: str, token: str) -> str:
    headers = {
        **_auth_header(token),
        "x-xbl-contract-version": "2",
        "Accept": "application/json",
    }
    url = f"{PROFILE_BASE}/users/gt({requests.utils.quote(gamertag)})/profile/settings"
    resp = _req("GET", url, headers=headers, params={"settings": "Gamertag"})
    data = resp.json()
    try:
        xuid = data["profileUsers"][0]["id"]
        return xuid
    except Exception as e:
        raise RuntimeError(f"Could not resolve XUID for gamertag '{gamertag}'. Raw: {data}") from e
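

# For reference, the parsing above expects a profile response shaped roughly
# like the following (illustrative only; real payloads carry more fields):
#   {"profileUsers": [{"id": "2533274791234567",
#                      "settings": [{"id": "Gamertag", "value": "YourGamertag"}]}]}
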
def _search_media(media_kind: str, xuid: str, token: str, max_per_page: int = 100):
    assert media_kind in {"screenshots", "gameclips"}
    url = f"{MEDIAHUB_BASE}/{media_kind}/search"
    headers = {
        **_auth_header(token),
        "x-xbl-contract-version": "1",
        "Accept": "application/json",
        "Content-Type": "application/json",
    }

    continuation: Optional[str] = None
    while True:
        body = {
            "max": max_per_page,
            "skip": 0,
            "query": f"OwnerXuid eq {xuid}",
        }
        if continuation:
            body["continuationToken"] = continuation

        resp = _req("POST", url, headers=headers, json_body=body)
        payload = resp.json()

        results = payload.get("results") or payload.get("values") or []
        for item in results:
            yield item

        paging = payload.get("pagingInfo") or {}
        continuation = paging.get("continuationToken")
        if not continuation:
            break
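

# The MediaHub search endpoint is undocumented; the generator above only
# assumes a response shaped roughly like:
#   {"results": [ {...}, ... ],                    # sometimes named "values"
#    "pagingInfo": {"continuationToken": "..."}}
# Each yielded item is handled defensively downstream (_extract_best_url and
# _compose_filename only rely on a few common keys).
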
# --- Download logic ------------------------------------------------------------

def _compose_filename(item: dict, url: str, base: Path, kind: str) -> Path:
    # Try to extract a few friendly bits
    ts_raw = item.get("dateRecorded") or item.get("uploadDate") or item.get("dateUploaded") or item.get("created") or item.get("lastModified")
    try:
        # Incoming timestamp formats can vary; handle common ISO-like strings.
        if ts_raw:
            dt = datetime.fromisoformat(ts_raw.replace("Z", "+00:00"))
            date_part = dt.strftime("%Y-%m-%d_%H%M%S")
        else:
            date_part = "unknown-date"
    except Exception:
        date_part = "unknown-date"

    # Prefer a human-readable title over the numeric titleId when one is present.
    title = item.get("titleName") or item.get("gameTitle") or item.get("titleId") or kind
    title = _slugify(str(title))

    short_id = str(item.get("id") or item.get("contentId") or item.get("deviceId") or "id")

    ext = _guess_ext_from_url(url, fallback=("mp4" if kind == "clips" else "jpg"))

    # e.g. clips/2024-05-01_183233__Some Game__abc123.mp4
    fname = f"{date_part}__{title}__{short_id}.{ext}"
    return base / kind / fname


def _download_one(session: requests.Session, url: str, out_path: Path, overwrite: bool = False) -> Tuple[Path, int]:
    _ensure_dir(out_path.parent)
    if out_path.exists() and not overwrite:
        return out_path, 0

    # Stream to a .part file, then rename it into place once the download completes.
    with session.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        tmp = out_path.with_suffix(out_path.suffix + ".part")
        with tmp.open("wb") as f:
            for chunk in r.iter_content(chunk_size=1 << 20):  # 1 MiB
                if chunk:
                    f.write(chunk)
        tmp.replace(out_path)
    size = out_path.stat().st_size
    return out_path, size


def _extract_best_url(item: dict) -> Optional[str]:
    # Known shapes often include arrays like contentLocators, gameClipUris, thumbnailUris, etc.
    # Pick the first HTTP URL that doesn't look like a thumbnail if possible.
    urls = _pick_urls_from_obj(item)
    if not urls:
        return None
    # Sort so non-thumbnail URLs come first, then mp4-like URLs (preferred for clips).
    urls_sorted = sorted(
        urls,
        key=lambda u: (
            ("thumbnail" in u.lower()) or ("thumb" in u.lower()),
            not (u.lower().endswith(".mp4") or ".mp4?" in u.lower()),
        ),
    )
    return urls_sorted[0]


# --- CLI commands --------------------------------------------------------------

@app.command()
def pull(
    token: Optional[str] = typer.Option(None, help="Xbox Live auth header value, or '-' to read it from STDIN. If omitted, reads from env XBOX_TOKEN."),
    gamertag: Optional[str] = typer.Option(None, help="Gamertag to resolve to XUID. If omitted, reads from env XBOX_GAMERTAG."),
    xuid: Optional[str] = typer.Option(None, help="XUID (if you already know it). If omitted, reads from env XBOX_XUID."),
    outdir: Path = typer.Option(Path("xbox_captures"), help="Output directory."),
    media: str = typer.Option("both", help="What to download: screenshots | clips | both", case_sensitive=False),
    max_per_page: int = typer.Option(100, min=1, max=100, help="Search page size."),
    concurrency: int = typer.Option(6, min=1, max=16, help="Parallel downloads."),
    overwrite: bool = typer.Option(False, help="Overwrite existing files."),
    save_metadata: bool = typer.Option(True, help="Write a metadata.jsonl next to files."),
):
"""Download screenshots and/or game clips for a user.
|
|
|
|
You must provide a valid Xbox Live auth token (XBL3.0). Provide either --xuid or --gamertag.
|
|
"""
|
|
# Fallback to environment variables
|
|
if token is None:
|
|
token = os.environ.get("XBOX_TOKEN") or ""
|
|
if not token:
|
|
typer.echo("Token not provided and XBOX_TOKEN not set.", err=True)
|
|
raise typer.Exit(code=2)
|
|
elif token == "-":
|
|
# read from stdin
|
|
typer.echo("Reading token from STDIN…", err=True)
|
|
token = sys.stdin.readline().strip()
|
|
if not (gamertag or xuid):
|
|
gamertag = gamertag or os.environ.get("XBOX_GAMERTAG")
|
|
xuid = xuid or os.environ.get("XBOX_XUID")
|
|
if not (gamertag or xuid):
|
|
typer.echo("You must pass either --gamertag or --xuid or set XBOX_GAMERTAG/XBOX_XUID env vars.", err=True)
|
|
raise typer.Exit(code=2)
|
|
|
|
    # Resolve XUID if needed
    if not xuid:
        typer.echo(f"Resolving XUID for gamertag '{gamertag}'…")
        xuid = resolve_xuid_from_gamertag(gamertag, token)
        typer.echo(f"XUID: {xuid}")

    # Decide which media kinds to pull
    kinds: List[str]
    media_lc = media.lower()
    if media_lc == "both":
        kinds = ["screenshots", "gameclips"]
    elif media_lc in ("screenshots", "clips", "gameclips"):
        kinds = ["gameclips" if media_lc in ("clips", "gameclips") else "screenshots"]
    else:
        typer.echo("Invalid --media. Use: screenshots | clips | both", err=True)
        raise typer.Exit(code=2)

    _ensure_dir(outdir)

    session = requests.Session()

    # Aggregate items and prepare downloads
    all_items: List[Tuple[str, Path, dict, str]] = []  # (url, path, metadata, kind)
    for kind in kinds:
        typer.echo(f"Searching {kind}…")
        count = 0
        for item in _search_media(kind, xuid, token, max_per_page=max_per_page):
            url = _extract_best_url(item)
            if not url:
                continue
            target = _compose_filename(item, url, outdir, kind=("clips" if kind == "gameclips" else "screens"))
            all_items.append((url, target, item, kind))
            count += 1
        typer.echo(f"Found {count} {kind} (with downloadable URLs)")

    if not all_items:
        typer.echo("No downloadable items found.")
        raise typer.Exit(code=0)

    # Download concurrently
    pbar = tqdm(total=len(all_items), desc="Downloading", unit="file")
    total_bytes = 0
    futures = []
    with ThreadPoolExecutor(max_workers=concurrency) as ex:
        for url, path, meta, kind in all_items:
            futures.append(ex.submit(_download_one, session, url, path, overwrite))
        for fut in as_completed(futures):
            try:
                path, sz = fut.result()
                total_bytes += sz
            except Exception as e:
                typer.echo(f"Download failed: {e}", err=True)
            finally:
                pbar.update(1)
    pbar.close()

    # Save metadata
    if save_metadata:
        meta_path = outdir / "metadata.jsonl"
        _save_jsonl(meta_path, (m for _, _, m, _ in all_items))
        typer.echo(f"Metadata appended to: {meta_path}")

    mb = total_bytes / (1024 * 1024)
    typer.echo(f"Done. Downloaded ~{mb:.1f} MiB to {outdir}")


@app.callback(invoke_without_command=True)
def main(ctx: typer.Context):
    if ctx.invoked_subcommand is None:
        typer.echo(ctx.get_help())


if __name__ == "__main__":
    app()