Talend | BI experience

Archive pour la catégorie ‘Talend’

Copier une base de données vers une autre avec Talend

Jeudi 25 août 2011

Depuis la version 4.1.2, Talend a sortie une option assez remarquable :le Dynamic Schema

Je vous propose dans ce post de détailler la démarche que nous avons adoptée pour l‘alimentation de notre infocentre : copie des différentes bases de données de la société à partir desquelles le datawarehouse est alimenté.

Un simple job sera réalisé pour amener les données d’une base de données source vers une base de données cible :
job_fils

Voici l’option en question que vous pourrez utiliser dans votre input:

option_dynamic_schema2

Et si vous avez des règles de gestion à ajouter n’hésitez pas avec le tJavaFlex :
action_sur_lignes1

/**l’exemple de traitement en plus lisible…**/
for(int i = 0; i < row3.dyn_Col.metadatas.size(); i++) {
row3.dyn_Col.metadatas.get(i).setName(row3.dyn_Col.metadatas.get(i).getName().toLowerCase());
}

Ce job, décrit ci-dessus, est appelé par un job père qui lui donne le contexte contenant le nom de la table courante à migrer :

job_pere

Une variable de context est passée au sous job (monsousjobcopie) : my_current_table

L’input requête sur une table de paramétrage. En effet nous n’avons pas forcement besoin de toutes les tables de toutes les bases de données. Donc à travers une table de paramétrage nous choisisson celle que l’on veut migrer :

input_job_pere

Ce job père appelle aussi un autre sous job qui lui va permettre de migrer les indexes :
index

Donc pour chaque table on fera un appel à une routine (code qu’il y a dans le tJava):

routines.IndexMigrer.migrate_index(((java.sql.Connection)globalMap.get(”conn_tMSSqlConnection_2″)), ((String)globalMap.get(”current_TABLE.nom_table”)), ((java.sql.Connection)globalMap.get(”conn_tPostgresqlConnection_4″)),context.Cible_Schema + (String)globalMap.get(”schema”) + “.” +
((String)globalMap.get(”current_TABLE.nom_table”)));

Et en bonus voici la routine que nous avons créés qui selon la base de données (Oracle, Posgres, SQL Server 2000 ou SQL Server 2005) va requêter dans la bonne table système :

/***************Routines migration indexes ************************/

public class IndexMigrer {
public static void migrate_index(Connection org_connect, String org_nomtable,
Connection dest_connect, String dest_nomtable)  throws java.sql.SQLException
{
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(”Microsoft SQL Server“) )
{
Statement stat = org_connect.createStatement();
ResultSet rs;
if(org_connect.getMetaData().getDatabaseMajorVersion()==9)
{
rs=stat.executeQuery(”SELECT IDX.name as index_name, COL.name as column_name FROM sys.index_columns IDXC”
+” INNER JOIN sys.objects OBJ”
+” ON IDXC.object_id = OBJ.object_id”
+” INNER JOIN sys.schemas SCH”
+” ON SCH.schema_id = OBJ.schema_id”
+” INNER JOIN sys.indexes IDX”
+” ON (IDXC.object_id = IDX.object_id AND IDXC.index_id = IDX.index_id)”
+” INNER JOIN sys.columns COL”
+” ON (IDXC.column_id = COL.column_id AND OBJ.object_id = COL.object_id)”
+” where OBJ.name=\’”+org_nomtable+”\’”
+” ORDER BY OBJ.name” );
}
else
{
rs=stat.executeQuery(”select i.name as index_name,c.name as column_name”
+” from sysindexkeys as indk”
+” inner join sysindexes as i on”
+” (indk.indid=i.indid and indk.id=i.id )”
+” inner join syscolumns as c on”
+” (indk.colid=c.colid and indk.id=c.id )”
+” inner join sysobjects as o on”
+” (indk.id=o.id)”
+” and o.name=\’”+org_nomtable+”\’ ”
+” ORDER BY o.name” );
}
Map m =new HashMap();
Map m_key=new HashMap();
while(rs.next())
{
if(!m.containsKey(rs.getString(1)))
{
List l = new ArrayList();
l.add(rs.getString(2));
m.put(rs.getString(1),l);
m_key.put(rs.getString(1), rs.getString(1));
}
else if(m.containsKey(rs.getString(1)))
{
List l=(List)m.get(rs.getString(1));
l.add(rs.getString(2));
m.put(rs.getString(1),l);
}
}
StringBuffer sb=new StringBuffer();
for (Iterator i = m.keySet().iterator() ; i.hasNext();){
String key = (String)i.next();
sb.append(”CREATE INDEX “);
sb.append(”IDX_”+key);
sb.append(” ON “+dest_nomtable+” (”);
List l=(List)m.get(key);
for(int j=0;j<l.size();j++)
{
sb.append(l.get(j));
if(j!=l.size()-1)
sb.append(”,”);
}
sb.append(”);\n”);
}
try{
Statement bla=dest_connect.createStatement();
System.out.println(”"+sb.toString());
bla.execute(sb.toString());
}
catch (java.sql.SQLException e)
{
//System.out.println(dest_nomtable);
System.out.println(e.getMessage());
}
dest_connect.commit();
//System.out.println(dest_nomtable);
}
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(”PostgreSQL“))
{
Statement stat = org_connect.createStatement();
ResultSet rs=stat.executeQuery(”SELECT indkey as colonne_index,pci.relname as nom_index,pct.relname as nom_table from pg_index as pi”
+” , pg_class as pci, pg_class as pct ”
+” where pci.oid=pi.indexrelid”
+” and pct.oid=pi.indrelid and pct.relname  not like ‘pg_%’ and pct.relname=’”+org_nomtable+”‘ ” );
while( rs.next())
{
String s=rs.getString(1);
java.util.StringTokenizer tokenizer = new java.util.StringTokenizer(s, ” “);
List <Integer> num_columns=new ArrayList();
while ( tokenizer.hasMoreTokens() ) {
num_columns.add(Integer.parseInt(tokenizer.nextToken()));
}
Statement sel=org_connect.createStatement();
ResultSet rsset=sel.executeQuery(”SELECT * from “+org_nomtable+” LIMIT 1″);
StringBuffer sb=new StringBuffer();
sb.append(”CREATE INDEX “+rs.getString(2)+” ON “+dest_nomtable+” (” );
Iterator i=num_columns.iterator();
while(i.hasNext())
{
sb.append(rsset.getMetaData().getColumnLabel((Integer)i.next()));
if(i.hasNext())
sb.append(”,”);
}
sb.append(”) ;\n”);
Statement bla=dest_connect.createStatement();
bla.execute(sb.toString());
dest_connect.commit();
bla.close();
}
}
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(”Oracle“))
{
Statement stat = org_connect.createStatement();
ResultSet rs=stat.executeQuery(”SELECT col.index_name,col.column_name FROM all_indexes cons INNER JOIN all_ind_columns col ON cons.owner = col.index_owner AND cons.index_name = col.index_name WHERE OWNER = ‘AEFE’ AND cons.table_name=\’”+org_nomtable+”\’” );
Map m =new HashMap();
Map m_key=new HashMap();
while(rs.next())
{
if(!m.containsKey(rs.getString(1)))
{
List l = new ArrayList();
l.add(rs.getString(2));
m.put(rs.getString(1),l);
m_key.put(rs.getString(1), rs.getString(1));
}
else if(m.containsKey(rs.getString(1)))
{
List l=(List)m.get(rs.getString(1));
l.add(rs.getString(2));
m.put(rs.getString(1),l);
}
}
StringBuffer sb=new StringBuffer();
for (Iterator i = m.keySet().iterator() ; i.hasNext();){
String key = (String)i.next();
sb.append(”CREATE INDEX “);
sb.append(”IDX_”+key);
sb.append(” ON “+dest_nomtable+” (”);
List l=(List)m.get(key);
for(int j=0;j<l.size();j++)
{
sb.append(l.get(j));
if(j!=l.size()-1)
sb.append(”,”);
}
sb.append(”);\n”);
}
Statement bla=dest_connect.createStatement();
System.out.println(”"+sb.toString());
bla.execute(sb.toString());
dest_connect.commit();
}}}

Très important, si un problème de typage persiste n’hésitez pas à modifier les fichiers de mapping dans :

fichier_mappingCes fichiers sont utiles et simples à utiliser puisqu’ils permettent de connaitre les transformations de typage. En gros voici comment procède Talend :

typage BDD source => typage JAVA => typage BDD cible

Exemple d’utilisation :

ma base de données source est SQL server et ma base de données cible est postgres  :

La démarche sera donc d’aller dans  MSSQL_mapping.xml  et de regarder la rubrique : <dbToTalendTypes> pour voir en quel type Java les typages de ma base de données MSSQL seront transformés. Ensuite il faudra regarder dans postgres_mapping.xml pour regarder les typages par défault que propose Talend dans la rubrique  <talendToDbTypes> pour la retransformation des types JAVA en typages postgres.

Vous pourrez ainsi personnaliser ces transformations.

Si ces fichiers sont modifiés il faudra bien sur les répercutés sur tous les clients  TIS  et sur le serveur  dans les workspaces dans:

  • .JAVA > src > xmlMappings
  • et dans .JAVA > classes > xmlMappings

J’espère que ce post vous aura permis de voir les possibilités qu’offrent la solution Talend pour ce genre de besoin.

N’hésitez pas à poster pour donner votre opinion!

Gérer l’import/export des jobs Talend grâce à des jobs

Jeudi 4 mars 2010

Avant que Talend ne propose sa plateforme “Repository Manager”, nous avions demandé l’intervention d’une personne de Talend pour la mise en place de leur “framework” (voir ce post).  Le but de cette prestation était de nous permettre l’industrialisation de notre travail au quotidien. Le consultant Talend nous avait donc créé 2 jobs utilisant les commandes du commandline :

  • le premier pour l’export de jobs d’un serveur A
  • le second pour l’import de jobs dans un serveur B

TIS permet donc de faire ce travail d’import/export. Comment? c’est que nous allons voir dans ce post.

Voici le design du job “export” proposé :

export

Vous pourrez  consulter les commandes utilisées dans ce jobs dans le wiki de Talend  à cette adresse (avec les versions linux et windows).

Voici le contenu du composant tForeach (chaque ligne représente une commande)

/*connection au serveur*/
“initRemote “+context.Export_Url_Tac
/*log au bon projet*/
“logonProject -pn “+context.Export_Projet+” -ul “+context.Export_Identifiant+” -up “+context.Export_MotDePasse
/*Export des jobs à l’adresse indiqué en ajoutant dans le nom la date du jour si le job a type = process et à tel statut (par exemple : dev , recette ou prod) en incluant les dépendances*/
“exportItems “+context.Export_Chemin_Zip+”_”+TalendDate.getDate(”CCYYMMDD”)+”.zip”+” -if (type=process)and(status=”+context.Export_New_Status+”) -d”
/* En exportant on change la version des jobs ainsi que leurs statuts*/
“changeVersion “+context.Export_New_Version+” -if (type=process)and(status=”+context.Export_New_Status+”)”
“changeStatus “+context.Export_Old_Status+” -if (type=process)and(status=”+context.Export_New_Status+”)”

Pour un petit rappel sur les commandes du commandline aller voir ce post

Dans le TFlowiterate on mettra : ((String)globalMap.get(”tForeach_1_CURRENT_VALUE”)) afin de récupérer ces lignes avec les bonnes valeurs et de les mettre dans un fichier csv qui sera exécuter ensuite grâce au tSystem via cette commande :

” ./TISEE-linux-gtk-x86 -nosplash -application org.talend.commandline.CommandLine -consoleLog -data commandline-workspace scriptFile “+context.Chemin_Script

Pour l’import c’est exactement la même logique avec les commandes d’import.

Pour ceux qui préfère le confort d’une interface graphique n’hésitez pas à aller voir du coté du  “Repository Manager”   (voir ce post).

Repository Manager : une brique Talend qui facilite la vie

Mercredi 3 février 2010

Pour arriver à toujours mieux développer et gérer ses jobs, Talend propose un petit module : le “Repository Manager”. Cette brique a pour but de migrer les jobs d’une plateforme à une autre. Exemple : de la plateforme de développement à la plateforme de recette.
Cette brique fait partie de l’offre LCP (Life Cycle Plateform) que propose Talend qui comprend :

  • l’audit Talend (dans la plateforme TAC ou les données seront stockées en base de données)
  • le repository manager
  • le Talend Testing Plateform (permet de tester les évolutions d’un job faire des tests de non régression d’une version à une autre)

Le repository manager, plateforme web, propose la logique suivante :

  1. L’admnistrateur va dans un premier temps déclarer les différents environnements de développement, de recette et de production par type. La plateforme de développement sera déclarée en tant que “source”, la plateforme de recette en tant que “source and target” et la plateforme de production en tant que “target”.
  2. Des règles seront ensuite définies selon le type d’objet et son statut, sa version…pour importer ou exporter les job. La plateforme propose la gestion de l’évolution des statuts des jobs (Exemple statut “GoToRecette” puis lors de lorsqu’il est exporter puis importer dans la recette son statut deviendra “Delivered”)
  3. Ces règles peuvent être planifiées
  4. Un onglet Dashboard permettra de suivre les différentes étapes de chaque exécution.

Cet outil peut vraiment se révéler indispensable lors de développement au quotidien. Talend marque encore un point!

Talend et son Master Data Management

Dimanche 31 janvier 2010

Talend propose une formule “full integration” qui comprend : TIS (Talend Integration Studio), TDQ (Talend, Data Quality), MDM (Master Data Management).

talendmdm

Avec le master data management, Talend permet de gérer un référentiel de données. Le but de cette brique est finalement de centraliser l’information qui sera qualifiée de “Vérité absolue” pour l’entreprise.

Ce référentiel sera géré grâce à une plateforme qui est une autre vue dans eclipse (du même genre que Talend Integration Studio) et grâce à un client web (du même type que la plateforme TAC (Talend Administration Center).

Voici la démarche du produit :

  1. Tout d’abord dans la plateforme eclipse, il faudra créer le modèle de données : la structure du référentiel qui créera automatiquement des écrans de gestion pour visualiser ou modifier les données (plateforme web) qu’il y aura dans ces structures
  2. MDM permet la gestion des rôles et des droits d’accès aux données.
  3. Au niveau de l’application web, les utilisateurs pourront se connecter et verront une vue  du référentiel que le développeur aura créé au préalable. Sur cette plateforme web, certains utilisateurs clefs pourront faire des recherches, modifier ou ajouter des données. Ils pourront naviguer dans les données
  4. Le développeur mettra en place des règles qui permettront de faire un contrôle sur les données. Ces dernières ne passeront dans le référentiel que si ces règles sont respectées. Ces règles seront appliquées dès que les utilisateurs voudront toucher aux données du référentiel via la plateforme web.
  5. D’autres données pourront être chargées via des jobs Talend (déployés sur le MDM et accessibles en webservice.), qui pourront utiliser les composants de data quality de l’entreprise QAS. Ces jobs ou process mettront donc en place aussi des contrôles de qualité.
  6. Pour déclencher ces jobs, le développeur pourra définir des triggers.
  7. Pour venir parfaire ce processus, les développeurs pourront designer le workflow que suivent les données.
  8. Un journal permettra de suivre toutes les modifications faites.
  9. L’outil offre une possibilité intéressante : lors d’une modification, un utilisateur aura la possibilité de tester l’impact de cette modification sur un projet local avant de la synchroniser avec le projet principale.

Une version open source est actuellement accessible depuis le 25 Janvier : ici

La différence entre la version open source gratuite et payante est :

  • dans la version gratuite vous aurez l’accès à la modélisation du référentiel, et à la partie web visualisation et modification des données
  • dans la version payante, vous pourrez utiliser la gestion de workflow en plus, les droits d’accès, le versioning, le journal…

Talend et sa prestation “Framework”

Dimanche 24 janvier 2010

Talend nous a proposé une formation de 7 jours appelée : “Framework”. Cette prestation avait pour but d’analyser nos jobs, notre manière de fonctionner afin d’aller vers une industrialisation de nos jobs.

Voici ce qui a été analysé :

  • l’utilisation de joblet pour la mutialisation de certains morceaux de codes ou de flux (exemple un joblet pour les connexions qui reviennent pour chaque job)
  • l’utilisation de contextes pour gérer les connexions aux différents environnements voir ce post
  • la notion de commit et rollback pour garder l’intégrité de nos données en cas de problèmes d’exécution
  • la gestion de tables de rejets avec des commentaires techniques et fonctionnels
  • la gestion de logs voir ce post
  • l’utilisation du fichier audit que TIS nous fournit afin d’avoir plus d’information sur la construction des jobs
  • le schedule des jobs sur la plateforme TAC
  • l’utilisation du projet de référence : une sorte de projet virtuel vue par d’autres projets (à configurer dans la plateforme TAC). Ce projet de référence définit des connexions, des joblets, des jobs types. Ces différentes composantes sont ainsi réutilisables dans un projet quelconque grâce à un drag and drop.
  • la présentation du nouveau logiciel “Repository Manager” qui va nous permettre d’exporter et d’importer nos jobs grâce à une interface assez friendly

Cette prestation nous a vraiment permis de faire  des développements plus propres, plus travaillés. Je la recommande si vous en avez l’occasion.