python elasticsearch 8.x通过代理发起请求方法

news2024/12/21 22:16:50

由于python elasticsearch v8 engine的源码包中并未开放对于请求添加proxies的支持,导致在某些环境下无法连通外网的es服务。目前网上暂无相关的修改内容,我这边提供下自己修改的动态运行时替换elasticsearch包的源码方法demo

import gzip
import ssl
import time
import requests
from elastic_transport._node._http_requests import RequestsHttpNode
from typing import Any, Optional, Union
from elastic_transport._compat import warn_stacklevel
from elastic_transport._exceptions import ConnectionError, ConnectionTimeout, SecurityWarning, TlsError
from elastic_transport._models import ApiResponseMeta, HttpHeaders, NodeConfig
from elastic_transport.client_utils import DEFAULT, DefaultType, client_meta_version
from elastic_transport._node._base import (
    BUILTIN_EXCEPTIONS,
    RERAISE_EXCEPTIONS,
    BaseNode,
    NodeApiResponse,
    ssl_context_from_node_config,
)


def custom_perform_request(
        self,
        method: str,
        target: str,
        body: Optional[bytes] = None,
        headers: Optional[HttpHeaders] = None,
        request_timeout: Union[DefaultType, Optional[float]] = DEFAULT,
    ) -> NodeApiResponse:
        url = self.base_url + target
        headers = HttpHeaders(headers or ())

        request_headers = self._headers.copy()
        if headers:
            request_headers.update(headers)

        body_to_send: Optional[bytes]
        if body:
            if self._http_compress:
                body_to_send = gzip.compress(body)
                request_headers["content-encoding"] = "gzip"
            else:
                body_to_send = body
        else:
            body_to_send = None

        start = time.time()
        proxies_dict = {
            "http": "http://xx.xx.xx.xx:xx",
            "http": "http://xx.xx.xx.xx:xx",
        }
        request = requests.Request(
            method=method, headers=request_headers, url=url, data=body_to_send
        )
        prepared_request = self.session.prepare_request(request)
        send_kwargs = {
            "timeout": (
                request_timeout
                if request_timeout is not DEFAULT
                else self.config.request_timeout
            )
        }
        send_kwargs.update(
            self.session.merge_environment_settings(  # type: ignore[arg-type]
                prepared_request.url, {}, None, None, None
            )
        )
        send_kwargs.pop('proxies')
        try:
            response = self.session.send(prepared_request, proxies=proxies_dict,  **send_kwargs)  # type: ignore[arg-type]
            data = response.content
            duration = time.time() - start
            response_headers = HttpHeaders(response.headers)

        except RERAISE_EXCEPTIONS:
            raise
        except Exception as e:
            err: Exception
            if isinstance(e, requests.Timeout):
                err = ConnectionTimeout(
                    "Connection timed out during request", errors=(e,)
                )
            elif isinstance(e, (ssl.SSLError, requests.exceptions.SSLError)):
                err = TlsError(str(e), errors=(e,))
            elif isinstance(e, BUILTIN_EXCEPTIONS):
                raise
            else:
                err = ConnectionError(str(e), errors=(e,))
            self._log_request(
                method=method,
                target=target,
                headers=request_headers,
                body=body,
                exception=err,
            )
            raise err from None

        meta = ApiResponseMeta(
            node=self.config,
            duration=duration,
            http_version="1.1",
            status=response.status_code,
            headers=response_headers,
        )
        self._log_request(
            method=method,
            target=target,
            headers=request_headers,
            body=body,
            meta=meta,
            response=data,
        )
        return NodeApiResponse(
            meta,
            data,
        )


RequestsHttpNode.perform_request = custom_perform_request


from elasticsearch import Elasticsearch


es = Elasticsearch(hosts=["http://xx.xx.xxx.xxx:9200"], basic_auth=("elastic", "xxxxxxxxxxxx"), node_class=RequestsHttpNode)
query = {"query": {"match_all": {}}}
response = es.search(index="xxxxxx_prod", body= query)
print(response)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2263449.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入剖析MyBatis的架构原理

架构设计 简要画出 MyBatis 的架构图 >> ​​ Mybatis 的功能架构分为哪三层? API 接口层 提供给外部使用的接口 API,开发人员通过这些本地 API 来操纵数据库。接口层一接收到调用请求就会调用数据处理层来完成具体的数据处理。MyBatis 和数据库的…

android opencv导入进行编译

1、直接新建module进行导入,选择opencv的sdk 导入module模式,选择下载好的sdk,修改module name为OpenCV490。 有报错直接解决报错,没报错直接运行成功。 2、解决错误,同步成功 一般报错是gradle版本问题较多。我的报…

智能座舱进阶-应用框架层-Jetpack主要组件

Jetpack的分类 1. DataBinding:以声明方式将可观察数据绑定到界面元素,通常和ViewModel配合使用。 2. Lifecycle:用于管理Activity和Fragment的生命周期,可帮助开发者生成更易于维护的轻量级代码。 3. LiveData: 在底层数据库更…

设计模式-访问者设计模式

介绍 访问者模式(Visitor),表示一个作用于某对象结构中的各元素的操作,它使你可以在不改变个元素的类的前提下定义作用于这些元素的新操作。 问题:在一个机构里面有两种员工,1.Teacher 2.Engineer 员…

springmvc的拦截器,全局异常处理和文件上传

拦截器: 拦截不符合规则的,放行符合规则的。 等价于过滤器。 拦截器只拦截controller层API接口。 如何定义拦截器。 定义一个类并实现拦截器接口 public class MyInterceptor implements HandlerInterceptor {public boolean preHandle(HttpServletRequest reque…

宿舍管理系统(源码+数据库+报告)

356基于SpringBoot的宿舍管理系统,系统包含两种角色:管理员、用户,系统分为前台和后台两大模块 二、项目技术 编程语言:Java 数据库:MySQL 项目管理工具:Maven 前端技术:Vue 后端技术:SpringBo…

基于 HC_SR04的超声波测距数码管显示(智能小车超声波避障部分)

超声波测距模块HC-SR04 1、产品特色 ①典型工作用电压:5V ②超小静态工作电流:小于 5mA ③感应角度(R3 电阻越大,增益越高,探测角度越大): R3 电阻为 392,不大于 15 度 R3 电阻为 472, 不大于 30 度 ④探测距离(R3 电阻可调节增益,即调节探测…

(OCPP服务器)SteVe编译搭建全过程

注意:建议使用3.6.0,我升级到3.7.1,并没有多什么新功能,反而电表的实时数据只能看到累计电能了,我回退了就正常,数据库是兼容的,java版本换位java11,其他不变就好 背景:…

搭建Tomcat(四)---Servlet容器

目录 引入 Servlet容器 一、优化MyTomcat ①先将MyTomcat的main函数搬过来: ②将getClass()函数搬过来 ③创建容器 ④连接ServletConfigMapping和MyTomcat 连接: ⑤完整的ServletConfigMapping和MyTomcat方法: a.ServletConfigMappin…

Iris简单实现Go web服务器

package mainimport ("github.com/kataras/iris" )func main() {app : iris.New() // 实例一个iris对象//配置路由app.Get("/", func(ctx iris.Context) {ctx.WriteString("Hello Iris")})app.Get("/aa", func(ctx iris.Context) {ct…

MySql 中的解决某列中多个字段查询是否存在指定某个值, FIND_IN_SET 用法。

简言:今天公司数据库里面有个列是多个数据拼接而成的比如:**“,131113,749932833,749932825,749932826,749932827,749932828,749932829,”**想要通过sql 查找749932833值的列,很多同学第一想到的就是like 模糊匹配,模糊匹配不能保…

Git实用指南(精简版)

目录 读者须知 Git是什么 Git的原理 文件在Git中的几种状态 快速上手 结尾 读者须知 本文章适合从未接触过git,或者需要深度学习Git的用户进行阅读. 文末有详细的文档,读者可以前往Github下载阅读!!三克油 Git是什么 简单来说,Git是一个代码备份工具,你可以使用指令对…

jmeter 接口性能测试 学习笔记

目录 说明工具准备工具配置jmeter 界面汉化配置汉化步骤汉化结果图 案例1:测试接口接口准备线程组添加线程组配置线程组值线程数(Number of Threads)Ramp-Up 时间(Ramp-Up Period)循环次数(Loop Count&…

小红书关键词搜索采集 | AI改写 | 无水印下载 | 多维表格 | 采集同步飞书

小红书关键词搜索采集 | AI改写 | 无水印下载 | 多维表格 | 采集同步飞书 一、下载影刀: https://www.winrobot360.com/share/activity?inviteUserUuid595634970300317698 二、加入应用市场 https://www.yingdao.com/share/accede/?inviteKeyb2d3f22a-fd6c-4a…

Unbuntu下怎么生成SSL自签证书?

环境: WSL2 Unbuntu 22.04 问题描述: Unbuntu下怎么生成SSL自签证书? 解决方案: 生成自签名SSL证书可以使用OpenSSL工具,这是一个广泛使用的命令行工具,用于创建和管理SSL/TLS证书。以下是生成自签名…

通过阿里云 Milvus 与 PAI 搭建高效的检索增强对话系统

背景介绍 阿里云向量检索服务Milvus版(简称阿里云Milvus)是一款云上全托管服务,确保了了与开源Milvus的100%兼容性,并支持无缝迁移。在开源版本的基础上增强了可扩展性,能提供大规模 AI 向量数据的相似性检索服务。相…

打靶记录22——Tomato

靶机: https://download.vulnhub.com/tomato/Tomato.ova 难度: 低 目标: 获得 Root 权限 Flag 攻击方法: 主机发现端口扫描信息收集路径爬取源码分析文件包含写入日志 /var/log/auth.log内核漏洞枚举 les.sh本地提权 主机…

三维引擎cesium学习经验

三维引擎cesium学习经验: 1、初始化viewer对象 2、对entity的操作:添加,隐藏,修改,去除,居中显示 3、去除掉entity的双击事件 4、获取当前视角高度 5、获取经纬度在屏幕上的位置 6、获取三维场景屏幕中心点…

【蓝桥杯】43699-四平方和

四平方和 题目描述 四平方和定理,又称为拉格朗日定理: 每个正整数都可以表示为至多 4 个正整数的平方和。如果把 0 包括进去,就正好可以表示为 4 个数的平方和。 比如: 502021222 712121222; 对于一个给定的正整数,可…

十、从0开始卷出一个新项目之瑞萨RZN2L rzn-fsp v2.0.0 Release Notes

目录 一、概述 二、Github地址 三、 Features Added 3.1 Developer Assistance feature support added. 3.2 Multiplex interrupts support added. 四、Bug Fixes and Improvements 4.1 Added a noncache section for user applications. 4.2 Unified case of asm inst…