第4章 功能使用
4.1 Java API 操作
随着 Elasticsearch 8.x 新版本的到来,Type 的概念被废除,为了适应这种数据结构的改
变,Elasticsearch 官方从 7.15 版本开始建议使用新的 Elasticsearch Java Client。
4.1.1 增加依赖关系
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<elastic.version>8.1.0</elastic.version>
</properties>
<dependencies>
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>x-pack-sql-jdbc</artifactId>
<version>8.1.0</version>
</dependency>
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>${elastic.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>
4.1.2 获取客户端对象
就像连接 MySQL 数据库一样,Java 通过客户端操作 Elasticsearch 也要获取到连接后才
可以。咱们现在使用的基于 https 安全的 Elasticsearch 服务,所以首先我们需要将之前的证
书进行一个转换
openssl pkcs12 -in elastic-stack-ca.p12 -clcerts -nokeys -out java-ca.crt
配置证书后,我们就可以采用 https 方式获取连接对象了。
# 导入的类
import co.elastic.clients.elasticsearch.*;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.*;
import org.apache.http.client.*;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.*;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.*;
import org.elasticsearch.client.*;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.*;
import java.security.KeyStore;
import java.security.cert.*;
# 获取客户端对象
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "O3x0hfu7i=ZbQvlktCnd"));
Path caCertificatePath = Paths.get("ca.crt");
CertificateFactory factory =
CertificateFactory.getInstance("X.509");
Certificate trustedCa;
try (InputStream is = Files.newInputStream(caCertificatePath)) {
trustedCa = factory.generateCertificate(is);
}
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);
SSLContextBuilder sslContextBuilder = SSLContexts.custom()
.loadTrustMaterial(trustStore, null);
final SSLContext sslContext = sslContextBuilder.build();
RestClientBuilder builder = RestClient.builder(
new HttpHost("linux1", 9200, "https"))
.setHttpClientConfigCallback(new
RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setSSLContext(sslContext)
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.setDefaultCredentialsProvider(credentialsProvider);
}
});
RestClient restClient = builder.build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(transport);
...
transport.close();
4.1.3 操作数据(普通操作)
4.1.3.1 索引操作
// 创建索引
CreateIndexRequest request = new
CreateIndexRequest.Builder().index("myindex").build();
final CreateIndexResponse createIndexResponse =
client.indices().create(request);
System.out.println("创建索引成功:" + createIndexResponse.acknowledged());
// 查询索引
GetIndexRequest getIndexRequest = new
GetIndexRequest.Builder().index("myindex").build();
final GetIndexResponse getIndexResponse =
client.indices().get(getIndexRequest);
System.out.println("索引查询成功:" + getIndexResponse.result());
// 删除索引
DeleteIndexRequest deleteIndexRequest = new
DeleteIndexRequest.Builder().index("myindex").build();
final DeleteIndexResponse delete = client.indices().delete(deleteIndexRequest);
final boolean acknowledged = delete.acknowledged();
System.out.println("删除索引成功:" + acknowledged)
4.1.3.2 文档操作
// 创建文档
IndexRequest indexRequest = new IndexRequest.Builder()
.index("myindex")
.id(user.getId().toString())
.document(user)
.build();
final IndexResponse index = client.index(indexRequest);
System.out.println("文档操作结果:" + index.result());
// 批量创建文档
final List<BulkOperation> operations = new ArrayList<BulkOperation>();
for ( int i= 1;i <= 5; i++ ) {
final CreateOperation.Builder builder = new CreateOperation.Builder();
builder.index("myindex");
builder.id("200" + i);
builder.document(new User(2000 + i, 30 + i * 10, "zhangsan" + i, "beijing",
1000 + i*1000));
final CreateOperation<Object> objectCreateOperation = builder.build();
final BulkOperation bulk = new
BulkOperation.Builder().create(objectCreateOperation).build();
operations.add(bulk);
}
BulkRequest bulkRequest = new
BulkRequest.Builder().operations(operations).build();
final BulkResponse bulkResponse = client.bulk(bulkRequest);
System.out.println("数据操作成功:" + bulkResponse);
// 删除文档
DeleteRequest deleteRequest = new
DeleteRequest.Builder().index("myindex").id("1001").build();
client.delete(deleteRequest);
4.1.3.3 文档查询
final SearchRequest.Builder searchRequestBuilder = new
SearchRequest.Builder().index("myindex1");
MatchQuery matchQuery = new
MatchQuery.Builder().field("city").query(FieldValue.of("beijing")).build();
Query query = new Query.Builder().match(matchQuery).build();
searchRequestBuilder.query(query);
SearchRequest searchRequest = searchRequestBuilder.build();
final SearchResponse<Object> search = client.search(searchRequest,
Object.class);
System.out.println(search);
4.1.4 操作数据(函数操作)
4.1.4.1 索引操作
// 创建索引
final Boolean acknowledged = client.indices().create(p ->
p.index("")).acknowledged();
System.out.println("创建索引成功");
// 获取索引
System.out.println(
client.indices().get(
req -> req.index("myindex1")
).result());
// 删除索引
client.indices().delete(
reqbuilder -> reqbuilder.index("myindex")
).acknowledged();
4.1.4.2 文档操作
// 创建文档
System.out.println(
client.index(
req ->
req.index("myindex")
.id(user.getId().toString())
.document(user)
).result()
);
// 批量创建文档
client.bulk(
req -> {
users.forEach(
u -> {
req.operations(
b -> {
b.create(
d ->
d.id(u.getId().toString()).index("myindex").document(u)
);
return b;
}
);
}
);
return req;
}
);
// 删除文档
client.delete(
req -> req.index("myindex").id("1001")
);
4.1.4.3 文档查询
client.search(
req -> {
req.query(
q ->
q.match(
m -> m.field("city").query("beijing")
)
);
return req;
}
, Object.class
);
4.1.5 客户端异步操作
ES Java API 提供了同步和异步的两种客户端处理。之前演示的都是同步处理,异步客
户端的处理和同步客户端处理的 API 基本原理相同,不同的是需要异步对返回结果进行相
应的处理
// 创建索引
asyncClient.indices().create(
req -> {
req.index("newindex");
return req;
}
).whenComplete(
(resp, error) -> {
System.out.println("回调函数");
if ( resp != null ) {
System.out.println(resp.acknowledged());
} else {
error.printStackTrace();
}
}
);
System.out.println("主线程操作...");
asyncClient.indices().create(
req -> {
req.index("newindex");
return req;
}
)
.thenApply(
resp -> {
return resp.acknowledged();
}
)
.whenComplete(
(resp, error) -> {
System.out.println("回调函数");
if ( !resp ) {
System.out.println();
} else {
error.printStackTrace();
}
}
);
4.2 EQL 操作
EQL 的全名是 Event Query Language (EQL)。事件查询语言(EQL)是一种用于基于事
件的时间序列数据(例如日志,指标和跟踪)的查询语言。在 Elastic Security 平台上,当
输入有效的 EQL 时,查询会在数据节点上编译,执行查询并返回结果。这一切都快速、并
行地发生,让用户立即看到结果。
EQL 的优点:
➢ EQL 使你可以表达事件之间的关系
许多查询语言允许您匹配单个事件。EQL 使你可以匹配不同事件类别和时间跨度的一
系列事件。
➢ EQL 的学习曲线很低
EQL 语法看起来像其他常见查询语言,例如 SQL。 EQL 使你可以直观地编写和读取查
询,从而可以进行快速,迭代的搜索。
➢ EQL 设计用于安全用例
尽管你可以将其用于任何基于事件的数据,但我们创建了 EQL 来进行威胁搜寻。 EQL
不仅支持危害指标(IOC)搜索,而且可以描述超出 IOC 范围的活动。
4.2.1 基础语法
4.2.1.1 数据准备
要运行 EQL 搜索,搜索到的数据流或索引必须包含时间戳和事件类别字段。 默认情况下,
EQL 使用 Elastic 通用模式(ECS)中的 @timestamp 和 event.category 字段。
@timestamp 表示时间戳,event.category 表示事件分类。
咱们准备一些简单的数据,用于表示电商网站页面跳转
# 创建索引
PUT /gmall
# 批量增加数据
PUT _bulk
{"index":{"_index":"gmall"}}
{"@timestamp":"2022-06-01T12:00:00.00+08:00",
"event":{"category":"page"},"page" : {"session_id" :
"42FC7E13-CB3E-5C05-0000-0010A0125101","last_page_id" : "","page_id" :
"login","user_id" : ""}}
{"index":{"_index":"gmall"}}
{"@timestamp":"2022-06-01T12:01:00.00+08:00",
"event":{"category":"page"},"page" : {"session_id" :
"42FC7E13-CB3E-5C05-0000-0010A0125101","last_page_id" : "login","page_id" :
"good_list","user_id" : "1"}}
{"index":{"_index":"gmall"}}
{"@timestamp":"2022-06-01T12:05:00.00+08:00",
"event":{"category":"page"},"page" : {"session_id" :
"42FC7E13-CB3E-5C05-0000-0010A0125101","last_page_id" : "good_list","page_id" :
"good_detail","user_id" : "1"}}
{"index":{"_index":"gmall"}}
{"@timestamp":"2022-06-01T12:07:00.00+08:00",
"event":{"category":"page"},"page" : {"session_id" :
"42FC7E13-CB3E-5C05-0000-0010A0125101","last_page_id" :
"good_detail","page_id" : "order","user_id" : "1"}}
{"index":{"_index":"gmall"}}
{"@timestamp":"2022-06-01T12:08:00.00+08:00",
"event":{"category":"page"},"page" : {"session_id" :
"42FC7E13-CB3E-5C05-0000-0010A0125101","last_page_id" : "order","page_id" :
"payment","user_id" : "1"}}
{"index":{"_index":"gmall"}}
{"@timestamp":"2022-06-01T12:08:00.00+08:00",
"event":{"category":"page"},"page" : {"session_id" :
"42FC7E13-CB3E-5C05-0000-0010A0125102","last_page_id" : "","page_id" :
"login","user_id" : "2"}}
{"index":{"_index":"gmall"}}
{"@timestamp":"2022-06-01T12:08:00.00+08:00",
"event":{"category":"page"},"page" : {"session_id" :
"42FC7E13-CB3E-5C05-0000-0010A0125102","last_page_id" : "login","page_id" :
"payment","user_id" : "2"}}
4.2.1.2 数据窗口搜索
在事件响应过程中,有很多时候,了解特定时间发生的所有事件是很有用的。使用一种名为
any 的特殊事件类型,针对所有事件进行匹配,如果想要匹配特定事件,就需要指明事件分
类名称
#
GET /gmall/_eql/search
{
"query" : """
any where page.user_id == "1"
"""
}
4.2.1.3 统计符合条件的事件
#
GET /gmall/_eql/search
{
"query" : """
any where true
""",
"filter": {
"range": {
"@timestamp": {
"gte": "1654056000000",
"lt": "1654056005000"
}
}
}
}
4.2.1.4 事件序列
# 页面先访问 login,后面又访问了 good_detail 的页面
GET /gmall/_eql/search
{
"query" : """
sequence by page.session_id
[page where page.page_id=="login"]
[page where page.page_id=="good_detail"]
"""
}
4.2.2 安全检测
EQL 在 Elastic Securit 中被广泛使用。实际应用时,我们可以使用 EQL 语言来进行检
测安全威胁和其他可疑行为。
4.2.2.1 数据准备
regsvr32.exe 是一个内置的命令行实用程序,用于在 Windows 中注册.dll 库。作为本机
工具,regsvr32.exe 具有受信任的状态,从而使它可以绕过大多数允许列表软件和脚本阻止
程序。 有权访问用户命令行的攻击者可以使用 regsvr32.exe 通过.dll 库运行恶意脚本,即使
在其他情况下也不允许这些脚本运行。
regsvr32 滥用的一种常见变体是 Squfullydoo 攻击。在 Squfullydoo 攻击中,regsvr32.exe
命令使用 scrobj.dll 库注册并运行远程脚本。
测试数据来自 Atomic Red Team 的测试数据集,其中包括模仿 Squibledoo 攻击的事件。
数据已映射到 Elastic 通用架构(ECS)字段:normalized-T1117-AtomicRed-regsvr32.json
将文件内容导入到 ES 软件中:
# 创建索引
PUT my-eql-index
# 导入数据
POST my-eql-index/_bulk?pretty&refresh
查看数据导入情况
# 导入数据
GET /_cat/indices/my-eql-index?v=true&h=health,status,index,docs.count
4.2.2 获取 regsvr32 事件的计数
获取与 regsvr32.exe 进程关联的事件数
# 查询数据
# ?filter_path=-hits.events 从响应中排除 hits.events 属性。 此搜索仅用于获取事件计数,
而不是匹配事件的列表
# query : 匹配任何进程名称为 regsvr32.exe 的事件
# size : 最多返回 200 个匹配事件的匹配,实际查询结果为 143 个
GET my-eql-index/_eql/search?filter_path=-hits.events
{
"query": """
any where process.name == "regsvr32.exe"
""",
"size": 200
}
4.2.3 检查命令行参数
regsvr32.exe 进程与 143 个事件相关联。 但是如何首先调用 regsvr32.exe?谁调用的?
regsvr32.exe 是一个命令行实用程序。将结果缩小到使用命令行的进程
# 增加过滤条件查询数据
GET my-eql-index/_eql/search
{
"query": """
process where process.name == "regsvr32.exe" and
process.command_line.keyword != null
"""
}
该查询将一个事件与创建的 event.type 相匹配,指示 regsvr32.exe 进程的开始。根据事件的
process.command_line 值,regsvr32.exe 使用 scrobj.dll 注册了脚本 RegSvr32.sct.这符合
Squibledoo 攻击的行为
4.2.4 检查恶意脚本加载
检查 regsvr32.exe 以后是否加载 scrobj.dll 库
# 增加过滤条件查询数据
GET my-eql-index/_eql/search
{
"query": """
library where process.name == "regsvr32.exe" and dll.name == "scrobj.dll"
"""
}
4.2.5 检查攻击成功可能性
在许多情况下,攻击者使用恶意脚本连接到远程服务器或下载其他文件。使用 EQL 序
列查询来检查以下一系列事件:
➢ regsvr32.exe 进程
➢ 通过相同的进程加载 scrobj.dll 库
➢ 同一过程中的任何网络事件
根据上一个响应中看到的命令行值,你可以期望找到一个匹配项。但是,此查询并非针
对该特定命令而设计。取而代之的是,它寻找一种可疑行为的模式,这种模式足以检测出相
似的威胁
# 增加过滤条件查询数据
GET my-eql-index/_eql/search
{
"query": """
sequence by process.pid
[process where process.name == "regsvr32.exe"]
[library where dll.name == "scrobj.dll"]
[network where true]
"""
}
4.3 SQL 操作
一般使用 Elasticsearch 的时候,会使用 Query DSL 来查询数据,从 Elasticsearch6.3 版本
以后,Elasticsearch 已经支持 SQL 查询了。
Elasticsearch SQL 是一个 X-Pack 组件,它允许针对 Elasticsearch 实时执行类似 SQL 的
查询。无论使用 REST 接口,命令行还是 JDBC,任何客户端都可以使用 SQL 对 Elasticsearch
中的数据进行原生搜索和聚合数据。可以将 Elasticsearch SQL 看作是一种翻译器,它可以将
SQL 翻译成 Query DSL。
Elasticsearch SQL 具有如下特性:
➢ 原生支持:Elasticsearch SQL 是专门为 Elasticsearch 打造的。
➢ 没有额外的零件:无需其他硬件,处理器,运行环境或依赖库即可查询 Elasticsearch,
Elasticsearch SQL 直接在 Elasticsearch 内部运行。
➢ 轻巧高效:Elasticsearch SQL 并未抽象化其搜索功能,相反的它拥抱并接受了 SQL 来
实现全文搜索,以简洁的方式实时运行全文搜索。
4.3.1 SQL 和 Elasticsearch 的对应关系
虽然 SQL 和 Elasticsearch 对数据的组织方式(以及不同的语义)有不同的术语,但它们的
目的本质上是相同的
然概念之间的映射并不完全是一对一的,语义也有所不同,但共同点多于差异。事实上,
SQL 的许多概念可以在 Elasticsearch 中找到对应关系,并且这两者的术语也很类似
4.3.2 数据准备
# 创建索引并增加数据,等同于创建表和数据
PUT my-sql-index/_bulk?refresh
{"index":{"_id": "JAVA"}}
{"name": "JAVA", "author": "zhangsan", "release_date": "2022-05-01",
"page_count": 561}
{"index":{"_id": "BIGDATA"}}
{"name": "BIGDATA", "author": "lisi", "release_date": "2022-05-02", "page_count":
482}
{"index":{"_id": "SCALA"}}
{"name": "SCALA", "author": "wangwu", "release_date": "2022-05-03", "page_count":
604}
4.3.3 第一个 SQL 查询
现在可以使用 SQL 对数据进行查询了。
# SQL
# 这里的表就是索引
# 可以通过 format 参数控制返回结果的格式,默认为 json 格式
# txt:表示文本格式,看起来更直观点.
# csv:使用逗号隔开的数据
# json:JSON 格式数据
# tsv: 使用 tab 键隔开数据
# yaml:属性配置格式
POST _sql?format=txt
{
"query": """
SELECT * FROM "my-sql-index"
"""
}
# 条件查询
POST _sql?format=txt
{
"query": """
SELECT * FROM "my-sql-index" where page_count > 500
"""
}
实际上会发现,和 JDBC 操作时的 SQL 语法是基本是一样的。
4.3.3 SQL 转换为 DSL 使用
当我们需要使用 Query DSL 时,也可以先使用 SQL 来查询,然后通过 Translate API 转换即
可,查询的结果为 DSL 方式的结果
# 转换 SQL 为 DSL 进行操作
POST _sql/translate
{
"query": """
SELECT * FROM "my-sql-index" where page_count > 500
"""
}
4.3.4 SQL 和 DSL 混合使用
我们如果在优化 SQL 语句之后还不满足查询需求,可以拿 SQL 和 DSL 混用,ES 会先根据
SQL 进行查询,然后根据 DSL 语句对 SQL 的执行结果进行二次查询
# SQL 和 DSL 混合使用
# 由于索引中含有横线,所以作为表名时需要采用双引号,且外层需要三个引号包含
POST _sql?format=txt
{
"query": """SELECT * FROM "my-sql-index" """,
"filter" : {
"range": {
"page_count": {
"gte": 400,
"lte": 600
}
}
},
"fetch_size": 2
}