L’intégrateur est une application qui se place au niveau du back-office d’une solution SI. C’est le composant qui permet d’échanger avec les autres SI (partenaires, historiques, …) via l’intermédiaire de flux que ce soit sous forme JMS (MQSeries), mails, file system, SMS … Ces flux peuvent être reçus ou envoyés et une supervision adéquate est de rigueur.
Préambule
Dans un tel SI, l’intégration de ces flux est soumise à des contraintes de performance directement liées à la forte volumétrie et la taille des échanges et doit se faire en asynchrone afin de tirer meilleure partie des ressources matérielles. Cette contrainte sur la volumétrie est renforcée par le type d’activité de l’entreprise puisqu’il est fort probable que la réception et l’émission de ces flux se fasse sur un créneau horaire assez spécifique.
On pourrait concevoir que l’essentialité des flux serait reçu en semaine. La répartition sur une journée est elle-même hétérogène et se calque sur les horaires d’ouverture (8h-20h par exemple)
Alors que le client est connecté au serveur d’application, l’intégrateur fonctionne en mode stand-alone et peut avoir plusieurs instances lancées simultanément (à condition de bien compartimenter les traitements).
Pour compartimenter, on pourrait par exemple mettre sur une instance tous les traitements pour l’envoi de flux, dont les batch d’envoi de mail et sur une autre instance ceux de l’intégration et donc le recyclage. On pourrait également traiter certains types de flux sur une instance et le reste sur une autre. L’intégrateur développé par Synaptix permet de faire dialoguer les différentes instances entre elles.
Les contraintes techniques sont principalement dues à la volumétrie des échanges mais également à des contraintes d’unicité qui peuvent intervenir.
Deux traitements en parallèle peuvent ainsi se bloquer mutuellement s’ils modifient les mêmes objets dans un ordre différent, travailler sur des versions obsolètes des objets, créer en doublon des objets censés être uniques …
Etant donné la volumétrie, il faut pouvoir paramétrer le nombre de traitements en parallèle suivant le type de flux et ce afin de protéger l’intégrateur contre toute surcharge. Il ne s’agirait pas de noyer l’application avec plusieurs milliers de traitements en parallèle. Pouvoir intervenir sur ces paramètres à chaud, c’est-à-dire même une fois l’application lancée, est un avantage non négligeable.
Avec toute solution d’intégration de flux se pose le problème du recyclage notamment dans le cas où ces flux sont hiérarchisés et dépendent d’un autre.
Dans le cas quasi systématique où un premier flux crée un objet qui doit être modifié par le second, l’intégration du second ne peut se faire que si le premier a correctement été intégré. Cela vaut également dans le cas où les deux arrivent à quelques millisecondes d’écart, il est fort à parier que le premier n’aura pas fini d’être intégré étant donné le caractère asynchrone des traitements.
De plus, le besoin de supervision apparaît rapidement afin de pouvoir contrôler les traitements et ce afin de permettre des actions utilisateur pour débloquer la situation.
Avec des données référentielles, il arrive qu’il manque des données. Ces données doivent alors être rajoutées ou mises à jour via une action utilisateur. Une fois corrigé, l’utilisateur doit relancer les traitements bloqués.
Parfois, l’intégration d’un flux ne doit pas se faire. C’est le cas lorsqu’une contrainte d’unicité sur un objet doit être respectée et qu’un nouveau flux demande à nouveau sa création.
Le flux “doublon” doit être archivé ou supprimé sans qu’il ait été intégré.
Suite aux contraintes détectées, il faut proposer une solution qui permet de caractériser des types d’erreurs et de les rendre paramétrable afin de modifier le statut du flux en conséquence.
Ces différentes contraintes nous imposent de proposer un système de recyclage et de supervision, dans lequel les erreurs peuvent bénéficier d’un type de recyclage afin de savoir que faire du flux.
Le principe proposé retenu est le suivant :
Chaque traitement se fait dans un thread à part
Ces traitements peuvent remonter des erreurs
Si les flux sont des XML, ils peuvent être validés en utilisant un XSD
Les types d’erreurs sont paramétrables et entraînent les actions suivantes : alerte, recyclage automatique (délai paramétrable), recyclage manuel, rejeté
Un écran permet de visualiser les messages reçus, de les recycler
Un écran permet de choisir un type de recyclage pour chaque type d’erreur
Un écran permet de visualiser des statistiques sur les flux, par jour
Un écran permet de visualiser des statistiques sur les erreurs, par jour
Mise en place d’une servlet technique permettant de paramétrer le nombre de traitements en parallèle
Les flux doivent être stockés
Il faut pouvoir intégrer des flux pour effectuer des tests ou des rattrapages
L’intégrateur mis en place au sein du programme TOSCA à GEFCO sur le projet Plate-forme Service Client intègre un système complet de recyclage et de supervision des différents flux reçus. La solution retenue pour le stockage des flux a été de mettre à contribution la base de données. Cette solution n’est pas optimale mais a pour mérite de faciliter le recyclage et la supervision. Le principal problème est le stockage des flux pour lesquels une compression a été mise en place tardivement.
Les flux ont un état qui permet d’indiquer où ils en sont dans leur vie :
Messages en import | Messages en export |
A intégrer | A émettre |
En cours de traitement | En cours de traitement |
A recycler manuellement | A recycler manuellement |
A recycler automatiquement | A recycler automatiquement |
Intégré | Envoyé |
Rejeté | Rejeté |
Annulé | Annulé |
Lors de l’intégration d’un flux XML pour lequel le type de flux indique qu’il faut effectuer une validation en utilisant un XSD, la première étape est de vérifier sa syntaxe. En cas d’erreur, le flux remonte une erreur technique de type XSD_ERROR mais le traitement continue car elle peut-être caractérisée en tant qu'Alerte, donc non bloquante pour l’intégration.
Ce premier écran présente les différents flux qui n’ont pas encore été intégrés : ceux qui viennent d’être reçu et ceux qui sont en recyclage.
Une fois intégrés, ils seront transféré dans l’écran des messages archivés, quasi identique à celui présenté.
Cet écran présente des statistiques sur le nombre de messages, ici ceux qui ont été archivés.
Cet écran présente des statistiques sur les erreurs remontées par des traitements d’intégration et le mode de recyclage associé (automatique, manuel, rejeté, alerte).
Ce dernier écran présente le système de cahier de test avec le cas d’un cahier utilisé pour les tests de non régression.
L’édition du contenu d’un flux se fait dans une interface conviviale où il est facile de variabiliser des références et de voir la valeur actuelle.
Les statistiques suivantes ont été réalisées sur une semaine complète.
Sens | Nombre de flux | Proportion |
Réception | 3 828 408 | 79% |
Emission | 1 016 041 | 21% |
Malheureusement sur les quatre millions de flux reçus en une semaine, ils ne sont pas répartis équitablement par jour. Nous observons que 98% d’entre eux sont reçus du lundi au vendredi. De plus, en semaine, 70% des flux sont reçus de 8h à 20h (50% de la journée).
Sur ce créneau, nous recevons donc environ 550 000 flux, soit 45 800 par heure, 763 par minute, 12 par seconde.
Ces flux ne s’intègrent pas tous dès le premier coup, une bonne partie tombe en recyclage. Sur un échantillon donné, voici une approximation des proportions observées
Intégration | Nombre de flux | Proportion |
Intégré du premier coup | 1 413 025 | 67% |
Avec recyclage | 681 646 | 33% |
99.7% des flux à émettre le sont en semaine et la plage horaire 8h à 20h représente plus de 80% des envois. Sur le million de flux à envoyer, 810 000 le seront sur les heures ouvrées, totalisant au total 162 078 par jour, soit 13 506 par heure, 225 par minute, c’est-à-dire près de 4 par seconde.
Le recyclage n’est que peu utilisé dans les flux à l’export, il s’agirait principalement d’erreurs techniques relevées lors de la confection du flux et nous ayant contraint à ne pas pouvoir l’envoyer.
Nos calculs nous amènent à penser que nous traitons à l’import 25 flux par seconde en semaine de 8h à 20h, recyclage compris, et 4 à l’export. Le reste du temps, nous en traitons moins d’une quizaine. En réalité, il est fréquent d’observer que pendant une seconde nous traitons plus de 60 flux avec des maximum relevés allant jusqu’à 75 flux, ce qui semble être notre capacité maximale.
Il est très intéressant de pouvoir suivre l’état de l’intégrateur du point de vue technique. A ces fins, une servlet a été mise en place et permet de consulter le nombre de traitements en cours et en attente. Le format de sortie est en JSON.
{
"name": "TRIP_FILE",
"available": true,
"busy": true,
"overloaded": false,
"meaning": "Processing Channel 40/80, fr.gefco.tli.psc.integrator.agent.in.TripFileAgent, 24 running, 0 pending",
"nbWorking": 24,
"nbWaiting": 0
}
name est le nom de la file (qui est liée à un agent) available indique si l’agent est actif busy indique si au moins un thread est lancé sur l’agent overloaded indique si la file est surchargée meaning est une description nbWorking indique le maximum de threads en parallèle autorisé nbWaiting indique le maximum de messages en attente (utile s’il y a plusieurs instances de l’intégrateur)
A tout moment il est possible de modifier le nombre de traitements en parallèle en utilisant une servlet.
Cette opération doit être sécurisée, c’est pourquoi cette servlet n’est accessible qu’en interne et a besoin d’un token passé en paramètre pour être prise en compte
Un système de tests a été développé et est accessible depuis un écran permettant d’ajouter, modifier, effacer et jouer des flux. Permettant de jouer des cas de tests complets, il met également en avant les tests de non régression en proposant de sauvegarder le résultat de l’intégration (l’état du message et les erreurs remontées) afin de pouvoir effectuer un delta lorsqu’il sera rejoué.
Les cas de tests sont placés dans un cahier, créé par un utilisateur.
Afin de proposer une référence permettant ce delta, il est nécessaire de jouer au moins une fois le flux et d’indiquer que le résultat obtenu est la référence.
Une erreur dispose de trois propriétés
Propriété | Exemple 1 | Exemple 2 |
Code | _LIEU_INCONNU_ | _MARCHANDISE_INEXISTANTE_ |
Attribut | _VILLE_ | _NO_ |
Valeur | _Pariss_ | _123456_ |
Il existe plusieurs façons de créer des flux, allant du simple bouton Ajouter au bouton Ajouter dans un cahier de tests depuis un flux reçu en passant par le bouton Charger les dépendances, ce dernier permettant de chercher en base tous les flux qui portent sur le même objet ou qui sont nécessaires afin de pouvoir jouer le flux sélectionné.
Il est également possible d’exporter un ou plusieurs flux d’un cahier dans un .zip permettant de les importer dans un autre cahier, sur un autre environnement par exemple.
Ces méthodes permettent de reproduire très rapidement un cas observé en PROD sur un autre environnement.
Lors de l’ajout d’un flux ou lors de son édition, et si le type de flux le permet, la validation du XML en utilisant un XSD est faite à la volée, ce qui minimise les risques de se tromper.
Pour chaque flux, il est possible d’indiquer s’il s’agit d’un test à blanc ou s’il faut sauvegarder le résultat en base. Dans le premier cas, l’intégralité des modifications effectuées en base par l’intégration du flux est rollback. Pour le second cas, il est nécessaire de bénéficier d’une autorisation et ce afin de protéger contre toute erreur de manipulation en particulier sur l’environnement de production.
Un cas de test complet comporte des flux hiérarchisés. Ainsi, un ensemble de flux qui doit se jouer consécutivement sera placé dans un même groupe. A la fin de l’intégration d’un flux, le système jouera le suivant du même groupe et ainsi de suite. Si l’un des flux est rejeté les suivants ne sont pas joués, le test de non régression (si référence présente) est alors invalide.
Plusieurs groupes peuvent être joués en même temps. Des flux de deux groupes différents ne doivent pas dépendre l’un de l’autre.
Lors de l’édition du flux, il est possible de faire recours à des variables qui sont paramétrables, et ce par cahier. On variabilisera alors les identifiants, codes, … qui sont utilisés par différents flux du groupe.
<!-- premier flux -->
#{tripFileNoCas1}
<!-- second flux -->
#{tripFileNoCas1}
Lors du test, la variable sera remplacée par sa valeur dans chacun des flux.
Afin de faciliter les tests, il est possible de variabiliser une chaîne de caractères directement dans l’interface d’édition d’un flux en sélectionnant la chaîne de caractères à remplacer puis via clic-droit, variabiliser ou alt-shift-L. Cette variabilisation se fait dans tous les flux du groupe, c’est pourquoi il faut avoir commencé par le regroupement des flux du même cas de test.
Pour les tests de non régression, où les objets sont habituellement sauvegardés en base, il est indispensable de repartir sur de nouveaux jeux de données, d’où la possibilité qui est donnée à l’utilisateur d’incrémenter automatiquement une ou plusieurs variables lors du test. C’est directement dans l’édition du flux que sela s’opère, en utilisant une syntaxe qui indique à quel moment se fait l’incrémentation.
Première solution, on part sur un nouveau jeu de données
<!-- premier flux -->
#{++tripFileNoCas1} <!-- l'incrémentation se fait avant la lecture de la valeur -->
<!-- second flux -->
#{tripFileNoCas1}
Deuxième solution, on prépare le jeu de données suivant
<!-- premier flux -->
#{tripFileNoCas1}
<!-- second flux -->
#{tripFileNoCas1++} <!-- l'incrémentation se fait après la lecture de la valeur -->
Nous préférerons la première solution puisqu’elle permet, en cas d’erreur d’intégration sur l’un des flux, de pouvoir recommencer sur un nouveau jeu de données.
N.B. Les dates sont elles-aussi incrémentables en utilisant une syntaxe un peu plus complexe. Exemple :
${++(1w1d;yyyy-MM-dd'T'HH:mm:ss'Z')variable}
indiquant qu’on souhaite afficher la date sous tel format et que l’incrémentation se fait d’une semaine et un jour.
A GEFCO, nous avons rencontré plusieurs limites techniques.
Sur un environnement UNIX, le nombre de traitements qui peuvent être lancés simultanément sur une machine est limité par un paramètre (cf ulimit -a, max user processes). Par défaut configurée à 1024, il ne faut pas oublier de l’augmenter lorsque l’intégrateur monte en charge.
Une autre limite rencontrée a été au niveau de l’espace mémoire HEAP Space, actuellement fixée à 4 096Mo. Lors de tout développement, il faut penser à garder le moins longtemps possible les gros objets sinon lors d’une surcharge ou d’un retard accumulé, l’espace mémoire pourra saturer.
Un certain nombre de traitements ont besoin d’être synchronisés. Synchroniser un traitement destiné à être asynchrone pour des raisons de performances est à proscrire, on risque de figer tout le traitement. Dans ce genre de cas, il ne faut pas hésiter à retarder un traitement en le remettant dans la file d’attente et donc en jouer un autre plutôt que le faire attendre avec un verrou et par la même occasion accumuler le retard sur les autres traitements.
Lors de la sauvegarde des objets en base, une vérification de leur version est effectuée. Si la version a été changée, une erreur est remontée et le traitement doit recommencer après un certain délai (recyclage automatique).
Afin d’éviter les blocages en base de données, les objets ne sont sauvegardés qu’à la fin des traitements pour être le plus proche possible de leur commit en base. Attention, cette décision aura pour effet d’augmenter le nombre d’objets pour lesquels la version est obsolète (cf point précédent).
Des inter-blocages entre la partie Java et la partie BDD sont apparus lorsqu’un verrou Java et un verrou BDD étaient sollicités en même temps. Avant de rentrer dans un bloc synchronisé, la partie Java avait mis à jour une ligne en BDD sans commiter, et attend de pouvoir dans le bloc, sauf que le verrou Java est déjà utilisé par un traitement qui a besoin de mettre à jour la même ligne. Côté bonnes pratiques, il vaut mieux éviter l’utilisation de verrous Java en simultané avec Oracle Database et laisser à Oracle la gestion des verrous.
Le stockage des flux sur la base de données se fait dans un BLOB, difficilement optimisé pour des traitements par Oracle Database. La compression des flux a permis d’accélérer la récupération et le stockage de ces flux dans la base.