当前位置: 代码网 > it编程>编程语言>Java > 普通java项目集成kafka方式

普通java项目集成kafka方式

2024年11月29日 Java 我要评论
现在假设一种需求:我方业务系统要与某服务平台通过 Kafka 交互,异步获取服务;而系统架构可能比较老旧,既不是 Spring Cloud 全家桶,也不是 Spring Boot,只是普通的 Java 项目或者 Java Web 项目。

现在假设一种需求:我方业务系统要与某服务平台通过 Kafka 交互,异步获取服务;而系统架构可能比较老旧,既不是 Spring Cloud 全家桶,也不是 Spring Boot,只是普通的 Java 项目或者 Java Web 项目。

依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>

kafka配置读取类

本文后边没用到,直接填配置了,简单点

但如果生产需要,还是有这个类比较好,可以从不同配置中读取,同时给个默认值

import org.apache.commons.lang3.stringutils;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import java.io.file;
import java.io.fileinputstream;
import java.io.ioexception;
import java.util.properties;

/**
 * kafka配置读取类
 *
 * @author zzy
 */
/**
 * Reads Kafka settings from optional property files and hands them out with
 * caller-supplied defaults.
 *
 * <p>When the singleton is first created it looks in the {@code kafkaconf}
 * directory under the {@code user.dir} system property for
 * {@code producer.properties}, {@code consumer.properties},
 * {@code server.properties} and {@code client.properties}. A file that does not
 * exist is skipped; a file that cannot be read is logged and skipped so the
 * remaining files are still loaded.
 *
 * @author zzy
 */
public class kafkaproperties {

    private static final logger log = loggerfactory.getlogger(kafkaproperties.class);

    private static final properties serverprops = new properties();

    private static final properties clientprops = new properties();

    private static final properties producerprops = new properties();

    private static final properties consumerprops = new properties();

    private static kafkaproperties instance = null;

    /** Loads every property file that is present in the configuration directory. */
    private kafkaproperties() {
        string filepath = system.getproperty("user.dir") + file.separator
                + "kafkaconf" + file.separator;

        loadifpresent(producerprops, filepath + "producer.properties");
        loadifpresent(consumerprops, filepath + "consumer.properties");
        loadifpresent(serverprops, filepath + "server.properties");
        loadifpresent(clientprops, filepath + "client.properties");
    }

    /**
     * Loads one property file into {@code target} when the file exists.
     *
     * @param target the property set that receives the entries
     * @param path   full path of the property file
     */
    private static void loadifpresent(properties target, string path) {
        if (!new file(path).exists()) {
            return;
        }
        // One stream per file, closed by try-with-resources. Previously a single
        // variable was reused for all four files and only the last stream was closed.
        try (fileinputstream fis = new fileinputstream(path)) {
            target.load(fis);
        } catch (exception e) {
            // Broad catch kept on purpose: loading is best-effort and load() can also
            // fail with an unchecked exception on malformed escapes. The exception is
            // passed as the last argument so the stack trace is not lost.
            log.error("init kafka props error, file: " + path, e);
        }
    }

    /**
     * Returns the lazily created singleton.
     *
     * @return the shared instance; created (and the files loaded) on the first call
     */
    public static synchronized kafkaproperties getinstance() {
        if (instance == null) {
            instance = new kafkaproperties();
        }

        return instance;
    }


    /**
     * Looks up a setting, falling back to the supplied default.
     *
     * @param key          property name; a null or empty key yields {@code defaultvalue}
     * @param defaultvalue value returned when no loaded file defines {@code key}
     * @return the configured value, or {@code defaultvalue} when none is found
     */
    public string getvalue(string key, string defaultvalue) {
        if (stringutils.isempty(key)) {
            log.error("key is null or empty");
            // Looking up a null key would throw a NullPointerException, so stop here.
            return defaultvalue;
        }

        string value = getpropsvalue(key);
        if (value == null) {
            log.warn("kafka property getvalue return null, the key is {}", key);
            value = defaultvalue;
        }
        log.info("kafka property getvalue, key:{}, value:{}", key, value);

        return value;
    }


    /**
     * Searches the loaded property sets in the order server, producer, consumer,
     * client and returns the first match.
     *
     * @param key property name, never null or empty here
     * @return the first value found, or {@code null} when no set defines the key
     */
    private string getpropsvalue(string key) {
        string value = serverprops.getproperty(key);

        if (value == null) {
            value = producerprops.getproperty(key);
        }

        if (value == null) {
            value = consumerprops.getproperty(key);
        }

        if (value == null) {
            value = clientprops.getproperty(key);
        }

        return value;
    }
}

producer

import org.apache.kafka.clients.producer.callback;
import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.clients.producer.recordmetadata;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import java.util.properties;
import java.util.concurrent.executionexception;

/**
 * kafka producer
 * @author zzy
 */
/**
 * Shared Kafka producer for string messages.
 *
 * <p>A single instance is created on the first call to
 * {@link #getinstance(string)}; the {@code servers} argument of any later call is
 * ignored because the existing instance is returned unchanged.
 *
 * @author zzy
 */
public class kafkaproformal {

    public static final logger log = loggerfactory.getlogger(kafkaproformal.class);

    // Producer configuration keys. Security-related settings (security.protocol,
    // sasl.kerberos.service.name, ...) are not configured by this class.
    private static final string bootstrapservers = "bootstrap.servers";
    private static final string keyserializer = "key.serializer";
    private static final string valueserializer = "value.serializer";

    private final properties properties = new properties();

    private final kafkaproducer<string, string> producer;

    private volatile static kafkaproformal kafkaproformal;

    /**
     * Builds the underlying producer with string key and value serializers.
     *
     * @param servers value for {@code bootstrap.servers}
     */
    private kafkaproformal(string servers) {
        properties.put(bootstrapservers, servers);
        properties.put(keyserializer, "org.apache.kafka.common.serialization.stringserializer");
        properties.put(valueserializer, "org.apache.kafka.common.serialization.stringserializer");

        producer = new kafkaproducer<string, string>(properties);
    }

    /**
     * Returns the shared producer wrapper, creating it on first use.
     *
     * @param servers value for {@code bootstrap.servers}; only used by the call
     *                that actually creates the instance
     * @return the shared instance
     */
    public static kafkaproformal getinstance(string servers) {
        if (kafkaproformal == null) {
            synchronized (kafkaproformal.class) {
                if (kafkaproformal == null) {
                    kafkaproformal = new kafkaproformal(servers);
                }
            }
        }

        return kafkaproformal;
    }

    /**
     * Sends one message without a key and logs the outcome through
     * {@code kafkacallback}.
     *
     * @param topic     destination topic
     * @param message   record value
     * @param asyncflag {@code true} to return right after handing the record to the
     *                  producer; {@code false} to block until the send has completed
     */
    public void sendstringwithcallback(string topic, string message, boolean asyncflag) {
        producerrecord<string, string> record = new producerrecord<string, string>(topic, message);
        long starttime = system.currenttimemillis();
        kafkacallback callback = new kafkacallback(starttime, message);

        if (asyncflag) {
            // Asynchronous send: the result is reported only through the callback.
            producer.send(record, callback);
            return;
        }

        // Synchronous send: wait for the returned future to complete.
        try {
            producer.send(record, callback).get();
        } catch (interruptedexception e) {
            // Restore the interrupt flag so callers further up can react to it.
            Thread.currentThread().interrupt();
            log.error("interruptedexception occurred", e);
        } catch (executionexception e) {
            // The logger prints the stack trace of a trailing throwable argument;
            // the former "{0}" was not a valid placeholder and appeared literally.
            log.error("executionexception occurred", e);
        }
    }
}

/**
 * Send callback that logs the outcome and the elapsed time of one record.
 */
class kafkacallback implements callback {
    private static final logger log = loggerfactory.getlogger(kafkacallback.class);

    private final long starttime;

    private final string message;

    /**
     * @param starttime wall-clock time in milliseconds at which the send was started
     * @param message   record value, used for logging only
     */
    kafkacallback(long starttime, string message) {
        this.starttime = starttime;
        this.message = message;
    }

    /**
     * Logs success at info level and failure at error level.
     *
     * @param metadata  partition/offset information for the record
     * @param exception the send failure, or {@code null} when the send succeeded
     */
    @override
    public void oncompletion(recordmetadata metadata, exception exception) {
        long elapsedtime = system.currenttimemillis() - starttime;

        // Decide on the exception, not on the metadata: recent client versions pass
        // non-null placeholder metadata even for a failed send, so testing only
        // metadata != null reported failures as successes.
        if (exception == null && metadata != null) {
            log.info("record(" + message + ") sent to partition(" + metadata.partition()
                    + "), offset(" + metadata.offset() + ") in " + elapsedtime + " ms.");
        } else {
            log.error("send failed after " + elapsedtime + " ms. record(" + message + ")", exception);
        }
    }
}

consumer

import kafka.utils.shutdownablethread;
import org.apache.kafka.clients.consumer.consumerrecords;
import org.apache.kafka.clients.consumer.kafkaconsumer;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import java.time.duration;
import java.util.properties;
import java.util.set;

/**
 * kafka consumer
 * @author zzy
 */
/**
 * Base class for a polling Kafka consumer of string records.
 *
 * <p>Subclasses implement {@link #dealrecords(consumerrecords)} to process each
 * polled batch. NOTE(review): {@link #dowork()} is presumably called repeatedly by
 * the parent thread class until shutdown - confirm against the kafka.utils
 * version in use.
 *
 * @author zzy
 */
public abstract class kafkaconformal extends shutdownablethread {

    private static final logger log = loggerfactory.getlogger(kafkaconformal.class);

    // Consumer configuration keys.
    private static final string bootstrapservers = "bootstrap.servers";
    private static final string groupid = "group.id";
    private static final string keydeserializer = "key.deserializer";
    private static final string valuedeserializer = "value.deserializer";
    private static final string enableautocommit = "enable.auto.commit";
    private static final string autocommitintervalms = "auto.commit.interval.ms";
    private static final string sessiontimeoutms = "session.timeout.ms";

    // Must be initialised here: the constructor adds to this set, and the field
    // was previously left null, so construction always failed with a
    // NullPointerException.
    private set<string> topics = new java.util.HashSet<>();

    private final kafkaconsumer<string, string> consumer;

    /**
     * Creates a consumer for a single topic with auto-commit enabled.
     *
     * @param topic topic to subscribe to; more can be supplied via
     *              {@link #settopics(set)}
     */
    public kafkaconformal(string topic) {
        super("kafkaconsumerexample", false);

        topics.add(topic);

        properties props = new properties();
        props.put(bootstrapservers, "your servers");
        props.put(groupid, "testgroup");
        props.put(enableautocommit, "true");
        props.put(autocommitintervalms, "1000");
        props.put(sessiontimeoutms, "30000");
        props.put(keydeserializer, "org.apache.kafka.common.serialization.stringdeserializer");
        props.put(valuedeserializer, "org.apache.kafka.common.serialization.stringdeserializer");

        consumer = new kafkaconsumer<>(props);
    }

    /**
     * Subscribes to the current topic set, polls for up to one second and passes
     * the batch to {@link #dealrecords(consumerrecords)}.
     */
    @override
    public void dowork() {
        // Subscribing on every pass picks up a topic set replaced via settopics.
        consumer.subscribe(topics);
        consumerrecords<string, string> records = consumer.poll(duration.ofseconds(1));

        dealrecords(records);
    }

    /**
     * Handles one polled batch; implemented when the consumer is instantiated.
     *
     * @param records the records returned by the latest poll
     */
    public abstract void dealrecords(consumerrecords<string, string> records);

    /**
     * Replaces the set of subscribed topics.
     *
     * @param topics the new topic set, used by the next {@link #dowork()} pass
     */
    public void settopics(set<string> topics) {
        this.topics = topics;
    }
}

使用

// bootstrap.servers must be a plain comma-separated host:port list; the earlier
// value had the words "kafka server" fused into the first host name.
kafkaproformal producer = kafkaproformal.getinstance("1.1.1.1:9092,2.2.2.2:9092");

// Forward every record consumed from consume_topic to target_topic (asynchronous send).
kafkaconformal consumer = new kafkaconformal("consume_topic") {
    @override
    public void dealrecords(consumerrecords<string, string> records) {
        for (consumerrecord<string, string> record : records) {
            producer.sendstringwithcallback("target_topic", record.value(), true);
        }
    }
};

consumer.start();

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com