Python Tips6 基于数据库和钉钉机器人的通知

news2024/11/16 5:56:36

说明

起因是我第一版quant程序的短信通知失效了。最初认为短信是比较即时且比较醒目的通知方式,现在看来完全不行。

列举三个主要问题:

  • 1 延时。在早先还能收到消息的时候,迟滞就很严重,几分钟都算短的。
  • 2 完全丢失。我手机没有做任何设置,但是没有消息了。反正华为和腾讯,总有一个是锅。
    在这里插入图片描述
  • 3 短信模板。由于短信的模板需要审批,所以内容缩减的不像样子。所以信息阅读起来很累。

结论:不再采用短信方式,而是使用机器、邮件的方式通知。

内容

以下主要的内容有两部分:

  • 1 通过ORM方式操作数据库。
  • 2 使用钉钉机器人

1 数据

数据存在Mongo中,形如

在这里插入图片描述
在每次生成交易订单时,都会发出一条短信。所以这几天miss掉的订单,每个都是5个点,甚至十几个点的利润,太可惜了。100个短信包也比不上这些损失啊。

需要在订单的买和卖时间发生变动时发送通知信息。

对于发送程序来说,可以每30秒执行一次查询,每次执行时基于「当前时间」来判断需要发送的消息:

  • 1 基础筛选。只查询3天的候选数据。
  • 2 筛选买单。当 is_buy 字段不为1时,说明该笔订单的买信号没有发送,进行发送。
  • 3 筛选卖单。当 is_sell字段为不为1时,处理卖单消息。

所以对于业务来说,Mongo真的是更方便的,当需要增加字段时完全不用管表结构。

最近因为要做一些数据库方面的简单培训,在梳理的过程中,我发现其实很多数据库的出现是因为其特定的业务场景塑造的,而且通常能适应A场景的数据库,在B场景就不太行。所以A数据库通常无法取代B数据库

以Mongo和Mysql为例,是否A能取代B的问题应该是研究最多的(更大的考虑是否能用NoSQL取代SQL),现在来看,结论是不能。

目前可以做一些简单结论:

  • 1 当数据维度经常需要变动时,用Mongo。所以与操作和控制相关的场景下,主要用Mongo。业务形态不固定的时候,也用Mongo。例如本次,最初我并没有设计is_buy字段,但是要更改时几乎是瞬时完成的,似乎一开始就这样。
  • 2 当数据需要连表时,用Mysql。特别是结构化数据,用Mysql来存是比较方便的,每个字段也预先做了限定,不会犯错。

以下也简单总结了一些数据库的特点/场景:

  • 1 Redis : 高速kv存取,可用于实时计数等。
  • 2 Mongo: 文档性数据库,可用于集成部分的数据,适合作为主库。
  • 3 Mysql: 表格型数据库,可用于结构化部分的存储。
  • 4 Postgres: 算是Mysql的平替吧,在功能上更完善一些,许可更open。
  • 5 ES: 文档性数据库,主要用于解决模糊搜索效率的问题,用于搜索、推荐。
  • 6 Milvus: 向量数据库,主要解决大量中间向量的存储,以及相似性检索,用于搜图等功能。
  • 7 ClickHouse: 列式数据库。用于高效存储和统计,某种程度上也适合备份。速度太快,压缩比太高了。
  • 8 InfluxDB: 时序数据库。快速存储和分析时间数据。可用于物联网、金融等。
  • 9 Neo4j: 图库。用于存储真正的关联信息,适合用于知识图谱。
  • 10 SQLite:容易被忽略,但是这个还是非常棒的数据库。适合微型应用,嵌入式应用等。

还有很多数据库,对应着一些其他维度的细分。比如FAISS也是向量数据库,但是是在内存里;而Milvus是在硬盘上。然后还有很多是功能近似的,就不展开了。

2 ORM

之前我用pymongo自己搭了微服务,然后基于这个微服务又做了WMongo对象封装,本质上还是函数式的。结论是,函数式也好,agent(微服务)也好,的确是有其存在的必要和应用场景的。不过这种场景更偏向后端纯粹的数据处理与分析,在搭建应用对象(强调集成性和灵活性)时的确不方便。WMongo如果之后继续和pydantic结合,我应该也会做出一个类似工具。

言归正传, 操作Mongo时pymongo和mongoengine的关系就类似与pymysql和sqlalchemy;而Motor就像 aiomysql + sqlalchemy。

先从同步入手:

2.1 连接

from mongoengine import connect, disconnect,Document, StringField, IntField,DictField

# 可以直接使用函数式连接
connect(
    db='your_database_name',       # 数据库名称
    host='localhost',              # 主机地址,默认为localhost
    port=27017,                    # 端口号,默认为27017
    username='your_username',      # 可选,用户名
    password='your_password',      # 可选,密码
    authentication_source='admin'  # 可选,认证数据库,默认为admin
)
# 也可以使用url方式连接
# 集群
connect(
    host='mongodb://USER:PASSWD@IP1:PORT1,IP2:PORT2,IP3:PORT3,IP4:PORT4/%s?authSource=admin&replicaSet=mymeta' % db_name
)

本次仅按单机方式连接

# 单机模式
host_url = 'mongodb://USER:PASSWD@IP1:PORT1/%s?authSource=admin' % db_name
client = connect(host=host_url)
# 获取所有数据库
databases = client.list_database_names()
print("Databases:")
for db in databases:
    print(db)

# 遍历每个数据库,获取集合
for db_name in databases:
    db = client[db_name]
    collections = db.list_collection_names()
    print(f"\nCollections in {db_name}:")
    for collection in collections:
        print(collection)

# 断开连接
disconnect()

连接之后,做了一些函数式的操作,这个虽然不是ORM的主要方向,但有时候还是有点用的。接下来则是对数据的操作:

设置对象时,如果meta没有设置,对应的表名将会按类名的小写拆开建立并严格校验字段的一致性。这里我并不会映射现有表的全部字段,需要设置非严格模式。

    meta = {
        'collection': 'trade_orders',  # MongoDB 中的集合名
        'strict': False,  # 不强制字段验证,允许动态字段
    }

2.2 查询

我发现关于mongoengine的内容,大模型说的很多是错误的。
这是一个还不错的介绍。一文带你深入浅出 MongoEngine 经典查询【内附详细案例】

因为之前用pymongo做了很多开发,所以知道字符串是可以比较的。程序里的这个报错TypeError: '>=' not supported between instances of 'StringField' and 'str'是指>=只能用于数值型的筛选。而mongo里的gte方法则是通过变量名称的特殊构造来传递的(x__gte), 虽然有点妖,但的确是个好点子。

另外如果返回后会进行批量处理(而不是使用单个实例),那么可以在定义映射模型时定义返回字典。


# 定义与 MongoDB 集合关联的类
class TradeOrders(Document):
    # 指定 MongoDB 中的集合名称
    # 启用动态模式,这样可以处理未定义的字段
    meta = {
        'collection': 'trade_orders',  # MongoDB 中的集合名
        'strict': False,  # 不强制字段验证,允许动态字段
        'indexes': [
            {'fields': ['is_buy_sms_tag']},
            {'fields': ['is_sell_sms_tag']}
        ]

    }
    
    # 定义字段
    model_signal = StringField(required=True, max_length=50)
    reason = StringField()
    np = FloatField()
    is_win = IntField()
    buy_price = FloatField()
    sell_price = FloatField()
    buy_dt = StringField()
    sell_dt = StringField()
    buy_amt = FloatField()

    strategy_name = StringField()
    is_buy_sms_tag = IntField()
    is_sell_sms_tag = IntField()

    def dict(self):
        res_dict = {}
        res_dict['model_signal'] = self.model_signal
        res_dict['reason'] = self.reason
        res_dict['np'] = self.np
        res_dict['is_win'] = self.is_win
        res_dict['buy_price'] = self.buy_price
        res_dict['sell_price'] = self.sell_price
        res_dict['buy_dt'] = self.buy_dt
        res_dict['buy_amt'] = self.buy_amt

        res_dict['strategy_name'] = self.strategy_name
        res_dict['is_buy_sms_tag'] = self.is_buy_sms_tag
        res_dict['is_sell_sms_tag'] = self.is_sell_sms_tag
        return res_dict



# 实例化时会为现有集合创建索引(是ensure index的方式)
from datetime import datetime, timedelta
# 获取当前时间
now = datetime.now()

# 计算7天前的时间
seven_days_ago = now - timedelta(days=7)

# 将 seven_days_ago 转换为字符串格式,以便与 buy_dt 和 sell_dt 进行比较
seven_days_ago_str = seven_days_ago.strftime('%Y-%m-%d %H:%M:%S')

# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(buy_dt__gt=seven_days_ago_str).all()
recent_orders1 = [x.dict() for x in recent_orders]

 {'model_signal': '510500_normal_strong_2_near_120_far_1200_top_85_standard_gbdt_running_B',
  'reason': 'Control Sell',
  'np': 392.1,
  'is_win': 1,
  'buy_price': 5.533,
  'sell_price': 5.999,
  'buy_dt': '2024-09-30 09:54:00',
  'buy_amt': 4980.0,
  'strategy_name': '510500_normal_strong_2_near_120_far_1200_top_85_standard_gbdt_running_B_15_15',
  'is_buy_sms_tag': None,
  'is_sell_sms_tag': None},
 {'model_signal': None,
  'reason': None,
  'np': None,
  'is_win': None,
  'buy_price': 4.162,
  'sell_price': None,
  'buy_dt': '2024-09-30 13:43:00',
  'buy_amt': 4994.0,
  'strategy_name': '510300_normal_strong_2_near_60_far_1800_top_80_all_notnull_gbdt_running_A_10_15',
  'is_buy_sms_tag': None,
  'is_sell_sms_tag': None}]

3 钉钉

以下是发送函数。webhook url需要在钉钉群里创建机器人得到。

import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse

# 钉钉 Webhook 和加签密钥

# mine
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxx'
secret ='xxx'


# 生成签名
def generate_sign(secret):
    timestamp = str(round(time.time() * 1000))
    secret_enc = secret.encode('utf-8')
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    return timestamp, sign

# 发送信息到群里面
def send_dingtalk_message_markdown(webhook, secret, title = None, message = None):
    try:
        timestamp, sign = generate_sign(secret)
        url = f"{webhook}&timestamp={timestamp}&sign={sign}"
        headers = {'Content-Type': 'application/json'}

        data = {
            "msgtype": 'markdown',
            "markdown": {
                'title':title,
                "text": message
            }
        } 
        print(f"Sending message to URL: {url}")
        print(f"Message content: {data}")
        response = requests.post(url, headers=headers, data=json.dumps(data))
        print(f"Response: {response.status_code}, {response.text}")
        return response.status_code, response.text
    except Exception as e:
        print(f"Error sending message: {e}")
        return None, str(e)

send_dingtalk_message_markdown('https://oapi.dingtalk.com/robot/send?access_token=xxx',
'xxx',title ='It is Title', message= '# It is Content\n## AA' )

在这里插入图片描述
效果比短信好多了。

4 整合逻辑

按照买消息和卖消息两个维度进行数据的整理和通知。对于买消息而言,消息的内容更少,而对于卖消息,需要展示的内容更多。在发布完消息后要把对应的标记打上,避免再次被筛选到。

作为Buy,提取以下字段发布

  • 1 Code
  • 2 Buy Datetime
  • 3 Buy Price
  • 4 Buy Amout
  • 5 Strategy Name

发布消息之后进行ACK(也就是把对应的字段置为1),所以,某种程度上说使用消息队列可能更自然一些。

# 先获取未消息的买单数据
from mongoengine.queryset.visitor import Q
# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(
    Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)
    
    ).all()
recent_orders1 = [x.dict() for x in recent_orders]

# 对任何一个买单
some_buy_order = recent_orders[0]
code = some_buy_order.strategy_name.split('_')[0]
msg_title = '### Buy %s' % code 
msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)

send_dingtalk_message_markdown('https://oapi.dingtalk.com/robot/send?access_token=xxx',
'xxx',title =msg_title, message= msg_text )

在这里插入图片描述
发送消息后,对相应的买单执行ACK

# ack
some_buy_order.is_buy_sms_tag = 1
some_buy_order.save()

循环执行买消息


from mongoengine.queryset.visitor import Q
# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(
    Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)
    ).all()
# buy / one
import time 
if len(recent_orders):
    print('共有%s个买消息' % len(recent_orders))
    for some_buy_order in recent_orders:
        # some_buy_order = recent_orders[0]
        code = some_buy_order.strategy_name.split('_')[0]
        msg_title = '### Buy %s' % code 
        msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)
        # msg 
        send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text )
        # ack
        some_buy_order.is_buy_sms_tag = 1
        some_buy_order.save()
        # 防止消息发的太快
        time.sleep(1)

然后每次执行时都执行一次查询,如果有买单,按顺序循环输出即可。
在这里插入图片描述
同样的,对于卖单

  • 1 查询未发送的卖单
  • 2 与买单相比,卖单会输出更多的字段
if len(recent_sell_orders):
    print('共有%s个卖消息' % len(recent_sell_orders))
    for some_sell_order in recent_sell_orders:

        # some_sell_order = recent_sell_orders[0]

        code = some_sell_order.strategy_name.split('_')[0]
        msg_title = '### Sell %s' % code 
        msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_sell_order.buy_dt, some_sell_order.buy_price,some_sell_order.buy_vol,some_sell_order.buy_amt ,some_sell_order.strategy_name)
        msg_sell_text = '''\n### Sell DT:%s\n### Sell Price:%s\n### Sell Vol:%s\n ### Sell Amt:%s\n### NetProfit: %s\n### Reason:%s ''' % (some_sell_order.sell_dt, some_sell_order.sell_price, some_sell_order.sell_vol, some_sell_order.sell_amt, some_sell_order.np,some_sell_order.reason)
        send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text+ msg_sell_text)

        # ack
        some_sell_order.is_sell_sms_tag = 1
        some_sell_order.save()
        # 防止消息发的太快
        time.sleep(1)

效果类似
在这里插入图片描述

5 APS运行

ubuntu系统自带的cron并不好用,所以我用python的apscheduler。只是在系统重启的时候需要自己管理,有两个办法可以解决这个问题:

  • 1 注册systemd服务,这样开机时服务可以跟随启动
  • 2 使用docker运行。

不过我给机器配了ups,一般情况下也不会关机,这个问题等之后再考虑吧,暂时就用nohup后台运行。

  • 1 在/home/workers/ 下放py文件
  • 2 在/home/local_aps/ 下放脚本文件
  • 3 在/home/local_aps/ 下放主启动文件

aps.py 主程序启动,aps默认是采用多线程方式控制多个worker(所以应该也也不是完全阻塞的)

from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler

def exe_sh(fpathname = None):
    os.system('sh %s ' % fpathname)

# 后台启动命令 nohup python3 /root/prj27_timetask/cron_task/test_001.py >/dev/null 2>&1 &

if __name__ == '__main__':
    sche1 = BlockingScheduler()
    sche1.add_job(exe_sh,'interval', seconds=30, kwargs ={'fpathname':'/home/local_aps/aps01.sh'})
    # sche1.add_job(exe_sh,'interval', seconds=86400, kwargs ={'fpathname':'/home/shs/clean_log.sh'})
    print('[S] starting inteverl')
    sche1.start()

aps01.sh用于灵活存放多个需要执行的python脚本

#!/bin/bash
python3 /home/workers/qtv102_dingtalk.py

qtv102_dingtalk.py 加上了logger部分

import logging
from logging.handlers import RotatingFileHandler

def get_logger(name, lpath='/var/log/'):
    logger = logging.getLogger(name)
    fpath = lpath + name + '.log'
    handler = RotatingFileHandler(fpath, maxBytes=100 * 1024 * 1024, backupCount=10)

    # 设置日志格式为 [时间] - [日志级别] - 消息
    formatter = logging.Formatter('[%(asctime)s] - [%(levelname)s] - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    return logger

# 按日志格式写入数据
log_info_tmp = '[%s] - %s'
logger = get_logger('qtv102_dingtalk')


from mongoengine import connect, disconnect,Document, StringField, IntField,DictField,FloatField

# connect(
#     db='your_database_name',       # 数据库名称
#     host='localhost',              # 主机地址,默认为localhost
#     port=27017,                    # 端口号,默认为27017
#     username='your_username',      # 可选,用户名
#     password='your_password',      # 可选,密码
#     authentication_source='admin'  # 可选,认证数据库,默认为admin
# )

db_name = 'QTV102_Strategy'

# 单机模式
host_url = 'mongodb://xxx:xxx@192.168.0.159:24086/'
connect(db_name, host=host_url)


# 定义与 MongoDB 集合关联的类
class TradeOrders(Document):
    # 指定 MongoDB 中的集合名称
    # 启用动态模式,这样可以处理未定义的字段
    meta = {
        'collection': 'trade_orders',  # MongoDB 中的集合名
        'strict': False,  # 不强制字段验证,允许动态字段
        'indexes': [
            {'fields': ['is_buy_sms_tag']},
            {'fields': ['is_sell_sms_tag']}
        ]

    }

    # 定义字段
    # model_signal = StringField(required=True, max_length=50)
    reason = StringField()
    np = FloatField()
    is_win = IntField()
    buy_price = FloatField()
    sell_price = FloatField()
    buy_dt = StringField()
    sell_dt = StringField()
    buy_vol = IntField()
    sell_vol = IntField()
    buy_amt = FloatField()
    sell_amt = FloatField()

    strategy_name = StringField()
    is_buy_sms_tag = IntField()
    is_sell_sms_tag = IntField()

    def dict(self):
        res_dict = {}
        res_dict['reason'] = self.reason
        res_dict['np'] = self.np
        res_dict['is_win'] = self.is_win
        res_dict['buy_price'] = self.buy_price
        res_dict['sell_price'] = self.sell_price

        res_dict['buy_dt'] = self.buy_dt
        res_dict['buy_amt'] = self.buy_amt

        res_dict['sell_dt'] = self.sell_dt
        res_dict['sell_amt'] = self.sell_amt

        res_dict['buy_vol'] = self.buy_vol
        res_dict['sell_vol'] = self.sell_vol

        res_dict['strategy_name'] = self.strategy_name
        res_dict['is_buy_sms_tag'] = self.is_buy_sms_tag
        res_dict['is_sell_sms_tag'] = self.is_sell_sms_tag
        return res_dict



# 实例化时会为现有集合创建索引(是ensure index的方式)
from datetime import datetime, timedelta
# 获取当前时间
now = datetime.now()

# 计算7天前的时间
seven_days_ago = now - timedelta(days=7)

# 将 seven_days_ago 转换为字符串格式,以便与 buy_dt 和 sell_dt 进行比较
seven_days_ago_str = seven_days_ago.strftime('%Y-%m-%d %H:%M:%S')


# recent_orders1 = [x.dict() for x in recent_orders]

# 发送消息
# test_md = '''# 关于订单 \n## 1 订单 allls'''
# test_html = '''<h1>关于订单</h1> <p>allls</p>'''




# --- 发送消息
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse

# 钉钉 Webhook 和加签密钥

# mine
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxx'
secret ='xxx'


# 生成签名
def generate_sign(secret):
    timestamp = str(round(time.time() * 1000))
    secret_enc = secret.encode('utf-8')
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    return timestamp, sign

# 发送信息到群里面
def send_dingtalk_message_markdown(webhook, secret, title = None, message = None):
    try:
        timestamp, sign = generate_sign(secret)
        url = f"{webhook}&timestamp={timestamp}&sign={sign}"
        headers = {'Content-Type': 'application/json'}

        data = {
            "msgtype": 'markdown',
            "markdown": {
                'title':title,
                "text": message
            }
        }
        print(f"Sending message to URL: {url}")
        print(f"Message content: {data}")
        response = requests.post(url, headers=headers, data=json.dumps(data))
        print(f"Response: {response.status_code}, {response.text}")
        return response.status_code, response.text
    except Exception as e:
        print(f"Error sending message: {e}")
        return None, str(e)



from mongoengine.queryset.visitor import Q

# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(
    Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)
    ).all()
logger.info(log_info_tmp % ('get_open_orders', 'recs %s' % len(recent_orders)))
# buy / one
import time
if len(recent_orders):
    print('共有%s个买消息' % len(recent_orders))
    for some_buy_order in recent_orders:
        # some_buy_order = recent_orders[0]
        code = some_buy_order.strategy_name.split('_')[0]
        msg_title = '### Buy %s' % code
        msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)
        # msg
        send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text )
        # ack
        some_buy_order.is_buy_sms_tag = 1
        some_buy_order.save()
        # 防止消息发的太快
        time.sleep(1)

    logger.info(log_info_tmp % ('sent_open_orders', 'recs %s' % len(recent_orders)))

# 卖单
recent_sell_orders = TradeOrders.objects(
    Q(sell_dt__gt=seven_days_ago_str) & Q(is_sell_sms_tag__ne = 1)
    ).all()

logger.info(log_info_tmp % ('get_close_orders', 'recs %s' % len(recent_sell_orders)))
if len(recent_sell_orders):
    print('共有%s个卖消息' % len(recent_sell_orders))
    for some_sell_order in recent_sell_orders:

        # some_sell_order = recent_sell_orders[0]

        code = some_sell_order.strategy_name.split('_')[0]
        msg_title = '### Sell %s' % code
        msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_sell_order.buy_dt, some_sell_order.buy_price,some_sell_order.buy_vol,some_sell_order.buy_amt ,some_sell_order.strategy_name)
        msg_sell_text = '''\n### Sell DT:%s\n### Sell Price:%s\n### Sell Vol:%s\n ### Sell Amt:%s\n### NetProfit: %s\n### Reason:%s ''' % (some_sell_order.sell_dt, some_sell_order.sell_price, some_sell_order.sell_vol, some_sell_order.sell_amt, some_sell_order.np,some_sell_order.reason)
        send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text+ msg_sell_text)

        # ack
        some_sell_order.is_sell_sms_tag = 1
        some_sell_order.save()
        # 防止消息发的太快
        time.sleep(1)

    logger.info(log_info_tmp % ('sent_close_orders', 'recs %s' % len(recent_sell_orders)))

观察日志

....
[2024-10-02 17:28:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:28:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:28:47] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:29:17] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:29:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:29:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:29:47] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:30:17] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:30:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:30:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:30:47] - [INFO] - [get_close_orders] - recs 0
...

修改数据的消息发送字段,然后就看到重发消息了
在这里插入图片描述

到这里,基本ok了。之前的sms可以扔掉了。

总结一下:

  • 1 消息存放在数据库里。
  • 2 程序通过ORM定期查询需要发送的消息。
  • 3 发送钉钉消息后将属性更新,回存数据库

以后的策略消息仍然会先存放在数据库中,不过是通过ORM访问数据库还是走消息队列,以及发送到钉钉还是邮件还是其他可以再探讨。

ps: 属性的更新应该只是部分更新(update),而不是替换(replace),这样开销比较小。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2185077.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ACP科普:SoSM和CPO

在Scrum of Scrums&#xff08;SoS&#xff09;框架中&#xff0c;SoSM&#xff08;Scrum of Scrums Master&#xff09;和CPO&#xff08;Chief Product Owner&#xff09;是两个关键角色&#xff0c;负责协调多个Scrum团队的工作&#xff0c;确保项目的顺利进行。以下是对这两…

Android AMS介绍

注&#xff1a;本文为作者学习笔记&#xff0c;如有误&#xff0c;请各位大佬指点 系统进程运行环境的初始化 Context是一个抽象类&#xff0c;它可以访问application环境的全局信息和各种资源信息和类 context功能&#xff1a; 对Activity、Service生命周期的管理通过Intent发…

c++进阶之多态讲解

这篇文章和大家一起学习一下c中的多态 多态的概念 多态的概念&#xff1a;通俗来讲&#xff0c;就是多种形态。多态分为编译时多态(静态多态)和运⾏时多态(动态多态)。 什么是静态多态 前⾯讲的函数重载和函数模板&#xff0c;它们传不同类型的参数就可以调用不同的函数&…

深入理解 C 语言中的内存操作函数:memcpy、memmove、memset 和 memcmp

目录&#xff1a; 前言一、 memcpy 函数二、 memmove 函数三、 memset 函数四、 memcmp 函数总结 前言 在 C 语言中&#xff0c;内存操作函数是非常重要的工具&#xff0c;它们允许我们对内存进行直接操作&#xff0c;从而实现高效的数据处理。本文将深入探讨四个常用的内存操…

zabbix7.0web页面删除主机操作实现过程

前言 服务端配置 链接: rocky9.2部署zabbix服务端的详细过程 被监控端配置 链接: zabbix7.0监控linux主机案例详解 环境 主机ip应用zabbix-server192.168.10.11zabbix本体zabbix-client192.168.10.12zabbix-agent zabbix-server(服务端已配置) zabbix-client(被监控端已配置…

Bruno:拥有 11.2k star 的免费开源 API 测试工具

Github 开源地址&#xff1a; https://github.com/usebruno/bruno 官网地址&#xff1a; https://www.usebruno.com/ 下载地址&#xff1a; https://www.usebruno.com/downloads 使用文档&#xff1a; https://docs.usebruno.com/ Bruno 是一款全新且创新的 API 客户端&…

微调学习记录

目前看到的市面上的微调文章&#xff0c;想大体上给他们分个类&#xff0c;方便后续进行重点学习 参数高效微调 1. LoRA 不用多说含金量 2. Rein https://github.com/w1oves/rein 把它也算进来了&#xff0c;类似。 Adapter adapter类的我感觉都大差不差&#xff0c;具体…

VisionTS:基于时间序列的图形构建高性能时间序列预测模型,利用图像信息进行时间序列预测

构建预训练时间序列模型时面临的主要挑战是什么&#xff1f;获取高质量、多样化的时间序列数据。目前构建基础预测模型主要有两种方法&#xff1a; 迁移学习LLM&#xff1a;通过针对时间序列任务定制的微调或分词策略&#xff0c;重新利用预训练的大型语言模型&#xff08;LLM…

餐饮重点企业在AI领域的布局,看方大的AI实践

大家好&#xff0c;我是Shelly&#xff0c;一个专注于输出AI工具和科技前沿内容的AI应用教练&#xff0c;体验过300款以上的AI应用工具。关注科技及大模型领域对社会的影响10年。关注我一起驾驭AI工具&#xff0c;拥抱AI时代的到来。 AI已经被应用在餐饮餐厨行业的哪些方面&am…

Spring注解系列 - @Autowired注解

文章目录 使用总结注入原理Autowired 注入过程InjectionMetadataInjectedElement依赖注入查找过程findAutowireCandidates 缓存注入信息 Resource 注解 使用总结 Autowired注解可以自动将所需的依赖对象注入到类的属性、构造方法或方法中&#xff0c;从而减少手动注入依赖的代…

Android Compose的基本使用

前言: Compose这个东西呢,好处我没发现,坏处就是学习成本和低版本兼容. 不过,看在官方力推的份儿上,有空就学一下吧. 当初的kotlin,很多人说鸡肋(包括我)!现在不也咔咔用纯kotlin做项目吗?哈哈哈哈. 未来的事情,谁说得清呢? 首先创建一个专用的Compose项目 对没错!看到E…

体系结构论文(五十三):Featherweight Soft Error Resilience for GPUs 【22‘ MIRCO】

Featherweight Soft Error Resilience for GPUs 一、文章介绍 背景&#xff1a;软错误通常由高能粒子&#xff08;如宇宙射线和α粒子&#xff09;打击电路造成的位翻转&#xff0c;可能导致程序崩溃或产生错误输出。随着电子技术的进步&#xff0c;电路对这种辐射引发的软错…

Arduino UNO R3自学笔记14 之 Arduino使用HC-SR04模块如何测量距离?

注意&#xff1a;学习和写作过程中&#xff0c;部分资料搜集于互联网&#xff0c;如有侵权请联系删除。 前言&#xff1a;学习使用HC-SR04模块测距。 1.HC-SR04模块介绍 基本参数&#xff1a; ●使用电压&#xff1a;DC---5V ●静态电流&#xff1a;小于2mA ●电平输出&#…

【计算机网络】传输层UDP和TCP协议

目录 再谈端口号端口号范围划分认识知名端口号查看知名端口号两个问题 UDP协议UDP特点UDP的缓冲区基于UDP的应用层协议 TCP协议TCP协议格式确认应答机制超时重传机制连接管理机制&#xff08;三次握手与四次挥手&#xff09;理解TIME_WAIT状态理解CLOSE_WAIT状态滑动窗口快重传…

wsl(1) -- win11环境配置

1.前言 本专栏主要记录了我配置wsl的过程&#xff0c;以便日后回忆。 2. 开启WSL可选功能 打开设置&#xff0c;点击应用&#xff0c;点击可选功能&#xff0c;点击更多Windows功能&#xff0c;查看是否开启了【适用于Linux的Windows子系统】和【虚拟机平台】 3. 更新wsl …

【JavaEE初阶】深入理解多线程阻塞队列的原理,如何实现生产者-消费者模型,以及服务器崩掉原因!!!

前言&#xff1a; &#x1f308;上期博客&#xff1a;【JavaEE初阶】深入解析单例模式中的饿汉模式&#xff0c;懒汉模式的实现以及线程安全问题-CSDN博客 &#x1f525;感兴趣的小伙伴看一看小编主页&#xff1a;GGBondlctrl-CSDN博客 ⭐️小编会在后端开发的学习中不断更新~~…

【在Linux世界中追寻伟大的One Piece】System V共享内存

目录 1 -> System V共享内存 1.1 -> 共享内存数据结构 1.2 -> 共享内存函数 1.2.1 -> shmget函数 1.2.2 -> shmot函数 1.2.3 -> shmdt函数 1.2.4 -> shmctl函数 1.3 -> 实例代码 2 -> System V消息队列 3 -> System V信号量 1 -> Sy…

K8S部署流程

一、war打包镜像(survey,analytics,trac系统) 代码打包成war准备tomcat的server.xml文件&#xff0c;修改connector中8080端口为项目的端口 修改前&#xff1a; <Connector port"8080" protocol"HTTP/1.1"connectionTimeout"20000"redirect…

idea环境下vue2升级vue3

在IDEA环境下&#xff0c;Vue2升级Vue3是一个非常重要的主题。在本文中&#xff0c;我们将介绍Vue2和Vue3之间的主要区别&#xff0c;以及如何在IDEA中升级Vue2项目到Vue3。我们还将讨论Vue3的新特性&#xff0c;如Composition API和Teleport等&#xff0c;并提供一些实用的代码…

快速掌握-vue3

是什么 vue2 的升级版&#xff0c; 使用 ts 重构了代码&#xff0c; 带来了 Composition API RFC。 类似于 react hook 的写法。 ts 重构&#xff0c;代码可读性更强vue3.x 使用 Proxy 取代 Vue2.x 版本的 Object.defineProperty实现了 TreeShaking (当 Javascript 项目达到一定…