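1. Preparation
The environment is a cluster set up quickly with Docker, using the docker-compose method from the official documentation.
Password login is enabled (user elastic, password elastic).
The cluster's certificates need to be imported into the JDK trust store.
Find the certificate directory and copy it to the desktop. Two certificates need to be imported; the general form of the command is:
keytool -importcert -alias "your-certificate-alias" -keystore "D:\Program Files\Java\jdk-18\lib\security\cacerts" -file "path-to-your-es-certificate"
Import the certificates (both of the following are required):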
keytool -importcert -alias es01 -keystore "D:\Program Files\Java\jdk-18\lib\security\cacerts" -file "E:\桌面\certs\certs\es01\es01.crt"
keytool -importcert -alias mycert -keystore "D:\Program Files\Java\jdk-18\lib\security\cacerts" -file "E:\桌面\certs\ca\ca.crt"
After that, the connection can be made.
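To confirm that both imports landed in the trust store, the aliases can be checked from Java. The following is a minimal sketch using only JDK classes; the class name `CacertsCheck`, its helper methods, and the assumption that the store still uses the JDK default password `changeit` are illustrative and not part of the original setup.

```java
import java.io.File;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.util.ArrayList;
import java.util.List;

public class CacertsCheck {

    /** Returns the aliases from wanted that are NOT present in the key store. */
    public static List<String> missingAliases(KeyStore store, String... wanted) {
        List<String> missing = new ArrayList<>();
        try {
            for (String alias : wanted) {
                if (!store.containsAlias(alias)) {
                    missing.add(alias);
                }
            }
        } catch (KeyStoreException e) {
            throw new IllegalStateException("Key store was not loaded", e);
        }
        return missing;
    }

    /** Creates an empty in-memory key store, handy for trying the check without touching cacerts. */
    public static KeyStore emptyStore() {
        try {
            KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType());
            store.load(null, null);
            return store;
        } catch (Exception e) {
            throw new IllegalStateException("Could not create an empty key store", e);
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the first argument is the cacerts path used in the keytool commands;
        // without an argument, the running JDK's own cacerts file is inspected.
        String path = args.length > 0
                ? args[0]
                : System.getProperty("java.home") + "/lib/security/cacerts";
        // Assumption: the store password is still the JDK default.
        KeyStore store = KeyStore.getInstance(new File(path), "changeit".toCharArray());
        System.out.println("Missing aliases: " + missingAliases(store, "es01", "mycert"));
    }
}
```

An empty list means both certificates are present under the aliases used in the commands above.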
2. Using HttpClient
Add the dependency to pom.xml:
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>
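Create a retry handler:
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
import javax.net.ssl.SSLException;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.protocol.HttpContext;

public class MyRequestRetryHandler implements HttpRequestRetryHandler {
    @Override
    public boolean retryRequest(
            IOException exception,
            int executionCount,
            HttpContext context) {
        if (executionCount >= 3) {
            // Do not retry if over max retry count
            return false;
        }
        if (exception instanceof InterruptedIOException) {
            // Timeout
            return false;
        }
        if (exception instanceof UnknownHostException) {
            // Unknown host
            return false;
        }
        if (exception instanceof ConnectTimeoutException) {
            // Connection timed out
            return false;
        }
        if (exception instanceof SSLException) {
            // SSL handshake exception
            return false;
        }
        // Retry only requests without a body, which are treated as idempotent
        HttpClientContext clientContext = HttpClientContext.adapt(context);
        HttpRequest request = clientContext.getRequest();
        return !(request instanceof HttpEntityEnclosingRequest);
    }
}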
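The order of the checks in the handler is easier to see without the HttpClient types. The sketch below restates the same rules with JDK classes only; `RetryDecision`, `shouldRetry`, and the `idempotent` flag (which stands in for the `HttpEntityEnclosingRequest` check) are illustrative names and not part of HttpClient.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
import javax.net.ssl.SSLException;

public class RetryDecision {

    private static final int MAX_EXECUTIONS = 3;

    /**
     * Mirrors the handler's rules: give up once the execution count reaches three,
     * never retry timeouts, unknown hosts or TLS failures, and otherwise retry
     * only requests that are considered idempotent.
     */
    public static boolean shouldRetry(IOException exception, int executionCount, boolean idempotent) {
        if (executionCount >= MAX_EXECUTIONS) {
            return false;
        }
        if (exception instanceof InterruptedIOException) {
            // java.net.SocketTimeoutException is a subclass, so timeouts end up here
            return false;
        }
        if (exception instanceof UnknownHostException) {
            return false;
        }
        if (exception instanceof SSLException) {
            return false;
        }
        return idempotent;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(new IOException("connection reset"), 1, true));
        System.out.println(shouldRetry(new UnknownHostException("es01"), 1, true));
        System.out.println(shouldRetry(new IOException("connection reset"), 3, true));
    }
}
```

In practice this means a GET that fails with a generic I/O error is attempted again, while a request with a body is never replayed.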
Prepare test data in Kibana:
# Insert a document
PUT /test_index/_doc/1
{
  "name": "zhangsan",
  "age": "12"
}
# Query the document
GET /test_index/_doc/1
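Test with security (login authentication) enabled:
import java.io.IOException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.AuthCache;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class AppWithSecurity {
    private static String wsUrl = "https://localhost:9200";

    public static void main(String[] args) throws Exception {
        CloseableHttpClient client = HttpClients.custom().setRetryHandler(new MyRequestRetryHandler()).build();
        HttpGet method = new HttpGet(wsUrl + "/test_index/_doc/1");
        // Target host and the credentials to use for it
        HttpHost targetHost = new HttpHost("localhost", 9200, "https");
        CredentialsProvider credsProvider = new BasicCredentialsProvider();
        credsProvider.setCredentials(new AuthScope(targetHost.getHostName(), targetHost.getPort()),
                new UsernamePasswordCredentials("elastic", "elastic"));
        // Create the auth cache
        AuthCache authCache = new BasicAuthCache();
        // Generate BASIC scheme object and add it to local auth cache
        BasicScheme basicAuth = new BasicScheme();
        authCache.put(targetHost, basicAuth);
        // Add the credentials and the AuthCache to the execution context
        HttpClientContext context = HttpClientContext.create();
        context.setCredentialsProvider(credsProvider);
        context.setAuthCache(authCache);
        method.addHeader("Accept-Encoding", "gzip");
        try {
            CloseableHttpResponse response = client.execute(method, context);
            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                System.err.println("Method failed: " + response.getStatusLine());
            } else {
                HttpEntity entity = response.getEntity();
                String responseBody = EntityUtils.toString(entity);
                System.out.println(responseBody);
            }
        } catch (IOException e) {
            System.err.println("Fatal transport error: " + e.getMessage());
            e.printStackTrace();
        } finally {
            // Release the connection.
            method.releaseConnection();
        }
    }
}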
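HTTP Basic authentication ultimately sends the credentials as an `Authorization: Basic <base64(user:password)>` header. As a rough illustration of what that header contains (not of how HttpClient builds it internally), the following JDK-only sketch encodes and decodes a username and password; `BasicAuthHeader` and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthHeader {

    /** Builds the value of the Authorization header for HTTP Basic authentication. */
    public static String encode(String user, String password) {
        String pair = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    /** Recovers "user:password" from a header value produced by encode(). */
    public static String decode(String headerValue) {
        String token = headerValue.substring("Basic ".length());
        return new String(Base64.getDecoder().decode(token), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String header = encode("elastic", "elastic");
        System.out.println(header);
        System.out.println(decode(header));
    }
}
```

Base64 is an encoding rather than encryption, which is one reason the request goes over HTTPS.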
Without security
The following code can be used when security is disabled:
import java.io.IOException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class App {
    // Without security, use plain http instead of https
    private static String wsUrl = "http://127.0.0.1:9200";

    public static void main(String[] args) throws Exception {
        CloseableHttpClient client = HttpClients.custom()
                .setRetryHandler(new MyRequestRetryHandler()).build();
        HttpGet method = new HttpGet(wsUrl + "/test_index/_doc/1");
        // Execute the method.
        try {
            CloseableHttpResponse response = client.execute(method);
            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                System.err.println("Method failed: " + response.getStatusLine());
            } else {
                HttpEntity entity = response.getEntity();
                String responseBody = EntityUtils.toString(entity);
                System.out.println(responseBody);
            }
        } catch (IOException e) {
            System.err.println("Fatal transport error: " + e.getMessage());
            e.printStackTrace();
        } finally {
            // Release the connection.
            method.releaseConnection();
        }
    }
}
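3. Official Java API
The certificate import from step 1 is still required and cannot be skipped.
Reference: Elasticsearch Java API Client 8.8 (Elastic documentation)
Start Kibana.
Add the dependencies: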
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.3.3</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.3</version>
</dependency>
<dependency>
    <groupId>jakarta.json</groupId>
    <artifactId>jakarta.json-api</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.5</version>
</dependency>
Create the client:
public ElasticsearchClient esClient() {
    // URL and API key
    String serverUrl = "https://localhost:9200";
    // The official guide explains how to obtain an API key: https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/getting-started-java.html
    String apiKey = "cHdCUlJJa0JSTzdJbVI1dHhKQ2o6cWp3VWVOTmxUamV3bW9pZE9MRXRFQQ==";
    // Create the low-level client
    RestClient restClient = RestClient
            .builder(HttpHost.create(serverUrl))
            .setDefaultHeaders(new Header[]{
                    new BasicHeader("Authorization", "ApiKey " + apiKey)
            })
            .build();
    // Create the transport with a Jackson mapper
    ElasticsearchTransport transport = new RestClientTransport(
            restClient, new JacksonJsonpMapper());
    // And create the API client
    return new ElasticsearchClient(transport);
}
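The string placed after `ApiKey ` is the `encoded` value returned when the key is created, which is the Base64 form of `id:api_key`. The JDK-only sketch below shows how that value relates to its two parts, for example to check which key id a configured value belongs to; `ApiKeyHeader` and the sample id and secret are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class ApiKeyHeader {

    /** Joins id and secret the way the "encoded" field does: Base64 of "id:api_key". */
    public static String encode(String id, String apiKey) {
        return Base64.getEncoder()
                .encodeToString((id + ":" + apiKey).getBytes(StandardCharsets.UTF_8));
    }

    /** Value for the Authorization header, matching the string built in the client code. */
    public static String headerValue(String encoded) {
        return "ApiKey " + encoded;
    }

    /** Extracts the key id (not the secret part) from an encoded key. */
    public static String idOf(String encoded) {
        String decoded = new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
        int colon = decoded.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("not an id:api_key pair");
        }
        return decoded.substring(0, colon);
    }

    public static void main(String[] args) {
        // Hypothetical sample values, not a real key
        String encoded = encode("sample-id", "sample-secret");
        System.out.println(headerValue(encoded));
        System.out.println(idOf(encoded));
    }
}
```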
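Complete CRUD test code
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Product {
    private String id;
    private String name;
    private Double sku;
}
This follows the official documentation, but a few methods did not work for me as written. I am not sure whether that is a version issue (I am using 8.3.3). Below is my corrected version.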
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.CreateResponse;
import co.elastic.clients.elasticsearch.core.GetResponse;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.junit.Test;

@Slf4j(topic = "logger")
public class Es8ApI {

    public static ElasticsearchClient getEsClient() {
        // URL and API key
        String serverUrl = "https://localhost:9200";
        // The official guide explains how to obtain an API key: https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/getting-started-java.html
        String apiKey = "cndCc1JJa0JSTzdJbVI1dFhwQng6WlNCbVZtUXNTdG1WZExVQlExVFV2dw==";
        // Create the low-level client
        RestClient restClient = RestClient
                .builder(HttpHost.create(serverUrl))
                .setDefaultHeaders(new Header[]{
                        new BasicHeader("Authorization", "ApiKey " + apiKey)
                })
                .build();
        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());
        // And create the API client
        return new ElasticsearchClient(transport);
    }

    // Shared client used by all tests
    private static final ElasticsearchClient esClient = getEsClient();

    @Test
    public void create() throws IOException {
        // Create the index
        CreateIndexResponse response = esClient.indices().create(c -> c
                .index("products")
        );
        log.info(response.toString());
    }

    @Test
    public void indexDoc() throws IOException {
        // Insert a document
        Product product = new Product("bk-1", "City bike", 123.0);
        CreateResponse response = esClient.create(
                builder -> builder
                        .index("products")
                        .id(product.getId())
                        .document(product)
        );
        log.info("Indexed with version " + response.version());
    }

    @Test
    public void getDoc() throws IOException {
        // Fetch the document by id
        GetResponse<Product> response = esClient.get(
                builder -> builder
                        .index("products")
                        .id("bk-1"),
                Product.class
        );
        log.info(response.toString());
        if (response.found()) {
            Product product = response.source();
            log.info("Product name " + product.getName());
        } else {
            log.info("Product not found");
        }
    }

    @Test
    public void searchDoc() throws IOException {
        // Full-text match on the name field
        String searchText = "bike";
        SearchResponse<Product> response = esClient.search(
                builder -> builder
                        .index("products")
                        .query(q -> q
                                .match(t -> t
                                        .field("name")
                                        .query(searchText)
                                )
                        ),
                Product.class
        );
        log.info(response.toString());
    }

    @Test
    public void updateDoc() throws IOException {
        Product product = new Product("bk-1", "Big bike", 127.0);
        esClient.update(builder -> builder.index("products")
                        .id("bk-1")
                        // The official example did not work for me here (a trap for beginners);
                        // doc(...) supplies the fields to update
                        .doc(product),
                Product.class);
    }

    @Test
    public void deleteDoc() throws IOException {
        esClient.delete(d -> d.index("products").id("bk-1"));
    }

    @Test
    public void deleteIndex() throws IOException {
        esClient.indices().delete(c -> c
                .index("products")
        );
    }
}
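For more operations, see the official documentation.
4. Integrating ES8 with Spring Boot
After further study, I rewrote the client configuration class. There are two ways to obtain a client: one connects with a username and password, the other uses an API key.
Configuration file application.yml: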
spring:
  elasticsearch:
    rest:
      # Whether ES is enabled
      enable: true
      # Separate multiple hosts with commas
      hosts: 127.0.0.1:9200
      # Username and password login
      username: elastic
      password: elastic
      # CA certificate used for HTTPS (file name on the classpath)
      crtName: ca.crt
      # Custom API key
      apikey: cndCc1JJa0JSTzdJbVI1dFhwQng6WlNCbVZtUXNTdG1WZExVQlExVFV2dw==
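The ElasticSearchConfig configuration class:
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
// javax.annotation on Spring Boot 2.x; jakarta.annotation on 3.x
import javax.annotation.PostConstruct;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.StringUtils;

/**
 * Java client configuration for ES8
 * author:Geng
 */
@Configuration
@Slf4j
public class ElasticSearchConfig {
    @Value("${spring.elasticsearch.rest.enable}")
    private boolean enable;
    @Value("${spring.elasticsearch.rest.hosts}")
    private String hosts;
    @Value("${spring.elasticsearch.rest.username}")
    private String userName;
    @Value("${spring.elasticsearch.rest.password}")
    private String passWord;
    @Value("${spring.elasticsearch.rest.crtName}")
    private String tempCrtName;
    @Value("${spring.elasticsearch.rest.apikey}")
    private String apikey;
    private static String crtName;

    // Copy the injected value into the static field so the static helpers can read it
    @PostConstruct
    private void init() {
        crtName = tempCrtName;
    }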

    /**
     * Parses the configured host string into an array of HttpHost objects.
     * @return one HttpHost per comma-separated host:port entry
     */
    private HttpHost[] toHttpHost() {
        if (!StringUtils.hasLength(hosts)) {
            throw new RuntimeException("invalid elasticsearch configuration");
        }
        String[] hostArray = hosts.split(",");
        HttpHost[] httpHosts = new HttpHost[hostArray.length];
        HttpHost httpHost;
        for (int i = 0; i < hostArray.length; i++) {
            String[] strings = hostArray[i].split(":");
            httpHost = new HttpHost(strings[0], Integer.parseInt(strings[1]), "https");
            httpHosts[i] = httpHost;
        }
        return httpHosts;
    }

    /**
     * Access with username and password
     */
    @Bean
    @Qualifier("clientByPasswd")
    public ElasticsearchClient clientByPasswd() throws Exception {
        ElasticsearchTransport transport = getElasticsearchTransport(userName, passWord, toHttpHost());
        return new ElasticsearchClient(transport);
    }

    /**
     * Access with an API key
     * @return a client that authenticates through the Authorization header
     */
    @Bean
    @Qualifier("apiKeyBean")
    public ElasticsearchClient esClientByAPIKEy() {
        // Trust the self-signed CA; no username or password is needed here.
        // Note: NoopHostnameVerifier turns off hostname verification.
        RestClientBuilder.HttpClientConfigCallback callback = httpAsyncClientBuilder -> httpAsyncClientBuilder
                .setSSLContext(buildSSLContext())
                .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
        // Create the low-level client
        RestClient restClient = RestClient
                .builder(toHttpHost())
                .setDefaultHeaders(new Header[]{
                        new BasicHeader("Authorization", "ApiKey " + this.apikey)
                })
                .setHttpClientConfigCallback(callback)
                .build();
        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());
        // And create the API client
        return new ElasticsearchClient(transport);
    }

    /**
     * Builds an SSLContext that trusts the CA certificate found on the classpath.
     * @return the SSLContext used for HTTPS connections
     */
    private static SSLContext buildSSLContext() {
        ClassPathResource resource = new ClassPathResource(crtName);
        try {
            CertificateFactory factory = CertificateFactory.getInstance("X.509");
            Certificate trustedCa;
            try (InputStream is = resource.getInputStream()) {
                trustedCa = factory.generateCertificate(is);
            }
            KeyStore trustStore = KeyStore.getInstance("pkcs12");
            trustStore.load(null, null);
            trustStore.setCertificateEntry("ca", trustedCa);
            SSLContextBuilder sslContextBuilder = SSLContexts.custom()
                    .loadTrustMaterial(trustStore, null);
            return sslContextBuilder.build();
        } catch (CertificateException | IOException | KeyStoreException | NoSuchAlgorithmException |
                 KeyManagementException e) {
            log.error("Failed to set up TLS trust for the ES connection", e);
            // Fail fast: returning null would silently fall back to the default SSLContext
            throw new IllegalStateException("Could not build SSLContext from " + crtName, e);
        }
    }

    private static ElasticsearchTransport getElasticsearchTransport(String username, String passwd, HttpHost... hosts) {
        // 1. Username and password credentials
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, passwd));
        // 2. Trust the self-signed CA and attach the credentials
        //    (NoopHostnameVerifier turns off hostname verification)
        RestClientBuilder.HttpClientConfigCallback callback = httpAsyncClientBuilder -> httpAsyncClientBuilder
                .setSSLContext(buildSSLContext())
                .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                .setDefaultCredentialsProvider(credentialsProvider);
        // 3. Create the RestClient with the builder
        RestClient client = RestClient
                .builder(hosts)
                .setHttpClientConfigCallback(callback)
                .build();
        return new RestClientTransport(client, new JacksonJsonpMapper());
    }
}
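The `hosts` property is split on commas and then on colons, so an entry without a port would fail with an `ArrayIndexOutOfBoundsException`. One possible way to make that parsing more forgiving is sketched below with JDK classes only; `HostListParser`, the fallback port 9200, and the use of `InetSocketAddress` as a plain host/port holder are assumptions for illustration, and IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

public class HostListParser {

    // Assumption: fall back to Elasticsearch's usual HTTP port when none is given
    private static final int DEFAULT_PORT = 9200;

    /** Parses "host:port,host:port" into unresolved host/port pairs. */
    public static List<InetSocketAddress> parse(String hosts) {
        if (hosts == null || hosts.trim().isEmpty()) {
            throw new IllegalArgumentException("invalid elasticsearch configuration: no hosts given");
        }
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray or trailing commas
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0 ? DEFAULT_PORT : Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup; the object only carries host and port
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("127.0.0.1:9200, es02")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```

In the configuration class each entry could then become `new HttpHost(address.getHostString(), address.getPort(), "https")`.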
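Because both connection methods use HTTPS, copy the cluster's self-signed CA certificate into the resources directory and name it ca.crt.
With this approach there is no need to import the certificate into the local JDK trust store; the code itself loads the CA and uses it to verify the server certificate.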
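The same idea, trusting only a bundled CA instead of the JDK-wide store, can also be expressed with JDK classes alone. The following is a sketch under that assumption; `CaTrust` and its method names are hypothetical, and it says nothing about hostname verification, which is left to whatever HTTP client uses the resulting context.

```java
import java.io.InputStream;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class CaTrust {

    /** Creates an empty in-memory trust store. */
    public static KeyStore newTrustStore() {
        try {
            KeyStore store = KeyStore.getInstance("PKCS12");
            store.load(null, null);
            return store;
        } catch (Exception e) {
            throw new IllegalStateException("Could not create trust store", e);
        }
    }

    /** Reads one X.509 certificate from the stream and stores it under the alias "ca". */
    public static KeyStore trustStoreFor(InputStream caCertificate) throws Exception {
        Certificate ca = CertificateFactory.getInstance("X.509").generateCertificate(caCertificate);
        KeyStore store = newTrustStore();
        store.setCertificateEntry("ca", ca);
        return store;
    }

    /** Builds a TLS context whose trust anchors are the entries of the given store. */
    public static SSLContext contextFor(KeyStore trustStore) throws Exception {
        TrustManagerFactory factory =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        factory.init(trustStore);
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, factory.getTrustManagers(), null);
        return context;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: ca.crt sits at the root of the classpath
        try (InputStream in = CaTrust.class.getResourceAsStream("/ca.crt")) {
            if (in == null) {
                System.out.println("ca.crt not found on the classpath");
                return;
            }
            SSLContext context = contextFor(trustStoreFor(in));
            System.out.println("Built context for protocol " + context.getProtocol());
        }
    }
}
```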
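import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service
public class EsService {
    private static final Logger logger = LoggerFactory.getLogger(EsService.class);

    @Autowired
    @Qualifier("apiKeyBean")
    private ElasticsearchClient esClient;

    public String addIndex(String name) throws IOException {
        // Create the index
        CreateIndexResponse response = esClient.indices().create(c -> c
                .index(name)
        );
        logger.info(response.toString());
        return response.toString();
    }
}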
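Change @Qualifier("apiKeyBean") to @Qualifier("clientByPasswd") to switch between the two connection methods and check that each one works.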
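import java.io.IOException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EsController {
    @Autowired
    private EsService esService;

    @GetMapping("/test/{index}")
    public String test(@PathVariable("index") String index) throws IOException {
        return esService.addIndex(index);
    }
}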
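Visit http://localhost/test/ada. The last path segment can be any name, but it must not repeat one already used, because creating an index that already exists returns an error.
Result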