大数据项目之数据处理和Python操作Kafka
写在前面
昨天是中秋节,休息了一天,和老王楠姐去密室逃脱完了一局,玩的是不恐怖的本,但里面的氛围还是很阴森。从密室逃脱出来之后,吃了小炒黄牛肉,一共有四个菜,小炒黄牛肉、芋头蒸排骨、炒花菜、脆皮豆腐。小炒黄牛肉没有放辣椒,吃起来味道很不错。其中最辣的一道菜是脆皮豆腐,我没吃几口。炒花菜里面的油渣好吃,我吃了很多。吃完就去的helens,点了一扎果啤,虽然叫果啤,但对我来说还是有度数,三四杯下肚,头就晕晕的,红色的血管在脸上扩张,有种狼狈的醉意。隔壁桌有人来问我们的戒指是不是粉色的,这是个活动,每一桌都有一种颜色的戒指,如果两桌颜色的戒指一样,就可以去前台兑换啤酒。来了两桌人问,最后来的那一桌和我们的颜色是一样的,我就去和他们桌的一个人去兑换啤酒了。去的途中知道,那个人是中南林科大的,从长沙来湘潭玩。我们这桌一共是三个人,一人一瓶啤酒,拿来了三瓶。我们喝完了一扎果啤,就把兑换来的倒进果啤扎里面去了,因为里面还有很多冰块,可以喝到冰爽的啤酒。喝酒的过程,我们前期主要在玩手机,我投了一份方太的简历。我们聊到考研,觉得这件事本身并不难,难的是坚持下去,这个过程十分枯燥。后期,我们三个玩骰子,就这样,一人几口下肚,三瓶啤酒也就没了。喝完之后,打了个滴滴回去,在车上我就已经快睡着了,撑着睡意到家,和芦打了三十分钟的视频电话,就昏昏睡去了。对了,在去玩密室逃脱之前,我们买了茶颜悦色喝,我还是老样子,幽兰拿铁。就写到这里吧,做实验了。
数据预处理
本案例采用的数据集压缩包为data_format.zip点击这里下载data_format.zip数据集,该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv. 在这个案例中只是用user_log.csv这个文件,下面列出文件user_log.csv的数据格式定义:
用户行为日志user_log.csv,日志中的字段定义如下:
user_id | 买家id
item_id | 商品id
cat_id | 商品类别id
merchant_id | 卖家id
brand_id | 品牌id
month | 交易时间:月
day | 交易事件:日
action | 行为,取值范围{0,1,2,3},0表示点击,1表示加入购物车,2表示购买,3表示关注商品
age_range | 买家年龄分段:1表示年龄<18,2表示年龄在[18,24],3表示年龄在[25,29],4表示年龄在[30,34],5表示年龄在[35,39],6表示年龄在[40,49],7和8表示年龄>=50,0和NULL则表示未知
gender | 性别:0表示女性,1表示男性,2和NULL表示未知
province| 收获地址省份
数据具体格式如下:
1 |
|
这个案例实时统计每秒中男女生购物人数,因此针对每条购物日志,我们只需要获取gender即可,然后发送给Kafka,接下来Spark Streaming再接收gender进行处理。
数据预处理
本案例使用Python对数据进行预处理,并将处理后的数据直接通过Kafka生产者发送给Kafka,这里需要先安装Python操作Kafka的代码库,请在Ubuntu中打开一个命令行终端,执行如下Shell命令来安装Python操作Kafka的代码库(备注:如果之前已经安装过,则这里不需要安装):
1 |
|
接着可以写如下Python代码,文件名为producer.py:
1 |
|
上述代码很简单,首先是先实例化一个Kafka生产者。然后读取用户日志文件,每次读取一行,接着每隔0.1秒发送给Kafka,这样1秒发送10条购物日志。这里发送给Kafka的topic为’sex’。
Python操作Kafka
我们可以写一个KafkaConsumer测试数据是否投递成功,代码如下,文件名为consumer.py
1 |
|
Python
在开启上述KafkaProducer和KafkaConsumer之前,需要先开启Kafka,命令如下:
1 |
|
1 |
|
在Kafka开启之后,即可开启KafkaProducer和KafkaConsumer。开启方法如下:
请在Ubuntu中,打开一个命令行 终端窗口,执行如下命令:
1 |
|
然后,请在Ubuntu中,打开另外一个命令行 终端窗口,执行如下命令:
1 |
|
运行上面这条命令以后,这时,你会看到屏幕上会输出一行又一行的数字,类似下面的样子:
1 |
|