Gefco : intégrateur - mise en place d'un intégrateur de flux

L’intégrateur est une application qui se place au niveau du back-office d’une solution SI. C’est le composant qui permet d’échanger avec les autres SI (partenaires, historiques, …) via l’intermédiaire de flux que ce soit sous forme JMS (MQSeries), mails, file system, SMS … Ces flux peuvent être reçus ou envoyés et une supervision adéquate est de rigueur.

Préambule

  • Contraintes
  • Cahier des charges
  • Mise en place
  • Ecrans
  • Statistiques
  • Réception
  • Emission
  • Observations
  • Exploitation
  • Cahier de tests
  • Import de cas de tests
  • Sauvegarde du résultat
  • Cas de tests : groupes
  • Variables
  • Limites techniques

Préambule

Dans un tel SI, l’intégration de ces flux est soumise à des contraintes de performance directement liées à la forte volumétrie et la taille des échanges et doit se faire en asynchrone afin de tirer meilleure partie des ressources matérielles. Cette contrainte sur la volumétrie est renforcée par le type d’activité de l’entreprise puisqu’il est fort probable que la réception et l’émission de ces flux se fasse sur un créneau horaire assez spécifique.

On pourrait concevoir que l’essentialité des flux serait reçu en semaine. La répartition sur une journée est elle-même hétérogène et se calque sur les horaires d’ouverture (8h-20h par exemple)

Alors que le client est connecté au serveur d’application, l’intégrateur fonctionne en mode stand-alone et peut avoir plusieurs instances lancées simultanément (à condition de bien compartimenter les traitements).

Pour compartimenter, on pourrait par exemple mettre sur une instance tous les traitements pour l’envoi de flux, dont les batch d’envoi de mail et sur une autre instance ceux de l’intégration et donc le recyclage. On pourrait également traiter certains types de flux sur une instance et le reste sur une autre. L’intégrateur développé par Synaptix permet de faire dialoguer les différentes instances entre elles.

Contraintes

Contraintes techniques

Les contraintes techniques sont principalement dues à la volumétrie des échanges mais également à des contraintes d’unicité qui peuvent intervenir.

Deux traitements en parallèle peuvent ainsi se bloquer mutuellement s’ils modifient les mêmes objets dans un ordre différent, travailler sur des versions obsolètes des objets, créer en doublon des objets censés être uniques …

Etant donné la volumétrie, il faut pouvoir paramétrer le nombre de traitements en parallèle suivant le type de flux et ce afin de protéger l’intégrateur contre toute surcharge. Il ne s’agirait pas de noyer l’application avec plusieurs milliers de traitements en parallèle. Pouvoir intervenir sur ces paramètres à chaud, c’est-à-dire même une fois l’application lancée, est un avantage non négligeable.

Contraintes fonctionnelles

Avec toute solution d’intégration de flux se pose le problème du recyclage notamment dans le cas où ces flux sont hiérarchisés et dépendent d’un autre.

Dans le cas quasi systématique où un premier flux crée un objet qui doit être modifié par le second, l’intégration du second ne peut se faire que si le premier a correctement été intégré. Cela vaut également dans le cas où les deux arrivent à quelques millisecondes d’écart, il est fort à parier que le premier n’aura pas fini d’être intégré étant donné le caractère asynchrone des traitements.

De plus, le besoin de supervision apparaît rapidement afin de pouvoir contrôler les traitements et ce afin de permettre des actions utilisateur pour débloquer la situation.

Avec des données référentielles, il arrive qu’il manque des données. Ces données doivent alors être rajoutées ou mises à jour via une action utilisateur. Une fois corrigé, l’utilisateur doit relancer les traitements bloqués.

Parfois, l’intégration d’un flux ne doit pas se faire. C’est le cas lorsqu’une contrainte d’unicité sur un objet doit être respectée et qu’un nouveau flux demande à nouveau sa création.

Le flux “doublon” doit être archivé ou supprimé sans qu’il ait été intégré.

Cahier des charges

Suite aux contraintes détectées, il faut proposer une solution qui permet de caractériser des types d’erreurs et de les rendre paramétrable afin de modifier le statut du flux en conséquence.

Ces différentes contraintes nous imposent de proposer un système de recyclage et de supervision, dans lequel les erreurs peuvent bénéficier d’un type de recyclage afin de savoir que faire du flux.

Le principe proposé retenu est le suivant :

  • Chaque traitement se fait dans un thread à part

  • Ces traitements peuvent remonter des erreurs

  • Si les flux sont des XML, ils peuvent être validés en utilisant un XSD

  • Les types d’erreurs sont paramétrables et entraînent les actions suivantes : alerte, recyclage automatique (délai paramétrable), recyclage manuel, rejeté

  • Un écran permet de visualiser les messages reçus, de les recycler

  • Un écran permet de choisir un type de recyclage pour chaque type d’erreur

  • Un écran permet de visualiser des statistiques sur les flux, par jour

  • Un écran permet de visualiser des statistiques sur les erreurs, par jour

  • Mise en place d’une servlet technique permettant de paramétrer le nombre de traitements en parallèle

  • Les flux doivent être stockés

  • Il faut pouvoir intégrer des flux pour effectuer des tests ou des rattrapages

Mise en place

L’intégrateur mis en place au sein du programme TOSCA à GEFCO sur le projet Plate-forme Service Client intègre un système complet de recyclage et de supervision des différents flux reçus. La solution retenue pour le stockage des flux a été de mettre à contribution la base de données. Cette solution n’est pas optimale mais a pour mérite de faciliter le recyclage et la supervision. Le principal problème est le stockage des flux pour lesquels une compression a été mise en place tardivement.

Les flux ont un état qui permet d’indiquer où ils en sont dans leur vie :

Messages en import Messages en export
A intégrer A émettre
En cours de traitement En cours de traitement
A recycler manuellement A recycler manuellement
A recycler automatiquement A recycler automatiquement
Intégré Envoyé
Rejeté Rejeté
Annulé Annulé
Deux états supplémentaires ont été rajoutés pour les flux en mode **_test_** :
  • Test réussi
    • Test échoué

Lors de l’intégration d’un flux XML pour lequel le type de flux indique qu’il faut effectuer une validation en utilisant un XSD, la première étape est de vérifier sa syntaxe. En cas d’erreur, le flux remonte une erreur technique de type XSD_ERROR mais le traitement continue car elle peut-être caractérisée en tant qu'Alerte, donc non bloquante pour l’intégration.

Ecrans

Ce premier écran présente les différents flux qui n’ont pas encore été intégrés : ceux qui viennent d’être reçu et ceux qui sont en recyclage.

en_cours

Une fois intégrés, ils seront transféré dans l’écran des messages archivés, quasi identique à celui présenté.

Cet écran présente des statistiques sur le nombre de messages, ici ceux qui ont été archivés.

analyse3

Cet écran présente des statistiques sur les erreurs remontées par des traitements d’intégration et le mode de recyclage associé (automatique, manuel, rejeté, alerte).

analyse4

Ce dernier écran présente le système de cahier de test avec le cas d’un cahier utilisé pour les tests de non régression.

test

L’édition du contenu d’un flux se fait dans une interface conviviale où il est facile de variabiliser des références et de voir la valeur actuelle.

edit_test

Statistiques

Les statistiques suivantes ont été réalisées sur une semaine complète.

Sens Nombre de flux Proportion
Réception 3 828 408 79%
Emission 1 016 041 21%

Réception

Malheureusement sur les quatre millions de flux reçus en une semaine, ils ne sont pas répartis équitablement par jour. Nous observons que 98% d’entre eux sont reçus du lundi au vendredi. De plus, en semaine, 70% des flux sont reçus de 8h à 20h (50% de la journée).

Sur ce créneau, nous recevons donc environ 550 000 flux, soit 45 800 par heure, 763 par minute, 12 par seconde.

Recyclage

Ces flux ne s’intègrent pas tous dès le premier coup, une bonne partie tombe en recyclage. Sur un échantillon donné, voici une approximation des proportions observées

Intégration Nombre de flux Proportion
Intégré du premier coup 1 413 025 67%
Avec recyclage 681 646 33%
Ces 681 646 flux ont été joués au total plus de 8 000 000 de fois. C'est le paramétrage des types d'erreurs qui va pouvoir directement influer sur le nombre de recyclages. Les recyclages étant répartis sur la semaine entière, nous pouvons en déduire que nous en traitons 47 500 par heure, soit 793 par minute, c'est à dire environ 13 par seconde. Du moins en théorie ! Dans les faits, le recyclage s'effectue par des batchs qui ont pour effet de concentrer le recyclage sur une période très réduite.

Emission

99.7% des flux à émettre le sont en semaine et la plage horaire 8h à 20h représente plus de 80% des envois. Sur le million de flux à envoyer, 810 000 le seront sur les heures ouvrées, totalisant au total 162 078 par jour, soit 13 506 par heure, 225 par minute, c’est-à-dire près de 4 par seconde.

Le recyclage n’est que peu utilisé dans les flux à l’export, il s’agirait principalement d’erreurs techniques relevées lors de la confection du flux et nous ayant contraint à ne pas pouvoir l’envoyer.

Observations

Nos calculs nous amènent à penser que nous traitons à l’import 25 flux par seconde en semaine de 8h à 20h, recyclage compris, et 4 à l’export. Le reste du temps, nous en traitons moins d’une quizaine. En réalité, il est fréquent d’observer que pendant une seconde nous traitons plus de 60 flux avec des maximum relevés allant jusqu’à 75 flux, ce qui semble être notre capacité maximale.

Exploitation

Il est très intéressant de pouvoir suivre l’état de l’intégrateur du point de vue technique. A ces fins, une servlet a été mise en place et permet de consulter le nombre de traitements en cours et en attente. Le format de sortie est en JSON.

{
  "name": "TRIP_FILE",
  "available": true,
  "busy": true,
  "overloaded": false,
  "meaning": "Processing Channel 40/80, fr.gefco.tli.psc.integrator.agent.in.TripFileAgent, 24 running, 0 pending",
  "nbWorking": 24,
  "nbWaiting": 0
}

name est le nom de la file (qui est liée à un agent) available indique si l’agent est actif busy indique si au moins un thread est lancé sur l’agent overloaded indique si la file est surchargée meaning est une description nbWorking indique le maximum de threads en parallèle autorisé nbWaiting indique le maximum de messages en attente (utile s’il y a plusieurs instances de l’intégrateur)

A tout moment il est possible de modifier le nombre de traitements en parallèle en utilisant une servlet.

Cette opération doit être sécurisée, c’est pourquoi cette servlet n’est accessible qu’en interne et a besoin d’un token passé en paramètre pour être prise en compte

Cahier de tests

Un système de tests a été développé et est accessible depuis un écran permettant d’ajouter, modifier, effacer et jouer des flux. Permettant de jouer des cas de tests complets, il met également en avant les tests de non régression en proposant de sauvegarder le résultat de l’intégration (l’état du message et les erreurs remontées) afin de pouvoir effectuer un delta lorsqu’il sera rejoué.

Les cas de tests sont placés dans un cahier, créé par un utilisateur.

Afin de proposer une référence permettant ce delta, il est nécessaire de jouer au moins une fois le flux et d’indiquer que le résultat obtenu est la référence.

Une erreur dispose de trois propriétés

Propriété Exemple 1 Exemple 2
Code _LIEU_INCONNU_ _MARCHANDISE_INEXISTANTE_
Attribut _VILLE_ _NO_
Valeur _Pariss_ _123456_
Le delta effectué est fait en utilisant toutes les propriétés. Si une erreur était attendue et n'a pas été remontée ou si une erreur inattendue a été remontée, le cas de test est invalide.

Import de cas de tests

Il existe plusieurs façons de créer des flux, allant du simple bouton Ajouter au bouton Ajouter dans un cahier de tests depuis un flux reçu en passant par le bouton Charger les dépendances, ce dernier permettant de chercher en base tous les flux qui portent sur le même objet ou qui sont nécessaires afin de pouvoir jouer le flux sélectionné.

Il est également possible d’exporter un ou plusieurs flux d’un cahier dans un .zip permettant de les importer dans un autre cahier, sur un autre environnement par exemple.

Ces méthodes permettent de reproduire très rapidement un cas observé en PROD sur un autre environnement.

Lors de l’ajout d’un flux ou lors de son édition, et si le type de flux le permet, la validation du XML en utilisant un XSD est faite à la volée, ce qui minimise les risques de se tromper.

Sauvegarde du résultat

Pour chaque flux, il est possible d’indiquer s’il s’agit d’un test à blanc ou s’il faut sauvegarder le résultat en base. Dans le premier cas, l’intégralité des modifications effectuées en base par l’intégration du flux est rollback. Pour le second cas, il est nécessaire de bénéficier d’une autorisation et ce afin de protéger contre toute erreur de manipulation en particulier sur l’environnement de production.

Cas de tests : groupes

Un cas de test complet comporte des flux hiérarchisés. Ainsi, un ensemble de flux qui doit se jouer consécutivement sera placé dans un même groupe. A la fin de l’intégration d’un flux, le système jouera le suivant du même groupe et ainsi de suite. Si l’un des flux est rejeté les suivants ne sont pas joués, le test de non régression (si référence présente) est alors invalide.

Plusieurs groupes peuvent être joués en même temps. Des flux de deux groupes différents ne doivent pas dépendre l’un de l’autre.

Variables

Lors de l’édition du flux, il est possible de faire recours à des variables qui sont paramétrables, et ce par cahier. On variabilisera alors les identifiants, codes, … qui sont utilisés par différents flux du groupe.

	<!-- premier flux -->
  #{tripFileNoCas1}
  
  <!-- second flux -->
  #{tripFileNoCas1}

Lors du test, la variable sera remplacée par sa valeur dans chacun des flux.

Afin de faciliter les tests, il est possible de variabiliser une chaîne de caractères directement dans l’interface d’édition d’un flux en sélectionnant la chaîne de caractères à remplacer puis via clic-droit, variabiliser ou alt-shift-L. Cette variabilisation se fait dans tous les flux du groupe, c’est pourquoi il faut avoir commencé par le regroupement des flux du même cas de test.

Incrémentation

Pour les tests de non régression, où les objets sont habituellement sauvegardés en base, il est indispensable de repartir sur de nouveaux jeux de données, d’où la possibilité qui est donnée à l’utilisateur d’incrémenter automatiquement une ou plusieurs variables lors du test. C’est directement dans l’édition du flux que sela s’opère, en utilisant une syntaxe qui indique à quel moment se fait l’incrémentation.

Première solution, on part sur un nouveau jeu de données

	<!-- premier flux -->
  #{++tripFileNoCas1} <!-- l'incrémentation se fait avant la lecture de la valeur -->
  <!-- second flux -->
  
  #{tripFileNoCas1}

Deuxième solution, on prépare le jeu de données suivant

	<!-- premier flux -->
  #{tripFileNoCas1}
  <!-- second flux -->
  
  #{tripFileNoCas1++} <!-- l'incrémentation se fait après la lecture de la valeur -->

Nous préférerons la première solution puisqu’elle permet, en cas d’erreur d’intégration sur l’un des flux, de pouvoir recommencer sur un nouveau jeu de données.

N.B. Les dates sont elles-aussi incrémentables en utilisant une syntaxe un peu plus complexe. Exemple : ${++(1w1d;yyyy-MM-dd'T'HH:mm:ss'Z')variable} indiquant qu’on souhaite afficher la date sous tel format et que l’incrémentation se fait d’une semaine et un jour.

Limites techniques

A GEFCO, nous avons rencontré plusieurs limites techniques.

Sur un environnement UNIX, le nombre de traitements qui peuvent être lancés simultanément sur une machine est limité par un paramètre (cf ulimit -a, max user processes). Par défaut configurée à 1024, il ne faut pas oublier de l’augmenter lorsque l’intégrateur monte en charge.

Une autre limite rencontrée a été au niveau de l’espace mémoire HEAP Space, actuellement fixée à 4 096Mo. Lors de tout développement, il faut penser à garder le moins longtemps possible les gros objets sinon lors d’une surcharge ou d’un retard accumulé, l’espace mémoire pourra saturer.

Un certain nombre de traitements ont besoin d’être synchronisés. Synchroniser un traitement destiné à être asynchrone pour des raisons de performances est à proscrire, on risque de figer tout le traitement. Dans ce genre de cas, il ne faut pas hésiter à retarder un traitement en le remettant dans la file d’attente et donc en jouer un autre plutôt que le faire attendre avec un verrou et par la même occasion accumuler le retard sur les autres traitements.

Lors de la sauvegarde des objets en base, une vérification de leur version est effectuée. Si la version a été changée, une erreur est remontée et le traitement doit recommencer après un certain délai (recyclage automatique).

Afin d’éviter les blocages en base de données, les objets ne sont sauvegardés qu’à la fin des traitements pour être le plus proche possible de leur commit en base. Attention, cette décision aura pour effet d’augmenter le nombre d’objets pour lesquels la version est obsolète (cf point précédent).

Des inter-blocages entre la partie Java et la partie BDD sont apparus lorsqu’un verrou Java et un verrou BDD étaient sollicités en même temps. Avant de rentrer dans un bloc synchronisé, la partie Java avait mis à jour une ligne en BDD sans commiter, et attend de pouvoir dans le bloc, sauf que le verrou Java est déjà utilisé par un traitement qui a besoin de mettre à jour la même ligne. Côté bonnes pratiques, il vaut mieux éviter l’utilisation de verrous Java en simultané avec Oracle Database et laisser à Oracle la gestion des verrous.

Le stockage des flux sur la base de données se fait dans un BLOB, difficilement optimisé pour des traitements par Oracle Database. La compression des flux a permis d’accélérer la récupération et le stockage de ces flux dans la base.

Date

Auteur

Avatar Nicolas POSTE

Nicolas POSTE

Tech Lead

Catégories

back data

Tags

#Performance