Developing Spark in Eclipse: a detailed walkthrough
This article walks through the full process of developing a Spark application in Eclipse, from environment setup through reading Elasticsearch and MySQL to packaging and running. Hopefully it helps with any issues you run into along the way.
1. Setting up the environment
Install the Scala IDE plugin in Eclipse.
2. Reading from Elasticsearch and MySQL
First, add the dependencies to pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>test</groupId>
  <artifactId>test</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>spark</name>

  <properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
    <spark.artifactId.version>2.11</spark.artifactId.version>
    <guava.version>18.0</guava.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${spark.artifactId.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${spark.artifactId.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
      <!-- <scope>compile</scope> -->
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.29</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_${spark.artifactId.version}</artifactId>
      <version>6.2.0</version>
      <scope>compile</scope>
      <exclusions>
        <exclusion>
          <artifactId>log4j-over-slf4j</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.6</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- log API -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.0.2</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>spark.example.Main</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/lib</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
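One detail worth checking in this pom: scala.version (2.11.8) must agree with the Scala binary suffix used by the Spark artifacts (spark.artifactId.version, i.e. the _2.11 in spark-core_2.11), otherwise the job tends to fail at runtime with errors such as NoSuchMethodError. A quick sanity check (a minimal sketch; the object name VersionCheck is arbitrary) is to print the Scala runtime version that actually ends up on the classpath:

package test

object VersionCheck {
  def main(args: Array[String]): Unit = {
    // Prints e.g. "version 2.11.8"; this should agree with both
    // scala.version and the _2.11 suffix on the Spark artifacts.
    println(scala.util.Properties.versionString)
  }
}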
Then write the main object:
package test

import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset

object querySql {
  def main(args: Array[String]): Unit = {
    // Build a local SparkSession configured for both MySQL (JDBC) and Elasticsearch
    val spark = SparkSession.builder().appName("Java Spark MYSQL basic example")
      .master("local")
      .config("es.nodes", "127.0.0.1")
      .config("es.port", "9200")
      .config("es.mapping.date.rich", "false") // do not map ES date fields to rich date types
      .getOrCreate()

    // MySQL connection settings
    val url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8"
    val table = "sys_user"
    val props = new Properties()
    props.setProperty("dbtable", table)      // table name
    props.setProperty("user", "root")        // user name
    props.setProperty("password", "123456")  // password

    // Read MySQL data:
    // val df = spark.read.jdbc(url, table, props)
    // df.show()

    // Add a filter condition:
    // val filter = df.filter(col("TABLE_ID").gt("10"))
    // System.out.println("mysql count:" + filter.count())

    // Read from Elasticsearch
    val esRows = spark.read.format("org.elasticsearch.spark.sql").load("visitlog/_doc")
    // esRows.show()
    esRows.createOrReplaceGlobalTempView("table1")

    // val subDf = spark.sql("SELECT userId,ip,createTime,createTime2 FROM global_temp.table1")
    val subDf = spark.sql("SELECT userId,count(userId) FROM global_temp.table1 group by userId")
    subDf.show()

    spark.close()
  }
}
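The example above only reads and shows the aggregation. If you also want to persist the per-user counts back into MySQL, a minimal sketch along the same lines could look like this (the target table name user_visit_counts and the object name exportCounts are made up for illustration; the connection settings match the ones above):

package test

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object exportCounts {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("export example")
      .master("local")
      .config("es.nodes", "127.0.0.1")
      .config("es.port", "9200")
      .getOrCreate()

    // Aggregate visit counts per user from the same ES index as above
    val esRows = spark.read.format("org.elasticsearch.spark.sql").load("visitlog/_doc")
    esRows.createOrReplaceTempView("visits")
    val counts = spark.sql("SELECT userId, count(userId) AS cnt FROM visits GROUP BY userId")

    // Write the result to MySQL; with Overwrite, Spark creates or
    // replaces the target table based on the DataFrame schema
    val url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8"
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    counts.write.mode(SaveMode.Overwrite).jdbc(url, "user_visit_counts", props)

    spark.close()
  }
}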
3. Packaging and running
Package command: mvn clean scala:compile package
Run command: java -Djava.ext.dirs=lib -cp test-0.0.1-SNAPSHOT.jar test.querySql
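Note that -Djava.ext.dirs relies on the JDK extension mechanism, which was removed in Java 9, so the run command above only works on Java 8. On a newer JDK, an equivalent invocation (assuming the same output layout, with dependencies copied to lib/ by the maven-dependency-plugin) would put the jars on the classpath directly:

java -cp "test-0.0.1-SNAPSHOT.jar:lib/*" test.querySql

On Windows, replace the : separator with ;.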
关于 "eclipse开发spark的详细过程" 就介绍到这。希望大家多多支持编程宝库。
本文主要介绍"k8s基本排错的方法",希望能够解决您遇到有关问题,下面我们一起来看这篇 "k8s基本排错的方法" 文章。k8s基本排错在排错过程中,kubectl 是最重要的工具,通常也是定位错误的起点。这里也列出一些 ...