当前位置: 代码网 > it编程>编程语言>Java > kafak消费数据,webSocket实时推送数据到前端

kafak消费数据,webSocket实时推送数据到前端

2024年08月01日 Java 我要评论
【代码】kafak消费数据,webSocket实时推送数据到前端。

1.导入websocket依赖

 <!--websocket依赖包-->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-websocket</artifactid>
        </dependency>

2.编写websocket类

package com.skyable.device.config.websocket;


import lombok.extern.slf4j.slf4j;
import org.springframework.stereotype.component;

import javax.websocket.onclose;
import javax.websocket.onerror;
import javax.websocket.onopen;
import javax.websocket.session;
import javax.websocket.server.serverendpoint;
import java.io.ioexception;
import java.util.hashset;
import java.util.set;

/**
 * @author administrator
 */
@serverendpoint("/vehicle/{domainid}")
@component
@slf4j
public class websocketserver {
    /**
     * concurrent包的线程安全set,用来存放每个客户端对应的mywebsocket对象。
     */
    private static final set<session> sessions = new hashset<>();


    /**
     * 连接关闭调用的方法
     */
    @onclose
    public void onclose() {
        log.info("websocket link close");
    }

    /**
     * @param error
     */
    @onerror
    public void onerror(throwable error) {
        error.printstacktrace();
    }

    /**
     * 接收数据
     *
     * @param data
     */
    public static void senddatatoclients(string data) {
        for (session session : sessions) {
            try {
                session.getbasicremote().sendtext(data);
            } catch (ioexception e) {
                e.printstacktrace();
            }
        }
    }

    @onopen
    public void onopen(session session) {
        /**
         * 与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        /**
         * 接收domainid
         */
        sessions.add(session);
        senddatatoclients();
    }


    public void senddatatoclients() {
        for (session session : sessions) {
            try {
                session.getbasicremote().sendtext("websocket link succeed");
            } catch (ioexception e) {
                e.printstacktrace();
            }
        }
    }
}
package com.skyable.device.config.websocket;

import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.web.socket.config.annotation.enablewebsocket;
import org.springframework.web.socket.server.standard.serverendpointexporter;

/**
 * @author administrator
 */
@enablewebsocket
@configuration
public class websocketconfig {
    @bean
    public serverendpointexporter serverendpointexporter() {
        return new serverendpointexporter();
    }
}

3.kafak消费数据后调用websocket方法

  /**
     * 获取kafka数据
     *
     * @param
     */
    @override
    public void savebatch(string jsonvalue) {
        objectmapper objectmapper = new objectmapper();
        try {
            //位置
            jsonnode jsonnode = objectmapper.readtree(jsonvalue);
            if (jsonnode.has(vehicleconstant.location)) {
                realtimeposition realtimeposition = new realtimeposition();
                jsonnode locationnode = jsonnode.get("location");
                string vehicleid = locationnode.get("vehicleid").astext();
                double longitude = double.parsedouble(locationnode.get("longitude").astext());
                double latitude = double.parsedouble(locationnode.get("latitude").astext());
                long timestamp = locationnode.get("timestamp").aslong();
                realtimeposition.settimestamp(timestamp);
                realtimeposition.setlatitude(string.valueof(latitude));
                realtimeposition.setlongitude(string.valueof(longitude));
                realtimeposition.setvehicleid(vehicleid);
                vehiclelocationvo locationvo = devicemapmapper.selectlonlat(vehicleid);
                if (!objects.isnull(locationvo)) {
                    //计算距离
                    redisutil.addlocation(vehicleid, double.parsedouble(locationvo.getlongitude()), double.parsedouble(locationvo.getlatitude()), "l1");
                    redisutil.addlocation(vehicleid, longitude, latitude, "l2");
                    double result = redisutil.calculatedistance(vehicleid, "l1", "l2");
                    double meters = redisutil.convertmilestokilometers(result);
                    decimalformat decimalformat = new decimalformat("#.###");
                    string distance = decimalformat.format(meters);
                    realtimeposition.setdistance(double.parsedouble(distance));
                } else {
                    realtimeposition.setdistance(0);
                }
                //获取省份
                map<string, object> position = addressutil.getposition(longitude, latitude, null, null, null);
                map data = (map) position.get("data");
                string provincename = data.get("shortname").tostring().replaceall("\"", "");
                realtimeposition.setarea(provincename);
                devicemapmapper.insertrealtimeposition(realtimeposition);
                redisutil.addzsetvalue(vehicleconstant.vehicle_location, string.valueof(vehicleid), timestamp);
            }
        } catch (jsonprocessingexception e) {
            e.printstacktrace();
        }
        try {
            //报警
            jsonnode jsonnode = objectmapper.readtree(jsonvalue);
            if (jsonnode.has(vehicleconstant.alert)) {
                jsonnode alertnode = jsonnode.get("alert");
                string vehicleid = alertnode.get("vehicleid").astext();
                integer alerttype = alertnode.get("alerttype").asint();
                long timestamp = alertnode.get("timestamp").aslong();
                alerts alerts = new alerts();
                alerts.setalerttype(alerttype);
                alerts.settimestamp(timestamp);
                alerts.setvehicleid(vehicleid);
                devicemapmapper.insertalerts(alerts);
                redisutil.addzsetvalue(vehicleconstant.vehicle_alert, string.valueof(vehicleid), timestamp);
            }
        } catch (jsonprocessingexception e) {
            e.printstacktrace();
        }
        //websocket发送消息
        vehicleallvo vehicles = vehicles();
        websocketserver.senddatatoclients(vehicles.tostring());
    }

4.发送消息内容

vehicleallvo vehicles = vehicles();
该方法就是发送的具体内容

5.kafak消费者

package com.skyable.device.listener.vehicle;

import com.alibaba.fastjson.json;
import com.skyable.common.config.cloudapplicationcontext;
import com.skyable.common.constants.kafka.kafkatopicconstants;
import com.skyable.device.config.websocket.websocketserver;
import com.skyable.device.entity.vehicle.vehicle;
import com.skyable.device.service.devicemapservice;
import lombok.requiredargsconstructor;
import lombok.extern.slf4j.slf4j;
import org.apache.kafka.clients.consumer.consumerrecord;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.stereotype.component;
import org.springframework.stereotype.service;

import java.util.list;
import java.util.stream.collectors;

/**
 * description:
 *
 * @author yangjun
 * @date: 2023-08-18-14:12
 */
@service
@component
@slf4j
@requiredargsconstructor(onconstructor = @__(@autowired))
public class vehicledatakafkalistener {
    private final devicemapservice devicemapservice;

    @kafkalistener(topics = kafkatopicconstants.topic_vehicle_record, groupid = "rx_1_thing", containerfactory = "batchfactory")
    public void dealdevicedatatoscript(list<consumerrecord<string, string>> recordlist) {
        recordlist.parallelstream()
                .map(consumerrecord::value)
                .foreach(this::savevehicledata);
    }

    private void savevehicledata(string jsonvalue) {
        log.info("kafka data:" + jsonvalue);
        devicemapservice.savebatch(jsonvalue);
    }
}
package com.skyable.device.listener.vehicle;

import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.clients.producer.producerconfig;
import org.apache.kafka.common.serialization.stringdeserializer;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory;
import org.springframework.kafka.config.kafkalistenercontainerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;

import java.util.hashmap;
import java.util.map;

/**
 * @classname kafkaconsumerconfig
 * @description kafka消费者配置
 * @author gaoy
 * @date 2021/2/25 15:02
 */
@configuration
public class kafkaconfig {

    @value("${spring.kafka.bootstrap-servers}")
    private string servers;

    @value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableautocommit;

    @value("${spring.kafka.consumer.group-id}")
    private string groupid;

    @value("${spring.kafka.consumer.auto-offset-reset}")
    private string autooffsetreset;

    @value("${spring.kafka.consumer.concurrency}")
    private int concurrency;

    @value("${spring.kafka.consumer.max-poll-records}")
    private int maxpollrecords;


    /**
     * 批量消费工厂bean
     * @return
     */
    @bean
    kafkalistenercontainerfactory batchfactory() {
        concurrentkafkalistenercontainerfactory factory = new
                concurrentkafkalistenercontainerfactory<>();
        factory.setconsumerfactory(new defaultkafkaconsumerfactory<>(consumerconfigs()));
        // 开启批量监听
        factory.setbatchlistener(true);
        factory.setconcurrency(concurrency);
        // 设置手动提交ackmode
        // factory.getcontainerproperties().setackmode(containerproperties.ackmode.manual_immediate);
        return factory;
    }

    @bean
    public map consumerconfigs() {
        map<string,object> props = new hashmap<>();
        props.put(consumerconfig.group_id_config, groupid);
        props.put(consumerconfig.auto_offset_reset_config, autooffsetreset);
        props.put(consumerconfig.bootstrap_servers_config, servers);
        props.put(consumerconfig.enable_auto_commit_config, enableautocommit);
        //设置每次接收message的数量
        props.put(consumerconfig.max_poll_records_config, maxpollrecords);
        props.put(consumerconfig.auto_commit_interval_ms_config, "100");
        props.put(consumerconfig.session_timeout_ms_config, 120000);
        props.put(consumerconfig.request_timeout_ms_config, 180000);
        props.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);
        props.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class);
        //开启幂等性。
        props.put(producerconfig.enable_idempotence_config,true);
        return props;
    }

}

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com