SparkSQL Flow 支持的Sourse
支持从 Hive 获得数据;
支持文件:JSON,TextFile(CSV),ParquetFile,AvroFile
支持RDBMS数据库:PostgreSQL, MySQL,Oracle
支持 NOSQL 数据库:Hbase,MongoDB
SparkSQL Flow TextFile Source
textfile 为读取文本文件,把文本文件每行按照 delimiter 指定的字符进行切分,切分不够的列使用 null 填充。
<source type="textfile" table_name="et_rel_pty_cong" fields="cust_id,name1,gender1,age1:int" delimiter="," path="file:///Users/zhenqin/software/hive/user.txt"/>
可左右滑动查看代码
Tablename 为该文件映射的数据表名,可理解为数据的视图;
Fields 为切分后的字段,使用逗号分隔,字段后可紧跟该字段的类型,使用冒号分隔;
Delimiter 为每行的分隔符;
Path 用于指定文件地址,可以是文件,也可是文件夹;
Path 指定地址需要使用协议,如:file:// 、 hdfs://,否则跟 core-site.xml 配置密切相关;
SparkSQL Flow DB Source
<source type="mysql" table_name="et_rel_pty_cong" table="user" url="jdbc:mysql://localhost:3306/tdb?characterEncoding=UTF-8" driver="com.mysql.jdbc.Driver" user="root" password="123456"/>
可左右滑动查看代码
RDBMS 是从数据库使用 JDBC读取 数据集。支持 type 为:db、mysql、oracle、postgres、mssql;
tablename 为该数据表的抽象 table 名称(视图);
url、driver、user,password 为数据库 JDBC 驱动信息,为必须字段;
SparkSQL 会加载该表的全表数据,无法使用 where 条件。
SparkSQL Flow Transformer
<transform type="sql" table_name="cust_id_agmt_id_t" cached="true"> SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids FROM user_concat_testx group by c_phone,c_type,c_num</transform>
可左右滑动查看代码
Transform 支持 cached 属性,默认为 false;如果设置为 true,相当于把该结果缓存到内存中,缓存到内存中的数据在后续其它 Transform 中使用能提高计算效率。但是需使用大量内存,开发者需要评估该数据集能否放到内存中,防止出现 OutofMemory 的异常。
SparkSQL Flow Targets
SparkSQL Flow Targets 支持输出数据到一个或者多个目标。这些目标,基本覆盖了 Source 包含的外部系统。下面以 Hive 举例说明:
<target type="hive" table_name="cust_id_agmt_id_t" savemode=”append”target_table_name="cust_id_agmt_id_h"/>
可左右滑动查看代码
table_name 为 source 或者 Transform 定义的表名称;
target_table_name 为 hive 中的表结果,Hive 表可不存在也可存在,sparksql 会根据 DataFrame 的数据类型自动创建表;
savemode 默认为 overwrite 覆盖写入,当写入目标已存在时删除源表再写入;支持 append 模式, 可增量写入。
Target 有一个特殊的 show 类型的 target。用于直接在控制台输出一个 DataFrame 的结果到控制台(print),该 target 用于开发和测试。
<target type="show" table_name="cust_id_agmt_id_t" rows=”10000”/>
可左右滑动查看代码
Rows 用于控制输出多少行数据。
SparkSQL Around
After 用于 Flow 在运行结束后执行的一个环绕,用于记录日志和写入状态。类似 Java 的 try {} finally{ round.execute() }
多个 round 一定会执行,round 异常不会导致任务失败。
<prepare> <round type="mysql" sql="insert into cpic_task_history(id, task_type, catalog_model, start_time, retry_count, final_status, created_at) values(${uuid}, ${task.type}, ${catalog.model}, ${starttime}, 0, ${status}, now())" url="${jdbc.url}" .../></prepare><after> <round type="mysql" sql="update cpic_task_history set end_time = ${endtime}, final_status = ${status}, error_text = ${error} where id = ${uuid}" url="${jdbc.url}”…/></after>
可左右滑动查看代码
Prepare round 和 after round 配合使用可用于记录 SparkSQL Flow 任务的运行日志。
SparkSQL Around可使用的变量
SparkSQL Around的执行效果
Prepare round 可做插入(insert)动作,after round 可做更新 (update)动作,相当于在数据库表中从执行开始到结束有了完整的日志记录。SparkSQL Flow 会保证round 一定能被执行,而且 round 的执行不影响任务的状态。
SparkSQL Flow 提交
bin/spark-submit --master yarn-client --driver-memory 1G --num-executors 10 --executor-memory 2G --jars /lib/jsoup-1.11.3.jarlib/jsqlparser-0.9.6.jar,/lib/mysql-connector-java-5.1.46.jar --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar --queue default --name FlowTest etl-flow-0.2.0.jar -f hive-flow-test.xml
可左右滑动查看代码
接收必须的参数 –f,可选的参数为支持 Kerberos 认证的租户名称principal,和其认证需要的密钥文件。
usage: spark-submit --jars etl-flow.jar --class com.yiidata.etl.flow.source.FlowRunner -f,--xml-file <arg> Flow XML File Path --keytabFile <arg> keytab File Path(Huawei) --krb5File <arg> krb5 File Path(Huawei) --principal <arg> principal for hadoop(Huawei)
可左右滑动查看代码
SparkSQL Execution Plan
每个Spark Flow 任务本质上是一连串的 SparkSQL 操作,在 SparkUI SQL tab 里可以看到 flow 中重要的数据表操作。
regiserDataFrameAsTable 是每个 source 和 Transform 的数据在 SparkSQL 中的数据视图,每个视图都会在 SparkContex 中注册一次。