TIS | BI experience

Archive pour le mot-clef ‘TIS’

Copier une base de données vers une autre avec Talend

Jeudi 25 août 2011

Depuis la version 4.1.2, Talend a sortie une option assez remarquable :le Dynamic Schema

Je vous propose dans ce post de détailler la démarche que nous avons adoptée pour l‘alimentation de notre infocentre : copie des différentes bases de données de la société à partir desquelles le datawarehouse est alimenté.

Un simple job sera réalisé pour amener les données d’une base de données source vers une base de données cible :
job_fils

Voici l’option en question que vous pourrez utiliser dans votre input:

option_dynamic_schema2

Et si vous avez des règles de gestion à ajouter n’hésitez pas avec le tJavaFlex :
action_sur_lignes1

/**l’exemple de traitement en plus lisible…**/
for(int i = 0; i < row3.dyn_Col.metadatas.size(); i++) {
row3.dyn_Col.metadatas.get(i).setName(row3.dyn_Col.metadatas.get(i).getName().toLowerCase());
}

Ce job, décrit ci-dessus, est appelé par un job père qui lui donne le contexte contenant le nom de la table courante à migrer :

job_pere

Une variable de context est passée au sous job (monsousjobcopie) : my_current_table

L’input requête sur une table de paramétrage. En effet nous n’avons pas forcement besoin de toutes les tables de toutes les bases de données. Donc à travers une table de paramétrage nous choisisson celle que l’on veut migrer :

input_job_pere

Ce job père appelle aussi un autre sous job qui lui va permettre de migrer les indexes :
index

Donc pour chaque table on fera un appel à une routine (code qu’il y a dans le tJava):

routines.IndexMigrer.migrate_index(((java.sql.Connection)globalMap.get(”conn_tMSSqlConnection_2″)), ((String)globalMap.get(”current_TABLE.nom_table”)), ((java.sql.Connection)globalMap.get(”conn_tPostgresqlConnection_4″)),context.Cible_Schema + (String)globalMap.get(”schema”) + “.” +
((String)globalMap.get(”current_TABLE.nom_table”)));

Et en bonus voici la routine que nous avons créés qui selon la base de données (Oracle, Posgres, SQL Server 2000 ou SQL Server 2005) va requêter dans la bonne table système :

/***************Routines migration indexes ************************/

public class IndexMigrer {
public static void migrate_index(Connection org_connect, String org_nomtable,
Connection dest_connect, String dest_nomtable)  throws java.sql.SQLException
{
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(”Microsoft SQL Server“) )
{
Statement stat = org_connect.createStatement();
ResultSet rs;
if(org_connect.getMetaData().getDatabaseMajorVersion()==9)
{
rs=stat.executeQuery(”SELECT IDX.name as index_name, COL.name as column_name FROM sys.index_columns IDXC”
+” INNER JOIN sys.objects OBJ”
+” ON IDXC.object_id = OBJ.object_id”
+” INNER JOIN sys.schemas SCH”
+” ON SCH.schema_id = OBJ.schema_id”
+” INNER JOIN sys.indexes IDX”
+” ON (IDXC.object_id = IDX.object_id AND IDXC.index_id = IDX.index_id)”
+” INNER JOIN sys.columns COL”
+” ON (IDXC.column_id = COL.column_id AND OBJ.object_id = COL.object_id)”
+” where OBJ.name=\’”+org_nomtable+”\’”
+” ORDER BY OBJ.name” );
}
else
{
rs=stat.executeQuery(”select i.name as index_name,c.name as column_name”
+” from sysindexkeys as indk”
+” inner join sysindexes as i on”
+” (indk.indid=i.indid and indk.id=i.id )”
+” inner join syscolumns as c on”
+” (indk.colid=c.colid and indk.id=c.id )”
+” inner join sysobjects as o on”
+” (indk.id=o.id)”
+” and o.name=\’”+org_nomtable+”\’ ”
+” ORDER BY o.name” );
}
Map m =new HashMap();
Map m_key=new HashMap();
while(rs.next())
{
if(!m.containsKey(rs.getString(1)))
{
List l = new ArrayList();
l.add(rs.getString(2));
m.put(rs.getString(1),l);
m_key.put(rs.getString(1), rs.getString(1));
}
else if(m.containsKey(rs.getString(1)))
{
List l=(List)m.get(rs.getString(1));
l.add(rs.getString(2));
m.put(rs.getString(1),l);
}
}
StringBuffer sb=new StringBuffer();
for (Iterator i = m.keySet().iterator() ; i.hasNext();){
String key = (String)i.next();
sb.append(”CREATE INDEX “);
sb.append(”IDX_”+key);
sb.append(” ON “+dest_nomtable+” (”);
List l=(List)m.get(key);
for(int j=0;j<l.size();j++)
{
sb.append(l.get(j));
if(j!=l.size()-1)
sb.append(”,”);
}
sb.append(”);\n”);
}
try{
Statement bla=dest_connect.createStatement();
System.out.println(”"+sb.toString());
bla.execute(sb.toString());
}
catch (java.sql.SQLException e)
{
//System.out.println(dest_nomtable);
System.out.println(e.getMessage());
}
dest_connect.commit();
//System.out.println(dest_nomtable);
}
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(”PostgreSQL“))
{
Statement stat = org_connect.createStatement();
ResultSet rs=stat.executeQuery(”SELECT indkey as colonne_index,pci.relname as nom_index,pct.relname as nom_table from pg_index as pi”
+” , pg_class as pci, pg_class as pct ”
+” where pci.oid=pi.indexrelid”
+” and pct.oid=pi.indrelid and pct.relname  not like ‘pg_%’ and pct.relname=’”+org_nomtable+”‘ ” );
while( rs.next())
{
String s=rs.getString(1);
java.util.StringTokenizer tokenizer = new java.util.StringTokenizer(s, ” “);
List <Integer> num_columns=new ArrayList();
while ( tokenizer.hasMoreTokens() ) {
num_columns.add(Integer.parseInt(tokenizer.nextToken()));
}
Statement sel=org_connect.createStatement();
ResultSet rsset=sel.executeQuery(”SELECT * from “+org_nomtable+” LIMIT 1″);
StringBuffer sb=new StringBuffer();
sb.append(”CREATE INDEX “+rs.getString(2)+” ON “+dest_nomtable+” (” );
Iterator i=num_columns.iterator();
while(i.hasNext())
{
sb.append(rsset.getMetaData().getColumnLabel((Integer)i.next()));
if(i.hasNext())
sb.append(”,”);
}
sb.append(”) ;\n”);
Statement bla=dest_connect.createStatement();
bla.execute(sb.toString());
dest_connect.commit();
bla.close();
}
}
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(”Oracle“))
{
Statement stat = org_connect.createStatement();
ResultSet rs=stat.executeQuery(”SELECT col.index_name,col.column_name FROM all_indexes cons INNER JOIN all_ind_columns col ON cons.owner = col.index_owner AND cons.index_name = col.index_name WHERE OWNER = ‘AEFE’ AND cons.table_name=\’”+org_nomtable+”\’” );
Map m =new HashMap();
Map m_key=new HashMap();
while(rs.next())
{
if(!m.containsKey(rs.getString(1)))
{
List l = new ArrayList();
l.add(rs.getString(2));
m.put(rs.getString(1),l);
m_key.put(rs.getString(1), rs.getString(1));
}
else if(m.containsKey(rs.getString(1)))
{
List l=(List)m.get(rs.getString(1));
l.add(rs.getString(2));
m.put(rs.getString(1),l);
}
}
StringBuffer sb=new StringBuffer();
for (Iterator i = m.keySet().iterator() ; i.hasNext();){
String key = (String)i.next();
sb.append(”CREATE INDEX “);
sb.append(”IDX_”+key);
sb.append(” ON “+dest_nomtable+” (”);
List l=(List)m.get(key);
for(int j=0;j<l.size();j++)
{
sb.append(l.get(j));
if(j!=l.size()-1)
sb.append(”,”);
}
sb.append(”);\n”);
}
Statement bla=dest_connect.createStatement();
System.out.println(”"+sb.toString());
bla.execute(sb.toString());
dest_connect.commit();
}}}

Très important, si un problème de typage persiste n’hésitez pas à modifier les fichiers de mapping dans :

fichier_mappingCes fichiers sont utiles et simples à utiliser puisqu’ils permettent de connaitre les transformations de typage. En gros voici comment procède Talend :

typage BDD source => typage JAVA => typage BDD cible

Exemple d’utilisation :

ma base de données source est SQL server et ma base de données cible est postgres  :

La démarche sera donc d’aller dans  MSSQL_mapping.xml  et de regarder la rubrique : <dbToTalendTypes> pour voir en quel type Java les typages de ma base de données MSSQL seront transformés. Ensuite il faudra regarder dans postgres_mapping.xml pour regarder les typages par défault que propose Talend dans la rubrique  <talendToDbTypes> pour la retransformation des types JAVA en typages postgres.

Vous pourrez ainsi personnaliser ces transformations.

Si ces fichiers sont modifiés il faudra bien sur les répercutés sur tous les clients  TIS  et sur le serveur  dans les workspaces dans:

  • .JAVA > src > xmlMappings
  • et dans .JAVA > classes > xmlMappings

J’espère que ce post vous aura permis de voir les possibilités qu’offrent la solution Talend pour ce genre de besoin.

N’hésitez pas à poster pour donner votre opinion!

l’AMC et l’AMD de TIS

Lundi 21 septembre 2009

L’AMD (Activity Monitoring Dashboard) et l’AMC (Activity Monitoring Control) sont deux modules qui permettent de faire des contrôles sur l’exécution des flux.

L’AMD est plutôt pour l’administrateur des jobs Talend. Ce module se trouve sur la plateforme TAC (Talend Administration Center). Il propose un contrôle sur :

  • le temps d’exécution
  • la visualisation des erreurs JAVA ou PERL lors d’un problème d’exécution d’un job

L’AMC est un module proposé au développeur dans la plateforme TIS (Talend Integration Studio) . Il lui permet de visualiser :

  • le temps d’éxécution
  • le nombre de lignes géré
  • les erreurs JAVA, PERL enventuelles

Ces informations peuvent être collectées aussi bien au niveau d’un composant qu’au niveau d’un job. Ces données que fourni l’AMC pourront être stockées dans une base de données à part, dans le référentiel de TAC ou dans une autre base de donnéess au choix.

Pour configurer votre TIS :

  • aller dans Window>preferences>AMC et créer le point de connexion à une base de données
  • redéclarer cette connexion dans le référentiel Talend Studio

Puis pour avoir accès à la plateforme AMC aller dans Window > perspective > AMC perspective.

Afin de collecter les informations sur l’exécution d’un job, deux méthodes s’offrent à nous:

  • une méthode implicite
  • une méthode explicite

La méthode implicite : Il suffit de cocher les options dans la view du job (Window > Show view > Job).

implicite

Puis

pour le contrôle du temps d’exécution : cocher les options dans les composants : advanced settings >tStatCatcher Statistic

pour le contrôle du nombre de lignes : en cliquant sur un lien de type row on a ensuite accès à 2 modes :

  • absolute : permet juste de connaître le nombre de lignes passant dans ce lien
  • relative : permet de regarder le nombre de lignes passant dans ce lien en fonction d’un autre. Il est possible ensuite de faire des seuils avec des codes couleurs qui seront répercutés au niveau des graphiques de l’AMC. (voir Thresholds)

La méthode explicite : en utilisant directement des composants dans le job (voir Logs&Errors):

  • tStatCatcher : pour les statisitques du temps d’exécution
  • tFlowMetter : pour le nombre de lignes traitées
  • tLogCatcher, tWarn, tDie : pour le contrôle des erreurs Java