方法一:各个字段都是提前定好的
// Method 1: the DataFrame columns already match the target table, so Spark's
// built-in JDBC writer can append the rows directly.

// JDBC connection properties handed to the MySQL driver.
// NOTE(review): credentials are hard-coded for the example; load them from
// configuration in real jobs.
val prop = new java.util.Properties
prop.setProperty("user", "root")
prop.setProperty("password", "123456")

// Append every row of df to table `mytab` in the `test` database.
df.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:mysql://localhost:3306/test", "mytab", prop)
方法二:字段可自由增减
// Method 2: write each partition through a pooled JDBC connection, so the
// inserted columns can be chosen freely.
//
// Values are bound through a parameterized statement instead of being
// concatenated into the SQL text, so quotes inside the data cannot break or
// inject into the statement.
df.foreachPartition(p => {
  // Skip empty partitions so no pooled connection is borrowed needlessly.
  if (p.hasNext) {
    // Fail with a clear message if the pool hands back null rather than
    // hitting a NullPointerException further down.
    val conn = Option(ConnectionPool.getConnection).getOrElse(
      throw new IllegalStateException("ConnectionPool returned no connection")
    )
    try {
      // One statement is prepared per partition and reused for every row.
      val stmt = conn.prepareStatement(
        "insert into app_id(id,date,appid,num) values (?,?,?,?)"
      )
      try {
        p.foreach(x => {
          stmt.setString(1, UUID.randomUUID.toString) // id: fresh random UUID per row
          stmt.setInt(2, x.getInt(0))                 // date  <- column 0 (read as Int)
          stmt.setString(3, x.getString(1))           // appid <- column 1
          stmt.setLong(4, x.getLong(2))               // num   <- column 2
          stmt.executeUpdate()
        })
      } finally {
        stmt.close()
      }
    } finally {
      // Always hand the connection back, even when an insert fails.
      ConnectionPool.returnConnection(conn)
    }
  }
})
数据库连接池:
package com.prince.spark.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;

/**
 * A minimal static pool of MySQL JDBC connections.
 *
 * <p>The pool is filled with {@code POOL_SIZE} connections on first use. Both
 * public methods are synchronized on the class because they mutate the same
 * {@link LinkedList}, which is not thread-safe.
 */
public class ConnectionPool {

    // NOTE(review): endpoint and credentials are hard-coded for the example;
    // move them to configuration for real deployments.
    private static final String URL =
            "jdbc:mysql://192.168.1.97:3306/xiang_log?characterEncoding=utf8";
    private static final String USER = "root";
    private static final String PASSWORD = "123456";

    /** Number of connections opened up front and the maximum kept idle. */
    private static final int POOL_SIZE = 5;

    private static final LinkedList<Connection> connectionQueue = new LinkedList<Connection>();

    /** Whether the initial fill of the pool has been attempted. */
    private static boolean initialized = false;

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            // Kept best-effort: if the driver really is unavailable, opening a
            // connection below fails with a descriptive error.
            e.printStackTrace();
        }
    }

    /**
     * Borrows a connection from the pool.
     *
     * <p>If every pooled connection is currently borrowed, a new one is opened
     * instead of returning {@code null}.
     *
     * @return an open connection, never {@code null}
     * @throws IllegalStateException if a connection cannot be opened
     */
    public synchronized static Connection getConnection() {
        try {
            if (!initialized) {
                // Mark first so a failure part-way through does not re-run the
                // fill on every call; later calls open connections on demand.
                initialized = true;
                for (int i = 0; i < POOL_SIZE; i++) {
                    connectionQueue.push(openConnection());
                }
            }
            Connection conn = connectionQueue.poll();
            return conn != null ? conn : openConnection();
        } catch (SQLException e) {
            throw new IllegalStateException("Unable to obtain a MySQL connection", e);
        }
    }

    /**
     * Gives a borrowed connection back to the pool.
     *
     * <p>{@code null} is ignored. When the pool already holds
     * {@code POOL_SIZE} idle connections, the surplus connection is closed
     * rather than kept.
     *
     * @param conn the connection previously obtained from {@link #getConnection()}
     */
    public synchronized static void returnConnection(Connection conn) {
        if (conn == null) {
            return;
        }
        if (connectionQueue.size() < POOL_SIZE) {
            connectionQueue.push(conn);
            return;
        }
        try {
            conn.close();
        } catch (SQLException e) {
            // Closing a surplus connection is best-effort.
            e.printStackTrace();
        }
    }

    /** Opens one new physical connection to the configured database. */
    private static Connection openConnection() throws SQLException {
        return DriverManager.getConnection(URL, USER, PASSWORD);
    }
}
方法三:有时涉及到计算结果的写入,还要组装df
// Method 3: wrap a computed result in a DataFrame and append it to MySQL.

// Build the result RDD from the computed (num, log_date) pair.
val arrayRDD = sc.parallelize(List((num, log_date)))

// Map each result tuple onto a Row laid out like resultSchema below.
val resultRowRDD = arrayRDD.map { case (count, day) =>
  Row(
    count.toInt,
    day.toString,
    new Timestamp(System.currentTimeMillis())
  )
}

// Declare the schema of every field explicitly via StructType.
val resultSchema = StructType(
  List(
    StructField("verify_num", IntegerType, nullable = true),
    StructField("log_date", StringType, nullable = true),      // the day whose logs produced this result
    StructField("create_time", TimestampType, nullable = true) // when this result row was created
  )
)

// Assemble the new DataFrame.
val DF = spark.createDataFrame(resultRowRDD, resultSchema)

// Append the result to MySQL.
// NOTE(review): endpoint and credentials are hard-coded for the example.
DF.write
  .mode("append")
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.97:3306/xiang_log")
  .option("dbtable", "verify") // target table name
  .option("user", "root")
  .option("password", "123456")
  .save()
注意:有一个小坑——表名不要使用 SQL 关键字,也不要包含中横线‘-’;需要分隔单词时请使用下划线‘_’。