Не выполняется корректно таска Celery (Django)
У меня есть небольшая логика вызова тасков, есть модель достижений для юзеров (условно сделай то и получишь опыт и сможешь поднять уровень профиля). Есть сигнал:
@receiver(post_save, sender=AchievementsProgressStatus)
def check_status(sender, instance, created, **kwargs):
"""From here we check the achievement status and set the actual user level"""
if not created:
check_achievements_status.delay(instance.id)
этот сигнал вызывает таску, которая обновляет прогресс и может быть флаг выполнено/не выполнено
@shared_task
def check_achievements_status(obj_id):
"""Tracking progress to set current is_achieved value"""
from .models import AchievementsProgressStatus
obj = (
AchievementsProgressStatus.objects.select_related("user")
.filter(id=obj_id)
.first()
)
last_lvl = UserLevel.objects.last()
if obj:
with transaction.atomic():
is_achieved = obj.progress_rn >= obj.achievement.final_value
new_progress = min(obj.progress_rn, obj.achievement.final_value)
AchievementsProgressStatus.objects.filter(id=obj_id).update(
progress_rn=new_progress
)
# we are changing boolean field only when it has changed
if obj.is_achieved != is_achieved:
AchievementsProgressStatus.objects.filter(id=obj_id).update(
is_achieved=is_achieved
)
if obj.user.user_profile.user_level != last_lvl:
level_calculating.delay(obj.user.id)
Получаем наш обьект модели и джоиним юзера, после чего использую транзакцию и получаю 2 значения: is_achieved, new_progress. Если is_achieved изменил свое значение то меняем поле в модели и запускаем другую таску если юзер не на последнем уровне.
Таска имеет следующий вид
@shared_task
def level_calculating(user_id: int):
from ..achievements.models import AchievementsProgressStatus
from .models import UserLevel, UserProfile
user = get_user_model().objects.select_related("user_profile").filter(id=user_id).first()
if user:
total_exp = (
AchievementsProgressStatus.objects.select_related("achievement")
.filter(user=user, is_achieved=True)
.aggregate(total_exp=Sum("achievement__given_exp"))["total_exp"]
or 0
)
current_level = UserLevel.objects.filter(
low_range__lte=total_exp, top_range__gte=total_exp
).first()
if current_level:
UserProfile.objects.filter(user=user).update(user_level=current_level)
Получаем юзера по ид и агрегируем значения всех достижений которые выполнены, после чего это число сумы опыта всех выполненных достижений берем и просто проверяем какой у юзера уровень, если такой есть - ставим его юзеру.
На данном этапе все работает, пусть код и не самый лучший. И вот я решил написать сервис чтобы в таску не сувать логику.
class UserLevelCalculating:
def __init__(self, user_id):
self.user = get_user_model().objects.select_related("user_profile").filter(id=user_id).first()
def get_total_exp(self):
total_exp = (
AchievementsProgressStatus.objects.select_related("achievement")
.filter(user=self.user, is_achieved=True)
.aggregate(total_exp=Sum("achievement__given_exp"))["total_exp"]
or 0
)
return total_exp
def get_level(self, total_exp):
user_level = UserLevel.objects.filter(low_range__lte=total_exp, top_range__gte=total_exp).first()
return user_level
def level_updater(self):
total_exp = self.get_total_exp()
user_level = self.get_level(total_exp)
if user_level:
UserProfile.objects.filter(user=self.user).update(user_level=user_level)
Класс делает все идентично таске + так-же изменил таску чтобы она работала с классом, теперь она выглядит так
@shared_task
def level_calculating(user_id: int):
from ..users.service.level_calculating_service import UserLevelCalculating
obj = UserLevelCalculating(user_id)
obj.level_updater()
Вроде все должно работать но увы, тест не проходит?
def test_success_update_user_level(self):
"""
Verifies that the achievement status is correctly updated when progress changes,
and the Celery task runs successfully. Also checks that the user level is updated.
"""
achievement_progress_status = AchievementsProgressStatus.objects.get(
user=self.user, achievement__achievement_name=self.user_achievement.achievement_name
)
achievement_progress_status.progress_rn = 1
achievement_progress_status.save()
achievement_progress_status.refresh_from_db()
result = check_achievements_status.delay(achievement_progress_status.id)
self.assertTrue(result.successful())
self.assertTrue(achievement_progress_status.is_achieved)
self.user.user_profile.refresh_from_db()
self.assertEqual(self.user.user_profile.user_level, self.second_lvl)
Сам тест очень простой, просто берем достижение, ставим прогресс на 1, обновляем значения в бд и проверяем что у юзера 2 уровень. И вот тут сама проблема, без разделения логики на сервис все работает отлично и тест проходит, а вот с ним не хочет. Над этим я сидел не мало и всеравно не вижу ошибку, буду рад любой подсказке!
Тест не проходит потому что не обновляется уровень:
...F......
======================================================================
FAIL: test_success_update_user_level (map_apps.achievements.tests.ModelAchievementsStatusTest)
Verifies that the achievement status is correctly updated when progress changes,
----------------------------------------------------------------------
Traceback (most recent call last):
File "/service/map_apps/achievements/tests.py", line 98, in test_success_update_user_level
self.assertEqual(self.user.user_profile.user_level, self.second_lvl)
AssertionError: <UserLevel: Новачок> != <UserLevel: Продвинутий>
вдруг нужно будет пример моделей:
class Achievements(models.Model):
"""Models to save the achievements"""
achievement_name = models.CharField(max_length=128)
descr_achievement = models.CharField(max_length=256, blank=True)
given_exp = models.PositiveSmallIntegerField(
default=0, validators=[MinValueValidator(0)]
)
final_value = models.PositiveSmallIntegerField(
default=0, validators=[MinValueValidator(1)]
)
for_organization = models.BooleanField(default=False)
for_def_user = models.BooleanField(default=True)
achievement_image = models.ImageField(upload_to="achiev_img/", blank=True)
def __str__(self):
return f"{self.achievement_name}"
class AchievementsProgressStatus(models.Model):
"""Models to track the user achievements progres"""
user = models.ForeignKey(
to=User, on_delete=models.CASCADE, related_name="achievementsprogressstatus"
)
achievement = models.ForeignKey(
to=Achievements,
on_delete=models.CASCADE,
related_name="achievementsprogressstatus",
)
progress_rn = models.PositiveSmallIntegerField(
default=0, validators=[MinValueValidator(0)]
)
is_achieved = models.BooleanField(default=False, db_index=True)
def __str__(self):
return (
f"{self.user.get_full_name()} | {self.achievement.achievement_name} | Виконано = {self.is_achieved}, "
f"{self.progress_rn}/{self.achievement.final_value}"
)