Reading and Writing Parquet, a Columnar Storage Format

Overview

Apache Parquet is a columnar storage format that any project in the Hadoop ecosystem can use, offering higher compression ratios and fewer IO operations than row-oriented formats. Many write-ups online say that writing Parquet requires a local Hadoop installation. Below is a way to write Parquet files without installing Hadoop, followed by two ways to read the files back. Let's dive in…

Writing Parquet

1. POM dependencies

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.8.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro -->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.8.1</version>
</dependency>

2. Define the schema (entity class)

package com.kestrel;

public class User {
    private String id;
    private String name;
    private String password;

    public User() {
    }

    public User(String id, String name, String password) {
        this.id = id;
        this.name = name;
        this.password = password;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
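
As a quick, optional sanity check, you can print the Avro schema that ReflectData derives from this class; this small sketch assumes only the avro dependency above:

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class PrintUserSchema {
    public static void main(String[] args) {
        // AllowNull wraps every field in a union with null, i.e. nullable columns
        Schema schema = ReflectData.AllowNull.get().getSchema(User.class);
        System.out.println(schema.toString(true)); // pretty-printed JSON schema
    }
}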
3. Writing with AvroParquetWriter

    // Imports used by this snippet:
    import org.apache.avro.reflect.ReflectData;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
    import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;

    List<User> users = new ArrayList<>();
    User user1 = new User("1", "huangchixin", "123123");
    User user2 = new User("2", "huangchixin2", "123445");
    users.add(user1);
    users.add(user2);

    Path dataFile = new Path("./tmp/demo.snappy.parquet");

    // Write as a Parquet file; ReflectData derives the Avro schema from User.
    try (ParquetWriter<User> writer = AvroParquetWriter.<User>builder(dataFile)
            .withSchema(ReflectData.AllowNull.get().getSchema(User.class))
            .withDataModel(ReflectData.get())
            .withConf(new Configuration())
            .withCompressionCodec(SNAPPY)
            .withWriteMode(OVERWRITE)
            .build()) {
        for (User user : users) {
            writer.write(user);
        }
    }
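
To sanity-check the file that was just written, you can read its footer and count rows without deserializing any records. A minimal sketch against the same parquet-hadoop 1.8.1 API used later in this post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class CheckFooter {
    public static void main(String[] args) throws Exception {
        Path dataFile = new Path("./tmp/demo.snappy.parquet");
        ParquetMetadata footer = ParquetFileReader.readFooter(
                new Configuration(), dataFile, ParquetMetadataConverter.NO_FILTER);
        long rows = 0;
        for (BlockMetaData block : footer.getBlocks()) {
            rows += block.getRowCount(); // one BlockMetaData per row group
        }
        System.out.println("schema: " + footer.getFileMetaData().getSchema());
        System.out.println("rows: " + rows); // expect 2 for the example above
    }
}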

Reading Parquet

1. Reading with AvroParquetReader. You need to supply a data model for the target class, or alternatively define the schema as a JSON string (see the sketch after this code block).
    // Imports used by this snippet:
    import org.apache.avro.reflect.ReflectData;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;

    // Read back from the Parquet file.
    try (ParquetReader<User> reader = AvroParquetReader.<User>builder(dataFile)
            .withDataModel(new ReflectData(User.class.getClassLoader()))
            .disableCompatibility() // use the new Avro converters
            .withConf(new Configuration())
            .build()) {
        User user;
        while ((user = reader.read()) != null) {
            System.out.println(user);
        }
    }
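
The JSON-string alternative mentioned in item 1: instead of reflecting a schema from the User class, you can parse an Avro schema from a JSON string and read generic records. A minimal sketch; the hand-written schema and the AvroReadSupport.setAvroReadSchema call are assumptions that should match what the writer above produced:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadWithJsonSchema {
    public static void main(String[] args) throws Exception {
        // Hand-written Avro schema mirroring the reflected User class (assumption)
        String json = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.kestrel\",\"fields\":["
                + "{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"password\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
        Schema schema = new Schema.Parser().parse(json);

        Configuration conf = new Configuration();
        AvroReadSupport.setAvroReadSchema(conf, schema); // resolve records against this schema

        Path dataFile = new Path("./tmp/demo.snappy.parquet");
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(dataFile)
                .withConf(conf)
                .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                System.out.println(record.get("id") + ", " + record.get("name"));
            }
        }
    }
}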
2. Reading with ParquetFileReader, which does not require an entity class: the schema is taken from the file itself.

• Column entity
package com.kestrel;

/**
 * @Author: 12640
 * @Date: 2021/1/1 15:13
 * @Description: Describes one column of the table.
 */
public class TableHead {
    /**
     * Column name
     */
    private String name;
    /**
     * Data type of the column
     */
    private String type;
    /**
     * Column index
     */
    private Integer index;

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getIndex() {
        return index;
    }

    public void setIndex(Integer index) {
        this.index = index;
    }
}
• Parquet result entity
package com.kestrel;

import java.util.List;

/**
 * @Author: 12640
 * @Date: 2021/1/1 15:14
 * @Description: Holds a parsed table: header plus row data.
 */
public class TableResult {
    /**
     * Header information of the parsed file
     * (originally used for arrow/csv files as well)
     */
    private List<TableHead> columns;
    /**
     * Row data
     */
    private List<?> data;

    public List<TableHead> getColumns() {
        return columns;
    }

    public void setColumns(List<TableHead> columns) {
        this.columns = columns;
    }

    public List<?> getData() {
        return data;
    }

    public void setData(List<?> data) {
        this.data = data;
    }
}
• Reading the parquet file
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Type;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReadParquet {
    public static void main(String[] args) throws Exception {
        TableResult tableResult = parquetReaderV2(new File("./tmp/demo.snappy.parquet"));
        ObjectMapper mapper = new ObjectMapper();
        String jsonString = mapper.writerWithDefaultPrettyPrinter()
                .writeValueAsString(tableResult);
        System.out.println(jsonString);
    }
    public static TableResult parquetReaderV2(File file) throws Exception {
        long start = System.currentTimeMillis();
        hadoopEnv();
        Path path = new Path(file.getAbsolutePath());
        Configuration conf = new Configuration();
        TableResult table = new TableResult();
        // Two-dimensional list of row data
        List<List<Object>> dataList = Lists.newArrayList();
        ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
        MessageType schema = readFooter.getFileMetaData().getSchema();
        ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
                readFooter.getBlocks(), schema.getColumns());
        // For parquet 1.9.0+, construct the reader like this instead:
        // ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
        PageReadStore pages = null;
        try {
            while (null != (pages = r.readNextRowGroup())) {
                final long rows = pages.getRowCount();
                // logger.info(file.getName() + " rows: " + rows);
                final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
                final RecordReader<Group> recordReader = columnIO.getRecordReader(pages,
                        new GroupRecordConverter(schema));
                for (int i = 0; i < rows; i++) {
                    final Group g = recordReader.read();
                    if (i == 0) {
                        // Derive the header (column names/types) from the first record
                        table.setColumns(parquetColumn(g));
                    }
                    // Collect one row of data
                    List<Object> row = getparquetData(table.getColumns(), g);
                    dataList.add(row);
                    // printGroup(g); // (helper from the parquet examples, not shown here)
                }
            }
        } finally {
            r.close();
        }
        // logger.info(file.getName() + " load time: " + (System.currentTimeMillis() - start));
        table.setData(dataList);
        return table;
    }
    // In newer versions all of the new ParquetReader(...) constructors are deprecated;
    // use ParquetReader.builder(...) to construct the object instead.
    static void parquetReader(String inPath) throws Exception {
        GroupReadSupport readSupport = new GroupReadSupport();
        ParquetReader<Group> reader = ParquetReader.builder(readSupport, new Path(inPath)).build();
        Group line = null;
        while ((line = reader.read()) != null) {
            System.out.println(line.toString());
        }
        System.out.println("done reading");
    }
    private static List<Object> getparquetData(List<TableHead> columns, Group line) {
        List<Object> row = new ArrayList<>();
        Object cellStr = null;
        for (int i = 0; i < columns.size(); i++) {
            try {
                switch (columns.get(i).getType()) {
                    case "DOUBLE":
                        cellStr = line.getDouble(i, 0);
                        break;
                    case "FLOAT":
                        cellStr = line.getFloat(i, 0);
                        break;
                    case "BOOLEAN":
                        cellStr = line.getBoolean(i, 0);
                        break;
                    case "INT96":
                        cellStr = line.getInt96(i, 0);
                        break;
                    case "INT64": // the physical type name is INT64, not LONG
                        cellStr = line.getLong(i, 0);
                        break;
                    default:
                        // UTF8 strings, INT32, etc. are returned in string form
                        cellStr = line.getValueToString(i, 0);
                }
            } catch (RuntimeException e) {
                // Field absent for this record: store null rather than
                // carrying over the previous column's value
                cellStr = null;
            } finally {
                row.add(cellStr);
            }
        }
        return row;
    }
    /**
     * Build the table header (column name, type, index) from the schema of a record.
     *
     * @param line a record read from the parquet file
     * @return one TableHead per column
     */
    private static List<TableHead> parquetColumn(Group line) {
        List<TableHead> columns = Lists.newArrayList();
        TableHead dto = null;
        GroupType groupType = line.getType();
        int fieldCount = groupType.getFieldCount();
        for (int i = 0; i < fieldCount; i++) {
            dto = new TableHead();
            Type type = groupType.getType(i);
            String fieldName = type.getName();
            // Prefer the logical (original) type name, e.g. UTF8;
            // otherwise fall back to the physical type, e.g. INT64
            OriginalType originalType = type.getOriginalType();
            String typeName = null;
            if (originalType != null) {
                typeName = originalType.name();
            } else {
                typeName = type.asPrimitiveType().getPrimitiveTypeName().name();
            }
            dto.setIndex(i);
            dto.setName(fieldName);
            dto.setType(typeName);
            columns.add(dto);
        }
        return columns;
    }
    public static void hadoopEnv() throws IOException {
        // Point hadoop.home.dir at the working directory and create an empty
        // bin/winutils.exe, so Hadoop's Windows environment check passes
        // without a real Hadoop installation.
        File workaround = new File(".");
        System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath());
        new File("./bin").mkdirs();
        new File("./bin/winutils.exe").createNewFile();
    }
}
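
For the two-row file written earlier, main prints JSON roughly of the following shape. This is illustrative rather than captured output; the type strings come from the file's schema (UTF8 for the reflected string fields), and the field order follows the schema:

{
  "columns" : [
    { "name" : "id", "type" : "UTF8", "index" : 0 },
    { "name" : "name", "type" : "UTF8", "index" : 1 },
    { "name" : "password", "type" : "UTF8", "index" : 2 }
  ],
  "data" : [
    [ "1", "huangchixin", "123123" ],
    [ "2", "huangchixin2", "123445" ]
  ]
}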

Follow the WeChat official account 【Java搬砖小伙子】 for more resources.

Your support is the biggest motivation on my way forward, thank you!
