Dédoublonner les messages d'un channel en Go

illustration de l'article

Dédoublonner les messages d’un channel en go

Les channels en Go sont des files threadsafe très utiles. Voyons un cas d’usage pour gérer des événements.

Contexte : Inotify et la détection d’événements sur un disque

Afin de pouvoir détecter la création de nouveaux fichiers dans un répertoire, j’ai utilisé la commande Inotify sous Linux.

Inotify est un mécanisme sur linux permettant d’observer les événements sur un répertoire : création, suppression, modification de fichier ou répertoire.

L’idée est de détecter ces nouveaux fichiers comme des événements pour ensuite effectuer un traitement, dans mon cas, les copier dans un autre répertoire.

Fs notify est une librairie Go qui encapsule ce mécanisme à l’identique en poussant les événements dans un channel. A noter que Fs notify est multi-plateforme et n’utilise Inotify que sur linux.

Détection des événements

Nous allons observer les événements qui se produisent sur un répertoire lorsque l’on copie des fichiers :

func main(){
    watcher,_ := fsnotify.NewWatcher()
    for {
        if value, hasMore := <- watcher.Events; hasMore {
            switch {
                case value.Op&fsnotify.Create == fsnotify.Create:
                    log.Println("CREATE", value.Name)
                    break
                case value.Op&fsnotify.Write == fsnotify.Write:
                    log.Println("WRITE", value.Name)
                    break
                case value.Op&fsnotify.Remove == fsnotify.Remove:
                    log.Println("DELETE", value.Name)
                    break
            }
        }else{
            break
        }
    }
}

Après avoir lancé le code, on copie deux fichiers, file1.jpg et file2.jpg :

2021/04/07 12:02:23 main.go: CREATE file1.jpg 
2021/04/07 12:02:23 main.go: WRITE file1.jpg 
2021/04/07 12:02:23 main.go: WRITE file1.jpg 
2021/04/07 12:02:23 main.go: CREATE file2.jpg 
2021/04/07 12:02:23 main.go: WRITE file2.jpg 
2021/04/07 12:02:23 main.go: WRITE file2.jpg 
2021/04/07 12:02:23 main.go: WRITE file2.jpg 

On s’attend à avoir 2 lignes pour chaque fichier, une pour la création et l’autre pour l’écriture. Or plusieurs événements sont lancés pour l’écriture (WRITE).

Inotify ne déclenche pas un événement WRITE lors de la fermeture du fichier mais apparemment lors de chaque flush. Sur des fichiers plus gros, on peut observer encore plus d’événements.

Evénement et idempotence

Le fait d’effectuer plusieurs fois le même traitement n’est problématique que si notre traitement n’est pas idempotent, c’est à dire que le résultat d’un même traitement est toujours le même. Dans l’objectif de surveiller un répertoire est de recopier les nouveaux fichiers, la copie est idempotente et on pourrait laisser ainsi.

Cependant, il n’est jamais bon de laisser des écritures inutiles se faire quand on peut l’éviter :

  • hausse des I/O
  • hausse de l’utilisation du cpu
  • usure prématurée des disques
  • problème de concurrence si les copies sont effectuées en parallèle

Comment garder le dernier événement ?

Une solution serait de dédoublonner pour ne garder que le dernier événement d’un fichier, celui se produisant lors de la fermeture.

En reprenant l’exemple précédent, on voit que les événements reçus sont groupés par fichier ce qui est logique : la copie de fichiers n’est pas parallèle mais séquentielle.

2021/04/07 12:02:23 main.go: CREATE file1.jpg 
2021/04/07 12:02:23 main.go: WRITE file1.jpg 
2021/04/07 12:02:23 main.go: WRITE file1.jpg <-- Bien
2021/04/07 12:02:23 main.go: CREATE file2.jpg 
2021/04/07 12:02:23 main.go: WRITE file2.jpg 
2021/04/07 12:02:23 main.go: WRITE file2.jpg 
2021/04/07 12:02:23 main.go: WRITE file2.jpg <-- Bien

Découpler la détection du traitement d’un événement avec les channels en Go

Voici un algo qui va pouvoir corriger ce problème :

  • Tant que le fichier sur lequel est lancé l’événement est le même, on l’ignore
  • Quand le nom du fichier change, on le traite
  • Si pendant un certain temps, il n’y a plus d’événement, on considère que le dernier reçu peut être traité

Il est possible de gérer facilement ce traitement avec un tableau et une analyse régulière des événéments.

Mais l’utilisation des channels apporte beaucoup plus de souplesse :

  • Découplage entre la détection des événements et leur traitement
  • Gestion du parallélisme
  • Les événements sont traités de manière synchrone (à la queue leu leu)
  • Les channels c’est cool :)

Implémentation

Pour gérer ce mécanisme avec des channels, on va créer une structure avec deux channels :

  • Le premier pour lire les événements de fs notify
  • Le second pour écrire les événéments dédoublonnés
type Agregatechannel struct {
	inputChannel   chan string   // Lecture des événements
	outputChannel  chan string   // Ecriture des événements sans doublon
	previousValue  string        // Précédent fichier lu
	timeout        time.Duration // Temps au delà duquel on estime que le dernier événement peut être traité
}

Pour gérer la notion de timeout, on va utiliser le mécanisme de select avec un Timer.

Le select permet d’écouter plusieurs channels et de traiter le premier dans lequel on lit un message. Un timer est un channel dans lequel l’heure est écrite au bout du temps défini, parfait pour gérer un timeout :

func (ag Agregatechannel)readChanWithTimeout(){
	select {
	case value := <-ag.inputChannel:
		ag.manageValue(value)
		break
	case <-time.NewTimer(ag.timeout).C:
		ag.manageTimeout()
	}
}

La méthode manageValue implémente l’algorithme décrit plus haut :

func (ag *Agregatechannel)manageValue(value string){
	if strings.EqualFold("", ag.previousValue) {
		ag.previousValue = value
	} else {
		// Si la valeur est différente de la précédente, on traite l'événement de la précédente valeur
		if !strings.EqualFold(ag.previousValue, value) {
			ag.outputChannel <- ag.previousValue
			ag.previousValue = value
		}
	}
}

En masquant l’implémentation à base de channel, avec une méthode Add et une méthode Get, on obtient une structure simple et légère.

Et voilà…en fixant un timeout de 2s, on obtient des résultats cohérents et je limite mes écritures au strict minimum.

La vraie force de l’algorithme présenté réside dans l’utilisation du select. Un cas d’usage intéressant est la sélection de la réponse la plus rapide lors de l’interrogation de plusieurs services / providers : celui qui a la réponse en premier sera choisi.

Vous pouvez retrouver le code source complet sur github.

Merci beaucoup à Julien Rollin pour les conseils et la relecture.

Commentaires

comments powered by Disqus

Date

Auteur

Avatar Jonathan BARANZINI

Jonathan BARANZINI

Développeur