Big Data: Converting Avro to ORC
Background
Since big data became mainstream, many file formats have appeared. Two of the most widely used are ORC (commonly used for data stored on S3) and Avro (the usual format on Confluent Kafka). This article puts together a concrete Java implementation for converting between these two formats.
Maven Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.orc</groupId>
        <artifactId>orc-core</artifactId>
        <version>1.7.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>27.0.1-jre</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>5.2.1.RELEASE</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java-util</artifactId>
        <version>3.20.0</version>
    </dependency>
    <dependency>
        <groupId>net.minidev</groupId>
        <artifactId>json-smart</artifactId>
        <version>2.4.7</version>
    </dependency>
</dependencies>
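One gap in the list above: the converter code below also uses Gson (com.google.gson's JsonParser, JsonObject and friends), which is not declared here and, as far as I can tell, is not pulled in transitively by the listed artifacts. If it is missing from your classpath, a dependency along these lines should work (the version is only a suggestion). Similarly, the RandomData class used to generate test data is not part of the plain avro artifact; it typically ships with the avro-tools jar.

    <!-- Gson is used by the converter code below (JsonParser, JsonObject, ...).
         The version here is only a suggestion, not the one from the original post. -->
    <dependency>
        <groupId>com.google.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.9</version>
    </dependency>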
Code
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.util.RandomData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.File;
import java.io.IOException;

public class AvroAndOrcTest {

    public static void main(String[] args) throws IOException {
        // Required on Windows so Hadoop can find winutils.exe; adjust the path to your installation.
        System.setProperty("hadoop.home.dir", "C:\\Users\\lv_ry601\\Desktop\\winutils_jb51\\winutils-master\\hadoop-2.6.3");

        String avroFile = "test.avro";
        String avroSchema = "avro_schema.txt";
        String targetOrcFile = "target.orc";
        // NOTE: the field list inside struct<...> was lost when the original post was published;
        // it must be completed to mirror the fields declared in avro_schema.txt
        // (see the example after the helper class below).
        String orcSchemaString = "struct";
        TypeDescription orcSchema = TypeDescription.fromString(orcSchemaString);

        // Generate 10 random Avro records from the schema, convert them to ORC, then read the ORC file back.
        avroGenerate(avroSchema, avroFile, "10");
        readAvroWriteOrc(new Schema.Parser().parse(new File(avroSchema)), orcSchema, avroFile, targetOrcFile);
        readOrc(targetOrcFile, orcSchema);
    }

    private static void readAvroWriteOrc(Schema avroSchema, TypeDescription orcSchema,
                                         String avroFile, String targetOrcFile) {
        // One JSON-to-ColumnVector converter per top-level ORC column.
        VectorColumnFiller.JsonConverter[] jsonConverters =
                new VectorColumnFiller.JsonConverter[orcSchema.getChildren().size()];
        int i = 0;
        for (TypeDescription child : orcSchema.getChildren()) {
            jsonConverters[i++] = VectorColumnFiller.createConverter(child);
        }

        VectorizedRowBatch rowBatch = orcSchema.createRowBatch();
        OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(new Configuration());
        writerOptions.setSchema(orcSchema);

        try (Writer writer = OrcFile.createWriter(new Path(targetOrcFile), writerOptions)) {
            DataFileReader<GenericData.Record> reader =
                    new DataFileReader<>(new File(avroFile), new GenericDatumReader<>(avroSchema));
            while (reader.hasNext()) {
                GenericData.Record rowData = reader.next();
                // GenericData.Record.toString() produces a JSON representation of the record,
                // which is parsed with Gson and written into the vectorized row batch column by column.
                JsonElement jsonElement = new JsonParser().parse(rowData.toString());
                if (jsonElement instanceof JsonObject) {
                    System.out.println(jsonElement);
                    VectorColumnFiller.fillRow(rowBatch.size, jsonConverters, orcSchema, rowBatch,
                            (JsonObject) jsonElement);
                    if (rowBatch.size == rowBatch.getMaxSize()) {
                        writer.addRowBatch(rowBatch);
                        rowBatch.reset();
                    }
                }
            }
            // Flush the final, partially filled batch.
            writer.addRowBatch(rowBatch);
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    public static void avroGenerate(String schemaFileName, String avroFile, String count) {
        try {
            // org.apache.avro.util.RandomData generates `count` random records for the given schema file.
            String[] args = {schemaFileName, avroFile, count};
            RandomData.main(args);
        } catch (Exception e) {
            System.out.println("error occurred");
        }
    }

    public static void readOrc(String path, TypeDescription schema) {
        Configuration configuration = new Configuration();
        OrcFile.ReaderOptions readerOptions = new OrcFile.ReaderOptions(configuration);
        VectorizedRowBatch rowBatch = schema.createRowBatch();
        System.out.println();
        System.out.println();
        System.out.println("read written files----------------------------------");
        try (Reader reader = OrcFile.createReader(new Path(path), readerOptions)) {
            RecordReader rows = reader.rows();
            while (rows.nextBatch(rowBatch)) {
                System.out.println(rowBatch);
            }
        } catch (Exception e) {
            System.out.println("error occurred");
        }
    }
}

The per-column conversion is handled by the following VectorColumnFiller helper class:

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author Ashish (ashu.impetus@gmail.com)
 */
public class VectorColumnFiller {

    private static final Logger LOG = LoggerFactory.getLogger(VectorColumnFiller.class);

    public interface JsonConverter {
        void convert(JsonElement value, ColumnVector vect, int row);
    }

    static class BooleanColumnConverter implements JsonConverter {
        public void convert(JsonElement value, ColumnVector vect, int row) {
            if (value == null || value.isJsonNull()) {
                vect.noNulls = false;
                vect.isNull[row] = true;
            } else {
                LongColumnVector vector = (LongColumnVector) vect;
                vector.vector[row] = value.getAsBoolean() ? 1 : 0;
            }
        }
    }

    static class LongColumnConverter implements JsonConverter {
        public void convert(JsonElement value, ColumnVector vect, int row) {
            if (value == null || value.isJsonNull()) {
                vect.noNulls = false;
                vect.isNull[row] = true;
            } else {
                LongColumnVector vector = (LongColumnVector) vect;
                vector.vector[row] = value.getAsLong();
            }
        }
    }

    static class DoubleColumnConverter implements JsonConverter {
        public void convert(JsonElement value, ColumnVector vect, int row) {
            if (value == null || value.isJsonNull()) {
                vect.noNulls = false;
                vect.isNull[row] = true;
            } else {
                DoubleColumnVector vector = (DoubleColumnVector) vect;
                vector.vector[row] = value.getAsDouble();
            }
        }
    }

    static class StringColumnConverter implements JsonConverter {
        public void convert(JsonElement value, ColumnVector vect, int row) {
            if (value == null || value.isJsonNull()) {
                vect.noNulls = false;
                vect.isNull[row] = true;
            } else {
                BytesColumnVector vector = (BytesColumnVector) vect;
                byte[] bytes = value.getAsString().getBytes(StandardCharsets.UTF_8);
                vector.setRef(row, bytes, 0, bytes.length);
            }
        }
    }

    static class BinaryColumnConverter implements JsonConverter {
        public void convert(JsonElement value, ColumnVector vect, int row) {
            if (value == null || value.isJsonNull()) {
                vect.noNulls = false;
                vect.isNull[row] = true;
            } else {
                BytesColumnVector vector = (BytesColumnVector) vect;
                // Binary values are expected as hex strings and decoded byte by byte.
                String binStr = value.getAsString();
                byte[] bytes = new byte[binStr.length() / 2];
                for (int i = 0; i < bytes.length; ++i) {
                    bytes[i] = (byte) Integer.parseInt(binStr.substring(i * 2, i * 2 + 2), 16);
                }
                vector.setRef(row, bytes, 0, bytes.length);
            }
        }
    }

    static class TimestampColumnConverter implements JsonConverter {
        // BackOffUtil is a small helper class (not included in this post) used to throttle the warning below.
        BackOffUtil back = new BackOffUtil(true);

        public void convert(JsonElement value, ColumnVector vect, int row) {
            if (value == null || value.isJsonNull()) {
                vect.noNulls = false;
                vect.isNull[row] = true;
            } else {
                if (value.getAsJsonPrimitive().isString()) {
                    TimestampColumnVector vector = (TimestampColumnVector) vect;
                    vector.set(row, Timestamp.valueOf(value.getAsString().replaceAll("[TZ]", " ")));
                } else if (value.getAsJsonPrimitive().isNumber()) {
                    TimestampColumnVector vector = (TimestampColumnVector) vect;
                    vector.set(row, new Timestamp(value.getAsLong()));
                } else {
                    if (!back.isBackOff()) {
                        LOG.warn("Timestamp is neither string nor number: {}", value);
                    }
                    vect.noNulls = false;
                    vect.isNull[row] = true;
                }
            }
        }
    }

    static class DecimalColumnConverter implements JsonConverter {
        public void convert(JsonElement value, ColumnVector vect, int row) {
            if (value == null || value.isJsonNull()) {
                vect.noNulls = false;
                vect.isNull[row] = true;
            } else {
                DecimalColumnVector vector = (DecimalColumnVector) vect;
                vector.vector[row].set(HiveDecimal.create(value.getAsString()));
            }
        }
    }

// static class MapColumnConverter implements JsonConverter {
// private JsonConverter[] childConverters;
//
// public MapColumnConverter(TypeDescription schema) {
// assertKeyType(schema);
//
// List childTypes = schema.getChildren();
// childConverters = new JsonConverter[childTypes.size()];
// for (int c = 0; c < childConverters.length; ++c) {
// childConverters[c] = createConverter(childTypes.get(c));
// }
// }
//
// /**
// * Rejects non-string keys. This is a limitation imposed by JSON specifications that only allows strings
// * as keys.
// */
// private void assertKeyType(TypeDescription schema) {
// // NOTE: It may be tempting to ensure that schema.getChildren() returns at least one child here, but the
// // validity of an ORC schema is ensured by TypeDescription. Malformed ORC schema could be a concern.
// // For example, an ORC schema of `map<>` may produce a TypeDescription instance with no child. However,
// // TypeDescription.fromString() rejects any malformed ORC schema and therefore we may assume only valid
// // ORC schema will make to this point.
// TypeDescription keyType = schema.getChildren().get(0);
// String keyTypeName = keyType.getCategory().getName();
// if (!keyTypeName.equalsIgnoreCase("string")) {
// throw new IllegalArgumentException(
// String.format("Unsupported key type: %s", keyTypeName));
// }
// }
//
// public void convert(JsonElement value, ColumnVector vect, int row) {
// if (value == null || value.isJsonNull()) {
// vect.noNulls = false;
// vect.isNull[row] = true;
// } else {
// MapColumnVector vector = (MapColumnVector) vect;
// JsonObject obj = value.getAsJsonObject();
// vector.lengths[row] = obj.size();
// vector.offsets[row] = row > 0 ? vector.offsets[row - 1] + vector.lengths[row - 1] : 0;
//
// // Ensure enough space is available to store the keys and the values
// vector.keys.ensureSize((int) vector.offsets[row] + obj.size(), true);
// vector.values.ensureSize((int) vector.offsets[row] + obj.size(), true);
//
// int i = 0;
// for (String key : obj.keySet()) {
// childConverters[0].convert(new JsonPrimitive(key), vector.keys, (int) vector.offsets[row] + i);
// childConverters[1].convert(obj.get(key), vector.values, (int) vector.offsets[row] + i);
// i++;
// }
// }
// }
// }

    /**
     * The primary challenge here is that the type information available at the time of class instantiation and at
     * the time of invocation of {@code convert()} is different. We have exact type information when
     * {@code UnionColumnConverter} is instantiated, as it is given a {@code TypeDescription} which represents an
     * ORC schema. Conversely, when the {@code convert()} method is called, limited type information is available
     * because JSON supports three primitive types only: boolean, number, and string.
     *
     * The proposed solution for this issue is to register appropriate converters at the time of instantiation with
     * a matching {@code ColumnVector} index. Note that {@code UnionColumnVector} has child column vectors to
     * support each of its child types.
     */
    static class UnionColumnConverter implements JsonConverter {

        private enum JsonType {
            NULL, BOOLEAN, NUMBER, STRING, ARRAY, OBJECT
        }

        // TODO: Could we come up with a better name?
        private class ConverterInfo {
            private int vectorIndex;
            private JsonConverter converter;

            public ConverterInfo(int vectorIndex, JsonConverter converter) {
                this.vectorIndex = vectorIndex;
                this.converter = converter;
            }

            public int getVectorIndex() {
                return vectorIndex;
            }

            public JsonConverter getConverter() {
                return converter;
            }
        }

        /**
         * Union type in ORC is essentially a collection of two or more non-compatible types,
         * and it is represented by multiple child columns under UnionColumnVector.
         * Thus we need converters for each type.
         */
        private Map<JsonType, ConverterInfo> childConverters = new HashMap<>();

        public UnionColumnConverter(TypeDescription schema) {
            List<TypeDescription> children = schema.getChildren();
            int index = 0;
            for (TypeDescription childType : children) {
                JsonType jsonType = getJsonType(childType.getCategory());
                JsonConverter converter = createConverter(childType);
                // FIXME: Handle cases where childConverters is pre-occupied with the same mask
                childConverters.put(jsonType, new ConverterInfo(index++, converter));
            }
        }

        private JsonType getJsonType(TypeDescription.Category category) {
            switch (category) {
                case BOOLEAN:
                    return JsonType.BOOLEAN;
                case BYTE:
                case SHORT:
                case INT:
                case LONG:
                case FLOAT:
                case DOUBLE:
                case DECIMAL:
                    return JsonType.NUMBER;
                case CHAR:
                case VARCHAR:
                case STRING:
                    return JsonType.STRING;
                default:
                    throw new UnsupportedOperationException();
            }
        }

        private JsonType getJsonType(JsonPrimitive value) {
            if (value.isBoolean()) {
                return JsonType.BOOLEAN;
            } else if (value.isNumber()) {
                return JsonType.NUMBER;
            } else if (value.isString()) {
                return JsonType.STRING;
            } else {
                throw new UnsupportedOperationException();
            }
        }

        public void convert(JsonElement value, ColumnVector vect, int row) {
            if (value == null || value.isJsonNull()) {
                vect.noNulls = false;
                vect.isNull[row] = true;
            } else if (value.isJsonPrimitive()) {
                UnionColumnVector vector = (UnionColumnVector) vect;
                JsonPrimitive primitive = value.getAsJsonPrimitive();

                JsonType jsonType = getJsonType(primitive);
                ConverterInfo converterInfo = childConverters.get(jsonType);
                if (converterInfo == null) {
                    String message = String.format("Unable to infer type for '%s'", primitive);
                    throw new IllegalArgumentException(message);
                }

                int vectorIndex = converterInfo.getVectorIndex();
                JsonConverter converter = converterInfo.getConverter();
                vector.tags[row] = vectorIndex;
                converter.convert(value, vector.fields[vectorIndex], row);
            } else {
                // It would be great to support non-primitive types in union type.
                // Let's leave this for another PR in the future.
                throw new UnsupportedOperationException();
            }
        }
    }

    static class StructColumnConverter implements JsonConverter {
        private JsonConverter[] childrenConverters;
        private List<String> fieldNames;

        public StructColumnConverter(TypeDescription schema) {
            List<TypeDescription> kids = schema.getChildren();
            childrenConverters = new JsonConverter[kids.size()];
            for (int c = 0; c < childrenConverters.length; ++c) {
                childrenConverters[c] = createConverter(kids.get(c));
            }
            fieldNames = schema.getFieldNames();
        }

        public void convert(JsonElement value, ColumnVector vect, int row) {
            if (value == null || value.isJsonNull()) {
                vect.noNulls = false;
                vect.isNull[row] = true;
            } else {
                StructColumnVector vector = (StructColumnVector) vect;
                JsonObject obj = value.getAsJsonObject();
                for (int c = 0; c < childrenConverters.length; ++c) {
                    JsonElement elem = obj.get(fieldNames.get(c));
                    childrenConverters[c].convert(elem, vector.fields[c], row);
                }
            }
        }
    }

    static class ListColumnConverter implements JsonConverter {
        private JsonConverter childrenConverter;

        public ListColumnConverter(TypeDescription schema) {
            childrenConverter = createConverter(schema.getChildren().get(0));
        }

        public void convert(JsonElement value, ColumnVector vect, int row) {
            if (value == null || value.isJsonNull()) {
                vect.noNulls = false;
                vect.isNull[row] = true;
            } else {
                ListColumnVector vector = (ListColumnVector) vect;
                JsonArray obj = value.getAsJsonArray();
                vector.lengths[row] = obj.size();
                vector.offsets[row] = vector.childCount;
                vector.childCount += vector.lengths[row];
                vector.child.ensureSize(vector.childCount, true);
                for (int c = 0; c < obj.size(); ++c) {
                    childrenConverter.convert(obj.get(c), vector.child, (int) vector.offsets[row] + c);
                }
            }
        }
    }

    public static JsonConverter createConverter(TypeDescription schema) {
        switch (schema.getCategory()) {
            case BYTE:
            case SHORT:
            case INT:
            case LONG:
                return new LongColumnConverter();
            case FLOAT:
            case DOUBLE:
                return new DoubleColumnConverter();
            case CHAR:
            case VARCHAR:
            case STRING:
                return new StringColumnConverter();
            case DECIMAL:
                return new DecimalColumnConverter();
            case TIMESTAMP:
                return new TimestampColumnConverter();
            case BINARY:
                return new BinaryColumnConverter();
            case BOOLEAN:
                return new BooleanColumnConverter();
            case STRUCT:
                return new StructColumnConverter(schema);
            case LIST:
                return new ListColumnConverter(schema);
            case MAP:
                // return new MapColumnConverter(schema);
                // NOTE: with MapColumnConverter commented out, MAP currently falls through to UNION.
            case UNION:
                return new UnionColumnConverter(schema);
            default:
                throw new IllegalArgumentException("Unhandled type " + schema);
        }
    }

    public static void fillRow(int rowIndex, JsonConverter[] converters,
                               TypeDescription schema, VectorizedRowBatch batch, JsonObject data) {
        List<String> fieldNames = schema.getFieldNames();
        for (int c = 0; c < converters.length; ++c) {
            JsonElement fieldValue = data.get(fieldNames.get(c));
            if (fieldValue == null) {
                batch.cols[c].noNulls = false;
                batch.cols[c].isNull[rowIndex] = true;
            } else {
                converters[c].convert(fieldValue, batch.cols[c], rowIndex);
            }
        }
        batch.size++;
    }
}
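Because the ORC schema string in main() was garbled in the original post, here is a minimal, hypothetical sketch of how the two schemas have to line up. The field names (id, name, score) are placeholders of my own and must be replaced with whatever avro_schema.txt actually declares:

// Hypothetical example only: the fields below are illustrative and must mirror avro_schema.txt.
String avroSchemaJson =
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"long\"},"
      + "{\"name\":\"name\",\"type\":\"string\"},"
      + "{\"name\":\"score\",\"type\":\"double\"}]}";
Schema avroSchema = new Schema.Parser().parse(avroSchemaJson);

// fillRow() looks each JSON field up by its ORC field name, so the ORC struct
// must declare the same field names, with corresponding types, in the same order.
TypeDescription orcSchema =
        TypeDescription.fromString("struct<id:bigint,name:string,score:double>");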
Conclusion
This is about as generic as Avro-to-ORC conversion gets using the native APIs of the two formats. One thing worth noting: reading and writing ORC files on Windows requires the winutils.exe helper, which you have to download yourself.
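For completeness, the sample above handles this by pointing Hadoop at the directory that contains bin\winutils.exe before any ORC call is made; the path below is just an example and is machine-specific:

// Windows only: the directory must contain bin\winutils.exe.
// Alternatively, set the HADOOP_HOME environment variable to the same directory.
System.setProperty("hadoop.home.dir", "C:\\tools\\hadoop-2.6.3");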