from __future__ import annotations from collections import defaultdict from dataclasses import dataclass from datetime import date from typing import Iterable from django.db.models import Count, Sum from .csv_parsers import get_parser from .models import Expense class CSVImportResult: def __init__(self, imported: int, duplicated: int) -> None: self.imported = imported self.duplicated = duplicated def import_csv_lines(lines: Iterable[str]) -> CSVImportResult: buffered_lines = list(lines) parser = get_parser(buffered_lines) rows = parser.parse(buffered_lines) imported = 0 duplicated = 0 for row in rows: expense, created = Expense.objects.get_or_create( source=row.source, source_hash=row.source_hash, defaults={ 'use_date': row.use_date, 'description': row.description, 'amount': row.amount, 'note': row.note, }, ) if created: imported += 1 else: duplicated += 1 return CSVImportResult(imported=imported, duplicated=duplicated) @dataclass class MonthlyReportResult: target_month: date total_amount: int store_totals: list[dict] category_totals: list[dict] owner_totals: list[dict] unclassified_counts: dict def build_monthly_report(start_date: date, end_date: date) -> MonthlyReportResult: queryset = Expense.objects.filter( is_canceled=False, use_date__gte=start_date, use_date__lt=end_date, ) detail_queryset = ( queryset.select_related('store', 'expense_category') .order_by('use_date', 'id') ) store_details = defaultdict(list) category_details = defaultdict(list) owner_details = defaultdict(list) for expense in detail_queryset: detail = { 'use_date': expense.use_date, 'description': expense.description, 'amount': expense.amount, 'note': expense.note, } store_details[expense.store_id].append(detail) category_details[expense.expense_category_id].append(detail) owner_details[expense.owner_type].append(detail) total_amount = queryset.aggregate(total=Sum('amount'))['total'] or 0 store_totals = list( queryset.values('store_id', 'store__name') .annotate(total=Sum('amount'), count=Count('id')) .order_by('store__name', 'store_id') ) for row in store_totals: row['details'] = store_details.get(row['store_id'], []) category_totals = list( queryset.values('expense_category_id', 'expense_category__name') .annotate(total=Sum('amount'), count=Count('id')) .order_by('expense_category__name', 'expense_category_id') ) for row in category_totals: row['details'] = category_details.get(row['expense_category_id'], []) owner_totals = list( queryset.values('owner_type') .annotate(total=Sum('amount'), count=Count('id')) .order_by('owner_type') ) for row in owner_totals: row['details'] = owner_details.get(row['owner_type'], []) unclassified_counts = { 'store_missing': queryset.filter(store__isnull=True).count(), 'category_missing': queryset.filter(expense_category__isnull=True).count(), 'owner_pending': queryset.filter(owner_type='pending').count(), 'total': queryset.count(), } return MonthlyReportResult( target_month=start_date, total_amount=total_amount, store_totals=store_totals, category_totals=category_totals, owner_totals=owner_totals, unclassified_counts=unclassified_counts, )