Les channels en Go sont des files threadsafe très utiles. Voyons un cas d’usage pour gérer des événements.
Afin de pouvoir détecter la création de nouveaux fichiers dans un répertoire, j’ai utilisé la commande Inotify
sous Linux.
Inotify est un mécanisme sur linux permettant d’observer les événements sur un répertoire : création, suppression, modification de fichier ou répertoire.
L’idée est de détecter ces nouveaux fichiers comme des événements pour ensuite effectuer un traitement, dans mon cas, les copier dans un autre répertoire.
Fs notify est une librairie Go qui encapsule ce mécanisme à l’identique en poussant les événements dans un channel. A noter que Fs notify est multi-plateforme et n’utilise Inotify que sur linux.
Nous allons observer les événements qui se produisent sur un répertoire lorsque l’on copie des fichiers :
func main(){
watcher,_ := fsnotify.NewWatcher()
for {
if value, hasMore := <- watcher.Events; hasMore {
switch {
case value.Op&fsnotify.Create == fsnotify.Create:
log.Println("CREATE", value.Name)
break
case value.Op&fsnotify.Write == fsnotify.Write:
log.Println("WRITE", value.Name)
break
case value.Op&fsnotify.Remove == fsnotify.Remove:
log.Println("DELETE", value.Name)
break
}
}else{
break
}
}
}
Après avoir lancé le code, on copie deux fichiers, file1.jpg et file2.jpg :
2021/04/07 12:02:23 main.go: CREATE file1.jpg
2021/04/07 12:02:23 main.go: WRITE file1.jpg
2021/04/07 12:02:23 main.go: WRITE file1.jpg
2021/04/07 12:02:23 main.go: CREATE file2.jpg
2021/04/07 12:02:23 main.go: WRITE file2.jpg
2021/04/07 12:02:23 main.go: WRITE file2.jpg
2021/04/07 12:02:23 main.go: WRITE file2.jpg
On s’attend à avoir 2 lignes pour chaque fichier, une pour la création et l’autre pour l’écriture. Or plusieurs événements sont lancés pour l’écriture (WRITE).
Inotify
ne déclenche pas un événement WRITE lors de la fermeture du fichier mais apparemment lors de chaque flush.
Sur des fichiers plus gros, on peut observer encore plus d’événements.
Le fait d’effectuer plusieurs fois le même traitement n’est problématique que si notre traitement n’est pas idempotent, c’est à dire que le résultat d’un même traitement est toujours le même. Dans l’objectif de surveiller un répertoire est de recopier les nouveaux fichiers, la copie est idempotente et on pourrait laisser ainsi.
Cependant, il n’est jamais bon de laisser des écritures inutiles se faire quand on peut l’éviter :
Une solution serait de dédoublonner pour ne garder que le dernier événement d’un fichier, celui se produisant lors de la fermeture.
En reprenant l’exemple précédent, on voit que les événements reçus sont groupés par fichier ce qui est logique : la copie de fichiers n’est pas parallèle mais séquentielle.
2021/04/07 12:02:23 main.go: CREATE file1.jpg
2021/04/07 12:02:23 main.go: WRITE file1.jpg
2021/04/07 12:02:23 main.go: WRITE file1.jpg <-- Bien
2021/04/07 12:02:23 main.go: CREATE file2.jpg
2021/04/07 12:02:23 main.go: WRITE file2.jpg
2021/04/07 12:02:23 main.go: WRITE file2.jpg
2021/04/07 12:02:23 main.go: WRITE file2.jpg <-- Bien
Voici un algo qui va pouvoir corriger ce problème :
Il est possible de gérer facilement ce traitement avec un tableau et une analyse régulière des événéments.
Mais l’utilisation des channels apporte beaucoup plus de souplesse :
Pour gérer ce mécanisme avec des channels, on va créer une structure avec deux channels :
type Agregatechannel struct {
inputChannel chan string // Lecture des événements
outputChannel chan string // Ecriture des événements sans doublon
previousValue string // Précédent fichier lu
timeout time.Duration // Temps au delà duquel on estime que le dernier événement peut être traité
}
Pour gérer la notion de timeout, on va utiliser le mécanisme de select avec un Timer.
Le select
permet d’écouter plusieurs channels et de traiter le premier dans lequel on lit un message.
Un timer
est un channel dans lequel l’heure est écrite au bout du temps défini, parfait pour gérer un timeout :
func (ag Agregatechannel)readChanWithTimeout(){
select {
case value := <-ag.inputChannel:
ag.manageValue(value)
break
case <-time.NewTimer(ag.timeout).C:
ag.manageTimeout()
}
}
La méthode manageValue
implémente l’algorithme décrit plus haut :
func (ag *Agregatechannel)manageValue(value string){
if strings.EqualFold("", ag.previousValue) {
ag.previousValue = value
} else {
// Si la valeur est différente de la précédente, on traite l'événement de la précédente valeur
if !strings.EqualFold(ag.previousValue, value) {
ag.outputChannel <- ag.previousValue
ag.previousValue = value
}
}
}
En masquant l’implémentation à base de channel, avec une méthode Add et une méthode Get, on obtient une structure simple et légère.
Et voilà…en fixant un timeout de 2s, on obtient des résultats cohérents et je limite mes écritures au strict minimum.
La vraie force de l’algorithme présenté réside dans l’utilisation du select
.
Un cas d’usage intéressant est la sélection de la réponse la plus rapide lors de l’interrogation de plusieurs services / providers : celui qui a la réponse en premier sera choisi.
Vous pouvez retrouver le code source complet sur github.
Merci beaucoup à Julien Rollin pour les conseils et la relecture.