Compiling Hadoop so Spark Can Read and Write OSS (CDH 5.x)
2021-07-28 11:07
Author: 来世愿做友人_A
Source: SegmentFault community
Preface
Background: use Spark to read files from HDFS and write them to OSS.
hadoop : 2.6.0-cdh5.15.1
spark : 2.4.1
Notes on the tricky points and pitfalls are included along the way.
Compiling hadoop-aliyun
Newer Hadoop releases support Aliyun OSS access out of the box, but this version (2.6.0-cdh5.15.1) does not, so the support has to be compiled in.
1. Pull the Hadoop trunk branch and copy the hadoop-tools/hadoop-aliyun module into the corresponding module directory of the CDH project.
2. Edit hadoop-tools/pom.xml and add <module>hadoop-aliyun</module> to register the new sub-module (sketched right after this list).
3. Change the Java version in the root pom.xml to 1.8, since hadoop-aliyun uses Java 8 lambda syntax; alternatively, rewrite those lambdas so the code compiles as-is.
4. Edit hadoop-aliyun/pom.xml: update the version, add the related OSS and HTTP dependencies, and bundle them into the jar with the shade plugin.
5. In the copied code, change import org.apache.commons.lang3 to import org.apache.commons.lang.
6. Copy the BlockingThreadPoolExecutorService and SemaphoredDelegatingExecutor classes from the (CDH) hadoop-aws module into org.apache.hadoop.util.
7. Build the hadoop-aliyun module: mvn clean package -pl hadoop-tools/hadoop-aliyun
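For step 2, the module registration in hadoop-tools/pom.xml ends up looking roughly like this (a minimal sketch; the other module entries are only placeholders for whatever the CDH tree already lists):
<modules>
  <!-- existing CDH tool modules such as hadoop-aws stay as they are -->
  ...
  <!-- new sub-module copied from trunk -->
  <module>hadoop-aliyun</module>
</modules>
The hadoop-aliyun pom.xml after the changes in step 4: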
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>2.6.0-cdh5.15.1</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<artifactId>hadoop-aliyun</artifactId>
<name>Apache Hadoop Aliyun OSS support</name>
<packaging>jar</packaging>
<properties>
<file.encoding>UTF-8</file.encoding>
<downloadSources>true</downloadSources>
</properties>
<profiles>
<profile>
<id>tests-off</id>
<activation>
<file>
<missing>src/test/resources/auth-keys.xml</missing>
</file>
</activation>
<properties>
<maven.test.skip>true</maven.test.skip>
</properties>
</profile>
<profile>
<id>tests-on</id>
<activation>
<file>
<exists>src/test/resources/auth-keys.xml</exists>
</file>
</activation>
<properties>
<maven.test.skip>false</maven.test.skip>
</properties>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<findbugsXmlOutput>true</findbugsXmlOutput>
<xmlOutput>true</xmlOutput>
<excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
</excludeFilterFile>
<effort>Max</effort>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>deplist</id>
<phase>compile</phase>
<goals>
<goal>list</goal>
</goals>
<configuration>
<!-- build a shellprofile -->
<outputFile>
${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt
</outputFile>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<id>shade-aliyun-sdk-oss</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createSourcesJar>true</createSourcesJar>
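<!-- Note: relocating org.apache.http (pattern below) is presumably what keeps the newer
     httpclient/httpcore bundled here for the OSS SDK from clashing with the older
     versions already on the CDH classpath. -->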
<relocations>
<relocation>
<pattern>org.apache.http</pattern>
<shadedPattern>com.xxx.thirdparty.org.apache.http</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-examples</artifactId>
<scope>test</scope>
<type>jar</type>
</dependency>
</dependencies>
</project>
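After the build command from the step list above, the shaded jar can be inspected to confirm that the OSS SDK and the relocated HTTP classes were actually bundled in. A quick check (the target/ path and jar name assume the default Maven layout):
jar tf hadoop-tools/hadoop-aliyun/target/hadoop-aliyun-2.6.0-cdh5.15.1.jar | grep -E 'com/aliyun/oss|com/xxx/thirdparty/org/apache/http' | head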
Reading and writing OSS files with Spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val inputPath = "hdfs:///xxx"
val outputPath = "oss://bucket/OSS_FILES"

val conf = new SparkConf()
conf.set("spark.hadoop.fs.oss.endpoint", "oss-cn-xxx")
conf.set("spark.hadoop.fs.oss.accessKeyId", "xxx")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "xxx")
conf.set("spark.hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
conf.set("spark.hadoop.fs.oss.buffer.dir", "/tmp/oss")
conf.set("spark.hadoop.fs.oss.connection.secure.enabled", "false")
conf.set("spark.hadoop.fs.oss.connection.maximum", "2048")

val spark = SparkSession.builder().config(conf).getOrCreate()

// read the HDFS input (ORC is assumed here) and write it back out to OSS
spark.read.orc(inputPath)
  .write.format("orc").mode("overwrite").save(outputPath)
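The spark.hadoop. prefix simply forwards each key into the underlying Hadoop Configuration (fs.oss.endpoint, fs.oss.impl, and so on), so the same settings can also live in core-site.xml instead of being hard-coded in the job. A minimal sketch, with placeholder values:
<property>
  <name>fs.oss.impl</name>
  <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
</property>
<property>
  <name>fs.oss.endpoint</name>
  <value>oss-cn-xxx</value>
</property>
<!-- fs.oss.accessKeyId, fs.oss.accessKeySecret and the rest follow the same pattern -->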
spark-submit
spark-submit \
--class org.example.HdfsToOSS \
--master yarn \
--deploy-mode cluster \
--num-executors 2 \
--executor-cores 2 \
--executor-memory 3G \
--driver-cores 1 \
--driver-memory 3G \
--conf "spark.driver.extraClassPath=hadoop-common-2.6.0-cdh5.15.1.jar" \
--conf "spark.executor.extraClassPath=hadoop-common-2.6.0-cdh5.15.1.jar" \
--jars ./hadoop-aliyun-2.6.0-cdh5.15.1.jar,./hadoop-common-2.6.0-cdh5.15.1.jar \
./spark-2.4-worker-1.0-SNAPSHOT.jar
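Before running the full job, it can be worth sanity-checking OSS access from the command line. A rough sketch, assuming the freshly built hadoop-aliyun jar sits in the current directory and the endpoint/keys are filled in:
export HADOOP_CLASSPATH=$(pwd)/hadoop-aliyun-2.6.0-cdh5.15.1.jar:$HADOOP_CLASSPATH
hadoop fs \
  -Dfs.oss.endpoint=oss-cn-xxx \
  -Dfs.oss.accessKeyId=xxx \
  -Dfs.oss.accessKeySecret=xxx \
  -Dfs.oss.impl=org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem \
  -ls oss://bucket/OSS_FILES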