Files

367 lines
15 KiB
Python

import io
import json
from datetime import date, datetime, timedelta
from urllib.parse import urlencode

import chardet
from django.db import IntegrityError, transaction
from django.http import Http404, JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.views.decorators.http import require_POST

from .models import Expense, ExpenseCategory, Store
from .services import build_monthly_report, import_csv_lines
def csv_upload(request):
    """Show the CSV upload form and import an uploaded expense file.

    A GET request, or a POST without a ``csv_file`` part, renders the upload
    form. A POST carrying ``csv_file`` is decoded using the encoding guessed
    by ``chardet`` and handed to ``import_csv_lines``.

    Args:
        request: The incoming Django request.

    Returns:
        The rendered ``expenses/csv_upload.html`` page (with ``error_message``
        set when the import raised ``ValueError``), or a redirect to
        ``/expenses/`` whose query string carries the ``imported`` and
        ``duplicated`` counters and the encoding that was used.
    """
    template = 'expenses/csv_upload.html'
    context = {}

    upload = request.FILES.get('csv_file') if request.method == 'POST' else None
    if not upload:
        return render(request, template, context)

    raw = upload.read()
    encoding = chardet.detect(raw).get('encoding') or 'utf-8'
    try:
        text = raw.decode(encoding, errors='ignore')
    except LookupError:
        # chardet can name encodings Python ships no codec for (e.g. EUC-TW);
        # fall back to UTF-8 instead of failing the request, and report the
        # encoding that was really used.
        encoding = 'utf-8'
        text = raw.decode(encoding, errors='ignore')

    try:
        result = import_csv_lines(io.StringIO(text))
    except ValueError as exc:
        context['error_message'] = str(exc)
        return render(request, template, context)

    query = urlencode(
        {
            'imported': result.imported,
            'duplicated': result.duplicated,
            'encoding': encoding,
        }
    )
    return redirect(f"/expenses/?{query}")
def expense_list(request):
    """Render the expense list page.

    Shows at most 200 expenses ordered by ``use_date`` then ``id``, both
    descending. Canceled expenses are left out unless the query string has
    ``include_canceled=1``. The ``imported``, ``duplicated`` and ``encoding``
    query parameters are passed through to the template unchanged.

    Args:
        request: The incoming Django request.

    Returns:
        The rendered ``expenses/expense_list.html`` response.
    """
    params = request.GET
    show_canceled = params.get('include_canceled') == '1'

    expense_qs = Expense.objects.select_related('store', 'expense_category')
    if not show_canceled:
        expense_qs = expense_qs.filter(is_canceled=False)

    context = {
        'expenses': expense_qs.order_by('-use_date', '-id')[:200],
        # Only active master rows are offered as choices.
        'stores': Store.objects.filter(is_active=True).order_by('name'),
        'categories': ExpenseCategory.objects.filter(is_active=True).order_by('name'),
        'imported': params.get('imported'),
        'duplicated': params.get('duplicated'),
        'encoding': params.get('encoding'),
        'include_canceled': show_canceled,
    }
    return render(request, 'expenses/expense_list.html', context)
def _store_pk_or_404(raw_id):
    """Convert a posted store id to ``int``; raise ``Http404`` if it is not numeric."""
    try:
        return int(raw_id)
    except (TypeError, ValueError):
        # A non-numeric id cannot match any row; answer 404 rather than
        # letting the ORM raise ValueError (which would surface as a 500).
        raise Http404('Store not found.') from None


def store_master(request):
    """Render the store master page and apply create/update/deactivate posts.

    On POST the form fields ``name``, ``store_id``, ``action`` and
    ``is_active`` are read:

    * ``action == 'deactivate'`` with a ``store_id`` marks that store inactive.
    * Otherwise, with a ``store_id``, the store's name and active flag are
      updated.
    * Otherwise a new store is created.

    A blank name or an ``IntegrityError`` during the write is reported through
    ``error_message`` on the re-rendered page.

    Args:
        request: The incoming Django request.

    Returns:
        The rendered ``expenses/store_master.html`` response listing all
        stores ordered by name and id.

    Raises:
        Http404: If ``store_id`` is not numeric or matches no store.
    """
    error_message = None
    if request.method == 'POST':
        name = request.POST.get('name', '').strip()
        store_id = request.POST.get('store_id')
        action = request.POST.get('action')

        if action == 'deactivate' and store_id:
            store = get_object_or_404(Store, pk=_store_pk_or_404(store_id))
            store.is_active = False
            store.save(update_fields=['is_active'])
        else:
            is_active = request.POST.get('is_active') == '1'
            if not name:
                error_message = '店舗名を入力してください。'
            else:
                store = None
                if store_id:
                    store = get_object_or_404(Store, pk=_store_pk_or_404(store_id))
                try:
                    # Run the write in its own atomic block so a failed
                    # statement is rolled back on its own. Without it, when the
                    # view already runs inside a transaction (e.g.
                    # ATOMIC_REQUESTS), the caught IntegrityError would leave
                    # that transaction unusable for the listing query below.
                    with transaction.atomic():
                        if store is None:
                            Store.objects.create(name=name, is_active=is_active)
                        else:
                            store.name = name
                            store.is_active = is_active
                            store.save(update_fields=['name', 'is_active'])
                except IntegrityError:
                    # Presumably the unique constraint on the store name.
                    error_message = '同名の店舗が既に存在します。'

    stores = Store.objects.order_by('name', 'id')
    return render(
        request,
        'expenses/store_master.html',
        {
            'stores': stores,
            'error_message': error_message,
        },
    )
def _expense_category_pk_or_404(raw_id):
    """Convert a posted category id to ``int``; raise ``Http404`` if it is not numeric."""
    try:
        return int(raw_id)
    except (TypeError, ValueError):
        # A non-numeric id cannot match any row; answer 404 rather than
        # letting the ORM raise ValueError (which would surface as a 500).
        raise Http404('Expense category not found.') from None


def expense_category_master(request):
    """Render the expense category master page and apply posted changes.

    On POST the form fields ``name``, ``category_id``, ``action`` and
    ``is_active`` are read:

    * ``action == 'deactivate'`` with a ``category_id`` marks that category
      inactive.
    * Otherwise, with a ``category_id``, the category's name and active flag
      are updated.
    * Otherwise a new category is created.

    A blank name or an ``IntegrityError`` during the write is reported through
    ``error_message`` on the re-rendered page.

    Args:
        request: The incoming Django request.

    Returns:
        The rendered ``expenses/expense_category_master.html`` response
        listing all categories ordered by name and id.

    Raises:
        Http404: If ``category_id`` is not numeric or matches no category.
    """
    error_message = None
    if request.method == 'POST':
        name = request.POST.get('name', '').strip()
        category_id = request.POST.get('category_id')
        action = request.POST.get('action')

        if action == 'deactivate' and category_id:
            category = get_object_or_404(
                ExpenseCategory, pk=_expense_category_pk_or_404(category_id)
            )
            category.is_active = False
            category.save(update_fields=['is_active'])
        else:
            is_active = request.POST.get('is_active') == '1'
            if not name:
                error_message = '経費区分名を入力してください。'
            else:
                category = None
                if category_id:
                    category = get_object_or_404(
                        ExpenseCategory, pk=_expense_category_pk_or_404(category_id)
                    )
                try:
                    # Run the write in its own atomic block so a failed
                    # statement is rolled back on its own. Without it, when the
                    # view already runs inside a transaction (e.g.
                    # ATOMIC_REQUESTS), the caught IntegrityError would leave
                    # that transaction unusable for the listing query below.
                    with transaction.atomic():
                        if category is None:
                            ExpenseCategory.objects.create(name=name, is_active=is_active)
                        else:
                            category.name = name
                            category.is_active = is_active
                            category.save(update_fields=['name', 'is_active'])
                except IntegrityError:
                    # Presumably the unique constraint on the category name.
                    error_message = '同名の経費区分が既に存在します。'

    categories = ExpenseCategory.objects.order_by('name', 'id')
    return render(
        request,
        'expenses/expense_category_master.html',
        {
            'categories': categories,
            'error_message': error_message,
        },
    )
@require_POST
def expense_update(request, expense_id: int):
    """Update editable fields of one expense from a JSON request body.

    The body must be a JSON object. Only the keys ``store_id``,
    ``expense_category_id``, ``owner_type`` and ``note`` are considered; keys
    that are absent leave the corresponding attribute untouched. An empty
    string or ``null`` clears the value (``owner_type`` falls back to
    ``'pending'`` instead of ``None``).

    Validation rules:

    * ``store_id`` / ``expense_category_id``: an integer (booleans are
      rejected) that refers to an active row.
    * ``owner_type``: one of ``'company'``, ``'personal'``, ``'pending'``.
    * ``note``: a string of at most 2000 characters.

    When at least one field is accepted, the expense is also flagged
    ``human_confirmed`` and saved.

    Args:
        request: The incoming Django request (POST only).
        expense_id: Primary key of the expense to update.

    Returns:
        ``JsonResponse`` ``{'status': 'ok'}`` on success, or a 400 response
        ``{'status': 'error', 'message': ...}`` when the body or a value is
        invalid.

    Raises:
        Http404: If no expense has the given primary key.
    """

    def bad_request(message):
        return JsonResponse({'status': 'error', 'message': message}, status=400)

    try:
        payload = json.loads(request.body.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # A body that is not valid UTF-8 is as malformed as one that is not
        # valid JSON; answer 400 rather than letting the decode error escape.
        return bad_request('JSON形式が不正です。')
    if not isinstance(payload, dict):
        return bad_request('JSONはオブジェクトで送信してください。')

    expense = get_object_or_404(Expense, pk=expense_id)

    fields = {}
    for key in ('store_id', 'expense_category_id', 'owner_type', 'note'):
        if key not in payload:
            continue
        value = payload[key]
        if value in ('', None):
            # Clearing a value: owner_type has a sentinel, the rest use NULL.
            value = 'pending' if key == 'owner_type' else None
        fields[key] = value

    store_id = fields.get('store_id')
    if store_id is not None:
        # bool is a subclass of int, so JSON true/false must be excluded
        # explicitly or `true` would be accepted as id 1.
        if isinstance(store_id, bool) or not isinstance(store_id, int):
            return bad_request('store_idの型が不正です。')
        if not Store.objects.filter(id=store_id, is_active=True).exists():
            return bad_request('store_idが不正です。')

    category_id = fields.get('expense_category_id')
    if category_id is not None:
        if isinstance(category_id, bool) or not isinstance(category_id, int):
            return bad_request('expense_category_idの型が不正です。')
        if not ExpenseCategory.objects.filter(id=category_id, is_active=True).exists():
            return bad_request('expense_category_idが不正です。')

    owner_type = fields.get('owner_type')
    if owner_type is not None:
        if not isinstance(owner_type, str):
            return bad_request('owner_typeの型が不正です。')
        if owner_type not in {'company', 'personal', 'pending'}:
            return bad_request('owner_typeが不正です。')

    note = fields.get('note')
    if note is not None:
        if not isinstance(note, str):
            return bad_request('noteの型が不正です。')
        if len(note) > 2000:
            return bad_request('noteが長すぎます。')

    if fields:
        for key, value in fields.items():
            setattr(expense, key, value)
        expense.human_confirmed = True
        expense.save()
    return JsonResponse({'status': 'ok'})
def monthly_report(request):
    """Render the report page with its filter form and, on demand, the report.

    Query parameters:

    * ``start_date`` / ``end_date``: ``YYYY-MM-DD``; giving either one
      overrides ``all_time``.
    * ``all_time``: ``'1'`` requests no date bounds.
    * ``store_id`` / ``expense_category_id``: an integer id, or
      ``'unassigned'`` to select rows without a store / category.
    * ``description``: free-text filter, stripped of surrounding whitespace.
    * ``show``: ``'1'`` to actually build the report.

    When no dates are given (and nothing else failed validation) the range
    defaults to the current calendar month. The end date is passed to
    ``build_monthly_report`` advanced by one day. Any parameter that fails to
    parse sets ``error_message`` (the last failure wins) and suppresses the
    report.

    Args:
        request: The incoming Django request.

    Returns:
        The rendered ``expenses/monthly_report.html`` response.
    """
    params = request.GET
    error_message = None
    start_date_param = params.get('start_date', '')
    end_date_param = params.get('end_date', '')
    all_time = params.get('all_time') == '1'
    store_param = params.get('store_id', '')
    category_param = params.get('expense_category_id', '')
    description_query = params.get('description', '').strip()
    show_report = params.get('show') == '1'

    store_id = None
    store_is_null = store_param == 'unassigned'
    if store_param and not store_is_null:
        try:
            store_id = int(store_param)
        except ValueError:
            error_message = '店舗区分の指定が不正です。'

    expense_category_id = None
    expense_category_is_null = category_param == 'unassigned'
    if category_param and not expense_category_is_null:
        try:
            expense_category_id = int(category_param)
        except ValueError:
            error_message = '経費区分の指定が不正です。'

    # An explicit date always wins over the all-time flag.
    if start_date_param or end_date_param:
        all_time = False

    start_date = None
    end_date = None
    end_date_display = '' if all_time else end_date_param
    if not all_time:
        if start_date_param:
            try:
                start_date = datetime.strptime(start_date_param, '%Y-%m-%d').date()
            except ValueError:
                error_message = '開始日の形式が不正です。'
        if end_date_param:
            try:
                end_date = datetime.strptime(end_date_param, '%Y-%m-%d').date()
            except ValueError:
                error_message = '終了日の形式が不正です。'
        if start_date is None and end_date is None and error_message is None:
            # Default to the current month: first day .. last day.
            start_date = date.today().replace(day=1)
            if start_date.month == 12:
                next_month = start_date.replace(year=start_date.year + 1, month=1)
            else:
                next_month = start_date.replace(month=start_date.month + 1)
            end_date = next_month - timedelta(days=1)
            start_date_param = start_date.strftime('%Y-%m-%d')
            end_date_display = end_date.strftime('%Y-%m-%d')

    if end_date is not None:
        if end_date == date.max:
            # date.max + 1 day raises OverflowError. No date lies after
            # date.max, so dropping the upper bound selects the same rows
            # (a None end date is already passed when only start_date is given).
            end_date = None
        else:
            # The inclusive end date is advanced by one day before being
            # handed to the report builder.
            end_date = end_date + timedelta(days=1)

    stores = Store.objects.filter(is_active=True).order_by('name')
    categories = ExpenseCategory.objects.filter(is_active=True).order_by('name')

    report = None
    if show_report and error_message is None:
        report = build_monthly_report(
            start_date,
            end_date,
            store_id=store_id,
            store_is_null=store_is_null,
            expense_category_id=expense_category_id,
            expense_category_is_null=expense_category_is_null,
            description_query=description_query or None,
        )

    return render(
        request,
        'expenses/monthly_report.html',
        {
            'report': report,
            'start_date': start_date_param or (start_date.strftime('%Y-%m-%d') if start_date else ''),
            'end_date': end_date_display,
            'all_time': all_time,
            'stores': stores,
            'selected_store': store_param,
            'categories': categories,
            'selected_category': category_param,
            'description_query': description_query,
            'show_report': show_report,
            'error_message': error_message,
        },
    )
def monthly_report_pdf(request):
    """Render the ``monthly_report_pdf.html`` layout of the expense report.

    Reads the query parameters ``start_date``, ``end_date``, ``all_time``,
    ``store_id``, ``expense_category_id`` and ``description``. Values that
    fail to parse are ignored rather than reported: a bad id leaves that
    filter off, a bad date is treated as not given. With no usable date and
    no ``all_time`` flag the range defaults to the current calendar month.

    Args:
        request: The incoming Django request.

    Returns:
        The rendered ``expenses/monthly_report_pdf.html`` response with the
        report and a ``target_period`` label (``'全期間'`` for all time,
        otherwise ``start〜end`` with ``'未指定'`` for a missing bound).
    """
    params = request.GET
    start_date_param = params.get('start_date', '')
    end_date_param = params.get('end_date', '')
    all_time = params.get('all_time') == '1'
    store_param = params.get('store_id', '')
    category_param = params.get('expense_category_id', '')
    description_query = params.get('description', '').strip()

    store_id = None
    store_is_null = store_param == 'unassigned'
    if store_param and not store_is_null:
        try:
            store_id = int(store_param)
        except ValueError:
            # An unparseable id is ignored; the report is not filtered by store.
            store_id = None

    expense_category_id = None
    expense_category_is_null = category_param == 'unassigned'
    if category_param and not expense_category_is_null:
        try:
            expense_category_id = int(category_param)
        except ValueError:
            # An unparseable id is ignored; the report is not filtered by category.
            expense_category_id = None

    # An explicit date always wins over the all-time flag.
    if start_date_param or end_date_param:
        all_time = False

    start_date = None
    end_date = None
    if all_time:
        target_period = '全期間'
    else:
        if start_date_param:
            try:
                start_date = datetime.strptime(start_date_param, '%Y-%m-%d').date()
            except ValueError:
                start_date = None
        if end_date_param:
            try:
                end_date = datetime.strptime(end_date_param, '%Y-%m-%d').date()
            except ValueError:
                end_date = None
        if start_date is None and end_date is None:
            # Default to the current month: first day .. last day.
            start_date = date.today().replace(day=1)
            if start_date.month == 12:
                next_month = start_date.replace(year=start_date.year + 1, month=1)
            else:
                next_month = start_date.replace(month=start_date.month + 1)
            end_date = next_month - timedelta(days=1)
        start_label = start_date.strftime('%Y-%m-%d') if start_date else '未指定'
        end_label = end_date.strftime('%Y-%m-%d') if end_date else '未指定'
        # Separate the two dates; without a separator the label read as one
        # run of digits (e.g. "2024-01-012024-01-31").
        target_period = f'{start_label}〜{end_label}'

    if end_date is not None:
        if end_date == date.max:
            # date.max + 1 day raises OverflowError. No date lies after
            # date.max, so dropping the upper bound selects the same rows
            # (a None end date is already passed when only start_date is given).
            end_date = None
        else:
            # The inclusive end date is advanced by one day before being
            # handed to the report builder.
            end_date = end_date + timedelta(days=1)

    report = build_monthly_report(
        start_date,
        end_date,
        store_id=store_id,
        store_is_null=store_is_null,
        expense_category_id=expense_category_id,
        expense_category_is_null=expense_category_is_null,
        description_query=description_query or None,
    )
    return render(
        request,
        'expenses/monthly_report_pdf.html',
        {
            'report': report,
            'target_period': target_period,
        },
    )
@require_POST
def expense_cancel(request, expense_id: int):
    """Set or clear the canceled flag of one expense from a JSON request body.

    The body must be a JSON object with a boolean ``is_canceled`` key. The
    expense is also flagged ``human_confirmed``; only ``is_canceled``,
    ``human_confirmed`` and ``updated_at`` are written.

    Args:
        request: The incoming Django request (POST only).
        expense_id: Primary key of the expense to change.

    Returns:
        ``JsonResponse`` ``{'status': 'ok'}`` on success, or a 400 response
        ``{'status': 'error', 'message': ...}`` when the body is invalid.

    Raises:
        Http404: If no expense has the given primary key.
    """

    def bad_request(message):
        return JsonResponse({'status': 'error', 'message': message}, status=400)

    try:
        payload = json.loads(request.body.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # A body that is not valid UTF-8 is as malformed as one that is not
        # valid JSON; answer 400 rather than letting the decode error escape.
        return bad_request('JSON形式が不正です。')
    if not isinstance(payload, dict):
        return bad_request('JSONはオブジェクトで送信してください。')
    if 'is_canceled' not in payload:
        return bad_request('is_canceledが必要です。')

    is_canceled = payload['is_canceled']
    if not isinstance(is_canceled, bool):
        return bad_request('is_canceledの型が不正です。')

    expense = get_object_or_404(Expense, pk=expense_id)
    expense.is_canceled = is_canceled
    expense.human_confirmed = True
    expense.save(update_fields=['is_canceled', 'human_confirmed', 'updated_at'])
    return JsonResponse({'status': 'ok'})