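Installing Kafka-Python
Download the Kafka-Python archive
Download the latest release package from https://github.com/mumrah/kafka-python/releases. At the time of writing, the latest package is kafka-python-0.9.3.tar.gz.
Build and install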
$tar -xvf kafka-python-0.9.3.tar.gz
$cd kafka-python-0.9.3
$python setup.py install
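If the following exception is raised, setuptools is not installed. See the setuptools instructions later in this document to install it: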
[root@hadoop01 kafka-python-0.9.3]# python setup.py install
Traceback (most recent call last):
  File "setup.py", line 3, in <module>
    from setuptools import setup, Command
ImportError: No module named setuptools
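Before running setup.py, it can help to check which of the required modules are missing. The following is a minimal sketch using only the standard library. The name missing_modules is a hypothetical helper, not part of setuptools or kafka-python:

```python
import importlib


def missing_modules(names):
    """Return the subset of module names that cannot be imported."""
    missing = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError:
            # Same error class as in the traceback above.
            missing.append(name)
    return missing


# The two dependencies this guide installs by hand.
print(missing_modules(["setuptools", "six"]))
```

An empty list means both modules are already importable. Any name that is printed still needs to be installed.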
Writing the code
import logging
import random
import string
import time

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

__author__ = 'aihua.sun'

LOG = logging.getLogger('kafka_producer')


class TrueCloudDataPointProducer(object):
    def __init__(self, hosts, batch_send=False, batch_send_every_n=20,
                 topic="true_cloud_datapoint_topic"):
        self.hosts = hosts
        self.client = KafkaClient(self.hosts)
        self.batch_send = batch_send
        self.batch_send_every_n = batch_send_every_n
        self.producer = SimpleProducer(self.client, batch_send=batch_send,
                                       batch_send_every_n=batch_send_every_n)
        self.topic = topic

    def send_messages(self, msg):
        self.producer.send_messages(self.topic, msg)


def get_instance():
    # A dict keeps only one value per key, so repeated hosts would lose
    # their ports. Use a comma-separated "host:port" string instead.
    hosts = ('hadoop01:9092,hadoop01:9093,hadoop01:9094,hadoop101:9095,'
             'hadoop02:9092,hadoop02:9093,hadoop02:9094')
    return TrueCloudDataPointProducer(hosts)


if __name__ == "__main__":
    begin = time.time()
    producer = get_instance()
    for i in range(0, 10000):
        # "Message<i>" followed by 64 random lowercase letters.
        msg = 'Message' + str(i) + ' ' + ''.join(
            random.choice(string.ascii_lowercase) for _ in range(64)) + '\n'
        producer.send_messages(msg)
    end = time.time()
    print("use time:" + str(end - begin))
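The broker list in get_instance() names the same host with several ports. Here is a minimal sketch of why a list of (host, port) pairs is safer than a dict for that. It assumes the client accepts a comma-separated host:port string, as kafka-python 0.9.x is understood to. The name broker_string is a hypothetical helper, not part of kafka-python:

```python
def broker_string(pairs):
    """Join (host, port) pairs into a comma-separated "host:port" string."""
    return ",".join("{0}:{1}".format(host, port) for host, port in pairs)


# A dict literal with repeated keys silently keeps only the last value.
as_dict = {'hadoop01': '9092', 'hadoop01': '9093', 'hadoop01': '9094'}

# A list of pairs preserves every broker (hostnames reused from the example).
as_pairs = [('hadoop01', '9092'), ('hadoop01', '9093'), ('hadoop01', '9094')]

print(as_dict)                  # {'hadoop01': '9094'}
print(broker_string(as_pairs))  # hadoop01:9092,hadoop01:9093,hadoop01:9094
```

The resulting string could then be passed as the hosts argument when constructing the producer.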
Installing dependency modules
setuptools
Download
Download the setuptools package from https://pypi.python.org/pypi/setuptools/14.3.1#downloads
Build and install
$tar zxvf setuptools-14.3.1.tar.gz
$cd setuptools-14.3.1
$python setup.py build
$python setup.py install
six
Download six-1.9.0.tar.gz from https://pypi.python.org/pypi/six/1.9.0
Build and install
$tar zxvf six-1.9.0.tar.gz
$cd six-1.9.0
$python setup.py build
$python setup.py install
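After the installs, a quick check from Python can confirm that the modules import and show which versions were picked up. This is a sketch using only the standard library. The name installed_version is a hypothetical helper, and the sketch assumes each package exposes a __version__ attribute, which not every module does:

```python
import importlib


def installed_version(name):
    """Return __version__, "unknown" if the module has none, or None if not importable."""
    try:
        module = importlib.import_module(name)
    except ImportError:
        return None
    return getattr(module, "__version__", "unknown")


# Modules installed in this guide; None means the import failed.
for name in ("setuptools", "six", "kafka"):
    print("{0}: {1}".format(name, installed_version(name)))
```

A None next to a name suggests that the corresponding install step did not complete for the Python interpreter being used.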