• spring 集成 kafka producer(KafkaTemplate)


    spring-kafka-provider.xml配置:
    <?xml version="1.0" encoding="UTF-8"?>
    <!--
        Spring bean definitions for a Kafka producer.
        Every ${...} placeholder below is resolved from kafka.properties on the classpath.
    -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
             http://www.springframework.org/schema/context
             http://www.springframework.org/schema/context/spring-context.xsd">
        <context:property-placeholder location="classpath*:kafka.properties" />
        <!--
            Producer configuration map handed to the producer factory.
            "group.id" is intentionally absent: it is a consumer setting, the producer
            does not use it and reports it as an unknown configuration.
        -->
        <bean id="producerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                    <entry key="retries" value="${retries}" />
                    <entry key="batch.size" value="${batch.size}" />
                    <entry key="linger.ms" value="${linger.ms}" />
                    <entry key="buffer.memory" value="${buffer.memory}" />
                    <entry key="acks" value="${acks}" />
                    <entry key="key.serializer"
                           value="org.apache.kafka.common.serialization.StringSerializer" />
                    <entry key="value.serializer"
                           value="org.apache.kafka.common.serialization.StringSerializer" />
                </map>
            </constructor-arg>
        </bean>
    
        <!-- Producer factory used by the KafkaTemplate bean below; built from producerProperties. -->
        <bean id="producerFactory"
              class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <ref bean="producerProperties" />
            </constructor-arg>
        </bean>
    
        <!--
            KafkaTemplate bean: inject it (or look it up by id) and call its send methods.
            autoFlush=true flushes the producer after every send, so records do not wait
            for linger.ms; set it to false if batching throughput matters more than latency.
            defaultTopic is the topic used by the sendDefault methods.
        -->
        <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
            <constructor-arg ref="producerFactory" />
            <constructor-arg name="autoFlush" value="true" />
            <property name="defaultTopic" value="jq-test" />
        </bean>
    </beans>

    其中 ${xxxx} 占位符的值从配置文件 kafka.properties 中引入,用于配置集群连接的相关属性。

    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.util.concurrent.FailureCallback;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.SuccessCallback;
    
    import club.codeapes.common.date.DateUtil;
    import net.sf.json.JSONObject;
    
    /**
     * Static helper that publishes messages to Kafka through the {@code kafkaTemplate}
     * bean defined in {@code spring-kafka-provider.xml}.
     */
    public class KafkaSendMsgUtils {
        /** Application context loaded from the classpath resource {@code /spring-kafka-provider.xml}. */
        public static final  ClassPathXmlApplicationContext CONTEXT = new ClassPathXmlApplicationContext("/spring-kafka-provider.xml");
    
        /**
         * Sends one record asynchronously and registers success/failure callbacks on the
         * returned future. The method returns without waiting for the send result.
         *
         * <p>When {@code topic} equals the template's configured default topic the record is
         * sent with {@code sendDefault}; otherwise it is sent to {@code topic} explicitly.
         *
         * @param topic     target topic
         * @param partition target partition, passed through to the template
         * @param timestamp record timestamp, passed through to the template
         * @param key       record key
         * @param data      record value
         * @param <K>       key type
         * @param <T>       value type
         */
        @SuppressWarnings("unchecked")
        public static <K,T>void sendMessage(String topic, Integer partition, Long timestamp,  K key, T data) {
            // The bean is declared without generic parameters in XML, hence the unchecked cast.
            KafkaTemplate<K, T> kafkaTemplate = (KafkaTemplate<K, T>) CONTEXT.getBean("kafkaTemplate");
            // Captured by the failure callback below so the report names the topic.
            final String targetTopic = topic;

            // Null-safe comparison: a template without a default topic must not cause a
            // NullPointerException here; it simply takes the explicit-topic branch.
            final String defaultTopic = kafkaTemplate.getDefaultTopic();
            final ListenableFuture<SendResult<K, T>> sendFuture;
            if (defaultTopic != null && defaultTopic.equals(topic)) {
                sendFuture = kafkaTemplate.sendDefault(partition, timestamp, key, data);
            } else {
                sendFuture = kafkaTemplate.send(topic, partition, timestamp, key, data);
            }

            // Invoked when the send completes successfully.
            SuccessCallback<SendResult<K, T>> successCallback = new SuccessCallback<SendResult<K, T>>() {
                @Override
                public void onSuccess(SendResult<K, T> result) {
                    System.out.println("成功");
                }
            };
            // Invoked when the send fails.
            FailureCallback failureCallback = new FailureCallback() {
                @Override
                public void onFailure(Throwable ex) {
                    // Report the failure here instead of throwing: Spring's callback registry
                    // ignores exceptions thrown by callbacks, so a throw would never reach
                    // the caller of sendMessage and the failure would go unnoticed.
                    System.err.println("Kafka send failed, topic=" + targetTopic + ": " + ex);
                }
            };
            sendFuture.addCallback(successCallback, failureCallback);
        }
    }

    其中 KafkaTemplate 的 send 方法提供了多个重载版本,支持不同的参数组合,可以根据自己的需要选择合适的方法调用。

    返回值是 :ListenableFuture<SendResult<K, T>> listenableFuture

    可以通过以下代码处理失败或成功后的情况:

            // Callback invoked when the send succeeds.
            SuccessCallback<SendResult<K, T>> successCallback = new SuccessCallback<SendResult<K, T>>() {
                @Override
                public void onSuccess(SendResult<K, T> result) {
                    // Business logic for a successful send goes here.
                    System.out.println("成功");
                }
            };
            // Callback invoked when the send fails.
            FailureCallback failureCallback = new FailureCallback() {
                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("失败");
                    // Business logic for a failed send goes here.
                    // NOTE(review): this throw happens inside the callback, so it presumably
                    // does not propagate to the code that called send - confirm against the
                    // ListenableFuture implementation in use.
                    throw new RuntimeException(ex);
                }
            };
            // Register both callbacks on the future returned by the send call.
            listenableFuture.addCallback(successCallback, failureCallback);
  • 相关阅读:
    Send or receive files via Xshell
    git archive命令详解
    test命令详解
    shell中的数学运算
    深入理解文件权限
    rebuild online时意外中断 再次重建时报错解决方法
    关于临时表空间,在日常生产中会遇到的问题
    Oracle对于敏感数据的处理,可以采用策略(dbms_rls.add_policy)
    我对于B-树索引的内部结构与索引类型所做的笔记
    记一次ADG备库归档目录满导致的延时处理
  • 原文地址:https://www.cnblogs.com/chenmz1995/p/12350421.html
一二三 - 开发者的网上家园