Depuis la version 4.1.2, Talend a sorti une option assez remarquable :le Dynamic Schema
Je vous propose dans ce post de détailler la démarche que nous avons adoptée pour l‘alimentation de notre infocentre : copie des différentes bases de données de la société à partir desquelles le datawarehouse est alimenté.
Un simple job sera réalisé pour amener les données d’une base de données source vers une base de données cible :
Voici l’option en question que vous pourrez utiliser dans votre input:
Et si vous avez des règles de gestion à ajouter n’hésitez pas avec le tJavaFlex :
/**l’exemple de traitement en plus lisible…**/
for(int i = 0; i < row3.dyn_Col.metadatas.size(); i++) { row3.dyn_Col.metadatas.get(i).setName(row3.dyn_Col.metadatas.get(i).getName().toLowerCase());
}
Ce job, décrit ci-dessus, est appelé ainsi par un job père qui lui donne le contexte contenant le nom de la table courante à migrer :
Une variable de context est passée au sous job (monsousjobcopie) : my_current_table
L’input requête sur une table de paramétrage. En effet nous n’avons pas forcément besoin de toutes les tables de toutes les bases de données. Donc à travers une table de paramétrage nous choisissons celle que l’on veut migrer :
Ce job père appelle aussi un autre sous job qui lui va permettre de migrer les indexes :
Donc pour chaque table on fera un appel à une routine (code qu’il y a dans le tJava):
routines.IndexMigrer.migrate_index(((java.sql.Connection)globalMap.get(« conn_tMSSqlConnection_2″)), ((String)globalMap.get(« current_TABLE.nom_table »)), ((java.sql.Connection)globalMap.get(« conn_tPostgresqlConnection_4″)),context.Cible_Schema + (String)globalMap.get(« schema ») + « . » +
((String)globalMap.get(« current_TABLE.nom_table »)));
Et en bonus voici la routine que nous avons créée qui selon la base de données (Oracle, Posgres, SQL Server 2000 ou SQL Server 2005) va requêter dans la bonne table système :
/***************Routines migration indexes ************************/
public class IndexMigrer {
public static void migrate_index(Connection org_connect, String org_nomtable,
Connection dest_connect, String dest_nomtable) throws java.sql.SQLException
{
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(« Microsoft SQL Server« ) )
{
Statement stat = org_connect.createStatement();
ResultSet rs;
if(org_connect.getMetaData().getDatabaseMajorVersion()==9)
{
rs=stat.executeQuery(« SELECT IDX.name as index_name, COL.name as column_name FROM sys.index_columns IDXC »
+ » INNER JOIN sys.objects OBJ »
+ » ON IDXC.object_id = OBJ.object_id »
+ » INNER JOIN sys.schemas SCH »
+ » ON SCH.schema_id = OBJ.schema_id »
+ » INNER JOIN sys.indexes IDX »
+ » ON (IDXC.object_id = IDX.object_id AND IDXC.index_id = IDX.index_id) »
+ » INNER JOIN sys.columns COL »
+ » ON (IDXC.column_id = COL.column_id AND OBJ.object_id = COL.object_id) »
+ » where OBJ.name=\’ »+org_nomtable+ »\’ »
+ » ORDER BY OBJ.name » );
}
else
{
rs=stat.executeQuery(« select i.name as index_name,c.name as column_name »
+ » from sysindexkeys as indk »
+ » inner join sysindexes as i on »
+ » (indk.indid=i.indid and indk.id=i.id ) »
+ » inner join syscolumns as c on »
+ » (indk.colid=c.colid and indk.id=c.id ) »
+ » inner join sysobjects as o on »
+ » (indk.id=o.id) »
+ » and o.name=\’ »+org_nomtable+ »\’ »
+ » ORDER BY o.name » );
}
Map m =new HashMap();
Map m_key=new HashMap();
while(rs.next())
{
if(!m.containsKey(rs.getString(1)))
{
List l = new ArrayList();
l.add(rs.getString(2));
m.put(rs.getString(1),l);
m_key.put(rs.getString(1), rs.getString(1));
}
else if(m.containsKey(rs.getString(1)))
{
List l=(List)m.get(rs.getString(1));
l.add(rs.getString(2));
m.put(rs.getString(1),l);
}
}
StringBuffer sb=new StringBuffer();
for (Iterator i = m.keySet().iterator() ; i.hasNext();){
String key = (String)i.next();
sb.append(« CREATE INDEX « );
sb.append(« IDX_ »+key);
sb.append( » ON « +dest_nomtable+ » (« );
List l=(List)m.get(key);
for(int j=0;j<l.size();j++)
{
sb.append(l.get(j));
if(j!=l.size()-1)
sb.append(« , »);
}
sb.append(« );\n »);
}
try{
Statement bla=dest_connect.createStatement();
System.out.println(« »+sb.toString());
bla.execute(sb.toString());
}
catch (java.sql.SQLException e)
{
//System.out.println(dest_nomtable);
System.out.println(e.getMessage());
}
dest_connect.commit();
//System.out.println(dest_nomtable);
}
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(« PostgreSQL« ))
{
Statement stat = org_connect.createStatement();
ResultSet rs=stat.executeQuery(« SELECT indkey as colonne_index,pci.relname as nom_index,pct.relname as nom_table from pg_index as pi »
+ » , pg_class as pci, pg_class as pct »
+ » where pci.oid=pi.indexrelid »
+ » and pct.oid=pi.indrelid and pct.relname not like ‘pg_%’ and pct.relname=’ »+org_nomtable+ »‘ » );
while( rs.next())
{
String s=rs.getString(1);
java.util.StringTokenizer tokenizer = new java.util.StringTokenizer(s, » « );
List <Integer> num_columns=new ArrayList();
while ( tokenizer.hasMoreTokens() ) {
num_columns.add(Integer.parseInt(tokenizer.nextToken()));
}
Statement sel=org_connect.createStatement();
ResultSet rsset=sel.executeQuery(« SELECT * from « +org_nomtable+ » LIMIT 1″);
StringBuffer sb=new StringBuffer();
sb.append(« CREATE INDEX « +rs.getString(2)+ » ON « +dest_nomtable+ » ( » );
Iterator i=num_columns.iterator();
while(i.hasNext())
{
sb.append(rsset.getMetaData().getColumnLabel((Integer)i.next()));
if(i.hasNext())
sb.append(« , »);
}
sb.append(« ) ;\n »);
Statement bla=dest_connect.createStatement();
bla.execute(sb.toString());
dest_connect.commit();
bla.close();
}
}
if(org_connect.getMetaData().getDatabaseProductName().equalsIgnoreCase(« Oracle« ))
{
Statement stat = org_connect.createStatement();
ResultSet rs=stat.executeQuery(« SELECT col.index_name,col.column_name FROM all_indexes cons INNER JOIN all_ind_columns col ON cons.owner = col.index_owner AND cons.index_name = col.index_name WHERE OWNER = ‘AEFE’ AND cons.table_name=\’ »+org_nomtable+ »\’ » );
Map m =new HashMap();
Map m_key=new HashMap();
while(rs.next())
{
if(!m.containsKey(rs.getString(1)))
{
List l = new ArrayList();
l.add(rs.getString(2));
m.put(rs.getString(1),l);
m_key.put(rs.getString(1), rs.getString(1));
}
else if(m.containsKey(rs.getString(1)))
{
List l=(List)m.get(rs.getString(1));
l.add(rs.getString(2));
m.put(rs.getString(1),l);
}
}
StringBuffer sb=new StringBuffer();
for (Iterator i = m.keySet().iterator() ; i.hasNext();){
String key = (String)i.next();
sb.append(« CREATE INDEX « );
sb.append(« IDX_ »+key);
sb.append( » ON « +dest_nomtable+ » (« );
List l=(List)m.get(key);
for(int j=0;j<l.size();j++)
{
sb.append(l.get(j));
if(j!=l.size()-1)
sb.append(« , »);
}
sb.append(« );\n »);
}
Statement bla=dest_connect.createStatement();
System.out.println(« »+sb.toString());
bla.execute(sb.toString());
dest_connect.commit();
}}}
Très important, si un problème de typage persiste n’hésitez pas à modifier les fichiers de mapping dans :
Ces fichiers sont utiles et simples à utiliser puisqu’ils permettent de connaitre les transformations de typage. En gros voici comment procède Talend :
typage BDD source => typage JAVA => typage BDD cible
Exemple d’utilisation :
ma base de données source est SQL server et ma base de données cible est postgres :
La démarche sera donc d’aller dans MSSQL_mapping.xml et de regarder la rubrique : <dbToTalendTypes> pour voir en quel type Java les typages de ma base de données MSSQL seront transformés. Ensuite il faudra regarder dans postgres_mapping.xml pour regarder les typages par défault que propose Talend dans la rubrique <talendToDbTypes> pour la retransformation des types JAVA en typages postgres.
Vous pourrez ainsi personnaliser ces transformations.
Si ces fichiers sont modifiés il faudra bien sûr les répercutés sur tous les clients TIS et sur le serveur dans les workspaces dans:
- .JAVA > src > xmlMappings
- et dans .JAVA > classes > xmlMappings
J’espère que ce post vous aura permis de voir les possibilités qu’offre la solution Talend pour ce genre de besoin.
N’hésitez pas à poster pour donner votre opinion!
27 mars 2012 à 21 h 19 min
Ce topic est super !
Mais pourrait-on aller plus loin en effectuant un update/insert ?
J’ai essayé, mais bloc toujours sur le fait de ne pas pouvoir définir en variable de context le nom d’un champ dans un schema.
Auriez-vous une idée ?
Merci
24 janvier 2022 à 11 h 46 min
Talend a beaucoup évolué depuis plusieurs années.
Ils proposent notamment la plateforme Talend Data Fabric désormais, qui est une plateforme globale autour de la gouvernance de la donnée.
Beaucoup de nos clients l’utilisent aujourd’hui d’ailleurs.