A Detailed Walkthrough of Developing Spark in Eclipse

This article walks through the full process of developing a Spark application in Eclipse: setting up the environment, reading data from Elasticsearch and MySQL, and packaging and running the job.

1. Setting Up the Environment

Install the Scala IDE plugin in Eclipse (available from the Eclipse Marketplace or scala-ide.org).

2. Reading from Elasticsearch and MySQL

First, add the dependencies to pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>test</groupId>
	<artifactId>test</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>spark</name>

	<properties>
		<scala.version>2.11.8</scala.version>
		<spark.version>2.2.0</spark.version>
		<spark.artifactId.version>2.11</spark.artifactId.version>

		<guava.version>18.0</guava.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_${spark.artifactId.version}</artifactId>
			<version>${spark.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_${spark.artifactId.version}</artifactId>
			<version>${spark.version}</version>
		</dependency>

		<dependency>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-compiler</artifactId>
			<version>${scala.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.29</version>
		</dependency>

		<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch-spark-20_${spark.artifactId.version}</artifactId>
			<version>6.2.0</version>
			<scope>compile</scope>
			<exclusions>
				<exclusion>
					<artifactId>log4j-over-slf4j</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.6</version>
		</dependency>

		<dependency>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-library</artifactId>
			<version>${scala.version}</version>
		</dependency>

		<!-- log API -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.7.25</version> <!-- keep in step with slf4j-log4j12 below -->
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.25</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.6.1</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
			<plugin>
				<groupId>net.alchim31.maven</groupId>
				<artifactId>scala-maven-plugin</artifactId>
				<version>3.2.2</version>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>3.0.2</version>
				<configuration>
					<archive>
						<manifest>
							<addClasspath>true</addClasspath>
							<classpathPrefix>lib/</classpathPrefix>
							<mainClass>test.querySql</mainClass>
						</manifest>
					</archive>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-dependency-plugin</artifactId>
				<version>3.0.0</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>copy-dependencies</goal>
						</goals>
						<configuration>
							<outputDirectory>${project.build.directory}/lib</outputDirectory>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

Then write the main class:

package test

import java.util.Properties

import org.apache.spark.sql.SparkSession

object querySql {
  def main(args: Array[String]): Unit = {
    // Build a SparkSession; the es.* settings configure the elasticsearch-spark connector
    val spark = SparkSession.builder().appName("Java Spark MYSQL basic example")
      .master("local")
      .config("es.nodes", "127.0.0.1")
      .config("es.port", "9200")
      .config("es.mapping.date.rich", "false") // do not map ES date fields to rich date types
      .getOrCreate()

    // Read from MySQL:
    val url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8"
    val table = "sys_user"
    val props = new Properties()
    props.setProperty("dbtable", table) // table name (redundant here, since it is also passed to jdbc())
    props.setProperty("user", "root") // database user
    props.setProperty("password", "123456") // database password

    //    val df = spark.read.jdbc(url, table, props)
    //    df.show()

    // Add a filter condition (requires import org.apache.spark.sql.functions.col):
    //    val filter = df.filter(col("TABLE_ID").gt("10"))
    //    System.out.println("mysql count:" + filter.count())

    // Read from Elasticsearch:
    val esRows = spark.read.format("org.elasticsearch.spark.sql").load("visitlog/_doc")
    //    esRows.show()

    esRows.createOrReplaceGlobalTempView("table1")

    // Global temp views are queried through the global_temp database:
    //    val subDf = spark.sql("SELECT userId,ip,createTime,createTime2 FROM global_temp.table1")
    val subDf = spark.sql("SELECT userId, count(userId) FROM global_temp.table1 GROUP BY userId")

    subDf.show()

    spark.close()
  }
}
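
If you need to combine the two sources, the MySQL DataFrame can also be registered as a view and joined against the Elasticsearch data. Below is a minimal sketch, to be placed before spark.close(); it assumes (hypothetically) that the sys_user table exposes a userId column matching the ES documents, so adjust the column names to your schema:

    // Join sketch (assumption: sys_user has a userId column matching the ES docs)
    val userDf = spark.read.jdbc(url, table, props)
    userDf.createOrReplaceTempView("users")

    val joined = spark.sql(
      """SELECT u.userId, count(t.userId) AS visits
        |FROM users u
        |JOIN global_temp.table1 t ON u.userId = t.userId
        |GROUP BY u.userId""".stripMargin)
    joined.show()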

3. Packaging and Running

Package command: mvn clean scala:compile package (the explicit scala:compile goal is needed because the scala-maven-plugin in the pom has no compile execution bound to the build lifecycle)

Run command: java -Djava.ext.dirs=lib -cp test-0.0.1-SNAPSHOT.jar test.querySql
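
Note that -Djava.ext.dirs=lib loads the jars copied into lib/ through the extension mechanism, which was removed in Java 9, so the command above only works on a Java 8 runtime. On a newer JVM, put the dependency jars on the classpath instead (use ; as the path separator on Windows):

java -cp "test-0.0.1-SNAPSHOT.jar:lib/*" test.querySql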

关于 "eclipse开发spark的详细过程" 就介绍到这。希望大家多多支持编程宝库

本文主要介绍"k8s基本排错的方法",希望能够解决您遇到有关问题,下面我们一起来看这篇 "k8s基本排错的方法" 文章。k8s基本排错在排错过程中,kubectl 是最重要的工具,通常也是定位错误的起点。这里也列出一些 ...