【flask + sqlalchemy】连接clickhouse数据库的踩的坑,在这里记录一下

news2024/12/28 18:09:11

文章目录

  • 前言
  • 1. 发现问题
  • 2. 复盘
    • 2.1 上面试一次错误的问题记录
    • 2.2 flask使用clickhouse
      • 2.2.1 配置
      • 2.2.2 orm
      • 2.3 如何插入数据


前言

使用clickhouse有一段时间了,现在要重构一个项目,重度依赖clickhouse,现在终于理顺了,记录一下。


1. 发现问题

from clickhouse_driver import Client
host = '172.18.43.134' # 22.2.2.1
host = '172.18.43.121' # 22.8.1.2097
client = Client(host=host, port=9900)

# 输入 ClickHouse SQL 查询语句并执行
result = client.execute('SELECT * FROM default.cl_grid_fwi_forecast_hourly_data limit 1')

# 打印查询结果
print(result)

两个clickhouse的数据库,一个能连接一个不能,
报错如下:

在这里插入图片描述
诡异的是,telnet 172.18.43.121:9000 是通的, 于是查了一下版本

SELECT version();

134 的clickhouse 是22.2.2.1
121 的clickhouse 是22.8.1.2097
报错指引ref:https://github.com/mymarilyn/clickhouse-driver/issues/242
到这里,我就联系运维要重新装clichouse,换成134的那个版本就行了。

2. 复盘

2.1 上面试一次错误的问题记录

真实原因是,9000端口被其它程序占了。 121 的9000 端口跟不上clickhouse的服务,所以报错上说,expect hello but exception.
正常的途径是, 我要到121 服务器上, lsof -i:9000
看一下是否启动的是clickhouse的客户端才行!

2.2 flask使用clickhouse

2.2.1 配置

flask 使用clickhouse,自然想借助sqlalchemy以orm的方式使用,而不是原生的方式,否则没有意义。
那么就涉及到了多库,mysql 库和 clickhouse库的配置
在这里插入图片描述

SQLALCHEMY_DATABASE_URI = \
    'mysql+pymysql://root:xxx@172.18.43.xxx/forest_fxxxx'
SQLALCHEMY_ECHO = True
SQLALCHEMY_BINDS ={
    "clickhouse1": 'clickhouse://default:@172.1xxx:8123/default',
    "clickhouse2": 'clickhouse://default:@172.18xxx:8123/default'
}

mysql 走SQLALCHEMY_DATABASE_URI (默认)
clickhouse走的时候 clickhouse1,clickhouse2, 注意不设账密的写法是如上,而且走的是8123的port。
这里要插一句,8123和9000的端口
8123走的是jdbc的方式连接的数据库, java爱用这个。
9000是tcp的方式连接,python的拓展库用tcp的方式。

2.2.2 orm

from sqlalchemy import Column, PrimaryKeyConstraint, String, Boolean, Integer, ForeignKey,DateTime,Float
from sqlalchemy.orm import relationship

from app.models.base import Base

class Forecast_grid_hourly_weather_168h(Base):
    __bind_key__ = 'clickhouse1'
    grid_id = Column(String)
    record_time = Column(DateTime)
    data_time = Column(DateTime)
    data_date = Column(DateTime)
    temperature = Column(Float)
    humidity = Column(Float)
    weather = Column(String)
    iconcode = Column(String)
    wind_speed = Column(Float)
    wind_degree = Column(Float)
    wind_trend = Column(String)
    wind_level = Column(String)
    rain_fail = Column(Float)
    rain_probability = Column(Float)
    pressure = Column(Float)
    cloud = Column(Float)
    dew_weather = Column(Float)
    __table_args__ = (
        PrimaryKeyConstraint('grid_id', 'data_date','data_time'),
    )

在这里插入图片描述
要注意__bind_key__ = ‘clickhouse1’ 和 table_args = (
PrimaryKeyConstraint(‘grid_id’, ‘data_date’,‘data_time’),
)
这两个设置。

2.3 如何插入数据

from sqlalchemy import and_
from app import create_app
from app.models.base import db
from app.models.click_forecast_grid_hourly_weather_168h import Forecast_grid_hourly_weather_168h
from app.models.click_cl_grid_fwi_forecast_hourly_data import Cl_grid_fwi_forecast_hourly_data
from app.models.grid import Grid
from app.models.plant import Plant
from app.models.grid_land_info import Gridlandinfo
from app.models.sys_risk_point import Sysriskpoint
from app.models.grid_plant_bind import Gridplantbind
from sqlalchemy.orm import aliased
import datetime
from app.spider.CEFDRS import CEFDRS
from clickhouse_driver.client import Client as clientclickhouse

if __name__ == '__main__':
  app = create_app()
  with app.app_context():
      # grid_code = "130733100201"
      # results = Gridplantbind.query.filter_by(grid_code=grid_code).all()
      # plant_list = [dict(plantType = row.plant.plant_type, plant_name = row.plant.plant_name, proportion=row.proportion) for row in results]
      # # for row in results:
      # #     print("-------------")
      # #     print(row.proportion)
      # #     print(row.plant.plant_name)
      # #     print(row.plant.plant_type)
      # #     exit("--===")
      # print(plant_list)
      cefdrs = CEFDRS()
      import json
      post_json_data = json.load(open("aaa.json"))
      # print(post_json_data)
      res = cefdrs.get_cefdrs(post_json_data)
      # print(res['data']['cefgrds'][0])

      # print(res['data']['fwi_reason'])
      session = db.session
      list1 = [res['data']['cefgrds'][0]]
      # # print(list1)
      # cl_1 = Cl_grid_fwi_forecast_hourly_data()
      # models = [cl_1.set_attrs(attrs_dict = d).__dict__ for d in list1]
      # for row in models:
      #     row.pop('_sa_instance_state')
      # print(models)
      # session.add_all(models)
      # session.commit()
      session.bulk_save_objects([Cl_grid_fwi_forecast_hourly_data(**data) for data in list1])
      session.commit()
      # insert_db_click_house(models) 

这里用的是批量插入的方式。 session.bulk_save_objects([Cl_grid_fwi_forecast_hourly_data(**data) for data in list1])

bulk_save_objects(objects, *, return_defaults=False, update_changed_only=True)

batchsize 可以划分块,避免一次性插入太多:
chunks = [objects[i:i+100] for i in range(0, len(objects), 100)]
for chunk in chunks:
    session.bulk_save_objects(chunk, chunksize=100)
session.commit()

···
SQLAlchemy 提供了一个 slice() 函数,用于将插入数据分成多个块进行批量插入,以避免一次性插入数据过多。slice() 函数的定义如下:

slice(start, stop, step=None)
slice() 函数接受三个参数:start、stop、step,表示生成一个类似切片的对象,用于将数据分成多个块进行插入。

我们可以通过将 slice() 函数返回的对象作为 mappings 参数的切片来实现分块插入数据。例如,如果我们有一个包含 1000 条记录的列表 data,想要将数据分成每页 100 条的块来逐个插入,可以这样使用 bulk_insert_mappings() 方法:

chunk_size = 100
total = len(data)

for i in slice(0, total, chunk_size):
    mappings = data[i:i+chunk_size]
    session.bulk_insert_mappings(MyModel, mappings)
    session.commit()
session.add_all(models) 

也会完成这件事,但都不会切块。

我怀疑是sqlalchemy自身内部完成了从 jdbc的连接转到了tcp的连接方式,而转换过程中,是能够查到clickhouse的客户端是以那个端口开放的,因为并没有手动设置9000或者9900端口的位置。
而且要下载sqlalchemy的 clickhouse的插件才能使用,指引在这里:
clickhouse-sqlalchemy的api网址:
https://github.com/xzkostyan/clickhouse-sqlalchemy
clickhouse-driver的api网址:
https://github.com/mymarilyn/clickhouse-driver

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/559197.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据库管理-第七十七期 再探分布式(20230523)

数据库管理 2023-05-23 第七十七期 再探分布式1 单机分布式2 分布式改造3 尝试改造一个订单系统3.1 表类型和分片键选择3.2 扩展分片3.3 业务扩展 总结 第七十七期 再探分布式 上一次系统探讨分布式数据库还是在第三十六期,经过大半年的“进步”加上中间参加了不少…

Linux启动过程的问题解决

文章目录 Linux启动过程的问题解决忘记root密码的解决因文件系统错误而无法启动 Linux启动过程的问题解决 当我们在使用Linux时,可能会因为某些设置或突然断电等原因导致文件系统出现错误,从而导致Linux无法正常启动。但这并不意味着我们需要重新安装系…

ASEMI代理长电可控硅MAC97A8图片,MAC97A8大小

编辑-Z 长电可控硅MAC97A8参数: 型号:MAC97A8 VDRM/VRRM:600V IT(RMS):1A ITSM:8A 栅极电流(峰值):1A 栅极电压(峰值):5V 栅极功率&#…

翼辉+飞腾全国产实时操作系统与全国产主板工控方案在电力产品的应用

SylixOS(MS-RTOS)电力产品应用简介 一、电网传统装置: 继电保护、测控、稳控、PMU、时间同步、故障录波等装置(输变电) 一般使用AMP方案,少数客户使用SMP方案。2019年11月使用SylixOS的国内首台100%全国…

centos8 安装mysql8

1、下载mysql8软件库 wget https://repo.mysql.com//mysql80-community-release-el8-3.noarch.rpm 2、安装软件库 rpm -ivh mysql80-community-release-el8-3.noarch.rpm 3、安装mysql yum install mysql-server 4、启动mysql systemctl start mysqld systemctl enable…

【非集中申请期】国家自然科学基金最新申请指南情况汇总

2023年国自然集中申请期函评季临近尾声。当下,申请人除了在日常的科研工作中也要合理、及时的关注一些非集中申请期的项目动态,恰如2023年各类非集中期申请项目指南正在陆续发布中,这些仍然是非常好的申请机会。近期的项目指南名称与申请截止…

快速上手MATLAB:科研、工程、数据分析,MATLAB入门(下)教你基础知识!分享《MATLAB初学者教程 MATLAB编程-菜鸟入门(清晰版)》

快速上手MATLAB:科研、工程、数据分析,MATLAB入门(上)教你基础知识! 福利:文末有资料分享!!前言一、文件读取1. 工作空间数据读取2. 文本文件读取3. 常用的数据导入和导出函数4. 图像…

不吹不黑,安利5个网工必备工具包

大家好,我是老杨。 要说网工的好用工具,你心里肯定有不少选项。工具常用常新,与时俱进,但你的思维却不一定。 研究一个新工具,除了能提升你的工作效率,也能对你的认知有所开拓和提升。 所以,…

《程序员面试金典(第6版)》面试题 02.01. 移除重复节点(哈希映射,多指针暴力破解,链表)

题目描述 编写代码,移除未排序链表中的重复节点。保留最开始出现的节点。 题目传输门:面试题 02.01. 移除重复节点 示例1: 输入:[1, 2, 3, 3, 2, 1]输出:[1, 2, 3]示例2: 输入:[1, 1, 1, 1, 2]输出:[1, 2]…

Hive---拉链表设计与实现

1 数据同步问题 Hive在实际工作中主要用于构建离线数据仓库,定期的从各种数据源中同步采集数据到Hive中,经过分层转换提供数据应用。比如每天需要从MySQL中同步最新的订单信息、用户信息、店铺信息等到数据仓库中,进行订单分析、用户分析。 …

【C++修炼之路】定位new(项目记录)

————————————每一个不曾起舞的日子都是对生命的辜负。 C之定位new 1. 什么是定位new2. 定位new的语法3. 具体实例 1. 什么是定位new 一般的new运算符负责在heap堆中找到一个足以能够满足要求的内存块。 而定位new(Placement new)是C中的一…

运行100万个并发任务,不同语言各需要多少内存

作者:DataStax 公司(美国的一家数据库系统开发商)Piotr Kołaczkowski 原文见: https://pkolaczk.github.io/memory-consumption-of-async/ 在这篇博客文章中,探讨了处理大量网络连接时候的Rust、Go、Java、C#、Pyth…

企企通“码上顺”清洗工具 | 让数据更有价值,让业务更出色

数据清理工作是企业数据管理、数据治理中的最基础的工作之一,不仅是一项苦活、累活,也是一个既考验业务又检验技术的活。 物料主数据作为企业核心的数据资产,在智慧供应链、业财一体化等数字化建设中发挥着重要作用。在当今高速发展的商业环…

《汇编语言》- 读书笔记 - 实验2 用机器指令和汇编指令编程

《汇编语言》- 读书笔记 - 实验2 用机器指令和汇编指令编程 1. 预备知识: Debug 的使用2 .实验任务 1. 预备知识: Debug 的使用 统一完善到:实验 1 查看 CPU 和内存,用机器指令和汇编指令编程。不在这拆开写了。 2 .实验任务 使用 Debug,将…

功率放大器在压电驱动器中的作用及应用

功率放大器在压电驱动器中的作用是将低功率信号放大为足够大的电力信号,以驱动压电陶瓷材料产生相应的机械振动。 压电陶瓷材料是一种特殊的陶瓷材料,能够将机械能转换为电能,因此被广泛应用于各种类型的振动器件和传感器中。这些器件通常需要…

Combiner

概述 Conbiner在MapReduce的Shuffle阶段起作用,它负责局部数据的聚合,我们可以看到,对于大数据量,如果没有Combiner,将会在磁盘上写入多个文件等待ReduceTask来拉取,但是如果有Combiner组件,我们…

5 个章节、25 条规范,全方位 Get 数据集选择与创建的「百科全书」

By 超神经 内容一览:如果你正在学习如何创建或选择一个合适的数据集,那么这篇文章会给你一些实用的建议,帮助你在选择和创建数据集时做出明智的决策。 关键词:机器学习 数据集 本文首发自 HyperAI 超神经微信公众平台~ 作者 |…

星标3.5k,一款国产的轻量级开源在线项目任务管理工具

今天给大家推荐一个轻量级的开源在线项目任务管理工具:DooTask 图片 DooTask 提供各类文档协作工具、在线思维导图、在线流程图、项目管理、任务分发、即时IM,文件管理等工具。 高效便捷的团队沟通工具 针对项目和任务建立群组,工作问题可…

SRP Batcher在真机上失效

1)SRP Batcher在真机上失效 ​2)Shader里面对同一张纹理多次采样会影响效率吗 3)为什么纹理开启了mipmap后,纹理内存反而下降了 4)TMP为什么有多次Delegate.Combine()的GC 这是第336篇UWA技术知识分享的推送&#xff0…

如何减少电脑内存占用?

内存(Memory)是计算机一个重要的组成部件,也称为内存储器或主存储器。它可以暂时存放CPU中运算的数据,以及与硬盘等外部存储器交换的数据,是CPU和硬盘之间的桥梁。若电脑内存占用过高,这会影响到电脑运行的速度,那该如…