saveAsTextFile
Saves the RDD as text files in a file system.
Scala version:
val conf = new SparkConf().setMaster("local[*]").setAppName("SaveAsTextFileScala")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(1 to 10,2)
rdd.saveAsTextFile("file:///E://save/测试文件夹") // save to the local file system
rdd.saveAsTextFile("hdfs://192.168.8.99:9000/save") // save to HDFS
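saveAsTextFile also has an overload that takes a compression codec class, which compresses each output part file. A minimal sketch (the output path here is hypothetical):

```scala
import org.apache.hadoop.io.compress.GzipCodec

// Same call as above, but each part file is written gzip-compressed
rdd.saveAsTextFile("hdfs://192.168.8.99:9000/save-gz", classOf[GzipCodec])
```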
saveAsSequenceFile
Saves the RDD to HDFS in the SequenceFile format; usage is the same as saveAsTextFile.
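The section above gives no code, so here is a minimal sketch (the output path is hypothetical). Note that saveAsSequenceFile is only available on RDDs of key-value pairs whose types can be converted to Hadoop Writables, which Spark handles implicitly for common types like String and Int:

```scala
// SequenceFile stores key-value pairs, so a pair RDD is required
val pairs = sc.makeRDD(Array(("A", 2), ("B", 6), ("C", 7)), 2)
pairs.saveAsSequenceFile("hdfs://192.168.8.99:9000/seq-save")
```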
saveAsObjectFile
Serializes the elements of the RDD into objects and saves them to a file. On HDFS, the data is stored as a SequenceFile by default.
var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/")
Inspecting the output shows the SequenceFile header:
hadoop fs -cat /tmp/lxw1234.com/part-00000
SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT
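Data written with saveAsObjectFile can be read back with SparkContext.objectFile, supplying the element type as a type parameter:

```scala
// Restore the serialized RDD; the type parameter must match what was saved
val restored = sc.objectFile[Int]("hdfs://cdh5/tmp/lxw1234.com/")
restored.collect().foreach(println)
```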
saveAsHadoopFile
Saves a pair RDD to HDFS using the old-version Hadoop API (org.apache.hadoop.mapred); the key class, value class, OutputFormat, and optionally a compression codec can be specified.
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
// The same call, additionally specifying an LZO compression codec:
rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],
classOf[com.hadoop.compression.lzo.LzopCodec])
saveAsHadoopDataset
Using saveAsHadoopDataset to save an RDD to HDFS:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set("mapred.output.dir","/tmp/lxw1234/")
rdd1.saveAsHadoopDataset(jobConf)
Result:
hadoop fs -cat /tmp/lxw1234/part-00000
A 2
A 1
hadoop fs -cat /tmp/lxw1234/part-00001
B 6
B 3
B 7
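The files written above can be read back with the matching old-API method, SparkContext.hadoopFile. A sketch, assuming the same output directory as above:

```scala
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// Old-API read: keys are byte offsets, values are the text lines ("A\t2", ...)
val back = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/tmp/lxw1234/")
back.map(_._2.toString).collect().foreach(println)
```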
Saving data to HBase
Create the HBase table:
create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1}
Note: to save to HBase, the HBase-related jars must be added to SPARK_CLASSPATH at runtime.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
var conf = HBaseConfiguration.create()
var jobConf = new JobConf(conf)
jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")
jobConf.set("zookeeper.znode.parent","/hbase")
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
jobConf.setOutputFormat(classOf[TableOutputFormat])
var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
rdd1.map(x =>
{
var put = new Put(Bytes.toBytes(x._1))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
(new ImmutableBytesWritable,put)
}
).saveAsHadoopDataset(jobConf)
Result:
hbase(main):005:0> scan 'lxw1234'
ROW COLUMN+CELL
A column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02
B column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06
C column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07
3 row(s) in 0.0550 seconds
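To verify the write from Spark itself rather than the HBase shell, the table can be read back with newAPIHadoopRDD and TableInputFormat. A sketch, assuming the same ZooKeeper quorum and table as above:

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "zkNode1,zkNode2,zkNode3")
hbaseConf.set(TableInputFormat.INPUT_TABLE, "lxw1234")

// Each record is (row key, Result); extract the f1:c1 cell written above
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
hbaseRDD.map { case (_, result) =>
  (Bytes.toString(result.getRow),
   Bytes.toInt(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("c1"))))
}.collect().foreach(println)
```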
saveAsNewAPIHadoopFile
Saves RDD data to HDFS using the new-version Hadoop API (org.apache.hadoop.mapreduce); usage is basically the same as saveAsHadoopFile.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
saveAsNewAPIHadoopDataset
Same purpose as saveAsHadoopDataset, but uses the new-version Hadoop API.
Create the HBase table:
create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1}
Note: to save to HBase, the HBase-related jars must be added to SPARK_CLASSPATH at runtime.
package com.lxw1234.test
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
object Test {
def main(args : Array[String]) {
val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")
val sc = new SparkContext(sparkConf);
var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
sc.hadoopConfiguration.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")
sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
var job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put]) // the values emitted below are Put mutations, not Results
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
rdd1.map(
x => {
var put = new Put(Bytes.toBytes(x._1))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
(new ImmutableBytesWritable,put)
}
).saveAsNewAPIHadoopDataset(job.getConfiguration)
sc.stop()
}
}