BI experience

Copier une base de données vers une autre avec Talend

| 2 Commentaires

Depuis la version 4.1.2, Talend a sorti une option assez remarquable :le Dynamic Schema

Je vous propose dans ce post de détailler la démarche que nous avons adoptée pour l‘alimentation de notre infocentre : copie des différentes bases de données de la société à partir desquelles le datawarehouse est alimenté.

Un simple job sera réalisé pour amener les données d’une base de données source vers une base de données cible :
job_fils

Voici l’option en question que vous pourrez utiliser dans votre input:

option_dynamic_schema2

Et si vous avez des règles de gestion à ajouter n’hésitez pas avec le tJavaFlex :
action_sur_lignes1

/**l’exemple de traitement en plus lisible…**/
for(int i = 0; i < row3.dyn_Col.metadatas.size(); i++) {
row3.dyn_Col.metadatas.get(i).setName(row3.dyn_Col.metadatas.get(i).getName().toLowerCase());
}

Ce job, décrit ci-dessus, est appelé ainsi par un job père qui lui donne le contexte contenant le nom de la table courante à migrer :

job_pere

Une variable de context est passée au sous job (monsousjobcopie) : my_current_table

L’input requête sur une table de paramétrage. En effet nous n’avons pas forcément besoin de toutes les tables de toutes les bases de données. Donc à travers une table de paramétrage nous choisissons celle que l’on veut migrer :

input_job_pere

Ce job père appelle aussi un autre sous job qui lui va permettre de migrer les indexes :
index

Donc pour chaque table on fera un appel à une routine (code qu’il y a dans le tJava):

routines.IndexMigrer.migrate_index(((java.sql.Connection)globalMap.get(« conn_tMSSqlConnection_2″)), ((String)globalMap.get(« current_TABLE.nom_table »)), ((java.sql.Connection)globalMap.get(« conn_tPostgresqlConnection_4″)),context.Cible_Schema + (String)globalMap.get(« schema ») + « . » +
((String)globalMap.get(« current_TABLE.nom_table »)));

Et en bonus voici la routine que nous avons créée qui selon la base de données (Oracle, Posgres, SQL Server 2000 ou SQL Server 2005) va requêter dans la bonne table système :

/***************Routines migration indexes ************************/

public class IndexMigrer {
public static void migrate_index(Connection org_connect, String org_nomtable,
Connection dest_connect, String dest_nomtable)  throws java.sql.SQLException
{
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(« Microsoft SQL Server« ) )
{
Statement stat = org_connect.createStatement();
ResultSet rs;
if(org_connect.getMetaData().getDatabaseMajorVersion()==9)
{
rs=stat.executeQuery(« SELECT IDX.name as index_name, COL.name as column_name FROM sys.index_columns IDXC »
+ » INNER JOIN sys.objects OBJ »
+ » ON IDXC.object_id = OBJ.object_id »
+ » INNER JOIN sys.schemas SCH »
+ » ON SCH.schema_id = OBJ.schema_id »
+ » INNER JOIN sys.indexes IDX »
+ » ON (IDXC.object_id = IDX.object_id AND IDXC.index_id = IDX.index_id) »
+ » INNER JOIN sys.columns COL »
+ » ON (IDXC.column_id = COL.column_id AND OBJ.object_id = COL.object_id) »
+ » where OBJ.name=\’ »+org_nomtable+ »\’ »
+ » ORDER BY OBJ.name » );
}
else
{
rs=stat.executeQuery(« select i.name as index_name,c.name as column_name »
+ » from sysindexkeys as indk »
+ » inner join sysindexes as i on »
+ » (indk.indid=i.indid and indk.id=i.id ) »
+ » inner join syscolumns as c on »
+ » (indk.colid=c.colid and indk.id=c.id ) »
+ » inner join sysobjects as o on »
+ » (indk.id=o.id) »
+ » and o.name=\’ »+org_nomtable+ »\’  »
+ » ORDER BY o.name » );
}
Map m =new HashMap();
Map m_key=new HashMap();
while(rs.next())
{
if(!m.containsKey(rs.getString(1)))
{
List l = new ArrayList();
l.add(rs.getString(2));
m.put(rs.getString(1),l);
m_key.put(rs.getString(1), rs.getString(1));
}
else if(m.containsKey(rs.getString(1)))
{
List l=(List)m.get(rs.getString(1));
l.add(rs.getString(2));
m.put(rs.getString(1),l);
}
}
StringBuffer sb=new StringBuffer();
for (Iterator i = m.keySet().iterator() ; i.hasNext();){
String key = (String)i.next();
sb.append(« CREATE INDEX « );
sb.append(« IDX_ »+key);
sb.append( » ON « +dest_nomtable+ » (« );
List l=(List)m.get(key);
for(int j=0;j<l.size();j++)
{
sb.append(l.get(j));
if(j!=l.size()-1)
sb.append(« , »);
}
sb.append(« );\n »);
}
try{
Statement bla=dest_connect.createStatement();
System.out.println(«  »+sb.toString());
bla.execute(sb.toString());
}
catch (java.sql.SQLException e)
{
//System.out.println(dest_nomtable);
System.out.println(e.getMessage());
}
dest_connect.commit();
//System.out.println(dest_nomtable);
}
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(« PostgreSQL« ))
{
Statement stat = org_connect.createStatement();
ResultSet rs=stat.executeQuery(« SELECT indkey as colonne_index,pci.relname as nom_index,pct.relname as nom_table from pg_index as pi »
+ » , pg_class as pci, pg_class as pct  »
+ » where pci.oid=pi.indexrelid »
+ » and pct.oid=pi.indrelid and pct.relname  not like ‘pg_%’ and pct.relname=’ »+org_nomtable+ »‘  » );
while( rs.next())
{
String s=rs.getString(1);
java.util.StringTokenizer tokenizer = new java.util.StringTokenizer(s,  » « );
List <Integer> num_columns=new ArrayList();
while ( tokenizer.hasMoreTokens() ) {
num_columns.add(Integer.parseInt(tokenizer.nextToken()));
}
Statement sel=org_connect.createStatement();
ResultSet rsset=sel.executeQuery(« SELECT * from « +org_nomtable+ » LIMIT 1″);
StringBuffer sb=new StringBuffer();
sb.append(« CREATE INDEX « +rs.getString(2)+ » ON « +dest_nomtable+ » ( » );
Iterator i=num_columns.iterator();
while(i.hasNext())
{
sb.append(rsset.getMetaData().getColumnLabel((Integer)i.next()));
if(i.hasNext())
sb.append(« , »);
}
sb.append(« ) ;\n »);
Statement bla=dest_connect.createStatement();
bla.execute(sb.toString());
dest_connect.commit();
bla.close();
}
}
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(« Oracle« ))
{
Statement stat = org_connect.createStatement();
ResultSet rs=stat.executeQuery(« SELECT col.index_name,col.column_name FROM all_indexes cons INNER JOIN all_ind_columns col ON cons.owner = col.index_owner AND cons.index_name = col.index_name WHERE OWNER = ‘AEFE’ AND cons.table_name=\’ »+org_nomtable+ »\’ » );
Map m =new HashMap();
Map m_key=new HashMap();
while(rs.next())
{
if(!m.containsKey(rs.getString(1)))
{
List l = new ArrayList();
l.add(rs.getString(2));
m.put(rs.getString(1),l);
m_key.put(rs.getString(1), rs.getString(1));
}
else if(m.containsKey(rs.getString(1)))
{
List l=(List)m.get(rs.getString(1));
l.add(rs.getString(2));
m.put(rs.getString(1),l);
}
}
StringBuffer sb=new StringBuffer();
for (Iterator i = m.keySet().iterator() ; i.hasNext();){
String key = (String)i.next();
sb.append(« CREATE INDEX « );
sb.append(« IDX_ »+key);
sb.append( » ON « +dest_nomtable+ » (« );
List l=(List)m.get(key);
for(int j=0;j<l.size();j++)
{
sb.append(l.get(j));
if(j!=l.size()-1)
sb.append(« , »);
}
sb.append(« );\n »);
}
Statement bla=dest_connect.createStatement();
System.out.println(«  »+sb.toString());
bla.execute(sb.toString());
dest_connect.commit();
}}}

Très important, si un problème de typage persiste n’hésitez pas à modifier les fichiers de mapping dans :

fichier_mappingCes fichiers sont utiles et simples à utiliser puisqu’ils permettent de connaitre les transformations de typage. En gros voici comment procède Talend :

typage BDD source => typage JAVA => typage BDD cible

Exemple d’utilisation :

ma base de données source est SQL server et ma base de données cible est postgres  :

La démarche sera donc d’aller dans  MSSQL_mapping.xml  et de regarder la rubrique : <dbToTalendTypes> pour voir en quel type Java les typages de ma base de données MSSQL seront transformés. Ensuite il faudra regarder dans postgres_mapping.xml pour regarder les typages par défault que propose Talend dans la rubrique  <talendToDbTypes> pour la retransformation des types JAVA en typages postgres.

Vous pourrez ainsi personnaliser ces transformations.

Si ces fichiers sont modifiés il faudra bien sûr les répercutés sur tous les clients  TIS  et sur le serveur  dans les workspaces dans:

  • .JAVA > src > xmlMappings
  • et dans .JAVA > classes > xmlMappings

J’espère que ce post vous aura permis de voir les possibilités qu’offre la solution Talend pour ce genre de besoin.

N’hésitez pas à poster pour donner votre opinion!

2 Commentaires

  1. Ce topic est super !
    Mais pourrait-on aller plus loin en effectuant un update/insert ?
    J’ai essayé, mais bloc toujours sur le fait de ne pas pouvoir définir en variable de context le nom d’un champ dans un schema.
    Auriez-vous une idée ?

    Merci

  2. Talend a beaucoup évolué depuis plusieurs années.
    Ils proposent notamment la plateforme Talend Data Fabric désormais, qui est une plateforme globale autour de la gouvernance de la donnée.
    Beaucoup de nos clients l’utilisent aujourd’hui d’ailleurs.

Laisser un commentaire

Champs Requis *.

*