implicit feedback을 사용한 CF를 가장 잘 정리한 페이퍼를 파이썬으로 구현한 내용을 정리한다. 포스팅에서 모든 코드를 다루지는 않았는데, 완벽한 코드를 보고 싶다면 github repo을 참고하면 된다.
시작하기에 앞서, 아래 포스팅에서 implicit feedback CF에 대해 이론적으로 정리했으니 참고하면 좋을 것 같다.
https://steady-programming.tistory.com/45
[Recommender System / Paper review] #07 Collaborative Filtering for Implicit Feedback Datasets
논문 링크(3690회 인용) Summary 사용자가 직접적으로 아이템에 대해 매기는 explicit feedback이 아닌, implicit feedback 데이터를 이용하여 unknown preference을 예측하는 방법을 제안한다. implicit feedback은 explic
steady-programming.tistory.com
https://steady-programming.tistory.com/70
[Recommender System] Derivation of optimization process of ALS
이전 포스팅에서 time complexity을 개선한 행렬 분해에 대해서 알아보았다. https://steady-programming.tistory.com/57 [Recommender System / Paper review] #19 Fast Matrix Factorization for Online Recommendation with Implicit Feedback 논
steady-programming.tistory.com
여태까지 이론적으로 접근했으니, 이제 실제 코드를 짜봄으로써 알고리즘을 어떻게 구현해야하는지, 어디서 병목현상이 일어나는지 등을 살펴보자.
기본적인 코드는 https://github.com/benfred/implicit 의 레포를 따라갔으며 입맛대로 조금씩 변형하였다.
코드 구성
뼈대가 되는 클래스는 RecommenderBase이다. 이 클래스는 mf을 사용하는 알고리즘과 neural net을 사용하는 알고리즘 모두에게 상속이 되는 클래스로, 꼭 들어가야하는 클래스 메써드를 abstractmethod을 통해 지정해두었다.
from abc import abstractmethod
import numpy as np
class RecommenderBase:
@abstractmethod
def fit(self, user_items, val_user_items):
"""
Trains recommendation algorithm on a sparse matrix
Parameters
----------
user_items : csr_matrix
"""
pass
@abstractmethod
def recommend(self, userid, user_items, N=10):
"""
Recommends items for users
Parameters
----------
userid : Union[int, array_like]
user_items : csr_matrix
N : int, optional
Returns
-------
indices : M1 x k numpy array
2D array of selected k item indices for each user
distances : M1 x k numpy array
2D array of selected k item similarities for each user
"""
@abstractmethod
def predict(self, user_factors, item_factors):
"""
Predicts users' ratings (or preference) based on factorized user_factors and item_factors.
For matrix factorization models, this could be dot product between user_factors and item_factors.
For deep learning models, this could be inference step fed with user/item information to neural network
user_factors : M1 x K
Genearally, M1 does not need to be M, which is total number of users in dataset. We consider general
situation where we want to recommend for selected M1 users
item_factors : N x K
Although we may not want to recommend items for all users, still we need all item embedding vectors.
That's why we use all of item vectors.
"""
@abstractmethod
def calculate_loss(self, user_items):
"""
Calculates training / validation loss for early stopping to prevent overfitting
Parameters
----------
user_items : M x N user item matrix
user_factors : M x K user latent vectors
item_factors : N x K item latent vectors
"""
- fit: 알고리즘의 학습 단계에서 사용하는 메써드이다.
- recommend: 학습을 모두 끝낸 후에, 유저별로 개인화된 추천을 진행할 때 사용하는 메써드이다.
- predict: 학습된 유저/아이템 벡터를 기반으로 아이템에 대한 유저의 선호도를 예측할 때 사용하는 메써드이다.
- calculate_loss: training / validation 데이터에 대한 loss을 계산할 때 쓰이는 메써드이다.
다음으로 MatrixFactorizationBaes 클래스이다. 이는 RecommenderBase 클래스를 상속하며 구조는 거의 동일하다.
recommend 메써드의 경우, 알고리즘에 관계없이 topk 추천을 할 예정이고 fit, predict 메써드는 알고리즘별로 다를 예정이므로 비워뒀다.
import numpy as np
from scipy.sparse import csr_matrix
from topk import topk
from recommender_base import RecommenderBase
class MatrixFactorizationBase(RecommenderBase):
def __init__(self):
self.user_factors = None
self.item_factors = None
def recommend(self, userid, user_items, N=10):
user_factors = self.user_factors[userid]
item_factors = self.item_factors
indices, distances = topk(self.predict(user_factors, item_factors), N)
return indices, distances
def fit(self, user_items, val_user_items):
pass
def predict(self, user_factors, item_factors):
pass
다음으로 이번 포스팅에서 구현한 AlternatingLeastSquares 클래스이다.
import numpy as np
import tqdm
from scipy.sparse import csr_matrix, diags, identity
from utils import check_csr, check_random_state, nonzeros
from mf.matrix_factorization_base import MatrixFactorizationBase
class AlternatingLeastSquares(MatrixFactorizationBase):
def __init__(
self,
factors=10,
regularization=0.01,
alpha=0.1,
dtype=np.float32,
iterations=100,
calculate_training_loss=False,
random_state=42
):
super().__init__()
self.factors = factors
self.regularization = regularization
self.alpha = alpha
self.dtype = dtype
self.iterations = iterations
self.calculate_training_loss = calculate_training_loss
self.random_state = random_state
- factors: latent dimenstion
- regularization: 정규화 값
- alpha: 논문에서 confidence 값을 변환하는 정도
- iterations: user/item latent vector을 계산하는 횟수
다음으로 fit 메써드를 살펴보자.
class AlternatingLeastSquares(MatrixFactorizationBase):
def fit(self, user_items, val_user_items=None):
"""
Factorizes the user_items matrix.
While selected iterations, this method updates user and item factors using closed form
derived in koren et al. who proposed calculating YtY in advance to reduce computation time
As noted in Xiangnan et al., time complexity of original als is O((M+N)K^3 + |R|K^2)
P : M x K user factors matrix
Q : N x K item factors matrix
Cui : M x N confidence matrix
Rui : M x N binarized matrix
Wu : N x N diagonal matrix whose (i,i) element is c_ui
Parameters
----------
user_items : csr_matrix
This is user x item matrix whose dimension is M x N. It is used in training step
val_user_items : csr_matrix, optional
This is user x item matrix whose dimension is M x N. It is used in validation step.
Note that in training step, we do not use this matrix.
Returns
-------
"""
# initialize the random state
random_state = check_random_state(self.random_state)
Cui = check_csr(user_items)
M,N = Cui.shape # number of users and items
# initialize parameters randomly
if self.user_factors is None:
self.user_factors = random_state.rand(M, self.factors).astype(self.dtype) * 0.01
if self.item_factors is None:
self.item_factors = random_state.rand(N, self.factors).astype(self.dtype) * 0.01
self._PtP = None
self._QtQ = None
self.tr_loss = []
self.val_loss = []
with tqdm.tqdm(total=self.iterations) as progress:
for iteration in range(self.iterations):
# alternate updating user and item factors
# update user factors
self._QtQ = self.QtQ()
for u in range(M):
self.update_user_factors(u, self.item_factors, Cui)
# update item factors
self._PtP = self.PtP()
for i in range(N):
self.update_item_factors(i, self.user_factors, Cui.T.tocsr())
# calculate training / validation loss
self.tr_loss.append(self.calculate_loss(user_items))
if val_user_items is not None:
self.val_loss.append(self.calculate_loss(val_user_items))
progress.update(1)
이터레이션을 돌면서 user/item latent vector을 업데이트한다. 핵심이 되는 함수는 update_user_factors, update_item_factors 메써드이다. 이 중, update_user_factors을 살펴보자.
def update_user_factors(self, u, Q, Cui):
"""
Update user embedding factors for user u using closed form optimization
Parameters
----------
u : integer
Index for user u
Q : np.array
N x K item factors matrix
Cui : csr_matrix
M x N confidence sparse matrix
Returns
-------
"""
N,K = Q.shape
# A = QtW_uQ + regularization * I = QtQ + Qt(W_u-1)Q + regularization * I
# b = QtW_ur_u
# accumulate Qt(W_u-1)Q using outer product form
A = self._QtQ + np.eye(K)*self.regularization
b = np.zeros(K)
for i, confidence in nonzeros(Cui, u):
item_factor_i = Q[i]
if confidence > 0:
b += confidence * item_factor_i
else:
confidence *= -1
A += np.outer(item_factor_i,item_factor_i) * (confidence - 1)
# solve linear equation
p_u = self.linear_equation(A, b)
self.user_factors[u] = p_u
위 수식을 구현한 것으로, implicit feedback cf 논문 내용을 이해하고 수식을 전개해봤다면 이해하는데 크게 어려움이 없는 코드이다. 먼저 계산 편의를 위해 $Q^TQ$을 미리 계산한다. 다음으로 유저별로 for문을 돌면서 $A,b$를 계산하고 linear_equation 함수를 통해 $Ax = b$의 해를 구한다. 구한 해가 $u$번째 유저의 latent vector이다. 아이템에 대한 latent vector도 비슷하게 계산한다.
다음으로 loss을 계산하는 과정을 보자.
def calculate_loss(self, user_items):
"""
Calculates training/validation loss in each iteration.
We calculate loss in each iteration to check if parameters are converged or not.
Depending on the user_items argument, it calculates training or validation loss.
It is strongly recommended that it should be checked whether validation loss drops
and becomes stable
Parameters
----------
user_items : csr_matrix
Training or validation user x item matrix
Returns
-------
loss : float
Calculated loss value
"""
loss = 0
M,N = user_items.shape
Q = self.item_factors
for u in range(M):
c_u = user_items[u].todense()
r_u = self.binarize(c_u)
p_u = self.user_factors[u]
r_u_hat = Q.dot(p_u)
temp = np.multiply(c_u, np.power(r_u - r_u_hat, 2))
loss += temp.sum()
loss += self.regularization * np.power(p_u, 2).sum()
for i in range(N):
q_i = self.item_factors[i]
loss += self.regularization * np.power(q_i, 2).sum()
return loss
위 수식을 구현한 것으로, double summation이 있는 부분은 for문을 유저별로 돌아서 loss 값을 모두 더했다. 마지막으로 잊지 않고 아이템 벡터에 대한 l2 norm 값인 $\sum^N_{i=1} || \mathbf{q}_i ||^2$을 더하는 것도 잊지 않고 추가했다.
Unittest
unittest은 코드에 신뢰를 더해주는 기술이다. 구현한 als의 결과와 implicit repository에서 구현된 als의 결과를 소수점 두번째 자리까지 비교하는 unittest을 아래와 같이 구현하였다.
import unittest
import numpy as np
import implicit
from scipy.sparse import csr_matrix
from numpy.testing import assert_array_equal, assert_almost_equal
from mf.als import AlternatingLeastSquares
class TestALS(unittest.TestCase):
def test_binarize(self):
als = AlternatingLeastSquares()
tc = np.array([1,0,5,10,0,0,100])
expected = np.array([1,0,1,1,0,0,1])
assert_array_equal(expected, als.binarize(tc))
def test_als_result_same_as_implicit_library(self):
params = {
'factors':10,
'iterations':100,
'regularization':0.01,
'random_state':42
}
als = AlternatingLeastSquares(**params)
imp_als = implicit.als.AlternatingLeastSquares(**params)
tc_user_items = csr_matrix(np.array(
[[2, 0, 0, 0, 3, 4],
[0, 0, 2, 1, 0, 0],
[5, 1, 0, 0, 2, 1]]
))
als.fit(tc_user_items)
imp_als.fit(tc_user_items)
assert_almost_equal(als.user_factors, imp_als.user_factors, decimal=2)
assert_almost_equal(als.item_factors, imp_als.item_factors, decimal=2)
if '__name__' == '__main__':
unittest.main()
test case에 대해 아래와 같이 통과함을 확인하였다.
Limitation
movielens 1m 데이터를 돌리는데 약 50분이 걸렸다...
from mf.als import AlternatingLeastSquares
from datasets.movielens import get_movielens
if __name__ == '__main__':
titles, ratings = get_movielens(variant="1m")
als = AlternatingLeastSquares()
als.fit(ratings)
아무래도 코드 성능을 고려하지 않고 numpy array로만 코드를 짜다보니까 시간이 오래걸리는 것 같다. implicit 레포나 buffalo 레포는 cython을 이용하던데.. 우선 코드의 수행 속도를 고려하지 말고 짜보자! 라는 마인드로 구현을 했지만 확실히 한계가 있어 보인다. 이는 차근차근 고쳐나갈 계획이다.
댓글