BI experience

Il était une fois VM HortonWorks, Flume et Tweeter

| 2 Commentaires

Comme convenu dans le dernier post, nous allons parler de flume et de la récupération de flux twitter.

Tout d’abord, nous partirons de la VM HortonWorks (voir le dernier post).

Voici les fichiers dont vous allez avoir besoin pour travailler

Les étapes d’installation de flume :

Si vous êtes novices comme moi, et que vous ne maitriser pas bien les configurations de flume, partez d’une installation automatique avec :

yum install flume 

Ceci implique donc que votre VM ait un accès internet.
En effet, au début j’avais fait un download sur ma machine. Puis j’avais pousser le programme flume sur la VM avec le petit programme wscp. Et là! C’était le drame! Je n’ai pas su configurer correctement les fichiers et donc rien ne fonctionnait… (ça sens le manque d’expériences!).
Du coup vive « yum install… »

Ensuite il faut configurer flume :

  1. modifier le fichier de log log4j.properties et mettre flume.log.dir=/var/log/flume si ce n’est pas déjà fait pour aller voir les logs par la suite.
  2. modifier le fichier flume-env.sh en lui indiquant les variables d’environnement  avec notamment le flume_classpath (flume-sources-1.0-SNAPSHOT.jar). Un exemple de contenu de ce fichier est dans la suite de l’article
  3. le hive-serdes-1.0-SNAPSHOT.jar sert à la lecture  de ce qu’on aura capturer en créant une table externe dans hive. Nous ne traiterons pas cette partie dans cet article. Pour information, j’ai ajouté le fichier directement dans l’interface beewax de hive pour créer la table externe et faire mes requêtes.

 Avant de commencer voici quelques petits liens utiles :

 

Avant de vous montrer le code, je propose, ci-dessous, les parties de la documentation d’apache flume qui m’ont aidé à comprendre le fonctionnement du fichier flume.conf. En effet, la documentation n’est pas très lisible car le sommaire sur la gauche est imbitable et les parties importantes sont entrecoupées par des informations sur des systèmes tels que Avro, Thrift…:

Donc les parties vraiment intéressantes pour notre exemple sont :

Les parties pour une vision générale, globale :

  • la partie Architecture pour les points suivants : data-flow-model, complex-flows, reliability, recoverability
  • la partie Setting up an agent pour les points suivants : configuring-individual-components, wiring-the-pieces-together, starting-an-agent, a-simple-example, installing-third-party-plugins
  • les parties sur les différentes possibilités que l’on a pour créer des flux :  setting multi agent flow, consolidation, multiplexing-the-flow

Les parties plus techniques avec des exemples de code :

Maintenant que le décor est planté, je vous propose de vous montrer mon fichier flume-env.sh et 2 exemples de flume.conf pour 2 flux différents.

Comme j’adore les chanteuse Beyonce et Shakira, mon exemple permettra de récupérer les tweets correspondant à ces 2 stars.

 

Le contenu du fichier flume-env.sh :

export JAVA_OPTS= »-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote »
export FLUME_CLASSPATH= »/usr/lib/flume/lib/flume-sources-1.0-SNAPSHOT.jar »

Le premier exemple avec un flux simple :

 

Voici le code de flume.conf

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = remplacer par le consumer Key de votre application twitter
TwitterAgent.sources.Twitter.consumerSecret = remplacer par le consumer Secret de votre application twitter
TwitterAgent.sources.Twitter.accessToken = remplacer par l’acces Token de votre application twitter
TwitterAgent.sources.Twitter.accessTokenSecret = remplacer par l’access Token Secret  de votre application twitter

TwitterAgent.sources.Twitter.keywords = beyonce

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/hue/flume/tweets
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

 

Un second exemple avec un double channels

 

 

le code flume.conf

 

TwitterAgent.sources = Twitter
TwitterAgent.sinks = HDFS_Beyonce HDFS_shakira
TwitterAgent.channels = MemChannel_Beyonce MemChannel_Shakira

TwitterAgent.sources.Twitter.keywords = Beyonce,Shakira

# Ici c’est la partie la plus délicate. Il faut créer
# un interceptor pour lui indiquer des mots que l’on souhaite
# ajouter dans le header du flux. Car un flux est composé
# d’un header et d’un body. Le but ici est de remplir le
# header avec Beyonce ou Shakira pour ensuite dispatcher
# le flux dans le bon channel grâce au mapping.

TwitterAgent.sources.Twitter.interceptors = i1
TwitterAgent.sources.Twitter.interceptors.i1.type = regex_extractor

#ici l’expression (Beyonce|Shakira) permet de chercher dans le body du flux soit Beyonce soit Shakira.

TwitterAgent.sources.Twitter.interceptors.i1.regex = (Beyonce|Shakira)
TwitterAgent.sources.Twitter.interceptors.i1.serializers = s1
TwitterAgent.sources.Twitter.interceptors.i1.serializers.s1.name = star

# il existe 2 types  : multiplexing pour mettre le résultat dans des sous channel ou replicating pour répliquer la totalité dans des sous channel
TwitterAgent.sources.Twitter.selector.type = multiplexing
TwitterAgent.sources.Twitter.selector.header = star
TwitterAgent.sources.Twitter.selector.mapping.Beyonce = MemChannel_Beyonce
TwitterAgent.sources.Twitter.selector.mapping.Shakira = MemChannel_Shakira

# Describe/configure the source
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
#TwitterAgent.sources.Twitter.bind = localhost
#TwitterAgent.sources.Twitter.port = 44444

# Bind the source and sink to the channel
#TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.channels = MemChannel_Beyonce MemChannel_Shakira
TwitterAgent.sources.Twitter.consumerKey = remplacer par le consumer Key de votre application twitter
TwitterAgent.sources.Twitter.consumerSecret = remplacer par le consumer Secret de votre application twitter
TwitterAgent.sources.Twitter.accessToken = remplacer par l’acces Token de votre application twitter
TwitterAgent.sources.Twitter.accessTokenSecret = remplacer par l’access Token Secret  de votre application twitter

# Describe the sink TwitterAgent.sinks.HDFS.type = logger
# Use a channel which buffers events in memory
TwitterAgent.channels.MemChannel_Beyonce.type = memory
TwitterAgent.channels.MemChannel_Beyonce.capacity = 1000
TwitterAgent.channels.MemChannel_Beyonce.transactionCapacity = 100

TwitterAgent.channels.MemChannel_Shakira.type = memory
TwitterAgent.channels.MemChannel_Shakira.capacity = 1000
TwitterAgent.channels.MemChannel_Shakira.transactionCapacity = 100

TwitterAgent.sinks.HDFS_Beyonce.channel = MemChannel_Beyonce
TwitterAgent.sinks.HDFS_Beyonce.type = hdfs
TwitterAgent.sinks.HDFS_Beyonce.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/hue/flume/tweets_Beyonce
TwitterAgent.sinks.HDFS_Beyonce.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS_Beyonce.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS_Beyonce.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS_Beyonce.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS_Beyonce.hdfs.rollCount = 100000

TwitterAgent.sinks.HDFS_shakira.channel = MemChannel_Shakira
TwitterAgent.sinks.HDFS_shakira.type = hdfs
TwitterAgent.sinks.HDFS_shakira.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/hue/flume/tweets_Shakira
TwitterAgent.sinks.HDFS_shakira.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS_shakira.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS_shakira.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS_shakira.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS_shakira.hdfs.rollCount = 100000

Il faudrait que j’essaye de faire un dernier exemple multisource multi channel avec de la consolidation en dernière étape.

Cet article pose une brique de plus sur l’apprentissage du NoSQL, du big data…

J’ai essayé de rassembler les liens qui m’ont permis d’avoir ce cheminement. Je ne sais pas si tout est clair mais j’ai essayé de mettre un maximum d’informations.

Si vous avez un retour d’expérience à faire sur ce genre de développement notamment sur tout ce qui est performance liée au fait d’avoir plusieurs agents, plusieurs channels ou plusieurs sinks. N’hésitez pas à partager ça avec nous!

Commentez!

2 Commentaires

  1. Bonjour, article intéressant.
    Voici un retour d’expérience sur l’utilisation de HDInsight (une version HDP dans Azure) pour analyser des Tweets : http://www.pulsweb.fr/la-demystification-du-big-data/
    Nous utilisons non pas à Flume mais un Worker Role Azure pour rapatrier certains Tweets en fonction de leurs mots-clefs.
    Cordialement,
    Romain Casteres

  2. Merci pour ton commentaire Romain!

Laisser un commentaire

Champs Requis *.

*