diff --git a/datamaxi/aio/__init__.py b/datamaxi/aio/__init__.py index 1e6f0fa..b74738c 100644 --- a/datamaxi/aio/__init__.py +++ b/datamaxi/aio/__init__.py @@ -1,4 +1,4 @@ -"""Async client (pilot) — ``httpx``-based, mirrors a slice of the sync surface. +"""Async client — ``httpx``-based mirror of the sync DataMaxi+ client. Requires the ``async`` extra:: @@ -14,236 +14,41 @@ ticker = await client.cex.ticker.get(exchange="binance", market="spot", symbol="BTC-USDT") -This is a deliberately small pilot (candle + ticker). It reuses the sync -client's endpoint resolution and error handling (``datamaxi._dispatch``) and -the shared DataFrame / ResponseMeta helpers, so the two clients can't drift on -request building or error semantics. +Mirrors the full sync surface (``cex.*``, ``funding_rate``, ``forex``, +``premium``, ``liquidation``, ``open_interest``, ``margin_borrow``, +``index_price``, plus standalone ``AsyncTelegram`` / ``AsyncNaver``). Reuses +the sync client's endpoint resolution and error handling (``datamaxi._dispatch``) +and the shared DataFrame / ResponseMeta helpers, so the two clients can't drift +on request building or error semantics. """ -from __future__ import annotations - -import asyncio -import os -from typing import Any, Union, TYPE_CHECKING - -from datamaxi.__version__ import __version__ -from datamaxi.api import ResponseMeta -from datamaxi._dispatch import resolve_endpoint, raise_for_error, extract_limit_usage -from datamaxi.lib.constants import ( - BASE_URL, - SPOT, - FUTURES, - USD, - INTERVAL_1D, - Market, - Interval, +from typing import Any + +from datamaxi.lib.constants import BASE_URL +from datamaxi.aio._core import AsyncAPI, AsyncResource +from datamaxi.aio.cex import ( + AsyncCex, + AsyncCexCandle, + AsyncCexTicker, + AsyncCexFee, + AsyncCexWalletStatus, + AsyncCexAnnouncement, + AsyncCexToken, + AsyncCexSymbol, ) -from datamaxi.lib.utils import check_required_parameters -from datamaxi.resources.responses import CandleResponse, TickerResponse - -if TYPE_CHECKING: - import pandas as pd - - -def _import_httpx(): - try: - import httpx - except ImportError as exc: # pragma: no cover - exercised via extra - raise ImportError( - "The async client requires httpx. Install it with: " - "pip install 'datamaxi[async]'" - ) from exc - return httpx - - -class AsyncAPI: - """Async transport built on ``httpx.AsyncClient``. - - Mirrors the sync ``API``: shared endpoint resolution, bounded retry of - transient gateway 5xx, the same ``ClientError`` / ``ServerError`` contract, - and ``last_response`` metadata. - """ - - def __init__( - self, - api_key=None, - base_url=None, - timeout=10, - max_retries=3, - retry_backoff=0.5, - retry_statuses=(502, 503, 504), - transport=None, - ): - httpx = _import_httpx() - self.api_key = api_key or os.environ.get("DATAMAXI_API_KEY") - self.base_url = base_url - self.timeout = timeout - self.max_retries = max_retries - self.retry_backoff = retry_backoff - self.retry_statuses = tuple(retry_statuses) - self.last_response = None - self._client = httpx.AsyncClient( - base_url=base_url or "", - timeout=timeout, - transport=transport, - headers={ - "Content-Type": "application/json;charset=utf-8", - "User-Agent": "datamaxi/" + __version__, - "X-DTMX-APIKEY": str(self.api_key), - }, - ) - - async def request_endpoint(self, op_id, **params): - method, url_path, query_params = resolve_endpoint(op_id, **params) - return await self.send_request(method, url_path, payload=query_params) - - async def send_request(self, method, url_path, payload=None): - # str()-encode scalars so bools match the sync client's urlencode - # output (e.g. include_source -> "True", not httpx's "true"). - params = {k: str(v) for k, v in (payload or {}).items() if v is not None} - for attempt in range(self.max_retries + 1): - response = await self._client.request(method, url_path, params=params) - if ( - response.status_code in self.retry_statuses - and attempt < self.max_retries - ): - await asyncio.sleep(self.retry_backoff * (attempt + 1)) - continue - break - - raise_for_error(response.status_code, response.text, response.headers) - - try: - data = response.json() - except ValueError: - data = response.text - - self.last_response = ResponseMeta( - status_code=response.status_code, - headers=response.headers, - limit_usage=extract_limit_usage(response.headers), - data=data, - ) - return data - - async def aclose(self): - await self._client.aclose() - - async def __aenter__(self): - return self - - async def __aexit__(self, *exc): - await self.aclose() - - -class AsyncResource: - """Base for async resources — composes a shared ``AsyncAPI``.""" - - def __init__(self, api: "AsyncAPI"): - self._api = api - - async def request_endpoint(self, op_id, **params): - return await self._api.request_endpoint(op_id, **params) - - @property - def last_response(self): - return self._api.last_response - - -class AsyncCexCandle(AsyncResource): - async def __call__( - self, - exchange: str, - market: Market, - symbol: str, - currency: str = USD, - interval: Interval = INTERVAL_1D, - from_unix: str = None, - to_unix: str = None, - pandas: bool = True, - ) -> Union[pd.DataFrame, CandleResponse]: - """Fetch candle data (async). See ``datamaxi.Datamaxi.cex.candle``.""" - check_required_parameters( - [ - [exchange, "exchange"], - [symbol, "symbol"], - [interval, "interval"], - [market, "market"], - [currency, "currency"], - ] - ) - if market not in [SPOT, FUTURES]: - raise ValueError("market must be either spot or futures") - - res = await self.request_endpoint( - "cex_candle", - exchange=exchange, - market=market, - symbol=symbol, - interval=interval, - currency=currency, - **{"from": from_unix, "to": to_unix}, - ) - if res["data"] is None or len(res["data"]) == 0: - raise ValueError("no data found") - - if pandas: - from datamaxi.resources.utils import convert_data_to_data_frame - - return convert_data_to_data_frame(res["data"]) - return res - - -class AsyncCexTicker(AsyncResource): - async def get( - self, - exchange: str, - symbol: str, - market: Market, - currency: str = None, - conversion_base: str = None, - include_source: bool = False, - pandas: bool = True, - ) -> Union[pd.DataFrame, TickerResponse]: - """Fetch ticker data (async). See ``datamaxi.Datamaxi.cex.ticker``.""" - check_required_parameters( - [ - [exchange, "exchange"], - [symbol, "symbol"], - [market, "market"], - ] - ) - if market not in [SPOT, FUTURES]: - raise ValueError("market must be either spot or futures") - - res = await self.request_endpoint( - "ticker", - exchange=exchange, - symbol=symbol, - market=market, - currency=currency, - conversion_base=conversion_base, - include_source=include_source, - ) - - if pandas: - import pandas as pd - - df = pd.DataFrame([res["data"]]) - df = df.set_index("d") - return df - return res - - -class AsyncCex(AsyncResource): - def __init__(self, api: "AsyncAPI"): - super().__init__(api) - self.candle = AsyncCexCandle(api) - self.ticker = AsyncCexTicker(api) +from datamaxi.aio.funding_rate import AsyncFundingRate +from datamaxi.aio.forex import AsyncForex +from datamaxi.aio.premium import AsyncPremium +from datamaxi.aio.liquidation import AsyncLiquidation +from datamaxi.aio.open_interest import AsyncOpenInterest +from datamaxi.aio.margin_borrow import AsyncMarginBorrow +from datamaxi.aio.index_price import AsyncIndexPrice +from datamaxi.aio.telegram import AsyncTelegram +from datamaxi.aio.naver import AsyncNaver class AsyncDatamaxi: - """Async entrypoint (pilot). Exposes ``cex.candle`` and ``cex.ticker``. + """Async entrypoint — full mirror of the sync :class:`datamaxi.Datamaxi`. Use as an async context manager so the underlying ``httpx`` client is closed, or call :meth:`aclose` explicitly. @@ -252,8 +57,17 @@ class AsyncDatamaxi: def __init__(self, api_key=None, **kwargs: Any): if "base_url" not in kwargs: kwargs["base_url"] = BASE_URL - self._api = AsyncAPI(api_key, **kwargs) - self.cex = AsyncCex(self._api) + api = AsyncAPI(api_key, **kwargs) + self._api = api + + self.cex = AsyncCex(api) + self.funding_rate = AsyncFundingRate(api) + self.forex = AsyncForex(api) + self.premium = AsyncPremium(api) + self.liquidation = AsyncLiquidation(api) + self.open_interest = AsyncOpenInterest(api) + self.margin_borrow = AsyncMarginBorrow(api) + self.index_price = AsyncIndexPrice(api) async def aclose(self): await self._api.aclose() @@ -272,9 +86,23 @@ def __repr__(self): __all__ = [ "AsyncDatamaxi", + "AsyncTelegram", + "AsyncNaver", "AsyncAPI", "AsyncResource", "AsyncCex", "AsyncCexCandle", "AsyncCexTicker", + "AsyncCexFee", + "AsyncCexWalletStatus", + "AsyncCexAnnouncement", + "AsyncCexToken", + "AsyncCexSymbol", + "AsyncFundingRate", + "AsyncForex", + "AsyncPremium", + "AsyncLiquidation", + "AsyncOpenInterest", + "AsyncMarginBorrow", + "AsyncIndexPrice", ] diff --git a/datamaxi/aio/_core.py b/datamaxi/aio/_core.py new file mode 100644 index 0000000..798690d --- /dev/null +++ b/datamaxi/aio/_core.py @@ -0,0 +1,118 @@ +"""Async transport core — ``httpx``-based, shared by all async resources. + +Reuses the sync client's endpoint resolution and error handling +(``datamaxi._dispatch``) plus ``ResponseMeta``, so the sync and async clients +can't drift on request building or error semantics. +""" + +import asyncio +import os + +from datamaxi.__version__ import __version__ +from datamaxi.api import ResponseMeta +from datamaxi._dispatch import resolve_endpoint, raise_for_error, extract_limit_usage + + +def _import_httpx(): + try: + import httpx + except ImportError as exc: # pragma: no cover - exercised via extra + raise ImportError( + "The async client requires httpx. Install it with: " + "pip install 'datamaxi[async]'" + ) from exc + return httpx + + +class AsyncAPI: + """Async transport built on ``httpx.AsyncClient``. + + Mirrors the sync ``API``: shared endpoint resolution, bounded retry of + transient gateway 5xx, the same ``ClientError`` / ``ServerError`` contract, + and ``last_response`` metadata. + """ + + def __init__( + self, + api_key=None, + base_url=None, + timeout=10, + max_retries=3, + retry_backoff=0.5, + retry_statuses=(502, 503, 504), + transport=None, + ): + httpx = _import_httpx() + self.api_key = api_key or os.environ.get("DATAMAXI_API_KEY") + self.base_url = base_url + self.timeout = timeout + self.max_retries = max_retries + self.retry_backoff = retry_backoff + self.retry_statuses = tuple(retry_statuses) + self.last_response = None + self._client = httpx.AsyncClient( + base_url=base_url or "", + timeout=timeout, + transport=transport, + headers={ + "Content-Type": "application/json;charset=utf-8", + "User-Agent": "datamaxi/" + __version__, + "X-DTMX-APIKEY": str(self.api_key), + }, + ) + + async def request_endpoint(self, op_id, **params): + method, url_path, query_params = resolve_endpoint(op_id, **params) + return await self.send_request(method, url_path, payload=query_params) + + async def send_request(self, method, url_path, payload=None): + # str()-encode scalars so bools match the sync client's urlencode + # output (e.g. include_source -> "True", not httpx's "true"). + params = {k: str(v) for k, v in (payload or {}).items() if v is not None} + for attempt in range(self.max_retries + 1): + response = await self._client.request(method, url_path, params=params) + if ( + response.status_code in self.retry_statuses + and attempt < self.max_retries + ): + await asyncio.sleep(self.retry_backoff * (attempt + 1)) + continue + break + + raise_for_error(response.status_code, response.text, response.headers) + + try: + data = response.json() + except ValueError: + data = response.text + + self.last_response = ResponseMeta( + status_code=response.status_code, + headers=response.headers, + limit_usage=extract_limit_usage(response.headers), + data=data, + ) + return data + + async def aclose(self): + await self._client.aclose() + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + await self.aclose() + + +class AsyncResource: + """Base for async resources — composes a shared ``AsyncAPI``.""" + + def __init__(self, api: "AsyncAPI"): + self._api = api + + async def request_endpoint(self, op_id, **params): + return await self._api.request_endpoint(op_id, **params) + + @property + def last_response(self): + return self._api.last_response diff --git a/datamaxi/aio/cex.py b/datamaxi/aio/cex.py new file mode 100644 index 0000000..e1e949d --- /dev/null +++ b/datamaxi/aio/cex.py @@ -0,0 +1,383 @@ +"""Async CEX resources — mirror of ``datamaxi.resources.cex`` + sub-resources.""" + +from __future__ import annotations + +from typing import Any, List, Dict, Union, Optional, Tuple, Callable, TYPE_CHECKING + +from datamaxi.aio._core import AsyncAPI, AsyncResource +from datamaxi.lib.utils import check_required_parameter, check_required_parameters +from datamaxi.lib.constants import ( + SPOT, + FUTURES, + USD, + INTERVAL_1D, + ASC, + DESC, + Market, + Interval, + SortOrder, +) +from datamaxi.resources.responses import ( + CandleResponse, + TickerResponse, + WalletStatusRow, + AnnouncementResponse, + TokenUpdateResponse, +) + +if TYPE_CHECKING: + import pandas as pd + + +class AsyncCexCandle(AsyncResource): + async def __call__( + self, + exchange: str, + market: Market, + symbol: str, + currency: str = USD, + interval: Interval = INTERVAL_1D, + from_unix: str = None, + to_unix: str = None, + pandas: bool = True, + ) -> Union[pd.DataFrame, CandleResponse]: + """Fetch candle data (async). See ``datamaxi.Datamaxi.cex.candle``.""" + check_required_parameters( + [ + [exchange, "exchange"], + [symbol, "symbol"], + [interval, "interval"], + [market, "market"], + [currency, "currency"], + ] + ) + if market not in [SPOT, FUTURES]: + raise ValueError("market must be either spot or futures") + + res = await self.request_endpoint( + "cex_candle", + exchange=exchange, + market=market, + symbol=symbol, + interval=interval, + currency=currency, + **{"from": from_unix, "to": to_unix}, + ) + if res["data"] is None or len(res["data"]) == 0: + raise ValueError("no data found") + + if pandas: + from datamaxi.resources.utils import convert_data_to_data_frame + + return convert_data_to_data_frame(res["data"]) + return res + + async def exchanges(self, market: Market) -> List[str]: + check_required_parameter(market, "market") + if market not in [SPOT, FUTURES]: + raise ValueError("market must be either spot or futures") + return await self.request_endpoint("cex_candle_exchanges", market=market) + + async def symbols( + self, exchange: str = None, market: Optional[Market] = None + ) -> List[Dict]: + if market is not None and market not in [SPOT, FUTURES]: + raise ValueError("market must be either spot or futures") + return await self.request_endpoint( + "cex_candle_symbols", exchange=exchange, market=market + ) + + async def intervals(self) -> List[str]: + return await self.request_endpoint("cex_candle_intervals") + + +class AsyncCexTicker(AsyncResource): + async def get( + self, + exchange: str, + symbol: str, + market: Market, + currency: str = None, + conversion_base: str = None, + include_source: bool = False, + pandas: bool = True, + ) -> Union[pd.DataFrame, TickerResponse]: + """Fetch ticker data (async). See ``datamaxi.Datamaxi.cex.ticker``.""" + check_required_parameters( + [ + [exchange, "exchange"], + [symbol, "symbol"], + [market, "market"], + ] + ) + if market not in [SPOT, FUTURES]: + raise ValueError("market must be either spot or futures") + + res = await self.request_endpoint( + "ticker", + exchange=exchange, + symbol=symbol, + market=market, + currency=currency, + conversion_base=conversion_base, + include_source=include_source, + ) + + if pandas: + import pandas as pd + + df = pd.DataFrame([res["data"]]) + df = df.set_index("d") + return df + return res + + async def exchanges(self, market: Market) -> List[str]: + check_required_parameters([[market, "market"]]) + if market not in [SPOT, FUTURES]: + raise ValueError("market must be either spot or futures") + return await self.request_endpoint("ticker_exchanges", market=market) + + async def symbols(self, exchange: str, market: Market) -> List[str]: + check_required_parameters( + [ + [exchange, "exchange"], + [market, "market"], + ] + ) + if market not in [SPOT, FUTURES]: + raise ValueError("market must be either spot or futures") + return await self.request_endpoint( + "ticker_symbols", exchange=exchange, market=market + ) + + +class AsyncCexFee(AsyncResource): + async def __call__( + self, + exchange: str = None, + symbol: str = None, + ) -> List[Dict]: + return await self.request_endpoint("cex_fees", exchange=exchange, symbol=symbol) + + async def exchanges(self) -> List[str]: + return await self.request_endpoint("cex_fees_exchanges") + + async def symbols(self, exchange: str) -> List[str]: + check_required_parameter(exchange, "exchange") + return await self.request_endpoint("cex_fees_symbols", exchange=exchange) + + +class AsyncCexWalletStatus(AsyncResource): + async def __call__( + self, + exchange: str, + asset: str, + pandas: bool = True, + ) -> Union[pd.DataFrame, List[WalletStatusRow]]: + check_required_parameters( + [ + [exchange, "exchange"], + [asset, "asset"], + ] + ) + res = await self.request_endpoint( + "wallet_status", exchange=exchange, asset=asset + ) + if pandas: + import pandas as pd + + df = pd.DataFrame(res) + df = df.set_index("network") + return df + return res + + async def exchanges(self) -> List[str]: + return await self.request_endpoint("wallet_status_exchanges") + + async def assets(self, exchange: str) -> List[str]: + check_required_parameter(exchange, "exchange") + return await self.request_endpoint("wallet_status_assets", exchange=exchange) + + +class AsyncCexAnnouncement(AsyncResource): + async def __call__( + self, + page: int = 1, + limit: int = 1000, + sort: SortOrder = DESC, + key: Optional[str] = None, + exchange: Optional[str] = None, + category: Optional[str] = None, + ) -> Tuple[AnnouncementResponse, Callable]: + if page < 1: + raise ValueError("page must be greater than 0") + if limit < 1: + raise ValueError("limit must be greater than 0") + if sort not in [ASC, DESC]: + raise ValueError("sort must be either asc or desc") + + res = await self.request_endpoint( + "cex_announcements", + page=page, + limit=limit, + sort=sort, + key=key, + exchange=exchange, + category=category, + ) + if res["data"] is None: + raise ValueError("no data found") + + async def next_request(): + return await self.__call__( + key=key, + exchange=exchange, + category=category, + page=page + 1, + limit=limit, + sort=sort, + ) + + return res, next_request + + +class AsyncCexToken(AsyncResource): + async def updates( + self, + page: int = 1, + limit: int = 1000, + type: Optional[str] = None, + ) -> Tuple[TokenUpdateResponse, Callable]: + if page < 1: + raise ValueError("page must be greater than 0") + if limit < 1: + raise ValueError("limit must be greater than 0") + if type is not None and type not in ["listed", "delisted"]: + raise ValueError("type must be either listed or delisted when set") + + res = await self.request_endpoint( + "cex_token_updates", page=page, limit=limit, type=type + ) + if res["data"] is None: + raise ValueError("no data found") + + async def next_request(): + return await self.updates( + type=type, + page=page + 1, + limit=limit, + ) + + return res, next_request + + +class AsyncCexSymbol(AsyncResource): + async def metadata( + self, + exchange: Optional[str] = None, + base: Optional[str] = None, + market: Optional[str] = None, + quote: Optional[str] = None, + status: Optional[str] = None, + ) -> Dict[str, Any]: + return await self.request_endpoint( + "cex_symbol_metadata", + exchange=exchange, + base=base, + market=market, + quote=quote, + status=status, + ) + + async def tags( + self, + exchange: Optional[str] = None, + base: Optional[str] = None, + tag: Optional[str] = None, + market: Optional[str] = None, + source: Optional[str] = None, + min_confidence: Optional[int] = None, + ) -> Dict[str, Any]: + return await self.request_endpoint( + "cex_symbol_tags", + exchange=exchange, + base=base, + tag=tag, + market=market, + source=source, + min_confidence=min_confidence, + ) + + async def cautions( + self, + exchange: Optional[str] = None, + market: Optional[str] = None, + min_level: Optional[str] = None, + active_only: Optional[bool] = None, + ) -> Dict[str, Any]: + return await self.request_endpoint( + "cex_symbol_cautions", + exchange=exchange, + market=market, + min_level=min_level, + active_only=active_only, + ) + + async def delistings( + self, + exchange: Optional[str] = None, + market: Optional[str] = None, + from_ms: Optional[int] = None, + to_ms: Optional[int] = None, + include_past: Optional[bool] = None, + ) -> Dict[str, Any]: + return await self.request_endpoint( + "cex_symbol_delistings", + exchange=exchange, + market=market, + from_ms=from_ms, + to_ms=to_ms, + include_past=include_past, + ) + + async def volume(self, base: str, market: Optional[str] = None) -> Dict[str, Any]: + return await self.request_endpoint( + "cex_symbol_volume", base=base, market=market + ) + + async def oi(self, base: str, exchange: Optional[str] = None) -> Dict[str, Any]: + return await self.request_endpoint( + "cex_symbol_oi", base=base, exchange=exchange + ) + + async def oi_stats( + self, + base: str, + exchange: Optional[str] = None, + currency: str = "USD", + ) -> Dict[str, Any]: + if currency not in ("USD", "KRW"): + raise ValueError("currency must be either USD or KRW") + return await self.request_endpoint( + "cex_symbol_oi_stats", + base=base, + exchange=exchange, + currency=currency, + ) + + async def liquidation(self, base: str, window: str = "24h") -> Dict[str, Any]: + return await self.request_endpoint( + "cex_symbol_liquidation", base=base, window=window + ) + + +class AsyncCex(AsyncResource): + def __init__(self, api: "AsyncAPI"): + super().__init__(api) + self.candle = AsyncCexCandle(api) + self.ticker = AsyncCexTicker(api) + self.fee = AsyncCexFee(api) + self.wallet_status = AsyncCexWalletStatus(api) + self.announcement = AsyncCexAnnouncement(api) + self.token = AsyncCexToken(api) + self.symbol = AsyncCexSymbol(api) diff --git a/datamaxi/aio/forex.py b/datamaxi/aio/forex.py new file mode 100644 index 0000000..0712181 --- /dev/null +++ b/datamaxi/aio/forex.py @@ -0,0 +1,30 @@ +"""Async forex resource — mirror of ``datamaxi.resources.forex``.""" + +from __future__ import annotations + +from typing import List, Union, TYPE_CHECKING + +from datamaxi.aio._core import AsyncResource +from datamaxi.resources.responses import ForexRow +from datamaxi.lib.utils import check_required_parameter + +if TYPE_CHECKING: + import pandas as pd + + +class AsyncForex(AsyncResource): + async def __call__( + self, + symbol: str, + pandas: bool = True, + ) -> Union[pd.DataFrame, ForexRow]: + check_required_parameter(symbol, "symbol") + res = await self.request_endpoint("forex", symbol=symbol) + if pandas: + import pandas as pd + + return pd.DataFrame([res]) + return res + + async def symbols(self) -> List[str]: + return await self.request_endpoint("forex_symbols") diff --git a/datamaxi/aio/funding_rate.py b/datamaxi/aio/funding_rate.py new file mode 100644 index 0000000..15b5ed6 --- /dev/null +++ b/datamaxi/aio/funding_rate.py @@ -0,0 +1,98 @@ +"""Async funding-rate resource — mirror of ``datamaxi.resources.funding_rate``.""" + +from __future__ import annotations + +from typing import Callable, Tuple, List, Union, TYPE_CHECKING + +from datamaxi.aio._core import AsyncResource +from datamaxi.lib.utils import check_required_parameter, check_required_parameters +from datamaxi.resources.responses import FundingHistoryResponse, LatestFundingRate +from datamaxi.lib.constants import ASC, DESC, SortOrder + +if TYPE_CHECKING: + import pandas as pd + + +class AsyncFundingRate(AsyncResource): + async def history( + self, + exchange: str, + symbol: str, + page: int = 1, + limit: int = 1000, + fromDateTime: str = None, + toDateTime: str = None, + sort: SortOrder = DESC, + pandas: bool = True, + ) -> Union[Tuple[pd.DataFrame, Callable], Tuple[FundingHistoryResponse, Callable]]: + check_required_parameters( + [ + [exchange, "exchange"], + [symbol, "symbol"], + ] + ) + if page < 1: + raise ValueError("page must be greater than 0") + if limit < 1: + raise ValueError("limit must be greater than 0") + if fromDateTime is not None and toDateTime is not None: + raise ValueError( + "fromDateTime and toDateTime cannot be set at the same time" + ) + if sort not in [ASC, DESC]: + raise ValueError("sort must be either asc or desc") + + res = await self.request_endpoint( + "funding_rate_history", + exchange=exchange, + symbol=symbol, + page=page, + limit=limit, + sort=sort, + **{"from": fromDateTime, "to": toDateTime}, + ) + if res["data"] is None or len(res["data"]) == 0: + raise ValueError("no data found") + + async def next_request(): + return await self.history( + exchange, + symbol, + page + 1, + limit, + fromDateTime, + toDateTime, + sort, + pandas, + ) + + if pandas: + from datamaxi.resources.utils import convert_data_to_data_frame + + df = convert_data_to_data_frame(res["data"]) + return df, next_request + return res, next_request + + async def latest( + self, + exchange: str = None, + symbol: str = None, + pandas: bool = True, + ) -> Union[pd.DataFrame, LatestFundingRate]: + res = await self.request_endpoint( + "funding_rate_latest", exchange=exchange, symbol=symbol + ) + if pandas: + import pandas as pd + + df = pd.DataFrame([res]) + df = df.set_index("d") + return df + return res + + async def exchanges(self) -> List[str]: + return await self.request_endpoint("funding_rate_exchanges") + + async def symbols(self, exchange: str) -> List[str]: + check_required_parameter(exchange, "exchange") + return await self.request_endpoint("funding_rate_symbols", exchange=exchange) diff --git a/datamaxi/aio/index_price.py b/datamaxi/aio/index_price.py new file mode 100644 index 0000000..e9f21f1 --- /dev/null +++ b/datamaxi/aio/index_price.py @@ -0,0 +1,24 @@ +"""Async index-price resource — mirror of ``datamaxi.resources.index_price``.""" + +from typing import Any, Dict, Optional + +from datamaxi.aio._core import AsyncResource +from datamaxi.lib.utils import check_required_parameter +from datamaxi.lib.constants import Interval + + +class AsyncIndexPrice(AsyncResource): + async def __call__( + self, + asset: str, + from_: Optional[str] = None, + to: Optional[str] = None, + interval: Interval = "5m", + ) -> Dict[str, Any]: + check_required_parameter(asset, "asset") + return await self.request_endpoint( + "index_price", + asset=asset, + interval=interval, + **{"from": from_, "to": to}, + ) diff --git a/datamaxi/aio/liquidation.py b/datamaxi/aio/liquidation.py new file mode 100644 index 0000000..28c394c --- /dev/null +++ b/datamaxi/aio/liquidation.py @@ -0,0 +1,90 @@ +"""Async liquidation resource — mirror of ``datamaxi.resources.liquidation``.""" + +from typing import Any, Dict, Optional + +from datamaxi.aio._core import AsyncResource +from datamaxi.lib.constants import Interval + + +class AsyncLiquidation(AsyncResource): + async def __call__( + self, + exchange: str, + symbol: str, + limit: int = 100, + ) -> Dict[str, Any]: + if limit < 1: + raise ValueError("limit must be greater than 0") + return await self.request_endpoint( + "liquidation", exchange=exchange, symbol=symbol, limit=limit + ) + + async def feed( + self, + limit: int = 100, + exchange: Optional[str] = None, + base: Optional[str] = None, + min_volume_usd: Optional[float] = None, + ) -> Dict[str, Any]: + if limit < 1: + raise ValueError("limit must be greater than 0") + return await self.request_endpoint( + "liquidation_feed", + limit=limit, + exchange=exchange, + base=base, + min_volume_usd=min_volume_usd, + ) + + async def heatmap( + self, + window: str = "1h", + topN: int = 10, + ) -> Dict[str, Any]: + if topN < 1 or topN > 30: + raise ValueError("topN must be between 1 and 30") + return await self.request_endpoint( + "liquidation_heatmap", window=window, top_n=topN + ) + + async def map( + self, + base: str, + exchange: str = "binance", + quote: str = "USDT", + ) -> Dict[str, Any]: + return await self.request_endpoint( + "liquidation_map", base=base, exchange=exchange, quote=quote + ) + + async def stats( + self, + window: str = "1h", + exchange: Optional[str] = None, + min_volume_usd: Optional[float] = None, + ) -> Dict[str, Any]: + if window not in ("1h", "4h", "24h"): + raise ValueError("window must be one of 1h, 4h, or 24h") + return await self.request_endpoint( + "liquidation_stats", + window=window, + exchange=exchange, + min_volume_usd=min_volume_usd, + ) + + async def symbol_history( + self, + symbol: str, + quote: str = "USDT", + exchange: Optional[str] = None, + interval: Interval = "5m", + window: str = "24h", + ) -> Dict[str, Any]: + return await self.request_endpoint( + "liquidation_symbol_history", + symbol=symbol, + quote=quote, + exchange=exchange, + interval=interval, + window=window, + ) diff --git a/datamaxi/aio/margin_borrow.py b/datamaxi/aio/margin_borrow.py new file mode 100644 index 0000000..98a557b --- /dev/null +++ b/datamaxi/aio/margin_borrow.py @@ -0,0 +1,12 @@ +"""Async margin-borrow resource — mirror of ``datamaxi.resources.margin_borrow``.""" + +from typing import Any, Dict + +from datamaxi.aio._core import AsyncResource +from datamaxi.lib.utils import check_required_parameter + + +class AsyncMarginBorrow(AsyncResource): + async def __call__(self, asset: str) -> Dict[str, Any]: + check_required_parameter(asset, "asset") + return await self.request_endpoint("margin_borrow", asset=asset) diff --git a/datamaxi/aio/naver.py b/datamaxi/aio/naver.py new file mode 100644 index 0000000..12267e5 --- /dev/null +++ b/datamaxi/aio/naver.py @@ -0,0 +1,54 @@ +"""Async Naver client — mirror of ``datamaxi.naver.Naver``. + +Standalone top-level async client (builds its own ``AsyncAPI``). Use as an +async context manager so the underlying httpx client is closed. +""" + +from __future__ import annotations + +from typing import Any, List, Union, TYPE_CHECKING + +from datamaxi.aio._core import AsyncAPI, AsyncResource +from datamaxi.resources.responses import NaverTrendRow +from datamaxi.lib.utils import check_required_parameter +from datamaxi.lib.constants import BASE_URL + +if TYPE_CHECKING: + import pandas as pd + + +class AsyncNaver(AsyncResource): + """Client to fetch Naver trend data from DataMaxi+ API (async).""" + + def __init__(self, api_key=None, **kwargs: Any): + if "base_url" not in kwargs: + kwargs["base_url"] = BASE_URL + super().__init__(AsyncAPI(api_key, **kwargs)) + + async def aclose(self): + await self._api.aclose() + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + await self.aclose() + + def __repr__(self): + return "AsyncNaver(base_url={!r}, has_key={})".format( + self._api.base_url, bool(self._api.api_key) + ) + + async def symbols(self) -> List[str]: + return await self.request_endpoint("naver_trend_symbols") + + async def trend( + self, symbol: str, pandas: bool = True + ) -> Union[pd.DataFrame, List[NaverTrendRow]]: + check_required_parameter(symbol, "symbol") + res = await self.request_endpoint("naver_trend", symbol=symbol) + if pandas: + import pandas as pd + + return pd.DataFrame(res) + return res diff --git a/datamaxi/aio/open_interest.py b/datamaxi/aio/open_interest.py new file mode 100644 index 0000000..3d69560 --- /dev/null +++ b/datamaxi/aio/open_interest.py @@ -0,0 +1,58 @@ +"""Async open-interest resource — mirror of ``datamaxi.resources.open_interest``.""" + +from typing import Any, Dict, Optional + +from datamaxi.aio._core import AsyncResource +from datamaxi.lib.constants import Interval, SortOrder + + +class AsyncOpenInterest(AsyncResource): + async def __call__(self, exchange: str, symbol: str) -> Dict[str, Any]: + return await self.request_endpoint( + "open_interest", exchange=exchange, symbol=symbol + ) + + async def list(self, exchange: Optional[str] = None) -> Dict[str, Any]: + return await self.request_endpoint("open_interest_list", exchange=exchange) + + async def overview( + self, + page: int = 1, + limit: int = 20, + key: str = "binance", + sort: SortOrder = "desc", + query: Optional[str] = None, + ) -> Dict[str, Any]: + if page < 1: + raise ValueError("page must be greater than 0") + if limit < 1: + raise ValueError("limit must be greater than 0") + if sort not in ("asc", "desc"): + raise ValueError("sort must be either asc or desc") + return await self.request_endpoint( + "open_interest_overview", + page=page, + limit=limit, + key=key, + sort=sort, + query=query, + ) + + async def summary(self, topN: int = 10) -> Dict[str, Any]: + if topN < 1 or topN > 30: + raise ValueError("topN must be between 1 and 30") + return await self.request_endpoint("open_interest_summary", top_n=topN) + + async def history_aggregated( + self, + token_id: str, + interval: Interval = "1h", + from_: Optional[int] = None, + to: Optional[int] = None, + ) -> Dict[str, Any]: + return await self.request_endpoint( + "open_interest_history_aggregated", + token_id=token_id, + interval=interval, + **{"from": from_, "to": to}, + ) diff --git a/datamaxi/aio/premium.py b/datamaxi/aio/premium.py new file mode 100644 index 0000000..f34c1ff --- /dev/null +++ b/datamaxi/aio/premium.py @@ -0,0 +1,131 @@ +"""Async premium resource — mirror of ``datamaxi.resources.premium``.""" + +from __future__ import annotations + +from typing import List, Union, Optional, TYPE_CHECKING + +from datamaxi.aio._core import AsyncResource +from datamaxi.resources.responses import PremiumResponse +from datamaxi.lib.constants import Market, SortOrder + +if TYPE_CHECKING: + import pandas as pd + + +class AsyncPremium(AsyncResource): + async def __call__( # noqa: C901 + self, + source_exchange: str = None, + target_exchange: str = None, + asset: str = None, + source_quote: str = None, + target_quote: str = None, + sort: Optional[SortOrder] = None, + key: str = None, + page: int = 1, + limit: int = 100, + currency: str = None, + conversion_base: str = None, + min_sv: str = None, + min_tv: str = None, + source_market: Optional[Market] = None, + target_market: Optional[Market] = None, + only_transferable: bool = False, + network: str = None, + premium_type: str = None, + token_include: str = None, + token_exclude: str = None, + query: str = None, + pandas: bool = True, + ) -> Union[pd.DataFrame, PremiumResponse]: + params = {} + + if source_exchange is not None: + params["source_exchange"] = source_exchange + + if target_exchange is not None: + params["target_exchange"] = target_exchange + + if asset is not None: + params["asset"] = asset + + if source_quote is not None: + params["source_quote"] = source_quote + + if target_quote is not None: + params["target_quote"] = target_quote + + if sort is not None: + params["sort"] = sort + + if key is not None: + params["key"] = key + + if query is not None: + params["query"] = query + + if page is not None: + params["page"] = page + + if limit is not None: + params["limit"] = limit + + if currency is not None: + params["currency"] = currency + + if conversion_base is not None: + params["conversion_base"] = conversion_base + + if min_sv is not None: + params["min_sv"] = min_sv + + if min_tv is not None: + params["min_tv"] = min_tv + + if source_market is not None: + params["source_market"] = source_market + + if target_market is not None: + params["target_market"] = target_market + + if only_transferable: + params["only_transferable"] = True + + if network is not None: + params["network"] = network + + if premium_type is not None: + params["premium_type"] = premium_type + + if token_include is not None: + params["token_include"] = token_include + + if token_exclude is not None: + params["token_exclude"] = token_exclude + + res = await self.request_endpoint("premium", **params) + if res["data"] is None or len(res["data"]) == 0: + raise ValueError("no data found") + + if pandas: + import pandas as pd + + df = pd.DataFrame( + [ + { + **item["detail"], + "source_annualized_funding_rate": item.get( + "source_annualized_funding_rate" + ), + "target_annualized_funding_rate": item.get( + "target_annualized_funding_rate" + ), + } + for item in res["data"] + ] + ) + return df + return res + + async def exchanges(self) -> List[str]: + return await self.request_endpoint("premium_exchanges") diff --git a/datamaxi/aio/telegram.py b/datamaxi/aio/telegram.py new file mode 100644 index 0000000..4854786 --- /dev/null +++ b/datamaxi/aio/telegram.py @@ -0,0 +1,113 @@ +"""Async Telegram client — mirror of ``datamaxi.telegram.Telegram``. + +Standalone top-level async client (builds its own ``AsyncAPI``). Use as an +async context manager so the underlying httpx client is closed. +""" + +from typing import Any, Optional, Tuple, Callable + +from datamaxi.aio._core import AsyncAPI, AsyncResource +from datamaxi.resources.responses import ( + TelegramChannelsResponse, + TelegramMessagesResponse, +) +from datamaxi.lib.constants import BASE_URL, SortOrder + + +class AsyncTelegram(AsyncResource): + """Client to fetch Telegram data from DataMaxi+ API (async).""" + + def __init__(self, api_key=None, **kwargs: Any): + if "base_url" not in kwargs: + kwargs["base_url"] = BASE_URL + super().__init__(AsyncAPI(api_key, **kwargs)) + + async def aclose(self): + await self._api.aclose() + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + await self.aclose() + + def __repr__(self): + return "AsyncTelegram(base_url={!r}, has_key={})".format( + self._api.base_url, bool(self._api.api_key) + ) + + async def channels( + self, + page: int = 1, + limit: int = 1000, + category: Optional[str] = None, + key: Optional[str] = None, + sort: SortOrder = "desc", + ) -> Tuple[TelegramChannelsResponse, Callable]: + if page < 1: + raise ValueError("page must be greater than 0") + if limit < 1: + raise ValueError("limit must be greater than 0") + if sort not in ["asc", "desc"]: + raise ValueError("sort must be either asc or desc") + + res = await self.request_endpoint( + "telegram_channels", + page=page, + limit=limit, + category=category, + key=key, + sort=sort, + ) + if res["data"] is None: + raise ValueError("no data found") + + async def next_request(): + return await self.channels( + category=category, + page=page + 1, + limit=limit, + sort=sort, + ) + + return res, next_request + + async def messages( + self, + channel_name: Optional[str] = None, + page: int = 1, + limit: int = 1000, + key: Optional[str] = None, + sort: SortOrder = "desc", + category: Optional[str] = None, + search_query: Optional[str] = None, + ) -> Tuple[TelegramMessagesResponse, Callable]: + if page < 1: + raise ValueError("page must be greater than 0") + if limit < 1: + raise ValueError("limit must be greater than 0") + if sort not in ["asc", "desc"]: + raise ValueError("sort must be either asc or desc") + + res = await self.request_endpoint( + "telegram_messages", + channel=channel_name, + page=page, + limit=limit, + key=key, + sort=sort, + category=category, + search_query=search_query, + ) + if res["data"] is None: + raise ValueError("no data found") + + async def next_request(): + return await self.messages( + channel_name=channel_name, + page=page + 1, + limit=limit, + sort=sort, + ) + + return res, next_request diff --git a/tests/test_async_resources.py b/tests/test_async_resources.py new file mode 100644 index 0000000..4b1be02 --- /dev/null +++ b/tests/test_async_resources.py @@ -0,0 +1,187 @@ +"""Async coverage for the full resource tree (#142 expansion). + +Routes requests by path through one httpx.MockTransport and checks each +resource's ported method returns/awaits correctly, including the async +`next_request` pagination closures. Skipped when httpx is absent. +""" + +import asyncio + +import pytest + +httpx = pytest.importorskip("httpx") + +from datamaxi.aio import AsyncDatamaxi, AsyncTelegram, AsyncNaver # noqa: E402 + +BASE_URL = "https://api.datamaxiplus.com" + +_FEE = [{"exchange": "binance", "symbol": "BTC-USDT", "maker": "0.001"}] +_STATUS = [{"network": "BSC", "currency": "USDT"}] +_ANNOUNCE = {"data": [{"t": "New listing", "e": "binance"}]} +_UPDATES = {"data": [{"b": "BTC", "t": "listed"}]} +_META = {"BTC": {"status": "trading"}} +_HISTORY = {"data": [{"d": "1700000000", "f": "0.0001"}]} +_LATEST = {"d": "1700000000", "f": "0.0001", "s": "BTC-USDT"} +_FOREX = {"d": "1700000000", "r": "1380.5", "s": "USD-KRW"} +_PREMIUM = { + "data": [ + { + "detail": {"bid": "BTC"}, + "source_annualized_funding_rate": "0", + "target_annualized_funding_rate": "0", + } + ] +} +_HEATMAP = {"data": {"binance": {"BTC": "1"}}} +_OI = {"data": {"oi": "1"}} +_MARGIN = {"data": {"rate": "1"}} +_INDEX = {"data": [{"d": "1", "p": "1"}]} +_CHANNELS = {"data": [{"channelName": "alpha", "category": "news"}]} +_MESSAGES = {"data": [{"channelName": "alpha", "message": "gm"}]} +_TREND = [{"d": "2024-01-01", "v": 10}] + +ROUTES = { + "/api/v1/cex/fees": _FEE, + "/api/v1/wallet-status": _STATUS, + "/api/v1/cex/announcements": _ANNOUNCE, + "/api/v1/cex/token/updates": _UPDATES, + "/api/v1/cex/symbol/metadata": _META, + "/api/v1/funding-rate/history": _HISTORY, + "/api/v1/funding-rate/latest": _LATEST, + "/api/v1/forex": _FOREX, + "/api/v1/premium": _PREMIUM, + "/api/v1/liquidation/heatmap": _HEATMAP, + "/api/v1/open-interest": _OI, + "/api/v1/margin-borrow": _MARGIN, + "/api/v1/index-price": _INDEX, + "/api/v1/telegram/channels": _CHANNELS, + "/api/v1/telegram/messages": _MESSAGES, + "/api/v1/naver-trend": _TREND, +} + + +def _handler(request): + payload = ROUTES.get(request.url.path) + if payload is None: # unexpected path -> fail loudly + return httpx.Response(404, json={"error": "no route for " + request.url.path}) + return httpx.Response(200, json=payload) + + +def _transport(): + return httpx.MockTransport(_handler) + + +def _dm(): + return AsyncDatamaxi(api_key="k", base_url=BASE_URL, transport=_transport()) + + +def _run(coro): + return asyncio.run(coro) + + +def test_async_cex_fee(): + async def run(): + async with _dm() as c: + return await c.cex.fee(exchange="binance") + + assert _run(run()) == _FEE + + +def test_async_cex_wallet_status_pandas_false(): + async def run(): + async with _dm() as c: + return await c.cex.wallet_status( + exchange="binance", asset="USDT", pandas=False + ) + + assert _run(run()) == _STATUS + + +def test_async_cex_symbol_metadata(): + async def run(): + async with _dm() as c: + return await c.cex.symbol.metadata(base="BTC") + + assert _run(run()) == _META + + +def test_async_forex_single_object(): + async def run(): + async with _dm() as c: + return await c.forex(symbol="USD-KRW", pandas=False) + + assert _run(run()) == _FOREX + + +def test_async_premium_envelope(): + async def run(): + async with _dm() as c: + return await c.premium(pandas=False) + + assert _run(run()) == _PREMIUM + + +def test_async_futures_surfaces_raw(): + async def run(): + async with _dm() as c: + return ( + await c.liquidation.heatmap(), + await c.open_interest(exchange="binance", symbol="BTC-USDT"), + await c.margin_borrow(asset="BTC"), + await c.index_price(asset="BTC"), + ) + + hm, oi, mb, ip = _run(run()) + assert (hm, oi, mb, ip) == (_HEATMAP, _OI, _MARGIN, _INDEX) + + +def test_async_naver_trend_and_symbols(): + async def run(): + async with AsyncNaver( + api_key="k", base_url=BASE_URL, transport=_transport() + ) as n: + return await n.trend("BTC", pandas=False) + + assert _run(run()) == _TREND + + +def test_async_pagination_next_request_is_awaitable(): + # announcement / token / funding.history / telegram all return + # (envelope, async next_request); the closure must be awaitable. + async def run(): + async with _dm() as c: + res, nxt = await c.cex.announcement() + assert res == _ANNOUNCE + assert asyncio.iscoroutinefunction(nxt) + res2, nxt2 = await nxt() # awaiting the next page works + return res2 + + assert _run(run()) == _ANNOUNCE + + +def test_async_telegram_pagination(): + async def run(): + async with AsyncTelegram( + api_key="k", base_url=BASE_URL, transport=_transport() + ) as t: + res, nxt = await t.channels() + assert asyncio.iscoroutinefunction(nxt) + return res + + assert _run(run()) == _CHANNELS + + +def test_async_funding_history_and_latest(): + async def run(): + async with _dm() as c: + hist, nxt = await c.funding_rate.history( + exchange="binance", symbol="BTC-USDT", pandas=False + ) + latest = await c.funding_rate.latest( + exchange="binance", symbol="BTC-USDT", pandas=False + ) + return hist, latest + + hist, latest = _run(run()) + assert hist == _HISTORY + assert latest == _LATEST