CSV取込機能を実装し、Expenseモデルと関連するStore・ExpenseCategoryモデルを追加。CSVパーサーを作成し、出光CSVに対応。明細編集画面のAJAX保存機能を実装し、取込結果を表示する機能を追加。作業ログをdiary.mdに追記。

This commit is contained in:
president
2025-12-19 15:51:14 +09:00
parent 5fc2a31f50
commit 89caf3438a
11 changed files with 489 additions and 7 deletions

View File

@@ -0,0 +1,7 @@
from .factory import get_parser
from .idemitsu import IdemitsuParser
__all__ = [
'get_parser',
'IdemitsuParser',
]

View File

@@ -0,0 +1,25 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from typing import Iterable
@dataclass(frozen=True)
class ExpenseRow:
use_date: date
description: str
amount: int
note: str
source: str
source_hash: str
class CSVParserBase:
source: str = ''
def detect(self, header_line: str) -> bool:
raise NotImplementedError
def parse(self, lines: Iterable[str]) -> list[ExpenseRow]:
raise NotImplementedError

View File

@@ -0,0 +1,26 @@
from __future__ import annotations
from typing import Iterable
from .base import CSVParserBase
from .idemitsu import IdemitsuParser
PARSERS: list[CSVParserBase] = [
IdemitsuParser(),
]
def detect_parser(lines: Iterable[str]) -> CSVParserBase | None:
for line in lines:
for parser in PARSERS:
if parser.detect(line):
return parser
return None
def get_parser(lines: Iterable[str]) -> CSVParserBase:
parser = detect_parser(lines)
if parser is None:
raise ValueError('CSVブランドを判定できませんでした。')
return parser

View File

@@ -0,0 +1,67 @@
from __future__ import annotations
import csv
import hashlib
from datetime import datetime
from typing import Iterable
from .base import CSVParserBase, ExpenseRow
class IdemitsuParser(CSVParserBase):
source = 'idemitsu'
def detect(self, header_line: str) -> bool:
return '出光' in header_line or '' in header_line
def parse(self, lines: Iterable[str]) -> list[ExpenseRow]:
rows: list[ExpenseRow] = []
reader = csv.reader(lines)
last_row_index: int | None = None
for columns in reader:
if not columns:
continue
head = columns[0].strip()
if head in ('カード名称', 'お支払日', '今回ご請求額'):
continue
if head == '利用日':
continue
if not head:
if last_row_index is not None:
extra_desc = columns[1].strip() if len(columns) > 1 else ''
extra_note = columns[6].strip() if len(columns) > 6 else ''
if extra_desc:
rows[last_row_index].description += f' {extra_desc}'
if extra_note:
if rows[last_row_index].note:
rows[last_row_index].note += f' {extra_note}'
else:
rows[last_row_index].note = extra_note
continue
if len(columns) < 6:
continue
try:
use_date = datetime.strptime(head, '%Y/%m/%d').date()
except ValueError:
continue
description = columns[1].strip()
amount_text = columns[5].replace(',', '').strip()
note = columns[6].strip() if len(columns) > 6 else ''
try:
amount = int(amount_text)
except ValueError:
continue
raw = f"{use_date.isoformat()}|{description}|{amount}|{note}|{self.source}"
source_hash = hashlib.sha256(raw.encode('utf-8')).hexdigest()
rows.append(
ExpenseRow(
use_date=use_date,
description=description,
amount=amount,
note=note,
source=self.source,
source_hash=source_hash,
)
)
last_row_index = len(rows) - 1
return rows

View File

@@ -0,0 +1,54 @@
# Generated by Django 4.2.27 on 2025-12-19 06:31
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='ExpenseCategory',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=200, unique=True)),
('is_active', models.BooleanField(default=True)),
],
),
migrations.CreateModel(
name='Store',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=200, unique=True)),
('is_active', models.BooleanField(default=True)),
],
),
migrations.CreateModel(
name='Expense',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('use_date', models.DateField()),
('description', models.CharField(max_length=255)),
('amount', models.IntegerField()),
('is_business', models.BooleanField(blank=True, null=True)),
('note', models.TextField(blank=True)),
('source', models.CharField(choices=[('idemitsu', 'Idemitsu')], max_length=50)),
('source_hash', models.CharField(max_length=64)),
('ai_score', models.FloatField(blank=True, null=True)),
('human_confirmed', models.BooleanField(default=False)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('expense_category', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='expenses.expensecategory')),
('store', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='expenses.store')),
],
),
migrations.AddConstraint(
model_name='expense',
constraint=models.UniqueConstraint(fields=('source', 'source_hash'), name='uniq_source_hash'),
),
]

View File

@@ -1,3 +1,47 @@
from django.db import models
# Create your models here.
class Store(models.Model):
name = models.CharField(max_length=200, unique=True)
is_active = models.BooleanField(default=True)
def __str__(self) -> str:
return self.name
class ExpenseCategory(models.Model):
name = models.CharField(max_length=200, unique=True)
is_active = models.BooleanField(default=True)
def __str__(self) -> str:
return self.name
class Expense(models.Model):
SOURCE_CHOICES = [
('idemitsu', 'Idemitsu'),
]
use_date = models.DateField()
description = models.CharField(max_length=255)
amount = models.IntegerField()
store = models.ForeignKey(Store, null=True, blank=True, on_delete=models.SET_NULL)
expense_category = models.ForeignKey(
ExpenseCategory, null=True, blank=True, on_delete=models.SET_NULL
)
is_business = models.BooleanField(null=True, blank=True)
note = models.TextField(blank=True)
source = models.CharField(max_length=50, choices=SOURCE_CHOICES)
source_hash = models.CharField(max_length=64)
ai_score = models.FloatField(null=True, blank=True)
human_confirmed = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
constraints = [
models.UniqueConstraint(fields=['source', 'source_hash'], name='uniq_source_hash'),
]
def __str__(self) -> str:
return f'{self.use_date} {self.description} {self.amount}'

36
expenses/services.py Normal file
View File

@@ -0,0 +1,36 @@
from __future__ import annotations
from typing import Iterable
from .csv_parsers import get_parser
from .models import Expense
class CSVImportResult:
def __init__(self, imported: int, duplicated: int) -> None:
self.imported = imported
self.duplicated = duplicated
def import_csv_lines(lines: Iterable[str]) -> CSVImportResult:
buffered_lines = list(lines)
parser = get_parser(buffered_lines)
rows = parser.parse(buffered_lines)
imported = 0
duplicated = 0
for row in rows:
expense, created = Expense.objects.get_or_create(
source=row.source,
source_hash=row.source_hash,
defaults={
'use_date': row.use_date,
'description': row.description,
'amount': row.amount,
'note': row.note,
},
)
if created:
imported += 1
else:
duplicated += 1
return CSVImportResult(imported=imported, duplicated=duplicated)

View File

@@ -6,6 +6,7 @@ from . import views
urlpatterns = [
path('', views.csv_upload, name='csv_upload'),
path('expenses/', views.expense_list, name='expense_list'),
path('expenses/<int:expense_id>/update/', views.expense_update, name='expense_update'),
path('reports/monthly/', views.monthly_report, name='monthly_report'),
path('reports/monthly/pdf/', views.monthly_report_pdf, name='monthly_report_pdf'),
]

View File

@@ -1,12 +1,97 @@
from django.shortcuts import render
import io
import json
from urllib.parse import urlencode
import chardet
from django.http import JsonResponse
from django.shortcuts import redirect
from django.shortcuts import get_object_or_404, render
from django.views.decorators.http import require_POST
from .services import import_csv_lines
from .models import Expense, ExpenseCategory, Store
def csv_upload(request):
return render(request, 'expenses/csv_upload.html')
context = {}
if request.method == 'POST' and request.FILES.get('csv_file'):
upload = request.FILES['csv_file']
raw = upload.read()
detected = chardet.detect(raw)
encoding = detected.get('encoding') or 'utf-8'
text = raw.decode(encoding, errors='ignore')
lines = io.StringIO(text)
result = import_csv_lines(lines)
query = urlencode(
{'imported': result.imported, 'duplicated': result.duplicated, 'encoding': encoding}
)
return redirect(f"/expenses/?{query}")
return render(request, 'expenses/csv_upload.html', context)
def expense_list(request):
return render(request, 'expenses/expense_list.html')
expenses = Expense.objects.select_related('store', 'expense_category').order_by('-use_date', '-id')[:200]
stores = Store.objects.filter(is_active=True).order_by('name')
categories = ExpenseCategory.objects.filter(is_active=True).order_by('name')
return render(
request,
'expenses/expense_list.html',
{
'expenses': expenses,
'stores': stores,
'categories': categories,
'imported': request.GET.get('imported'),
'duplicated': request.GET.get('duplicated'),
'encoding': request.GET.get('encoding'),
},
)
@require_POST
def expense_update(request, expense_id: int):
try:
payload = json.loads(request.body.decode('utf-8'))
except json.JSONDecodeError:
return JsonResponse({'status': 'error', 'message': 'JSON形式が不正です。'}, status=400)
if not isinstance(payload, dict):
return JsonResponse({'status': 'error', 'message': 'JSONはオブジェクトで送信してください。'}, status=400)
expense = get_object_or_404(Expense, pk=expense_id)
fields = {}
for key in ('store_id', 'expense_category_id', 'is_business', 'note'):
if key in payload:
value = payload[key]
if value in ('', None):
fields[key] = None
else:
fields[key] = value
if 'store_id' in fields and fields['store_id'] is not None:
if not isinstance(fields['store_id'], int):
return JsonResponse({'status': 'error', 'message': 'store_idの型が不正です。'}, status=400)
if not Store.objects.filter(id=fields['store_id'], is_active=True).exists():
return JsonResponse({'status': 'error', 'message': 'store_idが不正です。'}, status=400)
if 'expense_category_id' in fields and fields['expense_category_id'] is not None:
if not isinstance(fields['expense_category_id'], int):
return JsonResponse({'status': 'error', 'message': 'expense_category_idの型が不正です。'}, status=400)
if not ExpenseCategory.objects.filter(
id=fields['expense_category_id'], is_active=True
).exists():
return JsonResponse(
{'status': 'error', 'message': 'expense_category_idが不正です。'}, status=400
)
if 'is_business' in fields and fields['is_business'] is not None:
if not isinstance(fields['is_business'], bool):
return JsonResponse({'status': 'error', 'message': 'is_businessの型が不正です。'}, status=400)
if 'note' in fields and fields['note'] is not None:
if not isinstance(fields['note'], str):
return JsonResponse({'status': 'error', 'message': 'noteの型が不正です。'}, status=400)
if len(fields['note']) > 2000:
return JsonResponse({'status': 'error', 'message': 'noteが長すぎます。'}, status=400)
if fields:
for key, value in fields.items():
setattr(expense, key, value)
expense.human_confirmed = True
expense.save()
return JsonResponse({'status': 'ok'})
def monthly_report(request):