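In the previous post we covered hot updates through an API. That approach has several drawbacks: the dictionary is awkward to manage because you have to edit files on disk directly, and searching them is cumbersome too; file reads and writes are not specially optimized, so performance is poor; and every update costs an extra API call and network transfer. In this post we implement hot updates by connecting directly to a database instead.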
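1. Introduction
The official GitHub repository does not describe this approach, so the plugin does not support hot updates from a database out of the box. To get it, we have to modify the source code.
Compared with the API approach, connecting directly to the database is more stable. Because it is not officially supported, though, it carries some uncertainty of its own. Which approach to use in production depends on your actual business requirements.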
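2. Steps
1. Download the source code from GitHub: https://github.com/medcl/elasticsearch-analysis-ik/releases?page=2
2. Import the downloaded source code into IDEA.
3. The import takes a while, so create the database tables in the meantime. We need two tables: an extension-word table and a stop-word table.
Each table needs the columns id and word.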
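As a rough sketch of this step, the two tables could be created from Java as shown below. The table names extend_word and stop_word are the ones queried in jdbc-reload.properties in step 6; the column types, the unique key and the charset are my own assumptions, and WordTableSchema is a hypothetical helper that is not part of IK.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that builds and runs the DDL for the two word tables. */
final class WordTableSchema {

    /** Column sizes, the unique key and the charset are assumptions; adjust as needed. */
    static String ddl(String table) {
        return "CREATE TABLE IF NOT EXISTS " + table + " ("
                + "id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
                + "word VARCHAR(255) NOT NULL, "
                + "UNIQUE KEY uk_word (word)"
                + ") DEFAULT CHARSET = utf8mb4";
    }

    /** DDL for the extension-word table and the stop-word table. */
    static List<String> allDdl() {
        return Arrays.asList(ddl("extend_word"), ddl("stop_word"));
    }

    /** Runs both statements on an already opened MySQL connection. */
    static void createTables(Connection connection) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            for (String sql : allDdl()) {
                statement.executeUpdate(sql);
            }
        }
    }
}
```

Running the two strings returned by allDdl() in any MySQL client works just as well; the Java wrapper is only there to keep everything in one language.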
4. The downloaded source code targets Elasticsearch 7.4, so change it to the version you run. I am using ES 7.13.0, so I changed the version in the pom to 7.13.0.
5. Also add the MySQL driver dependency to the pom:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.22</version>
</dependency>
6. Create the database configuration file jdbc-reload.properties and put it in the config folder of the IK project:
jdbc.url=jdbc:mysql://192.168.101.109:3306/user_test?serverTimezone=UTC
jdbc.user=root
jdbc.password=123456
jdbc.reload.extend.sql=select word from extend_word
jdbc.reload.stop.sql=select word from stop_word
# reload interval in milliseconds
jdbc.reload.interval=1000
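A missing or misspelled key in this file only shows up later as a connection error or a NumberFormatException, so it can help to validate the file up front. Below is a minimal sketch of such a check; JdbcReloadConfig is a hypothetical class name and not part of IK, and it parses from a string only so that the example is self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical validator for the keys used in jdbc-reload.properties. */
final class JdbcReloadConfig {

    static final String[] REQUIRED_KEYS = {
        "jdbc.url", "jdbc.user", "jdbc.password",
        "jdbc.reload.extend.sql", "jdbc.reload.stop.sql", "jdbc.reload.interval"
    };

    /** Parses the properties text and fails fast if a required key is missing. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                throw new IllegalArgumentException("missing setting: " + key);
            }
        }
        return props;
    }

    /** Returns the reload interval in milliseconds; it must be a positive number. */
    static long reloadIntervalMillis(Properties props) {
        long interval = Long.parseLong(props.getProperty("jdbc.reload.interval").trim());
        if (interval <= 0) {
            throw new IllegalArgumentException("jdbc.reload.interval must be positive");
        }
        return interval;
    }
}
```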
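7. The code that loads the IK dictionaries lives in the org.wltea.analyzer.dic.Dictionary class. Open that class and add the methods below.
Both loaders can be modeled on the dictionary-loading methods that already exist in the class:
Add the method that loads extension words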
/**
 * Load extension words from the database into the main dictionary.
 * 55555 20211216
 * Needs imports for java.sql.Connection, DriverManager, ResultSet and Statement.
 */
public void loadExtendDictFromMysql() {
    try {
        // Read the JDBC settings from config/jdbc-reload.properties
        Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
        try (InputStream in = new FileInputStream(file.toFile())) {
            props.load(in);
        }
        logger.info("loading jdbc-reload.properties");
        for (Object key : props.keySet()) {
            // Do not write the database password to the log
            if (!"jdbc.password".equals(key)) {
                logger.info(key + "=" + props.getProperty(String.valueOf(key)));
            }
        }
        logger.info(" hot dict " + props.getProperty("jdbc.reload.extend.sql"));
        // try-with-resources closes the result set, statement and connection
        try (Connection connection = DriverManager.getConnection(
                 props.getProperty("jdbc.url"),
                 props.getProperty("jdbc.user"),
                 props.getProperty("jdbc.password"));
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(props.getProperty("jdbc.reload.extend.sql"))) {
            while (resultSet.next()) {
                String theWord = resultSet.getString("word");
                if (theWord == null || theWord.trim().isEmpty()) {
                    continue; // skip NULL or blank rows
                }
                logger.info(theWord);
                // Add the word to the in-memory main dictionary
                _MainDict.fillSegment(theWord.trim().toLowerCase().toCharArray());
            }
        }
        // Pause for the configured interval (milliseconds) before returning
        Thread.sleep(Long.parseLong(props.getProperty("jdbc.reload.interval").trim()));
    } catch (Exception e) {
        logger.error("[Extend Dict Loading] failed", e);
    }
}
Add the method that loads stop words
/**
 * Load stop words from the database into the stop-word dictionary.
 * 55555 20211216
 */
public void loadStopDictFromMysql() {
    // _StopWords is not re-created here: loadStopWordDict() has already created it,
    // and replacing it would discard the stop words loaded from the dictionary files.
    try {
        // Read the JDBC settings from config/jdbc-reload.properties
        Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
        try (InputStream in = new FileInputStream(file.toFile())) {
            props.load(in);
        }
        logger.info("loading jdbc-reload.properties");
        for (Object key : props.keySet()) {
            // Do not write the database password to the log
            if (!"jdbc.password".equals(key)) {
                logger.info(key + "=" + props.getProperty(String.valueOf(key)));
            }
        }
        logger.info(" stop dict " + props.getProperty("jdbc.reload.stop.sql"));
        // try-with-resources closes the result set, statement and connection
        try (Connection connection = DriverManager.getConnection(
                 props.getProperty("jdbc.url"),
                 props.getProperty("jdbc.user"),
                 props.getProperty("jdbc.password"));
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(props.getProperty("jdbc.reload.stop.sql"))) {
            while (resultSet.next()) {
                String theWord = resultSet.getString("word");
                if (theWord == null || theWord.trim().isEmpty()) {
                    continue; // skip NULL or blank rows
                }
                logger.info(theWord);
                // Add the word to the in-memory stop-word dictionary
                _StopWords.fillSegment(theWord.trim().toLowerCase().toCharArray());
            }
        }
        // Pause for the configured interval (milliseconds) before returning
        Thread.sleep(Long.parseLong(props.getProperty("jdbc.reload.interval").trim()));
    } catch (Exception e) {
        logger.error("[Stop Dict Loading] failed", e);
    }
}
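8. Call the new extension-word loader from loadMainDict().
9. Call the new stop-word loader from loadStopWordDict().
How do you decide where the new methods should be called?
Option 1:
(1) For the new extension-word loader, look up every place where the existing loadRemoteExtDict() method is called and add a call to our own loader next to it.
(2) For the new stop-word loader, likewise look up where the existing loadStopWordDict() method is called and add a call next to it.
Option 2:
Add the calls directly inside the existing methods that load extension words and stop words.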
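With either option the loaders only run when IK itself loads or reloads its dictionaries. If you want the database to be polled at the interval configured in jdbc.reload.interval, something has to trigger that reload periodically. The sketch below shows one way to do this with a single daemon thread. PeriodicReloader is a hypothetical class, and the Runnable is a placeholder for whatever code triggers the dictionary reload in your build; neither comes from IK.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that runs a reload task repeatedly on one daemon thread. */
final class PeriodicReloader implements AutoCloseable {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "dict-reload");
                thread.setDaemon(true); // must not keep the JVM alive on shutdown
                return thread;
            });

    /** Runs the task immediately, then again intervalMillis after each run finishes. */
    void start(Runnable reloadTask, long intervalMillis) {
        executor.scheduleWithFixedDelay(() -> {
            try {
                reloadTask.run();
            } catch (RuntimeException e) {
                // An exception escaping the task would cancel all later runs
                System.err.println("dictionary reload failed: " + e);
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

If the reload is scheduled this way, the Thread.sleep call at the end of each loader becomes redundant, because the scheduler already provides the delay between runs.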
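10. Since we connect to a database, the JDBC driver has to be loaded. Add the following to Dictionary:
static {
    try {
        // Register the MySQL Connector/J 8.x driver class
        Class.forName("com.mysql.cj.jdbc.Driver");
    } catch (ClassNotFoundException e) {
        logger.error("failed to load the MySQL JDBC driver", e);
    }
}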
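11. Package the project with Maven.
Note: the pom originally targets Elasticsearch 7.4.0, so the package would also be built as version 7.4.0. Change it to 7.13.0 (see step 4), because the ES I am running is 7.13.
12. Install the package under the es/plugins directory: delete the original IK plugin, unzip the new one, and rename the directory to ik.
13. Copy the MySQL driver jar into the IK plugin directory: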
scp mysql-connector-java-8.0.22.jar root@172.16.188.8:/var/local/elasticsearch/plugins/ik
14. Restart ES:
./bin/elasticsearch
The log prints the extension words and stop words that were loaded.
15. Test
GET _analyze
{
"analyzer": "ik_smart",
"text": "伍55突然想养一只猫了"
}
The result shows that the text is segmented correctly and that the stop words are filtered out.
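The same check can be run outside the Kibana console. Here is a small sketch that builds the equivalent HTTP request with the java.net.http client (Java 11 or later). AnalyzeRequest is a hypothetical helper, the base URL http://localhost:9200 used in the usage note is an assumption about where ES listens, and the JSON escaping only handles quotes and backslashes.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper that builds and sends an _analyze request. */
final class AnalyzeRequest {

    /** Minimal JSON string escaping: backslashes and double quotes only. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** JSON body with the same two fields as the console request above. */
    static String body(String analyzer, String text) {
        return "{\"analyzer\":\"" + escape(analyzer) + "\",\"text\":\"" + escape(text) + "\"}";
    }

    /** Builds a POST to baseUrl/_analyze; nothing is sent yet. */
    static HttpRequest build(String baseUrl, String analyzer, String text) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_analyze"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(analyzer, text)))
                .build();
    }

    /** Sends the request and returns the raw JSON response body. */
    static String send(HttpRequest request) throws IOException, InterruptedException {
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }
}
```

For example, AnalyzeRequest.send(AnalyzeRequest.build("http://localhost:9200", "ik_smart", "伍55突然想养一只猫了")) returns the token list as JSON, provided ES is reachable at that address without authentication.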
3. Common errors
3.1 java.lang.ExceptionInInitializerError: null ... access denied ("java.lang.RuntimePermission" "setContextClassLoader")
Error details:
java.lang.ExceptionInInitializerError: null
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_271]
at java.lang.Class.forName(Class.java:264) ~[?:1.8.0_271]
at com.mysql.cj.jdbc.NonRegisteringDriver.<clinit>(NonRegisteringDriver.java:99) ~[?:?]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_271]
at java.lang.Class.forName(Class.java:264) ~[?:1.8.0_271]
at org.wltea.analyzer.dic.Dictionary.<clinit>(Dictionary.java:103) ~[?:?]
at org.wltea.analyzer.cfg.Configuration.<init>(Configuration.java:40) ~[?:?]
at org.elasticsearch.index.analysis.IkTokenizerFactory.<init>(IkTokenizerFactory.java:15) ~[?:?]
at org.elasticsearch.index.analysis.IkTokenizerFactory.getIkSmartTokenizerFactory(IkTokenizerFactory.java:23) ~[?:?]
at org.elasticsearch.index.analysis.AnalysisRegistry.buildMapping(AnalysisRegistry.java:433) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.index.analysis.AnalysisRegistry.buildTokenizerFactories(AnalysisRegistry.java:275) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.index.analysis.AnalysisRegistry.build(AnalysisRegistry.java:203) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.index.IndexModule.newIndexService(IndexModule.java:431) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.indices.IndicesService.createIndexService(IndicesService.java:663) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.indices.IndicesService.createIndex(IndicesService.java:566) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.indices.IndicesService.createIndex(IndicesService.java:170) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.indices.cluster.IndicesClusterStateService.createIndices(IndicesClusterStateService.java:468) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:228) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:499) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:489) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:460) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:407) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.cluster.service.ClusterApplierService.access$000(ClusterApplierService.java:57) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:151) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:673) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:241) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:204) ~[elasticsearch-7.13.0.jar:7.13.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_271]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
Caused by: java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "setContextClassLoader")
at java.security.AccessControlContext.checkPermission(AccessControlContext.java:472) ~[?:1.8.0_271]
at java.security.AccessController.checkPermission(AccessController.java:886) ~[?:1.8.0_271]
at java.lang.SecurityManager.checkPermission(SecurityManager.java:549) ~[?:1.8.0_271]
at java.lang.Thread.setContextClassLoader(Thread.java:1474) ~[?:1.8.0_271]
at com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.lambda$static$0(AbandonedConnectionCleanupThread.java:72) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619) ~[?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932) ~[?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367) ~[?:1.8.0_271]
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) ~[?:1.8.0_271]
at com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.<clinit>(AbandonedConnectionCleanupThread.java:75) ~[?:?]
... 30 more
fatal error in thread [elasticsearch[node-4][clusterApplierService#updateTask][T#1]], exiting
java.lang.ExceptionInInitializerError
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at com.mysql.cj.jdbc.NonRegisteringDriver.<clinit>(NonRegisteringDriver.java:99)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.wltea.analyzer.dic.Dictionary.<clinit>(Dictionary.java:103)
at org.wltea.analyzer.cfg.Configuration.<init>(Configuration.java:40)
at org.elasticsearch.index.analysis.IkTokenizerFactory.<init>(IkTokenizerFactory.java:15)
at org.elasticsearch.index.analysis.IkTokenizerFactory.getIkSmartTokenizerFactory(IkTokenizerFactory.java:23)
at org.elasticsearch.index.analysis.AnalysisRegistry.buildMapping(AnalysisRegistry.java:433)
at org.elasticsearch.index.analysis.AnalysisRegistry.buildTokenizerFactories(AnalysisRegistry.java:275)
at org.elasticsearch.index.analysis.AnalysisRegistry.build(AnalysisRegistry.java:203)
at org.elasticsearch.index.IndexModule.newIndexService(IndexModule.java:431)
at org.elasticsearch.indices.IndicesService.createIndexService(IndicesService.java:663)
at org.elasticsearch.indices.IndicesService.createIndex(IndicesService.java:566)
at org.elasticsearch.indices.IndicesService.createIndex(IndicesService.java:170)
at org.elasticsearch.indices.cluster.IndicesClusterStateService.createIndices(IndicesClusterStateService.java:468)
at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:228)
at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:499)
at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:489)
at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:460)
at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:407)
at org.elasticsearch.cluster.service.ClusterApplierService.access$000(ClusterApplierService.java:57)
at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:151)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:673)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:241)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "setContextClassLoader")
at java.security.AccessControlContext.checkPermission(AccessControlContext.java:472)
at java.security.AccessController.checkPermission(AccessController.java:886)
at java.lang.SecurityManager.checkPermission(SecurityManager.java:549)
at java.lang.Thread.setContextClassLoader(Thread.java:1474)
at com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.lambda$static$0(AbandonedConnectionCleanupThread.java:72)
at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
at com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.<clinit>(AbandonedConnectionCleanupThread.java:75)
... 30 more
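Solution:
This happens because the JDK security policy does not grant the required permission.
Add the permission line shown below inside the grant{} block of the policy file.
For JDK 14 the file is /var/local/jdk-14.0.2/lib/security/default.policy
For JDK 8 the file is /var/local/jdk1.8.0_271/jre/lib/security/java.policy
Since ES 7.12 the official recommendation is JDK 11+, and the JDK versions compatible with ES are 8, 11 and 14, so I use JDK 14 as the example here.
permission java.lang.RuntimePermission "setContextClassLoader";
After restarting ES again, a different error appears: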
[Extend Dict Loading] java.sql.SQLNonTransientConnectionException: Could not create connection to database server.
3.2 java.sql.SQLNonTransientConnectionException: Could not create connection to database server.
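Solution:
1. Check whether mysql-connector-java-8.0.22.jar was placed in the plugins directory by mistake. It belongs in plugins/ik.
2. Check that the driver version in the IK pom matches the version of the driver jar in the ik directory.
3. Grant the socket permission in the JDK policy file, inside the grant{} block.
For JDK 14 the file is /var/local/jdk-14.0.2/lib/security/default.policy
For JDK 8 the file is /var/local/jdk1.8.0_271/jre/lib/security/java.policy
I am using the JDK bundled with ES here: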
vim jdk/lib/security/default.policy
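Add the following line. It allows the JVM to open a socket connection to the given IP and port:
// 192.168.244.1:3306 is the IP and port of the dictionary database; use the host and port from your own jdbc.url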
permission java.net.SocketPermission "192.168.244.1:3306","connect,resolve";
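The host and port in this line have to match the ones in jdbc.url, and they are easy to get out of sync. As a small sketch, the policy line can be derived from the JDBC URL itself. SocketPermissionLine is a hypothetical helper, and falling back to port 3306 when the URL has no port is my assumption based on the MySQL default.

```java
import java.net.URI;

/** Hypothetical helper that derives the policy line from a MySQL JDBC URL. */
final class SocketPermissionLine {

    static String forJdbcUrl(String jdbcUrl) {
        if (!jdbcUrl.startsWith("jdbc:")) {
            throw new IllegalArgumentException("not a JDBC URL: " + jdbcUrl);
        }
        // Drop the "jdbc:" prefix so the remainder parses as a regular URI
        URI uri = URI.create(jdbcUrl.substring("jdbc:".length()));
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("no host in JDBC URL: " + jdbcUrl);
        }
        // Assumption: use the MySQL default port when the URL does not specify one
        int port = uri.getPort() == -1 ? 3306 : uri.getPort();
        return "permission java.net.SocketPermission \""
                + uri.getHost() + ":" + port + "\",\"connect,resolve\";";
    }

    public static void main(String[] args) {
        String url = args.length > 0
                ? args[0]
                : "jdbc:mysql://192.168.101.109:3306/user_test?serverTimezone=UTC";
        System.out.println(forJdbcUrl(url));
    }
}
```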
References
[1] https://artisan.blog.csdn.net/article/details/99350933 (the code in this post is mainly based on that article)