DataX学习之HelloWorld

微信扫一扫,分享到朋友圈

DataX学习之HelloWorld

最近去公司的大数据部门轮岗,接的第一个任务就是异构数据的导出项目,趁着机会学习了一下DataX框架,开一个系列来记录一下。

项目背景

既然是异构数据的导出,那肯定少不了数据源和目标源。数据源有hive和kafka,目标源涵盖很多,像hbase,hive,kafka,es等等,之前公司的做法是通过mr任务去跑数据,但是配置很繁琐,而且基本都是祖传配置,需要人工拷贝,所以趁着这次重构,希望在前端展示上做一个产品化的平台来cover这些工作量,并且在后端核心的导出逻辑上使用一些其他方式,抛弃mr任务。

经过研讨之后项目组决定以hive为目标源的项目还是使用mr任务去跑,而以kafka为数据源的项目则使用DataX这个开源框架。

何为DataX

根据Github上的描述,DataX就是一个离线数据的同步工具,来解决异构数据源之间的高效同步。在围观了一下代码之后发现这个项目确实不错,对于我这样一个大数据的新手玩家来说,只需要编写简单的reader和writer插件就可以完成任务,颇有serverless的感觉。

不过DataX还有一个问题,就是貌似只支持单机而不支持集群,所以真正投入生产可能还得配合成熟的调度框架才行。

Hello Datax

作为程序员,当然还是要以代码说话,在研究了半天时间之后,我决定动手写一个DataX的kafkareader插件的小demo用来上手。

对于DataX插件的资料大家可以去看 DataX插件开发宝典
这篇文章,写的十分详细了。

既然要开发kafkareader,那肯定是要将DataX和kafka整合到一起了,话不多说,先看代码吧:

public class KafkaReader extends Reader {

   public static class Job extends Reader.Job {

      private Configuration originalConfig = null;

      @Override
      public void init() {
         this.originalConfig = super.getPluginJobConf();
      }

      @Override
      public void destroy() {

      }

      @Override
      public List<Configuration> split(int adviceNumber) {
         List<Configuration> configurations = new ArrayList<Configuration>();

         for (int i = 0; i < adviceNumber; i++) {
            configurations.add(this.originalConfig.clone());
         }
         return configurations;
      }
   }

   public static class Task extends Reader.Task {

      private List<String> topics = new ArrayList<String>();

      @Override
      public void init() {
         topics.add("kafka_topic");
      }

      @Override
      public void destroy() {

      }

      @Override
      public void startRead(RecordSender recordSender) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "clusters_ip");
         props.put("group.id", "kzTestGroup");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("session.timeout.ms", "30000");
         props.put("max.poll.records", 1000);
         props.put("auto.offset.reset", "latest");
         props.put("key.deserializer", StringDeserializer.class.getName());
         props.put("value.deserializer", StringDeserializer.class.getName());
         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
         consumer.subscribe(topics);

         Record oneRecord = null;

         while(true) {
            System.out.println("start polling kafka msg...");

            ConsumerRecords<String,String> records = consumer.poll(100);
            Iterator<ConsumerRecord<String,String>> iterator = records.iterator();
            if(!iterator.hasNext()) {
               System.out.println("no data...");
               //break;
            }
            System.out.println("msg arriving...");
            if(iterator.hasNext()) {
               ConsumerRecord<String,String> record = iterator.next();
               oneRecord = buildOneRecord(recordSender, record.value());
               if (oneRecord != null) {
                  recordSender.sendToWriter(oneRecord);
               }
            }

            try {
               Thread.sleep(3000);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
      }

      private Record buildOneRecord(RecordSender recordSender, String value) {
         System.out.println("msg value => " + value);
         Record record = recordSender.createRecord();
         record.addColumn(new StringColumn(value));
         return record;
      }
   }

}

这个demo的代码十分简单,根据DataX插件开发宝典的指示,创建一个Reader,并有2个静态内部类Job和Task。

在Task的startRead方法中去创建一个kafka的consumer并且监听topic,将数据封装成DataX所需要的recode并通过recordSender传递出去就可以了。

当然,这个demo是非常简单的一个例子,对于要投入生产的代码,我们需要做的是参数化,比如kafka的集群地址,topic需要用参数的方式配置,还有就是和位点相关的逻辑,是否需要从头开始消费等等。

最后就是Job类的Task方法了,这里我直接使用了adviceNumber去创建多个Task,根据kafka的特别,我个人推荐是直接使用kafka的partition数量去创建Task,比较合理。

后记

这篇文章算是DataX框架学习使用的一个开篇,随着公司项目的进行,肯定会有更多的知识需要学习,文章也会继续更新~

终于跑通分布式事务框架tcc-transaction的示例项目

上一篇

小米9 5G版有望在下月亮相:骁龙855 Plus+UFS 3.0闪存

下一篇

你也可能喜欢

DataX学习之HelloWorld

长按储存图像,分享给朋友