技术控

    今日:162| 主题:52609
收藏本版 (1)
最新软件应用技术尽在掌握

[其他] Flume架构与源码分析-核心组件分析-2

[复制链接]
小悸动Octobse 发表于 2016-11-29 09:35:04
222 7

立即注册CoLaBug.com会员,免费获得投稿人的专业资料,享用更多功能,玩转个人品牌!

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

Flume架构与源码分析-核心组件分析-2-1 (Channel,option,Java)

  4、整体流程

  从以上部分我们可以看出,不管是Source还是Sink都依赖Channel,那么启动时应该先启动Channel然后再启动Source或Sink即可。
  Flume有两种启动方式:使用EmbeddedAgent内嵌在Java应用中或使用Application单独启动一个进程,此处我们已Application分析为主。
  首先进入org.apache.flume.node.Application的main方法启动:
  Java代码

  [code]//1、设置默认值启动参数、参数是否必须的   
Options options = new Options();   
Option option = new Option("n", "name", true, "the name of this agent");   
option.setRequired(true);   
options.addOption(option);   
   
option = new Option("f", "conf-file", true,   
"specify a config file (required if -z missing)");   
option.setRequired(false);   
options.addOption(option);   
   
//2、接着解析命令行参数   
CommandLineParser parser = new GnuParser();   
CommandLine commandLine = parser.parse(options, args);   
   
String agentName = commandLine.getOptionValue('n');   
boolean reload = !commandLine.hasOption("no-reload-conf");   
   
if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {   
  isZkConfigured = true;   
}   
   
if (isZkConfigured) {   
    //3、如果是通过ZooKeeper配置,则使用ZooKeeper参数启动,此处忽略,我们以配置文件讲解   
} else {   
  //4、打开配置文件,如果不存在则快速失败   
  File configurationFile = new File(commandLine.getOptionValue('f'));   
  if (!configurationFile.exists()) {   
         throw new ParseException(   
        "The specified configuration file does not exist: " + path);   
  }   
  List components = Lists.newArrayList();   
   
  if (reload) { //5、如果需要定期reload配置文件,则走如下方式   
    //5.1、此处使用Guava提供的事件总线   
    EventBus eventBus = new EventBus(agentName + "-event-bus");   
    //5.2、读取配置文件,使用定期轮训拉起策略,默认30s拉取一次   
    PollingPropertiesFileConfigurationProvider configurationProvider =   
        new PollingPropertiesFileConfigurationProvider(   
          agentName, configurationFile, eventBus, 30);   
    components.add(configurationProvider);   
    application = new Application(components); //5.3、向Application注册组件   
    //5.4、向事件总线注册本应用,EventBus会自动注册Application中使用@Subscribe声明的方法   
    eventBus.register(application);   
   
  } else { //5、配置文件不支持定期reload   
    PropertiesFileConfigurationProvider configurationProvider =   
        new PropertiesFileConfigurationProvider(   
          agentName, configurationFile);   
    application = new Application();   
    //6.2、直接使用配置文件初始化Flume组件   
    application.handleConfigurationEvent(configurationProvider   
      .getConfiguration());   
  }   
}   
//7、启动Flume应用   
application.start();   
   
//8、注册虚拟机关闭钩子,当虚拟机关闭时调用Application的stop方法进行终止   
final Application appReference = application;   
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {   
  @Override   
  public void run() {   
    appReference.stop();   
  }   
});   
[/code]  以上流程只提取了核心代码中的一部分,比如ZK的实现直接忽略了,而Flume启动大体流程如下:
  1、读取命令行参数;
  2、读取配置文件;
  3、根据是否需要reload使用不同的策略初始化Flume;如果需要reload,则使用Guava的事件总线实现,Application的handleConfigurationEvent是事件订阅者,PollingPropertiesFileConfigurationProvider是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而handleConfigurationEvent会收到该配置变更重新进行初始化;
  4、启动Application,并注册虚拟机关闭钩子。
  handleConfigurationEvent方法比较简单,首先调用了stopAllComponents停止所有组件,接着调用startAllComponents使用配置文件初始化所有组件:
  Java代码

  [code]@Subscribe   
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {   
  stopAllComponents();   
  startAllComponents(conf);   
}     
[/code]  MaterializedConfiguration存储Flume运行时需要的组件:Source、Channel、Sink、SourceRunner、SinkRunner等,其是通过ConfigurationProvider进行初始化获取,比如PollingPropertiesFileConfigurationProvider会读取配置文件然后进行组件的初始化。
  对于startAllComponents实现大体如下:
  Java代码

  [code]//1、首先启动Channel   
supervisor.supervise(Channels,   
      new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);   
//2、确保所有Channel是否都已启动   
for(Channel ch: materializedConfiguration.getChannels().values()){   
  while(ch.getLifecycleState() != LifecycleState.START   
      && !supervisor.isComponentInErrorState(ch)){   
    try {   
      Thread.sleep(500);   
    } catch (InterruptedException e) {   
        Throwables.propagate(e);   
    }   
  }   
}   
//3、启动SinkRunner   
supervisor.supervise(SinkRunners,      
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);   
//4、启动SourceRunner   
supervisor.supervise(SourceRunner,   
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);   
//5、初始化监控服务   
this.loadMonitoring();   
[/code]  从如下代码中可以看到,首先要准备好Channel,因为Source和Sink会操作它,对于Channel如果初始化失败则整个流程是失败的;然后启动SinkRunner,先准备好消费者;接着启动SourceRunner开始进行采集日志。此处我们发现有两个单独的组件LifecycleSupervisor和MonitorService,一个是组件守护哨兵,一个是监控服务。守护哨兵对这些组件进行守护,假设出问题了默认策略是自动重启这些组件。
  对于stopAllComponents实现大体如下:
  Java代码

  [code]//1、首先停止SourceRunner   
supervisor.unsupervise(SourceRunners);   
//2、接着停止SinkRunner   
supervisor.unsupervise(SinkRunners);   
//3、然后停止Channel   
supervisor.unsupervise(Channels);   
//4、最后停止MonitorService   
monitorServer.stop();     
[/code]  此处可以看出,停止的顺序是Source、Sink、Channel,即先停止生产,再停止消费,最后停止管道。
  Application中的start方法代码实现如下:
  Java代码

  [code]public synchronized void start() {   
  for(LifecycleAware component : components) {   
    supervisor.supervise(component,   
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);   
  }   
}     
[/code]  其循环Application注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持reload配置文件,则之前启动Application时注册过PollingPropertiesFileConfigurationProvider组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启。
  而Application关闭执行了如下动作:
  Java代码

  [code]public synchronized void stop() {   
  supervisor.stop();   
  if(monitorServer != null) {   
    monitorServer.stop();   
  }   
}     
  
[/code]  即关闭守护哨兵和监控服务。
  到此基本的Application分析结束了,我们还有很多疑问,守护哨兵怎么实现的。
  整体流程可以总结为:
  1、首先初始化命令行配置;
  2、接着读取配置文件;
  3、根据是否需要reload初始化配置文件中的组件;如果需要reload会使用Guava事件总线进行发布订阅变化;
  4、接着创建Application,创建守护哨兵,并先停止所有组件,接着启动所有组件;启动顺序:Channel、SinkRunner、SourceRunner,并把这些组件注册给守护哨兵、初始化监控服务;停止顺序:SourceRunner、SinkRunner、Channel;
  5、如果配置文件需要定期reload,则需要注册Polling***ConfigurationProvider到守护哨兵;
  6、最后注册虚拟机关闭钩子,停止守护哨兵和监控服务。
  轮训实现的SourceRunner 和SinkRunner会创建一个线程进行工作,之前已经介绍了其工作方式。接下来我们看下守护哨兵的实现。
  首先创建LifecycleSupervisor:
  Java代码

  [code]//1、用于存放被守护的组件   
supervisedProcesses = new HashMap();   
//2、用于存放正在被监控的组件   
monitorFutures = new HashMap>();   
//3、创建监控服务线程池   
monitorService = new ScheduledThreadPoolExecutor(10,   
    new ThreadFactoryBuilder().setNameFormat(   
        "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")   
        .build());   
monitorService.setMaximumPoolSize(20);   
monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);   
//4、定期清理被取消的组件   
purger = new Purger();   
//4.1、默认不进行清理   
needToPurge = false;     
[/code]  LifecycleSupervisor启动时会进行如下操作:
  Java代码

  [code]public synchronized void start() {   
  monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);   
  lifecycleState = LifecycleState.START;   
}     
[/code]  首先每隔两个小时执行清理组件,然后改变状态为启动。而LifecycleSupervisor停止时直接停止了监控服务,然后更新守护组件状态为STOP:
  Java代码

  [code]//1、首先停止守护监控服务   
if (monitorService != null) {   
  monitorService.shutdown();   
  try {   
    monitorService.awaitTermination(10, TimeUnit.SECONDS);   
  } catch (InterruptedException e) {   
    logger.error("Interrupted while waiting for monitor service to stop");   
  }   
}   
//2、更新所有守护组件状态为STOP,并调用组件的stop方法进行停止   
for (final Entry entry : supervisedProcesses.entrySet()) {   
  if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {   
    entry.getValue().status.desiredState = LifecycleState.STOP;   
    entry.getKey().stop();   
  }   
}   

//3、更新本组件状态   
if (lifecycleState.equals(LifecycleState.START)) {   
  lifecycleState = LifecycleState.STOP;   
}     
//4、最后的清理   

supervisedProcesses.clear();     
monitorFutures.clear();     
[/code]  接下来就是调用supervise进行组件守护了:
  Java代码

  [code] if(this.monitorService.isShutdown() || this.monitorService.isTerminated()   
  || this.monitorService.isTerminating()){   
    //1、如果哨兵已停止则抛出异常,不再接收任何组件进行守护   
  }   
  //2、初始化守护组件   
  Supervisoree process = new Supervisoree();   
  process.status = new Status();   
  //2.1、默认策略是失败重启   
  process.policy = policy;   
  //2.2、初始化组件默认状态,大多数组件默认为START   
  process.status.desiredState = desiredState;   
  process.status.error = false;   
  //3、组件监控器,用于定时获取组件的最新状态,或者重新启动组件   
  MonitorRunnable monitorRunnable = new MonitorRunnable();   
  monitorRunnable.lifecycleAware = lifecycleAware;   
  monitorRunnable.supervisoree = process;   
  monitorRunnable.monitorService = monitorService;   
   
  supervisedProcesses.put(lifecycleAware, process);   
  //4、定期的去执行组件监控器,获取组件最新状态,或者重新启动组件   
  ScheduledFuture future = monitorService.scheduleWithFixedDelay(   
      monitorRunnable, 0, 3, TimeUnit.SECONDS);   
  monitorFutures.put(lifecycleAware, future);   
}   
[/code]  如果不需要守护了,则需要调用unsupervise:
  Java代码

  [code]@Subscribe   
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {   
  stopAllComponents();   
  startAllComponents(conf);   
}     
0[/code]  接下来我们再看下MonitorRunnable的实现,其负责进行组件状态迁移或组件故障恢复:
  Java代码

  [code]@Subscribe   
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {   
  stopAllComponents();   
  startAllComponents(conf);   
}     
1[/code]  如上代码进行了一些简化,整体逻辑即定时去采集组件的状态,如果发现守护组件和组件的状态不一致,则可能需要进行启动或停止。即守护监视器可以用来保证组件如能失败后自动启动。默认策略是总是失败后重启,还有一种策略是只启动一次。
  【本文是51CTO专栏作者张开涛的原创文章,作者微信公众号:开涛的博客,id:kaitao-1234567】
友荐云推荐




上一篇:iOS 中的各种锁
下一篇:[工具资源] koth/kcws: Deep Learning Chinese Word Segment
酷辣虫提示酷辣虫禁止发表任何与中华人民共和国法律有抵触的内容!所有内容由用户发布,并不代表酷辣虫的观点,酷辣虫无法对用户发布内容真实性提供任何的保证,请自行验证并承担风险与后果。如您有版权、违规等问题,请通过"联系我们"或"违规举报"告知我们处理。

hei298 发表于 2016-11-29 12:23:56
生活可以将就,生活也可以讲究!
回复 支持 反对

使用道具 举报

莽荒纪 发表于 2016-11-29 15:17:32
小悸动Octobse的帖子提神醒脑啊!
回复 支持 反对

使用道具 举报

三年痛七年痒 发表于 2016-11-30 13:24:57
小悸动Octobse敢整点更有创意的不?兄弟们等着围观捏~
回复 支持 反对

使用道具 举报

ptyks 发表于 2016-12-3 02:02:19
现在问题来了,“挖掘机技术哪家强?找ptyks”
回复 支持 反对

使用道具 举报

董建新 发表于 2016-12-17 15:00:45
楼主,星期六一路顺风!
回复 支持 反对

使用道具 举报

54654 发表于 2016-12-18 16:29:37
看一漂亮MM,苦无搭讪办法,路旁一砖头,拣起,上前,“同学,这是你掉的吧?”
回复 支持 反对

使用道具 举报

swfly 发表于 2016-12-26 22:11:08
一个人快活,两个人生活,三个人就是你死我活。
回复 支持 反对

使用道具 举报

*滑动验证:
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

我要投稿

推荐阅读

扫码访问 @iTTTTT瑞翔 的微博
回页顶回复上一篇下一篇回列表手机版
手机版/CoLaBug.com ( 粤ICP备05003221号 | 文网文[2010]257号 )|网站地图 酷辣虫

© 2001-2017 Comsenz Inc. Design: Dean. DiscuzFans.

返回顶部 返回列表