这篇文章将为大家详细讲解有关spark应用程序如何在Java项目中运行,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
如下所示:
package org.shirdrn.spark.job;import java.io.File;import java.io.IOException;import java.util.Arrays;import java.util.Collections;import java.util.Comparator;import java.util.List;import java.util.regex.Pattern;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.shirdrn.spark.job.maxmind.Country;import org.shirdrn.spark.job.maxmind.LookupService;import scala.Serializable;import scala.Tuple2;public class IPAddressStats implements Serializable { private static final long serialVersionUID = 8533489548835413763L; private static final Log LOG = LogFactory.getLog(IPAddressStats.class); private static final Pattern SPACE = Pattern.compile(" "); private transient LookupService lookupService; private transient final String geoIPFile; public IPAddressStats(String geoIPFile) { this.geoIPFile = geoIPFile; try { // lookupService: get country code from a IP address File file = new File(this.geoIPFile); LOG.info("GeoIP file: " + file.getAbsolutePath()); lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE); } catch (IOException e) { throw new RuntimeException(e); } } @SuppressWarnings("serial") public void stat(String[] args) { JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats", System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class)); JavaRDD<String> lines = ctx.textFile(args[1], 1); // splits and extracts ip address filed JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0" // ip address return Arrays.asList(SPACE.split(s)[0]); } }); // map JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); // reduce JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); List<Tuple2<String, Integer>> output = counts.collect(); // sort statistics result by value Collections.sort(output, new Comparator<Tuple2<String, Integer>>() { @Override public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { if(t1._2 < t2._2) { return 1; } else if(t1._2 > t2._2) { return -1; } return 0; } }); writeTo(args, output); } private void writeTo(String[] args, List<Tuple2<String, Integer>> output) { for (Tuple2<?, ?> tuple : output) { Country country = lookupService.getCountry((String) tuple._1); LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2); } } public static void main(String[] args) { // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat if (args.length < 3) { System.err.println("Usage: IPAddressStats <master> <inFile> <GeoIPFile>"); System.err.println(" Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat"); System.exit(1); } String geoIPFile = args[2]; IPAddressStats stats = new IPAddressStats(geoIPFile); stats.stat(args); System.exit(0); }}
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
软考中级精品资料免费领
- 历年真题答案解析
- 备考技巧名师总结
- 高频考点精准押题
- 资料下载
- 历年真题
193.9 KB下载数265
191.63 KB下载数245
143.91 KB下载数1142
183.71 KB下载数642
644.84 KB下载数2755
相关文章
发现更多好内容猜你喜欢
AI推送时光机spark应用程序如何在Java项目中运行
后端开发2023-05-31
如何在pycharm中运行flask应用程序
后端开发2023-06-15
LinkedList如何在java项目中运用
后端开发2023-05-31
Enum如何在Java项目中运用
后端开发2023-05-31
如何使用HbuilderX运行小程序项目
后端开发2023-08-31
如何在Java容器中高效地运行Django应用程序?
后端开发2023-06-15
你知道如何在Linux上运行Java应用程序吗?
后端开发2023-07-29
tomcat中如何运行Java程序
后端开发2023-10-25
Commons lang组件如何在Java项目中运用
后端开发2023-05-31
如何在阿里云服务器上运行Java应用程序
后端开发2023-11-06
如何在java项目中应用SSM框架
后端开发2023-05-30
MD5加密算法如何在java项目中运用
后端开发2023-05-31
如何在docker中运行mariadb程序
后端开发2023-06-07
如何使用Go在Shell中运行程序?
后端开发2023-06-14
eclipse项目如何在IDEA中打开并运行
后端开发2023-08-18
如何在java项目中利用IO流对数组进行排序
后端开发2023-06-06
线程池如何在Java项目中使用
后端开发2023-05-31
如何进行Jerry的spark演示应用程序分析
后端开发2023-06-02
浮点运算如何在Java项目中实现
后端开发2023-05-31
java多线程在项目中怎么应用
后端开发2023-09-29
咦!没有更多了?去看看其它编程学习网 内容吧