"""
Define ClientLogAdapter & BaseClient
"""
import json
import logging
import os
import importlib
import requests
from arkid_client import config
from arkid_client.exceptions import ArkIDAPIError, ArkIDSDKUsageError, convert_request_exception
from arkid_client.response import ArkIDHTTPResponse
from arkid_client.version import __version__
class ClientLogAdapter(logging.LoggerAdapter):
"""
Stuff in the memory location of the client to make log records unambiguous.
"""
def process(self, msg, kwargs):
return "[instance:{0}] {1}".format(id(self.extra["client"]), msg), kwargs
[文档]class BaseClient(object):
"""
简单的基类客户端,可处理 ArkID REST APIs 返回的错误信息。
封装 ``requests.Session`` 对象为一个简化的接口,
该接口不公开来自请求的任何内容。
注意:强烈建议您不要尝试直接实例化 ``BaseClient``
**Parameters**
``base_url`` (*str*)
ArkID 服务端的根地址,用户无需进行任何有关 ArkID 服务端的地址配置操作,
只需在初始化 BaseClient 实例时传入 ``base_url`` 参数即可
``authorizer`` (:class:`ArkIDAuthorizer\
<arkid.authorizers.base.ArkIDAuthorizer>`)
``authorizer`` 认证授权器用于生成 HTTP 请求的头部认证信息
``app_name`` (*str*)
(*可选*)用于标识调用方,往往指代正在使用 ArkID SDK 进行开发的项目。
此参数与客户端的任何操作无关。仅仅作为请求头部 ``User-Agent``
的一部分发送给 ArkID 团队,以方便调试出现的问题。
``http_timeout`` (*float*)
HTTP 连接响应的等待时间(单位:s)。默认 60 。
如果传入的值为 -1 ,代表请求发送后将无限期挂起。
所有其它的初始化参数用于子类内部使用
"""
# 可被 < BaseClient > 子类重写,其必须为 < ArkIDError > 的子类
error_class = ArkIDAPIError
default_response_class = ArkIDHTTPResponse
# 一个授权器类型集, 若其值为 None ,代表可以为任意类型的授权器
allowed_authorizer_types = None
# 置于请求头部,用作 `User-Agent` 属性值
BASE_USER_AGENT = "ArkID-sdk-py-{0}".format(__version__)
def __init__(self,
base_url: str,
service: str,
environment: str = None,
base_path: str = None,
authorizer: object = None,
app_name: str = None,
http_timeout: float = None,
*args,
**kwargs):
self._init_logger_adapter()
self.logger.info("正在创建访问 ArkID 官方 {} 服务的{}类型的客户端".format(service, type(self)))
# 校验授权器
self.check_authorizer(authorizer)
# 若未提供 `environment` 参数值,将在配置文件中查找与 `default` 相关的章节内容
self.environment = config.get_arkid_environ(input_env=environment)
self.authorizer = authorizer
# self.base_url = config.get_service_url(self.environment, service) if base_url is None else base_url
# ArkID 根服务地址
self.base_url = base_url
# 目前所加载的 ArkID 服务地址
self.service_url = slash_join(slash_join(base_url, config.get_service(self.environment, service)), base_path)
# 目前所装载 ArkID 的服务
self.service = service
# 封装 < requests.Session > 对象
self._session = requests.Session()
# 初始化请求头部
self._headers = {"Accept": "application/json", "User-Agent": self.BASE_USER_AGENT}
# 是否验证 SSL,通常为 True
self._verify = config.get_ssl_verify(self.environment)
# 初始化 HTTP 连接超时设置
http_timeout = config.get_http_timeout(self.environment) if http_timeout is None else http_timeout
# 若传入的参数值为 -1 ,将其转换为 None
self._http_timeout = http_timeout if http_timeout != -1 else None
# 初始化调用 ArkID SDK 的项目的名称
self.app_name = None
if app_name is not None:
self.set_app_name(app_name)
def _init_logger_adapter(self):
"""
Create & assign the self.logger LoggerAdapter.
Used when initializing a new client.
"""
# 获取客户端类的完全限定名, 可标识为 ArkID SDK 所有
self.logger = ClientLogAdapter(logging.getLogger(self.__module__ + "." + self.__class__.__name__),
{"client": self})
# 初始化 console_handler ,用于在终端输出调试信息
# 如果 console_handler 存在,则不会继续添加
# 只在开发 SDK 期间调试使用,生产环境务必将其注释掉
if not self.logger.logger.handlers:
self.logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
self.logger.logger.addHandler(console_handler)
[文档] def set_app_name(self, app_name: str):
"""
设置一个应用名称(用户代理)并发送给 ArkID 服务端。
注意:建议应用开发者提供一个应用名称给 ArkID 团队,以便在可能的情况下促进与 ArkID 的交互问题的解决。
"""
self.app_name = app_name
self._headers["User-Agent"] = "{}/{}".format(self.BASE_USER_AGENT, app_name)
def check_authorizer(self, authorizer):
"""
若子类重写 `allowed_authorizer_types` 参数值,
需检查并确保未违背所提供的约束
"""
if self.allowed_authorizer_types is not None and (authorizer is not None
and type(authorizer) not in self.allowed_authorizer_types):
self.logger.error("'{}'客户端不支持授权器'{}'".format(type(self), type(authorizer)))
raise ArkIDSDKUsageError("{}客户端目前仅支持'{}'中的授权器类型, 而您提供的授权器类型为{}".format(type(self),
self.allowed_authorizer_types,
type(authorizer)))
if self.allowed_authorizer_types is not None and authorizer is None:
self.logger.error("'{}'客户端需要有效授权器的支持".format(type(self)))
raise ArkIDSDKUsageError("'{}'客户端需要有效授权器的支持".format(type(self)))
def reload_service_url(self, service: str):
"""
一些种类的客户端在有些时候调用不同的接口会使用不同的服务根
地址,虽然这种情况出现的很少,但是仍然需要提供这样一种方式,
来方便开发者更灵活的处理服务根地址不一致的情况。
"""
self.logger.info("{}客户端正在加载 {} 服务".format(type(self), service))
self.service_url = slash_join(self.base_url, config.get_service(self.environment, service))
def reload_authorizer(self, authorizer):
"""
一些种类的客户端在有些时候调用不同的接口会使用不同的授权器,
虽然这种情况出现的很少,但是仍然需要提供这样一种方式,来方便
开发者更灵活的处理授权器不一致的情况。
"""
self.logger.info("{}客户端正在重载加载器{} => {}".format(type(self), type(self.authorizer), type(authorizer)))
self.check_authorizer(authorizer)
self.authorizer = authorizer
[文档] def get(self,
path: str,
params: dict = None,
headers: dict = None,
response_class: object = None,
retry_401: bool = True):
"""
以 GET 方式向指定地址发送 HTTP 请求。
**Parameters**
``path`` (*string*)
请求的路径,有无正斜杠均可
``params`` (*dict*)
编码为 Query String 的参数
``headers`` (*dict*)
添加到请求中的 HTTP 标头
``response_class`` (*class*)
响应对象的类型,由客户端的 ``default_response_class``
重写
``retry_401`` (*bool*)
如果响应码为 401 并且 ``self.authorizer`` 支持重试,
那么会自动进行新的认证。
:return: :class:`ArkIDHTTPResponse \
<arkid_client.response.ArkIDHTTPResponse>` object
"""
self.logger.debug("GET to {} with params {}".format(self.base_url + path, params))
return self._request(
"GET",
path,
params=params,
headers=headers,
response_class=response_class,
retry_401=retry_401,
)
[文档] def post(
self,
path: str,
json_body: dict = None,
params: dict = None,
headers: dict = None,
text_body: dict or str = None,
response_class: object = None,
retry_401: bool = True,
):
"""
以 POST 方式向指定地址发送 HTTP 请求。
**Parameters**
``path`` (*string*)
请求的路径,有无正斜杠均可
``params`` (*dict*)
编码为 Query String 的参数
``headers`` (*dict*)
添加到请求中的 HTTP 标头
``json_body`` (*dict*)
请求体中通过 JSON 编码的数据
``text_body`` (*string or dict*)
用作请求主体的原始字符串,或是以 HTTP 形式编码的字典数据
``response_class`` (*class*)
响应对象的类型,由客户端的 ``default_response_class``
重写
``retry_401`` (*bool*)
如果响应码为 401 并且 ``self.authorizer`` 支持重试,
那么会自动进行新的认证。
:return: :class:`ArkIDHTTPResponse \
<arkid_client.response.ArkIDHTTPResponse>` object
"""
self.logger.debug("POST to {} with params {}".format(self.base_url + path, params))
return self._request(
"POST",
path,
json_body=json_body,
params=params,
headers=headers,
text_body=text_body,
response_class=response_class,
retry_401=retry_401,
)
[文档] def delete(
self,
path: str,
params: dict = None,
headers: dict = None,
response_class: object = None,
retry_401: bool = True,
):
"""
以 DELETE 方式向指定地址发送 HTTP 请求。
**Parameters**
``path`` (*string*)
请求的路径,有无正斜杠均可
``params`` (*dict*)
编码为 Query String 的参数
``headers`` (*dict*)
添加到请求中的 HTTP 标头
``response_class`` (*class*)
响应对象的类型,由客户端的 ``default_response_class``
重写
``retry_401`` (*bool*)
如果响应码为 401 并且 ``self.authorizer`` 支持重试,
那么会自动进行新的认证。
:return: :class:`ArkIDHTTPResponse \
<arkid_client.response.ArkIDHTTPResponse>` object
"""
self.logger.debug("DELETE to {} with params {}".format(self.base_url + path, params))
return self._request(
"DELETE",
path,
params=params,
headers=headers,
response_class=response_class,
retry_401=retry_401,
)
[文档] def put(
self,
path: str,
json_body: dict = None,
params: dict = None,
headers: dict = None,
text_body: dict or str = None,
response_class: object = None,
retry_401: bool = True,
):
"""
以 PUT 方式向指定地址发送 HTTP 请求。
**Parameters**
``path`` (*string*)
请求的路径,有无正斜杠均可
``params`` (*dict*)
编码为 Query String 的参数
``headers`` (*dict*)
添加到请求中的 HTTP 标头
``json_body`` (*dict*)
请求体中通过 JSON 编码的数据
``text_body`` (*string or dict*)
用作请求主体的原始字符串,或是以 HTTP 形式编码的字典数据
``response_class`` (*class*)
响应对象的类型,由客户端的 ``default_response_class``
重写
``retry_401`` (*bool*)
如果响应码为 401 并且 ``self.authorizer`` 支持重试,
那么会自动进行新的认证。
:return: :class:`ArkIDHTTPResponse \
<arkid_client.response.ArkIDHTTPResponse>` object
"""
self.logger.debug("PUT to {} with params {}".format(self.base_url + path, params))
return self._request(
"PUT",
path,
json_body=json_body,
params=params,
headers=headers,
text_body=text_body,
response_class=response_class,
retry_401=retry_401,
)
[文档] def patch(
self,
path: str,
json_body: dict = None,
params: dict = None,
headers: dict = None,
text_body: dict or str = None,
response_class: object = None,
retry_401: bool = True,
):
"""
以 PATCH 方式向指定地址发送 HTTP 请求。
**Parameters**
``path`` (*string*)
请求的路径,有无正斜杠均可
``params`` (*dict*)
编码为 Query String 的参数
``headers`` (*dict*)
添加到请求中的 HTTP 标头
``json_body`` (*dict*)
请求体中通过 JSON 编码的数据
``text_body`` (*string or dict*)
用作请求主体的原始字符串,或是以 HTTP 形式编码的字典数据
``response_class`` (*class*)
响应对象的类型,由客户端的 ``default_response_class``
重写
``retry_401`` (*bool*)
如果响应码为 401 并且 ``self.authorizer`` 支持重试,
那么会自动进行新的认证。
:return: :class:`ArkIDHTTPResponse \
<arkid_client.response.ArkIDHTTPResponse>` object
"""
self.logger.debug("PATCH to {} with params {}".format(self.base_url + path, params))
return self._request(
"PATCH",
path,
json_body=json_body,
params=params,
headers=headers,
text_body=text_body,
response_class=response_class,
retry_401=retry_401,
)
def _request(
self,
method: str,
path: str,
params: dict = None,
headers: dict = None,
json_body: dict = None,
text_body: dict or str = None,
response_class=None,
retry_401: bool = True,
):
"""
封装 requests
:param method: HTTP 的请求方法,一个全大写字符串
:param path: 请求的路径,有无正斜杠均可
:param params: 编码为 Query String 的参数
:param headers: 添加到请求中的 HTTP 标头
:param json_body: 请求体中通过 JSON 编码的数据
:param text_body: 用作请求主体的原始字符串,或是以 HTTP 形式编码的字典数据
:param response_class: 响应对象的类型,由客户端的 ``default_response_class``
重写
:param retry_401: 如果响应码为 401 并且 ``self.authorizer`` 支持重试,
那么会自动进行新的认证
:return: :class:`ArkIDHTTPResponse \
<arkid_client.response.ArkIDHTTPResponse>` object
"""
_headers = dict(self._headers)
if headers is not None:
_headers.update(headers)
if json_body is not None:
assert text_body is None
text_body = json.dumps(json_body)
_headers.update({"Content-Type": "application/json"})
if self.authorizer is not None:
self.logger.debug("正在装载'{}'类型的授权器".format(type(self.authorizer)))
self.authorizer.set_authorization_header(_headers)
url = slash_join(self.service_url, path)
self.logger.debug("开始访问 URL: {}".format(url))
# because a 401 can trigger retry, we need to wrap the retry-able thing in a method
def send_request():
try:
return self._session.request(
method=method,
url=url,
headers=_headers,
params=params,
data=text_body,
verify=self._verify,
timeout=self._http_timeout,
)
except requests.RequestException as exc:
self.logger.error("NetworkError on request")
raise convert_request_exception(exc)
# initial request
response = send_request()
self.logger.debug("收到响应 URL: {}".format(response.url))
# potential 401 retry handling
if response.status_code == 401 and retry_401 and self.authorizer is not None:
self.logger.debug("request got 401, checking retry-capability")
# note that although handle_missing_authorization returns a T/F
# value, it may actually mutate the state of the authorizer and
# therefore change the value set by the `set_authorization_header`
# method
if self.authorizer.handle_missing_authorization():
self.logger.debug("可重新尝试访问")
self.authorizer.set_authorization_header(_headers)
response = send_request()
if 200 <= response.status_code < 400:
self.logger.debug("HTTP 请求完成 响应码: {}".format(response.status_code))
return self.default_response_class(response, client=self) \
if response_class is None \
else response_class(response, client=self)
self.logger.debug("HTTP 请求完成(错误) 响应码: {}".format(response.status_code))
raise self.error_class(response)
def slash_join(base: str, path: str):
"""
Join a and b with a single slash, regardless of whether they already
contain a trailing/leading slash or neither.
:param base: base_url
:param path: path
"""
if not path: # "" or None, don't append a slash
return base
path = path if path.endswith("/") else '{}/'.format(path)
if base.endswith("/"):
if path.startswith("/"):
return base[:-1] + path
return base + path
if path.startswith("/"):
return base + path
return base + "/" + path
def reload_service(service: str):
"""
简化客户端重载服务流程的装饰器函数
:param service: 目标服务名称
"""
def _wrapper(func):
def __wrapper(*args, **kwargs):
instance = args[0]
_service = instance.service
if _service == service:
return func(*args, **kwargs)
instance.reload_service_url(service)
response = func(*args, **kwargs)
instance.reload_service_url(_service)
return response
# 同步底层客户端的 __doc__
if service in ['user', 'org', 'node']:
_class = getattr(
__import__('arkid_client'),
'{}Client'.format(service.capitalize())
)
__wrapper.__doc__ = getattr(_class, func.__name__).__doc__
return __wrapper
return _wrapper
def reload_authorizer(func):
"""
简化客户端重载授权器的装饰器函数
:param func: 客户端功能接口
"""
def _wrapper(*args, **kwargs):
instance = args[0]
_authorizer = instance.authorizer
instance.reload_authorizer(args[1])
response = func(*args, **kwargs)
instance.reload_authorizer(_authorizer)
# _wrapper.__doc__ = func.__doc__
return response
return _wrapper