多topic注入kafka消费者

@KafkaListener() 两种方式加载多个topic

# application.properties
spring.kafkaListenerList = topic1,topic2

第一种

通过EL表达式进行注入

@KafkaListener(topics = "#{'${spring.kafkaListenerList}'.split(',')}")

第二种

通过解析数据生成对象进行注入

KafkaListenerReceiver是自定义的消费对象

  • KafkaListenerConfig.java
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;


@Configuration
@Slf4j
public class KafkaListenerConfig implements BeanDefinitionRegistryPostProcessor, PriorityOrdered {

    private List<String> listenerList;

    KafkaListenerConfig() throws IOException {
        Properties pro = new Properties();
        File f;
        f =new File( java.net.URLDecoder.decode(this.getClass().getResource("/application.properties").getPath(),"utf-8"));
        FileInputStream in = new FileInputStream(f);
        pro.load(in);
        in.close();
        String listString = pro.getProperty("spring.verify.kafkaListenerList");
        if (StringUtils.isEmpty(listString)){
            log.error("spring.verify.kafkaListenerList is empty or null");
            System.exit(0);
        }
        log.info("spring.kafkaListenerList: "+listString);
        listenerList = Arrays.asList(listString.split(","));
    }

    public List<String> getListenerList() {
        return listenerList;
    }

    public void setListenerList(List<String> listenerList) {
        this.listenerList = listenerList;
    }

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
        for (String listener :  listenerList) {
            BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(KafkaListenerReceiver.class);
            builder.addConstructorArgValue(listener);
            beanDefinitionRegistry.registerBeanDefinition(listener, builder.getBeanDefinition());
        }
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {

    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
  • KafkaListenerReceiver.java
@KafkaListener(topics = "#{__listener.topic}")
Copyright © cnkj.site 2021 all right reserved,powered by Gitbook该文件修订时间: 2021-08-11 17:10:41

results matching ""

    No results matching ""