1 RestHighLevelClient 介绍
默认情况下,elasticsearch使用两个端口来监听外部tcp流量。
- 9200端口:用于所有通过HTTP协议进行的API调用,包括搜索、聚合、监控以及其他任何使用HTTP协议的请求。基于REST的客户端库(包括本文的 RestHighLevelClient)都通过该端口与Elasticsearch进行交互。
- 9300端口:使用自定义的二进制传输协议,用于集群中各节点之间的通信,例如集群变更、主节点选举、节点加入/离开、分片分配等。
RestHighLevelClient 是 Elasticsearch 的 Java 高级 REST 客户端,它通过 HTTP(默认9200端口)与ES集群进行通信。
2 引入es依赖
<!--引入es-high-level-client相关依赖 start-->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.10.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.10.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.10.0</version>
</dependency>
<!--引入es-high-level-client相关依赖 end-->
3 使用
3.1 es的配置
# es配置
# es用户名
elasticsearch.username=elastic
# es密码
elasticsearch.password=elastic
# es host ip 地址(集群),多个以","间隔
elasticsearch.hosts=127.0.0.1:9200
# es 访问协议(http 或 https)
elasticsearch.scheme=http
# es 连接超时时间(ms)
elasticsearch.connecttimeout=1000
# es socket 读取超时时间(ms):连接建立后等待响应数据的最长时间
elasticsearch.sockettimeout=30000
# es 从连接池获取连接的超时时间(ms)
elasticsearch.connectionrequesttimeout=500
# es 最大连接数
elasticsearch.maxconnectnum=100
# es 每个路由的最大连接数
elasticsearch.maxconnectnumperroute=100
3.2 es客户端配置类
/**
 * Spring configuration that builds the {@code resthighlevelclient} bean from the
 * {@code elasticsearch.*} properties.
 *
 * <p>The fields below are bound through
 * {@code @configurationproperties(prefix = "elasticsearch")}.
 */
@slf4j
@data
@configuration
@configurationproperties(prefix = "elasticsearch")
public class elasticsearchconfig {
    /**
     * Comma-separated list of {@code host:port} node addresses.
     */
    private string hosts;
    /**
     * User name used for basic authentication.
     */
    private string username;
    /**
     * Password used for basic authentication.
     */
    private string password;
    /**
     * URI scheme applied to every node address (the sample configuration uses {@code http}).
     */
    private string scheme;
    /**
     * Connect timeout handed to the request config (ms in the sample configuration).
     */
    private int connecttimeout;
    /**
     * Socket timeout handed to the request config (ms in the sample configuration).
     */
    private int sockettimeout;
    /**
     * Connection-request timeout handed to the request config (ms in the sample configuration).
     */
    private int connectionrequesttimeout;
    /**
     * Maximum total number of connections of the underlying HTTP client.
     */
    private int maxconnectnum;
    /**
     * Maximum number of connections per route of the underlying HTTP client.
     */
    private int maxconnectnumperroute;

    /**
     * Creates the client bean: node addresses, timeouts, connection limits and
     * basic-auth credentials are all taken from the bound properties.
     *
     * <p>The bean name is given explicitly; when {@code @bean} carries no name,
     * the method name is used as the bean name.
     *
     * @return the configured high-level client
     */
    @bean(name = "resthighlevelclient")
    public resthighlevelclient resthighlevelclient() {
        restclientbuilder builder = restclient.builder(geteshost());
        // Timeouts.
        builder.setrequestconfigcallback(requestconfigbuilder -> {
            requestconfigbuilder.setconnecttimeout(connecttimeout);
            requestconfigbuilder.setsockettimeout(sockettimeout);
            requestconfigbuilder.setconnectionrequesttimeout(connectionrequesttimeout);
            return requestconfigbuilder;
        });
        // Connection limits and credentials.
        builder.sethttpclientconfigcallback(httpclientbuilder -> {
            httpclientbuilder.setmaxconntotal(maxconnectnum);
            httpclientbuilder.setmaxconnperroute(maxconnectnumperroute);
            httpclientbuilder.setdefaultcredentialsprovider(getcredentialsprovider());
            return httpclientbuilder;
        });
        return new resthighlevelclient(builder);
    }

    /**
     * Parses {@link #hosts} into one {@code httphost} per node.
     *
     * <p>Whitespace around entries is ignored and empty entries (for example a
     * trailing comma) are skipped, so {@code "a:9200, b:9200,"} yields two nodes.
     * Every remaining entry must have the form {@code host:port}.
     *
     * @return the node addresses, each using the configured {@link #scheme}
     */
    private httphost[] geteshost() {
        list<httphost> nodes = new arraylist<>();
        for (string entry : hosts.split(",")) {
            string addr = entry.trim();
            if (addr.isempty()) {
                continue;
            }
            // Split once and reuse both halves.
            string[] hostandport = addr.split(":");
            string host = hostandport[0].trim();
            string port = hostandport[1].trim();
            nodes.add(new httphost(host, integer.parseint(port), scheme));
        }
        return nodes.toarray(new httphost[0]);
    }

    /**
     * Builds a credentials provider that answers every auth scope with the
     * configured user name and password.
     */
    private credentialsprovider getcredentialsprovider() {
        credentialsprovider credentialsprovider = new basiccredentialsprovider();
        credentialsprovider.setcredentials(authscope.any, new usernamepasswordcredentials(username, password));
        return credentialsprovider;
    }
}
3.3 es的使用
3.3.1 创建es索引
3.3.1.1 创建es索引的工具类
创建es索引的工具类如下所示。
/**
 * Index-level operations: existence check, creation and deletion.
 *
 * <p>Every method reports failure through its boolean result and logs the
 * cause instead of throwing.
 */
@slf4j
@service
public class esindexoperation {
    @resource
    private resthighlevelclient resthighlevelclient;
    private final requestoptions options = requestoptions.default;

    /**
     * Checks whether an index exists.
     *
     * @param index index name
     * @return {@code true} if the index exists; {@code false} if it does not or
     *         if the check itself failed (the failure is logged)
     */
    public boolean checkindex(string index) {
        try {
            return resthighlevelclient.indices().exists(new getindexrequest(index), options);
        } catch (exception e) {
            log.error("esindexoperation checkindex error.", e);
        }
        return boolean.false;
    }

    /**
     * Creates an index, optionally configured from a JSON definition file.
     *
     * <p>The file may contain the top-level keys {@code aliases},
     * {@code mappings} and {@code settings}; each one that is present and
     * non-blank is applied to the create request.
     *
     * @param indexname         name of the index to create
     * @param essettingfilepath path of the definition file; blank means
     *                          "create with server defaults"
     * @return {@code true} only if the server acknowledged the creation;
     *         {@code false} if the definition file could not be read or parsed,
     *         the index already exists, or the request failed
     */
    public boolean createindex(string indexname, string essettingfilepath) {
        string aliases = null;
        string mappings = null;
        string settings = null;
        if (stringutils.isnotblank(essettingfilepath)) {
            try {
                string filecontent = fileutils.readfilecontent(essettingfilepath);
                if (filecontent == null) {
                    // readfilecontent yields null when the file cannot be read.
                    // Creating the index without its mappings/settings would be a
                    // silent misconfiguration, so stop here.
                    log.error("createindex error. cannot read definition file:[{}]", essettingfilepath);
                    return false;
                }
                if (stringutils.isnotblank(filecontent)) {
                    jsonobject jsonobject = json.parseobject(filecontent);
                    aliases = jsonobject.getstring("aliases");
                    mappings = jsonobject.getstring("mappings");
                    settings = jsonobject.getstring("settings");
                }
            } catch (exception e) {
                log.error("createindex error.", e);
                return false;
            }
        }
        if (checkindex(indexname)) {
            log.error("createindex indexname:[{}]已存在", indexname);
            return false;
        }
        createindexrequest request = new createindexrequest(indexname);
        if (stringutils.isnotblank(aliases)) {
            request.aliases(aliases, xcontenttype.json);
        }
        if (stringutils.isnotblank(mappings)) {
            request.mapping(mappings, xcontenttype.json);
        }
        if (stringutils.isnotblank(settings)) {
            request.settings(settings, xcontenttype.json);
        }
        try {
            // Report the server's acknowledgement, as deleteindex does.
            return this.resthighlevelclient.indices().create(request, options).isacknowledged();
        } catch (exception e) {
            // Catch broadly, like the sibling methods: server-side rejections
            // surface as unchecked exceptions, not only as ioexception.
            log.error("esindexoperation createindex error.", e);
            return false;
        }
    }

    /**
     * Deletes an index if it exists.
     *
     * @param indexname name of the index to delete
     * @return {@code true} if the index existed and the deletion was
     *         acknowledged; {@code false} otherwise (failures are logged)
     */
    public boolean deleteindex(string indexname) {
        try {
            if (checkindex(indexname)) {
                deleteindexrequest request = new deleteindexrequest(indexname);
                acknowledgedresponse response = resthighlevelclient.indices().delete(request, options);
                return response.isacknowledged();
            }
        } catch (exception e) {
            log.error("esindexoperation deleteindex error.", e);
        }
        return boolean.false;
    }
}
3.3.1.2 读取文件的工具类
/**
 * File helper utilities.
 */
@slf4j
public class fileutils {
    /**
     * Reads a text file and returns its lines concatenated into one string.
     *
     * <p>The path is opened directly with {@code filereader}, i.e. it is a
     * file-system path, not a classpath lookup. Line terminators are dropped:
     * each line is appended to the result without a separator.
     *
     * <p>NOTE(review): {@code filereader} decodes with the platform default
     * charset on older Java versions; confirm the files are read with the
     * intended encoding.
     *
     * @param filepath path of the file to read
     * @return the concatenated lines, or {@code null} if the file could not be
     *         read (the error is logged)
     */
    public static string readfilecontent(string filepath) {
        // try-with-resources closes the reader even when readline() throws.
        try (bufferedreader reader = new bufferedreader(new filereader(filepath))) {
            stringbuilder content = new stringbuilder();
            string line;
            while ((line = reader.readline()) != null) {
                content.append(line);
            }
            return content.tostring();
        } catch (ioexception e) {
            log.error("readfilecontent error.", e);
        }
        return null;
    }

    /**
     * Manual smoke run: reads the sample index definition file.
     */
    public static void main(string[] args) {
        string filepath = "src/main/resources/es/mappings_test20231216.txt";
        string filecontent = readfilecontent(filepath);
    }
}
3.3.1.3 测试创建es索引
(1)在“resources”文件夹下创建es索引的配置文件
配置文件内容如下所示。
{
"aliases": {
"test": {}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"address": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"settings": {
"index": {
"number_of_shards": "1",
"number_of_replicas": "1"
}
}
}
(2)读取es索引的配置文件,创建es索引
/**
 * Creates the index "test_1216" from the sample definition file and expects
 * the operation to report success.
 */
@test
public void createindex() {
    boolean created = esindexoperation.createindex(
            "test_1216",
            "src/main/resources/es/mappings_test20231216.txt");
    assert.asserttrue(created);
}
(3)查看创建结果
通过命令 GET /test 查看es索引创建结果(test 是上面配置文件中为索引 test_1216 定义的别名),结果如下所示。
{
"test_1216" : {
"aliases" : {
"test" : { }
},
"mappings" : {
"properties" : {
"address" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"settings" : {
"index" : {
"routing" : {
"allocation" : {
"include" : {
"_tier_preference" : "data_content"
}
}
},
"number_of_shards" : "1",
"provided_name" : "test_1216",
"creation_date" : "1702723364945",
"number_of_replicas" : "1",
"uuid" : "rcahqjpzsg-n4fse3cot4a",
"version" : {
"created" : "7100099"
}
}
}
}
}
3.3.2 查询操作
3.3.2.1 常用查询
/**
 * Read-only operations: document count and search.
 *
 * <p>Failures are logged and reported as a neutral result (zero / empty list)
 * rather than thrown.
 */
@slf4j
@service
public class esqueryoperation {
    @resource
    private resthighlevelclient client;
    private final requestoptions options = requestoptions.default;

    /**
     * Counts the documents of an index.
     *
     * @param indexname index (or alias) to count
     * @return the document count, or {@code 0} if the request failed
     */
    public long count(string indexname) {
        countrequest countrequest = new countrequest(indexname);
        try {
            countresponse countresponse = client.count(countrequest, options);
            return countresponse.getcount();
        } catch (exception e) {
            log.error("esqueryoperation count error.", e);
        }
        return 0l;
    }

    /**
     * Runs a search and returns the source of every hit as a map.
     *
     * <p>Each map additionally carries the document id under the key
     * {@code "id"}; a source field of the same name is overwritten.
     *
     * @param indexname     index (or alias) to search
     * @param sourcebuilder query, paging and other search options
     * @return one map per hit in response order; an empty list if nothing
     *         matched or the request failed (never {@code null})
     */
    public list<map<string, object>> list(string indexname, searchsourcebuilder sourcebuilder) {
        searchrequest searchrequest = new searchrequest(indexname);
        searchrequest.source(sourcebuilder);
        try {
            searchresponse searchresp = client.search(searchrequest, options);
            list<map<string, object>> data = new arraylist<>();
            for (searchhit searchhit : searchresp.gethits().gethits()) {
                map<string, object> row = searchhit.getsourceasmap();
                row.put("id", searchhit.getid());
                data.add(row);
            }
            return data;
        } catch (exception e) {
            log.error("esqueryoperation list error.", e);
        }
        // Mirror count(): a failure yields a neutral value, so callers can
        // iterate the result without a null check.
        return new arraylist<>();
    }
}
3.3.2.2 测试
/**
 * Runs a paged bool query against "test" and verifies the returned page.
 */
@test
public void list() {
    string indexname = "test";
    // Query: address must match the term "hunan", name must not match "jack".
    searchsourcebuilder sourcebuilder = new searchsourcebuilder();
    boolquerybuilder querybuilder = querybuilders.boolquery();
    querybuilder.must(querybuilders.termquery("address", "hunan"));
    querybuilder.mustnot(querybuilders.matchquery("name", "jack"));
    sourcebuilder.query(querybuilder);
    // Paging: first page, at most one hit.
    sourcebuilder.from(0);
    sourcebuilder.size(1);
    list<map<string, object>> list = esqueryoperation.list(indexname, sourcebuilder);
    // Check the actual outcome instead of asserting a constant.
    assert.assertnotnull(list);
    assert.asserttrue(list.size() <= 1);
}
3.3.3 增删改操作
3.3.3.1 常用增删改操作
/**
 * Document-level write operations: insert, batch insert, update and delete.
 *
 * <p>Every method reports failure through its boolean result and logs the
 * cause instead of throwing.
 */
@slf4j
@service
public class esdataoperation {
    @resource
    private resthighlevelclient client;
    private final requestoptions options = requestoptions.default;

    /**
     * Inserts one document; the document id is taken from {@code datamap.get("id")}.
     *
     * <p>The request uses op type {@code create}.
     *
     * @param indexname target index (or alias)
     * @param datamap   document fields; must contain a non-null {@code "id"}
     * @return {@code true} only if the bulk response reports no failures
     */
    public boolean insert(string indexname, map<string, object> datamap) {
        try {
            bulkrequest request = new bulkrequest();
            request.add(buildcreaterequest(indexname, datamap));
            return executebulk(request, indexname);
        } catch (exception e) {
            log.error("esdataoperation insert error.", e);
        }
        return boolean.false;
    }

    /**
     * Inserts several documents with a single bulk request.
     *
     * @param indexname     target index (or alias)
     * @param userindexlist documents to write; each must contain a non-null {@code "id"}
     * @return {@code true} if the list is empty or the bulk response reports
     *         no failures; {@code false} otherwise
     */
    public boolean batchinsert(string indexname, list<map<string, object>> userindexlist) {
        try {
            if (userindexlist.isempty()) {
                // Nothing to write; avoid sending a bulk request with no actions.
                return boolean.true;
            }
            bulkrequest request = new bulkrequest();
            for (map<string, object> datamap : userindexlist) {
                request.add(buildcreaterequest(indexname, datamap));
            }
            return executebulk(request, indexname);
        } catch (exception e) {
            log.error("esdataoperation batchinsert error.", e);
        }
        return boolean.false;
    }

    /**
     * Partially updates the document whose id is {@code datamap.get("id")}.
     *
     * <p>NOTE(review): the original comment states that an update "can directly
     * modify the index structure" — presumably fields absent from the mapping
     * are added dynamically; confirm against the index's mapping settings.
     *
     * @param indexname     target index (or alias)
     * @param datamap       fields to merge into the document; must contain a non-null {@code "id"}
     * @param refreshpolicy refresh policy applied to the update request
     * @return {@code true} if the request completed without an exception
     */
    public boolean update(string indexname, map<string, object> datamap, writerequest.refreshpolicy refreshpolicy) {
        try {
            updaterequest updaterequest = new updaterequest(indexname, datamap.get("id").tostring());
            updaterequest.setrefreshpolicy(refreshpolicy);
            updaterequest.doc(datamap);
            this.client.update(updaterequest, options);
            return boolean.true;
        } catch (exception e) {
            log.error("esdataoperation update error.", e);
        }
        return boolean.false;
    }

    /**
     * Deletes a document by id.
     *
     * @param indexname target index (or alias)
     * @param id        document id
     * @return {@code true} if the request completed without an exception
     */
    public boolean delete(string indexname, string id) {
        try {
            deleterequest deleterequest = new deleterequest(indexname, id);
            this.client.delete(deleterequest, options);
            return boolean.true;
        } catch (exception e) {
            log.error("esdataoperation delete error.", e);
        }
        return boolean.false;
    }

    /** Builds the op-type "create" index request shared by insert and batchinsert. */
    private indexrequest buildcreaterequest(string indexname, map<string, object> datamap) {
        return new indexrequest(indexname).optype("create")
                .id(datamap.get("id").tostring())
                .source(datamap, xcontenttype.json);
    }

    /**
     * Sends a bulk request and turns per-item failures into a {@code false} result.
     * The bulk API reports item failures in the response rather than by throwing.
     */
    private boolean executebulk(bulkrequest request, string indexname) throws exception {
        boolean failed = this.client.bulk(request, options).hasfailures();
        if (failed) {
            log.error("esdataoperation bulk response has failures, index:[{}]", indexname);
        }
        return !failed;
    }
}
3.3.3.2 测试
/**
 * Inserts one document into "test" and expects the operation to report success.
 */
@test
public void insert() {
    string indexname = "test";
    hashmap<string, object> hashmap = new hashmap<>();
    hashmap.put("id", 4);
    hashmap.put("name", "tom");
    hashmap.put("address", "jiangsu");
    boolean flag = esdataoperation.insert(indexname, hashmap);
    // Assert on the returned flag, not on a constant.
    assert.asserttrue(flag);
}
/**
 * Updates the name of document 5 in "test" and expects the operation to
 * report success.
 */
@test
public void update() {
    string indexname = "test";
    hashmap<string, object> hashmap = new hashmap<>();
    hashmap.put("id", 5);
    hashmap.put("name", "jack7");
    boolean update = esdataoperation.update(indexname, hashmap, writerequest.refreshpolicy.wait_until);
    // Assert on the returned flag, not on a constant.
    assert.asserttrue(update);
}
4 参考文献
(1)elasticsearch学习(七):es客户端resthighlevelclient_炎升的博客
(2)中间件:elasticsearch组件resthighlevelclient用法详解
(3)
发表评论