Spark入hive表
遇到问题:
1.写入HDFS
generateActualKey 返回NullWritable,使输出文件中只写value、不写key
generateActualValue 返回value值
设置输出文件名:
generateFileNameForKeyValue
允许输出目录已存在(覆盖模式):
checkOutputSpecs
2.DataFrame转换为RDD后,Row.toString()得到的字符串两端带有"[]",写出前需要去掉;
Row.toString()中各字段之间默认以","分隔
3.hive创建表时要指定字段分割符和行分割符,否则整行数据会落入第一个字段,其余字段为NULL,例如:
dingdang,love NULL
xuejiao,love1312 NULL
解决问题:
-- Two-column text table: fields are separated by ',' and rows by '\n'.
CREATE TABLE IF NOT EXISTS src (
    c1 STRING,
    c2 STRING
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
4.分区表
创建分区表目录
-- Adds the partition date_key=20200828 to table src.
ALTER TABLE src ADD PARTITION (date_key = '20200828')
创建分区表
-- Same two-column text table, partitioned by the string column Date_Key.
CREATE TABLE IF NOT EXISTS src (
    c1 STRING,
    c2 STRING
)
PARTITIONED BY (Date_Key STRING)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
代码:
public class RDDMultipleTextOutputFormatextends MultipleTextOutputFormat {@Overrideprotected K generateActualKey(K key, V value) {key = (K) NullWritable.get();//System.out.println("=============== key ======" + key);return key;}@Overrideprotected V generateActualValue(K key, V value) {value = (V)value.toString();//System.out.println("=============== value ======" + value);return value;}@Overrideprotected String generateFileNameForKeyValue(K key, V value, String name) {name = name.replace("part", key.toString());//System.out.println("=============== name ======" + name);return super.generateFileNameForKeyValue(key, value, name);}@Overridepublic void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {Path outDir = getOutputPath(job);if (outDir == null && job.getNumReduceTasks() != 0) {throw new InvalidJobConfException("Output directory not set in JobConf.");}if (outDir != null) {FileSystem fs = outDir.getFileSystem(job);// normalize the output directoryoutDir = fs.makeQualified(outDir);setOutputPath(job, outDir);// get delegation token for the outDir's file systemTokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] { outDir }, job);//使spark的输出目录可以存在// check its existence/*if (fs.exists(outDir)) {throw new FileAlreadyExistsException("Output directory "+ outDir + " already exists");}*/}}public static class OutputFormatUtil {public static String prefixOutputName = "";}}
@Override
public int run(String[] args) throws Exception {SparkConf conf = getSparkconf();logger.warn("===========HiveMain run start===========");try {//获取JavacontextJavaSparkContext jsc = getJavaSparkContext();//SQL的sessionSparkSession session = getSparkSession();Configuration hadoopconf = jsc.hadoopConfiguration();//conf.set("spark.hadoop.fs.defaultFS", "hdfs://192.168.13.124:8020");hadoopconf.set("mapreduce.output.fileoutputformat.compress", "false");//conf.set("spark.sql.warehouse.dir", "hdfs://192.168.13.124:9000/user/hive/warehouse");FileSystem fs = FileSystem.get(hadoopconf);session.sql("use default");String strPath = args[0];JavaRDD rowrdd = jsc.textFile(strPath);StructType schema = createSchema(new String[]{"c1","c2"},new DataType[]{DataTypes.StringType,DataTypes.StringType});JavaRDD rowJavaRDD = parserdata2Row(rowrdd);Dataset df = session.createDataFrame(rowJavaRDD, schema);df.createOrReplaceTempView("src");df.show();JavaPairRDD pairRDD = df.javaRDD().mapToPair(f -> {String replace = f.toString().replace("[", "").replace("]", "").replace("null", " ");String key = "S003_WA_SOURCE_0005";String[] value = replace.split(",");System.out.println("f : " + replace);return new Tuple2(new Text(key), new Text(value[0] + "," + value[1]));});pairRDD.saveAsHadoopFile("/user/hive/warehouse/src/date_key=20200828", Text.class,Text.class, RDDMultipleTextOutputFormat.class);}catch (Exception e) {// TODO: handle exception}return 0;
}private static StructType createSchema(String[] strFields, DataType[] dts ) {ArrayList fields = new ArrayList();StructField field = null;if (strFields.length != dts.length) {System.out.println("Schema is error");return null;}for (int i = 0; i < strFields.length; i++) {field = DataTypes.createStructField(strFields[i], dts[i], true);fields.add(field);}StructType schema = DataTypes.createStructType(fields);return schema;
}public JavaRDD parserdata2Row( JavaRDD sourceDataRDD) {return sourceDataRDD.map(f -> {String[] strLine = f.split("\t");//System.out.println("===== strline: " + Arrays.asList(strLine).size());//System.out.println("===== strline value: " + Arrays.asList(strLine));//获得TXT类型inputView的查询字段String[] res = new String[2];for (int i = 0; i < 2; i++) {res[i] = strLine[i];}return RowFactory.create(res);});
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
