2017-09-09 13:35:48

Java: Spark Streaming实时计算

最近用Spark Streaming写了一个实时计算的模块,本文记录一下一些东西。

Spark Streaming

Spark Streaming可以达到高吞吐的实时计算效果,测试下来,配合实时消息队列和长度为200的消息并将压测QPS从五千涨到五万时,内存和CPU消耗一点都没有增长,维持在内存消耗400M和CPU消耗10%的状态(压测机器配置为8c16g),因此可以轻易做到秒级数据的实时计算,配合socket.io做出来的效果会很好。

Java

对应项目的基础设施没有Scala和Python对应的工具包,只能用Java凑活。下好Intellij后安模块安了半小时本来就挺郁闷了,接着看到对应的包是SNAPSHOT版本就更郁闷了,最后再发现Intellij所有菜单按钮我都点不了,得用ctrl+shift+a快捷键一个个输入名称才能唤出来时,简直是吐血三升。

Mac

Mac上使用Intellij配合Java8遇到了一个多重构造函数出错的问题,在Java8 u152(beta测试中)以及Java9上已经被解决,升级即可。

BigInteger

Java中大数据都得用BigInteger避免溢出问题,BigInteger带来的坏处是运算符不再是简单的+-*/了,而是需要调用对应的方法,毕竟Java中没了操作符重载,不过Spark Streaming中的API和BigInteger方法调用相得益彰,还是很好用的。

SimpleDateFormat

Java SimpleDateFormat不是线程安全的,因此在Spark Streaming每个RDD处理周期中,每次想格式化日期时就必须创建一个全新的SimpleDateFormat对象,因为每个RDD周期中都是一个异步函数(做Node的应该很好理解)。

博主为了偷懒,每次格式化日期都复制粘贴new了一个新对象,结果竟然误打误撞避过了这个坑。

Spark Streaming里的RDD处理周期和Node的异步概念非常相似,Node本身没有严格意义上的多线程,因此保证了绝对的线程安全,使用Java时就必须要注意了。

Spark Hadoop启动参数

在正式环境上单独部署包含Hadoop的Spark时,该环境一般都不会开放ssh默认端口22,而是在sshd配置中改为其他端口。由于Hadoop启动时需要通过ssh方式启动所有slave节点,因此必须做相应的修改。

查询sbin/slave.sh文件,可以发现一个叫SPARK_SSH_OPTS环境变量,和hadoop的HADOOP_SSH_OPTS作用一模一样,指定该参数为-p {{ssh端口}}即可完成修改。

ssh启动slave节点时,也有可能遇到配置不对导致以root权限ssh被拒绝的问题,这是正常的安全设定,使用诸如supervisor等工具保活时,启动方式基本都是非root权限,因此在实际运行中不会有问题。

日志过多的问题

由于Spark Streaming的Blocker Mannager会打印每一次存储的日志,而其间隔默认为200毫秒,在我们的场景下每天会生成超过1G的日志文件,这毫无疑问是不可接受的。翻出Spark下的conf文件夹,很容易就能发现一个叫log4j.properties.template的文件,去除template后缀再将其首行的INFO字段改为ERROR即可。

另外,Spark Streaming在日志这一点上比较奇怪,Blocker Manager的输出的INFO级别的日志被supervisor捕获成ERROR级别,导致日志输出位置有问题。在开发时注意到Interllij上这里的日志为红色字体,但却是以[INFO]开头,可能Spark本身的处理逻辑本就如此。

本文链接:https://smallpath.me/post/java-spark-streaming

-- EOF --