当前位置: 首页 > news >正文

昆山网站开发ikelv点击排名软件哪个好

昆山网站开发ikelv,点击排名软件哪个好,做网站公司赚钱,dw怎么做音乐网站springboot集成kafka消费数据 文章目录 springboot集成kafka消费数据1.引入pom依赖2.添加配置文件2.1.添加KafkaConsumerConfig.java2.2.添加KafkaIotCustomProperties.java2.3.添加application.yml配置 3.消费者代码 1.引入pom依赖 <dependency><groupId>org.spri…

springboot集成kafka消费数据

文章目录

  • springboot集成kafka消费数据
  • 1.引入pom依赖
  • 2.添加配置文件
    • 2.1.添加KafkaConsumerConfig.java
    • 2.2.添加KafkaIotCustomProperties.java
    • 2.3.添加application.yml配置
  • 3.消费者代码

1.引入pom依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.2</version></dependency>

2.添加配置文件

2.1.添加KafkaConsumerConfig.java

@Configuration
@EnableConfigurationProperties(KafkaIotCustomProperties.class)
@Slf4j
public class KafkaConsumerConfig {@AutowiredKafkaIotCustomProperties kafkaIotCustomProperties;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 并发数 多个微服务实例会均分factory.setConcurrency(3);factory.setBatchListener(true);ContainerProperties containerProperties = factory.getContainerProperties();// 是否设置手动提交containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory<String, String> consumerFactory() {Map<String, Object> consumerConfigs = consumerConfigs();log.info("消费者的配置信息:{}",JSONObject.toJSONString(consumerConfigs));return new DefaultKafkaConsumerFactory<>(consumerConfigs);}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();// 服务器地址propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIotCustomProperties.getBootstrapServers());// 是否自动提交propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaIotCustomProperties.isEnableAutoCommit());// 自动提交间隔propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getAutoCommitInterval());//会话时间propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaIotCustomProperties.getSessionTimeOut());//key序列化propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getKeyDeserializer());//value序列化propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getValueDeserializer());// 心跳时间propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getHeartbeatInterval());// 分组idpropsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaIotCustomProperties.getGroupId());//消费策略propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaIotCustomProperties.getAutoOffsetReset());// poll记录数propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaIotCustomProperties.getMaxPollRecords());//poll时间propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getMaxPollInterval());return propsMap;}}

2.2.添加KafkaIotCustomProperties.java

@Component
@ConfigurationProperties(prefix = "fxyh.realdata.kafka")
@Data
public class KafkaIotCustomProperties {private List<String> topics;private String groupId;private String sessionTimeOut;private String bootstrapServers;private String autoOffsetReset;private boolean enableAutoCommit;private String autoCommitInterval;private String fetchMinSize;private String fetchMaxWait;private String maxPollRecords;private String maxPollInterval;private String heartbeatInterval;private String keyDeserializer;private String valueDeserializer;
}

2.3.添加application.yml配置

fxyh:realdata:kafka:bootstrapServers:  192.168.80.251:9092topics: ["test1","test2"]groupId: shengtingrealdatagroup#后台的心跳线程必须在30秒之内提交心跳,否则会reBalancesessionTimeOut: 30000#      autoOffsetReset: earliest#取消自动提交,即便如此 spring会帮助我们自动提交enableAutoCommit: false#自动提交间隔autoCommitInterval: 1000#拉取的最小字节fetchMinSize: 1#拉去最小字节的最大等待时间fetchMaxWait: 500maxPollRecords: 50#300秒的提交间隔,如果程序大于300秒提交,会报错maxPollInterval: 300000#心跳间隔heartbeatInterval: 10000keyDeserializer: org.apache.kafka.common.serialization.StringDeserializervalueDeserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: latest

3.消费者代码


@Slf4j
@Component
public class DeviceDataConsumer {@Autowiredprivate KafkaIotCustomProperties kafkaIotCustomProperties;@KafkaListener(topics = {"#{@kafkaIotCustomProperties.topics}"}, groupId = "#{@kafkaIotCustomProperties.groupId}", containerFactory = "kafkaListenerContainerFactory",properties = {"#{@kafkaIotCustomProperties.autoOffsetReset}"})public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {for (ConsumerRecord<String, String> record : records) {log.info("topic_test 消费了: Topic:" + record.topic() + ",groupId:" + kafkaIotCustomProperties.getGroupId() + ",Message:" + record.value());//手动提交偏移量ack.acknowledge();}}
}
http://www.wooajung.com/news/28082.html

相关文章:

  • 珠海网站建设 科速安卓手机优化
  • 备案网址查询网站优化服务
  • 天津网站开发建设电子商务seo
  • 高端网站建设 引擎技网站策划是做什么的
  • 手把手制作公司网站百度竞价推广代理商
  • 微网站后台录入百度引流免费推广怎么做
  • 机票网站手机版建设专业seo站长工具全面查询网站
  • 做网站一定要虚拟主机吗上海网站搜索排名优化哪家好
  • 广东网站建设微信商城开发扬州seo优化
  • 网站开发报价表格式模板云搜索
  • 设计说明怎么写200字网站首页seo关键词布局
  • 宁波建设工程报名网站网站关键词有哪些
  • 做高端企业网站中国免费网站服务器下载
  • 宝安做网站的网络营销策略的定义
  • 水果网站源码中国公关公司前十名
  • 有网站前台网络销售技巧和话术
  • 网站头部设计代码就业培训机构有哪些
  • 台州市建设规划局路桥分局网站南宁seo排名外包
  • 做软装找产品上哪个网站关键词快速排名不限行业
  • 做网站l价格拉新项目官方一手平台
  • 门户网站建设注意事项天津百度关键词推广公司
  • 动态网站开发案例教程互联网优化是什么意思
  • 怎么做企业网站推广的方法旺道seo推广系统怎么收费
  • 汕头网站制作十堰seo优化方法
  • 曰本免费一级a做爰视频网站武汉seo诊断
  • wordpress仿站教程2016seo信息网
  • 电子商城网站的设计与实现磁力宅在线搜种子
  • 华升建设集团公司网站河南郑州最新事件
  • 微网站建设 上海百度地图在线使用
  • 昆明网络推广昆明网站建设昆明昆明推广普通话手抄报