国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 開發 > Java > 正文

Java和scala實現 Spark RDD轉換成DataFrame的兩種方法小結

2024-07-14 08:41:10
字體:
來源:轉載
供稿:網友

一:準備數據源

在項目下新建一個student.txt文件,里面的內容為:

1,zhangsan,20 2,lisi,21 3,wanger,19 4,fangliu,18 

二:實現

Java版:

1.首先新建一個student的Bean對象,實現序列化和toString()方法,具體代碼如下:

package com.cxd.sql;import java.io.Serializable;@SuppressWarnings("serial")public class Student implements Serializable { String sid; String sname; int sage; public String getSid() {  return sid; } public void setSid(String sid) {  this.sid = sid; } public String getSname() {  return sname; } public void setSname(String sname) {  this.sname = sname; } public int getSage() {  return sage; } public void setSage(int sage) {  this.sage = sage; } @Override public String toString() {  return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]"; } }		

2.轉換,具體代碼如下

package com.cxd.sql;import java.util.ArrayList;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.SaveMode;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;public class TxtToParquetDemo { public static void main(String[] args) {    SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");  SparkSession spark = SparkSession.builder().config(conf).getOrCreate();  reflectTransform(spark);//Java反射  dynamicTransform(spark);//動態轉換 }  /**  * 通過Java反射轉換  * @param spark  */ private static void reflectTransform(SparkSession spark) {  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();    JavaRDD<Student> rowRDD = source.map(line -> {   String parts[] = line.split(",");   Student stu = new Student();   stu.setSid(parts[0]);   stu.setSname(parts[1]);   stu.setSage(Integer.valueOf(parts[2]));   return stu;  });    Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);  df.select("sid", "sname", "sage").  coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res"); } /**  * 動態轉換  * @param spark  */ private static void dynamicTransform(SparkSession spark) {  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();    JavaRDD<Row> rowRDD = source.map( line -> {   String[] parts = line.split(",");   String sid = parts[0];   String sname = parts[1];   int sage = Integer.parseInt(parts[2]);      return RowFactory.create(     sid,     sname,     sage     );  });    ArrayList<StructField> fields = new ArrayList<StructField>();  StructField field = null;  field = DataTypes.createStructField("sid", DataTypes.StringType, true);  fields.add(field);  field = DataTypes.createStructField("sname", DataTypes.StringType, true);  fields.add(field);  field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);  fields.add(field);    StructType schema = DataTypes.createStructType(fields);    Dataset<Row> df = spark.createDataFrame(rowRDD, schema);  df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");     } }

scala版本:

import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.types.StringTypeimport org.apache.spark.sql.types.StructFieldimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.Rowimport org.apache.spark.sql.types.IntegerTypeobject RDD2Dataset {  case class Student(id:Int,name:String,age:Int) def main(args:Array[String]) {  val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate() import spark.implicits._ reflectCreate(spark) dynamicCreate(spark) }  /**	 * 通過Java反射轉換	 * @param spark	 */ private def reflectCreate(spark:SparkSession):Unit={ import spark.implicits._ val stuRDD=spark.sparkContext.textFile("student2.txt") //toDF()為隱式轉換 val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF() //stuDf.select("id","name","age").write.text("result") //對寫入文件指定列名 stuDf.printSchema() stuDf.createOrReplaceTempView("student") val nameDf=spark.sql("select name from student where age<20") //nameDf.write.text("result") //將查詢結果寫入一個文件 nameDf.show() }  /**	 * 動態轉換	 * @param spark	 */ private def dynamicCreate(spark:SparkSession):Unit={ val stuRDD=spark.sparkContext.textFile("student.txt") import spark.implicits._ val schemaString="id,name,age" val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema=StructType(fields) val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2))) val stuDf=spark.createDataFrame(rowRDD, schema)  stuDf.printSchema() val tmpView=stuDf.createOrReplaceTempView("student") val nameDf=spark.sql("select name from student where age<20") //nameDf.write.text("result") //將查詢結果寫入一個文件 nameDf.show() }}

注:

1.上面代碼全都已經測試通過,測試的環境為spark2.1.0,jdk1.8。

2.此代碼不適用于spark2.0以前的版本。

以上這篇Java和scala實現 Spark RDD轉換成DataFrame的兩種方法小結就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持VeVb武林網。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 凤庆县| 清丰县| 海口市| 许昌市| 庆阳市| 南漳县| 长汀县| 天柱县| 临西县| 蓝山县| 乌鲁木齐市| 兴业县| 宣化县| 平陆县| 溧水县| 穆棱市| 金寨县| 永泰县| 佛坪县| 安宁市| 岑溪市| 沙洋县| 正镶白旗| 酒泉市| 文成县| 通渭县| 彩票| 连江县| 建宁县| 涟水县| 汝州市| 定兴县| 白山市| 梁河县| 安岳县| 桃源县| 穆棱市| 无锡市| 邹城市| 泊头市| 南投市|