APScheduler定时器使用【重写SQLAlchemyJobStore版】:django中使用apscheduler,使用mysql做存储后端

news2024/11/28 5:29:44

一、环境配置

python==3.8.10

包:

APScheduler==3.10.4
Django==3.2.7
djangorestframework==3.15.1
SQLAlchemy==2.0.29
PyMySQL==1.1.0

项目目录情况

gs_scheduler 应用

        commands : 主要用来自定义命令,python manage.py crontab  

        schedulers:所有apscheduler定时器的东西都在里面

        logs:存放定时器任务的日志信息

        views.py和urls.py:对外开放的接口,获取定时任务的基本信息和运行情况

二、django基本配置

根settings.py

#pymysql使用数据库
import pymysql
# pymysql.version_info = (1, 4, 0, "final", 0)  # 确保版本信息被正确设置
pymysql.install_as_MySQLdb()

INSTALLED_APPS = [
    'rest_framework',#restful
    'gs_scheduler', #注册创建的应用
]


#设置数据库
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'ldc-root'
MYSQL_NAME = 'study_scheduler'

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'HOST': MYSQL_HOST,
        'PORT': MYSQL_PORT,
        'USER': MYSQL_USER,
        'PASSWORD': MYSQL_PASSWORD,
        'NAME': MYSQL_NAME,
    }
}

根urls.py

注册路由

from django.contrib import admin
from django.urls import path,include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/scheduler/',include('gs_scheduler.urls')),
]

三、配置apscheduler

3.1、schedulers/base.py

主要重写了SQLAlchemyJobStore类,添加上一些其他的字段和数据库表。

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore as _SQLAlchemyJobStore
from apscheduler.jobstores.base import JobLookupError, ConflictingIdError
from apscheduler.util import maybe_ref, datetime_to_utc_timestamp
from datetime import datetime
try:
    import cPickle as pickle
except ImportError:  # pragma: nocover
    import pickle

try:
    from sqlalchemy import (
        create_engine, Table, Column, MetaData,delete, Unicode, Float, LargeBinary,String, BigInteger,DATETIME,select,Boolean,text, and_)
    from sqlalchemy.exc import IntegrityError
    from sqlalchemy.sql.expression import null
except ImportError:  # pragma: nocover
    raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')

Datetime_Format = '%Y-%m-%d %H:%M:%S'

#重写SQLAlchemyJobStore,用于自定义数据库表
class SQLAlchemyJobStore(_SQLAlchemyJobStore):

    Jobs_Tablename = 'apscheduler_jobs'#记录定时任务基本信息
    Jobs_History_Tablename = 'apscheduler_history'#记录定时任务运行历史

    def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', metadata=None,
                 pickle_protocol=pickle.HIGHEST_PROTOCOL, tableschema=None, engine_options=None):
        #执行当前父类的父类的初始化方法
        super(_SQLAlchemyJobStore, self).__init__()
        self.pickle_protocol = pickle_protocol
        metadata = maybe_ref(metadata) or MetaData()
        if engine:
            self.engine = maybe_ref(engine)
        elif url:
            self.engine = create_engine(url, **(engine_options or {}))
        else:
            raise ValueError('Need either "engine" or "url" defined')

        # 191 = max key length in MySQL for InnoDB/utf8mb4 tables,
        # 25 = precision that translates to an 8-byte float
        self.jobs_t = Table(
                self.Jobs_Tablename, metadata,
                Column('id', Unicode(191), primary_key=True),
                Column('next_run_time', Float(25), index=True),
                Column('job_state', LargeBinary, nullable=False),
                Column('trigger',String(256),nullable=True),#新增,记录定时器的定时规则
                Column('desc',String(256),nullable=True),#新增,记录定时任务的描述信息
                schema=tableschema
            )
        #新增的一张表,记录定时任务运行历史
        self.jobs_t_history = Table(
                self.Jobs_History_Tablename,metadata,
                Column('id',BigInteger(),primary_key=True),
                Column('job_id',Unicode(191)),
                Column('run_time',DATETIME(),index=True,nullable=False),
                Column('is_error',Boolean(),default=0),
                Column('error_msg',String(256),nullable=True),
                schema=tableschema
            )
    #重写:对新表的创建
    def start(self, scheduler, alias):
        super(SQLAlchemyJobStore, self).start(scheduler, alias)
        #创建表
        self.jobs_t.create(self.engine, True)
        self.jobs_t_history.create(self.engine,True)
    #重写:对新字段的操作
    def add_job(self, job):
        #获取当前任务的定时器规则,描述信息
        trigger,desc = self.get_job_rule_and_desc(job)
        insert = self.jobs_t.insert().values(**{
            'id': job.id,
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol),
            'trigger':trigger,
            'desc':desc

        })
        with self.engine.begin() as connection:
            try:
                connection.execute(insert)
            except IntegrityError:
                raise ConflictingIdError(job.id)
    #重写:对新字段的操作
    def update_job(self, job):
        trigger,desc = self.get_job_rule_and_desc(job)
        update = self.jobs_t.update().values(**{
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol),
            'trigger':trigger,
            'desc':desc
        }).where(self.jobs_t.c.id == job.id)
        with self.engine.begin() as connection:
            result = connection.execute(update)
            if result.rowcount == 0:
                raise JobLookupError(job.id)

    # 新增方法:提取job的定时器信息和job的描述信息
    def get_job_rule_and_desc(self, job):
        the_type, rules = str(job.trigger).split('[')
        rule = rules.split(']')[0]
        trigger = '每隔'  # 定时任务的定时规则
        if the_type == 'date':
            # rule = '2024-10-10 20:20:12 csl'
            trigger = '在{} 时间点执行一次'.format(rule.rsplit(' ', 1)[0])
        elif the_type == 'interval':
            dic = {0: '小时', 1: '分钟', 2: '秒'}
            if 'day' in rule:
                # rule = '1 ady,00:00:00'
                day, hms = rule.split(',')
                day = int(day.split('day')[0])
                trigger += '{}天'.format(day)
                hms = hms.split(':')
            else:
                # rule = '01:01:01'
                hms = rule.split(':')
            for i, value in enumerate(hms):
                value = int(value)
                if value > 0:
                    trigger += str(value)
                    trigger += dic.get(i)
            else:
                trigger += '执行一次'
        else:
            # cron,比较复杂,不好判断
            # rule ="hour='0', minute='0', second='1'"
            trigger = '{},通过linux系统cron表达式'.format(job.trigger)
        desc = job.name or ''  # 定时任务描述
        return (trigger, desc)

    #新增方法:记录定时任务的运行历史,给scheduler监听器使用
    def insert_job_history(self,data:dict):
        '''
        :param data: {'job_id':'x','run_time','is_error':1,'error_msg':'xxxx'}
        :return:
        '''
        insert = self.jobs_t_history.insert().values(**{
            'job_id': data.get('job_id'),
            'run_time': data.get('run_time'),
            'is_error': data.get('is_error'),
            'error_msg': data.get('error_msg')
        })
        with self.engine.begin() as connection:
            connection.execute(insert)

    #新增方法:api获取任务下次运行时间
    def api_get_run_next(self):
        '''
        获取每个任务的下次运行时间
        row:
            job_id = row[0],next_run=row[1],trigger=row[3],desc=row[4]
        :return:
        '''
        search = self.jobs_t.select().filter_by()
        with self.engine.begin() as connection:
            results = connection.execute(search)

        ret_data = []

        for row in results:
            dic = {
                'job_id':row[0],
                'next_run':datetime.fromtimestamp(row[1]).strftime(Datetime_Format),
                'trigger':row[3],
                'desc':row[4]
            }
            ret_data.append(dic)
        return ret_data

    #新增方法:api获取任务历史运行记录
    def api_get_run_history(self):
        '''
        获取每个任务运行成功的最近10个记录
        row :
            id=row[0],job_id=row[1],run_time=row[2],is_error=row[3],error_msg=row[4]
        :return:
        '''
        job_data_list = self.api_get_run_next()
        ret_data = []
        for dic in job_data_list:
            job_id = dic.get('job_id')
            history_data = {
                'job_id': job_id,
                'run_time': [],
            }
            search = self.jobs_t_history.select().filter_by(is_error=0,job_id=job_id).order_by(text('-id')).limit(10)
            with self.engine.begin() as connection:
                results = connection.execute(search)
            for row in results:
                run_time = datetime.strftime(row[2],Datetime_Format)
                history_data['run_time'].append(run_time)
            ret_data.append(history_data)
        return ret_data

    #新增方法:api获取任务错误记录
    def api_get_run_error(self):
        '''
        获取每个任务运行失败的最近10个记录
        row :
            id=row[0],job_id=row[1],run_time=row[2],is_error=row[3],error_msg=row[4]
        :return:
        '''
        job_data_list = self.api_get_run_next()
        ret_data = []
        for dic in job_data_list:
            job_id = dic.get('job_id')
            history_data = {
                'job_id': job_id,
                'error_run': [],
            }
            search = self.jobs_t_history.select().filter_by(is_error=1, job_id=job_id).order_by(text('-id')).limit(5)
            with self.engine.begin() as connection:
                results = connection.execute(search)
            for row in results:
                id=row[0]
                job_id=row[1]
                run_time=datetime.strftime(row[2], Datetime_Format)
                is_error=row[3]
                error_msg=row[4]
                history_data['error_run'].append({
                    'run_time':run_time,
                    'error_msg':error_msg
                })
            ret_data.append(history_data)
        return ret_data

    #新增方法:清除历史运行记录
    def delete_before_run_history(self):
        '''
        将历史运行记录中,
            每个任务只保留最近20个运行成功的记录
            每个任务只保留最近20个运行失败的记录
        :return:
        '''
        # 获取底层数据库连接
        session = self.engine.connect()
        # 获取任务表的名称
        table_name = self.Jobs_History_Tablename
        # 获取所有任务的job_id
        job_data_list = self.api_get_run_next()
        for dic in job_data_list:
            job_id = dic.get('job_id')
            search_success = self.jobs_t_history.select().filter_by(is_error=0, job_id=job_id).order_by(text('-id'))
            search_error = self.jobs_t_history.select().filter_by(is_error=1, job_id=job_id).order_by(text('-id'))
            #正常运行的待删除的id
            delete_ids_su = []
            #运行失败的待删除的id
            delete_ids_err = []
            #提交,拿到查询结果
            with self.engine.begin() as connection:
                results_su = connection.execute(search_success)
                results_err = connection.execute(search_error)

            #把成功的所有的id取出
            for row in results_su:
                delete_ids_su.append(row[0])
            #把运行失败的所有的id取出
            for row in results_err:
                delete_ids_err.append(row[0])
            #删除成功运行的记录
            if delete_ids_su:
                delete_ids_su = delete_ids_su[20:]
                # 构建 SQL 语句
                delete_query = text(f"DELETE FROM {table_name} WHERE id IN :job_ids")
                # 执行删除操作
                session.execute(delete_query, {"job_ids": delete_ids_su})
                # 提交事务
                session.commit()
            #删除失败运行的记录
            if delete_ids_err:
                delete_ids_err = delete_ids_err[20:]
                # 构建 SQL 语句
                delete_query = text(f"DELETE FROM {table_name} WHERE id IN :job_ids")
                # 执行删除操作
                session.execute(delete_query, {"job_ids": delete_ids_err})
                # 提交事务
                session.commit()
        # 关闭数据库连接
        session.close()

3.2、schedulers/logger.py

用于记录apscheduler定时器的一些日志信息。

import os
import logging
from datetime import datetime, date, timedelta
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

# management目录路径
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
# 日志文件存放的目录
LOGS_DIR = os.path.join(BASE_DIR, 'logs')

# 创建logs目录
if not os.path.exists(LOGS_DIR):
    os.makedirs(LOGS_DIR)


def getLogHandlerConsole():
    '1、日志格式'
    formatter = logging.Formatter('[%(asctime)s][%(levelname)s][ %(funcName)s function: %(lineno)s line]:%(message)s')
    '2、输出到控制台处理器'
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(formatter)
    return console_handler

def getLogHandlerFile():
    # 文件名,以日期作为文件名
    log_file_name = date.today().strftime('%Y-%m-%d.log')
    # 构建日志文件的路径
    log_file_str = os.path.join(LOGS_DIR, log_file_name)

    '1、日志记录格式'
    # 默认日志等级的设置
    # logging.basicConfig(level=logging.INFO)
    # 设置日志的格式:发生时间,日志等级,日志信息文件名,      函数名,行数,日志信息
    formatter = logging.Formatter(
        '[%(asctime)s][%(levelname)s][%(pathname)s: %(funcName)s function: %(lineno)s line]: %(message)s')

    '2、基于文件的日志处理器配置'
    # 创建日志记录器,指明日志保存路径,每个日志的大小,保存日志的上限
    file_log_handler = RotatingFileHandler(
        filename=log_file_str,  # 日志文件名
        maxBytes=1024 * 1024 * 10,  # 文件大小超过10MB后,就会生成一个新的日志文件,日志就写到新的文件中
        backupCount=10,  # 最大支持总的日志文件数
        encoding='UTF-8')
    file_log_handler.setFormatter(formatter)  # 设置日志的格式
    file_log_handler.setLevel(logging.INFO)  # 设置日志等级
    return file_log_handler  # 基于文件大小分割日志的方案



# 日志记录器1
scheduler_logger = logging.getLogger('apscheduler.scheduler')
scheduler_logger.setLevel(logging.INFO)
scheduler_logger.addHandler(getLogHandlerFile())  # 添加文件日志处理器
scheduler_logger.addHandler(getLogHandlerConsole())  # 添加控制台日志处理器


if __name__ == '__main__':
    scheduler_logger.info('hhhhh')
    print(os.path.dirname(os.path.dirname(__file__)))

3.3、schedulers/config.py

存放的是apscheduler调度器需要的配置信息。

import os
#from apscheduler.jobstores.memory import MemoryJobStore #内存做后端存储
#from apscheduler.jobstores.redis import RedisJobStore #redis做后端存储
from .base import SQLAlchemyJobStore #mysql等做后端存储
from study_apscheduler import settings
from .logger import scheduler_logger
#mysql://root:ldc-root@127.0.0.1:3306/jobs?charset=utf8
MYSQL_CONFIG = settings.DATABASES.get('default')
MYSQL_USER = MYSQL_CONFIG.get('USER')
MYSQL_PASSWORD = MYSQL_CONFIG.get('PASSWORD')
MYSQL_HOST = MYSQL_CONFIG.get('HOST')
MYSQL_PORT = MYSQL_CONFIG.get('PORT')
MYSQL_NAME = MYSQL_CONFIG.get('NAME')
MYSQL_CHARSET = 'utf8mb4'
URL = 'mysql://{}:{}@{}:{}/{}?charset={}'.format(MYSQL_USER,MYSQL_PASSWORD,MYSQL_HOST,MYSQL_PORT,MYSQL_NAME,MYSQL_CHARSET)
#时区
TIME_ZONE = 'Asia/Shanghai'
#job的默认配置
JOB_DEFAULTS =  {
        'coalesce': True, #系统挂掉,任务积攒多次为执行,True是合并成一次执行,False是执行所有的次数。 持久化存储才有效
        'max_instances': 3 # 同一个任务同一时间最多只能有3个实例在运行。
    }
#job的存储后端
JOB_STORE = {
    'default': SQLAlchemyJobStore(url=URL)
}

#日志处理器
Scheduler_Logger = {
    'logger':scheduler_logger
}
#监听事件对应的情况
LISTENER={
    1:'调度程序启动',
    2:'调度程序关闭',
    4:'调度程序中任务处理暂停',
    8:'调度程序中任务处理恢复',
    16:'将执行器添加到调度程序中',
    32:'执行器从调度程序中删除',
    64:'将任务存储添加到调度程序中',
    128:'任务存储从调度程序中删除',
    256:'所有任务从所有任务存储中删除或从一个特定的任务存储中删除	',
    512:'添加新的定时任务',
    1024:'从任务存储中删除了任务',
    2048:'从调度程序外部修改了任务',
    4096:'任务执行成功',
    8192:'任务在执行期间引发异常',
    16384:'错误了任务执行',
    32768:'任务已经提交到执行器中执行',
    65536:'任务因为达到最大并发执行时,触发的事件'
}

3.4、schedulers/main.py

实例化好调度器,配置日志,监听器、添加定时任务、存储后端等。

# 导入所需的调度器类和触发器类
from apscheduler.schedulers.background import BackgroundScheduler #后台运行
from apscheduler.schedulers.blocking import BlockingScheduler  #主进程运行,需要单独运行
from apscheduler.triggers.interval import IntervalTrigger #时间间隔
from apscheduler.triggers.cron import CronTrigger #复杂的定时任务
from apscheduler.triggers.date import DateTrigger #一次性定时任务
from apscheduler import events
from datetime import datetime
#定时任务
from .task import delete_apscheduler_history
from .task import send_to_big_data
from .task import crontab_task,date_task
#日志
from .config import LISTENER
from .config import TIME_ZONE,JOB_DEFAULTS,JOB_STORE,Scheduler_Logger #调度器配置

class TheBlockScheduler(object):
    TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    def __init__(self):
        self.scheduler = self._scheduler_obj()
        self.logger = Scheduler_Logger.get('logger')
    # 1、初始化调度器
    def _scheduler_obj(self):
        obj = BlockingScheduler()
        obj.configure(
            timezone=TIME_ZONE,  # 时区
            job_defaults=JOB_DEFAULTS,  # job的默认配置
            jobstores=JOB_STORE,  # job的存储后端
            gconfig=Scheduler_Logger, #日志记录相关的的配置
        )
        return obj

    # 2、添加任务
    def _add_job(self):
        # 每5分钟执行一次推送告警到大数据仓
        self.scheduler.add_job(
            send_to_big_data,
            trigger=IntervalTrigger(minutes=1),
            id='send_to_big_data',
            replace_existing=True,
            coalesce=True,
            name='该定时,用于将数据推送到远端大数据系统'
        )

        # 每天凌晨,清除历史的记录
        self.scheduler.add_job(
            delete_apscheduler_history,
            trigger=CronTrigger(hour=0,minute=0,second=1),
            id='delete_apscheduler_history',
            replace_existing=True,
            coalesce=True,
            name='该定时器,是将历史定时任务的执行记录进行清除'
        )

        # 每隔20分钟清除垃圾记录
        self.scheduler.add_job(
            crontab_task,
            trigger=IntervalTrigger(minutes=20),  # 每天晚上零点1秒执行
            id='crontab_task',
            replace_existing=True,
            coalesce=True,
            name='该定时任务,用于清除xxx表中的垃圾记录'
        )

        #在指定某个时刻执行一次
        self.scheduler.add_job(
            date_task,
            trigger=DateTrigger(run_date='2024-12-03 10:11:30'),
            id='date_task',
            replace_existing=True,
            coalesce=True,
            name='该定时任务,在指定时间,进行系统初始化任务',
        )

    # 3、添加监听器
    def _listener(self, event: events):
        code = event.code
        run_time = datetime.now().strftime(self.TIME_FORMAT)
        msg = LISTENER.get(code)
        #存储器
        jobstore = self.scheduler._jobstores['default']
        job_history_data = {
            'job_id':None,
            'run_time':None,
            'is_error':0,
            'error_msg':None
        }

        if code == 4096:
            # 成功运行
            job_id = event.job_id
            job_history_data['job_id'] = job_id
            job_history_data['run_time'] = run_time
            # 记录到数据库中
            jobstore.insert_job_history(job_history_data)
        elif code == 8192 or code == 16384:
            # 运行异常了
            job_id = event.job_id
            job_history_data['job_id'] = job_id
            job_history_data['run_time'] = run_time
            job_history_data['is_error'] = 1
            job_history_data['error_msg'] = msg
            # 记录到数据库中
            jobstore.insert_job_history(job_history_data)
        elif code in (1,2,4,8,32,128,1024,2048):
            ###调度器启动时
            self.logger.info(msg)
            try:
                job_id = event.job_id
                if msg:
                    msg = '任务id={},{}'.format(job_id, msg)
                    self.logger.info(msg)
            except Exception:
                if msg:
                    self.logger.info(msg)
    # 4、启动定时器
    def start(self):
        # 1、设置定时任务(监听器会先监听到任务添加,再监听到调度器启动)
        self._add_job()
        # 2、设置监听器
        self.scheduler.add_listener(self._listener)
        # 3、启动调度器
        try:
            # print('{},定时器启动成功,等待定时任务执行...'.format(datetime.now().strftime(self.TIME_FORMAT)))
            self.scheduler.start()
        except KeyboardInterrupt:
            self.scheduler.shutdown()

#后台线程运行:随django项目一起运行
class TheBackRunScheduler(TheBlockScheduler):
    def _scheduler_obj(self):
        obj = BackgroundScheduler()
        obj.configure(
            timezone=TIME_ZONE,  # 时区
            job_defaults=JOB_DEFAULTS,  # job的默认配置
            jobstores=JOB_STORE,  # job的存储后端
            gconfig=Scheduler_Logger,  # 日志记录相关的的配置
        )
        return obj



if __name__ == '__main__':
    #后台启动
    backScheduler = TheBackRunScheduler()
    backScheduler.start()

3.5、schedulers/models.py

数据库查询方法,获取定时任务的状态和运行情况。

from .config import JOB_STORE
jobstore = JOB_STORE['default']
#获取定时任务下次运行时间
get_job_run_next = jobstore.api_get_run_next
#获取定时任务最近运行情况
get_job_run_history = jobstore.api_get_run_history
#获取定时任务最近运行错误
get_job_run_error = jobstore.api_get_run_error
#清除历史运行记录
delete_run_history = jobstore.delete_before_run_history

if __name__ == '__main__':
    pass


3.6、schedulers/task.py

存放定时任务。

import os
from .logger import scheduler_logger as log
from .models import delete_run_history

#推送到数据仓的告警信息:5分钟执行一次
def send_to_big_data():
    log.info('推送到大数据仓')

#清除定时任务历史运行记录
def delete_apscheduler_history():
    #清除历史运行记录
    delete_run_history()

def crontab_task():
    log.info('crontab定时器')

def date_task():
    log.info('date定时器')


if __name__ == '__main__':
    print(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

四、自定义命令,启动定时器

4.1、commands/crontab.py

from django.core.management.base import BaseCommand
#导入block运行的代码
from gs_scheduler.management.schedulers.main import TheBlockScheduler

#自定义脚本命令:使用,python manage.py crontab 启动
class Command(BaseCommand):
    # python manage.py crontab运行 就是调用该方法
    def handle(self, *args, **options):
        scheduler = TheBlockScheduler()
        scheduler.start()


五、启动定时器

5.1、以主进程方式启动

python manage.py crontab

5.2、后台进程方式启动

1、修改settings.py

from gs_scheduler.management.schedulers.main import TheBackRunScheduler
scheduler = TheBackRunScheduler()
scheduler.start()

2、启动django项目时,后台运行定时器

六、启动django项目

python manage.py runserver 8800

七、测试

1、获取任务下次运行情况

获取每个任务,下次执行的时间和基本信息。

http://127.0.0.1:8800/api/scheduler/run_next/

2、获取任务运行历史记录

获取到每个任务的最近10次运行成功的记录。

http://127.0.0.1:8800/api/scheduler/run_history/

3、获取任务运行失败的记录

获取每个任务最近5次运行失败的记录。

http://127.0.0.1:8800/api/scheduler/run_error/

八、码云下载源码

码云地址:

django应用定时器: django下使用定时器的方法icon-default.png?t=N7T8https://gitee.com/liuhaizhang/django-application-timer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1659326.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

利用BACnet分布式IO控制器优化Niagara楼宇自动化系统

在智能建筑领域,随着物联网技术的飞速发展,如何实现高效、灵活且安全的楼宇自动化控制成为了行业关注的焦点。BACnet IP分布式远程I/O模块,作为这一领域的创新成果,正逐渐成为连接智能建筑各子系统的关键桥梁,尤其在与…

黏土滤镜制作方法:探索黏土特效制作的魅力

在数字时代,图像处理已经成为我们生活的一部分,无论是社交媒体上的照片分享,还是专业设计领域的创作,都离不开对图像的精心处理。而黏土滤镜,作为一种独特而富有艺术感的图像处理效果,受到了越来越多人的喜…

ArcGIS如何计算地级市间的距离

一、数据准备 加载配套实验数据包中的地级市和行政区划矢量数据(订阅专栏后,从私信查收数据),如下图所示: 二、计算距离 1. 计算邻近表 ArcGIS提供了计算点和另外点之间距离的工具:分析工具→邻域分析→生成临近表。 计算一个或多个要素类或图层中的要素间距离和其他邻…

AI写的论文AI疑似度太高怎么办?教你一招降低aigc痕迹

随着 AI 技术迅猛发展,各种AI辅助论文写作的工具层出不穷! 为了防止有人利用AI工具进行论文代写,在最新的学位法中已经明确规定“已经获得学位者,在获得该学位过程中如有人工智能代写等学术不端行为,经学位评定委员会…

Docker 怎么将映射出的路径设置为非root用户权限

在Docker中,容器的根文件系统默认是由root用户拥有的。如果想要在映射到宿主机的路径时设置为非root用户权限,可以通过以下几种方式来实现: 1. 使用具有特定UID和GID的非root用户运行容器: 在运行容器时,你可以使用-u…

huggingface-cli + hf_transfer 加速大模型下载

huggingface-cli hf_transfer 加速大模型下载 如果用的是autodl的话,可以直接开学术加速用 lfs 下也挺快 pip install -U huggingface_hub设置环境变量 vim .bashrcexport HF_ENDPOINThttps://hf-mirror.comsource ~/.bashrc下载模型: huggingface-cl…

H5 css动画效果

你可以使用 CSS 动画来实现这个效果。下面是一个简单的示例代码&#xff0c;展示了如何使用 CSS 中的关键帧动画来放大然后缩小一张图片&#xff0c;并使动画循环播放&#xff1a; html <!DOCTYPE html> <html lang"en"><head><meta charset&qu…

Numpy的数组操作

文章目录 数组的创建创建全0的二维数组a(3,3)全1的二维数组b&#xff08;3,4&#xff09;随机数二维数数组c&#xff08;2,3&#xff09;效果截图 数组的属性查看b数组的维度查看b数组元素的个数效果截图 数组的维度操作将数组c的行变列&#xff0c;返回最后一个元素返回数组c第…

Linux中的日志系统简介

在的Linux系统上使用的日志系统一般为rsyslogd。rsyslogd守护进程既能接收用户进程输出的日志&#xff0c;又能接收内核日志。用户进程是通过调用syslog函数生成系统日志的。该函数将日志输出到一个UNIX本地域socket类型&#xff08;AF_UNIX&#xff09;的文件/dev/log中&#…

c++11 标准模板(STL)本地化库 - 平面类别(std::money_get) - 从输入字符序列中解析并构造货币值

本地化库 本地环境设施包含字符分类和字符串校对、数值、货币及日期/时间格式化和分析&#xff0c;以及消息取得的国际化支持。本地环境设置控制流 I/O 、正则表达式库和 C 标准库的其他组件的行为。 平面类别 从输入字符序列中解析并构造货币值 std::money_get template<…

威乐集团亚太PMO负责人和继明受邀为第十三届中国PMO大会演讲嘉宾

全国PMO专业人士年度盛会 威乐集团亚太PMO负责人和继明先生受邀为PMO评论主办的2024第十三届中国PMO大会演讲嘉宾&#xff0c;演讲议题为“PMO助力企业项目管理走向卓越”。大会将于6月29-30日在北京举办&#xff0c;敬请关注&#xff01; 议题简要&#xff1a; AI人工智能的…

移动UI瓷片区能有多漂亮?要多漂亮就多漂亮。

移动UI的瓷片区&#xff08;Tile area&#xff09;是指移动应用或移动网页的界面布局中的一个区域&#xff0c;通常用于展示独立的信息块或功能块&#xff0c;每个块都是一个可点击的图标或瓷片&#xff0c;用于快速访问相关功能或查看相关信息。 瓷片区的设计灵感来源于Window…

Vue.js-----vue组件

能够说出vue生命周期能够掌握axios的使用能够了解$refs, $nextTick作用能够完成购物车案例 Vue 生命周期讲解 1.钩子函数 目标&#xff1a;Vue 框架内置函数&#xff0c;随着组件的生命周期阶段&#xff0c;自动执行 作用: 特定的时间点&#xff0c;执行特定的操作场景: 组…

基于LangChain-Chatchat实现的本地知识库的问答应用-快速上手(检索增强生成(RAG)大模型)

基于LangChain-Chatchat实现的本地知识库的问答应用-kuaisu&#xff08;检索增强生成(RAG)大模型&#xff09; 基于 ChatGLM 等大语言模型与 Langchain 等应用框架实现&#xff0c;开源、可离线部署的检索增强生成(RAG)大模型知识库项目。 1.介绍 一种利用 langchain思想实现…

jupyter notebook单元格运行耗时计算魔术方法timeit

%%timeit 默认执行多次计算平均耗时 -n 或 --number&#xff1a;指定总的运行次数。 -r 或 --repeat&#xff1a;指定每个运行要重复的次数。 例如&#xff0c;如果你想要运行一个测试&#xff0c;总共运行3次&#xff0c;每次运行7次迭代&#xff0c;你可以这样写&#xff1a…

MySQL——利用变量进行查询操作

新建链接&#xff0c;自带world数据库&#xff0c;里面自带city表格。 DQL # MySQL利用变量进行查询操作 set cityNameHaarlemmermeer; select * from city where NamecityName;# 多个结果查询 set cityName1Haarlemmermeer; set cityName2Breda; set cityName3Willemstad; s…

气膜种植仓:解决空间需求,助力农业发展

在现代农业发展的背景下&#xff0c;气膜种植仓成为了解决空间需求和提升种植效率的理想选择。气膜种植仓结合了先进的建筑技术和现代农业理念&#xff0c;为种植业带来了全新的解决方案&#xff0c;具有以下几大优势&#xff1a; 1. 最大化空间利用 气膜种植仓采用无梁无柱的设…

Kinetic Technologies授权世强硬创代理,DC-DC转换器尺寸小、发热量低

为能够更好的服务国内消费电子、通信、工业、汽车和企业系统客户&#xff0c;持续满足他们对高效、智能集成电路产品的需求&#xff0c;世强先进&#xff08;深圳&#xff09;科技股份有限公司&#xff08;下称&#xff1a;“世强先进”&#xff09;与Kinetic Technologies Sem…

【环境配置】vsCode 中使用 conda 配置虚拟环境

文章目录 准备前言在 vsCode 中直接创建创建步骤测试更新环境 使用 Anaconda 建立虚拟环境创建步骤在 vsCode 中选择环境 总结 准备 在看本博客之前&#xff0c;希望大家有以下知识储备&#xff1a; 能够正确安装 vsCode&#xff0c;并配置好 python 环境&#xff1b;了解并安…

Visual Studio 中.net8.0(以前叫NET Core)框架和.net framewok 框架有什么区别?

更新vs到2022版本后&#xff0c;新建项目时就多出不少选项&#xff0c;这里来给大家分享下.net8.0&#xff08;以前叫NET Core&#xff09;框架和.net framewok的区别 如下图&#xff0c;不带后缀的就是使用.net8.0。 .net framewok框架选项&#xff1a; 正文开始&#xff1a;…