请选择 进入手机版 | 继续访问电脑版

技术控

    今日:223| 主题:53974
收藏本版 (1)
最新软件应用技术尽在掌握

[其他] Flume架构与源码分析-核心组件分析-2

[复制链接]
小悸动Octobse 发表于 2016-11-29 09:35:04
278 7

立即注册CoLaBug.com会员,免费获得投稿人的专业资料,享用更多功能,玩转个人品牌!

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

Flume架构与源码分析-核心组件分析-2

Flume架构与源码分析-核心组件分析-2

  4、整体流程

  从以上部分我们可以看出,不管是Source还是Sink都依赖Channel,那么启动时应该先启动Channel然后再启动Source或Sink即可。
  Flume有两种启动方式:使用EmbeddedAgent内嵌在Java应用中或使用Application单独启动一个进程,此处我们已Application分析为主。
  首先进入org.apache.flume.node.Application的main方法启动:
  Java代码

  1. //1、设置默认值启动参数、参数是否必须的   
  2. Options options = new Options();   
  3. Option option = new Option("n", "name", true, "the name of this agent");   
  4. option.setRequired(true);   
  5. options.addOption(option);   
  6.    
  7. option = new Option("f", "conf-file", true,   
  8. "specify a config file (required if -z missing)");   
  9. option.setRequired(false);   
  10. options.addOption(option);   
  11.    
  12. //2、接着解析命令行参数   
  13. CommandLineParser parser = new GnuParser();   
  14. CommandLine commandLine = parser.parse(options, args);   
  15.    
  16. String agentName = commandLine.getOptionValue('n');   
  17. boolean reload = !commandLine.hasOption("no-reload-conf");   
  18.    
  19. if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {   
  20.   isZkConfigured = true;   
  21. }   
  22.    
  23. if (isZkConfigured) {   
  24.     //3、如果是通过ZooKeeper配置,则使用ZooKeeper参数启动,此处忽略,我们以配置文件讲解   
  25. } else {   
  26.   //4、打开配置文件,如果不存在则快速失败   
  27.   File configurationFile = new File(commandLine.getOptionValue('f'));   
  28.   if (!configurationFile.exists()) {   
  29.          throw new ParseException(   
  30.         "The specified configuration file does not exist: " + path);   
  31.   }   
  32.   List<LifecycleAware> components = Lists.newArrayList();   
  33.    
  34.   if (reload) { //5、如果需要定期reload配置文件,则走如下方式   
  35.     //5.1、此处使用Guava提供的事件总线   
  36.     EventBus eventBus = new EventBus(agentName + "-event-bus");   
  37.     //5.2、读取配置文件,使用定期轮训拉起策略,默认30s拉取一次   
  38.     PollingPropertiesFileConfigurationProvider configurationProvider =   
  39.         new PollingPropertiesFileConfigurationProvider(   
  40.           agentName, configurationFile, eventBus, 30);   
  41.     components.add(configurationProvider);   
  42.     application = new Application(components); //5.3、向Application注册组件   
  43.     //5.4、向事件总线注册本应用,[email protected]法   
  44.     eventBus.register(application);   
  45.    
  46.   } else { //5、配置文件不支持定期reload   
  47.     PropertiesFileConfigurationProvider configurationProvider =   
  48.         new PropertiesFileConfigurationProvider(   
  49.           agentName, configurationFile);   
  50.     application = new Application();   
  51.     //6.2、直接使用配置文件初始化Flume组件   
  52.     application.handleConfigurationEvent(configurationProvider   
  53.       .getConfiguration());   
  54.   }   
  55. }   
  56. //7、启动Flume应用   
  57. application.start();   
  58.    
  59. //8、注册虚拟机关闭钩子,当虚拟机关闭时调用Application的stop方法进行终止   
  60. final Application appReference = application;   
  61. Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {   
  62.   @Override   
  63.   public void run() {   
  64.     appReference.stop();   
  65.   }   
  66. });   
复制代码
以上流程只提取了核心代码中的一部分,比如ZK的实现直接忽略了,而Flume启动大体流程如下:
  1、读取命令行参数;
  2、读取配置文件;
  3、根据是否需要reload使用不同的策略初始化Flume;如果需要reload,则使用Guava的事件总线实现,Application的handleConfigurationEvent是事件订阅者,PollingPropertiesFileConfigurationProvider是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而handleConfigurationEvent会收到该配置变更重新进行初始化;
  4、启动Application,并注册虚拟机关闭钩子。
  handleConfigurationEvent方法比较简单,首先调用了stopAllComponents停止所有组件,接着调用startAllComponents使用配置文件初始化所有组件:
  Java代码

  1. @Subscribe   
  2. public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {   
  3.   stopAllComponents();   
  4.   startAllComponents(conf);   
  5. }     
复制代码
MaterializedConfiguration存储Flume运行时需要的组件:Source、Channel、Sink、SourceRunner、SinkRunner等,其是通过ConfigurationProvider进行初始化获取,比如PollingPropertiesFileConfigurationProvider会读取配置文件然后进行组件的初始化。
  对于startAllComponents实现大体如下:
  Java代码

  1. //1、首先启动Channel   
  2. supervisor.supervise(Channels,   
  3.       new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);   
  4. //2、确保所有Channel是否都已启动   
  5. for(Channel ch: materializedConfiguration.getChannels().values()){   
  6.   while(ch.getLifecycleState() != LifecycleState.START   
  7.       && !supervisor.isComponentInErrorState(ch)){   
  8.     try {   
  9.       Thread.sleep(500);   
  10.     } catch (InterruptedException e) {   
  11.         Throwables.propagate(e);   
  12.     }   
  13.   }   
  14. }   
  15. //3、启动SinkRunner   
  16. supervisor.supervise(SinkRunners,      
  17. new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);   
  18. //4、启动SourceRunner   
  19. supervisor.supervise(SourceRunner,   
  20. new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);   
  21. //5、初始化监控服务   
  22. this.loadMonitoring();   
复制代码
从如下代码中可以看到,首先要准备好Channel,因为Source和Sink会操作它,对于Channel如果初始化失败则整个流程是失败的;然后启动SinkRunner,先准备好消费者;接着启动SourceRunner开始进行采集日志。此处我们发现有两个单独的组件LifecycleSupervisor和MonitorService,一个是组件守护哨兵,一个是监控服务。守护哨兵对这些组件进行守护,假设出问题了默认策略是自动重启这些组件。
  对于stopAllComponents实现大体如下:
  Java代码

  1. //1、首先停止SourceRunner   
  2. supervisor.unsupervise(SourceRunners);   
  3. //2、接着停止SinkRunner   
  4. supervisor.unsupervise(SinkRunners);   
  5. //3、然后停止Channel   
  6. supervisor.unsupervise(Channels);   
  7. //4、最后停止MonitorService   
  8. monitorServer.stop();     
复制代码
此处可以看出,停止的顺序是Source、Sink、Channel,即先停止生产,再停止消费,最后停止管道。
  Application中的start方法代码实现如下:
  Java代码

  1. public synchronized void start() {   
  2.   for(LifecycleAware component : components) {   
  3.     supervisor.supervise(component,   
  4.         new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);   
  5.   }   
  6. }     
复制代码
其循环Application注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持reload配置文件,则之前启动Application时注册过PollingPropertiesFileConfigurationProvider组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启。
  而Application关闭执行了如下动作:
  Java代码

  1. public synchronized void stop() {   
  2.   supervisor.stop();   
  3.   if(monitorServer != null) {   
  4.     monitorServer.stop();   
  5.   }   
  6. }     
  7.   
复制代码
即关闭守护哨兵和监控服务。
  到此基本的Application分析结束了,我们还有很多疑问,守护哨兵怎么实现的。
  整体流程可以总结为:
  1、首先初始化命令行配置;
  2、接着读取配置文件;
  3、根据是否需要reload初始化配置文件中的组件;如果需要reload会使用Guava事件总线进行发布订阅变化;
  4、接着创建Application,创建守护哨兵,并先停止所有组件,接着启动所有组件;启动顺序:Channel、SinkRunner、SourceRunner,并把这些组件注册给守护哨兵、初始化监控服务;停止顺序:SourceRunner、SinkRunner、Channel;
  5、如果配置文件需要定期reload,则需要注册Polling***ConfigurationProvider到守护哨兵;
  6、最后注册虚拟机关闭钩子,停止守护哨兵和监控服务。
  轮训实现的SourceRunner 和SinkRunner会创建一个线程进行工作,之前已经介绍了其工作方式。接下来我们看下守护哨兵的实现。
  首先创建LifecycleSupervisor:
  Java代码

  1. //1、用于存放被守护的组件   
  2. supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();   
  3. //2、用于存放正在被监控的组件   
  4. monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();   
  5. //3、创建监控服务线程池   
  6. monitorService = new ScheduledThreadPoolExecutor(10,   
  7.     new ThreadFactoryBuilder().setNameFormat(   
  8.         "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")   
  9.         .build());   
  10. monitorService.setMaximumPoolSize(20);   
  11. monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);   
  12. //4、定期清理被取消的组件   
  13. purger = new Purger();   
  14. //4.1、默认不进行清理   
  15. needToPurge = false;     
复制代码
LifecycleSupervisor启动时会进行如下操作:
  Java代码

  1. public synchronized void start() {   
  2.   monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);   
  3.   lifecycleState = LifecycleState.START;   
  4. }     
复制代码
首先每隔两个小时执行清理组件,然后改变状态为启动。而LifecycleSupervisor停止时直接停止了监控服务,然后更新守护组件状态为STOP:
  Java代码

  1. //1、首先停止守护监控服务   
  2. if (monitorService != null) {   
  3.   monitorService.shutdown();   
  4.   try {   
  5.     monitorService.awaitTermination(10, TimeUnit.SECONDS);   
  6.   } catch (InterruptedException e) {   
  7.     logger.error("Interrupted while waiting for monitor service to stop");   
  8.   }   
  9. }   
  10. //2、更新所有守护组件状态为STOP,并调用组件的stop方法进行停止   
  11. for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) {   
  12.   if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {   
  13.     entry.getValue().status.desiredState = LifecycleState.STOP;   
  14.     entry.getKey().stop();   
  15.   }   
  16. }   
  17. //3、更新本组件状态   
  18. if (lifecycleState.equals(LifecycleState.START)) {   
  19.   lifecycleState = LifecycleState.STOP;   
  20. }     
  21. //4、最后的清理   
  22. supervisedProcesses.clear();     
  23. monitorFutures.clear();     
复制代码
接下来就是调用supervise进行组件守护了:
  Java代码

  1. if(this.monitorService.isShutdown() || this.monitorService.isTerminated()   
  2.   || this.monitorService.isTerminating()){   
  3.     //1、如果哨兵已停止则抛出异常,不再接收任何组件进行守护   
  4.   }   
  5.   //2、初始化守护组件   
  6.   Supervisoree process = new Supervisoree();   
  7.   process.status = new Status();   
  8.   //2.1、默认策略是失败重启   
  9.   process.policy = policy;   
  10.   //2.2、初始化组件默认状态,大多数组件默认为START   
  11.   process.status.desiredState = desiredState;   
  12.   process.status.error = false;   
  13.   //3、组件监控器,用于定时获取组件的最新状态,或者重新启动组件   
  14.   MonitorRunnable monitorRunnable = new MonitorRunnable();   
  15.   monitorRunnable.lifecycleAware = lifecycleAware;   
  16.   monitorRunnable.supervisoree = process;   
  17.   monitorRunnable.monitorService = monitorService;   
  18.    
  19.   supervisedProcesses.put(lifecycleAware, process);   
  20.   //4、定期的去执行组件监控器,获取组件最新状态,或者重新启动组件   
  21.   ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(   
  22.       monitorRunnable, 0, 3, TimeUnit.SECONDS);   
  23.   monitorFutures.put(lifecycleAware, future);   
  24. }   
复制代码
如果不需要守护了,则需要调用unsupervise:
  Java代码

  1. @Subscribe   
  2. public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {   
  3.   stopAllComponents();   
  4.   startAllComponents(conf);   
  5. }     
  6. 0
复制代码
接下来我们再看下MonitorRunnable的实现,其负责进行组件状态迁移或组件故障恢复:
  Java代码

  1. @Subscribe   
  2. public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {   
  3.   stopAllComponents();   
  4.   startAllComponents(conf);   
  5. }     
  6. 1
复制代码
如上代码进行了一些简化,整体逻辑即定时去采集组件的状态,如果发现守护组件和组件的状态不一致,则可能需要进行启动或停止。即守护监视器可以用来保证组件如能失败后自动启动。默认策略是总是失败后重启,还有一种策略是只启动一次。
  【本文是51CTO专栏作者张开涛的原创文章,作者微信公众号:开涛的博客,id:kaitao-1234567】



上一篇:iOS 中的各种锁
下一篇:[工具资源] koth/kcws: Deep Learning Chinese Word Segment
hei298 发表于 2016-11-29 12:23:56
生活可以将就,生活也可以讲究!
回复 支持 反对

使用道具 举报

莽荒纪 发表于 2016-11-29 15:17:32
小悸动Octobse的帖子提神醒脑啊!
回复 支持 反对

使用道具 举报

三年痛七年痒 发表于 2016-11-30 13:24:57
小悸动Octobse敢整点更有创意的不?兄弟们等着围观捏~
回复 支持 反对

使用道具 举报

ptyks 发表于 2016-12-3 02:02:19
现在问题来了,“挖掘机技术哪家强?找ptyks”
回复 支持 反对

使用道具 举报

董建新 发表于 2016-12-17 15:00:45
楼主,星期六一路顺风!
回复 支持 反对

使用道具 举报

54654 发表于 2016-12-18 16:29:37
看一漂亮MM,苦无搭讪办法,路旁一砖头,拣起,上前,“同学,这是你掉的吧?”
回复 支持 反对

使用道具 举报

swfly 发表于 2016-12-26 22:11:08
一个人快活,两个人生活,三个人就是你死我活。
回复 支持 反对

使用道具 举报

*滑动验证:
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

我要投稿

推荐阅读

扫码访问 @iTTTTT瑞翔 的微博
回页顶回复上一篇下一篇回列表
手机版/CoLaBug.com ( 粤ICP备05003221号 | 文网文[2010]257号 )

© 2001-2017 Comsenz Inc. Design: Dean. DiscuzFans.

返回顶部 返回列表