Big Data·classifier·Model·Python·Régression logistique·Spark·supervised methods

Prédiction de taux de clics, régression logistique distribuée (avec Spark)

Contexte

Dans le monde de la publicité sur internet, on distingue 3 grands acteurs :

  • Les annonceurs (ceux qui ont un produit à vendre)
  • Les publishers (ceux chez qui la publicité d’un produit à vendre est diffusée)
  • Les matchmakers (ceux qui mettent en relation les annonceurs et les publishers)

La plupart du temps, les matchmakers conseillent aux publishers de diffuser l’annonce de tel ou tel annonceur. C’est une sorte d’entremetteur. L’annonceur ne paye le publisher que lorqu’un internaute clique sur sa publicité. Le publisher reverse alors une commission à son matchmaker.

Presentation1

Maintenant, mettez-vous à la place du matchmaker. Quel est votre problème ? Pour un publisher donné, vous voulez trouvez le meilleur annonceur, celui qui proposera une publicité sur laquelle le plus d’internautes cliqueront. Ok, donc pour résumer vous devez être capable de prédire si un user va cliquer sur la publicité de tel annonceur alors qu’il navigue sur la page web de tel publisher.

P(click o/n | user, publicité, publisher)

 

 

Méthode

Bon, à priori, il faudrait construire un binary classifier. Prenons le plus classique d’entre eux : la régression logistique. Pour éviter l’overfitting, on ajoute un terme de régularization et voici notre problème d’optimisation :

optimlogloss

 

 

Problèmes

Bon jusqu’ici rien que du très classique. Ce qu’il faut savoir c’est que les caractéristiques qui définissent un internaute, le contenu d’une publicité et le site web d’un publisher, peuvent être extrêmement nombreuses. Et plus, on a de variables à disposition, plus on a besoin d’observations pour que notre modèle apprenne quelque chose de pertinent dans toute cette masse d’informations. Si on ajoute à cela, qu’une grande partie de ces caractéristiques se présentent sous la forme de variables catégorielles à binariser, on peut vite atteindre des proportions gigantesques. Par exemple, l’échantillon sur lequel on va travailler provient d’un dataset qui contient plusieurs centaines de millions d’observations pour une trentaine de millions de dummy variables. On ne peut plus appliquer simplement lancer une simple commande dans R pour avoir le résultat de notre régression logistique. La masse d’information à traiter est telle qu’on est obligé de prendre certaines dispositions. Les voici :

(1) Calcul distribué : la très grosse majorité du temps, quand on lance une modélisation sur des données, c’est le processeur et la mémoire d’un seul et unique ordinateur qui se charge de tout. Ici, on veut distribuer le travail sur plusieurs postes, histoire d’aller un peu plus vite. On utilisera Spark qui est un framework destiné à cette tâche. On peut utiliser Spark sur un seul poste pour s’entraîner.

(2) Hashing des variables : pour chaque tuple (user, add, publisher) on dispose de 33 Millions de dummy variables. cette taille gigantesque risque d’être un gros problème technique pour des raisons évidentes. Cependant, on remarquera que chacun de ces vecteurs est très « sparse » et de ce fait, on peut diminuer leur dimension en leur appliquant une fonction de hashing. Une fonction de hashing prend en input un vecteur et recrache en output un vecteur de plus petite taille. Je ne rentrerais pas dans les détails du pourquoi et comment ça marche mais ceux qui désirent approfondir leur connaissance à ce sujet peuvent se référer aux liens suivants :

(3) Stochastic gradient descent : la méthode la plus classique pour trouver les w (qu’on appelle aussi θ) qui vont minimiser la quantité (1) est la méthode du gradient descendant. Pour rappel, pour cette méthode, la mise à jour des θ s’écrit :

gradientdescent.pngAvec hθ(xi) -yi  la fonction de coût pour l’observation i
Avec α le taux d’apprentissage

Comme on peut le constater, à chaque step, on doit faire une somme sur l’ensemble des observations du dataset. Lorsque le dataset contient plusieurs milliards d’observations, ça devient trop coûteux. C’est là que le Stochastic Gradient Descent devient intéressant. En fait, dans le cas du SGD, à chaque itération, on ne va pas soustraire aux θj la somme entière des dérivés mais juste une seule dérivé d’une seule observation prise au hasard.

SGD

Evidemment, l’inconvénient de cette méthode c’est que le chemin des θj  temporaires vers les θj optimaux est beaucoup moins direct que dans le cas du gradient descendant. En revanche, la plus longue distance vers la solution optimale est compensée par le coût minime d’une itération (une dérivée Versus une somme de n dérivées). Suivant le nombre n d’observations, on n’est parfois obligé de recommencer le processus et de refaire tourner une boucle sur la mise à jour des θj.

 

 

Les données et les objectifs

Les données proviennent du concours KAGGLE Criteo 2014 lors duquel il était question de prédire si un user allait cliquer sur une publicité grâce à des variables relatives au user, à la publicité et au publisher. Les données sont téléchargeables ici Vous avez le choix entre le dataset entier (4.3 Go) et un échantillon (8 Mo). L’objectif est de prédire le clic d’un user  en prenant ses dispositions pour réduire le temps de calcul, i.e.

  • Construire des régressions logistiques distribuées via Spark
  • Hasher les variables afin de réduire la dimension des observations
  • Utiliser la méthode du Stochastic Gradient Descent pour apprendre les poids des modèles

 

 

C’est parti !

0 & 1) Comme d’hab, importation des modules nécessaires et chargement des données. rawData est un fichier RDD (Resilient Distributed Dataset) distribué sur deux partitions.

#0) Importation des modules nécessaires
import numpy as np 
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from math import log
from math import exp 
import os.path

#1) Chargement du dataset échantillon dac_sample.txt
baseDir = os.path.join('data')
inputPath = os.path.join(kaggle_contest, 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)
if os.path.isfile(fileName):
    rawData = (sc
               .textFile(fileName, 2)
               .map(lambda x: x.replace('\t', ',')))  

 

2) On va ici diviser le set en 3. Un training set, un validation set et un test set. Afin qu’on n’ai pas à recompute à chaque fois ces commandes, ces fichiers sont enregistrés dans la mémoire cache pour qu’ils soient accessibles plus rapidement.

#2.a) Division du dataset en trois sets : training, validation et test set
weights = [.8, .1, .1]
seed = 99
# Use randomSplit with weights and seed
rawTrainData, rawValidationData, rawTestData = rawData.randomSplit(weights,seed)
# Cache the data
rawTrainData.cache()
rawValidationData.cache()
rawTestData.cache()

#2.b) On compte le nombre d'observations des trois sets
nTrain = rawTrainData.count()
nVal = rawValidationData.count()
nTest = rawTestData.count()

 

3) Ici on va créer toutes les fonctions nécessaires pour la suite du programme. En vrac :

  • getP pour calculer la probabilité d’un click sachant les variables et les paramètres d’un modèle
  • computeLogloss pour compute la logloss de la prédiction d’une observation
  • evaluateResults pour compute la logloss d’un modèle sur un jeu de données
  • hashFunction pour réduire la dimension d’un point
  • parsePoint pour convertir un point en une liste de tuples (IDvariable,value)
  • parseHashPoint pour convertir un point en un objet de type LabeledPoint qui comprend le label du point et ses variables hashées
#3) Création des fonctions

#3.a) getP permet de calculer la probabilité de click sachant les poids et l'intercept du modèle (intercept et w) et les variables (x) d'une observation
#     On borne le combinaison linéaire entre 20 et -20 pour des considérations de mémoire :
#     Les résultats sont très peu affectés car la probabilité est déjà extrêmement proche de 1 ou de 0
#     Ca permet d'éviter à l'ordinateur de stocker un trop grand nombre de décimales
function getP
def getP(x, w, intercept):
    rawPrediction = x.dot(w)+intercept
    # Bound the raw prediction value
    rawPrediction = min(rawPrediction, 20)
    rawPrediction = max(rawPrediction, -20)
    return 1.0/(1.0+exp(-rawPrediction))

#3.b) computeLogLoss prend en paramètres, une probabilité de click et le label d'une observation et renvoie la logloss associée
def computeLogLoss(p, y):
    epsilon = 10e-12
    if y==1:
        return -log(p+epsilon) if p==0 else  -log(p)
    else:
        return -log(1-p+epsilon) if p==1 else  -log(1-p)

#3.c) evalueateResults calcule la logloss d'un modèle sur un jeu de données
def evaluateResults(model, data):
    res=data.map(lambda x : computeLogLoss ( getP(x.features,model.weights,model.intercept) , x.label)).sum()/data.count()
    return res

#3.d) hashFunction est la fonction qui va nous permettre de réduire la dimension d'un point
#     (*) numBuckets correspond au nombre de dimensions qu'on souhaite à l'arrivée
#     (*) printMapping est un booléen qui permet d'afficher ou pas les étapes d'un hashing
#     (*) rawFeats représente les coordonnées du point. Il est sous la forme d'une liste de tuple (IDvariable,value)
#     Par exemple [(0, 'bear'), (1, 'black'), (2, 'salmon')]
from collections import defaultdict
import hashlib
def hashFunction(numBuckets, rawFeats, printMapping=False):
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets)
    if(printMapping): print mapping
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)

#3.e) parsePoint prend en paramètre un point sous sa forme brute et renvoie une liste de tuples (IDvariable,value)
#     forme brute = une chaîne de caractère qui comprend
#     le label (1 ou 0 pour click ou pas) en première position
#     puis une ribambelle de variables
#     chaque position est séparée par une virgule
def parsePoint(point):
    items=point.split(',')
    return [(i,item) for i,item in enumerate (items[1:])]

#3.f) parseHashPoint prend en paramètre un point sous sa forme brute et renvoie un objet de type Labeled Point
#     Cet objet est constitué d'un label (on peut accéder à ce label avec la commande name_objet.label)
#     Cet objet est également consituté des variables hashées (on y accède avec la commande name_objet.features
def parseHashPoint(point, numBuckets):
    parsedPoints=parsePoint(point)
    items=point.split(',')
    label=items[0]
    features=hashFunction(numBuckets, parsedPoints, False)
    return LabeledPoint(label,SparseVector(numBuckets,features))

 

4) Ici on va hasher les observations de nos trois sets. On va cette fois-ci encore les stocker dans la mémoire cache.

#4) On va hasher les observations de nos trois sets et les stocker en cache.
#   On veut convertir réduire la dimension de nos observations à 2^15.
numBucketsCTR = 2 ** 15
hashTrainData = rawTrainData.map(lambda x : parseHashPoint(x,numBucketsCTR))
hashTrainData.cache()
hashValidationData = rawValidationData.map(lambda x : parseHashPoint(x,numBucketsCTR))
hashValidationData.cache()
hashTestData = rawTestData.map(lambda x : parseHashPoint(x,numBucketsCTR))
hashTestData.cache()

 

5) On a toutes nos fonctions, les observations de nos datasets ont été hashées afin de réduire leur dimensions. On peut maintenant lancer plusieurs modèles de régression logistiques avec terme de pénalisation de norme L2. Les poids de chacun de ces modèles dépendent de deux hyperparameters : le terme λ de pénalisation et α le learning rate du SGD. On va tester 6 valeurs de λ et 6 valeurs de α, ce qui va nous donner une grille de 36 modèles. Pour chaque modèle, l’apprentissage se fera sur le training set, et on computera sa performance (la logloss) sur le validation set.

#5) On utilise la libraire mlLib pour lancer plusieurs régression logistique sur nos données
#   Il s'agira de régressions logistique avec terme de régularisation de norme L2
#   L'apprentissage sera effectué via la méthode Stochastic Gradient Descent
#   On va trainer 6*6 modèles avec les hyperparameters allant de 3 à 18 pour le taux d'apprentissage
#   et 10^-7 à 10^-2 pour le lambda de régularisation

# Paramètres du SGD et de la régression logistique
# regType correspond à un terme de pénalisation de norme L2 comme dans le cas de la Ridge Regression
numIters = 500
regType = 'l2'
includeIntercept = True

# On crée les variables bestModel et bestLogLoss de sorte à enregistrer le meilleur modèle parmi
# la série de modèles qu'on s'apprête à tester
bestModel = None
bestLogLoss = 1e10

# Apprentissage et validation des 36 modèles
# Les poids des 36 modèles sont appris sur le training set,
# le critère de comparaison : la logloss est compute sur le validation set
stepSizes = [3, 6, 9, 12, 15, 18]
regParams = [1e-7, 1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
for stepSize in stepSizes:
    for regParam in regParams:
        model = (LogisticRegressionWithSGD
                 .train(hashTrainData, numIters, stepSize, regParam=regParam, regType=regType,
                        intercept=includeIntercept))
        logLossVa = evaluateResults(model, hashValidationData)
        print ('\tstepSize = {0:.1f}, regParam = {1:.0e}: logloss = {2:.3f}'
               .format(stepSize, regParam, logLossVa))
        if (logLossVa < bestLogLoss):
            bestModel = model
            bestLogLoss = logLossVa

# On regarde ce que donne le modèle retenu sur le Test Set
logLossTest = evaluateResults(bestModel, hashTestData)
print(logLossTest)

 

 

Script et données

Publicités

Une réflexion au sujet de « Prédiction de taux de clics, régression logistique distribuée (avec Spark) »

Laisser un commentaire

Entrez vos coordonnées ci-dessous ou cliquez sur une icône pour vous connecter:

Logo WordPress.com

Vous commentez à l'aide de votre compte WordPress.com. Déconnexion / Changer )

Image Twitter

Vous commentez à l'aide de votre compte Twitter. Déconnexion / Changer )

Photo Facebook

Vous commentez à l'aide de votre compte Facebook. Déconnexion / Changer )

Photo Google+

Vous commentez à l'aide de votre compte Google+. Déconnexion / Changer )

Connexion à %s