Files
mqsrv/origet/client.py

212 lines
7.2 KiB
Python

"""
码枪堂原版 (maqt.top) API 客户端
用法:
from client import MaqtClient
c = MaqtClient()
c.login("username", "password")
schemes = c.get_schemes(page=1, sort="hot")
print(schemes)
"""
import json
import requests
from typing import Optional, Any
from api_spec import BASE_URL, ENDPOINTS
# ---- Unicode-safe Base64 ----
def b64_encode_unicode(s: str) -> str:
import base64
encoded = base64.b64encode(s.encode("utf-8")).decode("ascii")
return encoded
# ---- Client ----
class MaqtClient:
def __init__(self, base_url: str = BASE_URL, token: Optional[str] = None):
self.base_url = base_url
self.token = token
self.session = requests.Session()
self.session.headers["User-Agent"] = "MaqiangTang/7.0.4"
# ---- low-level helpers ----
def _auth_headers(self) -> dict:
if self.token:
return {"Authorization": f"Bearer {self.token}"}
return {}
def _get(self, path: str, params: Optional[dict] = None, auth: bool = False) -> dict:
h = self._auth_headers() if auth else {}
r = self.session.get(f"{self.base_url}{path}", params=params, headers=h, timeout=15)
r.raise_for_status()
return r.json()
def _post(self, path: str, body: dict, extra_headers: Optional[dict] = None, auth: bool = False) -> dict:
h = {"Content-Type": "application/json"}
if auth:
h.update(self._auth_headers())
if extra_headers:
h.update(extra_headers)
r = self.session.post(f"{self.base_url}{path}", json=body, headers=h, timeout=15)
r.raise_for_status()
return r.json()
def _delete(self, path: str, auth: bool = True) -> dict:
h = self._auth_headers() if auth else {}
r = self.session.delete(f"{self.base_url}{path}", headers=h, timeout=15)
r.raise_for_status()
return r.json()
# ---- auth ----
def login(self, username: str, password: str) -> dict:
resp = self._post("/api/login", {"username": username, "password": password})
if resp.get("success") and resp.get("token"):
self.token = resp["token"]
return resp
def register(self, username: str, password: str, email: str) -> dict:
return self._post("/api/register", {"username": username, "password": password, "email": email})
def session_status(self) -> dict:
return self._get("/api/session-status", auth=True)
# ---- VIP ----
def vip_status(self) -> dict:
return self._get("/api/vip-status", auth=True)
def activate_vip(self, card_key: str) -> dict:
return self._post("/api/activate-vip", {"cardKey": card_key}, auth=True)
# ---- weapon data ----
def get_weapon_categories(self) -> dict:
return self._get("/api/weapon-categories")
def get_weapons(self, category: Optional[str] = None) -> dict:
"""category 传中文名如'突击步枪',不传返回全部"""
params = {"category": category} if category else None
return self._get("/api/weapons", params=params)
# ---- schemes ----
def get_schemes(
self,
page: int = 1,
sort: str = "hot",
limit: int = 12,
mode: str = "schemes",
weapon_category: Optional[str] = None,
weapon_name: Optional[str] = None,
min_price: Optional[int] = None,
max_price: Optional[int] = None,
search: Optional[str] = None,
) -> dict:
params = {"sort": sort, "page": str(page), "limit": str(limit)}
if weapon_category:
params["weaponCategory"] = weapon_category
if weapon_name:
params["weaponName"] = weapon_name
if min_price is not None:
params["minPrice"] = str(min_price)
if max_price is not None:
params["maxPrice"] = str(max_price)
if search:
params["search"] = search
return self._get(f"/api/{mode}", params=params)
def get_favorites(self, page: int = 1, sort: str = "hot") -> dict:
return self._get("/api/favorites", params={"sort": sort, "page": str(page), "limit": "12"}, auth=True)
def record_use(self, scheme_id: str, mode: str = "schemes") -> dict:
return self._post(f"/api/{mode}/{scheme_id}/use", {})
def report_scheme(self, scheme_id: str, reason: str, description: str = "", mode: str = "schemes") -> dict:
return self._post(
f"/api/{mode}/{scheme_id}/report",
{"reason": reason, "description": description},
auth=True,
)
def share_scheme(
self,
description: str,
category: str,
weapon_name: str,
scheme: str,
price: Optional[int] = None,
user_id: int = 0,
username: str = "",
mode: str = "schemes",
) -> dict:
user_info = json.dumps({"id": user_id, "username": username}, ensure_ascii=False)
body = {
"description": description,
"category": category,
"weaponName": weapon_name,
"scheme": scheme,
"tags": [],
}
if price is not None:
body["price"] = price
return self._post(
f"/api/{mode}",
body,
extra_headers={"x-user-info": b64_encode_unicode(user_info)},
)
# ---- favorites ----
def add_favorite(self, scheme_id: str, source: str = "烽火地带") -> dict:
return self._post("/api/favorites", {"schemeId": scheme_id, "source": source}, auth=True)
def remove_favorite(self, scheme_id: str, source: str = "烽火地带") -> dict:
import urllib.parse
qs = urllib.parse.urlencode({"source": source})
return self._delete(f"/api/favorites/{scheme_id}?{qs}")
def check_favorite(self, scheme_id: str, source: str = "烽火地带") -> dict:
import urllib.parse
qs = urllib.parse.urlencode({"schemeId": scheme_id, "source": source})
return self._get(f"/api/favorites/check?{qs}", auth=True)
def favorites_count(self) -> dict:
return self._get("/api/favorites/count", auth=True)
# ---- adverts ----
def get_adverts(self) -> dict:
return self._get("/api/adverts/list")
def click_advert(self, advert_id: str) -> dict:
return self._post(f"/api/adverts/{advert_id}/click", {})
# ---- user ----
def user_stats(self, user_id: int) -> dict:
return self._get(f"/api/user/stats/{user_id}")
def user_schemes(self, user_id: int) -> dict:
return self._get(f"/api/user/schemes/{user_id}")
def user_favorited_count(self, user_id: int) -> dict:
return self._get(f"/api/user/favorited-count/{user_id}", auth=True)
# ---- misc ----
def get_filter_share_categories(self) -> dict:
return self._get("/api/filter-share/categories")
def get_filter_shares(self) -> dict:
return self._get("/api/filter-shares")
def copy_filter_share(self, share_id: str, slot: str = "primary") -> dict:
return self._post(f"/api/filter-shares/{share_id}/copy", {"slot": slot})
def activity_ping(self) -> dict:
return self._get("/api/activity/ping", auth=True)
def get_game_map_password(self) -> dict:
return self._get("/api/game/map-password")