es配置ik分词器及热更新

环境

  • JDK 8
  • CentOS 7
  • Elasticsearch 7.9.3单节点

配置ik分词器

1.下载

[下载地址]https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.9.3

注意下载的版本必须与es版本一致。

2.添加

将上面的zip文件添加至es/plugins/ik/文件夹下。
unzip xxx来解压给文件到当前文件夹即可。

3.测试

完成上面后,重启es服务即可。
运行下面的语句,查看能否使用分词器,分解中文。

POST _analyze
{
"analyzer": "ik_smart",
"text": "你好世界"
}

ik分词器配置热更新中文词

1.下载

[下载地址]https://github.com/medcl/elasticsearch-analysis-ik/releases
注意下载的版本必须与es版本一致。

2.修改源码

2.1 修改版本

文件pom.xml中:

<elasticsearch.version>7.9.3</elasticsearch.version>

虽然我们是根据版本来下载的源码,但是这里的版本可能与我们想要的版本不同,因此打包的时候名字不对。

2.2 添加mysql包

也是pom.xml文件中添加,版本自定即可:

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>

2.3 增加jdbc文件

增加文件jdbc-reload.properties添加至config文件夹中。

jdbc.url=jdbc:mysql://localhost:3306/es_work?serverTimezone=GMT
jdbc.user=root
jdbc.password=123456
jdbc.reload.sql=select word from hot_words
jdbc.reload.stopword.sql=select stopword as word from hot_stopwords
jdbc.reload.interval=1000

这里有我们需要的两个表:hot_wordshot_stopwords,分别是热词和停用词的数据表。

2.4 代码部分

代码主要在org.wltea.analyzer.dic.Dictionary类中处理。

2.4.1 线程

添加下面代码:

public class HotDictReloadThread implements Runnable {
private static final Logger LOGGER = ESPluginLoggerFactory.getLogger(HotDictReloadThread.class.getName());

@Override
public void run() {
while (true){
LOGGER.info("reload hot dict from mysql");
Dictionary.getSingleton().reLoadMainDict();
}
}
}


线程主要完成了死循环中的加载主数据的过程。
该线程添加Dictionary至下面中:
public static synchronized void initial(Configuration cfg) {
if (singleton == null) {
synchronized (Dictionary.class) {
if (singleton == null) {

singleton = new Dictionary(cfg);
singleton.loadMainDict();
singleton.loadSurnameDict();
singleton.loadQuantifierDict();
singleton.loadSuffixDict();
singleton.loadPrepDict();
singleton.loadStopWordDict();

//!!!! mysql监控线程
new Thread(new HotDictReloadThread()).start();

if(cfg.isEnableRemoteDict()){
// 建立监控线程
for (String location : singleton.getRemoteExtDictionarys()) {
// 10 秒是初始延迟可以修改的 60是间隔时间 单位秒
pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
}
for (String location : singleton.getRemoteExtStopWordDictionarys()) {
pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
}
}

}
}
}
}

完成后进入Dictionary下面的代码:
void reLoadMainDict() {
logger.info("start to reload ik dict.");
// 新开一个实例加载词典,减少加载过程对当前词典使用的影响
Dictionary tmpDict = new Dictionary(configuration);
tmpDict.configuration = getSingleton().configuration;
tmpDict.loadMainDict();
tmpDict.loadStopWordDict();
_MainDict = tmpDict._MainDict;
_StopWords = tmpDict._StopWords;
logger.info("reload ik dict finished.");
}

需要注意这两行,分别完成了热词加载和停用词加载的工作:
tmpDict.loadMainDict();
tmpDict.loadStopWordDict();

2.4.2

添加下面代码:

private static Properties prop = new Properties();

static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e){
logger.error("error",e);
}
}

2.4.3 热词部分

添加下面代码:

private void loadMysqlExtDict() {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;

try {
Path file = PathUtils.get(getDictRoot(), "/jdbc-reload.properties");
prop.load(new FileInputStream(file.toFile()));

logger.info("/jdbc-reload.properties");
for (Object key : prop.keySet()) {
logger.info(key + "=" + prop.getProperty(String.valueOf(key)));
}

logger.info("query hot dict from mysql," + prop.getProperty("jdbc.reload.sql"));

conn = DriverManager.getConnection(
prop.getProperty("jdbc.url"),
prop.getProperty("jdbc.user"),
prop.getProperty("jdbc.password")
);
stmt = conn.createStatement();
rs = stmt.executeQuery(prop.getProperty("jdbc.reload.sql"));

while (rs.next()){
String word = rs.getString("word");
logger.info("hot word from mysql:" + word);
_MainDict.fillSegment(word.trim().toCharArray());
}
Thread.sleep(Integer.valueOf(String.valueOf(prop.get("jdbc.reload.interval"))));
} catch (Exception e){
logger.error("error",e);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException e){
logger.error("error",e);
}
}

if (stmt != null) {
try {
stmt.close();
} catch (SQLException e){
logger.error("error",e);
}
}

if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
logger.error("error",e);
}
}
}
}

上面的代码主要完成了对数据中的热词加载。

将上面的代码添加至下面函数中:

private void loadMainDict() {
// 建立一个主词典实例
_MainDict = new DictSegment((char) 0);

// 读取主词典文件
Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_MAIN);
loadDictFile(_MainDict, file, false, "Main Dict");
// 加载扩展词典
this.loadExtDict();
// 加载远程自定义词库
this.loadRemoteExtDict();
// !!!! 加载mysql的主词典
this.loadMysqlExtDict();
}

2.4.4 停用词部分

添加下面代码:

private void loadMySQLStopwordDict() {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;

try {
Path file = PathUtils.get(getDictRoot(), "/jdbc-reload.properties");
prop.load(new FileInputStream(file.toFile()));

logger.info("[==========]jdbc-reload.properties");
for(Object key : prop.keySet()) {
logger.info("[==========]" + key + "=" + prop.getProperty(String.valueOf(key)));
}

logger.info("[==========]query hot stopword dict from mysql, " + prop.getProperty("jdbc.reload.stopword.sql") + "......");

conn = DriverManager.getConnection(
prop.getProperty("jdbc.url"),
prop.getProperty("jdbc.user"),
prop.getProperty("jdbc.password"));
stmt = conn.createStatement();
rs = stmt.executeQuery(prop.getProperty("jdbc.reload.stopword.sql"));

while(rs.next()) {
String theWord = rs.getString("word");
logger.info("[==========]hot stopword from mysql: " + theWord);
_StopWords.fillSegment(theWord.trim().toCharArray());
}

Thread.sleep(Integer.valueOf(String.valueOf(prop.get("jdbc.reload.interval"))));
} catch (Exception e) {
logger.error("erorr", e);
} finally {
if(rs != null) {
try {
rs.close();
} catch (SQLException e) {
logger.error("error", e);
}
}
if(stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
logger.error("error", e);
}
}
if(conn != null) {
try {
conn.close();
} catch (SQLException e) {
logger.error("error", e);
}
}
}
}

上面的代码主要完成了对数据中的热词加载。

将上面的代码添加至下面函数中:

private void loadStopWordDict() {
// 建立主词典实例
_StopWords = new DictSegment((char) 0);

...

// !!!!从mysql中加载停用词
this.loadMySQLStopwordDict();
}

3.部署

3.1 打包

package后,将target下的release中的zip文件复制到我们es中的plugins/ik中。

3.2 准备包

因为引入了mysql包,因此需要到maven中把他拿出来。

3.3 包替换

  1. 进入es服务中,将打包的zip复制到plugins/ik中,解压。注意,这里在解压前一定要做好备份,因为config中,可能会有重要的词语。当然也可以直接将jar包覆盖,来减少侵入性。
  2. jdbc-reload.properties文件复制到ik/config文件夹中。
  3. 将mysql包,移动到es/lib中。

3.4 运行

重启es、kibana并测试即可。

4.问题

在热更新分词器的时候,出现了下面的问题

4.1 SSL报错

需要配置证书,记录如下:

4.1.1 在ES跟目录生成CA证书,需要输入名称和密码,可以直接回车不输入

bin/elasticsearch-certutil ca

4.1.2 使用第一步生成的证书,产生p12密钥,需要输入密码的时候可以直接回车不输入

bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12

此时在es中新建了两个文件。

4.1.3 在config目录下,新建文件夹certs

mkdir config/certs

4.1.4 将elastic-certificates.p12文件,复制到config/certs文件夹下

cp ./elastic-certificates.p12 ./config/certs

4.1.5 修改配置文件config/elasticsearch.yml

添加如下代码

xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.license.self_generated.type: basic
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12

4.2 RuntimePermission

mysql远程实现异常处理。

4.2.1 方案一

修改jdk 安全策略,即修改 java.policy文件。但是部分版本不支持,会修改无效。
进入jdk安装目录(mac 环境)

/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/security

如果不知道jdk安装在哪的,使用下面命令
/usr/libexec/java_home -V

找到java.policy文件,在最下面增加

permission java.net.SocketPermission "*:*","connect,resolve";
permission java.lang.RuntimePermission "getClassLoader";
permission java.lang.RuntimePermission "setContextClassLoader";

4.2.2 方案二

es/config文件夹下新建policy.policy文件,添加如下内容:

grant {
permission java.net.SocketPermission "*:*","connect,resolve";
permission java.lang.RuntimePermission "getClassLoader";
permission java.lang.RuntimePermission "setContextClassLoader";
};

然后修改jvm.option文件增加:
-Djava.security.policy=你的es安装目录/config/policy.policy