企业微信应用消息收发实施记录

news2024/11/8 23:01:00

一、前置配置

1.1 进入我的企业页面,记录下企业ID

1.2 创建企微应用,记录下应用的 AgentIdSecret

1.3 设置应用的企业可信IP,将服务器公网 IP 填入即可。

1.4 设置应用接收消息API

填入服务器 API 地址,并记录下随机获取的 TokenEncodingAESKey。完成后,先不要点击保存,后续等服务端应用启动后再保存,即可完成校验

二、服务端部署

2.1 企业应用消息收发流程拓扑

2.2 企微相关开发者文档说明

①、消息接收概述(主要说明了 消息加解密方法、消息收发协议、消息收发格式等)

概述 - 文档 - 企业微信开发者中心 (qq.com)

②、消息加解密官方库(包含多种代码语言,本文使用的是python库,解压使用的文件如下:)

加解密库下载与返回码 - 文档 - 企业微信开发者中心 (qq.com)

注意:需要使用 WXBizMsgCrypt3.py 这个文件

③、企微应用主动发送消息(被动方式回复消息的格式不支持markdown和文件类型,为使回复内容更美观,可以采用主动发送消息的方式进行指定回复。)

发送应用消息 - 文档 - 企业微信开发者中心 (qq.com)

2.3 安装python相关依赖库。

pip3 install -r requirements.txt

 requirements 内容如下:

bcrypt==4.1.1
blinker==1.8.2
certifi==2024.8.30
cffi==1.17.0
charset-normalizer==3.3.2
click==8.1.7
colorama==0.4.6
crypto==1.4.1
cryptography==36.0.2
flask==3.0.3
idna==3.8
importlib-metadata==8.4.0
itsdangerous==2.2.0
jinja2==3.1.4
MarkupSafe==2.1.5
Naked==0.1.32
paramiko==3.0.0
pycparser==2.22
pycryptodome==3.20.0
PyNaCl==1.5.0
PyYAML==6.0.2
requests==2.32.3
shellescape==3.8.1
urllib3==2.2.2
werkzeug==3.0.4
zipp==3.20.1

2.4 主程序 app.py 内容:

# -*- coding: utf-8 -*-
from flask import Flask, request, make_response
from WXBizMsgCrypt3 import WXBizMsgCrypt
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import fromstring

# help_list
from help_list import help_list

# funny
from funny.help_funny_list import help_funny_list
from funny.get_weather import get_weather
from funny.get_myrb import get_myrb
from funny.get_music import get_music
from funny.get_fortune import get_fortune
from funny.get_tellocal import get_tel
from funny.get_express import get_express

# ops_tools
from ops_tools.get_ops import get_ops

app = Flask(__name__)


def printXML(xml_content):
    # 创建XML元素
    element = ET.XML(xml_content)

    # 使用indent()函数进行格式化打印
    ET.indent(element)
    print(ET.tostring(element, encoding='unicode'))


# 对应接受消息回调模式中的token,EncodingAESKey 和 企业信息中的企业id
qy_api = [
    WXBizMsgCrypt("***************", "**************************", "*********************"), ]


# 开启消息接受模式时验证接口连通性
def signature(request, i):
    msg_signature = request.args.get('msg_signature', '')
    timestamp = request.args.get('timestamp', '')
    nonce = request.args.get('nonce', '')
    echo_str = request.args.get('echostr', '')
    ret, sEchoStr = qy_api[i].VerifyURL(msg_signature, timestamp, nonce, echo_str)
    if (ret != 0):
        print("ERR: VerifyURL ret: " + str(ret))
        return ("failed")
    else:
        return (sEchoStr)


# 接收用户消息,可进行被动响应
def handle_user_message(request, i):
    user_message = request.data
    printXML(user_message)
    msg_signature = request.args.get('msg_signature', '')
    timestamp = request.args.get('timestamp', '')
    nonce = request.args.get('nonce', '')
    ret, sMsg = qy_api[i].DecryptMsg(user_message.decode('utf-8'), msg_signature, timestamp, nonce)
    decrypt_data = {}
    for node in list(fromstring(sMsg.decode('utf-8'))):
        decrypt_data[node.tag] = node.text
    # 解析后得到的decrypt_data: {"ToUserName":"企业号", "FromUserName":"发送者用户名", "CreateTime":"发送时间", "Content":"用户发送的内容", "MsgId":"唯一id,需要针对此id做出响应", "AagentID": "应用id"}
    # 用户应根据Content的内容自定义要做出的行为,包括响应返回数据,如下例子,如果发送的是123,就返回hello world

    content_text = decrypt_data.get('Content', '')
    to_username_text = decrypt_data.get('ToUserName', '')
    from_username_text = decrypt_data.get('FromUserName', '')
    create_time_text = decrypt_data.get('CreateTime', '')

    # 主菜单
    if content_text == '#help':
        sRespData = help_list(to_username_text, from_username_text, create_time_text)

    # 生活菜单
    if content_text == '#help02':
        sRespData = help_funny_list(to_username_text, from_username_text, create_time_text)

    # 天气查询
    if content_text == '#天气查询':
        sRespData = get_weather(to_username_text, from_username_text, create_time_text)

    # 摸鱼日报
    if content_text == '#摸鱼日报':
        sRespData = get_myrb(to_username_text, from_username_text, create_time_text)

    # 随机点歌
    if content_text == '#随机点歌':
        sRespData = get_music(to_username_text, from_username_text, create_time_text)

    # 星座运势
    if "#星座运势#" in content_text:
        sRespData = get_fortune(content_text, to_username_text, from_username_text, create_time_text)

    # 电话查询
    if "#电话查询#" in content_text:
        sRespData = get_tel(content_text, to_username_text, from_username_text, create_time_text)

    # 快递查询
    if "#快递查询#" in content_text:
        sRespData = get_express(content_text, to_username_text, from_username_text, create_time_text)

    # OPS工具
    if "#ops#" in content_text:
        sRespData = get_ops(content_text, to_username_text, from_username_text, create_time_text)

    ret, send_msg = qy_api[i].EncryptMsg(sReplyMsg=sRespData, sNonce=nonce)
    if ret == 0:
        return send_msg
    else:
        print(send_msg)


@app.route('/company_wechat', methods=['GET', 'POST'])
def company_wechat():
    if request.method == 'GET':
        return signature(request, 0)
    else:
        print("收到请求......")
        return handle_user_message(request, 0)

# Flask服务端口,可自定义
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=6969, debug=True)

 将刚刚记录下的 TokenEncodingAESKey企业ID 分别替换至该段:

2.5 解密库 WXBizMsgCrypt3.py 内容:

# -*- encoding:utf-8 -*-

""" 对企业微信发送给企业后台的消息加解密示例代码.
@copyright: Copyright (c) 1998-2014 Tencent Inc.

"""
# ------------------------------------------------------------------------
import logging
import base64
import random
import hashlib
import time
import struct
from Crypto.Cipher import AES
import xml.etree.cElementTree as ET
import socket
import ierror

"""
关于Crypto.Cipher模块,ImportError: No module named 'Crypto'解决方案
请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。
下载后,按照README中的“Installation”小节的提示进行pycrypto安装。
"""

class FormatException(Exception):
    pass

def throw_exception(message, exception_class=FormatException):
    """my define raise exception function"""
    raise exception_class(message)

class SHA1:
    """计算企业微信的消息签名接口"""

    def getSHA1(self, token, timestamp, nonce, encrypt):
        """用SHA1算法生成安全签名
        @param token:  票据
        @param timestamp: 时间戳
        @param encrypt: 密文
        @param nonce: 随机字符串
        @return: 安全签名
        """
        try:
            sortlist = [token, timestamp, nonce, encrypt]
            sortlist.sort()
            sha = hashlib.sha1()
            sha.update("".join(sortlist).encode())
            return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_ComputeSignature_Error, None

class XMLParse:
    """提供提取消息格式中的密文及生成回复消息格式的接口"""

    # xml消息模板
    AES_TEXT_RESPONSE_TEMPLATE = """<xml>
<Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
<MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature>
<TimeStamp>%(timestamp)s</TimeStamp>
<Nonce><![CDATA[%(nonce)s]]></Nonce>
</xml>"""

    def extract(self, xmltext):
        """提取出xml数据包中的加密消息
        @param xmltext: 待提取的xml字符串
        @return: 提取出的加密消息字符串
        """
        try:
            xml_tree = ET.fromstring(xmltext)
            encrypt = xml_tree.find("Encrypt")
            return ierror.WXBizMsgCrypt_OK, encrypt.text
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_ParseXml_Error, None

    def generate(self, encrypt, signature, timestamp, nonce):
        """生成xml消息
        @param encrypt: 加密后的消息密文
        @param signature: 安全签名
        @param timestamp: 时间戳
        @param nonce: 随机字符串
        @return: 生成的xml字符串
        """
        resp_dict = {
            'msg_encrypt': encrypt,
            'msg_signaturet': signature,
            'timestamp': timestamp,
            'nonce': nonce,
        }
        resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
        return resp_xml

class PKCS7Encoder():
    """提供基于PKCS7算法的加解密接口"""

    block_size = 32

    def encode(self, text):
        """ 对需要加密的明文进行填充补位
        @param text: 需要进行填充补位操作的明文
        @return: 补齐明文字符串
        """
        text_length = len(text)
        # 计算需要填充的位数
        amount_to_pad = self.block_size - (text_length % self.block_size)
        if amount_to_pad == 0:
            amount_to_pad = self.block_size
        # 获得补位所用的字符
        pad = chr(amount_to_pad)
        return text + (pad * amount_to_pad).encode()

    def decode(self, decrypted):
        """删除解密后明文的补位字符
        @param decrypted: 解密后的明文
        @return: 删除补位字符后的明文
        """
        pad = ord(decrypted[-1])
        if pad < 1 or pad > 32:
            pad = 0
        return decrypted[:-pad]

class Prpcrypt(object):
    """提供接收和推送给企业微信消息的加解密接口"""

    def __init__(self, key):

        # self.key = base64.b64decode(key+"=")
        self.key = key
        # 设置加解密模式为AES的CBC模式
        self.mode = AES.MODE_CBC

    def encrypt(self, text, receiveid):
        """对明文进行加密
        @param text: 需要加密的明文
        @return: 加密得到的字符串
        """
        # 16位随机字符串添加到明文开头
        text = text.encode()
        text = self.get_random_str() + struct.pack("I", socket.htonl(len(text))) + text + receiveid.encode()

        # 使用自定义的填充方式对明文进行补位填充
        pkcs7 = PKCS7Encoder()
        text = pkcs7.encode(text)
        # 加密
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # 使用BASE64对加密后的字符串进行编码
            return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_EncryptAES_Error, None

    def decrypt(self, text, receiveid):
        """对解密后的明文进行补位删除
        @param text: 密文
        @return: 删除填充补位后的明文
        """
        try:
            cryptor = AES.new(self.key, self.mode, self.key[:16])
            # 使用BASE64对密文进行解码,然后AES-CBC解密
            plain_text = cryptor.decrypt(base64.b64decode(text))
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_DecryptAES_Error, None
        try:
            pad = plain_text[-1]
            # 去掉补位字符串
            # pkcs7 = PKCS7Encoder()
            # plain_text = pkcs7.encode(plain_text)
            # 去除16位随机字符串
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
            xml_content = content[4: xml_len + 4]
            from_receiveid = content[xml_len + 4:]
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_IllegalBuffer, None

        if from_receiveid.decode('utf8') != receiveid:
            return ierror.WXBizMsgCrypt_ValidateCorpid_Error, None
        return 0, xml_content

    def get_random_str(self):
        """ 随机生成16位字符串
        @return: 16位字符串
        """
        return str(random.randint(1000000000000000, 9999999999999999)).encode()

class WXBizMsgCrypt(object):
    # 构造函数
    def __init__(self, sToken, sEncodingAESKey, sReceiveId):
        try:
            self.key = base64.b64decode(sEncodingAESKey + "=")
            assert len(self.key) == 32
        except:
            throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
            # return ierror.WXBizMsgCrypt_IllegalAesKey,None
        self.m_sToken = sToken
        self.m_sReceiveId = sReceiveId

        # 验证URL
        # @param sMsgSignature: 签名串,对应URL参数的msg_signature
        # @param sTimeStamp: 时间戳,对应URL参数的timestamp
        # @param sNonce: 随机串,对应URL参数的nonce
        # @param sEchoStr: 随机串,对应URL参数的echostr
        # @param sReplyEchoStr: 解密之后的echostr,当return返回0时有效
        # @return:成功0,失败返回对应的错误码

    def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr)
        if ret != 0:
            return ret, None
        if not signature == sMsgSignature:
            return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
        pc = Prpcrypt(self.key)
        ret, sReplyEchoStr = pc.decrypt(sEchoStr, self.m_sReceiveId)
        return ret, sReplyEchoStr

    def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):
        # 将企业回复用户的消息加密打包
        # @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串
        # @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间
        # @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce
        # sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
        # return:成功0,sEncryptMsg,失败返回对应的错误码None
        pc = Prpcrypt(self.key)
        ret, encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId)
        encrypt = encrypt.decode('utf8')
        if ret != 0:
            return ret, None
        if timestamp is None:
            timestamp = str(int(time.time()))
        # 生成安全签名
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt)
        if ret != 0:
            return ret, None
        xmlParse = XMLParse()
        return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)

    def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
        # 检验消息的真实性,并且获取解密后的明文
        # @param sMsgSignature: 签名串,对应URL参数的msg_signature
        # @param sTimeStamp: 时间戳,对应URL参数的timestamp
        # @param sNonce: 随机串,对应URL参数的nonce
        # @param sPostData: 密文,对应POST请求的数据
        #  xml_content: 解密后的原文,当return返回0时有效
        # @return: 成功0,失败返回对应的错误码
        # 验证安全签名
        xmlParse = XMLParse()
        ret, encrypt = xmlParse.extract(sPostData)
        if ret != 0:
            return ret, None
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt)
        if ret != 0:
            return ret, None
        if not signature == sMsgSignature:
            return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
        pc = Prpcrypt(self.key)
        ret, xml_content = pc.decrypt(encrypt, self.m_sReceiveId)
        return ret, xml_content

2.6 启动应用,测试收发

nohup python3 app.py > /dev/null 2>&1 &

 测试收发(旧图):

三、菜单功能示例

3.1 help_funny_list.py 菜单内容:

注:改用了主动发送消息的方式,将回复内容设为markdown,并发送至指定成员ID)

import requests
import json


# 帮助菜单
def help_funny_list(to_username_text, from_username_text, create_time_text):

	# 获取access_token
    token_api = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'

    params = {
        'corpid': "******************",
        'corpsecret': "******************"
    }
    access_token = requests.get(token_api, params=params).json()['access_token']
    print(access_token)
	
	# 主动发送消息
    send_api = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}'

    payload = json.dumps({
        "touser": from_username_text,
        "msgtype": "markdown",
        "agentid": 1000003,
        "markdown": {
            "content": "# 【其他功能菜单】\n "
                       ">**【`#天气查询`】:查询实时天气信息**\n\n\n "
                       ">**【`#电话查询`】:查询手机号归属地信息**\n "
                       ">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#电话查询#手机号码</font>\n"
                       ">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">##电话查询#15200000000</font>\n\n\n"
                       ">**【`#快递查询`】:查询实时快递物流信息**\n "
                       ">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#快递查询#快递公司#手机尾号#快递单号</font>\n"
                       ">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">#快递查询#京东快递#95**#JD01425**************</font>\n"
                       "→[点击查看可用快递列表](http://work.weixin.qq.com/api/doc)\n\n\n"
                       ">**【`#星座运势`】:查询当日十二星座运势**\n "
                       ">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#星座运势#星座名称</font>\n"
                       ">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">#星座运势#金牛座</font>\n\n\n"
                       ">**【`#随机点歌`】:随机获取网易云在线歌曲**\n\n\n "
                       ">**【`#摸鱼日报`】:获取当日宜忌事项、历史事件、热点新闻**\n\n\n "
                       ">**【`#help`】:获取主菜单**"
        }
    })
    headers = {
        'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
        'Content-Type': 'application/json',
        'Accept': '*/*',
        'Host': 'qyapi.weixin.qq.com',
        'Connection': 'keep-alive'
    }
    response = requests.post(send_api, headers=headers, data=payload, timeout=15)
    if response.status_code == 200:
        print('ok')

将刚刚记录下来的 企业IDSecret 分别替换至该段:

 效果如下:

3.2 通过 paramiko 交互远程服务器,回复服务器信息

get_ops.py 内容如下:

import paramiko
import os


def get_ops(content_text, to_username_text, from_username_text, create_time_text):
    # 使用 split 以#分割字符串
    parts = content_text.split('#')
    # 检查分割后的列表是否有足够的分段
    if len(parts) >= 4:
        ip_address = parts[2]  # 获取ip地址
        command = parts[3]  # 获取命令
        client = paramiko.SSHClient()
        # 添加服务器密钥,如果使用的是密钥形式
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # 连接SSH服务端
        client.connect(ip_address, port=22, username='root', password='********')
        # 执行命令
        stdin, stdout, stderr = client.exec_command(command)
        # 获取命令执行结果
        result = stdout.read().decode('utf-8', errors='ignore')
        sRespData = """<xml>
                <ToUserName>{to_username}</ToUserName>
                <FromUserName>{from_username}</FromUserName>
                <CreateTime>{create_time}</CreateTime>
                <MsgType>text</MsgType>
                <Content>{content}</Content>
                </xml>
                """.format(to_username=to_username_text,
                           from_username=from_username_text,
                           create_time=create_time_text,
                           content=result, )
        return sRespData

效果如下:

3.3 查询物流信息,被动回复纯文本格式

get_express.py 内容如下:

import requests
import json
import re


def get_express(content_text, to_username_text, from_username_text, create_time_text):
    
    # 将接收到的消息内容以#进行分割
    parts = content_text.split('#')
    if len(parts) >= 4:
        com_str = parts[2]
        phone_int = parts[3]
        no_str = parts[4]

    # 查询本地json文件中com_str对应的NO值
    with open('funny/exp.json', 'r', encoding='utf-8') as f:
        data = json.load(f)["result"]
    for item in data:
        if item["com"] == com_str:
            com_no = (item["no"])
            break
    else:
        error_msg = '输入有误,未找到物流信息'

	# 聚合平台物流查询接口,接口文档:https://www.juhe.cn/docs/api/id/43
    api_url = "http://v.juhe.cn/exp/index"
    params = {
        "key": "*********************************",
        "com": com_no,
        "no": no_str,
        "receiverPhone": phone_int
    }
    response = requests.get(api_url, params=params)
    json_data = response.json()
    
    # 提取result中的值
    exp_info = {
        "company": json_data["result"].get("company"),
        "no": json_data["result"].get("no"),
        "status_detail": json_data["result"].get("status_detail")
    }
   
   # 从 list 中提取每一项的 datetime 和 remark ,然后格式化为字符串
    list_items = "\n\n".join(
        "【物流时间】:{}\n【物流详情】:{}".format(item.get("datetime"), item.get("remark"))
        for item in json_data["result"].get("list", [])
    )
    # 构造最终的回复字符串,包括所有物流详情
    reply = ('【物流公司】:{company}\n【物流单号】:{no}\n【物流状态】:{status_detail}\n{list_items}'.format(**exp_info,
                                                                                                      list_items=list_items))

    sRespData = """<xml>
            <ToUserName>{to_username}</ToUserName>
            <FromUserName>{from_username}</FromUserName>
            <CreateTime>{create_time}</CreateTime>
            <MsgType>text</MsgType>
            <Content>{content}</Content>
            </xml>
            """.format(to_username=to_username_text,
                       from_username=from_username_text,
                       create_time=create_time_text,
                       content=reply, )
    return sRespData

 效果如下:

3.4 获取随机音乐链接,被动回复图文格式

get_music.py 内容如下:

import requests
import json
import re


def get_music(to_username_text, from_username_text, create_time_text):
    api_url = "https://api.52vmy.cn/api/music/wy/rand"
    response = requests.get(api_url)
    data = response.json().get('data', {})
    music_info = {key: data.get(key) for key in ['song', 'singer', 'cover', 'Music']}
    sRespData = """<xml>
                        <ToUserName>{to_username}</ToUserName>
                        <FromUserName>{from_username}</FromUserName>
                        <CreateTime>{create_time}</CreateTime>
                        <MsgType>news</MsgType>
                        <ArticleCount>1</ArticleCount>
                            <Articles>
                                <item>
                                    <Title>歌曲名:{title}</Title>
                                    <Description>演唱者:{description}</Description>
                                    <PicUrl>{picurl}</PicUrl>
                                    <Url>{url}</Url>
                                </item>
                            </Articles>
                        </xml>
                        """.format(to_username=to_username_text,
                                   from_username=from_username_text,
                                   create_time=create_time_text,
                                   title=music_info['song'],
                                   description=music_info['singer'],
                                   picurl=music_info['cover'],
                                   url=music_info['Music'])
    return sRespData

效果如下:

3.5 其他问题

由于应用被动回复消息的格式不支持文件类型,如需将文件回复至企微可以采用2种方式:

①、在服务端配置nginx静态目录,通过静态页面路径 + 文件名 的形式拼接出完整的文件 url 地址,再通过图文类型的消息格式带入回复。

②、改用主动发送消息的方式,通过企微素材上传接口上传文件,并获取对应的 media_id ,再通过文件类型消息带入 media_id 指定对应成员完成发送,示例:

# ===================== 【获取access_token】 ==========================

# API 地址
url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
# 企业ID
corpid = '*****************'
# SECRET
corpsecret = '******************************'

params = {
    'corpid': corpid,
    'corpsecret': corpsecret
}

access_token = requests.get(url, params=params).json()['access_token']
print(access_token)

# ================== 【上传素材获取media_id】 =======================

# API 地址
url = 'https://qyapi.weixin.qq.com/cgi-bin/media/upload'

params = {
    'access_token':  access_token,
    'type': 'file'
}

# 要上传的文件
files = {
    'media': ('111.xlsx', open(r'C:\Users\Looper\Desktop\111.xlsx', 'rb'), 'application/octet-stream')
}

media_id = requests.post(url, params=params, files=files).json()['media_id']
print(media_id)

到此,企微应用的消息收发实施,全部测试完毕。  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2129793.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PAT甲级-1012 The Best Rank

题目 题目大意 学生有C、M、E三个成绩&#xff0c;A是这三个成绩的平均值。要求对每个学生的C、M、E、A分别排名&#xff0c;取这4项的最高排名为最优排名。如果一个学生有多项排名一样&#xff0c;按照A > C > M > E的优先级输出最优排名。 输入给出学生人数和查询…

派遣函数-编写一个更通用的派遣函数

前面介绍的派遣函数处理过于简单&#xff0c;下面带领读者对派遣函数一步步进行扩充。首先介绍一个重要数据结构--IO_STACK LOCATION,即I/O堆栈,这个数据结构和IRP紧密相连。 在前面&#xff0c;曾经介绍过驱动程序的层次结构。驱动对象会创建一个个的设备对象&#xff0c; 并将…

前端单独实现 vue 动态路由

前端单独实现 vue 动态路由 Vue 动态路由权限是指在 Vue 应用程序中&#xff0c;根据用户的权限动态生成和控制路由的行为。这意味着不是所有的路由都在应用启动时就被硬编码到路由配置中&#xff0c;而是根据用户的权限信息&#xff0c;在运行时动态地决定哪些路由应该被加载…

3. 轴指令(omron 机器自动化控制器)——>MC_PowerMC_MoveJog

机器自动化控制器——第三章 轴指令 1 MC_Power变量▶输入变量▶输出变量▶输入输出变量 功能说明▶时序图▶重启运动指令▶多重启动运动指令▶错误代码 MC_MoveJog变量▶输入变量▶输出变量▶输入输出变量 功能说明▶时序图▶重启运动指令▶多重启动运动指令▶异常 MC_Power …

从0书写一个softmax分类 李沐pytorch实战

输出维度 在softmax 分类中 我们输出与类别一样多。 数据集有10个类别&#xff0c;所以网络输出维度为10。 初始化权重和偏置 torch.norma 生成一个均值为 0&#xff0c;标准差为0.01,一个形状为size(num_inputs, num_outputs)的张量偏置生成一个num_outputs 10 的一维张量&a…

1265:【例9.9】最长公共子序列 动态规划

题目链接 题目&#xff1a; 思路 最长-最值问题、重叠子问题、最优结构-前面序列的公共序列最优值是后续序列的子问题、无后效性也满足 确定状态、变量&#xff1a;序列是没有要求要连续&#xff0c;因此只能用长度为i的串a分别和长度为&#xff08;1-j&#xff09;串b去找最值…

【Linux】:信号与信号产生

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家带来信号和信号的产生相关代码和知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到…

HarmonyOS开发实战( Beta5.0)日历切换案例实践详解

鸿蒙HarmonyOS开发往期必看&#xff1a; HarmonyOS NEXT应用开发性能实践总结 最新版&#xff01;“非常详细的” 鸿蒙HarmonyOS Next应用开发学习路线&#xff01;&#xff08;从零基础入门到精通&#xff09; 介绍 本示例介绍使用Swiper实现自定义日历月视图和周视图左右滑…

反编译app

反编译代码步骤&#xff1a; 1.用dex2jar 将apk打成jar&#xff0c;d2j-dex2jar your-app.apk GitHub - pxb1988/dex2jar: Tools to work with android .dex and java .class filesTools to work with android .dex and java .class files - pxb1988/dex2jarhttps://github.co…

注解实现json序列化的时候自动进行数据脱敏

最近在进行开发的时候遇到一个问题&#xff0c;需要对用户信息进行脱敏处理&#xff0c;原有的方式是写一个util类&#xff0c;在需要脱敏的字段查出数据后&#xff0c;显示掉用方法处理后再set回去&#xff0c;觉得这种方式能实现功能&#xff0c;但是不是特别优雅&#xff0c…

机器学习特征分析

机器学习的常规流程 在真正进入机器学习算法之前&#xff0c;数据准备和处理过程会尤为重要&#xff0c;这直接关系到后续模型的效果和最终的业务判决。 数据分析 什么是数据分析 数据分析指对原始数据进行检查、清理、转换及筛选等一系列动作&#xff0c;找到数据对结果的影…

Qwen1.5模型文本分类微调实战教程

大家好啊!今天咱们来聊聊怎么给大语言模型"调教"一下&#xff0c;让它在文本分类这个任务上玩得更溜。具体来说&#xff0c;我们要用Qwen1.5这个模型来做文章。别看这活儿听着高大上&#xff0c;其实做起来也没那么难。跟着我来&#xff0c;保证让你轻松上手! 咱们这…

How to fool AI content detectors?

Add prompt below: Make it sound like a tweed jacket wearing professor taking to a group of 20 years old students. Vary the sentences length. Make it persoanl, add a touch of humor. Make the blog post sound unique when compared to Other blog posts.

MySQL--库的操作

文章目录 1.创建数据库2.创建数据库案例3.字符集和校验规则3.1默认字符集3.2默认校验规则3.3查看系统默认字符集以及校验规则3.4查看数据库支持的字符3.5查看数据库支持的字符集校验规则3.6校验规则对数据库的影响不区分大小写查询&#xff1a;排序结果&#xff1a;区分大小写查…

BFS广度优先搜索和DFS深度优先搜索解决迷宫问题

前言 BFS广度优先搜索和DFS深度优先搜索解决迷宫问题 迷宫问题 原题目&#xff1a;迷宫由n行m列的单元格组成(n,m都小于等于50)&#xff0c;每个单元格要吗是空地要吗是障碍物。现在请你找到一条从起点到终点的最短路径长度。 分析 BFS广度优先搜索 首先我们将起点入队&a…

iOS 18 RC 版本更新,为相机应用引入了“暂停录制视频”功能

苹果公司9月10日正式向全球iPhone用户推送了iOS 18 Release Candidate&#xff08;RC&#xff09;版本。这一版本的发布不仅标志着iOS系统的又一次重大更新&#xff0c;更预示着苹果在提升用户体验、增强隐私保护以及推动AI应用方面的持续努力。 并且此次苹果公司最新推出的 i…

Unity基本操作

API手册 Unity 脚本 APIhttps://docs.unity.cn/cn/2022.3/ScriptReference/index.html 在遇到不懂的方法、想更深入的学习或者是想查看是否有相应的方法实现某项功能&#xff0c;可以在Unity官方这里查看脚本。以Transform为例&#xff0c;可以直接搜索&#xff0c;或者在Unit…

9月12日 QT

//设置图片缩放适应label ui->label->setScaledContents(true); // 在spinbox后方设置$特殊符号 ui->spinBox->setSuffix(" 斤"); //给肉类combobox加入项目 QStringList Meat_List{"请选择&quo…

数据放到GPU上,运行程序卡住检查方法

这个问题一定是要结合具体的代码&#xff0c;下面就自己遇到问题&#xff0c;询问chatGPT后发现问题所在的过程进行记录&#xff0c;当然绝大部分情况下都是batch_size设置太大了&#xff0c;显卡内存不足导致 部分重点代码&#xff1a; 导入模型部分略 #自定义数据集有关类 c…