1.导入websocket依赖
<!--websocket依赖包-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.编写websocket类
package com.skyable.device.config.websocket;
import lombok.extern.slf4j.slf4j;
import org.springframework.stereotype.component;
import javax.websocket.onclose;
import javax.websocket.onerror;
import javax.websocket.onopen;
import javax.websocket.session;
import javax.websocket.server.serverendpoint;
import java.io.ioexception;
import java.util.hashset;
import java.util.set;
/**
* @author administrator
*/
@serverendpoint("/vehicle/{domainid}")
@component
@slf4j
public class websocketserver {
/**
* concurrent包的线程安全set,用来存放每个客户端对应的mywebsocket对象。
*/
private static final set<session> sessions = new hashset<>();
/**
* 连接关闭调用的方法
*/
@onclose
public void onclose() {
log.info("websocket link close");
}
/**
* @param error
*/
@onerror
public void onerror(throwable error) {
error.printstacktrace();
}
/**
* 接收数据
*
* @param data
*/
public static void senddatatoclients(string data) {
for (session session : sessions) {
try {
session.getbasicremote().sendtext(data);
} catch (ioexception e) {
e.printstacktrace();
}
}
}
@onopen
public void onopen(session session) {
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
/**
* 接收domainid
*/
sessions.add(session);
senddatatoclients();
}
public void senddatatoclients() {
for (session session : sessions) {
try {
session.getbasicremote().sendtext("websocket link succeed");
} catch (ioexception e) {
e.printstacktrace();
}
}
}
}
package com.skyable.device.config.websocket;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.web.socket.config.annotation.enablewebsocket;
import org.springframework.web.socket.server.standard.serverendpointexporter;
/**
 * configuration class that enables websocket support and exposes a
 * {@code serverendpointexporter} bean.
 *
 * @author administrator
 */
@configuration
@enablewebsocket
public class websocketconfig {
    /**
     * creates the exporter bean.
     *
     * @return a new {@code serverendpointexporter}
     */
    @bean
    public serverendpointexporter serverendpointexporter() {
        serverendpointexporter exporter = new serverendpointexporter();
        return exporter;
    }
}
3.Kafka消费数据后调用WebSocket方法
/**
 * handles one kafka message: stores the location and/or alert section it carries,
 * then pushes the current vehicle overview to all websocket clients.
 *
 * <p>the payload is parsed once. a payload that cannot be parsed is reported and
 * skipped; the websocket push still happens in that case.
 *
 * @param jsonvalue raw json text of the kafka record
 */
@override
public void savebatch(string jsonvalue) {
    objectmapper objectmapper = new objectmapper();
    jsonnode root = null;
    try {
        root = objectmapper.readtree(jsonvalue);
    } catch (jsonprocessingexception e) {
        e.printstacktrace();
    }
    if (root != null) {
        // each section is contained separately so that a json failure while
        // handling the location does not prevent the alert from being stored
        try {
            persistlocation(root);
        } catch (jsonprocessingexception e) {
            e.printstacktrace();
        }
        try {
            persistalert(root);
        } catch (jsonprocessingexception e) {
            e.printstacktrace();
        }
    }
    // websocket push
    // NOTE(review): the payload is the object's tostring() text - confirm that
    // clients expect this form rather than json
    vehicleallvo vehicles = vehicles();
    websocketserver.senddatatoclients(vehicles.tostring());
}

/**
 * stores the location section of the message, if present: computes the distance to
 * the position returned by the mapper, resolves the area name, inserts the
 * real-time position and records the timestamp in the location sorted set.
 *
 * @param root parsed message
 * @throws jsonprocessingexception declared so the caller can contain a json
 *         failure to this step
 */
private void persistlocation(jsonnode root) throws jsonprocessingexception {
    if (!root.has(vehicleconstant.location)) {
        return;
    }
    // read the node with the same key that was just checked
    jsonnode locationnode = root.get(vehicleconstant.location);
    string vehicleid = locationnode.get("vehicleid").astext();
    double longitude = double.parsedouble(locationnode.get("longitude").astext());
    double latitude = double.parsedouble(locationnode.get("latitude").astext());
    long timestamp = locationnode.get("timestamp").aslong();

    realtimeposition realtimeposition = new realtimeposition();
    realtimeposition.settimestamp(timestamp);
    realtimeposition.setlatitude(string.valueof(latitude));
    realtimeposition.setlongitude(string.valueof(longitude));
    realtimeposition.setvehicleid(vehicleid);

    // presumably the last stored position of this vehicle; verify against the mapper
    vehiclelocationvo locationvo = devicemapmapper.selectlonlat(vehicleid);
    if (locationvo != null) {
        // compute the distance between the stored position ("l1") and the new one ("l2")
        redisutil.addlocation(vehicleid, double.parsedouble(locationvo.getlongitude()), double.parsedouble(locationvo.getlatitude()), "l1");
        redisutil.addlocation(vehicleid, longitude, latitude, "l2");
        double result = redisutil.calculatedistance(vehicleid, "l1", "l2");
        double kilometers = redisutil.convertmilestokilometers(result);
        // keep at most three decimal places
        // NOTE(review): the formatter uses the default locale; with a comma
        // decimal separator the parse below would fail - confirm the runtime locale
        decimalformat decimalformat = new decimalformat("#.###");
        string distance = decimalformat.format(kilometers);
        realtimeposition.setdistance(double.parsedouble(distance));
    } else {
        realtimeposition.setdistance(0);
    }

    // resolve the province short name for the coordinates
    map<string, object> position = addressutil.getposition(longitude, latitude, null, null, null);
    map<?, ?> data = (map<?, ?>) position.get("data");
    string provincename = data.get("shortname").tostring().replaceall("\"", "");
    realtimeposition.setarea(provincename);

    devicemapmapper.insertrealtimeposition(realtimeposition);
    redisutil.addzsetvalue(vehicleconstant.vehicle_location, vehicleid, timestamp);
}

/**
 * stores the alert section of the message, if present: inserts the alert and
 * records the timestamp in the alert sorted set.
 *
 * @param root parsed message
 * @throws jsonprocessingexception declared so the caller can contain a json
 *         failure to this step
 */
private void persistalert(jsonnode root) throws jsonprocessingexception {
    if (!root.has(vehicleconstant.alert)) {
        return;
    }
    // read the node with the same key that was just checked
    jsonnode alertnode = root.get(vehicleconstant.alert);
    string vehicleid = alertnode.get("vehicleid").astext();
    integer alerttype = alertnode.get("alerttype").asint();
    long timestamp = alertnode.get("timestamp").aslong();

    alerts alerts = new alerts();
    alerts.setalerttype(alerttype);
    alerts.settimestamp(timestamp);
    alerts.setvehicleid(vehicleid);
    devicemapmapper.insertalerts(alerts);
    redisutil.addzsetvalue(vehicleconstant.vehicle_alert, vehicleid, timestamp);
}
4.发送消息内容
vehicleallvo vehicles = vehicles();
vehicles()方法的返回值就是发送的具体内容
5.Kafka消费者
package com.skyable.device.listener.vehicle;
import com.alibaba.fastjson.json;
import com.skyable.common.config.cloudapplicationcontext;
import com.skyable.common.constants.kafka.kafkatopicconstants;
import com.skyable.device.config.websocket.websocketserver;
import com.skyable.device.entity.vehicle.vehicle;
import com.skyable.device.service.devicemapservice;
import lombok.requiredargsconstructor;
import lombok.extern.slf4j.slf4j;
import org.apache.kafka.clients.consumer.consumerrecord;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.stereotype.component;
import org.springframework.stereotype.service;
import java.util.list;
import java.util.stream.collectors;
/**
 * kafka listener that receives vehicle records in batches and passes the value of
 * each record to {@code devicemapservice.savebatch}.
 *
 * @author yangjun
 * @date: 2023-08-18-14:12
 */
@service
@slf4j
@requiredargsconstructor(onconstructor = @__(@autowired))
public class vehicledatakafkalistener {
    private final devicemapservice devicemapservice;

    /**
     * consumes one polled batch of records.
     *
     * <p>records are handled one after another in list order, so a later record is
     * never handled before or concurrently with an earlier one.
     * NOTE(review): sequential handling is chosen because the downstream service
     * appears to read and update per-vehicle state - confirm before parallelizing.
     *
     * @param recordlist records of the polled batch
     */
    @kafkalistener(topics = kafkatopicconstants.topic_vehicle_record, groupid = "rx_1_thing", containerfactory = "batchfactory")
    public void dealdevicedatatoscript(list<consumerrecord<string, string>> recordlist) {
        for (consumerrecord<string, string> record : recordlist) {
            savevehicledata(record.value());
        }
    }

    /**
     * logs the record value and forwards it to the service.
     *
     * @param jsonvalue value of one kafka record
     */
    private void savevehicledata(string jsonvalue) {
        log.info("kafka data:{}", jsonvalue);
        devicemapservice.savebatch(jsonvalue);
    }
}
package com.skyable.device.listener.vehicle;
import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.clients.producer.producerconfig;
import org.apache.kafka.common.serialization.stringdeserializer;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory;
import org.springframework.kafka.config.kafkalistenercontainerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;
import java.util.hashmap;
import java.util.map;
/**
 * kafka consumer configuration: builds the consumer properties from the
 * {@code spring.kafka.*} settings and exposes a batch listener container factory.
 *
 * @classname kafkaconsumerconfig
 * @author gaoy
 * @date 2021/2/25 15:02
 */
@configuration
public class kafkaconfig {
    @value("${spring.kafka.bootstrap-servers}")
    private string servers;
    @value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableautocommit;
    @value("${spring.kafka.consumer.group-id}")
    private string groupid;
    @value("${spring.kafka.consumer.auto-offset-reset}")
    private string autooffsetreset;
    @value("${spring.kafka.consumer.concurrency}")
    private int concurrency;
    @value("${spring.kafka.consumer.max-poll-records}")
    private int maxpollrecords;

    /**
     * container factory for batch consumption.
     *
     * @return a factory with batch listening enabled and the configured concurrency
     */
    @bean
    kafkalistenercontainerfactory<?> batchfactory() {
        concurrentkafkalistenercontainerfactory<string, string> factory =
                new concurrentkafkalistenercontainerfactory<>();
        factory.setconsumerfactory(new defaultkafkaconsumerfactory<>(consumerconfigs()));
        // enable batch listening
        factory.setbatchlistener(true);
        factory.setconcurrency(concurrency);
        // manual-commit ack mode (currently disabled)
        // factory.getcontainerproperties().setackmode(containerproperties.ackmode.manual_immediate);
        return factory;
    }

    /**
     * consumer properties used by {@link #batchfactory()}.
     *
     * @return a new mutable map of consumer settings
     */
    @bean
    public map<string, object> consumerconfigs() {
        map<string, object> props = new hashmap<>();
        props.put(consumerconfig.group_id_config, groupid);
        props.put(consumerconfig.auto_offset_reset_config, autooffsetreset);
        props.put(consumerconfig.bootstrap_servers_config, servers);
        props.put(consumerconfig.enable_auto_commit_config, enableautocommit);
        // maximum number of records received per poll
        props.put(consumerconfig.max_poll_records_config, maxpollrecords);
        props.put(consumerconfig.auto_commit_interval_ms_config, "100");
        props.put(consumerconfig.session_timeout_ms_config, 120000);
        props.put(consumerconfig.request_timeout_ms_config, 180000);
        props.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);
        props.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class);
        // idempotence is a producer-side setting and is therefore not part of
        // the consumer properties
        return props;
    }
}
发表评论