SpringBoot整合Kafka和Storm彩民之家论坛9066777

2019-10-11 05:44 来源:未知
@Override protected AbstractFetcher<T, ?> createFetcher( SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {...... return new KafkaFetcher<>( sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), runtimeContext.getUserCodeClassLoader(), runtimeContext.getTaskNameWithSubtasks(), deserializer, properties, pollTimeout, runtimeContext.getMetricGroup(), consumerMetricGroup, useMetrics); }

前言

本篇文章主要介绍的是SpringBoot整合kafka和storm以及在这过程遇到的一些问题和解决方案。

createFetcher方法

缓存主动更新

kafka 生产数据:

 {"serviceId":"productInfoService","productId":10}

从数据库中拉取数据为(注意modifiedTime时间:2017-10-3 12:30:00):

String productInfoJSON = "{"id": 10, "name": "iphone7手机", "price": 5599, "pictureList":"a.jpg,b.jpg", "specification": "iphone7的规格", "service": "iphone7的售后服务", "color": "红色,白色,黑色", "size": "5.5", "shopId": 1, "modifiedTime": "2017-10-3 12:30:00"}";

开发准备

在进行代码开发前,我们要明确开发什么。
在上述的业务场景中,需要大量的数据,但是我们这里只是简单的进行开发,也就是写个简单的demo出来,能够简单的实现这些功能,所以我们只需满足如下条件就可以了:

  1. 提供一个将用户数据写入kafka的接口;
  2. 使用storm的spout获取kafka的数据并发送给bolt;
  3. 在bolt移除年龄小于10岁的用户的数据,并写入mysql;

那么根据上述要求我们进行SpringBoot、kafka和storm的整合。
首先需要相应jar包,所以maven的依赖如下:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <springboot.version>1.5.9.RELEASE</springboot.version>
        <mybatis-spring-boot>1.2.0</mybatis-spring-boot>
        <mysql-connector>5.1.44</mysql-connector>
        <slf4j.version>1.7.25</slf4j.version>
        <logback.version>1.2.3</logback.version>
        <kafka.version>1.0.0</kafka.version>
        <storm.version>1.2.1</storm.version>
        <fastjson.version>1.2.41</fastjson.version>
        <druid>1.1.8</druid>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${springboot.version}</version>
        </dependency>

        <!-- Spring Boot Mybatis 依赖 -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis-spring-boot}</version>
        </dependency>

        <!-- MySQL 连接驱动依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql-connector}</version>
        </dependency>


        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>


        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>


        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>


        <!--storm相关jar -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!--排除相关依赖 -->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-1.2-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-web</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>ring-cors</artifactId>
                    <groupId>ring-cors</groupId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
        </dependency>


        <!--fastjson 相关jar -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- Druid 数据连接池依赖 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid}</version>
        </dependency>
    </dependencies>

成功添加了相关依赖之后,这里我们再来添加相应的配置。
application.properties中添加如下配置:

    # log
    logging.config=classpath:logback.xml

    ## mysql
    spring.datasource.url=jdbc:mysql://localhost:3306/springBoot2?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true
    spring.datasource.username=root
    spring.datasource.password=123456
    spring.datasource.driverClassName=com.mysql.jdbc.Driver


    ## kafka 
    kafka.servers = 192.169.0.23:9092,192.169.0.24:9092,192.169.0.25:9092  
    kafka.topicName = USER_TOPIC
    kafka.autoCommit = false
    kafka.maxPollRecords = 100
    kafka.groupId = groupA
    kafka.commitRule = earliest

注:上述的配置只是一部分,完整的配置可以在我的github中找到。

数据库脚本:

-- springBoot2库的脚本

CREATE TABLE `t_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` varchar(10) DEFAULT NULL COMMENT '姓名',
  `age` int(2) DEFAULT NULL COMMENT '年龄',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8

注:因为这里我们只是简单的模拟一下业务场景,所以只是建立一张简单的表。

既然consumerThread.start()开始了实际的kafka consumer,我们一起来看一下consumerThread中的方法

利用kafka-console-producer.sh 生产一条商品变更消息,并回车(由于代码中更新时为了演示效果,休眠了10s,把握时间)

cd /usr/local/kafka && bin/kafka-console-producer.sh --broker-list my-cache1:9092,my-cache2:9092,my-cache3:9092 --topic cache-message

商品消息如下:

{"serviceId":"productInfoService","productId":10}

代码编写

说明:这里我只对几个关键的类进行说明,完整的项目工程链接可以在博客底部找到。

在使用SpringBoot整合kafka和storm之前,我们可以先对kfaka和storm的相关代码编写,然后在进行整合。

首先是数据源的获取,也就是使用storm中的spout从kafka中拉取数据。

在之前的storm入门中,讲过storm的运行流程,其中spout是storm获取数据的一个组件,其中我们主要实现nextTuple方法,编写从kafka中获取数据的代码就可以在storm启动后进行数据的获取。

spout类的主要代码如下:

@Override
public void nextTuple() {
    for (;;) {
        try {
            msgList = consumer.poll(100);
            if (null != msgList && !msgList.isEmpty()) {
                String msg = "";
                List<User> list=new ArrayList<User>();
                for (ConsumerRecord<String, String> record : msgList) {
                    // 原始数据
                    msg = record.value();
                    if (null == msg || "".equals(msg.trim())) {
                        continue;
                    }
                    try{
                        list.add(JSON.parseObject(msg, User.class));
                    }catch(Exception e){
                        logger.error("数据格式不符!数据:{}",msg);
                        continue;
                    }
                 } 
                logger.info("Spout发射的数据:" list);
                //发送到bolt中
                this.collector.emit(new Values(JSON.toJSONString(list)));
                 consumer.commitAsync();
            }else{
                TimeUnit.SECONDS.sleep(3);
                logger.info("未拉取到数据...");
            }
        } catch (Exception e) {
            logger.error("消息队列处理异常!", e);
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e1) {
                logger.error("暂停失败!",e1);
            }
        }
    }
}

注:如果spout在发送数据的时候发送失败,是会重发的!

上述spout类中主要是将从kafka获取的数据传输传输到bolt中,然后再由bolt类处理该数据,处理成功之后,写入数据库,然后给与sqout响应,避免重发。

bolt类主要处理业务逻辑的方法是execute,我们主要实现的方法也是写在这里。需要注意的是这里只用了一个bolt,因此也不用定义Field进行再次的转发。
代码的实现类如下:

@Override
    public void execute(Tuple tuple) {
        String msg=tuple.getStringByField(Constants.FIELD);
        try{
            List<User> listUser =JSON.parseArray(msg,User.class);
            //移除age小于10的数据
            if(listUser!=null&&listUser.size()>0){
                Iterator<User> iterator = listUser.iterator();
                 while (iterator.hasNext()) {
                     User user = iterator.next();
                     if (user.getAge()<10) {
                         logger.warn("Bolt移除的数据:{}",user);
                         iterator.remove();
                     }
                 }
                if(listUser!=null&&listUser.size()>0){
                    userService.insertBatch(listUser);
                }
            }
        }catch(Exception e){
            logger.error("Bolt的数据处理失败!数据:{}",msg,e);
        }
    }

编写完了spout和bolt之后,我们再来编写storm的主类。

storm的主类主要是对Topology(拓步)进行提交,提交Topology的时候,需要对spout和bolt进行相应的设置。Topology的运行的模式有两种:

  1. 一种是本地模式,利用本地storm的jar模拟环境进行运行。
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TopologyApp", conf,builder.createTopology());
  1. 另一种是远程模式,也就是在storm集群进行运行。
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

这里为了方便,两种方法都编写了,通过主方法的args参数来进行控制。
Topology相关的配置说明在代码中的注释写的很详细了,这里我就不再多说了。
代码如下:

  public  void runStorm(String[] args) {
    // 定义一个拓扑
    TopologyBuilder builder = new TopologyBuilder();
    // 设置1个Executeor(线程),默认一个
    builder.setSpout(Constants.KAFKA_SPOUT, new KafkaInsertDataSpout(), 1);
    // shuffleGrouping:表示是随机分组
    // 设置1个Executeor(线程),和两个task
    builder.setBolt(Constants.INSERT_BOLT, new InsertBolt(), 1).setNumTasks(1).shuffleGrouping(Constants.KAFKA_SPOUT);
    Config conf = new Config();
    //设置一个应答者
    conf.setNumAckers(1);
    //设置一个work
    conf.setNumWorkers(1);
    try {
        // 有参数时,表示向集群提交作业,并把第一个参数当做topology名称
        // 没有参数时,本地提交
        if (args != null && args.length > 0) { 
            logger.info("运行远程模式");
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // 启动本地模式
            logger.info("运行本地模式");
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("TopologyApp", conf, builder.createTopology());
        }
    } catch (Exception e) {
        logger.error("storm启动失败!程序退出!",e);
        System.exit(1);
    }
    logger.info("storm启动成功...");
    }

好了,编写完了kafka和storm相关的代码之后,我们再来进行和SpringBoot的整合!

在进行和SpringBoot整合前,我们先要解决下一下几个问题。

1 在SpringBoot程序中如何提交storm的Topolgy?

storm是通过提交Topolgy来确定如何启动的,一般使用过运行main方法来启动,但是SpringBoot启动方式一般也是通过main方法启动的。所以应该怎么样解决呢?

  • 解决思路:将storm的Topology写在SpringBoot启动的主类中,随着SpringBoot启动而启动。
  • 实验结果:可以一起启动(按理来说也是可以的)。但是随之而来的是下一个问题,bolt和spout类无法使用spring注解。

2 如何让bolt和spout类使用spring注解?

  • 解决思路:在了解到spout和bolt类是由nimbus端实例化,然后通过序列化传输到supervisor,再反向序列化,因此无法使用注解,所以这里可以换个思路,既然不能使用注解,那么就动态获取Spring的bean就好了。
  • 实验结果:使用动态获取bean的方法之后,可以成功启动storm了。

3.有时启动正常,有时无法启动,动态的bean也无法获取?

  • 解决思路:在解决了1、2的问题之后,有时出现问题3,找了很久才找到,是因为之前加入了SpringBoot的热部署,去掉之后就没出现了...。

上面的三个问题是我在整合的时候遇到的,其中解决办法在目前看来是可行的,或许其中的问题可能是因为其他的原因导致的,不过目前就这样整合之后,就没出现过其他的问题了。若上述问题和解决办法有不妥之后,欢迎批评指正!

解决了上面的问题之后,我们回到代码这块。
其中,程序的入口,也就是主类的代码在进行整合后如下:

@SpringBootApplication
public class Application{

    public static void main(String[] args) {
        // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件
        ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
        GetSpringBean springBean=new GetSpringBean();
        springBean.setApplicationContext(context);
        TopologyApp app = context.getBean(TopologyApp.class);
        app.runStorm(args);
    }

}

动态获取bean的代码如下:

public class GetSpringBean implements ApplicationContextAware{

    private static ApplicationContext context;

    public static Object getBean(String name) {
        return context.getBean(name);
    }

    public static <T> T getBean(Class<T> c) {

        return context.getBean(c);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        if(applicationContext!=null){
            context = applicationContext;
        }
    }

}

主要的代码的介绍就到这里了,至于其它的,基本就和以前的一样了。

返回了一个 KafkaFetcher对象,我们点进去看一下KafkaFetcher的构造器里面创建了一个 KafkaConsumerThread对象

结语

关于SpringBoot整合kafka和storm暂时就告一段落了。本篇文章只是简单的介绍这些 相关的使用,在实际的应用可能会更复杂。如果有有更好的想法和建议,欢迎留言进行讨论!
SpringBoot整合kafka和storm的工程我放在github上了,如果感觉不错的话请给个star吧。
Gihub地址:
对了,也有kafka整合storm的工程,也在我的github上。
地址:

到此,本文结束,谢谢阅读。

版权声明:
作者:虚无境
博客园出处:
CSDN出处:    
个人博客出处:
原创不易,转载请标明出处,谢谢!

首先来看一下 FlinkKafkaConsumerBase.run方法,相当于是Flink 从kafka中拉取数据的入口方法:

首先,我们来写个zookeeper 工具类,提供 获取锁 acquireDistributedLock、释放锁releaseDistributedLock 方法

使用SpringBoot整合kafka和storm做什么

一般来说,kafka和storm的整合,使用kafka进行数据的传输,然后使用storm实时的处理kafka中的数据。

在这里我们加入SpringBoot之后,也是做这些,只不过是由SpringBoot对kafka和storm进行统一的管理。

如果还是不好理解的话,可以通过下面这个简单的业务场景了解下:

在数据库中有一批大量的用户数据,其中这些用户数据中有很多是不需要的,也就是脏数据,我们需要对这些用户数据进行清洗,然后重新存入数据库中,但是要求实时、延时低,并且便于管理。

所以这里我们就可以使用SpringBoot kafka storm来进行相应的开发。

至此为止createFetch就介绍完了,也可以看作是拉取数据的准备工作,接下来看一下kafkaFetcher.runFetchLoop();KafkaFetch中的runFetchLoop方法,正式开始从kafka中拉取message

缓存被动重建

http 请求:

http://localhost:81/getProductInfo?productId=10

从数据库中拉取数据为(注意modifiedTime时间:2017-10-3 12:30:01):

String productInfoJSON = "{"id": 10, "name": "iphone7手机", "price": 5599, "pictureList":"a.jpg,b.jpg", "specification": "iphone7的规格", "service": "iphone7的售后服务", "color": "红色,白色,黑色", "size": "5.5", "shopId": 1, "modifiedTime": "2017-10-3 12:30:01"}";

准备环境:
启动 redis cluster
启动 zookeeper 集群
启动 kafka 集群
启动 缓存服务(缓存项目服务)

注:由于笔者使用Windows 系统,可能避免windows 中 centos ip与主机名映射不了,所以作以下配置,推荐使用 SwitchHosts 的工具,很方便

C:WindowsSystem32driversetchosts
    *   ################################### 配置本地hosts #################### 很重要 ######################
    *   # 缓存架构方案
    *       192.168.0.16 my-cache1
    *       192.168.0.17 my-cache2
    *       192.168.0.18 my-cache3
    *   ################################### 配置本地hosts #################### 很重要 ######################

为什么使用SpringBoot整合kafka和storm

一般而言,使用kafka整合storm可以应付大多数需求。但是在扩展性上来说,可能就不太好。目前主流的微服务框架SpringCloud是基于SpringBoot的,所以使用SpringBoot对kafka和storm进行整合,可以进行统一配置,扩展性会更好。

//fetcher message from kafka public void runFetchLoop() throws Exception { try {//KafkaConsumerThread构造的参数之一 final Handover handover = this.handover; // kick off the actual Kafka consumer //实际的从kafka中拉取数据的地方 consumerThread.start(); while  { // this blocks until we get the next records // it automatically re-throws exceptions encountered in the consumer thread//从handover中获取数据,然后对records进行处理 final ConsumerRecords<byte[], byte[]> records = handover.pollNext(); // get the records for each topic partition for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates { List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle; for (ConsumerRecord<byte[], byte[]> record : partitionRecords) { final T value = deserializer.deserialize; if (deserializer.isEndOfStream { // end of stream signaled running = false; break; } // emit the actual record. this also updates offset state atomically // and deals with timestamps and watermark generation//发送消息,接下来就是timestamps和watermark的处理了 emitRecord(value, partition, record.offset(), record); } } } } finally { // this signals the consumer thread that no more work is to be done consumerThread.shutdown(); } // on a clean exit, wait for the runner thread try { consumerThread.join(); } catch (InterruptedException e) { // may be the result of a wake-up interruption after an exception. // we ignore this here and only restore the interruption state Thread.currentThread().interrupt(); } }

代码测试

测试数据

SpringBoot整合kafka和storm

至此如何从kafka中拉取数据,已经介绍完了

  • 添加zookeeper client 依赖(注:如果你添加了kafka 依赖,这里就不需要单独依赖了

kafka和storm的相关知识

如果你对kafkastorm熟悉的话,这一段可以直接跳过!如果不熟,也可以看看我之前写的博客。一些相关博客如下。

kafka 和 storm的环境安装

地址:

kafka的相关使用

地址:

storm的相关使用

地址:

//入口方法 start a source public void run(SourceContext<T> sourceContext) throws Exception { ...... // from this point forward: // - 'snapshotState' will draw offsets from the fetcher, // instead of being built from `subscribedPartitionsToStartOffsets` // - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to // Kafka through the fetcher, if configured to do so) //创建Fetcher 从kafka中拉取数据 this.kafkaFetcher = createFetcher( sourceContext, subscribedPartitionsToStartOffsets, periodicWatermarkAssigner, punctuatedWatermarkAssigner, (StreamingRuntimeContext) getRuntimeContext(), offsetCommitMode, getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP), useMetrics); if  { return; } // depending on whether we were restored with the current state version , // remaining logic branches off into 2 paths: // 1) New state - partition discovery loop executed as separate thread, with this // thread running the main fetcher loop // 2) Old state - partition discovery is disabled and only the main fetcher loop is executed if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) { //未配置KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS kafkaFetcher.runFetchLoop(); } else {//仍然调用了kafkaFetcher.runFetchLoop(); runWithPartitionDiscovery(); } }

那么前面一章中,笔者分析了缓存重建情况

测试结果

成功启动程序之后,我们先调用接口新增几条数据到kafka

新增请求:

POST http://localhost:8087/api/user

{"name":"张三","age":20}
{"name":"李四","age":10}
{"name":"王五","age":5}

新增成功之后,我们可以使用xshell工具在kafka集群中查看数据。
输入:**kafka-console-consumer.sh --zookeeper master:2181 --topic USER_TOPIC --from-beginning**

然后可以看到以下输出结果。

彩民之家论坛9066777 1

上述也表示了数据成功的写入了kafka。
因为是实时的从kafka那数据,我们也可以从控制台查看打印的语句。

控制台输出:

 INFO  com.pancm.storm.spout.KafkaInsertDataSpout - Spout发射的数据:[{"age":5,"name":"王五"}, {"age":10,"name":"李四"}, {"age":20,"name":"张三"}]
 WARN  com.pancm.storm.bolt.InsertBolt - Bolt移除的数据:{"age":5,"name":"王五"}
 INFO  com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
 DEBUG com.pancm.dao.UserDao.insertBatch - ==>  Preparing: insert into t_user (name,age) values (?,?) , (?,?) 
 DEBUG com.pancm.dao.UserDao.insertBatch - ==> Parameters: 李四(String), 10(Integer), 张三(String), 20(Integer)
 DEBUG com.pancm.dao.UserDao.insertBatch - <==    Updates: 2
 INFO  com.pancm.service.impl.UserServiceImpl - 批量新增2条数据成功!

可以在控制台成功的看到处理的过程和结果。
然后我们也可以通过接口进行数据库所有的数据查询。

查询请求:

GET http://localhost:8087/api/user

返回结果:

[{"id":1,"name":"李四","age":10},{"id":2,"name":"张三","age":20}]

上述代码中测试返回的结果显然是符合我们预期的。

@Override public void run() { // early exit check if  { return; } // this is the means to talk to FlinkKafkaConsumer's main thread//构造器中参数,用于FlinkKafkaConsumer主线程进行消费,上面提到的handover.pollNext() final Handover handover = this.handover; // This method initializes the KafkaConsumer and guarantees it is torn down properly. // This is important, because the consumer has multi-threading issues, // including concurrent 'close()' calls. try {//获取consumer this.consumer = getConsumer(kafkaProperties); } catch (Throwable t) { handover.reportError; return; } // from here on, the consumer is guaranteed to be closed properly ...... // early exit check if  { return; } // the latest bulk of records. May carry across the loop if the thread is woken up // from blocking on the handover ConsumerRecords<byte[], byte[]> records = null; // reused variable to hold found unassigned new partitions. // found partitions are not carried across loops using this variable; // they are carried across via re-adding them to the unassigned partitions queue List<KafkaTopicPartitionState<TopicPartition>> newPartitions; // main fetch loop while  { // check if there is something to commit //default false if (!commitInProgress) { // get and reset the work-to-be committed, so we don't repeatedly commit the same//这里具体可以参考[Flink是如何保存Offset的](https://www.jianshu.com/p/ee4fe63f0182) final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet; if (commitOffsetsAndCallback != null) { log.debug("Sending async offset commit request to Kafka broker"); // also record that a commit is already in progress // the order here matters! first set the flag, then send the commit command. commitInProgress = true; consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1)); } } try { //hasAssignedPartitions default false //当发现新的partition的时候,会add到unassignedPartitionsQueue和sub//具体可以参考 flink startupMode是如何起作用的 if (hasAssignedPartitions) { newPartitions = unassignedPartitionsQueue.pollBatch(); } else { // if no assigned partitions block until we get at least one // instead of hot spinning this loop. We rely on a fact that // unassignedPartitionsQueue will be closed on a shutdown, so // we don't block indefinitely newPartitions = unassignedPartitionsQueue.getBatchBlocking(); } if (newPartitions != null) { reassignPartitions(newPartitions); } } catch (AbortedReassignmentException e) { continue; } if (!hasAssignedPartitions) { // Without assigned partitions KafkaConsumer.poll will throw an exception continue; } // get the next batch of records, unless we did not manage to hand the old batch over if (records == null) { try {//通过kafkaAPI 拉取数据 records = consumer.poll(pollTimeout); } catch (WakeupException we) { continue; } } try {//handover对records进行"包装",供FlinkKafkaConsumer主线程消费 handover.produce; records = null; } catch (Handover.WakeupException e) { // fall through the loop } } // end main fetch loop } catch (Throwable t) { // let the main thread know and exit // it may be that this exception comes because the main thread closed the handover, in // which case the below reporting is irrelevant, but does not hurt either handover.reportError; } finally { // make sure the handover is closed if it is not already closed or has an error handover.close(); // make sure the KafkaConsumer is closed try { consumer.close(); } catch (Throwable t) { log.warn("Error while closing Kafka consumer", t); } } }

彩民之家论坛9066777 2

public KafkaFetcher( SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, String taskNameWithSubtasks, KafkaDeserializationSchema<T> deserializer, Properties kafkaProperties, long pollTimeout, MetricGroup subtaskMetricGroup, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {...... this.consumerThread = new KafkaConsumerThread( LOG,//KafkaConsumerThread 构造器中的参数 handover, kafkaProperties,//unassignedPartitionsQueue具体是什么呢?咱们会在flink startupMode是如何起作用的 详细去讲 unassignedPartitionsQueue, getFetcherName()   " for "   taskNameWithSubtasks, pollTimeout, useMetrics, consumerMetricGroup, subtaskMetricGroup); }
浏览器 http 请求

http://localhost:81/getProductInfo?productId=10

/**
 * 
 * zookeeper 分布式锁工具类
 * @author bill
 * @date 2017年10月3日 上午11:39:08
 */
public class ZookeeperSession {

    private ZooKeeper zookeeper;
    //计数器(同步锁),连接信号量,用于控制并发请求时,确保 zookeeper client 与 server 已连接
    private static CountDownLatch connectSemaphore = new CountDownLatch(1);

    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperSession.class);

    public ZookeeperSession(){
        try {
            // 连接 zookeeper server
            this.zookeeper = new ZooKeeper("192.168.0.16:2181,192.168.0.17:2181,192.168.0.18:2181", 50000, new ZookeeperWatcher());
            // 等待,保证 client、server连接
            connectSemaphore.await();
            LOGGER.debug(" zookeeper session established ...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     *  获取分布式锁
     * @param productId 商品id
     */
    public void acquireDistributedLock(Long productId){
        String path = "/product-lock-"   productId;
        try {
            // 尝试获取分布式锁
            zookeeper.create(path, "".getBytes(), Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL);
            LOGGER.debug("success to acquire lock for productId [{}]", productId);
        } catch (Exception e) {
            // 如果报 nodeExitsException,说明已经有请求获取了锁,所有当前重复尝试获取锁,知道获取到锁为止
            int count = 0;
            while(true){
                try {
                    // 睡眠一下下,为了测试效果,生产环境,可以20ms
                    Thread.sleep(1000);
                    // 再次尝试获取分布式锁
                    zookeeper.create(path, "".getBytes(), Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } catch (Exception e2) {
                    // 如果报 nodeExitsException,说明已经有请求获取了锁,所有当前重复尝试获取锁,知道获取到锁为止
                    LOGGER.debug("repeat to acquire lock for productId:[{}] - count:[{}] ...", productId, count);
                    count   ;
                    continue;
                }
                LOGGER.debug("success to acquire lock for productId:[{}] after count:[{}] repeat 。。。", productId, count);
                break;
            }
        }
    }

    /**
     * 释放分布式锁
     * @param productId 商品id
     */
    public void releaseDistributedLock(Long productId){
        String path = "/product-lock-"   productId;
        try {
            // 删除node,释放锁
            zookeeper.delete(path, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建 zookeeper session watcher
     * @author bill
     * @date 2017年10月3日 下午12:05:21
     */
    private class ZookeeperWatcher implements Watcher{

        @Override
        public void process(WatchedEvent evt) {
            LOGGER.debug("receive zookeeper watched event: {}", evt.getState());
            if(KeeperState.SyncConnected == evt.getState()){
                // client、server 已连接 是否等待信号量锁
                connectSemaphore.countDown();
            }
        }

    }


    /**
     * 单例有很多种方式去实现,这里采取绝对线程安全的一种方式
     * 静态内部类的方式,去初始化单例
     */
    private static class Singleton {

        private static ZookeeperSession instance;

        static{
            instance = new ZookeeperSession();
        }

        public static ZookeeperSession getInstance(){
            return instance;
        }
    }

    /**
     * jvm 的机制去保证多线程并发安全
     * 内部类的初始化,一定只会发生一次,不管多少个线程并发去初始化
     */
    public static ZookeeperSession getIntance(){
        return Singleton.getInstance();
    }

    /**
     * 初始化单例 zookeeperSession
     */
    public static void init(){
        getIntance();
    }
}

好,工具类写完了呢,我们不要忘了,对它进行初始化,笔者这里就直接在 InitListener 监听器中初始化了,如下图:

上篇讲了可以通过分布式锁方案解决

  • 创建 zookeeperSession 工具类,代码如下,对应代码都有注释说明,故不做过多解释了

下面笔者先讲 缓存主动更新 中怎么做,既然我们是通过接收kafka 商品变更消息去更新缓存,那么对应的就是在消费kafka 消息的时候先获取分布式锁,得到锁后,对比时间版本,决定是否更新缓存

缓存被动重建

注:这里的核心实现和缓存主动更新差不多,但是处理流程稍微有点不一样

  • 创建一个缓存重建队列,提供加入队列、获取队列数据方法
  • 创建一个缓存重建队列消费线程,设置商品数据缓存,同时做缓存重建冲突处理
  • 请求进来,如果缓存中都没有商品数据,到源数据服务拉取商品数据,然后将商品数据加入缓存重建队列,同时响应http 商品数据

下面直接 coding 吧!

  • 创建一个缓存重建队列 RebuildCacheQueue,这是一个单例类,代码如下:
/**
 * 
 * 重建缓存的内存队列
 * @author bill
 * @date 2017年10月3日 上午11:39:48
 */
public class RebuildCacheQueue {

    /**
     * 内存队列
     */
    private ArrayBlockingQueue<ProductInfo> queue = new ArrayBlockingQueue<ProductInfo>(1000);


    /**
     * 将商品信息对象加入队列
     * @param productInfo 商品信息对象
     */
    public void putProductInfo(ProductInfo productInfo){
        try {
            queue.put(productInfo);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     *  从队列中获取商品信息对象
     * @return 商品信息对象
     */
    public ProductInfo takeProductInfo(){
        try {
            return queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 单例有很多种方式去实现,这里采取绝对线程安全的一种方式
     * 静态内部类的方式,去初始化单例
     */
    private static class Singleton {

        private static RebuildCacheQueue instance;

        static {
            instance = new RebuildCacheQueue();
        }

        private static RebuildCacheQueue getInstance(){
            return instance;
        }
    }

    /**
     * jvm 的机制去保证多线程并发安全
     * 内部类的初始化,一定只会发生一次,不管多少个线程并发去初始化
     */
    public static RebuildCacheQueue getInstance(){
        return Singleton.getInstance();
    }
}
  • 创建一个缓存重建队列消费线程 RebuilCacheThread,进行重建缓存队列消费,没什么好说的了,上面流程已经讲了很清楚了,直接看代码吧:
/**
 * 
 * 重建缓存队列消费线程
 * @author bill
 * @date 2017年10月3日 上午11:52:40
 */
public class RebuilCacheThread implements Runnable{

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageProcessor.class);

    private static final SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");

    @Override
    public void run() {
        // 获取重建缓存队列实例
        RebuildCacheQueue rebuildCacheQueue = RebuildCacheQueue.getInstance(); 
        // 获取zookeeperSession 实例
        ZookeeperSession zkSession = ZookeeperSession.getIntance();
        CacheService cacheService = SpringContext.applicationContext.getBean(CacheService.class);
        while(true){
            ProductInfo productInfo = rebuildCacheQueue.takeProductInfo();
            // 获取zookeeper分布式锁
            zkSession.acquireDistributedLock(productInfo.getId());
            // 获取到了锁
            // 先从redis 中获取当前最新数据
            ProductInfo redisLastProductInfo = cacheService.getProductInfoFromRedisCache(productInfo.getId());
            if(null != redisLastProductInfo){
                //比较更新时间,redis中的时间 与现有数据比较.redis product info 比现有数据小则更新,否则不更新redis
                try {
                    Date date = sdf.parse(productInfo.getModifiedTime());
                    Date redisLastProductInfoDate = sdf.parse(redisLastProductInfo.getModifiedTime());
                    if(date.before(redisLastProductInfoDate)){
                        LOGGER.debug("无需更新  > 现有数据 date:[{}] - before redis 最新版本  date:[{}]", date, redisLastProductInfoDate);
                        // 无需更新,直接返回
                        continue;
                    }
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                LOGGER.debug("current product info date is after redis product info date , to update redis");
            }else{
                LOGGER.debug("product Info is null, to update redis");
            }
            // 更新本地 ehcache 缓存
            cacheService.saveProductInfo2LocalCache(productInfo);
            // redis 缓存
            cacheService.saveProductInfo2RedisCache(productInfo);
            // 释放 zookeeper 分布式锁
            zkSession.releaseDistributedLock(productInfo.getId());
        }
    }
}
  • http 请求时,无缓存数据,重建缓存,加入重建缓存队列
    /**
     * 获取商品信息
     * @param productId 商品id
     * @return 商品信息
     */
    @GetMapping("/getProductInfo")
    public ProductInfo getProductInfo(Long productId){
        ProductInfo productInfo = null;
        try {
            productInfo = cacheService.getProductInfoFromRedisCache(productId);
            LOGGER.debug("从redis中获取缓存,商品信息: {}", productInfo); 
            if(productInfo == null){
                productInfo = cacheService.getProductInfoFromLocalCache(productId);
                LOGGER.debug("从ehcache中获取缓存,商品信息: {}", productInfo);
            }
            if(productInfo == null){
                //走 数据源重新拉数据并重建缓存,注意这里笔者就直接写死数据了
                String productInfoJSON = "{"id": 10, "name": "iphone7手机", "price": 5599, "pictureList":"a.jpg,b.jpg", "specification": "iphone7的规格", "service": "iphone7的售后服务", "color": "红色,白色,黑色", "size": "5.5", "shopId": 1, "modifiedTime": "2017-10-3 12:30:01"}";
                productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
                // 将数据推送到一个内存队列中消费(重建缓存的内存队列)
                RebuildCacheQueue.getInstance().putProductInfo(productInfo);
            }
        } catch (Exception e) {}
        return productInfo;
    }

好了,缓存主动更新 和 缓存被动重建三部曲就完了,可以很清楚的看到,代码量不多,主要集中在更新缓存前获取分布式锁即可,结合缓存重建分析图

接下来要干啥呢,有个很重要的环节没有做,那就是测试,不到黄河不死心,看不到效果,我也不信,那我们就拿出来遛遛吧

上一篇 分布式缓存重建并发冲突问题以及zookeeper分布式锁解决方案, 主要讲解了分布式缓存重建冲突原因及利用zookeeper分布式锁解决缓存重建冲突问题,本篇接着上篇,实现上篇思路,带你利用zookeeper代码实现分布式锁解决重建缓存冲突问题。

缓存主动更新

我们找呀找,终于找到了消费kafka 商品变更消息线程 KafkaMessageProcessor

在更新之前先获取锁,得到锁后,先获取redis 中 数据跟 当前商品数据时间版本对比,当前数据比缓存数据更靠后(更新),则更新,相关代码如下:

        // 在数据写入redis 缓存之前,先获取 zookeeper 分布式锁,确保缓存重建冲突
        ZookeeperSession zkSession = ZookeeperSession.getIntance();
        zkSession.acquireDistributedLock(productId);

        // 获取到了锁
        // 先从redis 中获取当前最新数据
        ProductInfo redisLastProductInfo = cacheService.getProductInfoFromRedisCache(productId);
        if(null != redisLastProductInfo){
            //比较更新时间,redis中的时间 与现有数据比较.redis product info 比现有数据小则更新,否则不更新redis
            try {
                Date date = sdf.parse(productInfo.getModifiedTime());
                Date redisLastProductInfoDate = sdf.parse(redisLastProductInfo.getModifiedTime());
                if(date.before(redisLastProductInfoDate)){
                    LOGGER.debug("无需更新  > 现有数据 date:[{}] - before redis 最新版本  date:[{}]", date, redisLastProductInfoDate);
                    // 无需更新,直接返回
                    return;
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
            LOGGER.debug("current product info date is after redis product info date , to update redis");
        }else{
            LOGGER.debug("product Info is null, to update redis");
        }

        /** 此休眠为了延迟,更好的查看打印效果 -----生产环境去掉即可----- start*/
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /** 此休眠为了延迟,更好的查看打印效果 -----生产环境去掉即可----- end*/

        // 更新本地 ehcache 缓存
        cacheService.saveProductInfo2LocalCache(productInfo);
        LOGGER.debug("获取刚保存到本地缓存的商品信息:[{}]", cacheService.getProductInfoFromLocalCache(productId));

        // 更新redis 缓存
        cacheService.saveProductInfo2RedisCache(productInfo);
        // 释放 zookeeper 分布式锁
        zkSession.releaseDistributedLock(productId);

好了,缓存主动更新就完了,其实就这么简单,你懂了没有?

接下来继续,缓存被动重建那就是从http 入手了,请求进来后,先到redis cluster中获取商品数据,发现没有,然后又到本地ehcache 中获取,发现也没有,这时候就到源数据服务中拉取mysql 商品数据,这时候是不是要更新到redis cluster 以及 ehcache 中呢,那是必须的,所以这里就有发生缓存重建冲突的可能。

<dependency>
      <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.4.5</version>
     <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
     </exclusion>
    </exclusions>
</dependency>
测试效果

期望效果,先看到控制台打印kafka 的消费日志,等kafka 消费线程释放分布式锁后,才能看到缓存被动重建 获取分布式锁,并更新redis,并且返回modifiedTime:2017-10-3 12:30:01 的商品信息,同时redis cluster 中的数据也必须是 modifiedTime:2017-10-3 12:30:01 的商品信息,表示测试通过,这里都说得我有点迫不及待了,来吧,试试

彩民之家论坛9066777 3

主动更新缓存

彩民之家论坛9066777 4

浏览器请求

彩民之家论坛9066777 5

打印日志

彩民之家论坛9066777 6

redis cluster 最新商品数据

好了,是不是很激动呀,今天的讲解就到这里哈,赶紧去试试吧!

注:这里只是以商品信息为例来讲解利用分布式锁解决缓存重建冲突,其他如商铺信息等也是同理,这里希望告诉你的是方法,就不一一演示了。

以上就是本章内容,如有不对的地方,请多多指教,谢谢!

为了方便有需要的人,本系列全部软件都在 https://pan.baidu.com/s/1qYsJZfY

下章预告:主要讲解 简单看 storm

代码地址附上:https://github.com/bill5/cache-project/tree/master/cache-cache

作者:逐暗者 (转载请注明出处)

zookeeper分布式锁的解决逻辑思路

那,接下来要干什么呢,自然就是缓存主动更新 或者 缓存被动重建代码中,加入分布式锁啦,让多个请求串行执行,即可

  • 缓存主动更新
    我们监听kafka中的缓存操作消息队列,当接收到一个商品变更消息后,我们会立即根据源数据服务获取商品最新信息,然后更新到ehcache 和 redis cluster 中,这种情况笔者将之称为缓存主动更新
  • 缓存被动重建
    当nginx 请求获取商品信息时,发现redis cluster 和 ehcache 中都没有获取到相关商品信息,这时候就需要到源数据服务中拉取商品信息,这时候我们需要同步更新到redis cluster 和 ehcache 中,然后返回nginx 并进行nginx 本地缓存,这种情况笔者将之称为缓存被动重建

那下面我们就进入coding 环节,来吧!(注:以下代码层面只针对分布式锁,其他不做介绍,具体其他设计实现会单独剥离讲解

  • 变更缓存重建或者空缓存请求重建,更新redis之前,先获取对应商品id的分布式锁
  • 拿到分布式锁后,做时间版本比较,如果自己的版本新于redis中的版本,那么就更新,否则就不更新
  • 如果拿不到分布式锁,那么就等待,不断轮询等待,直到自己获取到分布式的锁

从上图我们可以看出:

初始化zookeeper

缓存重建分析图

彩民之家论坛9066777 7

这里笔者在着重讲下,当缓存数据由于个方面因素(如LRU等算法)清理了,这时候缓存主动更新 和 缓存在高并发或者特殊情况下,同时进行时,缓存重建冲突就悲剧的发生了(注:上篇说多个缓存服务实例时,出现分布式缓存重建冲突没错,但是就算不是多缓存实例服务,单个也会发生,只要两者同时发生即可,这里着重补充一下

版权声明:本文由彩民之家高手论坛发布于编程技术,转载请注明出处:SpringBoot整合Kafka和Storm彩民之家论坛9066777