如何用 Spark 深度集成 Tensorflow 实现文本分类

向作者提问
资深大数据架构师,10年研发,6年大数据/AI相关经验。现致力于大数据和机器学习在医疗领域的应用。工作期间曾使用流式处理架构处理日均千亿级别的日志,擅长Spark/ElasticSearch/Tensorflow等相关技术。 希望大家多多关注我的开源项目以及博客: 个人博客参看: http://www.jianshu.com/u/59d5607f1400 开源项目参看:https://github.com/allwefantasy?tab=repositories
查看本场Chat

阅读提示

本文代码较多,适合PC阅读。分成7个部分:

  1. 开发环境准备
  2. PySpark基础
  3. PySpark MLlib 基础
  4. Tensorflow基础
  5. 深度学习与NLP基础
  6. Spark和Tensorflow整合
  7. 一个完整应用案例

文章字数约10000+。读者可以将该文看成一个系列。除了第一部分环境准备以外,剩下的五个部分,都可以独立成章,读者可有选择的查看。

前言

我们知道,Spark 目前是大数据处理组件的王者,实现了让大数据处理更轻松的远景。Tensorflow则是深度学习当之无愧最热的框架。而在现实当中,Spark 和Tensorflow的衔接往往是脱节的。我们希望保留Spark/TF各自的优势和习惯,并且能够实现无缝衔接。

因为我自身是比较熟悉Spark的,Spark一开始就立足于提供一个完整技术栈,这自然包括数据组织,处理,还有机器学习。

奈何深度学习突然崛起,原有的MLlib和GraphX 突然就不够用了,和深度学习的整合成了Spark一个非常重要的挑战。无论是文本,图像,序列,还是AI,本质都是数据处理,只是说AI是一个比较高阶的数据处理而已。

为什么说Spark必须想办法和深度学习整合呢?因为深度学习框架譬如Tensorflow,自身也在不断调整数据读取和处理的API,比如DataSet API,这部分其实会反噬Spark的护城河。

从技术角度而言,如果将Spark 和Tensorflow集成后,那么数据获取,处理,训练,预测就会在一条pipeline上,无缝衔接。Spark提供了一套规范,约束了整个流程,使得维护变得简单。

Spark 在Mllib里提供了大量的针对机器学习的数据预处理API,这些API可以将数据快速转化为Tensorlfow能够识别的数学表示形态,比如词向量,普通向量,矩阵等。

从团队角度而言,算法和研发都被规范到了Spark这条线上后,Spark开发工程师拿着TF的训练代码丢到自己的Spark程序里就可以跑起来。

算法工程师不需要各种数据处理代码写完后,研发还要再写一遍,因为都是直接用spark写成的。同时值得一提的是,这还充分实现团队的弹性。比如研发资源紧张,那么要求算法工程师把数据处理的代码写完,直接丢给研发即可。如果算法资源紧张,算法只要写完模型训练代码,研发写数据获取和处理的代码。双方只要都掌握pyspark做衔接即可很好沟通和交流。

另外,大家可能会好奇,Spark 和 Tensorflow 如何进行衔接呢? 首先是Spark支持Python, Scala, Java, R 等语言,Tensorflow其实也支持Python, Java等语言,只要我们确定一个语言,那么其实就可以实现集成的。我最终选择了Python语言,因为这是算法和研发都相对容易接受的语言,并且两个框架都支持的不错。

环境准备

推荐实验环境如下:

  1. JDK 1.8: 下载地址
  2. spark-2.2.0-bin-hadoop2.7 : 下载地址
  3. python 2.7 /virtualenv
  4. Kafka

软件似乎有点多,读者可能感受到一丝压力,然而安装和使用它们是一件非常容易的事情。这里我会假设读者使用的电脑是Mac,并且使用homebrew进行软件管理。

Step 1:

下载JDK 1.8,你下载后双击安装即可。安装后的路径应该是在:

/Library/Java/JavaVirtualMachines

比如我的目录结构会是这样的:

Snip20171218_11.png

Step2:

接着根据前面提供的地址下载Spark安装包。下载完成后简单的解压即可。假设解压后的目录如下:

~/Softwares/spark-2.2.0-bin-hadoop2.7

Step3:

配置tensorflow环境。这里推荐用virtualenv,非常好用。

安装virtualenv:

pip install --upgrade virtualenv

之后创建virutalenv环境。

virtualenv --system-site-packages ~/python2.7/tensorflow

切换到该环境下:

source ~/python2.7/tensorflow/bin/activate  

最后的样子:

(tensorflow) [w@me]$

Step4:

现在让我们安装tensorflow:

pip install tensorflow

验证下:

(tensorflow) [w@me]$ python
Python 2.7.10 (default, Feb  7 2017, 00:08:15)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import tensorflow as tf
>>> tf.__version__
'1.4.0'
>>>

对了,为了方便起见,我们还是设置下SPARKHOME和JAVAHOME两个环境变量。可以在

~/.bash_profile

文件中添加如下两行:

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/
export SPARK_HOME=[YOUR-HOME]/Softwares/spark-2.2.0-bin-hadoop2.7

记得把YOUR-HOME替换成你自己的目录。

Step5:

通常有了上面的配置就够了。当时为了方便开发,我们还需要装一些python的依赖。把下面内容拷贝到一个文件里,假设是requirements.txt:

# This file should list any python package dependencies.
coverage>=4.4.1
h5py>=2.7.0
keras==2.0.4 # NOTE: this package has only been tested with keras 2.0.4 and may not work with other releases
nose>=1.3.7  # for testing
numpy>=1.11.2
pillow>=4.1.1,<4.2
pygments>=2.2.0
pandas>=0.19.1
six>=1.10.0
kafka-python>=1.3.5
tensorflowonspark>=1.0.5
tensorflow-tensorboard>=0.1.6
jieba>=0.39
PIL>=1.1.7
redis>=2.10.6
scikit-learn>=0.19.0
scipy>=0.19.1

在激活有的virtualenv后,运行如下命令安装:

pip install -r requirements.txt 

pyspark不是必须的,因为在Spark里已经集成,但是为了能够实现代码提示,我们还是通过pip安装下。以为它特别大,所以可以使用阿里云的镜像来加速:

pip install pyspark -i  http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com

PySpark基础

Spark 是目前大数据处理的事实标准。PySpark能让你使用Python语言来写Spark程序。

我们先做一个最简单的字符数统计程序。这样我们就知道一个PySpark程序是什么样子,以及如何运转起来。

我们准备一个文件a.csv。里面的内容如下:

a b c,1.0
a b,2.0
c,3.0
d,4.0

然后我们打开编辑器,比如我这里是Intellij IDEA。 新建一个myfirstpyspark.py文件。

PySpark需要你有一个SparkSession对象,这是一切的开始,在这里你可以做一些配置。

from pyspark.sql import SparkSession
session = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

其中"local[*]"表示我们执行的是local模式,也就是单机模式,*表示使用所有的CPU核心,如果你写成local[2]那么表示单机模式,并且使用两个核。

在很多情况下,单机模式足够我们使用,因为他已经是多线程并行运行了,这比我们自己完成一个多线程的程序来的简单,而且,Spark可以让你的代码看起来就像单机,他自动完成分布式工作。

如果愿意,还可以运行在Standalone,Yarn,Mesos等模式下,它们都是真正的分布式模式。Spark是一个典型的Master-Slave结构。Master负责解释你写的代码,Slave则负责执行你的代码。

构建好了session之后,现在可以去读文件了。

from pyspark.sql.types import *
import pyspark.sql.functions as f

df = session.read.csv(
        "a.csv", 
        encoding="utf-8",
        header=False, 
        schema=StructType(
            [StructField("text", StringType()),
             StructField("index", StringType())]))

这里我使用了一个较为复杂的方式构建Dataframe, Dataframe你可以简单理解为SQL的编程表达形式。不使用常规教程提及的RDD API的原因是因为Spark花了大力气将大部分东西都迁移到Dataframe上,我们也就不要倒行逆施了。实际场景中,RDD非常灵活,但是往往导致代码难以管理和维护。Dataframe API 比较受限,但是更"SQL"化,更"结构化",大家写出来的会是基本一致的,并且性能更好。

session.read 是一个datasource API,这个时候你获得了一个“读”对象。聪明的读者可能会思索是不是还有session.write, 实际上是有的,session.write 可以让你获得了一个写的对象。

除了读取CSV文件,你还可以读取parquet, json, elasticsearch,mysql 等其他存储器。

让我们回转过来,读取一个csv文件的方式如同上面的示例代码,具体参数我会简单说明下。

header 指的是是否有头部。CSV文件通常第一行是列名。我们这里设置为False,因为构建的文件并没有这个需求。

schema 可以指定CSV每列的名字,类型。大家只要记住是这么写就OK了。这种方式非常有用。对于非结构化数据,你可以简单认为他是只有一列的结构化数据。

现在,我们已经有了一张名字叫'df'的表,这张表有两个字段,text 和 index。 现在想统计text里每个词汇出现的次数。这在pyspark里只需要一行代码就可以搞定。

df.select(f.explode(f.split("text", " ")).alias("word")).\
    groupBy("word").\
    count().\
    show()

最后运行的结果如下:

Snip20171219_12.png

通过导入import pyspark.sql.functions as f 我们可以使用Spark内置的UDF函数并且使用f来进行引用,额外的收获是你可以获得更友好的代码提示。

Joe
你好,想问一下export .... 这样的执行方式是在哪里执行呢? (初学者)
Joe: emmm 不用了,是我弱智了
微信扫描登录