import asyncio
import os
from typing import Any, AsyncGenerator, Awaitable, Callable, Optional
import aiohttp
class WxPusherClient:
def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
self.base_url = base_url
self.token = token or os.environ["WXPUSHER_TOKEN"]
async def send_message(
self,
content,
summary: Optional[str] = None,
content_type: int = 1,
topic_ids: Optional[list[int]] = None,
uids: Optional[list[int]] = None,
verify: bool = False,
url: Optional[str] = None,
):
payload = {
"appToken": self.token,
"content": content,
"summary": summary,
"contentType": content_type,
"topicIds": topic_ids or [],
"uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
"verifyPay": verify,
"url": url,
}
url = f"{self.base_url}/api/send/message"
return await self._request("POST", url, json=payload)
async def _request(self, method, url, **kwargs):
async with aiohttp.ClientSession() as session:
async with session.request(method, url, **kwargs) as response:
response.raise_for_status()
return await response.json()
async def wxpusher_callback():
client = WxPusherClient(token="AT_xxx")
res = await client.send_message(content="hello", uids=["UID_xxx"])
print(res)
asyncio.run(wxpusher_callback())