支持事件订阅、机器人收消息、卡片回调等功能
优点:
配置简单,不依赖也不需要暴露公网IP,无需向公网开放端口
github官方链接:GitHub - open-dingtalk/dingtalk-stream-sdk-python: Python SDK for DingTalk Stream Mode API, Compared with the webhook mode, it is easier to access the DingTalk chatbot
安装
pip install dingtalk-stream
pip install alibabacloud_dingtalk
也可以下载SDK安装包
下载地址:https://open-dev.dingtalk.com/sdk/download/python
选择
进入钉钉开发者后台,创建企业内部应用,获取ClientID(即 AppKey)和ClientSecret( 即AppSecret)。
下载完github代码后,在编译器终端进入
cd examples/calcbot
python calcbot.py --client_id "put-your-client-id-here" --client_secret "put-your-client-secret-here"
# !/usr/bin/env python
import argparse
import logging
from dingtalk_stream import AckMessage
import dingtalk_stream
def setup_logger():
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter('%(asctime)s %(name)-8s %(levelname)-8s %(message)s [%(filename)s:%(lineno)d]'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
def define_options():
parser = argparse.ArgumentParser()
parser.add_argument(
'--client_id', dest='client_id', required=True,
help='app_key or suite_key from https://open-dev.digntalk.com'
)
parser.add_argument(
'--client_secret', dest='client_secret', required=True,
help='app_secret or suite_secret from https://open-dev.digntalk.com'
)
options = parser.parse_args()
return options
class CalcBotHandler(dingtalk_stream.ChatbotHandler):
def __init__(self, logger: logging.Logger = None):
super(dingtalk_stream.ChatbotHandler, self).__init__()
if logger:
self.logger = logger
async def process(self, callback: dingtalk_stream.CallbackMessage):
incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
expression = incoming_message.text.content.strip()
try:
result = eval(expression)
except Exception as e:
result = 'Error: %s' % e
self.logger.info('%s = %s' % (expression, result))
response = 'Q: %s\nA: %s' % (expression, result)
self.reply_text(response, incoming_message)
return AckMessage.STATUS_OK, 'OK'
def main():
logger = setup_logger()
options = define_options()
credential = dingtalk_stream.Credential(options.client_id, options.client_secret)
client = dingtalk_stream.DingTalkStreamClient(credential)
client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, CalcBotHandler(logger))
client.start_forever()
if __name__ == '__main__':
main()
测试效果: