import io import json from datetime import date, datetime, timedelta from urllib.parse import urlencode import chardet from django.db import IntegrityError from django.http import JsonResponse from django.shortcuts import redirect from django.shortcuts import get_object_or_404, render from django.views.decorators.http import require_POST from .services import build_monthly_report, import_csv_lines from .models import Expense, ExpenseCategory, Store def csv_upload(request): context = {} if request.method == 'POST' and request.FILES.get('csv_file'): upload = request.FILES['csv_file'] raw = upload.read() detected = chardet.detect(raw) encoding = detected.get('encoding') or 'utf-8' text = raw.decode(encoding, errors='ignore') lines = io.StringIO(text) try: result = import_csv_lines(lines) except ValueError as exc: context['error_message'] = str(exc) return render(request, 'expenses/csv_upload.html', context) query = urlencode( {'imported': result.imported, 'duplicated': result.duplicated, 'encoding': encoding} ) return redirect(f"/expenses/?{query}") return render(request, 'expenses/csv_upload.html', context) def expense_list(request): include_canceled = request.GET.get('include_canceled') == '1' queryset = Expense.objects.select_related('store', 'expense_category') if not include_canceled: queryset = queryset.filter(is_canceled=False) expenses = queryset.order_by('-use_date', '-id')[:200] stores = Store.objects.filter(is_active=True).order_by('name') categories = ExpenseCategory.objects.filter(is_active=True).order_by('name') return render( request, 'expenses/expense_list.html', { 'expenses': expenses, 'stores': stores, 'categories': categories, 'imported': request.GET.get('imported'), 'duplicated': request.GET.get('duplicated'), 'encoding': request.GET.get('encoding'), 'include_canceled': include_canceled, }, ) def store_master(request): error_message = None if request.method == 'POST': name = request.POST.get('name', '').strip() store_id = request.POST.get('store_id') action = request.POST.get('action') if action == 'deactivate' and store_id: store = get_object_or_404(Store, pk=store_id) store.is_active = False store.save(update_fields=['is_active']) else: is_active = request.POST.get('is_active') == '1' if not name: error_message = '店舗名を入力してください。' else: try: if store_id: store = get_object_or_404(Store, pk=store_id) store.name = name store.is_active = is_active store.save(update_fields=['name', 'is_active']) else: Store.objects.create(name=name, is_active=is_active) except IntegrityError: error_message = '同名の店舗が既に存在します。' stores = Store.objects.order_by('name', 'id') return render( request, 'expenses/store_master.html', { 'stores': stores, 'error_message': error_message, }, ) def expense_category_master(request): error_message = None if request.method == 'POST': name = request.POST.get('name', '').strip() category_id = request.POST.get('category_id') action = request.POST.get('action') if action == 'deactivate' and category_id: category = get_object_or_404(ExpenseCategory, pk=category_id) category.is_active = False category.save(update_fields=['is_active']) else: is_active = request.POST.get('is_active') == '1' if not name: error_message = '経費区分名を入力してください。' else: try: if category_id: category = get_object_or_404(ExpenseCategory, pk=category_id) category.name = name category.is_active = is_active category.save(update_fields=['name', 'is_active']) else: ExpenseCategory.objects.create(name=name, is_active=is_active) except IntegrityError: error_message = '同名の経費区分が既に存在します。' categories = ExpenseCategory.objects.order_by('name', 'id') return render( request, 'expenses/expense_category_master.html', { 'categories': categories, 'error_message': error_message, }, ) @require_POST def expense_update(request, expense_id: int): try: payload = json.loads(request.body.decode('utf-8')) except json.JSONDecodeError: return JsonResponse({'status': 'error', 'message': 'JSON形式が不正です。'}, status=400) if not isinstance(payload, dict): return JsonResponse({'status': 'error', 'message': 'JSONはオブジェクトで送信してください。'}, status=400) expense = get_object_or_404(Expense, pk=expense_id) fields = {} for key in ('store_id', 'expense_category_id', 'owner_type', 'note'): if key in payload: value = payload[key] if value in ('', None): if key == 'owner_type': fields[key] = 'pending' else: fields[key] = None else: fields[key] = value if 'store_id' in fields and fields['store_id'] is not None: if not isinstance(fields['store_id'], int): return JsonResponse({'status': 'error', 'message': 'store_idの型が不正です。'}, status=400) if not Store.objects.filter(id=fields['store_id'], is_active=True).exists(): return JsonResponse({'status': 'error', 'message': 'store_idが不正です。'}, status=400) if 'expense_category_id' in fields and fields['expense_category_id'] is not None: if not isinstance(fields['expense_category_id'], int): return JsonResponse({'status': 'error', 'message': 'expense_category_idの型が不正です。'}, status=400) if not ExpenseCategory.objects.filter( id=fields['expense_category_id'], is_active=True ).exists(): return JsonResponse( {'status': 'error', 'message': 'expense_category_idが不正です。'}, status=400 ) if 'owner_type' in fields and fields['owner_type'] is not None: if not isinstance(fields['owner_type'], str): return JsonResponse({'status': 'error', 'message': 'owner_typeの型が不正です。'}, status=400) if fields['owner_type'] not in {'company', 'personal', 'pending'}: return JsonResponse({'status': 'error', 'message': 'owner_typeが不正です。'}, status=400) if 'note' in fields and fields['note'] is not None: if not isinstance(fields['note'], str): return JsonResponse({'status': 'error', 'message': 'noteの型が不正です。'}, status=400) if len(fields['note']) > 2000: return JsonResponse({'status': 'error', 'message': 'noteが長すぎます。'}, status=400) if fields: for key, value in fields.items(): setattr(expense, key, value) expense.human_confirmed = True expense.save() return JsonResponse({'status': 'ok'}) def monthly_report(request): error_message = None start_date_param = request.GET.get('start_date', '') end_date_param = request.GET.get('end_date', '') all_time = request.GET.get('all_time') == '1' store_param = request.GET.get('store_id', '') category_param = request.GET.get('expense_category_id', '') description_query = request.GET.get('description', '').strip() show_report = request.GET.get('show') == '1' store_id = None store_is_null = False expense_category_id = None expense_category_is_null = False if store_param == 'unassigned': store_is_null = True elif store_param: try: store_id = int(store_param) except ValueError: error_message = '店舗区分の指定が不正です。' if category_param == 'unassigned': expense_category_is_null = True elif category_param: try: expense_category_id = int(category_param) except ValueError: error_message = '経費区分の指定が不正です。' if start_date_param or end_date_param: all_time = False if all_time: start_date = None end_date = None end_date_display = '' else: start_date = None end_date = None end_date_display = end_date_param if start_date_param: try: start_date = datetime.strptime(start_date_param, '%Y-%m-%d').date() except ValueError: error_message = '開始日の形式が不正です。' if end_date_param: try: end_date = datetime.strptime(end_date_param, '%Y-%m-%d').date() except ValueError: error_message = '終了日の形式が不正です。' if start_date is None and end_date is None and error_message is None: start_date = date.today().replace(day=1) if start_date.month == 12: next_month = start_date.replace(year=start_date.year + 1, month=1) else: next_month = start_date.replace(month=start_date.month + 1) end_date = next_month - timedelta(days=1) start_date_param = start_date.strftime('%Y-%m-%d') end_date_display = end_date.strftime('%Y-%m-%d') if end_date is not None: end_date = end_date + timedelta(days=1) stores = Store.objects.filter(is_active=True).order_by('name') categories = ExpenseCategory.objects.filter(is_active=True).order_by('name') report = None if show_report and error_message is None: report = build_monthly_report( start_date, end_date, store_id=store_id, store_is_null=store_is_null, expense_category_id=expense_category_id, expense_category_is_null=expense_category_is_null, description_query=description_query or None, ) return render( request, 'expenses/monthly_report.html', { 'report': report, 'start_date': start_date_param or (start_date.strftime('%Y-%m-%d') if start_date else ''), 'end_date': end_date_display, 'all_time': all_time, 'stores': stores, 'selected_store': store_param, 'categories': categories, 'selected_category': category_param, 'description_query': description_query, 'show_report': show_report, 'error_message': error_message, }, ) def monthly_report_pdf(request): start_date_param = request.GET.get('start_date', '') end_date_param = request.GET.get('end_date', '') all_time = request.GET.get('all_time') == '1' store_param = request.GET.get('store_id', '') category_param = request.GET.get('expense_category_id', '') description_query = request.GET.get('description', '').strip() store_id = None store_is_null = False expense_category_id = None expense_category_is_null = False if store_param == 'unassigned': store_is_null = True elif store_param: try: store_id = int(store_param) except ValueError: store_id = None if category_param == 'unassigned': expense_category_is_null = True elif category_param: try: expense_category_id = int(category_param) except ValueError: expense_category_id = None if start_date_param or end_date_param: all_time = False if all_time: start_date = None end_date = None target_period = '全期間' else: start_date = None end_date = None if start_date_param: try: start_date = datetime.strptime(start_date_param, '%Y-%m-%d').date() except ValueError: start_date = None if end_date_param: try: end_date = datetime.strptime(end_date_param, '%Y-%m-%d').date() except ValueError: end_date = None if start_date is None and end_date is None: start_date = date.today().replace(day=1) if start_date.month == 12: next_month = start_date.replace(year=start_date.year + 1, month=1) else: next_month = start_date.replace(month=start_date.month + 1) end_date = next_month - timedelta(days=1) start_label = start_date.strftime('%Y-%m-%d') if start_date else '未指定' end_label = end_date.strftime('%Y-%m-%d') if end_date else '未指定' target_period = f'{start_label} 〜 {end_label}' if end_date is not None: end_date = end_date + timedelta(days=1) report = build_monthly_report( start_date, end_date, store_id=store_id, store_is_null=store_is_null, expense_category_id=expense_category_id, expense_category_is_null=expense_category_is_null, description_query=description_query or None, ) return render( request, 'expenses/monthly_report_pdf.html', { 'report': report, 'target_period': target_period, }, ) @require_POST def expense_cancel(request, expense_id: int): try: payload = json.loads(request.body.decode('utf-8')) except json.JSONDecodeError: return JsonResponse({'status': 'error', 'message': 'JSON形式が不正です。'}, status=400) if not isinstance(payload, dict): return JsonResponse({'status': 'error', 'message': 'JSONはオブジェクトで送信してください。'}, status=400) if 'is_canceled' not in payload: return JsonResponse({'status': 'error', 'message': 'is_canceledが必要です。'}, status=400) if not isinstance(payload['is_canceled'], bool): return JsonResponse({'status': 'error', 'message': 'is_canceledの型が不正です。'}, status=400) expense = get_object_or_404(Expense, pk=expense_id) expense.is_canceled = payload['is_canceled'] expense.human_confirmed = True expense.save(update_fields=['is_canceled', 'human_confirmed', 'updated_at']) return JsonResponse({'status': 'ok'})