ElasticSearch教程入门到精通——第五部分(基于ELK技术栈elasticsearch 7.x+8.x新特性)

news2024/10/7 20:33:15

ElasticSearch教程入门到精通——第五部分(基于ELK技术栈elasticsearch 7.x+8.x新特性)

  • 1. Elasticsearch集成
    • 1.1 框架集成-SpringData-整体介绍
    • 1.2 Spring Data Elasticsearch 介绍
    • 1.3 框架集成-SpringData-代码功能集成
      • 1.3.1 创建Maven项目
      • 1.3.2 修改pom文件,增加依赖关系
      • 1.3.3 增加配置文件
      • 1.3.4 Spring Boot 主程序
      • 1.3.5 数据实体类
      • 1.3.6 配置类
      • 1.3.7 DAO 数据访问对象
    • 1.4 框架集成-SpringData-集成测试-索引操作
    • 1.5 框架集成-SpringData-集成测试-文档操作
    • 1.6 框架集成-SpringData-集成测试-文档搜索
    • 1.7 框架集成-SparkStreaming-集成
      • 1.7.1 创建Maven项目
      • 1.7.2 修改 pom 文件,增加依赖关系
      • 1.7.3 功能实现
    • 1.8 框架集成-Flink-集成
      • 1.8.1 创建Maven项目
      • 1.8.2 修改 pom 文件,增加相关依赖类库
      • 1.8.3 功能实现

在这里插入图片描述

在这里插入图片描述

1. Elasticsearch集成

1.1 框架集成-SpringData-整体介绍

Spring Data是一个用于简化数据库、非关系型数据库、索引库访问,并支持云服务的开源框架。其主要目标是使得对数据的访问变得方便快捷,并支持 map-reduce框架和云计算数据服务。Spring Data可以极大的简化JPA(Elasticsearch…)的写法,可以在几乎不用写实现的情况下,实现对数据的访问和操作。除了CRUD 外,还包括如分页、排序等一些常用的功能。

Spring Data 的官网

Spring Data 常用的功能模块如下:

  • Spring Data JDBC
  • Spring Data JPA
  • Spring Data LDAP
  • Spring Data MongoDB
  • Spring Data Redis
  • Spring Data R2DBC
  • Spring Data REST
  • Spring Data for Apache Cassandra
  • Spring Data for Apache Geode
  • Spring Data for Apache Solr
  • Spring Data for Pivotal GemFire
  • Spring Data Couchbase
  • Spring Data Elasticsearch
  • Spring Data Envers
  • Spring Data Neo4j
  • Spring Data JDBC Extensions
  • Spring for Apache Hadoop

在这里插入图片描述

1.2 Spring Data Elasticsearch 介绍

Spring Data Elasticsearch基于Spring Data API简化 Elasticsearch 操作,将原始操作Elasticsearch 的客户端API进行封装。Spring Data为Elasticsearch 项目提供集成搜索引擎。Spring Data Elasticsearch POJO的关键功能区域为中心的模型与Elastichsearch交互文档和轻松地编写一个存储索引库数据访问层。

Spring Data Elasticsearch 官网

在这里插入图片描述

1.3 框架集成-SpringData-代码功能集成

1.3.1 创建Maven项目

1.3.2 修改pom文件,增加依赖关系

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.6.RELEASE</version>
        <relativePath/>
    </parent>

    <groupId>com.atguigu</groupId>
    <artifactId>es-spring</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
        </dependency>
    </dependencies>
</project>

1.3.3 增加配置文件

在 resources 目录中增加application.properties文件

# es 服务地址
elasticsearch.host=127.0.0.1
# es 服务端口
elasticsearch.port=9200
# 配置日志级别,开启 debug 日志
logging.level.com.atguigu.es=debug

1.3.4 Spring Boot 主程序

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MainApplication {
    public static void main(String[] args) {
        SpringApplication.run(MainApplication.class, args);
    }
}

1.3.5 数据实体类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Document(indexName = "shopping", shards = 3, replicas = 1)
public class Product {
    //必须有 id,这里的 id 是全局唯一的标识,等同于 es 中的"_id"
    @Id
    private Long id;//商品唯一标识

    /**
     * type : 字段数据类型
     * analyzer : 分词器类型
     * index : 是否索引(默认:true)
     * Keyword : 短语,不进行分词
     */
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String title;//商品名称

    @Field(type = FieldType.Keyword)
    private String category;//分类名称

    @Field(type = FieldType.Double)
    private Double price;//商品价格

    @Field(type = FieldType.Keyword, index = false)
    private String images;//图片地址
}

1.3.6 配置类

  • ElasticsearchRestTemplate是spring-data-elasticsearch项目中的一个类,和其他spring项目中的 template类似。
  • 在新版的spring-data-elasticsearch 中,ElasticsearchRestTemplate 代替了原来的ElasticsearchTemplate。
  • 原因是ElasticsearchTemplate基于TransportClient,TransportClient即将在8.x 以后的版本中移除。所以,我们推荐使用ElasticsearchRestTemplate。
  • ElasticsearchRestTemplate基于RestHighLevelClient客户端的。需要自定义配置类,继承AbstractElasticsearchConfiguration,并实现elasticsearchClient()抽象方法,创建RestHighLevelClient对象。

AbstractElasticsearchConfiguration源码:

package org.springframework.data.elasticsearch.config;

import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;


public abstract class AbstractElasticsearchConfiguration extends ElasticsearchConfigurationSupport {

	//需重写本方法
	public abstract RestHighLevelClient elasticsearchClient();

	@Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
	public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter elasticsearchConverter) {
		return new ElasticsearchRestTemplate(elasticsearchClient(), elasticsearchConverter);
	}
}

需要自定义配置类,继承AbstractElasticsearchConfiguration,并实现elasticsearchClient()抽象方法,创建RestHighLevelClient对象。

import lombok.Data;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

@ConfigurationProperties(prefix = "elasticsearch")
@Configuration
@Data
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration{

    private String host ;
    private Integer port ;
    //重写父类方法
    @Override
    public RestHighLevelClient elasticsearchClient() {
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port));
        RestHighLevelClient restHighLevelClient = new
                RestHighLevelClient(builder);
        return restHighLevelClient;
    }
}

1.3.7 DAO 数据访问对象

import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface ProductDao extends ElasticsearchRepository<Product, Long>{

}

在这里插入图片描述

1.4 框架集成-SpringData-集成测试-索引操作

import com.lun.model.Product;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESIndexTest {
    //注入 ElasticsearchRestTemplate
    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;
    //创建索引并增加映射配置
    @Test
    public void createIndex(){
        //创建索引,系统初始化会自动创建索引
        System.out.println("创建索引");
    }

    @Test
    public void deleteIndex(){
        //创建索引,系统初始化会自动创建索引
        boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);
        System.out.println("删除索引 = " + flg);
    }
}

用Postman 检测有没有创建和删除。

#GET http://localhost:9200/_cat/indices?v 

在这里插入图片描述

1.5 框架集成-SpringData-集成测试-文档操作

import com.lun.dao.ProductDao;
import com.lun.model.Product;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESProductDaoTest {

    @Autowired
    private ProductDao productDao;
    /**
     * 新增
     */
    @Test
    public void save(){
        Product product = new Product();
        product.setId(2L);
        product.setTitle("华为手机");
        product.setCategory("手机");
        product.setPrice(2999.0);
        product.setImages("http://www.atguigu/hw.jpg");
        productDao.save(product);
    }
    //POSTMAN, GET http://localhost:9200/product/_doc/2

    //修改
    @Test
    public void update(){
        Product product = new Product();
        product.setId(2L);
        product.setTitle("小米 2 手机");
        product.setCategory("手机");
        product.setPrice(9999.0);
        product.setImages("http://www.atguigu/xm.jpg");
        productDao.save(product);
    }
    //POSTMAN, GET http://localhost:9200/product/_doc/2


    //根据 id 查询
    @Test
    public void findById(){
        Product product = productDao.findById(2L).get();
        System.out.println(product);
    }

    @Test
    public void findAll(){
        Iterable<Product> products = productDao.findAll();
        for (Product product : products) {
            System.out.println(product);
        }
    }

    //删除
    @Test
    public void delete(){
        Product product = new Product();
        product.setId(2L);
        productDao.delete(product);
    }
    //POSTMAN, GET http://localhost:9200/product/_doc/2

    //批量新增
    @Test
    public void saveAll(){
        List<Product> productList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Product product = new Product();
            product.setId(Long.valueOf(i));
            product.setTitle("["+i+"]小米手机");
            product.setCategory("手机");
            product.setPrice(1999.0 + i);
            product.setImages("http://www.atguigu/xm.jpg");
            productList.add(product);
        }
        productDao.saveAll(productList);
    }

    //分页查询
    @Test
    public void findByPageable(){
        //设置排序(排序方式,正序还是倒序,排序的 id)
        Sort sort = Sort.by(Sort.Direction.DESC,"id");
        int currentPage=0;//当前页,第一页从 0 开始, 1 表示第二页
        int pageSize = 5;//每页显示多少条
        //设置查询分页
        PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
        //分页查询
        Page<Product> productPage = productDao.findAll(pageRequest);
        for (Product Product : productPage.getContent()) {
            System.out.println(Product);
        }
    }
}

在这里插入图片描述

1.6 框架集成-SpringData-集成测试-文档搜索

import com.lun.dao.ProductDao;
import com.lun.model.Product;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESSearchTest {

    @Autowired
    private ProductDao productDao;
    /**
     * term 查询
     * search(termQueryBuilder) 调用搜索方法,参数查询构建器对象
     */
    @Test
    public void termQuery(){
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
                Iterable<Product> products = productDao.search(termQueryBuilder);
        for (Product product : products) {
            System.out.println(product);
        }
    }
    /**
     * term 查询加分页
     */
    @Test
    public void termQueryByPage(){
        int currentPage= 0 ;
        int pageSize = 5;
        //设置查询分页
        PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
                Iterable<Product> products =
                        productDao.search(termQueryBuilder,pageRequest);
        for (Product product : products) {
            System.out.println(product);
        }
    }

}

在这里插入图片描述

1.7 框架集成-SparkStreaming-集成

Spark Streaming 是Spark core API的扩展,支持实时数据流的处理,并且具有可扩展,高吞吐量,容错的特点。数据可以从许多来源获取,如Kafka, Flume,Kinesis或TCP sockets,并且可以使用复杂的算法进行处理,这些算法使用诸如 map,reduce,join和 window等高级函数表示。最后,处理后的数据可以推送到文件系统,数据库等。实际上,您可以将Spark的机器学习和图形处理算法应用于数据流。

1.7.1 创建Maven项目

1.7.2 修改 pom 文件,增加依赖关系

<?xml version="1.0" encoding="utf-8"?>
<project
    xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lun.es</groupId>
    <artifactId>sparkstreaming-elasticsearch</artifactId>
    <version>1.0</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- elasticsearch 的客户端 -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- elasticsearch 依赖 2.x 的 log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- <dependency>-->
        <!-- <groupId>com.fasterxml.jackson.core</groupId>-->
        <!-- <artifactId>jackson-databind</artifactId>-->
        <!-- <version>2.11.1</version>-->
        <!-- </dependency>-->
        <!-- &lt;!&ndash; junit 单元测试 &ndash;&gt;-->
        <!-- <dependency>-->
        <!-- <groupId>junit</groupId>-->
        <!-- <artifactId>junit</artifactId>-->
        <!-- <version>4.12</version>-->
        <!-- </dependency>-->
    </dependencies>
</project>

1.7.3 功能实现

import org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.indices.CreateIndexRequest
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType
import java.util.Date

object SparkStreamingESTest {

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
        val ssc = new StreamingContext(sparkConf, Seconds(3))
        val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
        ds.foreachRDD(
            rdd => {
                println("*************** " + new Date())
                rdd.foreach(
                    data => {
                        val client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));
                        // 新增文档 - 请求对象
                        val request = new IndexRequest();
                        
                        // 设置索引及唯一性标识
                        val ss = data.split(" ")
                        println("ss = " + ss.mkString(","))
                        request.index("sparkstreaming").id(ss(0));
                        
                        val productJson =
                            s"""
                            | { "data":"${ss(1)}" }
                            |""".stripMargin;
                        
                        // 添加文档数据,数据格式为 JSON 格式
                        request.source(productJson,XContentType.JSON);
                        
                        // 客户端发送请求,获取响应对象
                        val response = client.index(request,
                        RequestOptions.DEFAULT);
                        System.out.println("_index:" + response.getIndex());
                        System.out.println("_id:" + response.getId());
                        System.out.println("_result:" + response.getResult());
                        client.close()
                    }
                )
            }
        )
        ssc.start()
        ssc.awaitTermination()
    }
}

在这里插入图片描述

1.8 框架集成-Flink-集成

Apache Spark是一-种基于内存的快速、通用、可扩展的大数据分析计算引擎。Apache Spark掀开了内存计算的先河,以内存作为赌注,贏得了内存计算的飞速发展。但是在其火热的同时,开发人员发现,在Spark中,计算框架普遍存在的缺点和不足依然没有完全解决,而这些问题随着5G时代的来临以及决策者对实时数据分析结果的迫切需要而凸显的更加明显:

  • 乱序数据,迟到数据
  • 低延迟,高吞吐,准确性
  • 容错性
  • 数据精准一次性处理(Exactly-Once)

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。在Spark火热的同时,也默默地发展自己,并尝试着解决其他计算框架的问题。慢慢地,随着这些问题的解决,Flink 慢慢被绝大数程序员所熟知并进行大力推广,阿里公司在2015年改进Flink,并创建了内部分支Blink,目前服务于阿里集团内部搜索、推荐、广告和蚂蚁等大量核心实时业务。

1.8.1 创建Maven项目

1.8.2 修改 pom 文件,增加相关依赖类库

<?xml version="1.0" encoding="UTF-8"?>
<project
    xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lun.es</groupId>
    <artifactId>flink-elasticsearch</artifactId>
    <version>1.0</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <!-- jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.11.1</version>
        </dependency>
    </dependencies>
</project>

1.8.3 功能实现

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlinkElasticsearchSinkTest {

	public static void main(String[] args) throws Exception {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
		List<HttpHost> httpHosts = new ArrayList<>();
		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
		//httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

		// use a ElasticsearchSink.Builder to create an ElasticsearchSink
		ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, 
			new ElasticsearchSinkFunction<String>() {
				public IndexRequest createIndexRequest(String element) {
					Map<String, String> json = new HashMap<>();
					json.put("data", element);
					return Requests.indexRequest()
						.index("my-index")
						//.type("my-type")
						.source(json);
				}

				@Override
				public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
					indexer.add(createIndexRequest(element));
				}
			}
		);
		
		// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
		esSinkBuilder.setBulkFlushMaxActions(1);

		// provide a RestClientFactory for custom configuration on the internally createdREST client
		// esSinkBuilder.setRestClientFactory(
		// restClientBuilder -> {
			// restClientBuilder.setDefaultHeaders(...)
			// restClientBuilder.setMaxRetryTimeoutMillis(...)
			// restClientBuilder.setPathPrefix(...)
			// restClientBuilder.setHttpClientConfigCallback(...)
		// }
		// );
		source.addSink(esSinkBuilder.build());
		env.execute("flink-es");
	}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1632182.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pytorch中的过拟合和欠拟合

基本概念 我们知道&#xff0c;所谓的神经网络其实就是一个复杂的非线性函数&#xff0c;网络越深&#xff0c;这个函数就越复杂&#xff0c;相应的表达能力也就越强&#xff0c;神经网络的训练则是一个拟合的过程。   当模型的复杂度小于真实数据的复杂度&#xff0c;模型表…

linux中通过logrotate进行日志切割

&#x1f341;博主简介&#xff1a; &#x1f3c5;云计算领域优质创作者 &#x1f3c5;2022年CSDN新星计划python赛道第一名 &#x1f3c5;2022年CSDN原力计划优质作者 &#x1f3c5;阿里云ACE认证高级工程师 &#x1f3c5;阿里云开发者社区专…

Android 设置头像 - 裁剪及圆形头像

书接上文 Android 设置头像 - 相册拍照&#xff0c;通过相册和照片的设置就可以获取到需要的头像信息&#xff0c;但是在通常情况下&#xff0c;我们还想要实现针对头像的裁剪功能和圆形头像功能。 先上截图&#xff1a; 图像裁剪 通常裁剪可以分为程序自动裁剪和用户选择裁剪…

自适应信号处理基础及应用——DSP学习笔记五

本专栏的图片内容都来自于老师讲课的PPT&#xff0c;本篇博客只是我个人对于上课内容的知识结构分析和梳理。 导论 自适应系统的定义、特征、形式、举例 特征 非自适应系统 • 固定参数的设计方法 • 假定事先知道了一切可能的输入条件&#xff1b;在这些条件下怎样动作&#…

限流--4种经典限流算法讲解--单机限流和分布式限流的实现

为什么需要限流 系统的维护使用是需要成本的&#xff0c;用户可能使用科技疯狂刷量&#xff0c;消耗系统资源&#xff0c;出现额外的经济开销问题&#xff1a; 控制成本>限制用户的调用次数用户在短时间内疯狂使用&#xff0c;导致服务器资源被占满&#xff0c;其他用户无…

大象机器人开源六轴协作机械臂myCobot 320 手机摄影技术!

引言 有没有遇到过这样的情况&#xff1a;当你手持手机或相机准备拍摄视频时&#xff0c;心中已经构想了完美的画面&#xff0c;但却因为实际的限制无法捕捉到理想中的角度&#xff1f;这种情况可能会让人感到挫折。例如&#xff0c;如果想要从地面一只蚂蚁的视角拍摄&#xff…

dremio数据湖sql行列转换及转置

1、行转列 (扁平化) 数据准备 表 aa 1.1 cross join unnest 在Dremio中&#xff0c;UNNEST 函数用于将数组或复杂类型的列&#xff08;如JSON、Map或Array类型&#xff09;中的值“炸裂”&#xff08;分解&#xff09;成多行. with aa as ( select 上海 as city, ARRAY[浦东…

asp.net结课作业中遇到的问题解决1

作业要求 实现增删改查导出基本功能。 1、如何设置使得某个背景就是一整个而不是无限填充或者是这个图片的某一部分。 这就要求在设置这一块的时候&#xff0c;长和宽按照背景图片的大小进行设置&#xff0c;比如&#xff1a; 如果&#xff0c;图片的大小不符合你的要求&am…

技术团队的管理方法和日常总结建议

管理学家德鲁克有言“管理是一种实践&#xff0c;其本质不在于知&#xff0c;而在于行&#xff0c;其验证不在于逻辑&#xff0c;而在于成果&#xff0c;其唯一的权威就是成就” &#xff0c;因此管理重实践看效果&#xff0c;但如果管理实践有理论依凭&#xff0c;那么实践起来…

云手机对出海企业有什么帮助?

近些年&#xff0c;越来越多的企业开始向海外拓展&#xff0c;意图发掘更广阔的市场。在这过程中&#xff0c;云手机作为一个新型工具为很多企业提供了助力&#xff0c;尤其在解决海外市场拓展过程中的诸多挑战方面发挥着作用。 首先&#xff0c;云手机的出现解决了企业在海外拓…

VS2022 嘿嘿

还是大二的时候就开始用这个&#xff0c;但居然是为了用PB&#xff0c;-_-|| 用了段时间换成了C#&#xff0c;依稀还记得大佬们纠正我的读法&#xff0c;别读C井&#xff0c;应该读C夏普。。。 安装过程其实也没啥&#xff0c;就是关键Key得花时间找&#xff0c;我好不容易搞…

Android如何使用XML自定义属性

1、定义 在res/values文件下定义一个attrs.xml文件&#xff0c;代码如下: 2、使用 在布局中使用&#xff0c; 示例代码如下&#xff1a; 3、获取 最终来到这里&#xff1a;

设计模式——保护性暂停

同步模式之保护性暂停 文章目录 同步模式之保护性暂停定义实现应用带超时版 GuardedObject扩展——原理之join扩展——多任务版 GuardedObject 定义 即 Guarded Suspension&#xff0c;用在一个线程等待另一个线程的执行结果 要点 有一个结果需要从一个线程传递到另一个线程&…

秋招后端开发面试题 - Java语言基础(下)

目录 Java基础下前言面试题toString() 、String.valueof()、(String)&#xff1f;hashCode() 方法&#xff1f;hashCode 和 equals 方法判断两个对象是否相等&#xff1f;为什么重写 equals 时必须重写 hashCode 方法&#xff1f;String、StringBuffer、StringBuilder?String …

VoxAtnNet:三维点云卷积神经网络

VoxAtnNet:三维点云卷积神经网络 摘要IntroductionProposed VoxAtnNet 3D Face PAD3D face point cloud presentation attack Dataset (3D-PCPA) VoxAtnNet: A 3D Point Clouds Convolutional Neural Network for 摘要 面部生物识别是智能手机确保可靠和可信任认证的重要组件。…

react 学习笔记二:ref、状态、继承

基础知识 1、ref 创建变量时&#xff0c;需要运用到username React.createRef()&#xff0c;并将其绑定到对应的节点。在使用时需要获取当前的节点&#xff1b; 注意&#xff1a;vue直接使用里面的值&#xff0c;不需要再用this。 2、状态 组件描述某种显示情况的数据&#…

[ACTF2020 新生赛]BackupFile 1 [极客大挑战 2019]BuyFlag 1 [护网杯 2018]easy_tornado 1

目录 [ACTF2020 新生赛]BackupFile 1 1.打开页面&#xff0c;叫我们去找源文件 2.想到用disearch扫描&#xff0c;发现源文件index.php.bak 3.访问这个文件&#xff0c;下载一个文件&#xff0c;用记事本打开 4.翻译php代码 5.构造payload url/?key123&#xff0c;得到fl…

【哈希】Leetcode 面试题 01.02. 判定是否互为字符重排

题目讲解 面试题 01.02. 判定是否互为字符重排 算法讲解 直观的想法&#xff1a;我们找到一个字符串的全排列&#xff0c;然后对比当前的排列是否等于另一个字符串。如果两个字符串如果互为排列&#xff0c;所以我们知道两个字符串对应的字符出现的个数相同&#xff0c;那么…

Windows 容器镜像踩坑记录

为什么研究windows容器&#xff1f;emm&#xff0c;公司需要&#xff0c;不想多说。 dotnet后端 问题描述&#xff1a; 基于mcr.microsoft.com/dotnet/aspnet:6.0镜像撰写dockerfile编译.net core后端项目后运行容器出现类库不存在问题&#xff1a; 程序中使用了fastreport&a…

编写你的第一个 golang 的应用程序

进行你的第一个golang的程序 当你把程序都安装好以后 环境变量配置 好 vscode 插件下载好以后 1. 创建一个test.go 的文件 //主包&#xff0c;可执行文件所在包 package main//导入包 import "fmt"//主函数&#xff0c;入口函数 func main() { }2.解释 需要导入包 …