写在前面

昨天是中秋节,休息了一天,和老王楠姐去密室逃脱完了一局,玩的是不恐怖的本,但里面的氛围还是很阴森。从密室逃脱出来之后,吃了小炒黄牛肉,一共有四个菜,小炒黄牛肉、芋头蒸排骨、炒花菜、脆皮豆腐。小炒黄牛肉没有放辣椒,吃起来味道很不错。其中最辣的一道菜是脆皮豆腐,我没吃几口。炒花菜里面的油渣好吃,我吃了很多。吃完就去的helens,点了一扎果啤,虽然叫果啤,但对我来说还是有度数,三四杯下肚,头就晕晕的,红色的血管在脸上扩张,有种狼狈的醉意。隔壁桌有人来问我们的戒指是不是粉色的,这是个活动,每一桌都有一种颜色的戒指,如果两桌颜色的戒指一样,就可以去前台兑换啤酒。来了两桌人问,最后来的那一桌和我们的颜色是一样的,我就去和他们桌的一个人去兑换啤酒了。去的途中知道,那个人是中南林科大的,从长沙来湘潭玩。我们这桌一共是三个人,一人一瓶啤酒,拿来了三瓶。我们喝完了一扎果啤,就把兑换来的倒进果啤扎里面去了,因为里面还有很多冰块,可以喝到冰爽的啤酒。喝酒的过程,我们前期主要在玩手机,我投了一份方太的简历。我们聊到考研,觉得这件事本身并不难,难的是坚持下去,这个过程十分枯燥。后期,我们三个玩骰子,就这样,一人几口下肚,三瓶啤酒也就没了。喝完之后,打了个滴滴回去,在车上我就已经快睡着了,撑着睡意到家,和芦打了三十分钟的视频电话,就昏昏睡去了。对了,在去玩密室逃脱之前,我们买了茶颜悦色喝,我还是老样子,幽兰拿铁。就写到这里吧,做实验了。

数据预处理

本案例采用的数据集压缩包为data_format.zip点击这里下载data_format.zip数据集,该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv. 在这个案例中只是用user_log.csv这个文件,下面列出文件user_log.csv的数据格式定义:

用户行为日志user_log.csv,日志中的字段定义如下:

  1. user_id | 买家id

  2. item_id | 商品id

  3. cat_id | 商品类别id

  4. merchant_id | 卖家id

  5. brand_id | 品牌id

  6. month | 交易时间:月

  7. day | 交易事件:日

  8. action | 行为,取值范围{0,1,2,3},0表示点击,1表示加入购物车,2表示购买,3表示关注商品

  9. age_range | 买家年龄分段:1表示年龄<18,2表示年龄在[18,24],3表示年龄在[25,29],4表示年龄在[30,34],5表示年龄在[35,39],6表示年龄在[40,49],7和8表示年龄>=50,0和NULL则表示未知

  10. gender | 性别:0表示女性,1表示男性,2和NULL表示未知

  11. province| 收获地址省份

数据具体格式如下:

1
2
3
4
5
6
7
8
9
user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
328862,323294,833,2882,2661,08,29,0,0,1,内蒙古
328862,844400,1271,2882,2661,08,29,0,1,1,山西
328862,575153,1271,2882,2661,08,29,0,2,1,山西
328862,996875,1271,2882,2661,08,29,0,1,1,内蒙古
328862,1086186,1271,1253,1049,08,29,0,0,2,浙江
328862,623866,1271,2882,2661,08,29,0,0,2,黑龙江
328862,542871,1467,2882,2661,08,29,0,5,2,四川
328862,536347,1095,883,1647,08,29,0,7,1,吉林

这个案例实时统计每秒中男女生购物人数,因此针对每条购物日志,我们只需要获取gender即可,然后发送给Kafka,接下来Spark Streaming再接收gender进行处理。

数据预处理

本案例使用Python对数据进行预处理,并将处理后的数据直接通过Kafka生产者发送给Kafka,这里需要先安装Python操作Kafka的代码库,请在Ubuntu中打开一个命令行终端,执行如下Shell命令来安装Python操作Kafka的代码库(备注:如果之前已经安装过,则这里不需要安装):

1
pip3 install kafka-python

接着可以写如下Python代码,文件名为producer.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# coding: utf-8
import csv
import time
from kafka import KafkaProducer

# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 打开数据文件
csvfile = open("../data/user_log.csv","r")
# 生成一个可用于读取csv文件的reader
reader = csv.reader(csvfile)

for line in reader:
gender = line[9] # 性别在每行日志代码的第9个元素
if gender == 'gender':
continue # 去除第一行表头
time.sleep(0.1) # 每隔0.1秒发送一行数据
# 发送数据,topic为'sex'
producer.send('sex',line[9].encode('utf8'))

上述代码很简单,首先是先实例化一个Kafka生产者。然后读取用户日志文件,每次读取一行,接着每隔0.1秒发送给Kafka,这样1秒发送10条购物日志。这里发送给Kafka的topic为’sex’。

Python操作Kafka

我们可以写一个KafkaConsumer测试数据是否投递成功,代码如下,文件名为consumer.py

1
2
3
4
5
from kafka import KafkaConsumer

consumer = KafkaConsumer('sex')
for msg in consumer:
print((msg.value).decode('utf8'))

Python

在开启上述KafkaProducer和KafkaConsumer之前,需要先开启Kafka,命令如下:

1
2
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
1
2
3
# 打开新的窗口
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

在Kafka开启之后,即可开启KafkaProducer和KafkaConsumer。开启方法如下:
请在Ubuntu中,打开一个命令行 终端窗口,执行如下命令:

1
2
cd ~/bigdata/scripts  #进入到代码目录
python3 producer.py #启动生产者发送消息给Kafaka

然后,请在Ubuntu中,打开另外一个命令行 终端窗口,执行如下命令:

1
2
cd ~/bigdata/scripts  #进入到代码目录
python3 consumer.py #启动消费者从Kafaka接收消息

运行上面这条命令以后,这时,你会看到屏幕上会输出一行又一行的数字,类似下面的样子:

1
2
3
4
5
6
7
8
9
2
1
1
1
2
0
2
1
……