Comme convenu dans le dernier post, nous allons parler de flume et de la récupération de flux twitter.
Tout d’abord, nous partirons de la VM HortonWorks (voir le dernier post).
Voici les fichiers dont vous allez avoir besoin pour travailler
Les étapes d’installation de flume :
Si vous êtes novices comme moi, et que vous ne maitriser pas bien les configurations de flume, partez d’une installation automatique avec :
yum install flume
Ceci implique donc que votre VM ait un accès internet.
En effet, au début j’avais fait un download sur ma machine. Puis j’avais pousser le programme flume sur la VM avec le petit programme wscp. Et là! C’était le drame! Je n’ai pas su configurer correctement les fichiers et donc rien ne fonctionnait… (ça sens le manque d’expériences!).
Du coup vive « yum install… »
Ensuite il faut configurer flume :
- modifier le fichier de log log4j.properties et mettre flume.log.dir=/var/log/flume si ce n’est pas déjà fait pour aller voir les logs par la suite.
- modifier le fichier flume-env.sh en lui indiquant les variables d’environnement avec notamment le flume_classpath (flume-sources-1.0-SNAPSHOT.jar). Un exemple de contenu de ce fichier est dans la suite de l’article
- le hive-serdes-1.0-SNAPSHOT.jar sert à la lecture de ce qu’on aura capturer en créant une table externe dans hive. Nous ne traiterons pas cette partie dans cet article. Pour information, j’ai ajouté le fichier directement dans l’interface beewax de hive pour créer la table externe et faire mes requêtes.
Avant de commencer voici quelques petits liens utiles :
- la documentation hortonWorks pour la configuration de flume
- un super article qui montre l’installation de flume et un exemple simple d’implémentation de flume.conf sur ce blog
- le user guide apache Flume
- d’autres pages du web qui peuvent aider : blog.xebia.fr, questforthought.wordpress.com
Avant de vous montrer le code, je propose, ci-dessous, les parties de la documentation d’apache flume qui m’ont aidé à comprendre le fonctionnement du fichier flume.conf. En effet, la documentation n’est pas très lisible car le sommaire sur la gauche est imbitable et les parties importantes sont entrecoupées par des informations sur des systèmes tels que Avro, Thrift…:
Donc les parties vraiment intéressantes pour notre exemple sont :
Les parties pour une vision générale, globale :
- la partie Architecture pour les points suivants : data-flow-model, complex-flows, reliability, recoverability
- la partie Setting up an agent pour les points suivants : configuring-individual-components, wiring-the-pieces-together, starting-an-agent, a-simple-example, installing-third-party-plugins
- les parties sur les différentes possibilités que l’on a pour créer des flux : setting multi agent flow, consolidation, multiplexing-the-flow
Les parties plus techniques avec des exemples de code :
- la partie de configuration permet de donner une vue générale d’un flux : defining the flow, configuring individual components, adding multiple flows in an agent, configuring a multi agent flow, fan out flow
- la partie de custom source permet de montrer comment configuer la source
- la partie de flume sinks avec les points hdfs sink
- la partie flume channels avec les points memory-channel
- la partie flume channels selector avec les points replicating channel selector default,multiplexing channel selector, custom channel selector
- la partie flume interceptor avec les points flume interceptors, timestamp interceptor, regex filtering interceptor(avec 2 exemples : example 1, example 2
Maintenant que le décor est planté, je vous propose de vous montrer mon fichier flume-env.sh et 2 exemples de flume.conf pour 2 flux différents.
Comme j’adore les chanteuse Beyonce et Shakira, mon exemple permettra de récupérer les tweets correspondant à ces 2 stars.
Le contenu du fichier flume-env.sh :
export JAVA_OPTS= »-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote »
export FLUME_CLASSPATH= »/usr/lib/flume/lib/flume-sources-1.0-SNAPSHOT.jar »
Le premier exemple avec un flux simple :
Voici le code de flume.conf
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = remplacer par le consumer Key de votre application twitter
TwitterAgent.sources.Twitter.consumerSecret = remplacer par le consumer Secret de votre application twitter
TwitterAgent.sources.Twitter.accessToken = remplacer par l’acces Token de votre application twitter
TwitterAgent.sources.Twitter.accessTokenSecret = remplacer par l’access Token Secret de votre application twitter
TwitterAgent.sources.Twitter.keywords = beyonce
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/hue/flume/tweets
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
Un second exemple avec un double channels
le code flume.conf
TwitterAgent.sources = Twitter
TwitterAgent.sinks = HDFS_Beyonce HDFS_shakira
TwitterAgent.channels = MemChannel_Beyonce MemChannel_Shakira
TwitterAgent.sources.Twitter.keywords = Beyonce,Shakira
# Ici c’est la partie la plus délicate. Il faut créer
# un interceptor pour lui indiquer des mots que l’on souhaite
# ajouter dans le header du flux. Car un flux est composé
# d’un header et d’un body. Le but ici est de remplir le
# header avec Beyonce ou Shakira pour ensuite dispatcher
# le flux dans le bon channel grâce au mapping.
TwitterAgent.sources.Twitter.interceptors = i1
TwitterAgent.sources.Twitter.interceptors.i1.type = regex_extractor
#ici l’expression (Beyonce|Shakira) permet de chercher dans le body du flux soit Beyonce soit Shakira.
TwitterAgent.sources.Twitter.interceptors.i1.regex = (Beyonce|Shakira)
TwitterAgent.sources.Twitter.interceptors.i1.serializers = s1
TwitterAgent.sources.Twitter.interceptors.i1.serializers.s1.name = star
# il existe 2 types : multiplexing pour mettre le résultat dans des sous channel ou replicating pour répliquer la totalité dans des sous channel
TwitterAgent.sources.Twitter.selector.type = multiplexing
TwitterAgent.sources.Twitter.selector.header = star
TwitterAgent.sources.Twitter.selector.mapping.Beyonce = MemChannel_Beyonce
TwitterAgent.sources.Twitter.selector.mapping.Shakira = MemChannel_Shakira
# Describe/configure the source
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
#TwitterAgent.sources.Twitter.bind = localhost
#TwitterAgent.sources.Twitter.port = 44444
# Bind the source and sink to the channel
#TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.channels = MemChannel_Beyonce MemChannel_Shakira
TwitterAgent.sources.Twitter.consumerKey = remplacer par le consumer Key de votre application twitter
TwitterAgent.sources.Twitter.consumerSecret = remplacer par le consumer Secret de votre application twitter
TwitterAgent.sources.Twitter.accessToken = remplacer par l’acces Token de votre application twitter
TwitterAgent.sources.Twitter.accessTokenSecret = remplacer par l’access Token Secret de votre application twitter
# Describe the sink TwitterAgent.sinks.HDFS.type = logger
# Use a channel which buffers events in memory
TwitterAgent.channels.MemChannel_Beyonce.type = memory
TwitterAgent.channels.MemChannel_Beyonce.capacity = 1000
TwitterAgent.channels.MemChannel_Beyonce.transactionCapacity = 100
TwitterAgent.channels.MemChannel_Shakira.type = memory
TwitterAgent.channels.MemChannel_Shakira.capacity = 1000
TwitterAgent.channels.MemChannel_Shakira.transactionCapacity = 100
TwitterAgent.sinks.HDFS_Beyonce.channel = MemChannel_Beyonce
TwitterAgent.sinks.HDFS_Beyonce.type = hdfs
TwitterAgent.sinks.HDFS_Beyonce.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/hue/flume/tweets_Beyonce
TwitterAgent.sinks.HDFS_Beyonce.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS_Beyonce.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS_Beyonce.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS_Beyonce.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS_Beyonce.hdfs.rollCount = 100000
TwitterAgent.sinks.HDFS_shakira.channel = MemChannel_Shakira
TwitterAgent.sinks.HDFS_shakira.type = hdfs
TwitterAgent.sinks.HDFS_shakira.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/hue/flume/tweets_Shakira
TwitterAgent.sinks.HDFS_shakira.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS_shakira.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS_shakira.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS_shakira.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS_shakira.hdfs.rollCount = 100000
Il faudrait que j’essaye de faire un dernier exemple multisource multi channel avec de la consolidation en dernière étape.
Cet article pose une brique de plus sur l’apprentissage du NoSQL, du big data…
J’ai essayé de rassembler les liens qui m’ont permis d’avoir ce cheminement. Je ne sais pas si tout est clair mais j’ai essayé de mettre un maximum d’informations.
Si vous avez un retour d’expérience à faire sur ce genre de développement notamment sur tout ce qui est performance liée au fait d’avoir plusieurs agents, plusieurs channels ou plusieurs sinks. N’hésitez pas à partager ça avec nous!
Commentez!
28 août 2014 à 17 h 02 min
Bonjour, article intéressant.
Voici un retour d’expérience sur l’utilisation de HDInsight (une version HDP dans Azure) pour analyser des Tweets : http://www.pulsweb.fr/la-demystification-du-big-data/
Nous utilisons non pas à Flume mais un Worker Role Azure pour rapatrier certains Tweets en fonction de leurs mots-clefs.
Cordialement,
Romain Casteres
28 août 2014 à 19 h 26 min
Merci pour ton commentaire Romain!